diff --git a/cmd/snek/main.go b/cmd/snek/main.go index 7cf002a..f0d232a 100644 --- a/cmd/snek/main.go +++ b/cmd/snek/main.go @@ -149,15 +149,15 @@ func main() { // Start API after plugins are configured if err := apiInstance.Start(); err != nil { - logger.Fatalf("failed to start API: %s\n", err) + logger.Fatalf("failed to start API: %s", err) } // Start pipeline and wait for error if err := pipe.Start(); err != nil { - logger.Fatalf("failed to start pipeline: %s\n", err) + logger.Fatalf("failed to start pipeline: %s", err) } err, ok := <-pipe.ErrorChan() if ok { - logger.Fatalf("pipeline failed: %s\n", err) + logger.Fatalf("pipeline failed: %s", err) } } diff --git a/go.mod b/go.mod index 628ad40..f2f59ab 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/blinklabs-io/snek go 1.20 require ( - github.com/blinklabs-io/gouroboros v0.60.0 + github.com/blinklabs-io/gouroboros v0.61.0 github.com/gen2brain/beeep v0.0.0-20230602101333-f384c29b62dd github.com/gin-gonic/gin v1.9.1 github.com/kelseyhightower/envconfig v1.4.0 @@ -17,7 +17,7 @@ require ( ) // XXX: uncomment when testing local changes to gouroboros -// replace github.com/blinklabs-io/gouroboros v0.52.0 => ../gouroboros +// replace github.com/blinklabs-io/gouroboros => ../gouroboros require ( cloud.google.com/go/compute v1.20.1 // indirect diff --git a/go.sum b/go.sum index 01022e1..5f4d3eb 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= -github.com/blinklabs-io/gouroboros v0.60.0 h1:G08+r15QwTlak8UfCzZmJVlzIBMIy2iyYI7CL2+Jcxs= -github.com/blinklabs-io/gouroboros v0.60.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k= +github.com/blinklabs-io/gouroboros v0.61.0 h1:HSAo2thM/4JM6tVF4e/o9f20aVElSckJVX6LdkvuNyE= +github.com/blinklabs-io/gouroboros v0.61.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE= diff --git a/input/chainsync/chainsync.go b/input/chainsync/chainsync.go index a04fe79..6a27b10 100644 --- a/input/chainsync/chainsync.go +++ b/input/chainsync/chainsync.go @@ -29,6 +29,11 @@ import ( ocommon "github.com/blinklabs-io/gouroboros/protocol/common" ) +const ( + // Size of cache for recent chainsync cursors + cursorCacheSize = 20 +) + type ChainSync struct { oConn *ouroboros.Connection logger plugin.Logger @@ -41,12 +46,14 @@ type ChainSync struct { intersectTip bool intersectPoints []ocommon.Point includeCbor bool + autoReconnect bool statusUpdateFunc StatusUpdateFunc status *ChainSyncStatus errorChan chan error eventChan chan event.Event bulkRangeStart ocommon.Point bulkRangeEnd ocommon.Point + cursorCache []ocommon.Point } type ChainSyncStatus struct { @@ -79,6 +86,7 @@ func (c *ChainSync) Start() error { if err := c.setupConnection(); err != nil { return err } + // Start chainsync client c.oConn.ChainSync().Client.Start() if c.oConn.BlockFetch() != nil { c.oConn.BlockFetch().Client.Start() @@ -201,11 +209,27 @@ func (c *ChainSync) setupConnection() error { go func() { err, ok := <-c.oConn.ErrorChan() if ok { - // Pass error through our own error channel - c.errorChan <- err - return + if c.autoReconnect { + if c.logger != nil { + c.logger.Infof("reconnecting to %s due to error: %s", dialAddress, err) + } + // Shutdown current connection + if err := c.oConn.Close(); err != nil { + c.errorChan <- err + return + } + // Set the intersect points from the cursor cache + c.intersectPoints = c.cursorCache[:] + // Restart the connection + if err := c.Start(); err != nil { + c.errorChan <- err + return + } + } else { + // Pass error through our own error channel + c.errorChan <- err + } } - close(c.errorChan) }() return nil } @@ -297,6 +321,12 @@ func (c *ChainSync) updateStatus( tipSlotNumber uint64, tipBlockHash string, ) { + // Update cursor cache + blockHashBytes, _ := hex.DecodeString(blockHash) + c.cursorCache = append(c.cursorCache, ocommon.Point{Slot: slotNumber, Hash: blockHashBytes}) + if len(c.cursorCache) > cursorCacheSize { + c.cursorCache = c.cursorCache[len(c.cursorCache)-cursorCacheSize:] + } // Determine if we've reached the chain tip if !c.status.TipReached { // Make sure we're past the end slot in any bulk range, since we don't update the tip during bulk sync diff --git a/input/chainsync/options.go b/input/chainsync/options.go index 53215d8..6ced851 100644 --- a/input/chainsync/options.go +++ b/input/chainsync/options.go @@ -84,6 +84,13 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc { } } +// WithAutoReconnect specified whether to automatically reconnect if the connection is broken +func WithAutoReconnect(autoReconnect bool) ChainSyncOptionFunc { + return func(c *ChainSync) { + c.autoReconnect = autoReconnect + } +} + // WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status // to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events func WithStatusUpdateFunc( diff --git a/input/chainsync/plugin.go b/input/chainsync/plugin.go index 3b5d4f4..fdc172e 100644 --- a/input/chainsync/plugin.go +++ b/input/chainsync/plugin.go @@ -35,6 +35,7 @@ var cmdlineOptions struct { intersectTip bool intersectPoint string includeCbor bool + autoReconnect bool } func init() { @@ -110,6 +111,13 @@ func init() { DefaultValue: false, Dest: &(cmdlineOptions.includeCbor), }, + { + Name: "auto-reconnect", + Type: plugin.PluginOptionTypeBool, + Description: "auto-reconnect if the connection is broken", + DefaultValue: true, + Dest: &(cmdlineOptions.autoReconnect), + }, }, }, ) @@ -127,6 +135,7 @@ func NewFromCmdlineOptions() plugin.Plugin { WithNtcTcp(cmdlineOptions.ntcTcp), WithBulkMode(cmdlineOptions.bulkMode), WithIncludeCbor(cmdlineOptions.includeCbor), + WithAutoReconnect(cmdlineOptions.autoReconnect), } if cmdlineOptions.intersectPoint != "" { intersectPoints := []ocommon.Point{}