RSDK-9760 - Tunneling over grpc #4745
Conversation
Commits: works better · working checkpoint · update go mod · another checkpoint · cleanup · fix go mod · fix parse · refactor · minor refactor · move tunnel example, final set of changes · move log location · some fixes · doc
cancel()
// We do close the connection to unblock the reader/sender loop, which is not clean
// but there isn't a cleaner way to exit from the reader/sender loop.
err = conn.Close()
I don't like that I have to close the connection here in order for reader.Read in ReaderSenderLoop to exit, but not sure if there are better options here.
Same question as above; can you leverage two error channels or an errgroup to communicate an error from one side to the other? Instead of leveraging context cancelation or closing the underlying connection?
Now using two channels. It feels kind of complicated but works fairly well; will think about how to improve it as a followup, but wanted to stop tinkering with the implementation for now.
Handling in followup SGTM if what you have now works and doesn't obviously leak or deadlock.
robot/client/client.go (Outdated)
@@ -1190,6 +1192,64 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro
	return mVersion, nil
}

// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) error {
I may add this to the Robot interface, but since this is only useful in the client and doesn't touch the Robot in any way, I'm inclined to keep it out for now.
Keeping it off the robot interface is probably preferable to me.
Nice! Some initial questions/comments.
@@ -0,0 +1,116 @@
// main is an example of tunneling traffic over grpc
Any reason to have this example lying around once we implement the CLI version? Or, do we expect users to write their own programmatic Golang code creating their own tunnels?
good point, I'll remove it during the CLI task. In theory, users could write a tunneling program using a passed in connection/rwcloser, but the CLI code should suffice as an example
go.mod (Outdated)
@@ -75,7 +75,7 @@ require (
	go.uber.org/atomic v1.11.0
	go.uber.org/multierr v1.11.0
	go.uber.org/zap v1.27.0
-	go.viam.com/api v0.1.383
+	go.viam.com/api v0.1.386-0.20250124195735-0d8303ff6186
Cool how did you do this? There's an autogenerated pseudoversion just from PRs? Or from your branch?
It was from a branch on the api repo, so it might not work from a fork. But otherwise, all I did was go get go.viam.com/api@{HASH}
Can update now I assume?
robot/client/client.go (Outdated)
}); err != nil {
	return err
}
rc.Logger().CDebugw(ctx, "creating tunnel to server", "port", dest)
[nit] Info level?
updated
robot/client/client.go (Outdated)
// which is why we don't need an EOF field on the request message.
client.CloseSend()
// By cancelling this ctx, we will close the client, meaning client.Recv() in the RecvWriterLoop will exit
// and return an error.
Particularly because of the "no stream for id; discarding" error, I'm inclined to ask: are there any other synchronization mechanisms between the reader-sender and recv-writer goroutines besides a context cancelation? I buy that this works, but it's a little odd to force an error on the underlying bidi stream (cancel the context) rather than use a channel or something to communicate a required exit between the two goroutines.
Thinking if there's an alternative to mucking with our logs: would instead scheduling a time.AfterFunc(5*time.Second, cancel) help? Such that we only log when our closing takes > 5 seconds to "roundtrip".
IIUC we're properly propagating closing up and down the tunnel, but we just don't give it time to politely disconnect.
added an after func - with the current set of changes we do not get the "no stream for id" error at all, and usually everything cleans up before the 5 second mark
robot/client/client.go (Outdated)
@@ -1190,6 +1192,64 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro
	return mVersion, nil
}

// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn net.Conn, dest int) error {
Is there any restriction on multiple tunnels? Can I open multiple tunnels to the same destination port at once?
no restrictions - during testing, I find that browsers will open a lot of connections to the same port at the same time.
func ReaderSenderLoop(
	ctx context.Context,
	r io.Reader,
	sendFunc func(buf []byte) error,
Did you try using an io.Writer here? The signature seems to largely match. And then I expect we could use io.Copy: https://pkg.go.dev/io#Copy
Cheuk and I talked offline. We feel that might just move the for-loop around instead of getting rid of it. I'm happy to not worry about it.
func RecvWriterLoop(
	ctx context.Context,
	w io.Writer,
	recvFunc func() ([]byte, error),
Symmetrical question: it seems an io.Reader could be substituted, allowing us to go to io.Copy.
Talked offline
return fmt.Errorf("failed to receive first message from stream: %w", err)
}

dest := strconv.Itoa(int(req.DestinationPort))
Without finding the proto for this, I'm guessing DestinationPort is typed as an int32? Or is this a uint thing?
it is a uint32
)
wg.Add(1)
utils.PanicCapturingGo(func() {
	defer func() {
The extra func() { }() doesn't seem necessary. In case this was an accidental leftover from a time when the defer had multiple statements to process?
added another statement, but yes this was accidental
defer func() {
	wg.Done()
}()
sendFunc := func(data []byte) error { return srv.Send(&pb.TunnelResponse{Data: data}) }
Given our buffer is set to 32KB, this seems safe. But maybe worth calling out that assumption about input size here, as our gRPC limits can hypothetically get angry.
called out
robot/server/server.go (Outdated)
}
return req.Data, nil
}
recvWriterErr := tunnel.RecvWriterLoop(srv.Context(), conn, recvFunc, s.robot.Logger())
I might have swapped the arguments to be recvFunc, conn to match the naming of RecvWriterLoop. But given the types are different, I don't see a typo "incorrectly" compiling.
// For bidi streaming, Recv should be called on the client/server until it errors.
// See [grpc.NewStream] for related docs.
//
// If the reader/sender loop is done, stop copying bytes as that means the connection is no longer accepting bytes.
select {
case <-rsDone:
	continue
default:
}
Here, we continue until Recv errors; we're just straight discarding any bytes we get, because the connection is likely no longer accepting bytes if the reader/sender loop is done. In theory, maybe we only need to skip the Copy if the reader/sender loop exits with a connection error of some kind, but in practice I start seeing closed connection/broken pipe errors if I try to make the condition more specific.
Made some hopefully final impl changes - the main difference is that we no longer cancel until 5s after we exit the reader/sender loop. The receiver also continues to receive until it gets an error, but will skip copying the data to the net.Conn if we expect it to be closed. Working on some tests for ReaderSenderLoop and RecvWriterLoop; more comprehensive tests tracked at https://viam.atlassian.net/browse/RSDK-9852
LGTM!
// to show up because the handler will have exited before we receive the last messages from the server.
// This is not an issue and is expected.
timerMu.Lock()
timer = time.AfterFunc(5*time.Second, cancel)
[q] How does this delay appear to the user? Just a five second delay in ending the tunnel?
this would be a failsafe, so at most the delay will be 5s
"go.viam.com/rdk/logging"
)

func filterError(ctx context.Context, err error, closeChan <-chan struct{}, logger logging.Logger) error {
Nice read-only on the channel 🫡 . This is a cool implementation.
added a few tests for the helpers, will merge as soon as tests pass
still working on a small test for the helpers, but wanted to get some feedback early
I was able to run a server on Windows and use remote desktop with no issues.
Also able to share both reader/sender and receiver/writer loops in the client/server implementations.
corresponding API PR at viamrobotics/api#623