Skip to content

Commit

Permalink
Merge pull request #121 from blinklabs-io/feat/chainsync-input-auto-r…
Browse files Browse the repository at this point in the history
…econnect

feat: auto-reconnect on connection failure in chainsync input
  • Loading branch information
agaffney authored Nov 5, 2023
2 parents d879c4a + 67dc321 commit 1c2bac5
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 11 deletions.
6 changes: 3 additions & 3 deletions cmd/snek/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func main() {

// Start API after plugins are configured
if err := apiInstance.Start(); err != nil {
logger.Fatalf("failed to start API: %s\n", err)
logger.Fatalf("failed to start API: %s", err)
}

// Start pipeline and wait for error
if err := pipe.Start(); err != nil {
logger.Fatalf("failed to start pipeline: %s\n", err)
logger.Fatalf("failed to start pipeline: %s", err)
}
err, ok := <-pipe.ErrorChan()
if ok {
logger.Fatalf("pipeline failed: %s\n", err)
logger.Fatalf("pipeline failed: %s", err)
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/blinklabs-io/snek
go 1.20

require (
github.com/blinklabs-io/gouroboros v0.60.0
github.com/blinklabs-io/gouroboros v0.61.0
github.com/gen2brain/beeep v0.0.0-20230602101333-f384c29b62dd
github.com/gin-gonic/gin v1.9.1
github.com/kelseyhightower/envconfig v1.4.0
Expand All @@ -17,7 +17,7 @@ require (
)

// XXX: uncomment when testing local changes to gouroboros
// replace github.com/blinklabs-io/gouroboros v0.52.0 => ../gouroboros
// replace github.com/blinklabs-io/gouroboros => ../gouroboros

require (
cloud.google.com/go/compute v1.20.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/blinklabs-io/gouroboros v0.60.0 h1:G08+r15QwTlak8UfCzZmJVlzIBMIy2iyYI7CL2+Jcxs=
github.com/blinklabs-io/gouroboros v0.60.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k=
github.com/blinklabs-io/gouroboros v0.61.0 h1:HSAo2thM/4JM6tVF4e/o9f20aVElSckJVX6LdkvuNyE=
github.com/blinklabs-io/gouroboros v0.61.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE=
Expand Down
38 changes: 34 additions & 4 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
)

const (
// Size of cache for recent chainsync cursors
cursorCacheSize = 20
)

type ChainSync struct {
oConn *ouroboros.Connection
logger plugin.Logger
Expand All @@ -41,12 +46,14 @@ type ChainSync struct {
intersectTip bool
intersectPoints []ocommon.Point
includeCbor bool
autoReconnect bool
statusUpdateFunc StatusUpdateFunc
status *ChainSyncStatus
errorChan chan error
eventChan chan event.Event
bulkRangeStart ocommon.Point
bulkRangeEnd ocommon.Point
cursorCache []ocommon.Point
}

type ChainSyncStatus struct {
Expand Down Expand Up @@ -79,6 +86,7 @@ func (c *ChainSync) Start() error {
if err := c.setupConnection(); err != nil {
return err
}
// Start chainsync client
c.oConn.ChainSync().Client.Start()
if c.oConn.BlockFetch() != nil {
c.oConn.BlockFetch().Client.Start()
Expand Down Expand Up @@ -201,11 +209,27 @@ func (c *ChainSync) setupConnection() error {
go func() {
err, ok := <-c.oConn.ErrorChan()
if ok {
// Pass error through our own error channel
c.errorChan <- err
return
if c.autoReconnect {
if c.logger != nil {
c.logger.Infof("reconnecting to %s due to error: %s", dialAddress, err)
}
// Shutdown current connection
if err := c.oConn.Close(); err != nil {
c.errorChan <- err
return
}
// Set the intersect points from the cursor cache
c.intersectPoints = c.cursorCache[:]
// Restart the connection
if err := c.Start(); err != nil {
c.errorChan <- err
return
}
} else {
// Pass error through our own error channel
c.errorChan <- err
}
}
close(c.errorChan)
}()
return nil
}
Expand Down Expand Up @@ -297,6 +321,12 @@ func (c *ChainSync) updateStatus(
tipSlotNumber uint64,
tipBlockHash string,
) {
// Update cursor cache
blockHashBytes, _ := hex.DecodeString(blockHash)
c.cursorCache = append(c.cursorCache, ocommon.Point{Slot: slotNumber, Hash: blockHashBytes})
if len(c.cursorCache) > cursorCacheSize {
c.cursorCache = c.cursorCache[len(c.cursorCache)-cursorCacheSize:]
}
// Determine if we've reached the chain tip
if !c.status.TipReached {
// Make sure we're past the end slot in any bulk range, since we don't update the tip during bulk sync
Expand Down
7 changes: 7 additions & 0 deletions input/chainsync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc {
}
}

// WithAutoReconnect specified whether to automatically reconnect if the connection is broken
func WithAutoReconnect(autoReconnect bool) ChainSyncOptionFunc {
return func(c *ChainSync) {
c.autoReconnect = autoReconnect
}
}

// WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status
// to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events
func WithStatusUpdateFunc(
Expand Down
9 changes: 9 additions & 0 deletions input/chainsync/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var cmdlineOptions struct {
intersectTip bool
intersectPoint string
includeCbor bool
autoReconnect bool
}

func init() {
Expand Down Expand Up @@ -110,6 +111,13 @@ func init() {
DefaultValue: false,
Dest: &(cmdlineOptions.includeCbor),
},
{
Name: "auto-reconnect",
Type: plugin.PluginOptionTypeBool,
Description: "auto-reconnect if the connection is broken",
DefaultValue: true,
Dest: &(cmdlineOptions.autoReconnect),
},
},
},
)
Expand All @@ -127,6 +135,7 @@ func NewFromCmdlineOptions() plugin.Plugin {
WithNtcTcp(cmdlineOptions.ntcTcp),
WithBulkMode(cmdlineOptions.bulkMode),
WithIncludeCbor(cmdlineOptions.includeCbor),
WithAutoReconnect(cmdlineOptions.autoReconnect),
}
if cmdlineOptions.intersectPoint != "" {
intersectPoints := []ocommon.Point{}
Expand Down

0 comments on commit 1c2bac5

Please sign in to comment.