Skip to content

Commit

Permalink
Merge pull request #147 from cloudstruct/feat/handle-muxer-error-duri…
Browse files Browse the repository at this point in the history
…ng-handshake

feat: properly handle muxer error during handshake
  • Loading branch information
agaffney authored Dec 22, 2022
2 parents 41f006b + 5eb3f5f commit 289bdec
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/go-ouroboros-network/localtxsubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func testLocalTxSubmission(f *globalFlags) {
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR: %s\n", err)
fmt.Printf("ERROR(async): %s\n", err)
os.Exit(1)
}
}()
Expand Down
71 changes: 56 additions & 15 deletions ouroboros.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ import (
"github.com/cloudstruct/go-ouroboros-network/protocol/txsubmission"
"io"
"net"
"sync"
)

type Ouroboros struct {
conn net.Conn
networkMagic uint32
server bool
useNodeToNodeProto bool
muxer *muxer.Muxer
ErrorChan chan error
protoErrorChan chan error
sendKeepAlives bool
delayMuxerStart bool
fullDuplex bool
conn net.Conn
networkMagic uint32
server bool
useNodeToNodeProto bool
muxer *muxer.Muxer
ErrorChan chan error
protoErrorChan chan error
handshakeFinishedChan chan interface{}
doneChan chan interface{}
closeMutex sync.Mutex
sendKeepAlives bool
delayMuxerStart bool
fullDuplex bool
// Mini-protocols
Handshake *handshake.Handshake
ChainSync *chainsync.ChainSync
Expand All @@ -45,7 +49,9 @@ type Ouroboros struct {

func New(options ...OuroborosOptionFunc) (*Ouroboros, error) {
o := &Ouroboros{
protoErrorChan: make(chan error, 10),
protoErrorChan: make(chan error, 10),
handshakeFinishedChan: make(chan interface{}),
doneChan: make(chan interface{}),
}
// Apply provided options functions
for _, option := range options {
Expand Down Expand Up @@ -81,6 +87,18 @@ func (o *Ouroboros) Dial(proto string, address string) error {
}

func (o *Ouroboros) Close() error {
// We use a mutex to prevent this function from being called multiple times
// concurrently, which would cause a race condition
o.closeMutex.Lock()
defer o.closeMutex.Unlock()
// Immediately return if we're already shutting down
select {
case <-o.doneChan:
return nil
default:
}
// Close doneChan to signify that we're shutting down
close(o.doneChan)
// Gracefully stop the muxer
if o.muxer != nil {
o.muxer.Stop()
Expand All @@ -91,6 +109,22 @@ func (o *Ouroboros) Close() error {
return err
}
}
// Close channels
close(o.ErrorChan)
close(o.protoErrorChan)
// We can only close a channel once, so we have to jump through a few hoops
select {
// The channel is either closed or has an item pending
case _, ok := <-o.handshakeFinishedChan:
// We successfully retrieved an item
// This will probably never happen, but it doesn't hurt to cover this case
if ok {
close(o.handshakeFinishedChan)
}
// The channel is open and has no pending items
default:
close(o.handshakeFinishedChan)
}
return nil
}

Expand Down Expand Up @@ -131,7 +165,6 @@ func (o *Ouroboros) setupConnection() error {
protoOptions.Role = protocol.ProtocolRoleClient
}
// Perform handshake
handshakeFinishedChan := make(chan interface{})
var handshakeVersion uint16
var handshakeFullDuplex bool
handshakeConfig := &handshake.Config{
Expand All @@ -141,7 +174,7 @@ func (o *Ouroboros) setupConnection() error {
FinishedFunc: func(version uint16, fullDuplex bool) error {
handshakeVersion = version
handshakeFullDuplex = fullDuplex
close(handshakeFinishedChan)
close(o.handshakeFinishedChan)
return nil
},
}
Expand All @@ -153,9 +186,13 @@ func (o *Ouroboros) setupConnection() error {
}
// Wait for handshake completion or error
select {
case <-o.doneChan:
// Return an error if we're shutting down
return io.EOF
case err := <-o.protoErrorChan:
return err
case <-handshakeFinishedChan:
case <-o.handshakeFinishedChan:
// This is purposely empty, but we need this case to break out when this channel is closed
}
// Provide the negotiated protocol version to the various mini-protocols
protoOptions.Version = handshakeVersion
Expand All @@ -165,7 +202,11 @@ func (o *Ouroboros) setupConnection() error {
}
// Start Goroutine to pass along errors from the mini-protocols
go func() {
err := <-o.protoErrorChan
err, ok := <-o.protoErrorChan
// The channel is closed, which means we're already shutting down
if !ok {
return
}
o.ErrorChan <- fmt.Errorf("protocol error: %s", err)
// Close connection on mini-protocol errors
o.Close()
Expand Down

0 comments on commit 289bdec

Please sign in to comment.