RSDK-9760 - Tunneling over grpc #4745

Merged: 15 commits, Jan 28, 2025
Changes from 4 commits
8 changes: 8 additions & 0 deletions examples/tunnel/README.md
@@ -0,0 +1,8 @@
# Tunnel
This example shows how to use the traffic tunneling feature in viam-server.


### Running
Run this example with `go run tunnel.go -addr {address of machine} -api-key {API key to use to connect to the machine} -api-key-id {API key ID to use to connect to the machine} -dest {destination port to tunnel to (default 3389)} -src {source port to listen on (default 9090)}`

The API key and API key ID can be left blank if the machine is insecure.
116 changes: 116 additions & 0 deletions examples/tunnel/tunnel.go
@@ -0,0 +1,116 @@
// main is an example of tunneling traffic over gRPC.
Member:
Any reason to have this example lying around once we implement the CLI version? Or, do we expect users to write their own programmatic Golang code creating their own tunnels?

Member Author:
Good point, I'll remove it during the CLI task. In theory, users could write a tunneling program using a passed-in connection/ReadWriteCloser, but the CLI code should suffice as an example.
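A minimal sketch of that passed-in-connection idea, assuming only the `RobotClient.Tunnel(ctx, conn, dest)` signature added in this PR; the package and helper names below are hypothetical and this code is not part of the PR:

```go
// Package tunnelpipe is a hypothetical sketch of driving RobotClient.Tunnel with a
// connection the caller already owns (one end of a net.Pipe) instead of accepting
// TCP connections from a listener.
package tunnelpipe

import (
	"context"
	"net"

	"go.viam.com/rdk/logging"
	"go.viam.com/rdk/robot/client"
)

// TunnelOverPipe returns a local net.Conn whose traffic is forwarded to destPort on
// the machine; the remote end is handed to Tunnel, which closes it during cleanup.
func TunnelOverPipe(ctx context.Context, machine *client.RobotClient, destPort int, logger logging.Logger) net.Conn {
	local, remote := net.Pipe()
	go func() {
		if err := machine.Tunnel(ctx, remote, destPort); err != nil {
			logger.CError(ctx, err)
		}
	}()
	return local
}
```

The returned local end can then be handed to any in-process code that expects a net.Conn.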

package main

import (
"context"
"flag"
"net"
"strconv"
"sync"

"go.viam.com/utils/rpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/robot/client"
)

var (
address = ""

defaultSourcePort = 9090
defaultDestinationPort = 3389
)

func main() {
logger := logging.NewDebugLogger("client")
var src int
flag.IntVar(&src, "src", defaultSourcePort, "source port to listen on")

var dest int
flag.IntVar(&dest, "dest", defaultDestinationPort, "destination port to tunnel to")

var addr string
flag.StringVar(&addr, "addr", address, "address of the machine to connect to")

var apiKey string
flag.StringVar(&apiKey, "api-key", apiKey, "api key to use to connect to machine")

var apiKeyID string
flag.StringVar(&apiKeyID, "api-key-id", apiKeyID, "api key id to use to connect to machine")

flag.Parse()

if addr == "" {
logger.Error("please enter an address with flag --addr")
return
}

logger.Infow("starting tunnel", "source address", src, "destination address", dest)
ctx := context.Background()

opts := []client.RobotClientOption{
client.WithRefreshEvery(0),
client.WithCheckConnectedEvery(0),
client.WithDisableSessions(),
}

if apiKey != "" && apiKeyID != "" {
opts = append(opts,
client.WithDialOptions(rpc.WithEntityCredentials(
apiKeyID,
rpc.Credentials{
Type: rpc.CredentialsTypeAPIKey,
Payload: apiKey,
}),
),
)
} else {
opts = append(opts,
client.WithDialOptions(
rpc.WithInsecure(),
rpc.WithDisableDirectGRPC(),
),
)
}
machine, err := client.New(ctx, addr, logger, opts...)
if err != nil {
logger.Error(err)
return
}

defer machine.Close(context.Background())
tunnelTraffic(ctx, machine, src, dest, logger)
}

func tunnelTraffic(ctx context.Context, machine *client.RobotClient, src, dest int, logger logging.Logger) {
// create listener
li, err := net.Listen("tcp", net.JoinHostPort("localhost", strconv.Itoa(src)))
if err != nil {
logger.CErrorw(ctx, "failed to create listener", "err", err)
return
}
defer li.Close()

var wg sync.WaitGroup
for {
if ctx.Err() != nil {
break
}
conn, err := li.Accept()
if err != nil {
logger.CErrorw(ctx, "failed to accept conn", "err", err)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
// call tunnel once per connection
if err := machine.Tunnel(ctx, conn, dest); err != nil {
logger.CError(ctx, err)
}
if err := conn.Close(); err != nil {
logger.CError(ctx, err)
}
}()
}
wg.Wait()
}
2 changes: 1 addition & 1 deletion go.mod
@@ -75,7 +75,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
go.viam.com/api v0.1.383
go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186
Member:
Cool, how did you do this? There's an autogenerated pseudoversion just from PRs? Or from your branch?

Member Author (@cheukt, Jan 27, 2025):
It was from a branch on the api repo, so it might not work on a fork. Otherwise, all I did was `go get go.viam.com/api@{HASH}`.

Member:
Can update now I assume?

go.viam.com/test v1.2.4
go.viam.com/utils v0.1.126
goji.io v2.0.2+incompatible
4 changes: 2 additions & 2 deletions go.sum
@@ -1513,8 +1513,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.383 h1:HE9EpUWoyDBYJLTVLo29f66oCjzH31V1YJ02tlzCtyo=
go.viam.com/api v0.1.383/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls=
go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186 h1:aGdngrg62FRgHRhb5d2ONCFVfNHQ1aQ081tbA+cXy/o=
go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186/go.mod h1:g5eipXHNm0rQmW7DWya6avKcmzoypLmxnMlAaIsE5Ls=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.1.126 h1:ecFlzln5/u1NqzVMOVxwgwbkg4dDWvQmcCS2fMg0ZNU=
60 changes: 60 additions & 0 deletions robot/client/client.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
@@ -47,6 +48,7 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/session"
"go.viam.com/rdk/spatialmath"
"go.viam.com/rdk/tunnel"
"go.viam.com/rdk/utils/contextutils"
)

@@ -1190,6 +1192,64 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro
return mVersion, nil
}

// Tunnel tunnels data between the provided connection and the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) error {
Member Author:
I may add this to the Robot interface, but since this is only useful in the client and doesn't touch the Robot in any way, I'm inclined to keep it out for now.

Member:
Keeping it off the robot interface is probably preferable to me.

Member:
Is there any restriction on multiple tunnels? Can I open multiple tunnels to the same destination port at once?

Member Author:
No restrictions: during testing, I found that browsers will open a lot of connections to the same port at the same time.

ctx, cancel := context.WithCancel(ctx)
defer cancel()
client, err := rc.client.Tunnel(ctx)
if err != nil {
return err
}

if err := client.Send(&pb.TunnelRequest{
DestinationPort: uint32(dest),
}); err != nil {
return err
}
rc.Logger().CDebugw(ctx, "creating tunnel to server", "port", dest)
Member:
[nit] Info level?

Member Author:
updated

var (
wg sync.WaitGroup
readerSenderErr error
)
wg.Add(1)
utils.PanicCapturingGo(func() {
defer func() {
// we communicate an end to the stream by calling CloseSend(),
// which is why we don't need an EOF field on the request message.
client.CloseSend()
// By cancelling this ctx, we will close the client, meaning client.Recv() in the RecvWriterLoop will exit
// and return an error.
Member:
Particularly because of the `no stream for id; discarding` error, I'm inclined to ask: are there any other synchronization mechanisms between the reader-sender and recv-writer goroutines besides a context cancelation? I buy that this works, but it's a little odd to force an error on the underlying bidi stream (cancel the context) rather than use a channel or something to communicate a required exit between the two goroutines.

Member:
Thinking about whether there's an alternative to mucking with our logs: would scheduling a `time.AfterFunc(5*time.Second, cancel)` instead help, such that we only log when our closing takes more than 5 seconds to round-trip?

IIUC we're properly propagating closing up and down the tunnel, but we just don't give it time to politely disconnect.

Member Author:
Added an AfterFunc. With the current set of changes we do not get the `no stream for id` error at all, and usually everything cleans up before the 5-second mark.

//
// NOTE(cheukt): This will cause DEBUG messages from WebRTC stating `no stream for id; discarding`
// to show up because the handler will have exited before we receive the last messages from the server.
// This is not an issue and is expected.
//
// TODO: Don't log `no stream for id; discarding` if client is canceled.
cancel()
wg.Done()
}()
sendFunc := func(data []byte) error { return client.Send(&pb.TunnelRequest{Data: data}) }
readerSenderErr = tunnel.ReaderSenderLoop(ctx, conn, sendFunc, rc.logger)
})

recvFunc := func() ([]byte, error) {
resp, err := client.Recv()
if err != nil {
return nil, err
}
return resp.Data, nil
}
recvWriterErr := tunnel.RecvWriterLoop(ctx, conn, recvFunc, rc.logger)
cancel()
// We close the connection to unblock the reader/sender loop; this is not clean,
// but there isn't a cleaner way to force that loop to exit.
err = conn.Close()
Member Author:
I don't like that I have to close the connection here in order for reader.Read in ReaderSenderLoop to exit, but I'm not sure if there are better options here.

Member:
Same question as above; can you leverage two error channels or an errgroup to communicate an error from one side to the other? Instead of leveraging context cancelation or closing the underlying connection?

Member Author:
Using two channels now (roughly the pattern sketched after this function); it feels kind of complicated but works fairly well. I'll think about how to improve it as a followup, but I wanted to stop tinkering with the implementation for now.

Member:
Handling in followup SGTM if what you have now works and doesn't obviously leak or deadlock.


wg.Wait()
return errors.Join(err, readerSenderErr, recvWriterErr)
}

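A rough sketch of the coordination discussed in the threads above: two error channels so that whichever loop finishes first drives the teardown of the other, combined with the delayed cancel suggested earlier. The helper and its parameters are hypothetical; this is not the code that landed in this PR.

```go
// Package tunnelsync is a hypothetical sketch of the two-error-channel pattern plus
// delayed cancel discussed in the review above; it is not the PR's implementation.
package tunnelsync

import (
	"context"
	"errors"
	"net"
	"time"
)

// coordinate runs both tunnel loops and lets whichever finishes first drive the
// teardown of the other, instead of relying solely on context cancelation.
func coordinate(
	ctx context.Context,
	cancel context.CancelFunc,
	conn net.Conn,
	readerSenderLoop, recvWriterLoop func(context.Context) error,
) error {
	// Buffered so each goroutine can report its result and exit even if nobody
	// is receiving yet.
	readerErrCh := make(chan error, 1)
	writerErrCh := make(chan error, 1)

	go func() { readerErrCh <- readerSenderLoop(ctx) }()
	go func() { writerErrCh <- recvWriterLoop(ctx) }()

	var readerErr, writerErr, closeErr error
	select {
	case readerErr = <-readerErrCh:
		// Local side finished sending. Give the stream a few seconds to wind down
		// politely before canceling, so forced-exit debug logs are avoided in the
		// common case.
		timer := time.AfterFunc(5*time.Second, cancel)
		writerErr = <-writerErrCh
		timer.Stop()
	case writerErr = <-writerErrCh:
		// Remote side ended the stream. Closing the conn unblocks conn.Read inside
		// the reader/sender loop so it can return.
		closeErr = conn.Close()
		readerErr = <-readerErrCh
	}
	return errors.Join(closeErr, readerErr, writerErr)
}
```

The buffered channels mean neither goroutine can leak waiting for a receiver, and the errors are still joined and surfaced to the caller as in the PR's Tunnel method.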
func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
53 changes: 50 additions & 3 deletions robot/server/server.go
@@ -6,12 +6,15 @@ package server
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/robot/v1"
@@ -30,6 +33,7 @@ import (
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
"go.viam.com/rdk/session"
"go.viam.com/rdk/tunnel"
)

// logTSKey is the key used in conjunction with the timestamp of logs received
@@ -56,6 +60,49 @@ func New(robot robot.Robot) pb.RobotServiceServer {
func (s *Server) Close() {
}

// Tunnel tunnels traffic to/from the client from/to a specified port on the server.
func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
req, err := srv.Recv()
if err != nil {
return fmt.Errorf("failed to receive first message from stream: %w", err)
}

dest := strconv.Itoa(int(req.DestinationPort))
Member:
Without finding the proto for this, I'm guessing DestinationPort is typed as an int32? Or is this a uint thing?

Member Author:
it is a uint32

s.robot.Logger().CDebugw(srv.Context(), "dialing to destination port", "port", dest)

dialTimeout := 10 * time.Second
conn, err := net.DialTimeout("tcp", net.JoinHostPort("localhost", dest), dialTimeout)
if err != nil {
return fmt.Errorf("failed to dial to destination port %v: %w", dest, err)
}
s.robot.Logger().CDebugw(srv.Context(), "successfully dialed to destination port, creating tunnel", "port", dest)

var (
wg sync.WaitGroup
readerSenderErr error
)
wg.Add(1)
utils.PanicCapturingGo(func() {
defer func() {
Member:
The extra func() { }() doesn't seem necessary. Perhaps it's an accidental leftover from a time when the defer had multiple statements to process?

Member Author:
Added another statement, but yes, this was accidental.

wg.Done()
}()
sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) }
Member:
Since our buffer is set to 32KB, this seems safe, but it's maybe worth calling out that assumption about input size here, as our gRPC limits can hypothetically get angry.

Member Author:
called out

readerSenderErr = tunnel.ReaderSenderLoop(srv.Context(), conn, sendFunc, s.robot.Logger())
})
recvFunc := func() ([]byte, error) {
req, err := srv.Recv()
if err != nil {
return nil, err
}
return req.Data, nil
}
recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), conn, recvFunc, s.robot.Logger())
Member:
I might have swapped the arguments to be recvFunc, conn to match the naming of RecvWriterLoop. But given the types are different, I don't see a typo "incorrectly" compiling.

// close the connection to unblock the read
err = conn.Close()
wg.Wait()
return errors.Join(err, readerSenderErr, recvWriterErr)
}

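For context on the 32KB point above, here is a hypothetical illustration of a chunked reader/sender loop. The real tunnel.ReaderSenderLoop lives in go.viam.com/rdk/tunnel and is not shown in this diff, so treat the details below as assumptions; the idea is only that each send carries at most one 32KB read, keeping every gRPC message well under the usual 4MB default limit.

```go
// Package tunnelsketch is a hypothetical illustration of a chunked reader/sender
// loop; it is not the tunnel package's actual implementation.
package tunnelsketch

import (
	"context"
	"errors"
	"io"
)

// readerSenderLoop copies from r into sendFunc in chunks of at most 32KB, so each
// gRPC message stays small.
func readerSenderLoop(ctx context.Context, r io.Reader, sendFunc func([]byte) error) error {
	buf := make([]byte, 32*1024)
	for ctx.Err() == nil {
		n, err := r.Read(buf)
		if n > 0 {
			if sendErr := sendFunc(buf[:n]); sendErr != nil {
				return sendErr
			}
		}
		if errors.Is(err, io.EOF) {
			// Sender side is done; the caller can then CloseSend the stream.
			return nil
		}
		if err != nil {
			return err
		}
	}
	return ctx.Err()
}
```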
// GetOperations lists all running operations.
func (s *Server) GetOperations(ctx context.Context, req *pb.GetOperationsRequest) (*pb.GetOperationsResponse, error) {
me := operation.Get(ctx)
@@ -201,7 +248,7 @@ func (s *Server) DiscoverComponents(ctx context.Context, req *pb.DiscoverCompone
for _, discovery := range discoveries {
pbResults, err := vprotoutils.StructToStructPb(discovery.Results)
if err != nil {
return nil, errors.Wrapf(err, "unable to construct a structpb.Struct from discovery for %q", discovery.Query)
return nil, fmt.Errorf("unable to construct a structpb.Struct from discovery for %q: %w", discovery.Query, err)
}
extra, err := structpb.NewStruct(discovery.Query.Extra)
if err != nil {
@@ -386,7 +433,7 @@ func (s *Server) Log(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse,
for _, fieldP := range log.Fields {
field, err := logging.FieldFromProto(fieldP)
if err != nil {
return nil, errors.Wrap(err, "error converting LogRequest log field from proto")
return nil, fmt.Errorf("error converting LogRequest log field from proto: %w", err)
}
fields = append(fields, field)
}