From 7251ec651415354caf8b935f8bb42d5267527f39 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Fri, 10 Jan 2025 10:36:41 -0500 Subject: [PATCH 01/12] working tunnel works better working checkpoint update go mod another checkpoint cleanup fix go mod fix parse refactor minor refactor move tunnel example, final set of changes move log location some fixes doc --- examples/tunnel/README.md | 8 +++ examples/tunnel/tunnel.go | 118 ++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- robot/client/client.go | 60 +++++++++++++++++++ robot/server/server.go | 53 ++++++++++++++++- tunnel/tunnel.go | 100 ++++++++++++++++++++++++++++++++ 7 files changed, 339 insertions(+), 6 deletions(-) create mode 100644 examples/tunnel/README.md create mode 100644 examples/tunnel/tunnel.go create mode 100644 tunnel/tunnel.go diff --git a/examples/tunnel/README.md b/examples/tunnel/README.md new file mode 100644 index 00000000000..56d9bf8d972 --- /dev/null +++ b/examples/tunnel/README.md @@ -0,0 +1,8 @@ +# Tunnel +This example shows how to use the traffic tunneling feature in the viam-server + + +### Running +Run this example with `go run tunnel.go -addr {address of machine} -api-key {api key to use to connect to machine} -api-key-id {api key id to use to connect to machine} -dest {destination address to tunnel to (default 3389)} -src {source address to listen on (default 9090)}` + +API key and API key id can be left blank if the machine is insecure. diff --git a/examples/tunnel/tunnel.go b/examples/tunnel/tunnel.go new file mode 100644 index 00000000000..82d2e0090bc --- /dev/null +++ b/examples/tunnel/tunnel.go @@ -0,0 +1,118 @@ +// main TBD +package main + +import ( + "context" + "flag" + "fmt" + "net" + "strconv" + "sync" + + "go.viam.com/utils/rpc" + + "go.viam.com/rdk/logging" + "go.viam.com/rdk/robot/client" +) + +var ( + ADDRESS = "" + API_KEY = "" + API_KEY_ID = "" + + DEFAULT_SOURCE_PORT = 9090 + DEFAULT_DESTINATION_PORT = 3389 +) + +func main() { + logger := logging.NewDebugLogger("client") + var src int + flag.IntVar(&src, "src", DEFAULT_SOURCE_PORT, "source address to listen on") + + var dest int + flag.IntVar(&dest, "dest", DEFAULT_DESTINATION_PORT, "destination address to tunnel to") + + var addr string + flag.StringVar(&addr, "addr", ADDRESS, "machine name to connect to") + + var apiKey string + flag.StringVar(&apiKey, "api-key", apiKey, "api key to use to connect to machine") + + var apiKeyID string + flag.StringVar(&apiKeyID, "api-key-id", apiKeyID, "api key id to use to connect to machine") + + flag.Parse() + + if addr == "" { + logger.Error("please enter an address with flag --addr") + return + } + + logger.Infow("starting tunnel", "source address", src, "destination address", dest) + ctx := context.Background() + + opts := []client.RobotClientOption{ + client.WithRefreshEvery(0), + client.WithCheckConnectedEvery(0), + client.WithDisableSessions(), + } + + if apiKey != "" && apiKeyID != "" { + opts = append(opts, + client.WithDialOptions(rpc.WithEntityCredentials( + apiKeyID, + rpc.Credentials{ + Type: rpc.CredentialsTypeAPIKey, + Payload: apiKey, + }), + ), + ) + + } else { + opts = append(opts, + client.WithDialOptions( + rpc.WithInsecure(), + rpc.WithDisableDirectGRPC(), + ), + ) + } + machine, err := client.New(ctx, addr, logger, opts...) + if err != nil { + logger.Error(err) + return + } + + defer machine.Close(context.Background()) + TunnelTraffic(ctx, machine, src, dest, logger) +} + +func TunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest int, logger logging.Logger) { + // create listener + li, err := net.Listen("tcp", net.JoinHostPort("localhost", strconv.Itoa(src))) + if err != nil { + logger.CErrorw(ctx, "failed to create listener", "err", err) + return + } + defer li.Close() + + var wg sync.WaitGroup + for { + if ctx.Err() != nil { + break + } + conn, err := li.Accept() + if err != nil { + fmt.Printf("failed to accept conn: %v\n", err) + } + wg.Add(1) + go func() { + defer wg.Done() + // call tunnel once per connection + if err := machine.Tunnel(ctx, conn, dest); err != nil { + logger.CError(ctx, err) + } + conn.Close() + }() + } + wg.Wait() +} diff --git a/go.mod b/go.mod index 21a493f6602..c3e4a2b97a1 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.383 + go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.126 goji.io v2.0.2+incompatible diff --git a/go.sum b/go.sum index bed3d572edb..556d9b1d3e5 100644 --- a/go.sum +++ b/go.sum @@ -1513,8 +1513,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.383 h1:HE9EpUWoyDBYJLTVLo29f66oCjzH31V1YJ02tlzCtyo= -go.viam.com/api v0.1.383/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= +go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 h1:aGdngrg62FRgHRhb5d2ONCFVfNHQ1aQ081tbA+cXy/o= +go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.126 h1:ecFlzln5/u1NqzVMOVxwgwbkg4dDWvQmcCS2fMg0ZNU= diff --git a/robot/client/client.go b/robot/client/client.go index 2dc5941e5a6..30be6778e4a 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "net" "strings" "sync" "sync/atomic" @@ -47,6 +48,7 @@ import ( "go.viam.com/rdk/robot/packages" "go.viam.com/rdk/session" "go.viam.com/rdk/spatialmath" + "go.viam.com/rdk/tunnel" "go.viam.com/rdk/utils/contextutils" ) @@ -1190,6 +1192,64 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro return mVersion, nil } +// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This +// function will close the connection passed in as part of cleanup. +func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + client, err := rc.client.Tunnel(ctx) + if err != nil { + return err + } + + if err := client.Send(&pb.TunnelRequest{ + DestinationPort: uint32(dest), + }); err != nil { + return err + } + rc.Logger().CDebugw(ctx, "creating tunnel to server", "port", dest) + var ( + wg sync.WaitGroup + readerSenderErr error + ) + wg.Add(1) + utils.PanicCapturingGo(func() { + defer func() { + // we communicate an end to the stream by calling CloseSend(), + // which is why we don't need an EOF field on the request message. + client.CloseSend() + // By cancelling this ctx, we will close the client, meaning client.Recv() in the RecvWriterLoop will exit + // and return an error. + // + // NOTE(cheukt): This will cause DEBUG messages from WebRTC stating `no stream for id; discarding` + // to show up because the handler will have exited before we receive the last messages from the server. + // This is not an issue and is expected. + // + // TODO: Don't log `no stream for id; discarding` if client is canceled. + cancel() + wg.Done() + }() + sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) } + readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, rc.logger) + }) + + recvFunc := func() ([]byte, error) { + resp, err := client.Recv() + if err != nil { + return nil, err + } + return resp.Data, nil + } + recvWriterErr := tunnel.RecvWriterLoop(ctx, conn, recvFunc, rc.logger) + cancel() + // We do close the connection to unblock the reader/sender loop, which is not clean + // but there isn't a cleaner way to exit from the reader/sender loop. + err = conn.Close() + + wg.Wait() + return errors.Join(err, readerSenderErr, recvWriterErr) +} + func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor { return func( ctx context.Context, diff --git a/robot/server/server.go b/robot/server/server.go index 93b02e1b827..f57c748c806 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -6,12 +6,15 @@ package server import ( "bytes" "context" + "errors" "fmt" + "net" + "strconv" "strings" + "sync" "time" "github.com/google/uuid" - "github.com/pkg/errors" "go.uber.org/zap/zapcore" commonpb "go.viam.com/api/common/v1" pb "go.viam.com/api/robot/v1" @@ -30,6 +33,7 @@ import ( "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" "go.viam.com/rdk/session" + "go.viam.com/rdk/tunnel" ) // logTSKey is the key used in conjunction with the timestamp of logs received @@ -56,6 +60,49 @@ func New(robot robot.Robot) pb.RobotServiceServer { func (s *Server) Close() { } +// Tunnel tunnels traffic to/from the client from/to a specified port on the server. +func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { + req, err := srv.Recv() + if err != nil { + return fmt.Errorf("failed to receive first message from stream: %w", err) + } + + dest := strconv.Itoa(int(req.DestinationPort)) + s.robot.Logger().CDebugw(srv.Context(), "dialing to destination port", "port", dest) + + dialTimeout := 10 * time.Second + conn, err := net.DialTimeout("tcp", net.JoinHostPort("localhost", dest), dialTimeout) + if err != nil { + return fmt.Errorf("failed to dial to destination port %v: %w", dest, err) + } + s.robot.Logger().CDebugw(srv.Context(), "successfully dialed to destination port, creating tunnel", "port", dest) + + var ( + wg sync.WaitGroup + readerSenderErr error + ) + wg.Add(1) + utils.PanicCapturingGo(func() { + defer func() { + wg.Done() + }() + sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) } + readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, s.robot.Logger()) + }) + recvFunc := func() ([]byte, error) { + req, err := srv.Recv() + if err != nil { + return nil, err + } + return req.Data, nil + } + recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), conn, recvFunc, s.robot.Logger()) + // close the connection to unblock the read + conn.Close() + wg.Wait() + return errors.Join(readerSenderErr, recvWriterErr) +} + // GetOperations lists all running operations. func (s *Server) GetOperations(ctx context.Context, req *pb.GetOperationsRequest) (*pb.GetOperationsResponse, error) { me := operation.Get(ctx) @@ -201,7 +248,7 @@ func (s *Server) DiscoverComponents(ctx context.Context, req *pb.DiscoverCompone for _, discovery := range discoveries { pbResults, err := vprotoutils.StructToStructPb(discovery.Results) if err != nil { - return nil, errors.Wrapf(err, "unable to construct a structpb.Struct from discovery for %q", discovery.Query) + return nil, fmt.Errorf("unable to construct a structpb.Struct from discovery for %q: %w", discovery.Query, err) } extra, err := structpb.NewStruct(discovery.Query.Extra) if err != nil { @@ -386,7 +433,7 @@ func (s *Server) Log(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse, for _, fieldP := range log.Fields { field, err := logging.FieldFromProto(fieldP) if err != nil { - return nil, errors.Wrap(err, "error converting LogRequest log field from proto") + return nil, fmt.Errorf("error converting LogRequest log field from proto: %w", err) } fields = append(fields, field) } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go new file mode 100644 index 00000000000..7d7d6b8630d --- /dev/null +++ b/tunnel/tunnel.go @@ -0,0 +1,100 @@ +// Package tunnel contains helpers for a traffic tunneling implementation +package tunnel + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + + "go.viam.com/rdk/logging" +) + +// ReaderSenderLoop implements a loop that reads bytes from the reader passed in and sends those bytes +// using sendFunc. The loop will exit for any error received or if the context errors. +func ReaderSenderLoop( + ctx context.Context, + r io.Reader, + sendFunc func(buf []byte) error, + logger logging.Logger, +) (retErr error) { + var err error + defer func() { + if err != nil { + // EOF indicates that the connection passed in is not going to receive any more data + // and is not expecting any more data to be written to it. We should exit from this + // function and clean up this connection. + // + // This is expected and does not indicate an error, so filter it out. + // + // TODO: filter out use of closed network connection error + // if we already explicitly closed the connection already + if errors.Is(err, io.EOF) { + logger.CDebug(ctx, "received EOF from local connection") + } else { + retErr = fmt.Errorf("reader/sender loop err: %w", err) + } + } + logger.CDebug(ctx, "exiting reader/sender loop") + }() + + for { + if ctx.Err() != nil { + return + } + // copying io.Copy's default buffer size + size := 32 * 1024 + buf := make([]byte, size) + nr, err := r.Read(buf) + if err != nil { + return + } + if nr == 0 { + continue + } + if err = sendFunc(buf[:nr]); err != nil { + return + } + } +} + +// RecvWriterLoop implements a loop that receives bytes using recvFunc and writes those bytes +// to the writer. The loop will exit for any error received or if the context errors. +func RecvWriterLoop( + ctx context.Context, + w io.Writer, + recvFunc func() ([]byte, error), + logger logging.Logger, +) (retErr error) { + var err error + defer func() { + if err != nil { + // EOF indicates that the server is not going to receive any more data + // and is not expecting any more data to be written to it. We should exit from this + // function and clean up the connection. + // + // This is expected and does not indicate an error, so filter it out. + if errors.Is(err, io.EOF) { + logger.CDebug(ctx, "received EOF from remote connection") + } else { + retErr = fmt.Errorf("receiver/writer loop err: %w", err) + } + } + logger.CDebug(ctx, "exiting receiver/writer loop") + }() + for { + if ctx.Err() != nil { + return + } + data, err := recvFunc() + if err != nil { + return + } + in := bytes.NewReader(data) + _, err = io.Copy(w, in) + if err != nil { + return + } + } +} From bb23f012fcad20d8cf9f99251c18732ba5f4c5c1 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Fri, 24 Jan 2025 17:19:40 -0500 Subject: [PATCH 02/12] return error from closing conn --- robot/server/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robot/server/server.go b/robot/server/server.go index f57c748c806..2e8658e4e65 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -98,9 +98,9 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { } recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), conn, recvFunc, s.robot.Logger()) // close the connection to unblock the read - conn.Close() + err = conn.Close() wg.Wait() - return errors.Join(readerSenderErr, recvWriterErr) + return errors.Join(err, readerSenderErr, recvWriterErr) } // GetOperations lists all running operations. From aec741c06b42b09e5a4500426f12e0f2d4733ca5 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Fri, 24 Jan 2025 17:56:21 -0500 Subject: [PATCH 03/12] lint --- examples/tunnel/tunnel.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/examples/tunnel/tunnel.go b/examples/tunnel/tunnel.go index 82d2e0090bc..205f74579a2 100644 --- a/examples/tunnel/tunnel.go +++ b/examples/tunnel/tunnel.go @@ -1,10 +1,9 @@ -// main TBD +// main is an example of tunneling traffic over grpc package main import ( "context" "flag" - "fmt" "net" "strconv" "sync" @@ -16,24 +15,22 @@ import ( ) var ( - ADDRESS = "" - API_KEY = "" - API_KEY_ID = "" + address = "" - DEFAULT_SOURCE_PORT = 9090 - DEFAULT_DESTINATION_PORT = 3389 + defaultSourcePort = 9090 + defaultDestinationPort = 3389 ) func main() { logger := logging.NewDebugLogger("client") var src int - flag.IntVar(&src, "src", DEFAULT_SOURCE_PORT, "source address to listen on") + flag.IntVar(&src, "src", defaultSourcePort, "source address to listen on") var dest int - flag.IntVar(&dest, "dest", DEFAULT_DESTINATION_PORT, "destination address to tunnel to") + flag.IntVar(&dest, "dest", defaultDestinationPort, "destination address to tunnel to") var addr string - flag.StringVar(&addr, "addr", ADDRESS, "machine name to connect to") + flag.StringVar(&addr, "addr", address, "machine name to connect to") var apiKey string flag.StringVar(&apiKey, "api-key", apiKey, "api key to use to connect to machine") @@ -67,7 +64,6 @@ func main() { }), ), ) - } else { opts = append(opts, client.WithDialOptions( @@ -83,10 +79,10 @@ func main() { } defer machine.Close(context.Background()) - TunnelTraffic(ctx, machine, src, dest, logger) + tunnelTraffic(ctx, machine, src, dest, logger) } -func TunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest int, logger logging.Logger) { +func tunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest int, logger logging.Logger) { // create listener li, err := net.Listen("tcp", net.JoinHostPort("localhost", strconv.Itoa(src))) if err != nil { @@ -102,7 +98,7 @@ func TunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest i } conn, err := li.Accept() if err != nil { - fmt.Printf("failed to accept conn: %v\n", err) + logger.CErrorw(ctx, "failed to accept conn: %v\n", err) } wg.Add(1) go func() { @@ -111,7 +107,9 @@ func TunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest i if err := machine.Tunnel(ctx, conn, dest); err != nil { logger.CError(ctx, err) } - conn.Close() + if err := conn.Close(); err != nil { + logger.CError(ctx, err) + } }() } wg.Wait() From 192d4072d96ed5b8ba3fa6e926209a48d488e488 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Sat, 25 Jan 2025 11:39:19 -0500 Subject: [PATCH 04/12] lint --- examples/tunnel/tunnel.go | 12 ++++++++++-- robot/client/client.go | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/examples/tunnel/tunnel.go b/examples/tunnel/tunnel.go index 205f74579a2..dfb70dd58a6 100644 --- a/examples/tunnel/tunnel.go +++ b/examples/tunnel/tunnel.go @@ -78,7 +78,11 @@ func main() { return } - defer machine.Close(context.Background()) + defer func() { + if err := machine.Close(context.Background()); err != nil { + logger.CErrorw(ctx, "error closing machine", "err", err) + } + }() tunnelTraffic(ctx, machine, src, dest, logger) } @@ -89,7 +93,11 @@ func tunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest i logger.CErrorw(ctx, "failed to create listener", "err", err) return } - defer li.Close() + defer func() { + if err := li.Close(); err != nil { + logger.CErrorw(ctx, "error closing listener", "err", err) + } + }() var wg sync.WaitGroup for { diff --git a/robot/client/client.go b/robot/client/client.go index 30be6778e4a..45dcc49a78c 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -1217,7 +1217,7 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) erro defer func() { // we communicate an end to the stream by calling CloseSend(), // which is why we don't need an EOF field on the request message. - client.CloseSend() + readerSenderErr = errors.Join(readerSenderErr, client.CloseSend()) // By cancelling this ctx, we will close the client, meaning client.Recv() in the RecvWriterLoop will exit // and return an error. // From cf97e89e602dac65c0b2f84624838cd80f3f0599 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Mon, 27 Jan 2025 12:03:36 -0500 Subject: [PATCH 05/12] nits --- robot/client/client.go | 5 ++--- robot/server/server.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/robot/client/client.go b/robot/client/client.go index 45dcc49a78c..2c3d6608e76 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "net" "strings" "sync" "sync/atomic" @@ -1194,7 +1193,7 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro // Tunnel tunnels data to/from the read writer from/to the destination port on the server. This // function will close the connection passed in as part of cleanup. -func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) error { +func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error { ctx, cancel := context.WithCancel(ctx) defer cancel() client, err := rc.client.Tunnel(ctx) @@ -1207,7 +1206,7 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) erro }); err != nil { return err } - rc.Logger().CDebugw(ctx, "creating tunnel to server", "port", dest) + rc.Logger().CInfow(ctx, "creating tunnel to server", "port", dest) var ( wg sync.WaitGroup readerSenderErr error diff --git a/robot/server/server.go b/robot/server/server.go index 2e8658e4e65..89343af85cd 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -75,7 +75,7 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { if err != nil { return fmt.Errorf("failed to dial to destination port %v: %w", dest, err) } - s.robot.Logger().CDebugw(srv.Context(), "successfully dialed to destination port, creating tunnel", "port", dest) + s.robot.Logger().CInfow(srv.Context(), "successfully dialed to destination port, creating tunnel", "port", dest) var ( wg sync.WaitGroup From cdd26d486af95b334ce7fbeafcfc01fef790ed44 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Mon, 27 Jan 2025 14:35:48 -0500 Subject: [PATCH 06/12] minor --- examples/tunnel/tunnel.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/tunnel/tunnel.go b/examples/tunnel/tunnel.go index dfb70dd58a6..e25f5881a76 100644 --- a/examples/tunnel/tunnel.go +++ b/examples/tunnel/tunnel.go @@ -111,13 +111,11 @@ func tunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest i wg.Add(1) go func() { defer wg.Done() - // call tunnel once per connection + // call tunnel once per connection, the connection passed in will be closed + // by Tunnel. if err := machine.Tunnel(ctx, conn, dest); err != nil { logger.CError(ctx, err) } - if err := conn.Close(); err != nil { - logger.CError(ctx, err) - } }() } wg.Wait() From 94dbdbb02f718f35ab0e46941dbd877fc7e1573f Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 15:02:45 -0500 Subject: [PATCH 07/12] pr feedback --- robot/client/client.go | 42 +++++++++++++++------ robot/server/server.go | 11 +++++- tunnel/tunnel.go | 84 ++++++++++++++++++++++++++---------------- 3 files changed, 92 insertions(+), 45 deletions(-) diff --git a/robot/client/client.go b/robot/client/client.go index 83ab633f545..ce372f3dbbf 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -1236,26 +1236,34 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest var ( wg sync.WaitGroup readerSenderErr error + + timerMu sync.Mutex + timer *time.Timer ) + connClosed := make(chan struct{}) + rsDone := make(chan struct{}) wg.Add(1) utils.PanicCapturingGo(func() { defer func() { - // we communicate an end to the stream by calling CloseSend(), - // which is why we don't need an EOF field on the request message. + // We communicate an end to the stream by calling CloseSend(). + // Close the channel first so that network errors can be filtered + // and prevented in the RecvWriterLoop. + close(rsDone) readerSenderErr = errors.Join(readerSenderErr, client.CloseSend()) - // By cancelling this ctx, we will close the client, meaning client.Recv() in the RecvWriterLoop will exit - // and return an error. + + // Schedule a task to cancel the context if we do not exit out of the recvWriterLoop within 5 seconds. + // This will close the client, meaning client.Recv() in the RecvWriterLoop will exit and return an error. // - // NOTE(cheukt): This will cause DEBUG messages from WebRTC stating `no stream for id; discarding` + // NOTE(cheukt): This may cause DEBUG messages from WebRTC stating `no stream for id; discarding` // to show up because the handler will have exited before we receive the last messages from the server. // This is not an issue and is expected. - // - // TODO: Don't log `no stream for id; discarding` if client is canceled. - cancel() + timerMu.Lock() + timer = time.AfterFunc(5*time.Second, cancel) + timerMu.Unlock() wg.Done() }() sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) } - readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, rc.logger) + readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, connClosed, rc.logger.WithFields("loop", "reader/sender")) }) recvFunc := func() ([]byte, error) { @@ -1265,13 +1273,23 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest } return resp.Data, nil } - recvWriterErr := tunnel.RecvWriterLoop(ctx, conn, recvFunc, rc.logger) - cancel() - // We do close the connection to unblock the reader/sender loop, which is not clean + recvWriterErr := tunnel.RecvWriterLoop(ctx, recvFunc, conn, rsDone, rc.logger.WithFields("loop", "recv/writer")) + timerMu.Lock() + // cancel the timer if we've successfully returned from the RecvWriterLoop + if timer != nil { + timer.Stop() + } + timerMu.Unlock() + + // We close the connection to unblock the reader/sender loop, which is not clean // but there isn't a cleaner way to exit from the reader/sender loop. + // Close the channel first so that network errors can be filtered + // and prevented in the ReaderSenderLoop. + close(connClosed) err = conn.Close() wg.Wait() + rc.Logger().CInfow(ctx, "tunnel to server closed", "port", dest) return errors.Join(err, readerSenderErr, recvWriterErr) } diff --git a/robot/server/server.go b/robot/server/server.go index 3f7eabbc2ab..6d7b06da138 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -81,13 +81,16 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { wg sync.WaitGroup readerSenderErr error ) + connClosed := make(chan struct{}) + rsDone := make(chan struct{}) wg.Add(1) utils.PanicCapturingGo(func() { defer func() { + close(rsDone) wg.Done() }() sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) } - readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, s.robot.Logger()) + readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, connClosed, s.robot.Logger().WithFields("loop", "reader/sender")) }) recvFunc := func() ([]byte, error) { req, err := srv.Recv() @@ -96,10 +99,14 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { } return req.Data, nil } - recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), conn, recvFunc, s.robot.Logger()) + recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), recvFunc, conn, rsDone, s.robot.Logger().WithFields("loop", "recv/writer")) // close the connection to unblock the read + // close the channel first so that network errors can be filtered + // and prevented in the ReaderSenderLoop. + close(connClosed) err = conn.Close() wg.Wait() + s.robot.Logger().CInfow(srv.Context(), "tunnel to client closed", "port", dest) return errors.Join(err, readerSenderErr, recvWriterErr) } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 7d7d6b8630d..948326c5580 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -7,46 +7,64 @@ import ( "errors" "fmt" "io" + "net" "go.viam.com/rdk/logging" ) +func filterError(ctx context.Context, err error, closeChan <-chan struct{}, logger logging.Logger) error { + // if connection is expected to be closed, filter out "use of closed network connection" errors + select { + case <-closeChan: + if errors.Is(err, net.ErrClosed) { + logger.CDebugw(ctx, "expected error received", "err", err) + return nil + } + default: + } + + // EOF indicates that the connection passed in is not going to receive any more data + // and is not expecting any more data to be written to it. + // + // This is expected and does not indicate an error, so filter it out. + if errors.Is(err, io.EOF) { + logger.CDebugw(ctx, "expected EOF received") + return nil + } + return err +} + // ReaderSenderLoop implements a loop that reads bytes from the reader passed in and sends those bytes // using sendFunc. The loop will exit for any error received or if the context errors. func ReaderSenderLoop( ctx context.Context, r io.Reader, sendFunc func(buf []byte) error, + connClosed <-chan struct{}, logger logging.Logger, ) (retErr error) { var err error defer func() { - if err != nil { - // EOF indicates that the connection passed in is not going to receive any more data - // and is not expecting any more data to be written to it. We should exit from this - // function and clean up this connection. - // - // This is expected and does not indicate an error, so filter it out. - // - // TODO: filter out use of closed network connection error - // if we already explicitly closed the connection already - if errors.Is(err, io.EOF) { - logger.CDebug(ctx, "received EOF from local connection") - } else { - retErr = fmt.Errorf("reader/sender loop err: %w", err) - } + retErr = filterError(ctx, err, connClosed, logger) + if retErr != nil { + retErr = fmt.Errorf("reader/sender loop err: %w", err) } logger.CDebug(ctx, "exiting reader/sender loop") }() for { - if ctx.Err() != nil { + select { + case <-ctx.Done(): return + case <-connClosed: + return + default: } - // copying io.Copy's default buffer size + // copying io.Copy's default buffer size (32kb) size := 32 * 1024 buf := make([]byte, size) - nr, err := r.Read(buf) + var nr int + nr, err = r.Read(buf) if err != nil { return } @@ -63,23 +81,16 @@ func ReaderSenderLoop( // to the writer. The loop will exit for any error received or if the context errors. func RecvWriterLoop( ctx context.Context, - w io.Writer, recvFunc func() ([]byte, error), + w io.Writer, + rsDone <-chan struct{}, logger logging.Logger, ) (retErr error) { var err error defer func() { - if err != nil { - // EOF indicates that the server is not going to receive any more data - // and is not expecting any more data to be written to it. We should exit from this - // function and clean up the connection. - // - // This is expected and does not indicate an error, so filter it out. - if errors.Is(err, io.EOF) { - logger.CDebug(ctx, "received EOF from remote connection") - } else { - retErr = fmt.Errorf("receiver/writer loop err: %w", err) - } + retErr = filterError(ctx, err, rsDone, logger) + if retErr != nil { + retErr = fmt.Errorf("receiver/writer loop err: %w", err) } logger.CDebug(ctx, "exiting receiver/writer loop") }() @@ -87,14 +98,25 @@ func RecvWriterLoop( if ctx.Err() != nil { return } - data, err := recvFunc() + var data []byte + data, err = recvFunc() if err != nil { return } + // For bidi streaming, Recv should be called on the client/server until it errors. + // See [grpc.NewStream] for related docs. + // + // If the reader/sender loop is done, stop copying bytes as that means the connection is no longer accepting bytes. + select { + case <-rsDone: + continue + default: + } in := bytes.NewReader(data) _, err = io.Copy(w, in) if err != nil { - return + logger.CDebugw(ctx, "error while copying bytes", "err", err) + continue } } } From 66b982d5eab922982fc46e935422de0284d45644 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 15:44:12 -0500 Subject: [PATCH 08/12] add comment --- robot/client/client.go | 1 + robot/server/server.go | 1 + 2 files changed, 2 insertions(+) diff --git a/robot/client/client.go b/robot/client/client.go index ce372f3dbbf..b4c456f175f 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -1262,6 +1262,7 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest timerMu.Unlock() wg.Done() }() + // a max of 32kb will be sent per message (based on io.Copy's default buffer size) sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) } readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, connClosed, rc.logger.WithFields("loop", "reader/sender")) }) diff --git a/robot/server/server.go b/robot/server/server.go index 6d7b06da138..1188d450684 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -89,6 +89,7 @@ func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { close(rsDone) wg.Done() }() + // a max of 32kb will be sent per message (based on io.Copy's default buffer size) sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) } readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, connClosed, s.robot.Logger().WithFields("loop", "reader/sender")) }) From 53ff83a2f63f3dbad1112bc8172e8e29354d006b Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 17:25:58 -0500 Subject: [PATCH 09/12] add tests --- tunnel/tunnel.go | 21 +-- tunnel/tunnel_test.go | 276 +++++++++++++++++++++++++++++++++++++ tunnel/verify_main_test.go | 12 ++ 3 files changed, 299 insertions(+), 10 deletions(-) create mode 100644 tunnel/tunnel_test.go create mode 100644 tunnel/verify_main_test.go diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 948326c5580..6410b198e40 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -43,11 +43,11 @@ func ReaderSenderLoop( connClosed <-chan struct{}, logger logging.Logger, ) (retErr error) { - var err error + var err, sendErr error defer func() { - retErr = filterError(ctx, err, connClosed, logger) + retErr = filterError(ctx, errors.Join(err, sendErr), connClosed, logger) if retErr != nil { - retErr = fmt.Errorf("reader/sender loop err: %w", err) + retErr = fmt.Errorf("reader/sender loop err: %w", retErr) } logger.CDebug(ctx, "exiting reader/sender loop") }() @@ -65,13 +65,14 @@ func ReaderSenderLoop( buf := make([]byte, size) var nr int nr, err = r.Read(buf) - if err != nil { - return + // based on [io.Reader], callers should always process the n > 0 bytes returned before + // considering the error + if nr > 0 { + if sendErr = sendFunc(buf[:nr]); sendErr != nil { + return + } } - if nr == 0 { - continue - } - if err = sendFunc(buf[:nr]); err != nil { + if err != nil { return } } @@ -90,7 +91,7 @@ func RecvWriterLoop( defer func() { retErr = filterError(ctx, err, rsDone, logger) if retErr != nil { - retErr = fmt.Errorf("receiver/writer loop err: %w", err) + retErr = fmt.Errorf("receiver/writer loop err: %w", retErr) } logger.CDebug(ctx, "exiting receiver/writer loop") }() diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go new file mode 100644 index 00000000000..9112048178d --- /dev/null +++ b/tunnel/tunnel_test.go @@ -0,0 +1,276 @@ +package tunnel_test + +import ( + "context" + "errors" + "io" + "testing" + + "go.viam.com/rdk/logging" + "go.viam.com/rdk/testutils/inject" + "go.viam.com/rdk/tunnel" + "go.viam.com/test" +) + +func TestReaderSenderLoop(t *testing.T) { + ctx := context.Background() + logger := logging.NewTestLogger(t) + + t.Run("closed channel", func(t *testing.T) { + connClosed := make(chan struct{}) + close(connClosed) + + readCt := 0 + injectReader := &inject.ReadWriteCloser{ + ReadFunc: func(p []byte) (n int, err error) { + readCt++ + return 0, io.EOF + }, + } + sendCt := 0 + sendFunc := func(buf []byte) error { sendCt++; return nil } + err := tunnel.ReaderSenderLoop(ctx, injectReader, sendFunc, connClosed, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, readCt, test.ShouldEqual, 0) + test.That(t, sendCt, test.ShouldEqual, 0) + }) + + t.Run("one message", func(t *testing.T) { + connClosed := make(chan struct{}) + defer close(connClosed) + + readCt := 0 + injectReader := &inject.ReadWriteCloser{ + ReadFunc: func(p []byte) (n int, err error) { + readCt++ + p[0] = 1 + p[1] = 2 + return 2, io.EOF + }, + } + sendCt := 0 + sendFunc := func(buf []byte) error { + sendCt++ + test.That(t, buf, test.ShouldResemble, []byte{1, 2}) + return nil + } + err := tunnel.ReaderSenderLoop(ctx, injectReader, sendFunc, connClosed, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, readCt, test.ShouldEqual, 1) + test.That(t, sendCt, test.ShouldEqual, 1) + }) + + t.Run("two messages", func(t *testing.T) { + connClosed := make(chan struct{}) + defer close(connClosed) + + readCt := 0 + injectReader := &inject.ReadWriteCloser{ + ReadFunc: func(p []byte) (n int, err error) { + readCt++ + if readCt == 1 { + p[0] = 1 + p[1] = 2 + return 2, nil + } + return 0, io.EOF + }, + } + sendCt := 0 + sendFunc := func(buf []byte) error { + sendCt++ + test.That(t, buf, test.ShouldResemble, []byte{1, 2}) + return nil + } + err := tunnel.ReaderSenderLoop(ctx, injectReader, sendFunc, connClosed, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, readCt, test.ShouldEqual, 2) + test.That(t, sendCt, test.ShouldEqual, 1) + }) + + t.Run("one message with read err", func(t *testing.T) { + connClosed := make(chan struct{}) + defer close(connClosed) + + newErr := errors.New("oops") + readCt := 0 + injectReader := &inject.ReadWriteCloser{ + ReadFunc: func(p []byte) (n int, err error) { + readCt++ + p[0] = 1 + p[1] = 2 + return 2, newErr + }, + } + sendCt := 0 + sendFunc := func(buf []byte) error { + sendCt++ + test.That(t, buf, test.ShouldResemble, []byte{1, 2}) + return nil + } + err := tunnel.ReaderSenderLoop(ctx, injectReader, sendFunc, connClosed, logger) + test.That(t, errors.Is(err, newErr), test.ShouldBeTrue) + test.That(t, readCt, test.ShouldEqual, 1) + test.That(t, sendCt, test.ShouldEqual, 1) + }) + + t.Run("one message with send err", func(t *testing.T) { + connClosed := make(chan struct{}) + defer close(connClosed) + + newErr := errors.New("oops") + readCt := 0 + injectReader := &inject.ReadWriteCloser{ + ReadFunc: func(p []byte) (n int, err error) { + readCt++ + p[0] = 1 + p[1] = 2 + return 2, nil + }, + } + sendCt := 0 + sendFunc := func(buf []byte) error { + sendCt++ + test.That(t, buf, test.ShouldResemble, []byte{1, 2}) + return newErr + } + err := tunnel.ReaderSenderLoop(ctx, injectReader, sendFunc, connClosed, logger) + test.That(t, errors.Is(err, newErr), test.ShouldBeTrue) + test.That(t, readCt, test.ShouldEqual, 1) + test.That(t, sendCt, test.ShouldEqual, 1) + }) +} + +func TestRecvWriterLoop(t *testing.T) { + ctx := context.Background() + logger := logging.NewTestLogger(t) + + t.Run("cancelled context", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + rsDone := make(chan struct{}) + defer close(rsDone) + + recvCt := 0 + recvFunc := func() ([]byte, error) { + recvCt++ + if recvCt == 1 { + return []byte{1, 2}, nil + } + return nil, io.EOF + } + writeCt := 0 + injectWriter := &inject.ReadWriteCloser{ + WriteFunc: func(p []byte) (n int, err error) { + writeCt++ + return 0, nil + }, + } + err := tunnel.RecvWriterLoop(ctx, recvFunc, injectWriter, rsDone, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, recvCt, test.ShouldEqual, 0) + test.That(t, writeCt, test.ShouldEqual, 0) + }) + + t.Run("closed channel", func(t *testing.T) { + rsDone := make(chan struct{}) + close(rsDone) + + recvCt := 0 + recvFunc := func() ([]byte, error) { + recvCt++ + if recvCt == 1 { + return []byte{1, 2}, nil + } + return nil, io.EOF + } + writeCt := 0 + injectWriter := &inject.ReadWriteCloser{ + WriteFunc: func(p []byte) (n int, err error) { + writeCt++ + return 0, nil + }, + } + err := tunnel.RecvWriterLoop(ctx, recvFunc, injectWriter, rsDone, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, recvCt, test.ShouldEqual, 2) + test.That(t, writeCt, test.ShouldEqual, 0) + }) + + t.Run("one message", func(t *testing.T) { + rsDone := make(chan struct{}) + defer close(rsDone) + + recvCt := 0 + recvFunc := func() ([]byte, error) { + recvCt++ + if recvCt == 1 { + return []byte{1, 2}, nil + } + return nil, io.EOF + } + writeCt := 0 + injectWriter := &inject.ReadWriteCloser{ + WriteFunc: func(p []byte) (n int, err error) { + writeCt++ + return 0, nil + }, + } + err := tunnel.RecvWriterLoop(ctx, recvFunc, injectWriter, rsDone, logger) + test.That(t, err, test.ShouldBeNil) + // there is a second call to recvFunc because recvFunc should be called until it errors + test.That(t, recvCt, test.ShouldEqual, 2) + test.That(t, writeCt, test.ShouldEqual, 1) + }) + + t.Run("one message with recv err", func(t *testing.T) { + rsDone := make(chan struct{}) + defer close(rsDone) + + newErr := errors.New("oops") + recvCt := 0 + recvFunc := func() ([]byte, error) { + recvCt++ + return nil, newErr + } + writeCt := 0 + injectWriter := &inject.ReadWriteCloser{ + WriteFunc: func(p []byte) (n int, err error) { + writeCt++ + return 0, nil + }, + } + err := tunnel.RecvWriterLoop(ctx, recvFunc, injectWriter, rsDone, logger) + test.That(t, errors.Is(err, newErr), test.ShouldBeTrue) + test.That(t, recvCt, test.ShouldEqual, 1) + test.That(t, writeCt, test.ShouldEqual, 0) + }) + + t.Run("two messages with write err", func(t *testing.T) { + rsDone := make(chan struct{}) + defer close(rsDone) + + newErr := errors.New("oops") + recvCt := 0 + recvFunc := func() ([]byte, error) { + recvCt++ + if recvCt < 3 { + return []byte{1, 2}, nil + } + return nil, io.EOF + } + writeCt := 0 + injectWriter := &inject.ReadWriteCloser{ + WriteFunc: func(p []byte) (n int, err error) { + writeCt++ + return 0, newErr + }, + } + err := tunnel.RecvWriterLoop(ctx, recvFunc, injectWriter, rsDone, logger) + test.That(t, err, test.ShouldBeNil) + test.That(t, recvCt, test.ShouldEqual, 3) + // write errors are ignored, so Write should be called as many times as there + // are actual messages. + test.That(t, writeCt, test.ShouldEqual, 2) + }) +} diff --git a/tunnel/verify_main_test.go b/tunnel/verify_main_test.go new file mode 100644 index 00000000000..b9672bafa56 --- /dev/null +++ b/tunnel/verify_main_test.go @@ -0,0 +1,12 @@ +package tunnel + +import ( + "testing" + + testutilsext "go.viam.com/utils/testutils/ext" +) + +// TestMain is used to control the execution of all tests run within this package (including _test packages). +func TestMain(m *testing.M) { + testutilsext.VerifyTestMain(m) +} From 49e79cdd95267e4998499c623294f4ae6d6d5709 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 17:29:18 -0500 Subject: [PATCH 10/12] mod update --- go.mod | 2 +- go.sum | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ca4fa1cec5f..e3966546630 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 + go.viam.com/api v0.1.387 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.127 goji.io v2.0.2+incompatible diff --git a/go.sum b/go.sum index 325e74b66f4..85ae77a2fff 100644 --- a/go.sum +++ b/go.sum @@ -1515,6 +1515,10 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 h1:aGdngrg62FRgHRhb5d2ONCFVfNHQ1aQ081tbA+cXy/o= go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= +go.viam.com/api v0.1.386 h1:3AuCrI7N3LbQVTw2LJweefxuq7+N4/BoVhHf7kRADzw= +go.viam.com/api v0.1.386/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= +go.viam.com/api v0.1.387 h1:nxWF+dO+z2mwTpTkcs8mr8L0ic4ZwcqOAumBtHnvqsc= +go.viam.com/api v0.1.387/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.127 h1:Ju7SKelAVzTWNMSJyoomY7svn/HN5+xdFR7gltYePjE= From 9a8c5f57874dfb75a69bb9fc574144927853fb58 Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 17:31:43 -0500 Subject: [PATCH 11/12] mod tidy --- go.sum | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go.sum b/go.sum index 85ae77a2fff..a98ae73fa7c 100644 --- a/go.sum +++ b/go.sum @@ -1513,10 +1513,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 h1:aGdngrg62FRgHRhb5d2ONCFVfNHQ1aQ081tbA+cXy/o= -go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= -go.viam.com/api v0.1.386 h1:3AuCrI7N3LbQVTw2LJweefxuq7+N4/BoVhHf7kRADzw= -go.viam.com/api v0.1.386/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/api v0.1.387 h1:nxWF+dO+z2mwTpTkcs8mr8L0ic4ZwcqOAumBtHnvqsc= go.viam.com/api v0.1.387/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= From 7fb3bff161d592e60303e38e9b2f9b380076ae6f Mon Sep 17 00:00:00 2001 From: Cheuk Tse Date: Tue, 28 Jan 2025 17:38:15 -0500 Subject: [PATCH 12/12] lint --- tunnel/tunnel_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index 9112048178d..8975854e5e6 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -6,10 +6,11 @@ import ( "io" "testing" + "go.viam.com/test" + "go.viam.com/rdk/logging" "go.viam.com/rdk/testutils/inject" "go.viam.com/rdk/tunnel" - "go.viam.com/test" ) func TestReaderSenderLoop(t *testing.T) { @@ -88,6 +89,7 @@ func TestReaderSenderLoop(t *testing.T) { test.That(t, sendCt, test.ShouldEqual, 1) }) + //nolint:dupl t.Run("one message with read err", func(t *testing.T) { connClosed := make(chan struct{}) defer close(connClosed) @@ -114,6 +116,7 @@ func TestReaderSenderLoop(t *testing.T) { test.That(t, sendCt, test.ShouldEqual, 1) }) + //nolint:dupl t.Run("one message with send err", func(t *testing.T) { connClosed := make(chan struct{}) defer close(connClosed)