diff --git a/go.mod b/go.mod index 110af3a..db36022 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/schollz/pake v1.0.2 github.com/schollz/peerdiscovery v1.2.2 github.com/schollz/progressbar/v2 v2.5.3 - github.com/schollz/spinner v0.0.0-20180922210718-ea497ee41258 + github.com/schollz/spinner v0.0.0-20180925172146-6bbc5f7804f9 github.com/schollz/utils v1.0.0 github.com/stretchr/testify v1.2.2 github.com/tscholl2/siec v0.0.0-20180721101609-21667da05937 github.com/urfave/cli v1.20.0 golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b - golang.org/x/net v0.0.0-20180921000356-2f5d2388922f // indirect + golang.org/x/net v0.0.0-20180925072008-f04abc6bdfa7 // indirect ) diff --git a/main.go b/main.go index 1188d9d..362338d 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "log" "os" "path/filepath" + "strings" "time" humanize "github.com/dustin/go-humanize" @@ -63,7 +64,7 @@ func main() { app.Flags = []cli.Flag{ cli.StringFlag{Name: "addr", Value: "198.199.67.130", Usage: "address of the public relay"}, cli.StringFlag{Name: "addr-ws", Value: "8153", Usage: "port of the public relay websocket server to connect"}, - cli.StringFlag{Name: "addr-tcp", Value: "8154", Usage: "tcp port of the public relay serer to connect"}, + cli.StringFlag{Name: "addr-tcp", Value: "8154,8155,8156,8157", Usage: "tcp ports of the public relay serer to connect"}, cli.BoolFlag{Name: "no-local", Usage: "disable local mode"}, cli.BoolFlag{Name: "local", Usage: "use only local mode"}, cli.BoolFlag{Name: "debug", Usage: "increase verbosity (a lot)"}, @@ -72,7 +73,7 @@ func main() { cli.BoolFlag{Name: "force-tcp", Usage: "force TCP"}, cli.BoolFlag{Name: "force-web", Usage: "force websockets"}, cli.StringFlag{Name: "port", Value: "8153", Usage: "port that the websocket listens on"}, - cli.StringFlag{Name: "tcp-port", Value: "8154", Usage: "port that the tcp server listens on"}, + cli.StringFlag{Name: "tcp-port", Value: "8154,8155,8156,8157", Usage: "ports that the tcp server listens on"}, cli.StringFlag{Name: "curve", Value: "siec", Usage: "specify elliptic curve to use (p224, p256, p384, p521, siec)"}, } app.EnableBashCompletion = true @@ -88,7 +89,7 @@ func main() { cr = croc.Init(c.GlobalBool("debug")) cr.AllowLocalDiscovery = true cr.Address = c.GlobalString("addr") - cr.AddressTCPPort = c.GlobalString("addr-tcp") + cr.AddressTCPPorts = strings.Split(c.GlobalString("addr-tcp"), ",") cr.AddressWebsocketPort = c.GlobalString("addr-ws") cr.NoRecipientPrompt = c.GlobalBool("yes") cr.Stdout = c.GlobalBool("stdout") @@ -96,7 +97,7 @@ func main() { cr.NoLocal = c.GlobalBool("no-local") cr.ShowText = true cr.RelayWebsocketPort = c.String("port") - cr.RelayTCPPort = c.String("tcp-port") + cr.RelayTCPPorts = strings.Split(c.String("tcp-port"), ",") cr.CurveType = c.String("curve") if c.GlobalBool("force-tcp") { cr.ForceSend = 2 diff --git a/src/croc/croc.go b/src/croc/croc.go index c25c2cb..30618cd 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -1,6 +1,7 @@ package croc import ( + "runtime" "time" "github.com/schollz/croc/src/logger" @@ -10,6 +11,10 @@ import ( "github.com/schollz/croc/src/zipper" ) +func init() { + runtime.GOMAXPROCS(runtime.NumCPU()) +} + // Croc options type Croc struct { // Options for all @@ -19,12 +24,12 @@ type Croc struct { // Options for relay RelayWebsocketPort string - RelayTCPPort string + RelayTCPPorts []string CurveType string // Options for connecting to server Address string - AddressTCPPort string + AddressTCPPorts []string AddressWebsocketPort string Timeout time.Duration LocalOnly bool @@ -56,11 +61,11 @@ func Init(debug bool) (c *Croc) { c.UseEncryption = true c.AllowLocalDiscovery = true c.RelayWebsocketPort = "8153" - c.RelayTCPPort = "8154" + c.RelayTCPPorts = []string{"8154", "8155", "8156", "8157"} c.CurveType = "siec" c.Address = "198.199.67.130" c.AddressWebsocketPort = "8153" - c.AddressTCPPort = "8154" + c.AddressTCPPorts = []string{"8154", "8155", "8156", "8157"} c.NoRecipientPrompt = true debugLevel := "info" if debug { diff --git a/src/croc/sending.go b/src/croc/sending.go index b55092e..2869644 100644 --- a/src/croc/sending.go +++ b/src/croc/sending.go @@ -30,7 +30,7 @@ func (c *Croc) Send(fname, codephrase string) (err error) { if !c.LocalOnly { go func() { // atttempt to connect to public relay - errChan <- c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPort, fname, codephrase, true, false) + errChan <- c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPorts, fname, codephrase, true, false) }() } else { waitingFor = 1 @@ -40,7 +40,7 @@ func (c *Croc) Send(fname, codephrase string) (err error) { if !c.NoLocal { go func() { // start own relay and connect to it - go relay.Run(c.RelayWebsocketPort, c.RelayTCPPort) + go relay.Run(c.RelayWebsocketPort, c.RelayTCPPorts) time.Sleep(250 * time.Millisecond) // race condition here, but this should work most of the time :( // broadcast for peer discovery @@ -50,12 +50,12 @@ func (c *Croc) Send(fname, codephrase string) (err error) { Limit: 1, TimeLimit: 600 * time.Second, Delay: 50 * time.Millisecond, - Payload: []byte(c.RelayWebsocketPort + "- " + c.RelayTCPPort), + Payload: []byte(c.RelayWebsocketPort + "- " + strings.Join(c.RelayTCPPorts, ",")), }) }() // connect to own relay - errChan <- c.sendReceive("localhost", c.RelayWebsocketPort, c.RelayTCPPort, fname, codephrase, true, true) + errChan <- c.sendReceive("localhost", c.RelayWebsocketPort, c.RelayTCPPorts, fname, codephrase, true, true) }() } else { waitingFor = 1 @@ -106,7 +106,7 @@ func (c *Croc) Receive(codephrase string) (err error) { if err == nil { if resp.StatusCode == http.StatusOK { // we connected, so use this - return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.TrimSpace(ports[1]), "", codephrase, false, true) + return c.sendReceive(discovered[0].Address, strings.TrimSpace(ports[0]), strings.Split(strings.TrimSpace(ports[1]), ","), "", codephrase, false, true) } } else { log.Debugf("could not connect: %s", err.Error()) @@ -119,13 +119,13 @@ func (c *Croc) Receive(codephrase string) (err error) { // use public relay if !c.LocalOnly { log.Debug("using public relay") - return c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPort, "", codephrase, false, false) + return c.sendReceive(c.Address, c.AddressWebsocketPort, c.AddressTCPPorts, "", codephrase, false, false) } return errors.New("must use local or public relay") } -func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase string, isSender bool, isLocal bool) (err error) { +func (c *Croc) sendReceive(address, websocketPort string, tcpPorts []string, fname string, codephrase string, isSender bool, isLocal bool) (err error) { defer log.Flush() if len(codephrase) < 4 { return fmt.Errorf("codephrase is too short") @@ -157,9 +157,9 @@ func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase st } if isSender { - go sender.Send(c.ForceSend, address, tcpPort, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) + go sender.Send(c.ForceSend, address, tcpPorts, isLocal, done, sock, fname, codephrase, c.UseCompression, c.UseEncryption) } else { - go recipient.Receive(c.ForceSend, address, tcpPort, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) + go recipient.Receive(c.ForceSend, address, tcpPorts, isLocal, done, sock, codephrase, c.NoRecipientPrompt, c.Stdout) } for { @@ -192,5 +192,5 @@ func (c *Croc) sendReceive(address, websocketPort, tcpPort, fname, codephrase st // Relay will start a relay on the specified port func (c *Croc) Relay() (err error) { - return relay.Run(c.RelayWebsocketPort, c.RelayTCPPort) + return relay.Run(c.RelayWebsocketPort, c.RelayTCPPorts) } diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 652225a..6685f51 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net" "os" + "strconv" "strings" "time" @@ -31,9 +32,9 @@ import ( var DebugLevel string // Receive is the async operation to receive a file -func Receive(forceSend int, serverAddress, serverTCP string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { +func Receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) { logger.SetLogLevel(DebugLevel) - err := receive(forceSend, serverAddress, serverTCP, isLocal, c, codephrase, noPrompt, useStdout) + err := receive(forceSend, serverAddress, tcpPorts, isLocal, c, codephrase, noPrompt, useStdout) if err != nil { if !strings.HasPrefix(err.Error(), "websocket: close 100") { fmt.Fprintf(os.Stderr, "\n"+err.Error()) @@ -42,13 +43,14 @@ func Receive(forceSend int, serverAddress, serverTCP string, isLocal bool, done done <- struct{}{} } -func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { +func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, codephrase string, noPrompt bool, useStdout bool) (err error) { var fstats models.FileStats var sessionKey []byte var transferTime time.Duration var hash256 []byte var otherIP string - var tcpConnection comm.Comm + var tcpConnections []comm.Comm + dataChan := make(chan []byte, 1024*1024) useWebsockets := true switch forceSend { @@ -173,12 +175,16 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") - tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP) - if err != nil { - log.Error(err) - return err + tcpConnections = make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + return err + } + defer tcpConnections[i].Close() } - defer tcpConnection.Close() + log.Debugf("fully connected") } // await file @@ -187,6 +193,10 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we log.Error(err) return err } + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) bar := progressbar.NewOptions( @@ -195,60 +205,126 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + finished := make(chan bool) + + go func(finished chan bool, dataChan chan []byte) (err error) { + for { + message := <-dataChan + // do decryption + var enc crypt.Encryption + err = json.Unmarshal(message, &enc) + if err != nil { + // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + if err != nil { + log.Error(err) + return err + } + + // get location if TCP + var locationToWrite int + if !useWebsockets { + pieces := bytes.SplitN(decrypted, []byte("-"), 2) + decrypted = pieces[1] + locationToWrite, _ = strconv.Atoi(string(pieces[0])) + } + + // do decompression + if fstats.IsCompressed && !fstats.IsDir { + decrypted = compress.Decompress(decrypted) + } + + var n int + if !useWebsockets { + if err != nil { + log.Error(err) + return err + } + n, err = f.WriteAt(decrypted, int64(locationToWrite)) + } else { + // write to file + n, err = f.Write(decrypted) + } + + if err != nil { + return err + } + // update the bytes written + bytesWritten += n + // update the progress bar + bar.Add(n) + if int64(bytesWritten) == fstats.Size { + log.Debug("finished") + break + } + } + finished <- true + return + }(finished, dataChan) + + log.Debug("telling sender i'm ready") c.WriteMessage(websocket.BinaryMessage, []byte("ready")) startTime := time.Now() - var numBytes int - var bs []byte - for { - if useWebsockets { + if useWebsockets { + for { var messageType int // read from websockets messageType, message, err = c.ReadMessage() if messageType != websocket.BinaryMessage { continue } - } else { - // read from TCP connection - message, numBytes, bs, err = tcpConnection.Read() - // log.Debugf("message: %s", message) + if err != nil { + log.Error(err) + return err + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + break + } + select { + case dataChan <- message: + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message + } } - if err != nil { - log.Error(err) - return err - } - - // do decryption - var enc crypt.Encryption - err = json.Unmarshal(message, &enc) - if err != nil { - log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) - return err - } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) - if err != nil { - return err - } - - // do decompression - if fstats.IsCompressed && !fstats.IsDir { - decrypted = compress.Decompress(decrypted) - } - - // write to file - n, err := f.Write(decrypted) - if err != nil { - return err - } - // update the bytes written - bytesWritten += n - // update the progress bar - bar.Add(n) - - if int64(bytesWritten) == fstats.Size { - break + } else { + log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) + // using TCP + for i := range tcpConnections { + go func(tcpConnection comm.Comm) { + for { + // read from TCP connection + message, _, _, err = tcpConnection.Read() + // log.Debugf("message: %s", message) + if err != nil { + log.Error(err) + return + } + if bytes.Equal(message, []byte("magic")) { + log.Debug("got magic") + return + } + select { + case dataChan <- message: + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message + } + } + }(tcpConnections[i]) } } + _ = <-finished + log.Debug("telling sender i'm done") c.WriteMessage(websocket.BinaryMessage, []byte("done")) // we are finished transferTime = time.Since(startTime) @@ -325,7 +401,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we } func connectToTCPServer(room string, address string) (com comm.Comm, err error) { - log.Debugf("connecting to %s", address) + log.Debugf("recipient connecting to %s", address) connection, err := net.Dial("tcp", address) if err != nil { return @@ -340,14 +416,14 @@ func connectToTCPServer(room string, address string) (com comm.Comm, err error) if err != nil { return } - log.Debugf("server says: %s", ok) + log.Debugf("[%s] server says: %s", address, ok) err = com.Send(room) if err != nil { return } ok, err = com.Receive() - log.Debugf("server says: %s", ok) + log.Debugf("[%s] server says: %s", address, ok) if err != nil { return } diff --git a/src/relay/relay.go b/src/relay/relay.go index 42aa596..e713e44 100644 --- a/src/relay/relay.go +++ b/src/relay/relay.go @@ -12,11 +12,13 @@ import ( var DebugLevel string // Run is the async operation for running a server -func Run(port string, tcpPort string) (err error) { +func Run(port string, tcpPorts []string) (err error) { logger.SetLogLevel(DebugLevel) - if tcpPort != "" { - go tcp.Run(DebugLevel, tcpPort) + if len(tcpPorts) > 0 { + for _, tcpPort := range tcpPorts { + go tcp.Run(DebugLevel, tcpPort) + } } go h.run() diff --git a/src/sender/sender.go b/src/sender/sender.go index a660c1b..521edf9 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" log "github.com/cihub/seelog" @@ -30,10 +31,10 @@ import ( var DebugLevel string // Send is the async call to send data -func Send(forceSend int, serverAddress, serverTCP string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { +func Send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, done chan struct{}, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) { logger.SetLogLevel(DebugLevel) log.Debugf("sending %s", fname) - err := send(forceSend, serverAddress, serverTCP, isLocal, c, fname, codephrase, useCompression, useEncryption) + err := send(forceSend, serverAddress, tcpPorts, isLocal, c, fname, codephrase, useCompression, useEncryption) if err != nil { if !strings.HasPrefix(err.Error(), "websocket: close 100") { fmt.Fprintf(os.Stderr, "\n"+err.Error()) @@ -43,19 +44,20 @@ func Send(forceSend int, serverAddress, serverTCP string, isLocal bool, done cha done <- struct{}{} } -func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { +func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, c *websocket.Conn, fname string, codephrase string, useCompression bool, useEncryption bool) (err error) { var f *os.File defer f.Close() // ignore the error if it wasn't opened :( var fstats models.FileStats var fileHash []byte var otherIP string var startTransfer time.Time - var tcpConnection comm.Comm + var tcpConnections []comm.Comm type DataChan struct { - b []byte - bytesRead int - err error + b []byte + currentPostition int64 + bytesRead int + err error } dataChan := make(chan DataChan, 1024*1024) defer close(dataChan) @@ -175,6 +177,7 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso } else { buffer = make([]byte, models.TCP_BUFFER_SIZE/2) } + currentPostition := int64(0) for { bytesread, err := f.Read(buffer) if bytesread > 0 { @@ -186,6 +189,11 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso compressedBytes = buffer[:bytesread] } + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) + } + // do encryption enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) encBytes, err := json.Marshal(enc) @@ -204,7 +212,6 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso bytesRead: bytesread, err: nil, }: - continue default: log.Debug("blocked") // no message sent @@ -215,6 +222,7 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso err: nil, } } + currentPostition += int64(bytesread) } if err != nil { if err != io.EOF { @@ -225,10 +233,19 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso } // finish dataChan <- DataChan{ - b: nil, - bytesRead: 0, + b: []byte("magic"), + bytesRead: len([]byte("magic")), err: nil, } + if !useWebsockets { + for i := 0; i < len(tcpConnections)-1; i++ { + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: len([]byte("magic")), + err: nil, + } + } + } }(dataChan) }() @@ -279,14 +296,19 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso return errors.New("recipient refused file") } + // connect to TCP to receive file if !useWebsockets { - // connection to TCP - tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP) - if err != nil { - log.Error(err) - return + log.Debugf("connecting to server") + tcpConnections = make([]comm.Comm, len(tcpPorts)) + for i, tcpPort := range tcpPorts { + log.Debug(tcpPort) + tcpConnections[i], err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%d%x", i, sessionKey)), serverAddress+":"+tcpPort) + if err != nil { + log.Error(err) + return err + } + defer tcpConnections[i].Close() } - defer tcpConnection.Close() } fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) @@ -299,27 +321,51 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) - for { - data := <-dataChan - if data.err != nil { - return data.err - } - if data.bytesRead > 0 { - bar.Add(data.bytesRead) - if !useWebsockets { - // write data to tcp connection - _, err = tcpConnection.Write(data.b) - } else { - // write data to websockets - err = c.WriteMessage(websocket.BinaryMessage, data.b) + + if useWebsockets { + for { + data := <-dataChan + if data.err != nil { + return data.err } + bar.Add(data.bytesRead) + // write data to websockets + err = c.WriteMessage(websocket.BinaryMessage, data.b) if err != nil { err = errors.Wrap(err, "problem writing message") return err } - } else { - break + if bytes.Equal(data.b, []byte("magic")) { + break + } } + } else { + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) + for i := range tcpConnections { + go func(dataChan <-chan DataChan, tcpConnection comm.Comm) { + defer wg.Done() + for data := range dataChan { + if data.err != nil { + log.Error(data.err) + return + } + bar.Add(data.bytesRead) + // write data to tcp connection + _, err = tcpConnection.Write(data.b) + if err != nil { + err = errors.Wrap(err, "problem writing message") + log.Error(err) + return + } + if bytes.Equal(data.b, []byte("magic")) { + return + } + + } + }(dataChan, tcpConnections[i]) + } + wg.Wait() } bar.Finish() @@ -331,6 +377,7 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso case 5: transferTime := time.Since(startTransfer) if !bytes.HasPrefix(message, []byte("hash:")) { + log.Debugf("%s", message) continue } c.WriteMessage(websocket.BinaryMessage, fileHash)