From 75f7cdcf65ff892db472ddbcc477666ca4f3f612 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 09:39:18 -0700 Subject: [PATCH 01/13] introduce magic bytes to stop --- src/recipient/recipient.go | 10 +++++++--- src/sender/sender.go | 6 ++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 652225a..3860fd0 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -216,6 +216,10 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we log.Error(err) return err } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } // do decryption var enc crypt.Encryption @@ -244,9 +248,9 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we // update the progress bar bar.Add(n) - if int64(bytesWritten) == fstats.Size { - break - } + // if int64(bytesWritten) == fstats.Size { + // break + // } } c.WriteMessage(websocket.BinaryMessage, []byte("done")) diff --git a/src/sender/sender.go b/src/sender/sender.go index a660c1b..00b28cf 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -224,6 +224,12 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso } } // finish + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: len([]byte("magic")), + err: nil, + } + // finish dataChan <- DataChan{ b: nil, bytesRead: 0, From f62459e1a4b9f37b8505a0e872ac815584af566a Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 09:55:35 -0700 Subject: [PATCH 02/13] move reading to goroutine --- src/recipient/recipient.go | 85 +++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 3860fd0..1df0a93 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -49,6 +49,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we var hash256 []byte var otherIP string var tcpConnection comm.Comm + dataChan := make(chan []byte, 1024*1024) useWebsockets := true switch forceSend { @@ -195,10 +196,48 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + finished := make(chan bool) + go func(finished chan bool, dataChan chan []byte) (err error) { + for { + message := <-dataChan + // do decryption + var enc crypt.Encryption + err = json.Unmarshal(message, &enc) + if err != nil { + // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + if err != nil { + log.Error(err) + return err + } + + // do decompression + if fstats.IsCompressed && !fstats.IsDir { + decrypted = compress.Decompress(decrypted) + } + + // write to file + n, err := f.Write(decrypted) + if err != nil { + return err + } + // update the bytes written + bytesWritten += n + // update the progress bar + bar.Add(n) + if int64(bytesWritten) == fstats.Size { + log.Debug("finished") + break + } + } + finished <- true + return + }(finished, dataChan) c.WriteMessage(websocket.BinaryMessage, []byte("ready")) startTime := time.Now() - var numBytes int - var bs []byte for { if useWebsockets { var messageType int @@ -209,7 +248,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we } } else { // read from TCP connection - message, numBytes, bs, err = tcpConnection.Read() + message, _, _, err = tcpConnection.Read() // log.Debugf("message: %s", message) } if err != nil { @@ -220,39 +259,19 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we log.Debug("got magic") break } - - // do decryption - var enc crypt.Encryption - err = json.Unmarshal(message, &enc) - if err != nil { - log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) - return err + select { + case dataChan <- message: + continue + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) - if err != nil { - return err - } - - // do decompression - if fstats.IsCompressed && !fstats.IsDir { - decrypted = compress.Decompress(decrypted) - } - - // write to file - n, err := f.Write(decrypted) - if err != nil { - return err - } - // update the bytes written - bytesWritten += n - // update the progress bar - bar.Add(n) - - // if int64(bytesWritten) == fstats.Size { - // break - // } } + _ = <-finished + c.WriteMessage(websocket.BinaryMessage, []byte("done")) // we are finished transferTime = time.Since(startTime) From 68896f6ef78a366c9225fcd108f9c2bbc4c5cb9d Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 12:11:36 -0700 Subject: [PATCH 03/13] use all cpu --- main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/main.go b/main.go index 1188d9d..65a706c 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "log" "os" "path/filepath" + "runtime" "time" humanize "github.com/dustin/go-humanize" @@ -22,6 +23,7 @@ var codePhrase string var cr *croc.Croc func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) app := cli.NewApp() app.Name = "croc" if version == "" { From 7fea858252e9c796138d933788fd90a76266bdd7 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 12:26:32 -0700 Subject: [PATCH 04/13] update deps --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 110af3a..db36022 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/schollz/pake v1.0.2 github.com/schollz/peerdiscovery v1.2.2 github.com/schollz/progressbar/v2 v2.5.3 - github.com/schollz/spinner v0.0.0-20180922210718-ea497ee41258 + github.com/schollz/spinner v0.0.0-20180925172146-6bbc5f7804f9 github.com/schollz/utils v1.0.0 github.com/stretchr/testify v1.2.2 github.com/tscholl2/siec v0.0.0-20180721101609-21667da05937 github.com/urfave/cli v1.20.0 golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b - golang.org/x/net v0.0.0-20180921000356-2f5d2388922f // indirect + golang.org/x/net v0.0.0-20180925072008-f04abc6bdfa7 // indirect ) From a7b848804083978c13f8844664a97ef684b9c887 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 12:33:51 -0700 Subject: [PATCH 05/13] use all procs --- main.go | 2 -- src/croc/croc.go | 5 +++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 65a706c..1188d9d 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "log" "os" "path/filepath" - "runtime" "time" humanize "github.com/dustin/go-humanize" @@ -23,7 +22,6 @@ var codePhrase string var cr *croc.Croc func main() { - runtime.GOMAXPROCS(runtime.NumCPU()) app := cli.NewApp() app.Name = "croc" if version == "" { diff --git a/src/croc/croc.go b/src/croc/croc.go index c25c2cb..9f83346 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -1,6 +1,7 @@ package croc import ( + "runtime" "time" "github.com/schollz/croc/src/logger" @@ -10,6 +11,10 @@ import ( "github.com/schollz/croc/src/zipper" ) +func init() { + runtime.GOMAXPROCS(runtime.NumCPU()) +} + // Croc options type Croc struct { // Options for all From 667edd037379cb88fe9e5ef82d2b4e1cdbcb3122 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 15:14:58 -0700 Subject: [PATCH 06/13] multiple tcp ports --- main.go | 9 +++++---- src/croc/croc.go | 8 ++++---- src/croc/sending.go | 16 ++++++++-------- src/relay/relay.go | 8 +++++--- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/main.go b/main.go index 1188d9d..362338d 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "log" "os" "path/filepath" + "strings" "time" humanize "github.com/dustin/go-humanize" @@ -63,7 +64,7 @@ func main() { app.Flags = []cli.Flag{ cli.StringFlag{Name: "addr", Value: "198.199.67.130", Usage: "address of the public relay"}, cli.StringFlag{Name: "addr-ws", Value: "8153", Usage: "port of the public relay websocket server to connect"}, - cli.StringFlag{Name: "addr-tcp", Value: "8154", Usage: "tcp port of the public relay serer to connect"}, + cli.StringFlag{Name: "addr-tcp", Value: "8154,8155,8156,8157", Usage: "tcp ports of the public relay serer to connect"}, cli.BoolFlag{Name: "no-local", Usage: "disable local mode"}, cli.BoolFlag{Name: "local", Usage: "use only local mode"}, cli.BoolFlag{Name: "debug", Usage: "increase verbosity (a lot)"}, @@ -72,7 +73,7 @@ func main() { cli.BoolFlag{Name: "force-tcp", Usage: "force TCP"}, cli.BoolFlag{Name: "force-web", Usage: "force websockets"}, cli.StringFlag{Name: "port", Value: "8153", Usage: "port that the websocket listens on"}, - cli.StringFlag{Name: "tcp-port", Value: "8154", Usage: "port that the tcp server listens on"}, + cli.StringFlag{Name: "tcp-port", Value: "8154,8155,8156,8157", Usage: "ports that the tcp server listens on"}, cli.StringFlag{Name: "curve", Value: "siec", Usage: "specify elliptic curve to use (p224, p256, p384, p521, siec)"}, } app.EnableBashCompletion = true @@ -88,7 +89,7 @@ func main() { cr = croc.Init(c.GlobalBool("debug")) cr.AllowLocalDiscovery = true cr.Address = c.GlobalString("addr") - cr.AddressTCPPort = c.GlobalString("addr-tcp") + cr.AddressTCPPorts = strings.Split(c.GlobalString("addr-tcp"), ",") cr.AddressWebsocketPort = c.GlobalString("addr-ws") cr.NoRecipientPrompt = c.GlobalBool("yes") cr.Stdout = c.GlobalBool("stdout") @@ -96,7 +97,7 @@ func main() { cr.NoLocal = c.GlobalBool("no-local") cr.ShowText = true cr.RelayWebsocketPort = c.String("port") - cr.RelayTCPPort = c.String("tcp-port") + cr.RelayTCPPorts = strings.Split(c.String("tcp-port"), ",") cr.CurveType = c.String("curve") if c.GlobalBool("force-tcp") { cr.ForceSend = 2 diff --git a/src/croc/croc.go b/src/croc/croc.go index 9f83346..233702c 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -24,12 +24,12 @@ type Croc struct { // Options for relay RelayWebsocketPort string - RelayTCPPort string + RelayTCPPorts []string CurveType string // Options for connecting to server Address string - AddressTCPPort string + AddressTCPPorts []string AddressWebsocketPort string Timeout time.Duration LocalOnly bool @@ -61,11 +61,11 @@ func Init(debug bool) (c *Croc) { c.UseEncryption = true c.AllowLocalDiscovery = true c.RelayWebsocketPort = "8153" - c.RelayTCPPort = "8154" + c.RelayTCPPorts = []string{"8154", "8155", "8156", "8156"} c.CurveType = "siec" c.Address = "198.199.67.130" c.AddressWebsocketPort = "8153" - c.AddressTCPPort = "8154" + c.AddressTCPPorts = []string{"8154", "8155", "8156", "8156"} c.NoRecipientPrompt = true debugLevel := "info" if debug { diff --git a/src/croc/sending.go b/src/croc/sending.go index b55092e..1edf3b9 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -40,7 +40,7 @@ func (c *Croc) Send(fname, codephrase string) (err error) { if !c.NoLocal { go func() { // start own relay and connect to it - go relay.Run(c.RelayWebsocketPort, c.RelayTCPPort) + go relay.Run(c.RelayWebsocketPort, c.RelayTCPPorts) time.Sleep(250 * time.Millisecond) // race condition here, but this should work most of the time :( // broadcast for peer discovery @@ -50,12 +50,12 @@ func (c *Croc) Send(fname, codephrase string) (err error) { Limit: 1, TimeLimit: 600 * time.Second, Delay: 50 * time.Millisecond, - Payload: []byte(c.RelayWebsocketPort + "- " + c.RelayTCPPort), + Payload: []byte(c.RelayWebsocketPort + "- " + strings.Join(c.RelayTCPPorts, ",")), }) }() // connect to own relay - errChan <- c.sendReceive("localhost", c.RelayWebsocketPort, c.RelayTCPPort, fname, codephrase, true, true) + errChan <- c.sendReceive("localhost", c.RelayWebsocketPort, c.RelayTCPPorts, fname, codephrase, true, true) }() } else { waitingFor = 1 @@ -106,7 +106,7 @@ func (c *Croc) Receive(codephrase string) (err error) { if err == nil { if resp.StatusCode == http.StatusOK { // we connected, so use this - return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.TrimSpace(ports[1]), "", codephrase, false, true) + return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.TrimSpace(strings.Split(ports[1], ",")), "", codephrase, false, true) } } else { log.Debugf("could not connect: %s", err.Error()) @@ -125,7 +125,7 @@ func (c *Croc) Receive(codephrase string) (err error) { return errors.New("must use local or public relay") } -func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase string, isSender bool, isLocal bool) (err error) { +func (c *Croc) sendReceive(address, websocketPort string, tcpPorts []string, fname string, codephrase string, isSender bool, isLocal bool) (err error) { defer log.Flush() if len(codephrase) < 4 { return fmt.Errorf("codephrase is too short") @@ -157,9 +157,9 @@ func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase st } if isSender { - go sender.Send(c.ForceSend, address, tcpPort, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) + go sender.Send(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) } else { - go recipient.Receive(c.ForceSend, address, tcpPort, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) + go recipient.Receive(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) } for { @@ -192,5 +192,5 @@ func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase st // Relay will start a relay on the specified port func (c *Croc) Relay() (err error) { - return relay.Run(c.RelayWebsocketPort, c.RelayTCPPort) + return relay.Run(c.RelayWebsocketPort, c.RelayTCPPorts) } diff --git a/src/relay/relay.go b/src/relay/relay.go index 42aa596..e713e44 100644 --- a/src/relay/relay.go +++ b/src/relay/relay.go @@ -12,11 +12,13 @@ import ( var DebugLevel string // Run is the async operation for running a server -func Run(port string, tcpPort string) (err error) { +func Run(port string, tcpPorts []string) (err error) { logger.SetLogLevel(DebugLevel) - if tcpPort != "" { - go tcp.Run(DebugLevel, tcpPort) + if len(tcpPorts) > 0 { + for _, tcpPort := range tcpPorts { + go tcp.Run(DebugLevel, tcpPort) + } } go h.run() From 02e80217356d8af83076a33510a13bd353a33d4f Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 16:14:41 -0700 Subject: [PATCH 07/13] fix recipient --- src/croc/sending.go | 6 +-- src/models/constants.go | 5 ++ src/recipient/recipient.go | 107 +++++++++++++++++++++++++------------ 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/src/croc/sending.go b/src/croc/sending.go index 1edf3b9..2869644 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -30,7 +30,7 @@ func (c *Croc) Send(fname, codephrase string) (err error) { if !c.LocalOnly { go func() { // atttempt to connect to public relay - errChan <- c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPort, fname, codephrase, true, false) + errChan <- c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPorts, fname, codephrase, true, false) }() } else { waitingFor = 1 @@ -106,7 +106,7 @@ func (c *Croc) Receive(codephrase string) (err error) { if err == nil { if resp.StatusCode == http.StatusOK { // we connected, so use this - return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.TrimSpace(strings.Split(ports[1], ",")), "", codephrase, false, true) + return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.Split(strings.TrimSpace(ports[1]), ","), "", codephrase, false, true) } } else { log.Debugf("could not connect: %s", err.Error()) @@ -119,7 +119,7 @@ func (c *Croc) Receive(codephrase string) (err error) { // use public relay if !c.LocalOnly { log.Debug("using public relay") - return c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPort, "", codephrase, false, false) + return c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPorts, "", codephrase, false, false) } return errors.New("must use local or public relay") diff --git a/src/models/constants.go b/src/models/constants.go index 0c23ef9..6ec61d7 100644 --- a/src/models/constants.go +++ b/src/models/constants.go @@ -2,3 +2,8 @@ package models const WEBSOCKET_BUFFER_SIZE = 1024 * 1024 * 32 const TCP_BUFFER_SIZE = 1024 * 64 + +type BytesAndLocation struct { + Bytes []byte `json:"b"` + Location int64 `json:"l"` +} diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 1df0a93..2154ed0 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -31,9 +31,9 @@ import ( var DebugLevel string // Receive is the async operation to receive a file -func Receive(forceSend int, serverAddress, serverTCP string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { +func Receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { logger.SetLogLevel(DebugLevel) - err := receive(forceSend, serverAddress, serverTCP, isLocal, c, codephrase, noPrompt, useStdout) + err := receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) if err != nil { if !strings.HasPrefix(err.Error(), "websocket: close 100") { fmt.Fprintf(os.Stderr, "\n"+err.Error()) @@ -42,13 +42,13 @@ func Receive(forceSend int, serverAddress, serverTCP string, isLocal bool, done done <- struct{}{} } -func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { +func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { var fstats models.FileStats var sessionKey []byte var transferTime time.Duration var hash256 []byte var otherIP string - var tcpConnection comm.Comm + var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) useWebsockets := true @@ -174,12 +174,15 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") - tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP) - if err != nil { - log.Error(err) - return err + tcpConnections := make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + return err + } + defer tcpConnections[i].Close() } - defer tcpConnection.Close() } // await file @@ -219,8 +222,20 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we decrypted = compress.Decompress(decrypted) } - // write to file - n, err := f.Write(decrypted) + var n int + if !useWebsockets { + var bl models.BytesAndLocation + err = json.Unmarshal(decrypted, &bl) + if err != nil { + log.Error(err) + return err + } + n, err = f.WriteAt(bl.Bytes, bl.Location) + } else { + // write to file + n, err = f.Write(decrypted) + } + if err != nil { return err } @@ -238,35 +253,61 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we }(finished, dataChan) c.WriteMessage(websocket.BinaryMessage, []byte("ready")) startTime := time.Now() - for { - if useWebsockets { + if useWebsockets { + for { var messageType int // read from websockets messageType, message, err = c.ReadMessage() if messageType != websocket.BinaryMessage { continue } - } else { - // read from TCP connection - message, _, _, err = tcpConnection.Read() - // log.Debugf("message: %s", message) + if err != nil { + log.Error(err) + return err + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } + select { + case dataChan <- message: + continue + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message + } } - if err != nil { - log.Error(err) - return err - } - if bytes.Equal(message, []byte("magic")) { - log.Debug("got magic") - break - } - select { - case dataChan <- message: - continue - default: - log.Debug("blocked") - // no message sent - // block - dataChan <- message + _ = <-finished + + } else { + // using TCP + for i := range tcpConnections { + go func(tcpConnection comm.Comm) { + for { + // read from TCP connection + message, _, _, err = tcpConnection.Read() + // log.Debugf("message: %s", message) + if err != nil { + log.Error(err) + return + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } + select { + case dataChan <- message: + continue + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message + } + } + }(tcpConnections[i]) } } From ec5d45307a86623ec6391f26c4f278df9fa3d145 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 17:09:45 -0700 Subject: [PATCH 08/13] seems to work, with problems --- src/croc/croc.go | 4 +- src/recipient/recipient.go | 37 ++++++++---- src/sender/sender.go | 116 ++++++++++++++++++++++++++----------- 3 files changed, 108 insertions(+), 49 deletions(-) diff --git a/src/croc/croc.go b/src/croc/croc.go index 233702c..30618cd 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -61,11 +61,11 @@ func Init(debug bool) (c *Croc) { c.UseEncryption = true c.AllowLocalDiscovery = true c.RelayWebsocketPort = "8153" - c.RelayTCPPorts = []string{"8154", "8155", "8156", "8156"} + c.RelayTCPPorts = []string{"8154", "8155", "8156", "8157"} c.CurveType = "siec" c.Address = "198.199.67.130" c.AddressWebsocketPort = "8153" - c.AddressTCPPorts = []string{"8154", "8155", "8156", "8156"} + c.AddressTCPPorts = []string{"8154", "8155", "8156", "8157"} c.NoRecipientPrompt = true debugLevel := "info" if debug { diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 2154ed0..fc8f368 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -174,15 +174,16 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") - tcpConnections := make([]comm.Comm, len(tcpPorts)) + tcpConnections = make([]comm.Comm, len(tcpPorts)) for i, tcpPort := range tcpPorts { - tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+tcpPort) + tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) if err != nil { log.Error(err) return err } defer tcpConnections[i].Close() } + log.Debugf("fully connected") } // await file @@ -191,6 +192,10 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Error(err) return err } + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) bar := progressbar.NewOptions( @@ -200,6 +205,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo progressbar.OptionSetWriter(os.Stderr), ) finished := make(chan bool) + go func(finished chan bool, dataChan chan []byte) (err error) { for { message := <-dataChan @@ -217,6 +223,14 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } + var bl models.BytesAndLocation + err = json.Unmarshal(decrypted, &bl) + if err != nil { + log.Error(err) + return err + } + decrypted = bl.Bytes + // do decompression if fstats.IsCompressed && !fstats.IsDir { decrypted = compress.Decompress(decrypted) @@ -224,13 +238,11 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var n int if !useWebsockets { - var bl models.BytesAndLocation - err = json.Unmarshal(decrypted, &bl) if err != nil { log.Error(err) return err } - n, err = f.WriteAt(bl.Bytes, bl.Location) + n, err = f.WriteAt(decrypted, bl.Location) } else { // write to file n, err = f.Write(decrypted) @@ -251,6 +263,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo finished <- true return }(finished, dataChan) + + log.Debug("telling sender i'm ready") c.WriteMessage(websocket.BinaryMessage, []byte("ready")) startTime := time.Now() if useWebsockets { @@ -271,7 +285,6 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } select { case dataChan <- message: - continue default: log.Debug("blocked") // no message sent @@ -280,8 +293,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } } _ = <-finished - } else { + log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) // using TCP for i := range tcpConnections { go func(tcpConnection comm.Comm) { @@ -295,11 +308,10 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } if bytes.Equal(message, []byte("magic")) { log.Debug("got magic") - break + return } select { case dataChan <- message: - continue default: log.Debug("blocked") // no message sent @@ -312,6 +324,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } _ = <-finished + log.Debug("finished") c.WriteMessage(websocket.BinaryMessage, []byte("done")) // we are finished @@ -389,7 +402,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - log.Debugf("connecting to %s", address) + log.Debugf("recipient connecting to %s", address) connection, err := net.Dial("tcp", address) if err != nil { return @@ -404,14 +417,14 @@ func connectToTCPServer(room string, address string) (com comm.Comm, err error) if err != nil { return } - log.Debugf("server says: %s", ok) + log.Debugf("[%s] server says: %s", address, ok) err = com.Send(room) if err != nil { return } ok, err = com.Receive() - log.Debugf("server says: %s", ok) + log.Debugf("[%s] server says: %s", address, ok) if err != nil { return } diff --git a/src/sender/sender.go b/src/sender/sender.go index 00b28cf..049c3ca 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -30,10 +30,10 @@ import ( var DebugLevel string // Send is the async call to send data -func Send(forceSend int, serverAddress, serverTCP string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { +func Send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { logger.SetLogLevel(DebugLevel) log.Debugf("sending %s", fname) - err := send(forceSend, serverAddress, serverTCP, isLocal, c, fname, codephrase, useCompression, useEncryption) + err := send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) if err != nil { if !strings.HasPrefix(err.Error(), "websocket: close 100") { fmt.Fprintf(os.Stderr, "\n"+err.Error()) @@ -43,19 +43,20 @@ func Send(forceSend int, serverAddress, serverTCP string, isLocal bool, done cha done <- struct{}{} } -func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { +func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { var f *os.File defer f.Close() // ignore the error if it wasn't opened :( var fstats models.FileStats var fileHash []byte var otherIP string var startTransfer time.Time - var tcpConnection comm.Comm + var tcpConnections []comm.Comm type DataChan struct { - b []byte - bytesRead int - err error + b []byte + currentPostition int64 + bytesRead int + err error } dataChan := make(chan DataChan, 1024*1024) defer close(dataChan) @@ -175,6 +176,7 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso } else { buffer = make([]byte, models.TCP_BUFFER_SIZE/2) } + currentPostition := int64(0) for { bytesread, err := f.Read(buffer) if bytesread > 0 { @@ -186,8 +188,11 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso compressedBytes = buffer[:bytesread] } + // put number of byte read + transferBytes, err := json.Marshal(models.BytesAndLocation{Bytes: compressedBytes, Location: currentPostition}) + // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + enc := crypt.Encrypt(transferBytes, sessionKey, !useEncryption) encBytes, err := json.Marshal(enc) if err != nil { dataChan <- DataChan{ @@ -198,13 +203,21 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso return } + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + select { case dataChan <- DataChan{ b: encBytes, bytesRead: bytesread, err: nil, }: - continue default: log.Debug("blocked") // no message sent @@ -215,6 +228,7 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso err: nil, } } + currentPostition += int64(bytesread) } if err != nil { if err != io.EOF { @@ -229,11 +243,14 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso bytesRead: len([]byte("magic")), err: nil, } - // finish - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: nil, + if !useWebsockets { + for i := 0; i < len(tcpConnections)-1; i++ { + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: len([]byte("magic")), + err: nil, + } + } } }(dataChan) }() @@ -285,14 +302,19 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso return errors.New("recipient refused file") } + // connect to TCP to receive file if !useWebsockets { - // connection to TCP - tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP) - if err != nil { - log.Error(err) - return + log.Debugf("connecting to server") + tcpConnections = make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + log.Debug(tcpPort) + tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + return err + } + defer tcpConnections[i].Close() } - defer tcpConnection.Close() } fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) @@ -305,27 +327,51 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) - for { - data := <-dataChan - if data.err != nil { - return data.err - } - if data.bytesRead > 0 { - bar.Add(data.bytesRead) - if !useWebsockets { - // write data to tcp connection - _, err = tcpConnection.Write(data.b) - } else { - // write data to websockets - err = c.WriteMessage(websocket.BinaryMessage, data.b) + + if useWebsockets { + for { + data := <-dataChan + if data.err != nil { + return data.err } + bar.Add(data.bytesRead) + // write data to websockets + err = c.WriteMessage(websocket.BinaryMessage, data.b) if err != nil { err = errors.Wrap(err, "problem writing message") return err } - } else { - break + if bytes.Equal(data.b, []byte("magic")) { + return + } } + + } else { + for i := range tcpConnections { + go func(tcpConnection comm.Comm) { + for { + data := <-dataChan + if data.err != nil { + log.Error(data.err) + return + } + + bar.Add(data.bytesRead) + // write data to tcp connection + _, err = tcpConnection.Write(data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + log.Error(err) + return + } + if bytes.Equal(data.b, []byte("magic")) { + return + } + + } + }(tcpConnections[i]) + } + } bar.Finish() From 71fee31da0dc4e7804aaa387a0371867e876dc7c Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 17:18:43 -0700 Subject: [PATCH 09/13] TCP ports use lots --- src/recipient/recipient.go | 4 +--- src/sender/sender.go | 5 ++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index fc8f368..b6db4c9 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -292,7 +292,6 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo dataChan <- message } } - _ = <-finished } else { log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) // using TCP @@ -324,8 +323,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } _ = <-finished - log.Debug("finished") - + log.Debug("telling sender i'm done") c.WriteMessage(websocket.BinaryMessage, []byte("done")) // we are finished transferTime = time.Since(startTime) diff --git a/src/sender/sender.go b/src/sender/sender.go index 049c3ca..9c3c832 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -342,10 +342,9 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, return err } if bytes.Equal(data.b, []byte("magic")) { - return + break } } - } else { for i := range tcpConnections { go func(tcpConnection comm.Comm) { @@ -355,7 +354,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, log.Error(data.err) return } - bar.Add(data.bytesRead) // write data to tcp connection _, err = tcpConnection.Write(data.b) @@ -383,6 +381,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, case 5: transferTime := time.Since(startTransfer) if !bytes.HasPrefix(message, []byte("hash:")) { + log.Debugf("%s", message) continue } c.WriteMessage(websocket.BinaryMessage, fileHash) From c8c532c5dd488755653548a082b18e7fabbe7074 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 17:24:06 -0700 Subject: [PATCH 10/13] small fix --- src/sender/sender.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/sender/sender.go b/src/sender/sender.go index 9c3c832..97ceb75 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -190,10 +190,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, // put number of byte read transferBytes, err := json.Marshal(models.BytesAndLocation{Bytes: compressedBytes, Location: currentPostition}) - - // do encryption - enc := crypt.Encrypt(transferBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) if err != nil { dataChan <- DataChan{ b: nil, @@ -203,6 +199,9 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, return } + // do encryption + enc := crypt.Encrypt(transferBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) if err != nil { dataChan <- DataChan{ b: nil, @@ -347,9 +346,8 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } else { for i := range tcpConnections { - go func(tcpConnection comm.Comm) { - for { - data := <-dataChan + go func(dataChan <-chan DataChan, tcpConnection comm.Comm) { + for data := range dataChan { if data.err != nil { log.Error(data.err) return @@ -367,7 +365,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } - }(tcpConnections[i]) + }(dataChan, tcpConnections[i]) } } From 7f0b919b0bbcc38324fc38f14cc6d51109be2401 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 18:25:12 -0700 Subject: [PATCH 11/13] fix bar --- src/sender/sender.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sender/sender.go b/src/sender/sender.go index 97ceb75..c78a6b3 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" log "github.com/cihub/seelog" @@ -345,8 +346,11 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } } else { + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) for i := range tcpConnections { go func(dataChan <-chan DataChan, tcpConnection comm.Comm) { + defer wg.Done() for data := range dataChan { if data.err != nil { log.Error(data.err) @@ -367,7 +371,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } }(dataChan, tcpConnections[i]) } - + wg.Wait() } bar.Finish() From efa66f7cbe2de142a7d81e9c04251e3bc00b4647 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 18:38:19 -0700 Subject: [PATCH 12/13] better marshaling --- src/recipient/recipient.go | 15 ++++++++------- src/sender/sender.go | 14 ++++---------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index b6db4c9..6685f51 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net" "os" + "strconv" "strings" "time" @@ -223,13 +224,13 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } - var bl models.BytesAndLocation - err = json.Unmarshal(decrypted, &bl) - if err != nil { - log.Error(err) - return err + // get location if TCP + var locationToWrite int + if !useWebsockets { + pieces := bytes.SplitN(decrypted, []byte("-"), 2) + decrypted = pieces[1] + locationToWrite, _ = strconv.Atoi(string(pieces[0])) } - decrypted = bl.Bytes // do decompression if fstats.IsCompressed && !fstats.IsDir { @@ -242,7 +243,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Error(err) return err } - n, err = f.WriteAt(decrypted, bl.Location) + n, err = f.WriteAt(decrypted, int64(locationToWrite)) } else { // write to file n, err = f.Write(decrypted) diff --git a/src/sender/sender.go b/src/sender/sender.go index c78a6b3..521edf9 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -189,19 +189,13 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, compressedBytes = buffer[:bytesread] } - // put number of byte read - transferBytes, err := json.Marshal(models.BytesAndLocation{Bytes: compressedBytes, Location: currentPostition}) - if err != nil { - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: err, - } - return + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) } // do encryption - enc := crypt.Encrypt(transferBytes, sessionKey, !useEncryption) + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) encBytes, err := json.Marshal(enc) if err != nil { dataChan <- DataChan{ From 162b12f42b0e25c539e97e36d6f9bb60c31de1e3 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 25 Sep 2018 18:56:36 -0700 Subject: [PATCH 13/13] don't need this anymore --- src/models/constants.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/models/constants.go b/src/models/constants.go index 6ec61d7..0c23ef9 100644 --- a/src/models/constants.go +++ b/src/models/constants.go @@ -2,8 +2,3 @@ package models const WEBSOCKET_BUFFER_SIZE = 1024 * 1024 * 32 const TCP_BUFFER_SIZE = 1024 * 64 - -type BytesAndLocation struct { - Bytes []byte `json:"b"` - Location int64 `json:"l"` -}