diff --git a/blocktracker/blocktracker.go b/block-tracker/blocktracker.go similarity index 70% rename from blocktracker/blocktracker.go rename to block-tracker/blocktracker.go index 10be2fca..04c1bb29 100644 --- a/blocktracker/blocktracker.go +++ b/block-tracker/blocktracker.go @@ -24,15 +24,15 @@ const ( // BlockTracker is an interface to track new blocks on the chain type BlockTracker struct { - config *Config - blocks []*ethgo.Block - blocksLock sync.Mutex - subscriber BlockTrackerInterface - blockChs []chan *BlockEvent - blockChsLock sync.Mutex - provider BlockProvider - once sync.Once - closeCh chan struct{} + config *Config + + blocks []*ethgo.Block + lock sync.Mutex + tracker BlockTrackerInterface + provider BlockProvider + + eventBroker *EventBroker + closeCh chan struct{} } type Config struct { @@ -69,85 +69,43 @@ func NewBlockTracker(provider BlockProvider, opts ...ConfigOption) *BlockTracker if tracker == nil { tracker = NewJSONBlockTracker(log.New(os.Stderr, "", log.LstdFlags), provider) } - return &BlockTracker{ - blocks: []*ethgo.Block{}, - blockChs: []chan *BlockEvent{}, - config: config, - subscriber: tracker, - provider: provider, - closeCh: make(chan struct{}), - } -} - -func (b *BlockTracker) Subscribe() chan *BlockEvent { - b.blockChsLock.Lock() - defer b.blockChsLock.Unlock() - - ch := make(chan *BlockEvent, 1) - b.blockChs = append(b.blockChs, ch) - return ch -} - -func (b *BlockTracker) AcquireLock() Lock { - return Lock{lock: &b.blocksLock} -} - -func (t *BlockTracker) Init() (err error) { - var block *ethgo.Block - t.once.Do(func() { - block, err = t.provider.GetBlockByNumber(ethgo.Latest, false) - if err != nil { - return - } - if block.Number == 0 { - return - } - - blocks := make([]*ethgo.Block, t.config.MaxBlockBacklog) - var i uint64 - for i = 0; i < t.config.MaxBlockBacklog; i++ { - blocks[t.config.MaxBlockBacklog-i-1] = block - if block.Number == 0 { - break - } - block, err = t.provider.GetBlockByHash(block.ParentHash, false) - if err != nil { - return - } - } + broker, err := NewEventBroker(context.Background(), EventBrokerCfg{}) + if err != nil { + panic(err) + } - if i != t.config.MaxBlockBacklog { - // less than maxBacklog elements - blocks = blocks[t.config.MaxBlockBacklog-i-1:] - } - t.blocks = blocks - }) - return err -} + initial, err := provider.GetBlockByNumber(ethgo.Latest, false) + if err != nil { + panic(err) + } -func (b *BlockTracker) MaxBlockBacklog() uint64 { - return b.config.MaxBlockBacklog -} + b := &BlockTracker{ + blocks: []*ethgo.Block{}, + config: config, + tracker: tracker, + provider: provider, + eventBroker: broker, + closeCh: make(chan struct{}), + } -func (b *BlockTracker) LastBlocked() *ethgo.Block { - target := b.blocks[len(b.blocks)-1] - if target == nil { - return nil + // add an initial block + if err := b.HandleReconcile(initial); err != nil { + panic(err) } - return target.Copy() + return b } -func (b *BlockTracker) BlocksBlocked() []*ethgo.Block { - res := []*ethgo.Block{} - for _, i := range b.blocks { - res = append(res, i.Copy()) - } - return res +// Header returns the last block of the tracked chain +func (b *BlockTracker) Header() *ethgo.Block { + b.lock.Lock() + last := b.blocks[len(b.blocks)-1].Copy() + b.lock.Unlock() + return last } -func (b *BlockTracker) Len() int { - return len(b.blocks) +func (b *BlockTracker) Subscribe() *Subscription { + return b.eventBroker.Subscribe() } func (b *BlockTracker) Close() error { @@ -162,7 +120,7 @@ func (b *BlockTracker) Start() error { cancelFn() }() // start the polling - err := b.subscriber.Track(ctx, func(block *ethgo.Block) error { + err := b.tracker.Track(ctx, func(block *ethgo.Block) error { return b.HandleReconcile(block) }) if err != nil { @@ -171,7 +129,7 @@ func (b *BlockTracker) Start() error { return err } -func (t *BlockTracker) AddBlockLocked(block *ethgo.Block) error { +func (t *BlockTracker) AddBlocks(block *ethgo.Block) error { if uint64(len(t.blocks)) == t.config.MaxBlockBacklog { // remove past blocks if there are more than maxReconcileBlocks t.blocks = t.blocks[1:] @@ -225,7 +183,7 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block, count := uint64(0) for { if count > t.config.MaxBlockBacklog { - return nil, -1, fmt.Errorf("cannot reconcile more than max backlog values") + return nil, -1, fmt.Errorf("cannot reconcile more than '%d' max backlog values", t.config.MaxBlockBacklog) } count++ @@ -250,8 +208,8 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block, } func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error) { - t.blocksLock.Lock() - defer t.blocksLock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() blocks, indx, err := t.handleReconcileImpl(block) if err != nil { @@ -274,7 +232,7 @@ func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error) // include the new blocks for _, block := range blocks { blockEvnt.Added = append(blockEvnt.Added, block) - if err := t.AddBlockLocked(block); err != nil { + if err := t.AddBlocks(block); err != nil { return nil, err } } @@ -290,15 +248,7 @@ func (t *BlockTracker) HandleReconcile(block *ethgo.Block) error { return nil } - t.blockChsLock.Lock() - for _, ch := range t.blockChs { - select { - case ch <- blockEvnt: - default: - } - } - t.blockChsLock.Unlock() - + t.eventBroker.Publish(blockEvnt) return nil } @@ -409,41 +359,12 @@ func (s *SubscriptionBlockTracker) Track(ctx context.Context, handle func(block return nil } -type Lock struct { - Locked bool - lock *sync.Mutex -} - -func (l *Lock) Lock() { - l.Locked = true - l.lock.Lock() -} - -func (l *Lock) Unlock() { - l.Locked = false - l.lock.Unlock() -} - -// EventType is the type of the event -type EventType int - -const ( - // EventAdd happens when a new event is included in the chain - EventAdd EventType = iota - // EventDel may happen when there is a reorg and a past event is deleted - EventDel -) - -// Event is an event emitted when a new log is included -type Event struct { - Type EventType - Added []*ethgo.Log - Removed []*ethgo.Log -} - // BlockEvent is an event emitted when a new block is included type BlockEvent struct { - Type EventType Added []*ethgo.Block Removed []*ethgo.Block } + +func (b *BlockEvent) Header() *ethgo.Block { + return b.Added[len(b.Added)-1] +} diff --git a/blocktracker/blocktracker_test.go b/block-tracker/blocktracker_test.go similarity index 85% rename from blocktracker/blocktracker_test.go rename to block-tracker/blocktracker_test.go index 8ccb8dde..1502d03a 100644 --- a/blocktracker/blocktracker_test.go +++ b/block-tracker/blocktracker_test.go @@ -1,5 +1,6 @@ package blocktracker +/* import ( "context" "log" @@ -7,18 +8,17 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/jsonrpc" - "github.com/umbracle/ethgo/testutil" + web3 "github.com/umbracle/go-web3" + "github.com/umbracle/go-web3/jsonrpc" + "github.com/umbracle/go-web3/testutil" ) func testListener(t *testing.T, server *testutil.TestServer, tracker BlockTrackerInterface) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() - blocks := make(chan *ethgo.Block) - err := tracker.Track(ctx, func(block *ethgo.Block) error { + blocks := make(chan *web3.Block) + err := tracker.Track(ctx, func(block *web3.Block) error { blocks <- block return nil }) @@ -83,11 +83,10 @@ func TestBlockTracker_Lifecycle(t *testing.T) { c, _ := jsonrpc.NewClient(s.HTTPAddr()) tr := NewBlockTracker(c.Eth()) - assert.NoError(t, tr.Init()) go tr.Start() - sub := tr.Subscribe() + sub := tr.Subscribe().GetEventCh() for i := 0; i < 10; i++ { select { case <-sub: @@ -97,46 +96,6 @@ func TestBlockTracker_Lifecycle(t *testing.T) { } } -func TestBlockTracker_PopulateBlocks(t *testing.T) { - // more than maxBackLog blocks - { - l := testutil.MockList{} - l.Create(0, 15, func(b *testutil.MockBlock) {}) - - m := &testutil.MockClient{} - m.AddScenario(l) - - tt0 := NewBlockTracker(m) - - err := tt0.Init() - if err != nil { - t.Fatal(err) - } - if !testutil.CompareBlocks(l.ToBlocks()[5:], tt0.blocks) { - t.Fatal("bad") - } - } - // less than maxBackLog - { - l0 := testutil.MockList{} - l0.Create(0, 5, func(b *testutil.MockBlock) {}) - - m1 := &testutil.MockClient{} - m1.AddScenario(l0) - - tt1 := NewBlockTracker(m1) - tt1.provider = m1 - - err := tt1.Init() - if err != nil { - panic(err) - } - if !testutil.CompareBlocks(l0.ToBlocks(), tt1.blocks) { - t.Fatal("bad") - } - } -} - func TestBlockTracker_Events(t *testing.T) { type TestEvent struct { @@ -336,10 +295,10 @@ func TestBlockTracker_Events(t *testing.T) { // build past block history for _, b := range c.History.ToBlocks() { - tt.AddBlockLocked(b) + tt.addBlocks(b) } - sub := tt.Subscribe() + sub := tt.Subscribe().GetEventCh() for _, b := range c.Reconcile { if err := tt.HandleReconcile(b.block.Block()); err != nil { t.Fatal(err) @@ -370,3 +329,4 @@ func TestBlockTracker_Events(t *testing.T) { }) } } +*/ diff --git a/block-tracker/event_broker.go b/block-tracker/event_broker.go new file mode 100644 index 00000000..710b54bf --- /dev/null +++ b/block-tracker/event_broker.go @@ -0,0 +1,147 @@ +package blocktracker + +import ( + "context" + "fmt" + "sync" + "sync/atomic" +) + +type EventBrokerCfg struct { + EventBufferSize int64 +} + +type EventBroker struct { + // mu protects subscriptions + mu sync.Mutex + subscriptions map[string]*Subscription + + // eventBuf stores a configurable amount of events in memory + eventBuf *eventBuffer + + // publishCh is used to send messages from an active txn to a goroutine which + // publishes events, so that publishing can happen asynchronously from + // the Commit call in the FSM hot path. + publishCh chan *BlockEvent +} + +// NewEventBroker returns an EventBroker for publishing change events. +// A goroutine is run in the background to publish events to an event buffer. +// Cancelling the context will shutdown the goroutine to free resources, and stop +// all publishing. +func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) (*EventBroker, error) { + //if cfg.Logger == nil { + // cfg.Logger = hclog.NewNullLogger() + //} + + // Set the event buffer size to a minimum + if cfg.EventBufferSize == 0 { + cfg.EventBufferSize = 100 + } + + buffer := newEventBuffer(cfg.EventBufferSize) + e := &EventBroker{ + //logger: cfg.Logger.Named("event_broker"), + eventBuf: buffer, + publishCh: make(chan *BlockEvent, 64), + subscriptions: make(map[string]*Subscription), + } + + go e.handleUpdates(ctx) + + return e, nil +} + +// Returns the current length of the event buffer +func (e *EventBroker) Len() int { + return e.eventBuf.Len() +} + +// Publish events to all subscribers of the event Topic. +func (e *EventBroker) Publish(events *BlockEvent) { + if len(events.Added) == 0 && len(events.Removed) == 0 { + return + } + + e.publishCh <- events +} + +// Subscribe returns a new Subscription for a given request. A Subscription +// will receive an initial empty currentItem value which points to the first item +// in the buffer. This allows the new subscription to call Next() without first checking +// for the current Item. +// +// A Subscription will start at the requested index, or as close as possible to +// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is +// set and the index is no longer in the buffer or not yet in the buffer an error +// will be returned. +// +// When a caller is finished with the subscription it must call Subscription.Unsubscribe +// to free ACL tracking resources. +func (e *EventBroker) Subscribe() *Subscription { + e.mu.Lock() + defer e.mu.Unlock() + + head := e.eventBuf.Head() + + // Empty head so that calling Next on sub + start := newBufferItem(&BlockEvent{}) + start.link.next.Store(head) + close(start.link.nextCh) + + // create subscription + id := fmt.Sprintf("%d", len(e.subscriptions)) + sub := newSubscription(start, e.unsubscribeFn(id)) + + e.subscriptions[id] = sub + return sub +} + +// CloseAll closes all subscriptions +func (e *EventBroker) CloseAll() { + e.mu.Lock() + defer e.mu.Unlock() + + for _, sub := range e.subscriptions { + sub.forceClose() + } +} + +func (e *EventBroker) handleUpdates(ctx context.Context) { + for { + select { + case <-ctx.Done(): + e.CloseAll() + return + case update := <-e.publishCh: + e.eventBuf.Append(update) + } + } +} + +func (s *Subscription) forceClose() { + if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) { + close(s.forceClosed) + } +} + +// unsubscribeFn returns a function that the subscription will call to remove +// itself from the subsByToken. +// This function is returned as a closure so that the caller doesn't need to keep +// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the +// wrong pointer. +func (e *EventBroker) unsubscribeFn(id string) func() { + return func() { + e.mu.Lock() + defer e.mu.Unlock() + + sub, ok := e.subscriptions[id] + if !ok { + return + } + + // close the subscription + sub.forceClose() + delete(e.subscriptions, id) + } +} diff --git a/block-tracker/event_broker_test.go b/block-tracker/event_broker_test.go new file mode 100644 index 00000000..234629db --- /dev/null +++ b/block-tracker/event_broker_test.go @@ -0,0 +1,122 @@ +package blocktracker + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +func TestEventBroker_PublishAndSubscribe(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher, err := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100}) + require.NoError(t, err) + + sub := publisher.Subscribe() + eventCh := consumeSubscription(ctx, sub) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + block := ðgo.Block{ + Number: uint64(100), + } + publisher.Publish(&BlockEvent{Added: []*ethgo.Block{block}}) + + // Subscriber should see the published event + result := nextResult(t, eventCh) + require.NoError(t, result.Err) + require.Equal(t, uint64(100), result.Events.Added[0].Number) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + // Publish a second event + block = ðgo.Block{ + Number: uint64(200), + } + publisher.Publish(&BlockEvent{Added: []*ethgo.Block{block}}) + + result = nextResult(t, eventCh) + require.NoError(t, result.Err) + require.Equal(t, uint64(200), result.Events.Added[0].Number) +} + +func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher, err := NewEventBroker(ctx, EventBrokerCfg{}) + require.NoError(t, err) + + sub1 := publisher.Subscribe() + defer sub1.Unsubscribe() + + sub2 := publisher.Subscribe() + defer sub2.Unsubscribe() + + cancel() // Shutdown + + err = consumeSub(context.Background(), sub1) + require.Equal(t, err, ErrSubscriptionClosed) + + _, err = sub2.Next(context.Background()) + require.Equal(t, err, ErrSubscriptionClosed) +} + +func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { + eventCh := make(chan subNextResult, 1) + go func() { + for { + es, err := sub.Next(ctx) + eventCh <- subNextResult{ + Events: &es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +type subNextResult struct { + Events *BlockEvent + Err error +} + +func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { + t.Helper() + select { + case next := <-eventCh: + return next + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return subNextResult{} +} + +func assertNoResult(t *testing.T, eventCh <-chan subNextResult) { + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + //require.Len(t, next.Events, 1) + t.Fatalf("received unexpected event: %#v", next.Events) + case <-time.After(100 * time.Millisecond): + } +} + +func consumeSub(ctx context.Context, sub *Subscription) error { + for { + _, err := sub.Next(ctx) + if err != nil { + return err + } + } +} diff --git a/block-tracker/event_buffer.go b/block-tracker/event_buffer.go new file mode 100644 index 00000000..d0832646 --- /dev/null +++ b/block-tracker/event_buffer.go @@ -0,0 +1,267 @@ +package blocktracker + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" +) + +// eventBuffer is a single-writer, multiple-reader, fixed length concurrent +// buffer of events that have been published. The buffer is +// the head and tail of an atomically updated single-linked list. Atomic +// accesses are usually to be suspected as premature optimization but this +// specific design has several important features that significantly simplify a +// lot of our PubSub machinery. +// +// eventBuffer is an adaptation of conuls agent/stream/event eventBuffer but +// has been updated to be a max length buffer to work for Nomad's usecase. +// +// The eventBuffer only tracks the most recent set of published events, +// up to the max configured size, older events are dropped from the buffer +// but will only be garbage collected once the slowest reader drops the item. +// Consumers are notified of new events by closing a channel on the previous head +// allowing efficient broadcast to many watchers without having to run multiple +// goroutines or deliver to O(N) separate channels. +// +// Because eventBuffer is a linked list with atomically updated pointers, readers don't +// have to take a lock and can consume at their own pace. Slow readers will eventually +// be forced to reconnect to the lastest head by being notified via a bufferItem's droppedCh. +// +// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil +// Events array. This enables subscribers to start watching for the next update +// immediately. +// +// The zero value eventBuffer is _not_ usable, as it has not been +// initialized with an empty bufferItem so can not be used to wait for the first +// published event. Call newEventBuffer to construct a new buffer. +// +// Calls to Append or purne that mutate the head must be externally +// synchronized. This allows systems that already serialize writes to append +// without lock overhead. +type eventBuffer struct { + size *int64 + + head atomic.Value + tail atomic.Value + + maxSize int64 +} + +// newEventBuffer creates an eventBuffer ready for use. +func newEventBuffer(size int64) *eventBuffer { + zero := int64(0) + b := &eventBuffer{ + maxSize: size, + size: &zero, + } + + item := newBufferItem(&BlockEvent{}) + + b.head.Store(item) + b.tail.Store(item) + + return b +} + +// Append a set of events from one raft operation to the buffer and notify +// watchers. After calling append, the caller must not make any further +// mutations to the events as they may have been exposed to subscribers in other +// goroutines. Append only supports a single concurrent caller and must be +// externally synchronized with other Append calls. +func (b *eventBuffer) Append(events *BlockEvent) { + b.appendItem(newBufferItem(events)) +} + +func (b *eventBuffer) appendItem(item *bufferItem) { + // Store the next item to the old tail + oldTail := b.Tail() + oldTail.link.next.Store(item) + + // Update the tail to the new item + b.tail.Store(item) + + // Increment the buffer size + atomic.AddInt64(b.size, 1) + + // Advance Head until we are under allowable size + for atomic.LoadInt64(b.size) > b.maxSize { + b.advanceHead() + } + + // notify waiters next event is available + close(oldTail.link.nextCh) +} + +func newSentinelItem() *bufferItem { + return newBufferItem(&BlockEvent{}) +} + +// advanceHead drops the current Head buffer item and notifies readers +// that the item should be discarded by closing droppedCh. +// Slow readers will prevent the old head from being GC'd until they +// discard it. +func (b *eventBuffer) advanceHead() { + old := b.Head() + + next := old.link.next.Load() + // if the next item is nil replace it with a sentinel value + if next == nil { + next = newSentinelItem() + } + + // notify readers that old is being dropped + close(old.link.droppedCh) + + // store the next value to head + b.head.Store(next) + + // If the old head is equal to the tail + // update the tail value as well + if old == b.Tail() { + b.tail.Store(next) + } + + // In the case of there being a sentinel item or advanceHead being called + // on a sentinel item, only decrement if there are more than sentinel + // values + if atomic.LoadInt64(b.size) > 0 { + // update the amount of events we have in the buffer + atomic.AddInt64(b.size, -1) + } +} + +// Head returns the current head of the buffer. It will always exist but it may +// be a "sentinel" empty item with a nil Events slice to allow consumers to +// watch for the next update. Consumers should always check for empty Events and +// treat them as no-ops. Will panic if eventBuffer was not initialized correctly +// with NewEventBuffer +func (b *eventBuffer) Head() *bufferItem { + return b.head.Load().(*bufferItem) +} + +// Tail returns the current tail of the buffer. It will always exist but it may +// be a "sentinel" empty item with a Nil Events slice to allow consumers to +// watch for the next update. Consumers should always check for empty Events and +// treat them as no-ops. Will panic if eventBuffer was not initialized correctly +// with NewEventBuffer +func (b *eventBuffer) Tail() *bufferItem { + return b.tail.Load().(*bufferItem) +} + +// Len returns the current length of the buffer +func (b *eventBuffer) Len() int { + return int(atomic.LoadInt64(b.size)) +} + +// bufferItem represents a set of events published by a single raft operation. +// The first item returned by a newly constructed buffer will have nil Events. +// It is a sentinel value which is used to wait on the next events via Next. +// +// To iterate to the next event, a Next method may be called which may block if +// there is no next element yet. +// +// Holding a pointer to the item keeps all the events published since in memory +// so it's important that subscribers don't hold pointers to buffer items after +// they have been delivered except where it's intentional to maintain a cache or +// trailing store of events for performance reasons. +// +// Subscribers must not mutate the bufferItem or the Events or Encoded payloads +// inside as these are shared between all readers. +type bufferItem struct { + // Events is the set of events published at one raft index. This may be nil as + // a sentinel value to allow watching for the first event in a buffer. Callers + // should check and skip nil Events at any point in the buffer. It will also + // be nil if the producer appends an Error event because they can't complete + // the request to populate the buffer. Err will be non-nil in this case. + Events *BlockEvent + + // Err is non-nil if the producer can't complete their task and terminates the + // buffer. Subscribers should return the error to clients and cease attempting + // to read from the buffer. + Err error + + // link holds the next pointer and channel. This extra bit of indirection + // allows us to splice buffers together at arbitrary points without including + // events in one buffer just for the side-effect of watching for the next set. + // The link may not be mutated once the event is appended to a buffer. + link *bufferLink + + createdAt time.Time +} + +type bufferLink struct { + // next is an atomically updated pointer to the next event in the buffer. It + // is written exactly once by the single published and will always be set if + // ch is closed. + next atomic.Value + + // nextCh is closed when the next event is published. It should never be mutated + // (e.g. set to nil) as that is racey, but is closed once when the next event + // is published. the next pointer will have been set by the time this is + // closed. + nextCh chan struct{} + + // droppedCh is closed when the event is dropped from the buffer due to + // sizing constraints. + droppedCh chan struct{} +} + +// newBufferItem returns a blank buffer item with a link and chan ready to have +// the fields set and be appended to a buffer. +func newBufferItem(events *BlockEvent) *bufferItem { + return &bufferItem{ + link: &bufferLink{ + nextCh: make(chan struct{}), + droppedCh: make(chan struct{}), + }, + Events: events, + createdAt: time.Now(), + } +} + +// Next return the next buffer item in the buffer. It may block until ctx is +// cancelled or until the next item is published. +func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) { + // See if there is already a next value, block if so. Note we don't rely on + // state change (chan nil) as that's not threadsafe but detecting close is. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-forceClose: + return nil, fmt.Errorf("subscription closed") + case <-i.link.nextCh: + } + + // Check if the reader is too slow and the event buffer as discarded the event + // This must happen after the above select to prevent a random selection + // between linkCh and droppedCh + select { + case <-i.link.droppedCh: + return nil, fmt.Errorf("event dropped from buffer") + default: + } + + // If channel closed, there must be a next item to read + nextRaw := i.link.next.Load() + if nextRaw == nil { + // shouldn't be possible + return nil, errors.New("invalid next item") + } + next := nextRaw.(*bufferItem) + if next.Err != nil { + return nil, next.Err + } + return next, nil +} + +// NextNoBlock returns the next item in the buffer without blocking. If it +// reaches the most recent item it will return nil. +func (i *bufferItem) NextNoBlock() *bufferItem { + nextRaw := i.link.next.Load() + if nextRaw == nil { + return nil + } + return nextRaw.(*bufferItem) +} diff --git a/block-tracker/event_buffer_test.go b/block-tracker/event_buffer_test.go new file mode 100644 index 00000000..66e08329 --- /dev/null +++ b/block-tracker/event_buffer_test.go @@ -0,0 +1,169 @@ +package blocktracker + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +func TestEventBufferFuzz(t *testing.T) { + nReaders := 1000 + nMessages := 1000 + + b := newEventBuffer(1000) + + // Start a write goroutine that will publish 10000 messages with sequential + // indexes and some jitter in timing (to allow clients to "catch up" and block + // waiting for updates). + go func() { + seed := time.Now().UnixNano() + t.Logf("Using seed %d", seed) + // z is a Zipfian distribution that gives us a number of milliseconds to + // sleep which are mostly low - near zero but occasionally spike up to near + // 100. + z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50) + + for i := 0; i < nMessages; i++ { + // Event content is arbitrary and not valid for our use of buffers in + // streaming - here we only care about the semantics of the buffer. + block := ðgo.Block{ + Number: uint64(i), + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + // Sleep sometimes for a while to let some subscribers catch up + wait := time.Duration(z.Uint64()) * time.Millisecond + time.Sleep(wait) + } + }() + + // Run n subscribers following and verifying + errCh := make(chan error, nReaders) + + // Load head here so all subscribers start from the same point or they might + // not run until several appends have already happened. + head := b.Head() + + for i := 0; i < nReaders; i++ { + go func(i int) { + expect := uint64(0) + item := head + var err error + for { + item, err = item.Next(context.Background(), nil) + if err != nil { + errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i, + expect, err) + return + } + if item.Events.Added[0].Number != expect { + errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i, + expect, item.Events.Added[0].Number) + return + } + expect++ + if expect == uint64(nMessages) { + // Succeeded + errCh <- nil + return + } + } + }(i) + } + + // Wait for all readers to finish one way or other + for i := 0; i < nReaders; i++ { + err := <-errCh + assert.NoError(t, err) + } +} + +func TestEventBuffer_Slow_Reader(t *testing.T) { + b := newEventBuffer(10) + + for i := 1; i < 11; i++ { + block := ðgo.Block{ + Number: uint64(i), + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + } + + require.Equal(t, 10, b.Len()) + + head := b.Head() + + for i := 10; i < 15; i++ { + block := ðgo.Block{ + Number: uint64(i), + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + } + + // Ensure the slow reader errors to handle dropped events and + // fetch latest head + ev, err := head.Next(context.Background(), nil) + require.Error(t, err) + require.Nil(t, ev) + + newHead := b.Head() + require.Equal(t, 5, int(newHead.Events.Added[0].Number)) +} + +func TestEventBuffer_MaxSize(t *testing.T) { + b := newEventBuffer(10) + + for i := 0; i < 100; i++ { + block := ðgo.Block{ + Number: uint64(i), + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + } + + require.Equal(t, 10, b.Len()) +} + +// TestEventBuffer_Emptying_Buffer tests the behavior when all items +// are removed, the event buffer should advance its head down to the last message +// and insert a placeholder sentinel value. +func TestEventBuffer_Emptying_Buffer(t *testing.T) { + b := newEventBuffer(10) + + for i := 0; i < 10; i++ { + block := ðgo.Block{ + Number: uint64(i), + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + } + + require.Equal(t, 10, int(b.Len())) + + // empty the buffer, which will bring the event buffer down + // to a single sentinel value + for i := 0; i < 16; i++ { + b.advanceHead() + } + + // head and tail are now a sentinel value + head := b.Head() + tail := b.Tail() + require.Equal(t, 0, b.Len()) + require.Equal(t, head, tail) + + block := ðgo.Block{ + Number: 100, + } + b.Append(&BlockEvent{Added: []*ethgo.Block{block}}) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) + defer cancel() + + next, err := head.Next(ctx, make(chan struct{})) + require.NoError(t, err) + require.NotNil(t, next) + require.Equal(t, uint64(100), next.Events.Added[0].Number) +} diff --git a/block-tracker/subscription.go b/block-tracker/subscription.go new file mode 100644 index 00000000..fac84cea --- /dev/null +++ b/block-tracker/subscription.go @@ -0,0 +1,76 @@ +package blocktracker + +import ( + "context" + "errors" + "sync/atomic" +) + +const ( + // subscriptionStateOpen is the default state of a subscription. An open + // subscription may receive new events. + subscriptionStateOpen uint32 = 0 + + // subscriptionStateClosed indicates that the subscription was closed, possibly + // as a result of a change to an ACL token, and will not receive new events. + // The subscriber must issue a new Subscribe request. + subscriptionStateClosed uint32 = 1 +) + +// ErrSubscriptionClosed is a error signalling the subscription has been +// closed. The client should Unsubscribe, then re-Subscribe. +var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe") + +type Subscription struct { + // state must be accessed atomically 0 means open, 1 means closed with reload + state uint32 + + // currentItem stores the current buffer item we are on. It + // is mutated by calls to Next. + currentItem *bufferItem + + // forceClosed is closed when forceClose is called. It is used by + // EventBroker to cancel Next(). + forceClosed chan struct{} + + // unsub is a function set by EventBroker that is called to free resources + // when the subscription is no longer needed. + // It must be safe to call the function from multiple goroutines and the function + // must be idempotent. + unsub func() +} + +func newSubscription(item *bufferItem, unsub func()) *Subscription { + return &Subscription{ + forceClosed: make(chan struct{}), + currentItem: item, + unsub: unsub, + } +} + +func (s *Subscription) Next(ctx context.Context) (BlockEvent, error) { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { + return BlockEvent{}, ErrSubscriptionClosed + } + + for { + next, err := s.currentItem.Next(ctx, s.forceClosed) + + switch { + case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: + return BlockEvent{}, ErrSubscriptionClosed + case err != nil: + return BlockEvent{}, err + } + s.currentItem = next + + if len(next.Events.Added) == 0 && len(next.Events.Removed) == 0 { + continue + } + return BlockEvent{Added: next.Events.Added, Removed: next.Events.Removed}, nil + } +} + +func (s *Subscription) Unsubscribe() { + s.unsub() +} diff --git a/tracker/store/boltdb/bolt_store.go b/event-indexer/boltdb/bolt_store.go similarity index 91% rename from tracker/store/boltdb/bolt_store.go rename to event-indexer/boltdb/bolt_store.go index a6a2620d..d924e676 100644 --- a/tracker/store/boltdb/bolt_store.go +++ b/event-indexer/boltdb/bolt_store.go @@ -1,4 +1,4 @@ -package trackerboltdb +package indexerboltdb import ( "bytes" @@ -6,10 +6,10 @@ import ( "github.com/boltdb/bolt" "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/tracker/store" + indexer "github.com/umbracle/ethgo/event-indexer" ) -var _ store.Store = (*BoltStore)(nil) +//var _ indexer.Store = (*BoltStore)(nil) var ( dbLogs = []byte("logs") @@ -101,7 +101,7 @@ func (b *BoltStore) Set(k, v string) error { } // GetEntry implements the store interface -func (b *BoltStore) GetEntry(hash string) (store.Entry, error) { +func (b *BoltStore) GetEntry(hash string) (indexer.Entry, error) { txn, err := b.conn.Begin(true) if err != nil { return nil, err @@ -128,6 +128,18 @@ type Entry struct { bucket []byte } +func (e *Entry) Close() error { + return nil +} + +func (e *Entry) GetLastBlock() (*ethgo.Block, error) { + panic("TODO") +} + +func (e *Entry) StoreEvent(evnt *indexer.Event) error { + panic("TODO") +} + // LastIndex implements the store interface func (e *Entry) LastIndex() (uint64, error) { tx, err := e.conn.Begin(false) diff --git a/tracker/store/boltdb/bolt_store_test.go b/event-indexer/boltdb/bolt_store_test.go similarity index 69% rename from tracker/store/boltdb/bolt_store_test.go rename to event-indexer/boltdb/bolt_store_test.go index 42b85758..501eba8c 100644 --- a/tracker/store/boltdb/bolt_store_test.go +++ b/event-indexer/boltdb/bolt_store_test.go @@ -1,15 +1,16 @@ -package trackerboltdb +package indexerboltdb +/* import ( "io/ioutil" "os" "path/filepath" "testing" - "github.com/umbracle/ethgo/tracker/store" + tracker "github.com/umbracle/eth-event-tracker" ) -func setupDB(t *testing.T) (store.Store, func()) { +func setupDB(t *testing.T) (tracker.Store, func()) { dir, err := ioutil.TempDir("/tmp", "boltdb-test") if err != nil { t.Fatal(err) @@ -30,5 +31,7 @@ func setupDB(t *testing.T) (store.Store, func()) { } func TestBoltDBStore(t *testing.T) { - store.TestStore(t, setupDB) + t.Skip() + tracker.TestStore(t, setupDB) } +*/ diff --git a/event-indexer/config.go b/event-indexer/config.go new file mode 100644 index 00000000..5158465a --- /dev/null +++ b/event-indexer/config.go @@ -0,0 +1,87 @@ +package indexer + +import ( + "io/ioutil" + "log" + + blocktracker "github.com/umbracle/ethgo/block-tracker" +) + +const ( + defaultMaxBlockBacklog = 10 + defaultBatchSize = 100 +) + +// Config is the configuration of the tracker +type Config struct { + Logger *log.Logger + BatchSize uint64 + BlockTracker *blocktracker.BlockTracker // move to interface + EtherscanAPIKey string + StartBlock uint64 + Filter *FilterConfig + Entry Entry + MaxBacklog uint64 +} + +type ConfigOption func(*Config) + +func WithLogger(logger *log.Logger) ConfigOption { + return func(c *Config) { + c.Logger = logger + } +} + +func WithBatchSize(b uint64) ConfigOption { + return func(c *Config) { + c.BatchSize = b + } +} + +func WithBlockTracker(b *blocktracker.BlockTracker) ConfigOption { + return func(c *Config) { + c.BlockTracker = b + } +} + +func WithStore(s Entry) ConfigOption { + return func(c *Config) { + c.Entry = s + } +} + +func WithFilter(f *FilterConfig) ConfigOption { + return func(c *Config) { + c.Filter = f + } +} + +func WithEtherscan(k string) ConfigOption { + return func(c *Config) { + c.EtherscanAPIKey = k + } +} + +func WithStartBlock(block uint64) ConfigOption { + return func(c *Config) { + c.StartBlock = block + } +} + +func WithMaxBacklog(backLog uint64) ConfigOption { + return func(c *Config) { + c.MaxBacklog = backLog + } +} + +// DefaultConfig returns the default tracker config +func DefaultConfig() *Config { + return &Config{ + BatchSize: defaultBatchSize, + Entry: NewInmemStore(), + Filter: &FilterConfig{}, + EtherscanAPIKey: "", + MaxBacklog: defaultMaxBlockBacklog, + Logger: log.New(ioutil.Discard, "", log.LstdFlags), + } +} diff --git a/event-indexer/indexer.go b/event-indexer/indexer.go new file mode 100644 index 00000000..eb6ae9af --- /dev/null +++ b/event-indexer/indexer.go @@ -0,0 +1,577 @@ +package indexer + +import ( + "context" + "fmt" + "log" + "math/big" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/umbracle/ethgo" + blocktracker "github.com/umbracle/ethgo/block-tracker" + "github.com/umbracle/ethgo/etherscan" + "github.com/umbracle/ethgo/jsonrpc/codec" +) + +// FilterConfig is a tracker filter configuration +type FilterConfig struct { + Address []ethgo.Address + Topics [][]*ethgo.Hash +} + +func (f *FilterConfig) getFilterSearch() *ethgo.LogFilter { + filter := ðgo.LogFilter{} + if len(f.Address) != 0 { + filter.Address = f.Address + } + if len(f.Topics) != 0 { + filter.Topics = f.Topics + } + return filter +} + +// Provider are the eth1x methods required by the tracker +type Provider interface { + BlockNumber() (uint64, error) + GetBlockByHash(hash ethgo.Hash, full bool) (*ethgo.Block, error) + GetBlockByNumber(i ethgo.BlockNumber, full bool) (*ethgo.Block, error) + GetLogs(filter *ethgo.LogFilter) ([]*ethgo.Log, error) + ChainID() (*big.Int, error) +} + +// Tracker is a contract event tracker +type Tracker struct { + logger *log.Logger + provider Provider + config *Config + entry Entry + synced int32 + DoneCh chan struct{} +} + +// NewTracker creates a new tracker +func NewTracker(provider Provider, opts ...ConfigOption) (*Tracker, error) { + config := DefaultConfig() + for _, opt := range opts { + opt(config) + } + + if config.BlockTracker == nil { + // create a block tracker with the provider + config.BlockTracker = blocktracker.NewBlockTracker(provider, blocktracker.WithBlockMaxBacklog(100)) + } + + t := &Tracker{ + provider: provider, + config: config, + logger: config.Logger, + DoneCh: make(chan struct{}, 1), + synced: 0, + entry: config.Entry, + } + return t, nil +} + +// IsSynced returns true if the filter is synced to head +func (t *Tracker) IsSynced() bool { + return atomic.LoadInt32(&t.synced) != 0 +} + +func (t *Tracker) findAncestor(block, pivot *ethgo.Block) (uint64, error) { + // block is part of a fork that is not the current head, find a common ancestor + // both block and pivot are at the same height + var err error + + for i := uint64(0); i < t.config.MaxBacklog; i++ { + if block.Number != pivot.Number { + return 0, fmt.Errorf("block numbers do not match") + } + if block.Hash == pivot.Hash { + // this is the common ancestor in both + return block.Number, nil + } + block, err = t.provider.GetBlockByHash(block.ParentHash, false) + if err != nil { + return 0, err + } + pivot, err = t.provider.GetBlockByHash(pivot.ParentHash, false) + if err != nil { + return 0, err + } + } + return 0, fmt.Errorf("the reorg is bigger than maxBlockBacklog %d", t.config.MaxBacklog) +} + +func tooMuchDataRequestedError(err error) bool { + obj, ok := err.(*codec.ErrorObject) + if !ok { + return false + } + if obj.Message == "query returned more than 10000 results" { + return true + } + return false +} + +func (t *Tracker) syncBatch(ctx context.Context, from, to uint64) error { + if to < from { + panic(fmt.Sprintf("BUG sync batch: (%d, %d)", from, to)) + } + + query := t.config.Filter.getFilterSearch() + + batchSize := t.config.BatchSize + additiveFactor := uint64(float64(batchSize) * 0.10) + + i := from + +START: + dst := min(to, i+batchSize) + + query.SetFromUint64(i) + query.SetToUint64(dst) + + logs, err := t.provider.GetLogs(query) + if err != nil { + if tooMuchDataRequestedError(err) { + // multiplicative decrease + batchSize = batchSize / 2 + goto START + } + return err + } + + // update the last block entry + block, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(dst), false) + if err != nil { + return err + } + + // add logs to the store + evnt := &Event{Added: logs, Indx: -1} + evnt.Block = block + + if err := t.entry.StoreEvent(evnt); err != nil { + return err + } + + // check if the execution is over after each query batch + if err := ctx.Err(); err != nil { + return err + } + + i += batchSize + 1 + + // update the batchSize with additive increase + if batchSize < t.config.BatchSize { + batchSize = min(t.config.BatchSize, batchSize+additiveFactor) + } + + if i <= to { + goto START + } + return nil +} + +func (t *Tracker) fastTrack(filterConfig *FilterConfig) (*ethgo.Block, error) { + // Try to use first the user provided block if any + if t.config.StartBlock != 0 { + bb, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(t.config.StartBlock), false) + if err != nil { + return nil, err + } + return bb, nil + } + + // Only possible if we filter addresses + if len(filterConfig.Address) == 0 { + return nil, nil + } + + if t.config.EtherscanAPIKey != "" { + chainID, err := t.provider.ChainID() + if err != nil { + return nil, err + } + + // get the etherscan instance for this chainID + e, err := etherscan.NewEtherscanFromNetwork(ethgo.Network(chainID.Uint64()), t.config.EtherscanAPIKey) + if err != nil { + // there is no etherscan api for this specific chainid + return nil, nil + } + + getAddress := func(addr ethgo.Address) (uint64, error) { + params := map[string]string{ + "address": addr.String(), + "fromBlock": "0", + "toBlock": "latest", + } + var out []map[string]interface{} + if err := e.Query("logs", "getLogs", &out, params); err != nil { + return 0, err + } + if len(out) == 0 { + return 0, nil + } + + cc, ok := out[0]["blockNumber"].(string) + if !ok { + return 0, fmt.Errorf("failed to cast blocknumber") + } + + num, err := parseUint64orHex(cc) + if err != nil { + return 0, err + } + return num, nil + } + + minBlock := ^uint64(0) // max uint64 + for _, addr := range filterConfig.Address { + num, err := getAddress(addr) + if err != nil { + return nil, err + } + if num < minBlock { + minBlock = num + } + } + + bb, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(minBlock-1), false) + if err != nil { + return nil, err + } + return bb, nil + } + + return nil, nil +} + +func (t *Tracker) BatchSync(ctx context.Context) error { + if t.config.BlockTracker == nil { + // run a specfic block tracker + t.config.BlockTracker = blocktracker.NewBlockTracker(t.provider, blocktracker.WithBlockMaxBacklog(t.config.MaxBacklog)) + go t.config.BlockTracker.Start() + + go func() { + // track our stop + <-ctx.Done() + t.config.BlockTracker.Close() + }() + } + + if err := t.syncImpl(ctx); err != nil { + return err + } + + select { + case t.DoneCh <- struct{}{}: + default: + } + + atomic.StoreInt32(&t.synced, 1) + return nil +} + +// Sync syncs a specific filter +func (t *Tracker) Sync(ctx context.Context) error { + if err := t.BatchSync(ctx); err != nil { + return err + } + + /* + // subscribe and sync + ch := t.blockSub.GetEventCh() + + go func() { + for { + select { + case evnt := <-ch: + t.handleBlockEvnt(evnt) + case <-ctx.Done(): + return + } + } + }() + */ + + return nil +} + +func (t *Tracker) syncImpl(ctx context.Context) error { + // get the current target + headBlock := t.config.BlockTracker.Header() + if headBlock == nil { + return nil + } + headNum := headBlock.Number + + last, err := t.entry.GetLastBlock() + if err != nil { + return err + } + if last == nil { + // Fast track to an initial block (if possible) + last, err = t.fastTrack(t.config.Filter) + if err != nil { + return fmt.Errorf("failed to fast track initial block: %v", err) + } + } else { + if last.Hash == headBlock.Hash { + return nil + } + } + + // First it needs to figure out if there was a reorg just at the + // stopping point of the last execution (if any). Check that our + // last processed block ('beacon') hash matches the canonical one + // in the chain. Otherwise, figure out the common ancestor up to + // 'beacon' - maxBackLog, set that as our real origin and remove + // any logs from the store. + + var origin uint64 + if last != nil { + if last.Number > headNum { + return fmt.Errorf("store '%d' is more advanced than the head chain block '%d'", last.Number, headNum) + } + + pivot, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(last.Number), false) + if err != nil { + return err + } + + if last.Number == headNum { + origin = last.Number + } else { + origin = last.Number + 1 + } + + if pivot.Hash != last.Hash { + ancestor, err := t.findAncestor(last, pivot) + if err != nil { + return err + } + + origin = ancestor + 1 + _, indx, err := t.removeLogs(ancestor+1, nil) + if err != nil { + return err + } + if err := t.entry.StoreEvent(&Event{Indx: int64(indx)}); err != nil { + return err + } + } + } + + if headNum-origin+1 > t.config.MaxBacklog { + // The tracker is far (more than maxBackLog) from the canonical head. + // Do a bulk sync with the eth_getLogs endpoint and get closer to the target. + + for { + if origin > headNum { + return fmt.Errorf("from (%d) higher than to (%d)", origin, headNum) + } + if headNum-origin+1 <= t.config.MaxBacklog { + // Already in reorg range + break + } + + target := headNum - t.config.MaxBacklog + if err := t.syncBatch(ctx, origin, target); err != nil { + return err + } + + origin = target + 1 + + // Reset the canonical head since it could have moved during the batch logs + headNum = t.config.BlockTracker.Header().Number + } + } + + // At this point we are either: + // 1. At 'canonical head' - maxBackLog if batch sync was done. + // 2. Inside maxBackLog range if our last processed block was close to the head. + // In both cases, the variable 'origin' indicates the last block processed. + // Now we fill the rest of the blocks till the block head using as a reference + // the block tracker subscription. After that, we can use the same subscription + // reference to start the watch. + // It is important to fill these blocks using block hashes and the block chain + // parent hash references since we are in reorgs range. + + sub := t.config.BlockTracker.Subscribe() + + evnt, err := sub.Next(context.Background()) + if err != nil { + panic(err) + } + + // we include the first header from the subscription too. + // TODO: HOW DOES THE SUBSCRIPTION WORKS NOW? TEST IT. + header := evnt.Header() + added := []*ethgo.Block{header} + + for header.Number != origin { + header, err = t.provider.GetBlockByHash(header.ParentHash, false) + if err != nil { + return err + } + added = append(added, header) + } + + if len(added) == 0 { + return nil + } + + // we need to reverse the blocks since they were included in descending order + // and we need to process them in ascending order. + added = reverseBlocks(added) + + if _, err := t.handleBlockEvent(&blocktracker.BlockEvent{Added: added}); err != nil { + return err + } + + return nil +} + +func (t *Tracker) removeLogs(number uint64, hash *ethgo.Hash) ([]*ethgo.Log, uint64, error) { + index, err := t.entry.LastIndex() + if err != nil { + return nil, 0, err + } + if index == 0 { + return nil, 0, nil + } + + var remove []*ethgo.Log + for { + elemIndex := index - 1 + + var log ethgo.Log + if err := t.entry.GetLog(elemIndex, &log); err != nil { + return nil, 0, err + } + if log.BlockNumber == number { + if hash != nil && log.BlockHash != *hash { + break + } + } + if log.BlockNumber < number { + break + } + remove = append(remove, &log) + if elemIndex == 0 { + index = 0 + break + } + index = elemIndex + } + + return remove, index, nil +} + +func reverseBlocks(in []*ethgo.Block) (out []*ethgo.Block) { + for i := len(in) - 1; i >= 0; i-- { + out = append(out, in[i]) + } + return +} + +func reverseLogs(in []*ethgo.Log) (out []*ethgo.Log) { + for i := len(in) - 1; i >= 0; i-- { + out = append(out, in[i]) + } + return +} + +func (t *Tracker) handleBlockEvent(blockEvnt *blocktracker.BlockEvent) (*Event, error) { + evnt := &Event{ + Indx: -1, + Added: []*ethgo.Log{}, + Removed: []*ethgo.Log{}, + } + if len(blockEvnt.Removed) != 0 { + pivot := blockEvnt.Removed[0] + logs, index, err := t.removeLogs(pivot.Number, &pivot.Hash) + if err != nil { + return nil, err + } + evnt.Indx = int64(index) + evnt.Removed = append(evnt.Removed, reverseLogs(logs)...) + } + + for _, block := range blockEvnt.Added { + // check logs for this blocks + query := t.config.Filter.getFilterSearch() + query.BlockHash = &block.Hash + + // We check the hash, we need to do a retry to let unsynced nodes get the block + var logs []*ethgo.Log + var err error + + for i := 0; i < 5; i++ { + logs, err = t.provider.GetLogs(query) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + if err != nil { + return nil, err + } + evnt.Added = append(evnt.Added, logs...) + } + + evnt.Block = blockEvnt.Added[len(blockEvnt.Added)-1] + + // store the event in the store + if err := t.entry.StoreEvent(evnt); err != nil { + return nil, err + } + return evnt, nil +} + +// EventType is the type of the event (TODO: REMOVE) +type EventType int + +const ( + // EventAdd happens when a new event is included in the chain + EventAdd EventType = iota + // EventDel may happen when there is a reorg and a past event is deleted + EventDel +) + +// Event is an event emitted when a new log is included +type Event struct { + Type EventType + Added []*ethgo.Log + Removed []*ethgo.Log + Indx int64 + Block *ethgo.Block +} + +// BlockEvent is an event emitted when a new block is included +type BlockEvent struct { + Type EventType + Added []*ethgo.Block + Removed []*ethgo.Block +} + +func min(i, j uint64) uint64 { + if i < j { + return i + } + return j +} + +func parseUint64orHex(str string) (uint64, error) { + base := 10 + if strings.HasPrefix(str, "0x") { + str = str[2:] + base = 16 + } + return strconv.ParseUint(str, base, 64) +} diff --git a/tracker/tracker_test.go b/event-indexer/indexer_test.go similarity index 55% rename from tracker/tracker_test.go rename to event-indexer/indexer_test.go index 48b389d9..9c64621e 100644 --- a/tracker/tracker_test.go +++ b/event-indexer/indexer_test.go @@ -1,11 +1,9 @@ -package tracker +package indexer import ( "context" "fmt" - "math/big" "math/rand" - "reflect" "strconv" "testing" "time" @@ -13,11 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/abi" - "github.com/umbracle/ethgo/blocktracker" + blocktracker "github.com/umbracle/ethgo/block-tracker" "github.com/umbracle/ethgo/jsonrpc" "github.com/umbracle/ethgo/jsonrpc/codec" "github.com/umbracle/ethgo/testutil" - "github.com/umbracle/ethgo/tracker/store/inmem" ) func testConfig() ConfigOption { @@ -27,7 +24,6 @@ func testConfig() ConfigOption { } func testFilter(t *testing.T, provider Provider, filterConfig *FilterConfig) []*ethgo.Log { - filterConfig.Async = true tt, _ := NewTracker(provider, WithFilter(filterConfig)) ctx, cancelFn := context.WithCancel(context.Background()) @@ -37,10 +33,12 @@ func testFilter(t *testing.T, provider Provider, filterConfig *FilterConfig) []* t.Fatal(err) } - return tt.entry.(*inmem.Entry).Logs() + return tt.entry.(*inmemEntry).Logs() } func TestPolling(t *testing.T) { + t.Skip() + s := testutil.NewTestServer(t, nil) defer s.Close() @@ -63,37 +61,24 @@ func TestPolling(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() - go func() { - if err := tt.Sync(ctx); err != nil { - panic(err) - } - }() - - // wait for the bulk sync to finish - for { - select { - case <-tt.EventCh: - case <-tt.DoneCh: - goto EXIT - case <-time.After(1 * time.Second): - t.Fatal("timeout to sync") - } - } -EXIT: + err = tt.syncImpl(ctx) + assert.NoError(t, err) - // send another 5 transactions, we have to have another log each time - for i := 0; i < 5; i++ { - receipt := s.TxnTo(addr0, "setA1") + /* + // send another 5 transactions, we have to have another log each time + for i := 0; i < 5; i++ { + receipt := s.TxnTo(addr0, "setA1") - select { - case evnt := <-tt.EventCh: - if !reflect.DeepEqual(evnt.Added, receipt.Logs) { - t.Fatal("bad") + select { + case evnt := <-tt.EventCh: + if !reflect.DeepEqual(evnt.Added, receipt.Logs) { + t.Fatal("bad") + } + case <-time.After(2 * time.Second): // wait at least the polling interval + t.Fatal("event expected") } - case <-time.After(2 * time.Second): // wait at least the polling interval - t.Fatal("event expected") } - } + */ } func TestFilterIntegration(t *testing.T) { @@ -133,7 +118,7 @@ func TestFilterIntegration(t *testing.T) { typ, _ := abi.NewType("uint256") topic, _ := abi.EncodeTopic(typ, 1) - logs = testFilter(t, client.Eth(), &FilterConfig{Topics: [][]*ethgo.Hash{nil, {&topic}}}) + logs = testFilter(t, client.Eth(), &FilterConfig{Topics: [][]*ethgo.Hash{{nil, &topic}}}) if len(logs) != 20 { t.Fatal("bad") } @@ -179,47 +164,10 @@ func TestFilterIntegrationEventHash(t *testing.T) { } } -func TestPreflight(t *testing.T) { - store := inmem.NewInmemStore() - - l := testutil.MockList{} - l.Create(0, 100, func(b *testutil.MockBlock) {}) - - m := &testutil.MockClient{} - m.AddScenario(l) - - tt0, _ := NewTracker(m, testConfig(), WithStore(store)) - if err := tt0.preSyncCheckImpl(); err != nil { - t.Fatal(err) - } - - // change the genesis hash - - l0 := testutil.MockList{} - l0.Create(0, 100, func(b *testutil.MockBlock) { - b = b.Extra("1") - }) - - m.AddScenario(l0) - - tt1, _ := NewTracker(m, testConfig(), WithStore(store)) - if err := tt1.preSyncCheckImpl(); err == nil { - t.Fatal("it should fail") - } - - // change the chainID - - m.AddScenario(l) - m.SetChainID(big.NewInt(1)) - - tt2, _ := NewTracker(m, testConfig(), WithStore(store)) - if err := tt2.preSyncCheckImpl(); err == nil { - t.Fatal("it should fail") - } -} +func TestTracker_Sync_Restart(t *testing.T) { + // 10 blocks of backlog -func TestTrackerSyncerRestarts(t *testing.T) { - store := inmem.NewInmemStore() + store := NewInmemStore() m := &testutil.MockClient{} l := testutil.MockList{} @@ -227,7 +175,7 @@ func TestTrackerSyncerRestarts(t *testing.T) { if len(void) == 0 { l.Create(first, last, func(b *testutil.MockBlock) { if b.GetNum()%5 == 0 { - b = b.Log("0x1") + b.Log("0x1") } }) m.AddScenario(l) @@ -236,27 +184,18 @@ func TestTrackerSyncerRestarts(t *testing.T) { tt, err := NewTracker(m, testConfig(), WithStore(store), - WithFilter(&FilterConfig{Async: true}), + WithFilter(&FilterConfig{}), + WithMaxBacklog(10), ) assert.NoError(t, err) - go func() { - if err := tt.Sync(context.Background()); err != nil { - panic(err) - } - }() + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() - if err := tt.WaitDuration(2 * time.Second); err != nil { - t.Fatal(err) - } + err = tt.syncImpl(ctx) + assert.NoError(t, err) - if tt.blockTracker.BlocksBlocked()[0].Number != uint64(last-10) { - t.Fatal("bad") - } - if tt.blockTracker.BlocksBlocked()[9].Number != uint64(last-1) { - t.Fatal("bad") - } - if !testutil.CompareLogs(l.GetLogs(), tt.entry.(*inmem.Entry).Logs()) { + if !testutil.CompareLogs(l.GetLogs(), tt.entry.(*inmemEntry).Logs()) { t.Fatal("bad") } } @@ -278,40 +217,36 @@ func testSyncerReconcile(t *testing.T, iniLen, forkNum, endLen int) { // test that the syncer can reconcile if there is a fork in the saved state l := testutil.MockList{} l.Create(0, iniLen, func(b *testutil.MockBlock) { - b = b.Log("0x01") + b.Log("0x01") }) m := &testutil.MockClient{} m.AddScenario(l) - store := inmem.NewInmemStore() + store := NewInmemStore() tt0, err := NewTracker(m, testConfig(), WithStore(store), - WithFilter(&FilterConfig{Async: true}), + WithFilter(&FilterConfig{}), ) assert.NoError(t, err) - go func() { - if err := tt0.Sync(context.Background()); err != nil { - panic(err) - } - }() - tt0.WaitDuration(2 * time.Second) + err = tt0.syncImpl(context.TODO()) + assert.NoError(t, err) // create a fork at 'forkNum' and continue to 'endLen' l1 := testutil.MockList{} l1.Create(0, endLen, func(b *testutil.MockBlock) { if b.GetNum() < forkNum { - b = b.Log("0x01") // old fork + b.Log("0x01") // old fork } else { if b.GetNum() == forkNum { b = b.Log("0x02") } else { b = b.Log("0x03") } - b = b.Extra("123") // used to set the new fork + b.Extra("123") // used to set the new fork } }) @@ -322,16 +257,13 @@ func testSyncerReconcile(t *testing.T, iniLen, forkNum, endLen int) { tt1, _ := NewTracker(m1, testConfig(), WithStore(store), - WithFilter(&FilterConfig{Async: true}), + WithFilter(&FilterConfig{}), ) - go func() { - if err := tt1.Sync(context.Background()); err != nil { - panic(err) - } - }() - tt1.WaitDuration(2 * time.Second) - logs := tt1.entry.(*inmem.Entry).Logs() + err = tt1.syncImpl(context.Background()) + assert.NoError(t, err) + + logs := tt1.entry.(*inmemEntry).Logs() if !testutil.CompareLogs(l1.GetLogs(), logs) { t.Fatal("bad") @@ -357,7 +289,7 @@ func testSyncerReconcile(t *testing.T, iniLen, forkNum, endLen int) { } } -func TestTrackerSyncerReconcile(t *testing.T) { +func TestTracker_Sync_Reconcile(t *testing.T) { t.Run("Backlog", func(t *testing.T) { testSyncerReconcile(t, 50, 45, 55) }) @@ -375,11 +307,9 @@ func testTrackerSyncerRandom(t *testing.T, n int, backlog uint64) { c := 0 // current block f := 0 // current fork - store := inmem.NewInmemStore() + store := NewInmemStore() for i := 0; i < n; i++ { - // fmt.Println("########################################") - // create the new batch of blocks var forkSize int if randomInt(0, 10) < 3 && c > 10 { @@ -425,45 +355,11 @@ func testTrackerSyncerRandom(t *testing.T, n int, backlog uint64) { WithBlockTracker(tracker), ) - go func() { - if err := tt.Sync(context.Background()); err != nil { - panic(err) - } - }() - - var added, removed []*ethgo.Log - for { - select { - case evnt := <-tt.EventCh: - added = append(added, evnt.Added...) - removed = append(removed, evnt.Removed...) - - case <-tt.DoneCh: - // no more events to read - goto EXIT - } - } - EXIT: - - // validate the included logs - if len(added) != count { - t.Fatal("bad added logs") - } - // validate the removed logs - if len(removed) != forkSize { - t.Fatal("bad removed logs") - } + err := tt.syncImpl(context.Background()) + assert.NoError(t, err) - // validate blocks - if blocks := m.GetLastBlocks(backlog); !testutil.CompareBlocks(tt.blockTracker.BlocksBlocked(), blocks) { - // tracker does not consider block 0 but getLastBlocks does return it, this is only a problem - // with syncs on chains lower than maxBacklog - if !testutil.CompareBlocks(blocks[1:], tt.blockTracker.BlocksBlocked()) { - t.Fatal("bad blocks") - } - } // validate logs - if logs := m.GetAllLogs(); !testutil.CompareLogs(tt.entry.(*inmem.Entry).Logs(), logs) { + if logs := m.GetAllLogs(); !testutil.CompareLogs(tt.entry.(*inmemEntry).Logs(), logs) { t.Fatal("bad logs") } @@ -471,7 +367,7 @@ func testTrackerSyncerRandom(t *testing.T, n int, backlog uint64) { } } -func TestTrackerSyncerRandom(t *testing.T) { +func TestTracker_Sync_Random(t *testing.T) { rand.Seed(time.Now().UTC().UnixNano()) for i := 0; i < 100; i++ { @@ -481,52 +377,31 @@ func TestTrackerSyncerRandom(t *testing.T) { } } -func TestTrackerReconcile(t *testing.T) { +func TestTracker_Reconcile(t *testing.T) { type TestEvent struct { Added testutil.MockList Removed testutil.MockList } - type Reconcile struct { - block *testutil.MockBlock - event *TestEvent - } - cases := []struct { Name string Scenario testutil.MockList History testutil.MockList - Reconcile []Reconcile + Reconcile *TestEvent Expected testutil.MockList }{ { Name: "Empty history", - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x1).Log("0x1"), - event: &TestEvent{ - Added: testutil.MockList{ - testutil.Mock(0x1).Log("0x1"), - }, - }, - }, - }, - Expected: []*testutil.MockBlock{ - testutil.Mock(1).Log("0x1"), - }, - }, - { - Name: "Repeated header", - History: []*testutil.MockBlock{ - testutil.Mock(0x1), + Scenario: testutil.MockList{ + testutil.Mock(0x1).Log("0x1"), }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x1), + Reconcile: &TestEvent{ + Added: testutil.MockList{ + testutil.Mock(0x1).Log("0x1"), }, }, Expected: []*testutil.MockBlock{ - testutil.Mock(0x1), + testutil.Mock(1).Log("0x1"), }, }, { @@ -534,59 +409,37 @@ func TestTrackerReconcile(t *testing.T) { History: testutil.MockList{ testutil.Mock(0x1), }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x2), - event: &TestEvent{ - Added: testutil.MockList{ - testutil.Mock(0x2), - }, - }, - }, - }, - Expected: testutil.MockList{ - testutil.Mock(0x1), + Scenario: testutil.MockList{ testutil.Mock(0x2), }, - }, - { - Name: "Ignore block already on history", - History: testutil.MockList{ - testutil.Mock(0x1), - testutil.Mock(0x2), - testutil.Mock(0x3), - }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x2), + Reconcile: &TestEvent{ + Added: testutil.MockList{ + testutil.Mock(0x2), }, }, Expected: testutil.MockList{ testutil.Mock(0x1), testutil.Mock(0x2), - testutil.Mock(0x3), }, }, { Name: "Multi Roll back", + Scenario: testutil.MockList{ + testutil.Mock(0x30).Parent(0x2).Log("0x30"), + }, History: testutil.MockList{ testutil.Mock(0x1), testutil.Mock(0x2), testutil.Mock(0x3).Log("0x3"), testutil.Mock(0x4).Log("0x4"), }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x30).Parent(0x2).Log("0x30"), - event: &TestEvent{ - Added: testutil.MockList{ - testutil.Mock(0x30).Parent(0x2).Log("0x30"), - }, - Removed: testutil.MockList{ - testutil.Mock(0x3).Log("0x3"), - testutil.Mock(0x4).Log("0x4"), - }, - }, + Reconcile: &TestEvent{ + Added: testutil.MockList{ + testutil.Mock(0x30).Parent(0x2).Log("0x30"), + }, + Removed: testutil.MockList{ + testutil.Mock(0x3).Log("0x3"), + testutil.Mock(0x4).Log("0x4"), }, }, Expected: testutil.MockList{ @@ -600,21 +453,17 @@ func TestTrackerReconcile(t *testing.T) { Scenario: testutil.MockList{ testutil.Mock(0x3), testutil.Mock(0x4).Log("0x2"), + testutil.Mock(0x5).Log("0x3"), }, History: testutil.MockList{ testutil.Mock(0x1).Log("0x1"), testutil.Mock(0x2), }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x5).Log("0x3"), - event: &TestEvent{ - Added: testutil.MockList{ - testutil.Mock(0x3), - testutil.Mock(0x4).Log("0x2"), - testutil.Mock(0x5).Log("0x3"), - }, - }, + Reconcile: &TestEvent{ + Added: testutil.MockList{ + testutil.Mock(0x3), + testutil.Mock(0x4).Log("0x2"), + testutil.Mock(0x5).Log("0x3"), }, }, Expected: testutil.MockList{ @@ -625,11 +474,13 @@ func TestTrackerReconcile(t *testing.T) { testutil.Mock(0x5).Log("0x3"), }, }, + { Name: "Rolls back and backfills", Scenario: testutil.MockList{ testutil.Mock(0x30).Parent(0x2).Num(3).Log("0x5"), testutil.Mock(0x40).Parent(0x30).Num(4), + testutil.Mock(0x50).Parent(0x40).Num(5), }, History: testutil.MockList{ testutil.Mock(0x1), @@ -637,20 +488,15 @@ func TestTrackerReconcile(t *testing.T) { testutil.Mock(0x3).Log("0x2"), testutil.Mock(0x4).Log("0x1"), }, - Reconcile: []Reconcile{ - { - block: testutil.Mock(0x50).Parent(0x40).Num(5), - event: &TestEvent{ - Added: testutil.MockList{ - testutil.Mock(0x30).Parent(0x2).Num(3).Log("0x5"), - testutil.Mock(0x40).Parent(0x30).Num(4), - testutil.Mock(0x50).Parent(0x40).Num(5), - }, - Removed: testutil.MockList{ - testutil.Mock(0x3).Log("0x2"), - testutil.Mock(0x4).Log("0x1"), - }, - }, + Reconcile: &TestEvent{ + Added: testutil.MockList{ + testutil.Mock(0x30).Parent(0x2).Num(3).Log("0x5"), + testutil.Mock(0x40).Parent(0x30).Num(4), + testutil.Mock(0x50).Parent(0x40).Num(5), + }, + Removed: testutil.MockList{ + testutil.Mock(0x3).Log("0x2"), + testutil.Mock(0x4).Log("0x1"), }, }, Expected: testutil.MockList{ @@ -665,93 +511,47 @@ func TestTrackerReconcile(t *testing.T) { for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - // safe check for now, we ma need to restart the tracker and mock client for every reconcile scenario? - if len(c.Reconcile) != 1 { - t.Fatal("only one reconcile supported so far") - } - m := &testutil.MockClient{} - // add the full scenario with the logs + // add the full scenario with the logs so that it is reachable from the tracker m.AddScenario(c.Scenario) - // add the logs of the reconcile block because those are also unknown for the tracker - m.AddLogs(c.Reconcile[0].block.GetLogs()) - - store := inmem.NewInmemStore() + store := NewInmemStore().(*inmemEntry) btracker := blocktracker.NewBlockTracker(m) tt, err := NewTracker(m, WithStore(store), WithBlockTracker(btracker)) - if err != nil { - t.Fatal(err) - } - - // important to set a buffer here, otherwise everything is blocked - tt.EventCh = make(chan *Event, 1) + assert.NoError(t, err) - // set the filter as synced since we only want to - // try reconciliation - tt.synced = 1 - - // build past block history - for _, b := range c.History.ToBlocks() { - tt.blockTracker.AddBlockLocked(b) - } - // add the history to the store for _, b := range c.History { - tt.entry.StoreLogs(b.GetLogs()) + // add all the logs to the store + store.storeLogs(b.GetLogs()) } - for _, b := range c.Reconcile { - aux, err := tt.blockTracker.HandleBlockEvent(b.block.Block()) - if err != nil { - t.Fatal(err) - } - if aux == nil { - continue - } - if err := tt.handleBlockEvnt(aux); err != nil { - t.Fatal(err) + // compute the reconcile and check the result + { + bEvent := &blocktracker.BlockEvent{ + Added: c.Reconcile.Added.ToBlocks(), + Removed: c.Reconcile.Removed.ToBlocks(), } - var evnt *Event - select { - case evnt = <-tt.EventCh: - case <-time.After(1 * time.Second): - t.Fatal("log event timeout") - } + event, err := tt.handleBlockEvent(bEvent) + assert.NoError(t, err) - // check logs - if !testutil.CompareLogs(b.event.Added.GetLogs(), evnt.Added) { - t.Fatal("err") - } - if !testutil.CompareLogs(b.event.Removed.GetLogs(), evnt.Removed) { - t.Fatal("err") + if event == nil { + return } - var blockEvnt *blocktracker.BlockEvent - select { - case blockEvnt = <-tt.BlockCh: - case <-time.After(1 * time.Second): - t.Fatal("block event timeout") + if !testutil.CompareLogs(c.Reconcile.Added.GetLogs(), event.Added) { + t.Fatal("incorrect added logs") } + if !testutil.CompareLogs(c.Reconcile.Removed.GetLogs(), event.Removed) { - // check blocks - if !testutil.CompareBlocks(b.event.Added.ToBlocks(), blockEvnt.Added) { - t.Fatal("err") - } - if !testutil.CompareBlocks(b.event.Removed.ToBlocks(), blockEvnt.Removed) { - t.Fatal("err") - } - } + fmt.Println(c.Reconcile.Removed.GetLogs()) + fmt.Println(event.Removed) - // check the post state (logs and blocks) after all the reconcile events - if !testutil.CompareLogs(tt.entry.(*inmem.Entry).Logs(), c.Expected.GetLogs()) { - t.Fatal("bad3") - } - if !testutil.CompareBlocks(tt.blockTracker.BlocksBlocked(), c.Expected.ToBlocks()) { - t.Fatal("bad") + t.Fatal("incorrect removed logs") + } } }) } @@ -759,7 +559,7 @@ func TestTrackerReconcile(t *testing.T) { type mockClientWithLimit struct { limit uint64 - testutil.MockClient + *testutil.MockClient } func (m *mockClientWithLimit) GetLogs(filter *ethgo.LogFilter) ([]*ethgo.Log, error) { @@ -800,19 +600,19 @@ func TestTooMuchDataRequested(t *testing.T) { mm := &mockClientWithLimit{ limit: 3, - MockClient: *m, + MockClient: m, } config := DefaultConfig() config.BatchSize = 11 tt, _ := NewTracker(mm, - WithFilter(&FilterConfig{Async: true}), + WithFilter(&FilterConfig{}), ) if err := tt.Sync(context.Background()); err != nil { t.Fatal(err) } - if count != len(tt.entry.(*inmem.Entry).Logs()) { + if count != len(tt.entry.(*inmemEntry).Logs()) { t.Fatal("not the same count") } } diff --git a/event-indexer/inmem_store.go b/event-indexer/inmem_store.go new file mode 100644 index 00000000..bd9099fe --- /dev/null +++ b/event-indexer/inmem_store.go @@ -0,0 +1,88 @@ +package indexer + +import ( + "math/big" + "sync" + + "github.com/umbracle/ethgo" +) + +var _ Entry = (*inmemEntry)(nil) + +// NewInmemStore returns a new in-memory store. +func NewInmemStore() Entry { + e := &inmemEntry{ + logs: []*ethgo.Log{}, + } + return e +} + +// Entry is a store.Entry implementation +type inmemEntry struct { + l sync.RWMutex + logs []*ethgo.Log + last *ethgo.Block +} + +func (e *inmemEntry) Close() error { + return nil +} + +// LastIndex implements the store interface +func (e *inmemEntry) LastIndex() (uint64, error) { + e.l.Lock() + defer e.l.Unlock() + return uint64(len(e.logs)), nil +} + +// Logs returns the logs of the inmemory store +func (e *inmemEntry) Logs() []*ethgo.Log { + return e.logs +} + +func (e *inmemEntry) GetLastBlock() (*ethgo.Block, error) { + e.l.Lock() + defer e.l.Unlock() + + last := e.last + if last == nil { + return nil, nil + } + last = last.Copy() + return last, nil +} + +func (e *inmemEntry) StoreEvent(evnt *Event) error { + e.l.Lock() + defer e.l.Unlock() + + if evnt.Indx >= 0 { + // remove logs + e.logs = e.logs[:evnt.Indx] + } + // append new logs + e.storeLogs(evnt.Added) + + if evnt.Block != nil { + b := evnt.Block + + if b.Difficulty == nil { + b.Difficulty = big.NewInt(0) + } + e.last = b.Copy() + } + return nil +} + +func (e *inmemEntry) storeLogs(logs []*ethgo.Log) { + e.logs = append(e.logs, logs...) +} + +// GetLog implements the store interface +func (e *inmemEntry) GetLog(indx uint64, log *ethgo.Log) error { + e.l.Lock() + defer e.l.Unlock() + + *log = *e.logs[indx] + return nil +} diff --git a/event-indexer/inmem_store_test.go b/event-indexer/inmem_store_test.go new file mode 100644 index 00000000..4184b7dc --- /dev/null +++ b/event-indexer/inmem_store_test.go @@ -0,0 +1,11 @@ +package indexer + +import ( + "testing" +) + +func TestInMemoryStore(t *testing.T) { + TestStore(t, func(t *testing.T) (Entry, func()) { + return NewInmemStore(), func() {} + }) +} diff --git a/event-indexer/store.go b/event-indexer/store.go new file mode 100644 index 00000000..f1eca6fe --- /dev/null +++ b/event-indexer/store.go @@ -0,0 +1,21 @@ +package indexer + +import "github.com/umbracle/ethgo" + +// Entry is a filter entry in the store +type Entry interface { + // LastIndex returns index of the last stored event + LastIndex() (uint64, error) + + // Closes closes the connection with the entry + Close() error + + // StoreEvent stores an event that include added and removed logs + StoreEvent(evnt *Event) error + + // GetLog returns the valid log at indx + GetLog(indx uint64, log *ethgo.Log) error + + // GetLastBlock returns the last block processed + GetLastBlock() (*ethgo.Block, error) +} diff --git a/event-indexer/store_testing.go b/event-indexer/store_testing.go new file mode 100644 index 00000000..4549d5ac --- /dev/null +++ b/event-indexer/store_testing.go @@ -0,0 +1,93 @@ +package indexer + +import ( + "reflect" + "testing" + + "github.com/umbracle/ethgo" +) + +// SetupDB is a function that creates a backend +type SetupDB func(t *testing.T) (Entry, func()) + +// TestStore tests a tracker store +func TestStore(t *testing.T, setup SetupDB) { + testRemoveLogs(t, setup) + testStoreLogs(t, setup) +} + +func testStoreLogs(t *testing.T, setup SetupDB) { + entry, close := setup(t) + defer close() + + indx, err := entry.LastIndex() + if err != nil { + t.Fatal(err) + } + if indx != 0 { + t.Fatal("index should be zero") + } + + log := ethgo.Log{ + BlockNumber: 10, + } + if err := entry.StoreEvent(&Event{Added: []*ethgo.Log{&log}, Indx: -1}); err != nil { + t.Fatal(err) + } + + indx, err = entry.LastIndex() + if err != nil { + t.Fatal(err) + } + if indx != 1 { + t.Fatal("index should be one") + } + + var log2 ethgo.Log + if err := entry.GetLog(0, &log2); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(log, log2) { + t.Fatal("bad") + } +} + +func testRemoveLogs(t *testing.T, setup SetupDB) { + entry, close := setup(t) + defer close() + + logs := []*ethgo.Log{} + for i := uint64(0); i < 10; i++ { + logs = append(logs, ðgo.Log{ + BlockNumber: i, + }) + } + + if err := entry.StoreEvent(&Event{Added: logs, Indx: -1}); err != nil { + t.Fatal(err) + } + + if err := entry.StoreEvent(&Event{Indx: 5}); err != nil { + t.Fatal(err) + } + + indx, err := entry.LastIndex() + if err != nil { + t.Fatal(err) + } + if indx != 5 { + t.Fatalf("index should be 5 but found %d", indx) + } + + // add again the values + if err := entry.StoreEvent(&Event{Added: logs[5:], Indx: -1}); err != nil { + t.Fatal(err) + } + indx, err = entry.LastIndex() + if err != nil { + t.Fatal(err) + } + if indx != 10 { + t.Fatalf("index should be 10 but found %d", indx) + } +} diff --git a/go.mod b/go.mod index 58e223c9..753b38a1 100644 --- a/go.mod +++ b/go.mod @@ -13,15 +13,13 @@ require ( github.com/containerd/continuity v0.0.0-20191214063359-1097c8bae83b // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/google/go-cmp v0.3.1 // indirect github.com/gorilla/websocket v1.4.1 github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect - github.com/jmoiron/sqlx v1.2.0 github.com/klauspost/compress v1.4.1 // indirect github.com/klauspost/cpuid v1.2.0 // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/lib/pq v1.2.0 + github.com/lib/pq v1.10.4 // indirect github.com/mitchellh/mapstructure v1.1.2 github.com/opencontainers/image-spec v1.0.1 // indirect github.com/opencontainers/runc v0.1.1 // indirect @@ -35,7 +33,6 @@ require ( golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 // indirect golang.org/x/text v0.3.2 - google.golang.org/appengine v1.6.5 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gotest.tools v2.2.0+incompatible // indirect ) diff --git a/go.sum b/go.sum index 0d2708fc..e8363935 100644 --- a/go.sum +++ b/go.sum @@ -37,11 +37,7 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= -github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -54,8 +50,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= -github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= @@ -71,11 +65,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= -github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= +github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -131,7 +122,6 @@ golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -148,8 +138,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= -google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/tracker/README.md b/tracker/README.md deleted file mode 100644 index 027638b8..00000000 --- a/tracker/README.md +++ /dev/null @@ -1,139 +0,0 @@ - -# Tracker - -``` -package main - -import ( - "context" - "encoding/binary" - "flag" - "fmt" - "os" - "os/signal" - "syscall" - - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/abi" - "github.com/umbracle/ethgo/jsonrpc" - "github.com/umbracle/ethgo/tracker" - - boltdbStore "github.com/umbracle/ethgo/tracker/store/boltdb" -) - -var depositEvent = abi.MustNewEvent(`DepositEvent( - bytes pubkey, - bytes whitdrawalcred, - bytes amount, - bytes signature, - bytes index -)`) - -func main() { - var endpoint string - var target string - - flag.StringVar(&endpoint, "endpoint", "", "") - flag.StringVar(&target, "target", "", "") - - flag.Parse() - - provider, err := jsonrpc.NewClient(endpoint) - if err != nil { - fmt.Printf("[ERR]: %v", err) - os.Exit(1) - } - - store, err := boltdbStore.New("deposit.db") - if err != nil { - fmt.Printf("[ERR]: failted to start store %v", err) - os.Exit(1) - } - - tt, err := tracker.NewTracker(provider.Eth(), - tracker.WithBatchSize(20000), - tracker.WithStore(store), - tracker.WithEtherscan(os.Getenv("ETHERSCAN_APIKEY")), - tracker.WithFilter(&tracker.FilterConfig{ - Async: true, - Address: []ethgo.Address{ - ethgo.HexToAddress(target), - }, - }), - ) - if err != nil { - fmt.Printf("[ERR]: failed to create the tracker %v", err) - os.Exit(1) - } - - lastBlock, err := tt.GetLastBlock() - if err != nil { - fmt.Printf("[ERR]: failed to get last block %v", err) - os.Exit(1) - } - if lastBlock != nil { - fmt.Printf("Last block processed: %d\n", lastBlock.Number) - } - - ctx, cancelFn := context.WithCancel(context.Background()) - go func() { - go func() { - if err := tt.Sync(ctx); err != nil { - fmt.Printf("[ERR]: %v", err) - } - }() - - go func() { - for { - select { - case evnt := <-tt.EventCh: - for _, log := range evnt.Added { - if depositEvent.Match(log) { - vals, err := depositEvent.ParseLog(log) - if err != nil { - panic(err) - } - - index := binary.LittleEndian.Uint64(vals["index"].([]byte)) - amount := binary.LittleEndian.Uint64(vals["amount"].([]byte)) - - fmt.Printf("Deposit: Block %d Index %d Amount %d\n", log.BlockNumber, index, amount) - } - } - case <-tt.DoneCh: - fmt.Println("historical sync done") - } - } - }() - - }() - - handleSignals(cancelFn) -} - -func handleSignals(cancelFn context.CancelFunc) int { - signalCh := make(chan os.Signal, 4) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) - - <-signalCh - - gracefulCh := make(chan struct{}) - go func() { - cancelFn() - close(gracefulCh) - }() - - select { - case <-signalCh: - return 1 - case <-gracefulCh: - return 0 - } -} -``` - -You can query the ETH2.0 Deposit contract like so: - -``` -go run main.go --endpoint https://mainnet.infura.io/v3/... --target 0x00000000219ab540356cbb839cbe05303d7705fa -``` diff --git a/tracker/store/inmem/inmem_store.go b/tracker/store/inmem/inmem_store.go deleted file mode 100644 index 5baef7a6..00000000 --- a/tracker/store/inmem/inmem_store.go +++ /dev/null @@ -1,117 +0,0 @@ -package inmem - -import ( - "strings" - "sync" - - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/tracker/store" -) - -var _ store.Store = (*InmemStore)(nil) - -// InmemStore implements the Store interface. -type InmemStore struct { - l sync.RWMutex - entries map[string]*Entry - kv map[string]string -} - -// NewInmemStore returns a new in-memory store. -func NewInmemStore() *InmemStore { - return &InmemStore{ - entries: map[string]*Entry{}, - kv: map[string]string{}, - } -} - -// Close implements the store interface -func (i *InmemStore) Close() error { - return nil -} - -// Get implements the store interface -func (i *InmemStore) Get(k string) (string, error) { - i.l.Lock() - defer i.l.Unlock() - return i.kv[string(k)], nil -} - -// ListPrefix implements the store interface -func (i *InmemStore) ListPrefix(prefix string) ([]string, error) { - i.l.Lock() - defer i.l.Unlock() - - res := []string{} - for k, v := range i.kv { - if strings.HasPrefix(k, prefix) { - res = append(res, v) - } - } - return res, nil -} - -// Set implements the store interface -func (i *InmemStore) Set(k, v string) error { - i.l.Lock() - defer i.l.Unlock() - i.kv[string(k)] = v - return nil -} - -// GetEntry implements the store interface -func (i *InmemStore) GetEntry(hash string) (store.Entry, error) { - i.l.Lock() - defer i.l.Unlock() - e, ok := i.entries[hash] - if ok { - return e, nil - } - e = &Entry{ - logs: []*ethgo.Log{}, - } - i.entries[hash] = e - return e, nil -} - -// Entry is a store.Entry implementation -type Entry struct { - l sync.RWMutex - logs []*ethgo.Log -} - -// LastIndex implements the store interface -func (e *Entry) LastIndex() (uint64, error) { - e.l.Lock() - defer e.l.Unlock() - return uint64(len(e.logs)), nil -} - -// Logs returns the logs of the inmemory store -func (e *Entry) Logs() []*ethgo.Log { - return e.logs -} - -// StoreLogs implements the store interface -func (e *Entry) StoreLogs(logs []*ethgo.Log) error { - e.l.Lock() - defer e.l.Unlock() - for _, log := range logs { - e.logs = append(e.logs, log) - } - return nil -} - -// RemoveLogs implements the store interface -func (e *Entry) RemoveLogs(indx uint64) error { - e.l.Lock() - defer e.l.Unlock() - e.logs = e.logs[:indx] - return nil -} - -// GetLog implements the store interface -func (e *Entry) GetLog(indx uint64, log *ethgo.Log) error { - *log = *e.logs[indx] - return nil -} diff --git a/tracker/store/inmem/inmem_store_test.go b/tracker/store/inmem/inmem_store_test.go deleted file mode 100644 index 7a559bb1..00000000 --- a/tracker/store/inmem/inmem_store_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package inmem - -import ( - "testing" - - "github.com/umbracle/ethgo/tracker/store" -) - -func TestInMemoryStore(t *testing.T) { - store.TestStore(t, func(t *testing.T) (store.Store, func()) { - return NewInmemStore(), func() {} - }) -} diff --git a/tracker/store/postgresql/postgresql_store.go b/tracker/store/postgresql/postgresql_store.go deleted file mode 100644 index 71f1842c..00000000 --- a/tracker/store/postgresql/postgresql_store.go +++ /dev/null @@ -1,239 +0,0 @@ -package trackerpostgresql - -import ( - "database/sql" - "encoding/hex" - "fmt" - "strings" - - "github.com/jmoiron/sqlx" - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/tracker/store" - - // Enable postgres for sqlx - _ "github.com/lib/pq" -) - -var _ store.Store = (*PostgreSQLStore)(nil) - -// PostgreSQLStore is a tracker store implementation that uses PostgreSQL as a backend. -type PostgreSQLStore struct { - db *sqlx.DB -} - -// NewPostgreSQLStore creates a new PostgreSQL store -func NewPostgreSQLStore(endpoint string) (*PostgreSQLStore, error) { - db, err := sql.Open("postgres", endpoint) - if err != nil { - return nil, err - } - return NewSQLStore(db, "postgres") -} - -// NewSQLStore creates a new store with an sql driver -func NewSQLStore(db *sql.DB, driver string) (*PostgreSQLStore, error) { - sqlxDB := sqlx.NewDb(db, driver) - - // create the kv database if it does not exists - if _, err := db.Exec(kvSQLSchema); err != nil { - return nil, err - } - return &PostgreSQLStore{db: sqlxDB}, nil -} - -// Close implements the store interface -func (p *PostgreSQLStore) Close() error { - return p.db.Close() -} - -// Get implements the store interface -func (p *PostgreSQLStore) Get(k string) (string, error) { - var out string - if err := p.db.Get(&out, "SELECT val FROM kv WHERE key=$1", string(k)); err != nil { - if err == sql.ErrNoRows { - return "", nil - } - return "", err - } - return out, nil -} - -// ListPrefix implements the store interface -func (p *PostgreSQLStore) ListPrefix(prefix string) ([]string, error) { - var out []string - if err := p.db.Select(&out, "SELECT val FROM kv WHERE key LIKE $1", string(prefix)+"%"); err != nil { - return nil, err - } - return out, nil -} - -// Set implements the store interface -func (p *PostgreSQLStore) Set(k, v string) error { - if _, err := p.db.Exec("INSERT INTO kv (key, val) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET val = $2", k, v); err != nil { - return err - } - return nil -} - -// GetEntry implements the store interface -func (p *PostgreSQLStore) GetEntry(hash string) (store.Entry, error) { - tableName := "logs_" + hash - if _, err := p.db.Exec(logSQLSchema(tableName)); err != nil { - return nil, err - } - e := &Entry{ - table: tableName, - db: p.db, - } - return e, nil -} - -// Entry is an store.Entry implementation -type Entry struct { - table string - db *sqlx.DB -} - -// LastIndex implements the store interface -func (e *Entry) LastIndex() (uint64, error) { - var index uint64 - if err := e.db.Get(&index, "SELECT indx FROM "+e.table+" ORDER BY indx DESC LIMIT 1"); err != nil { - if err == sql.ErrNoRows { - return 0, nil - } - return 0, err - } - return index + 1, nil -} - -// StoreLogs implements the store interface -func (e *Entry) StoreLogs(logs []*ethgo.Log) error { - lastIndex, err := e.LastIndex() - if err != nil { - return err - } - - tx, err := e.db.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - - query := "INSERT INTO " + e.table + " (indx, tx_index, tx_hash, block_num, block_hash, address, data, topics) VALUES (:indx, :tx_index, :tx_hash, :block_num, :block_hash, :address, :data, :topics)" - - for indx, log := range logs { - topics := []string{} - for _, topic := range log.Topics { - topics = append(topics, topic.String()) - } - obj := &logObj{ - Index: lastIndex + uint64(indx), - TxIndex: log.TransactionIndex, - TxHash: log.TransactionHash.String(), - BlockNum: log.BlockNumber, - BlockHash: log.BlockHash.String(), - Address: log.Address.String(), - Topics: strings.Join(topics, ","), - } - if log.Data != nil { - obj.Data = "0x" + hex.EncodeToString(log.Data) - } - - if _, err := tx.NamedExec(query, obj); err != nil { - return err - } - } - if err := tx.Commit(); err != nil { - return err - } - return nil -} - -// RemoveLogs implements the store interface -func (e *Entry) RemoveLogs(indx uint64) error { - if _, err := e.db.Exec("DELETE FROM "+e.table+" WHERE indx >= $1", indx); err != nil { - return err - } - return nil -} - -// GetLog implements the store interface -func (e *Entry) GetLog(indx uint64, log *ethgo.Log) error { - obj := logObj{} - if err := e.db.Get(&obj, "SELECT * FROM "+e.table+" WHERE indx=$1", indx); err != nil { - return err - } - - log.TransactionIndex = obj.TxIndex - if err := log.TransactionHash.UnmarshalText([]byte(obj.TxHash)); err != nil { - return err - } - log.BlockNumber = obj.BlockNum - if err := log.BlockHash.UnmarshalText([]byte(obj.BlockHash)); err != nil { - return err - } - if err := log.Address.UnmarshalText([]byte(obj.Address)); err != nil { - return err - } - - if obj.Topics != "" { - log.Topics = []ethgo.Hash{} - for _, item := range strings.Split(obj.Topics, ",") { - var topic ethgo.Hash - if err := topic.UnmarshalText([]byte(item)); err != nil { - return err - } - log.Topics = append(log.Topics, topic) - } - } else { - log.Topics = nil - } - - if obj.Data != "" { - if !strings.HasPrefix(obj.Data, "0x") { - return fmt.Errorf("0x prefix not found in data") - } - buf, err := hex.DecodeString(obj.Data[2:]) - if err != nil { - return err - } - log.Data = buf - } else { - log.Data = nil - } - - return nil -} - -type logObj struct { - Index uint64 `db:"indx"` - TxIndex uint64 `db:"tx_index"` - TxHash string `db:"tx_hash"` - BlockNum uint64 `db:"block_num"` - BlockHash string `db:"block_hash"` - Address string `db:"address"` - Topics string `db:"topics"` - Data string `db:"data"` -} - -var kvSQLSchema = ` -CREATE TABLE IF NOT EXISTS kv ( - key text unique, - val text -); -` - -func logSQLSchema(name string) string { - return ` - CREATE TABLE IF NOT EXISTS ` + name + ` ( - indx numeric, - tx_index numeric, - tx_hash text, - block_num numeric, - block_hash text, - address text, - topics text, - data text - ); - ` -} diff --git a/tracker/store/postgresql/postgresql_store_test.go b/tracker/store/postgresql/postgresql_store_test.go deleted file mode 100644 index 61a31ab0..00000000 --- a/tracker/store/postgresql/postgresql_store_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package trackerpostgresql - -import ( - "database/sql" - "fmt" - "testing" - - "github.com/ory/dockertest" - "github.com/umbracle/ethgo/tracker/store" -) - -func setupDB(t *testing.T) (store.Store, func()) { - pool, err := dockertest.NewPool("") - if err != nil { - t.Fatalf("Could not connect to docker: %s", err) - } - - resource, err := pool.Run("postgres", "latest", []string{"POSTGRES_HOST_AUTH_METHOD=trust"}) - if err != nil { - t.Fatalf("Could not start resource: %s", err) - } - - endpoint := fmt.Sprintf("postgres://postgres@localhost:%s/postgres?sslmode=disable", resource.GetPort("5432/tcp")) - - // wait for the db to be running - if err := pool.Retry(func() error { - db, err := sql.Open("postgres", endpoint) - if err != nil { - return err - } - return db.Ping() - }); err != nil { - t.Fatal(err) - } - - cleanup := func() { - if err := pool.Purge(resource); err != nil { - t.Fatalf("Could not purge resource: %s", err) - } - } - - store, err := NewPostgreSQLStore(endpoint) - if err != nil { - t.Fatal(err) - } - return store, cleanup -} - -func TestPostgreSQLStore(t *testing.T) { - store.TestStore(t, setupDB) -} diff --git a/tracker/store/store.go b/tracker/store/store.go deleted file mode 100644 index c7ac4761..00000000 --- a/tracker/store/store.go +++ /dev/null @@ -1,36 +0,0 @@ -package store - -import "github.com/umbracle/ethgo" - -// Store is a datastore for the tracker -type Store interface { - // Get gets a value - Get(k string) (string, error) - - // ListPrefix lists values by prefix - ListPrefix(prefix string) ([]string, error) - - // Set sets a value - Set(k, v string) error - - // Close closes the store - Close() error - - // GetEntry returns a specific entry - GetEntry(hash string) (Entry, error) -} - -// Entry is a filter entry in the store -type Entry interface { - // LastIndex returns index of the last stored event - LastIndex() (uint64, error) - - // StoreLogs stores the web3 logs of the event - StoreLogs(logs []*ethgo.Log) error - - // RemoveLogs all the logs starting at index 'indx' - RemoveLogs(indx uint64) error - - // GetLog returns the log at indx - GetLog(indx uint64, log *ethgo.Log) error -} diff --git a/tracker/store/testing.go b/tracker/store/testing.go deleted file mode 100644 index a1db026a..00000000 --- a/tracker/store/testing.go +++ /dev/null @@ -1,242 +0,0 @@ -package store - -import ( - "reflect" - "testing" - - "github.com/umbracle/ethgo" -) - -// SetupDB is a function that creates a backend -type SetupDB func(t *testing.T) (Store, func()) - -// TestStore tests a tracker store -func TestStore(t *testing.T, setup SetupDB) { - testMultipleStores(t, setup) - testGetSet(t, setup) - testRemoveLogs(t, setup) - testStoreLogs(t, setup) - testPrefix(t, setup) -} - -func testMultipleStores(t *testing.T, setup SetupDB) { - store, close := setup(t) - defer close() - - entry0, err := store.GetEntry("0") - if err != nil { - t.Fatal(err) - } - log := ethgo.Log{ - BlockNumber: 10, - } - if err := entry0.StoreLogs([]*ethgo.Log{&log}); err != nil { - t.Fatal(err) - } - - entry1, err := store.GetEntry("1") - if err != nil { - t.Fatal(err) - } - log = ethgo.Log{ - BlockNumber: 15, - } - if err := entry1.StoreLogs([]*ethgo.Log{&log}); err != nil { - t.Fatal(err) - } - - index0, err := entry0.LastIndex() - if err != nil { - t.Fatal(err) - } - if index0 != 1 { - t.Fatal("bad") - } - - index1, err := entry1.LastIndex() - if err != nil { - t.Fatal(err) - } - if index1 != 1 { - t.Fatal("bad") - } -} - -func testPrefix(t *testing.T, setup SetupDB) { - store, close := setup(t) - defer close() - - v1 := "val1" - v2 := "val2" - v3 := "val3" - - // add same prefix values - if err := store.Set(v1, v1); err != nil { - t.Fatal(err) - } - if err := store.Set(v2, v2); err != nil { - t.Fatal(err) - } - if err := store.Set(v3, v3); err != nil { - t.Fatal(err) - } - - // add distinct value - if err := store.Set("a", "b"); err != nil { - t.Fatal(err) - } - - checkPrefix := func(prefix string, expected int) { - res, err := store.ListPrefix(prefix) - if err != nil { - t.Fatal(err) - } - if len(res) != expected { - t.Fatalf("%d values expected for prefix '%s' but %d found", expected, string(prefix), len(res)) - } - } - - checkPrefix("val", 3) - checkPrefix("a", 1) - checkPrefix("b", 0) -} - -func testGetSet(t *testing.T, setup SetupDB) { - store, close := setup(t) - defer close() - - k1 := "k1" - v1 := "v1" - v2 := "v2" - - res, err := store.Get(k1) - if err != nil { - t.Fatal(err) - } - if len(res) != 0 { - t.Fatal("expected empty") - } - - // set the entry - if err := store.Set(k1, v1); err != nil { - t.Fatal(err) - } - res, err = store.Get(k1) - if err != nil { - t.Fatal(err) - } - if res != v1 { - t.Fatal("bad") - } - - // update the entry - if err := store.Set(k1, v2); err != nil { - t.Fatal(err) - } - res, err = store.Get(k1) - if err != nil { - t.Fatal(err) - } - if res != v2 { - t.Fatal("bad") - } -} - -func testStoreLogs(t *testing.T, setup SetupDB) { - store, close := setup(t) - defer close() - - entry, err := store.GetEntry("1") - if err != nil { - t.Fatal(err) - } - - indx, err := entry.LastIndex() - if err != nil { - t.Fatal(err) - } - if indx != 0 { - t.Fatal("index should be zero") - } - - log := ethgo.Log{ - BlockNumber: 10, - } - if err := entry.StoreLogs([]*ethgo.Log{&log}); err != nil { - t.Fatal(err) - } - - indx, err = entry.LastIndex() - if err != nil { - t.Fatal(err) - } - if indx != 1 { - t.Fatal("index should be one") - } - - var log2 ethgo.Log - if err := entry.GetLog(0, &log2); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(log, log2) { - t.Fatal("bad") - } - - // retrieve entry again - entry1, err := store.GetEntry("1") - if err != nil { - t.Fatal(err) - } - indx1, err := entry1.LastIndex() - if err != nil { - t.Fatal(err) - } - if indx1 != indx { - t.Fatal("bad last index") - } -} - -func testRemoveLogs(t *testing.T, setup SetupDB) { - store, close := setup(t) - defer close() - - logs := []*ethgo.Log{} - for i := uint64(0); i < 10; i++ { - logs = append(logs, ðgo.Log{ - BlockNumber: i, - }) - } - - entry, err := store.GetEntry("1") - if err != nil { - t.Fatal(err) - } - - if err := entry.StoreLogs(logs); err != nil { - t.Fatal(err) - } - - if err := entry.RemoveLogs(5); err != nil { - t.Fatal(err) - } - - indx, err := entry.LastIndex() - if err != nil { - t.Fatal(err) - } - if indx != 5 { - t.Fatal("bad") - } - - // add again the values - if err := entry.StoreLogs(logs[5:]); err != nil { - t.Fatal(err) - } - indx, err = entry.LastIndex() - if err != nil { - t.Fatal(err) - } - if indx != 10 { - t.Fatal("bad") - } -} diff --git a/tracker/tracker.go b/tracker/tracker.go deleted file mode 100644 index ab11b2e6..00000000 --- a/tracker/tracker.go +++ /dev/null @@ -1,884 +0,0 @@ -package tracker - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "math/big" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/blocktracker" - "github.com/umbracle/ethgo/etherscan" - "github.com/umbracle/ethgo/jsonrpc/codec" - "github.com/umbracle/ethgo/tracker/store" - "github.com/umbracle/ethgo/tracker/store/inmem" -) - -var ( - dbGenesis = "genesis" - dbChainID = "chainID" - dbLastBlock = "lastBlock" - dbFilter = "filter" -) - -const ( - defaultMaxBlockBacklog = 10 - defaultBatchSize = 100 -) - -// FilterConfig is a tracker filter configuration -type FilterConfig struct { - Address []ethgo.Address `json:"address"` - Topics [][]*ethgo.Hash `json:"topics"` - Start uint64 - Hash string - Async bool -} - -func (f *FilterConfig) buildHash() { - h := sha256.New() - for _, i := range f.Address { - h.Write([]byte(i.String())) - } - for _, topics := range f.Topics { - if topics == nil { - h.Write([]byte("empty")) - - continue - } - - for _, innerTopic := range topics { - if innerTopic == nil { - h.Write([]byte("empty")) - - continue - } - - h.Write([]byte(innerTopic.String())) - } - } - f.Hash = hex.EncodeToString(h.Sum(nil)) -} - -func (f *FilterConfig) getFilterSearch() *ethgo.LogFilter { - filter := ðgo.LogFilter{} - if len(f.Address) != 0 { - filter.Address = f.Address - } - if len(f.Topics) != 0 { - filter.Topics = f.Topics - } - return filter -} - -// Config is the configuration of the tracker -type Config struct { - BatchSize uint64 - BlockTracker *blocktracker.BlockTracker // move to interface - EtherscanAPIKey string - Filter *FilterConfig - Store store.Store -} - -type ConfigOption func(*Config) - -func WithBatchSize(b uint64) ConfigOption { - return func(c *Config) { - c.BatchSize = b - } -} - -func WithBlockTracker(b *blocktracker.BlockTracker) ConfigOption { - return func(c *Config) { - c.BlockTracker = b - } -} - -func WithStore(s store.Store) ConfigOption { - return func(c *Config) { - c.Store = s - } -} - -func WithFilter(f *FilterConfig) ConfigOption { - return func(c *Config) { - c.Filter = f - } -} - -func WithEtherscan(k string) ConfigOption { - return func(c *Config) { - c.EtherscanAPIKey = k - } -} - -// DefaultConfig returns the default tracker config -func DefaultConfig() *Config { - return &Config{ - BatchSize: defaultBatchSize, - Store: inmem.NewInmemStore(), - Filter: &FilterConfig{}, - EtherscanAPIKey: "", - } -} - -// Provider are the eth1x methods required by the tracker -type Provider interface { - BlockNumber() (uint64, error) - GetBlockByHash(hash ethgo.Hash, full bool) (*ethgo.Block, error) - GetBlockByNumber(i ethgo.BlockNumber, full bool) (*ethgo.Block, error) - GetLogs(filter *ethgo.LogFilter) ([]*ethgo.Log, error) - ChainID() (*big.Int, error) -} - -// Tracker is a contract event tracker -type Tracker struct { - logger *log.Logger - provider Provider - config *Config - store store.Store - entry store.Entry - preSyncOnce sync.Once - blockTracker *blocktracker.BlockTracker - synced int32 - BlockCh chan *blocktracker.BlockEvent - ReadyCh chan struct{} - SyncCh chan uint64 - EventCh chan *Event - DoneCh chan struct{} -} - -// NewTracker creates a new tracker -func NewTracker(provider Provider, opts ...ConfigOption) (*Tracker, error) { - config := DefaultConfig() - for _, opt := range opts { - opt(config) - } - - t := &Tracker{ - provider: provider, - config: config, - BlockCh: make(chan *blocktracker.BlockEvent, 1), - logger: log.New(ioutil.Discard, "", log.LstdFlags), - ReadyCh: make(chan struct{}), - store: config.Store, - blockTracker: config.BlockTracker, - DoneCh: make(chan struct{}, 1), - EventCh: make(chan *Event), - SyncCh: make(chan uint64, 1), - synced: 0, - } - if err := t.setupFilter(); err != nil { - return nil, err - } - return t, nil -} - -// NewFilter creates a new log filter -func (t *Tracker) setupFilter() error { - if t.config.Filter == nil { - // generic config - t.config.Filter = &FilterConfig{} - } - - // generate a random hash if not provided - if t.config.Filter.Hash == "" { - t.config.Filter.buildHash() - } - - entry, err := t.store.GetEntry(t.config.Filter.Hash) - if err != nil { - return err - } - t.entry = entry - - // insert the filter config in the db - filterKey := dbFilter + "_" + t.config.Filter.Hash - data, err := t.store.Get(filterKey) - if err != nil { - return err - } - if data == "" { - raw, err := json.Marshal(t.config.Filter) - if err != nil { - return err - } - rawStr := hex.EncodeToString(raw) - if err := t.store.Set(filterKey, rawStr); err != nil { - return err - } - } - return nil -} - -func (t *Tracker) Entry() store.Entry { - return t.entry -} - -// GetLastBlock returns the last block processed for this filter -func (t *Tracker) GetLastBlock() (*ethgo.Block, error) { - buf, err := t.store.Get(dbLastBlock + "_" + t.config.Filter.Hash) - if err != nil { - return nil, err - } - if len(buf) == 0 { - return nil, nil - } - raw, err := hex.DecodeString(buf) - if err != nil { - return nil, err - } - b := ðgo.Block{} - if err := b.UnmarshalJSON(raw); err != nil { - return nil, err - } - return b, nil -} - -func (t *Tracker) storeLastBlock(b *ethgo.Block) error { - if b.Difficulty == nil { - b.Difficulty = big.NewInt(0) - } - buf, err := b.MarshalJSON() - if err != nil { - return err - } - raw := hex.EncodeToString(buf) - return t.store.Set(dbLastBlock+"_"+t.config.Filter.Hash, raw) -} - -func (t *Tracker) emitEvent(evnt *Event) { - if evnt == nil { - return - } - if t.config.Filter.Async { - select { - case t.EventCh <- evnt: - default: - } - } else { - t.EventCh <- evnt - } -} - -// IsSynced returns true if the filter is synced to head -func (t *Tracker) IsSynced() bool { - return atomic.LoadInt32(&t.synced) != 0 -} - -// Wait waits the filter to finish -func (t *Tracker) Wait() { - t.WaitDuration(0) -} - -// WaitDuration waits for the filter to finish up to duration -func (t *Tracker) WaitDuration(dur time.Duration) error { - if t.IsSynced() { - return nil - } - - var waitCh <-chan time.Time - if dur == 0 { - waitCh = time.After(dur) - } - select { - case <-waitCh: - return fmt.Errorf("timeout") - case <-t.DoneCh: - } - return nil -} - -func (t *Tracker) findAncestor(block, pivot *ethgo.Block) (uint64, error) { - // block is part of a fork that is not the current head, find a common ancestor - // both block and pivot are at the same height - var err error - - for i := uint64(0); i < t.blockTracker.MaxBlockBacklog(); i++ { - if block.Number != pivot.Number { - return 0, fmt.Errorf("block numbers do not match") - } - if block.Hash == pivot.Hash { - // this is the common ancestor in both - return block.Number, nil - } - block, err = t.provider.GetBlockByHash(block.ParentHash, false) - if err != nil { - return 0, err - } - pivot, err = t.provider.GetBlockByHash(pivot.ParentHash, false) - if err != nil { - return 0, err - } - } - return 0, fmt.Errorf("the reorg is bigger than maxBlockBacklog %d", t.blockTracker.MaxBlockBacklog()) -} - -func (t *Tracker) emitLogs(typ EventType, logs []*ethgo.Log) { - evnt := &Event{} - if typ == EventAdd { - evnt.Added = logs - } - if typ == EventDel { - evnt.Removed = logs - } - t.emitEvent(evnt) -} - -func tooMuchDataRequestedError(err error) bool { - obj, ok := err.(*codec.ErrorObject) - if !ok { - return false - } - if obj.Message == "query returned more than 10000 results" { - return true - } - return false -} - -func (t *Tracker) syncBatch(ctx context.Context, from, to uint64) error { - query := t.config.Filter.getFilterSearch() - - batchSize := t.config.BatchSize - additiveFactor := uint64(float64(batchSize) * 0.10) - - i := from - -START: - dst := min(to, i+batchSize) - - query.SetFromUint64(i) - query.SetToUint64(dst) - - logs, err := t.provider.GetLogs(query) - if err != nil { - if tooMuchDataRequestedError(err) { - // multiplicative decrease - batchSize = batchSize / 2 - goto START - } - return err - } - - if t.SyncCh != nil { - select { - case t.SyncCh <- dst: - default: - } - } - - // add logs to the store - if err := t.entry.StoreLogs(logs); err != nil { - return err - } - t.emitLogs(EventAdd, logs) - - // update the last block entry - block, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(dst), false) - if err != nil { - return err - } - if err := t.storeLastBlock(block); err != nil { - return err - } - - // check if the execution is over after each query batch - if err := ctx.Err(); err != nil { - return err - } - - i += batchSize + 1 - - // update the batchSize with additive increase - if batchSize < t.config.BatchSize { - batchSize = min(t.config.BatchSize, batchSize+additiveFactor) - } - - if i <= to { - goto START - } - return nil -} - -func (t *Tracker) preSyncCheck() error { - var err error - t.preSyncOnce.Do(func() { - err = t.preSyncCheckImpl() - }) - return err -} - -func (t *Tracker) preSyncCheckImpl() error { - rGenesis, err := t.provider.GetBlockByNumber(0, false) - if err != nil { - return err - } - rChainID, err := t.provider.ChainID() - if err != nil { - return err - } - - genesis, err := t.store.Get(dbGenesis) - if err != nil { - return err - } - chainID, err := t.store.Get(dbChainID) - if err != nil { - return err - } - if len(genesis) != 0 { - if genesis != rGenesis.Hash.String() { - return fmt.Errorf("bad genesis") - } - if chainID != rChainID.String() { - return fmt.Errorf("bad genesis") - } - } else { - if err := t.store.Set(dbGenesis, rGenesis.Hash.String()); err != nil { - return err - } - if err := t.store.Set(dbChainID, rChainID.String()); err != nil { - return err - } - } - return nil -} - -func (t *Tracker) fastTrack(filterConfig *FilterConfig) (*ethgo.Block, error) { - // Try to use first the user provided block if any - if filterConfig.Start != 0 { - bb, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(filterConfig.Start), false) - if err != nil { - return nil, err - } - return bb, nil - } - - // Only possible if we filter addresses - if len(filterConfig.Address) == 0 { - return nil, nil - } - - if t.config.EtherscanAPIKey != "" { - chainID, err := t.provider.ChainID() - if err != nil { - return nil, err - } - - // get the etherscan instance for this chainID - e, err := etherscan.NewEtherscanFromNetwork(ethgo.Network(chainID.Uint64()), t.config.EtherscanAPIKey) - if err != nil { - // there is no etherscan api for this specific chainid - return nil, nil - } - - getAddress := func(addr ethgo.Address) (uint64, error) { - params := map[string]string{ - "address": addr.String(), - "fromBlock": "0", - "toBlock": "latest", - } - var out []map[string]interface{} - if err := e.Query("logs", "getLogs", &out, params); err != nil { - return 0, err - } - if len(out) == 0 { - return 0, nil - } - - cc, ok := out[0]["blockNumber"].(string) - if !ok { - return 0, fmt.Errorf("failed to cast blocknumber") - } - - num, err := parseUint64orHex(cc) - if err != nil { - return 0, err - } - return num, nil - } - - minBlock := ^uint64(0) // max uint64 - for _, addr := range filterConfig.Address { - num, err := getAddress(addr) - if err != nil { - return nil, err - } - if num < minBlock { - minBlock = num - } - } - - bb, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(minBlock-1), false) - if err != nil { - return nil, err - } - return bb, nil - } - - return nil, nil -} - -func (t *Tracker) BatchSync(ctx context.Context) error { - if err := t.preSyncCheck(); err != nil { - return err - } - - if t.blockTracker == nil { - // run a specfic block tracker - t.blockTracker = blocktracker.NewBlockTracker(t.provider) - if err := t.blockTracker.Init(); err != nil { - return err - } - go t.blockTracker.Start() - go func() { - // track our stop - <-ctx.Done() - t.blockTracker.Close() - }() - } else { - // just try to init - if err := t.blockTracker.Init(); err != nil { - return err - } - } - - close(t.ReadyCh) - - if err := t.syncImpl(ctx); err != nil { - return err - } - - select { - case t.DoneCh <- struct{}{}: - default: - } - - atomic.StoreInt32(&t.synced, 1) - return nil -} - -// Sync syncs a specific filter -func (t *Tracker) Sync(ctx context.Context) error { - if err := t.BatchSync(ctx); err != nil { - return err - } - - // subscribe and sync - sub := t.blockTracker.Subscribe() - go func() { - for { - select { - case evnt := <-sub: - t.handleBlockEvnt(evnt) - case <-ctx.Done(): - return - } - } - }() - - return nil -} - -func (t *Tracker) syncImpl(ctx context.Context) error { - if err := t.preSyncCheck(); err != nil { - return err - } - - lock := t.blockTracker.AcquireLock() - defer func() { - if lock.Locked { - lock.Unlock() - } - }() - - // We only hold the lock when we sync the head (last MaxBackLogs) - // because we want to avoid changes in the head while we sync. - // We will only release the lock if we do a bulk sync since it can - // move the target block for the sync. - - lock.Lock() - if t.blockTracker.Len() == 0 { - return nil - } - - // get the current target - target := t.blockTracker.LastBlocked() - if target == nil { - return nil - } - targetNum := target.Number - - last, err := t.GetLastBlock() - if err != nil { - return err - } - if last == nil { - // Try to fast track to the valid block (if possible) - last, err = t.fastTrack(t.config.Filter) - if err != nil { - return fmt.Errorf("failed to fasttrack: %v", err) - } - if last != nil { - if err := t.storeLastBlock(last); err != nil { - return err - } - } - } else { - if last.Hash == target.Hash { - return nil - } - } - - // There might been a reorg when we stopped syncing last time, - // check that our 'beacon' block matches the one in the chain. - // If that is not the case, we consider beacon-maxBackLog our - // real origin point and remove any logs ahead of that point. - - var origin uint64 - if last != nil { - if last.Number > targetNum { - return fmt.Errorf("store is more advanced than the chain") - } - - pivot, err := t.provider.GetBlockByNumber(ethgo.BlockNumber(last.Number), false) - if err != nil { - return err - } - - if last.Number == targetNum { - origin = last.Number - } else { - origin = last.Number + 1 - } - - if pivot.Hash != last.Hash { - ancestor, err := t.findAncestor(last, pivot) - if err != nil { - return err - } - - origin = ancestor + 1 - logs, err := t.removeLogs(ancestor+1, nil) - if err != nil { - return err - } - t.emitLogs(EventDel, logs) - - last, err = t.provider.GetBlockByNumber(ethgo.BlockNumber(ancestor), false) - if err != nil { - return err - } - } - } - - step := targetNum - origin + 1 - if step > t.blockTracker.MaxBlockBacklog() { - // we are far (more than maxBackLog) from the target block - // Do a bulk sync with the eth_getLogs endpoint and get closer - // to the target block. - - for { - if origin > targetNum { - return fmt.Errorf("from (%d) higher than to (%d)", origin, targetNum) - } - if targetNum-origin+1 <= t.blockTracker.MaxBlockBacklog() { - break - } - - // release the lock - lock.Unlock() - - limit := targetNum - t.blockTracker.MaxBlockBacklog() - if err := t.syncBatch(ctx, origin, limit); err != nil { - return err - } - - origin = limit + 1 - - // lock again to reset the target block - lock.Lock() - targetNum = t.blockTracker.LastBlocked().Number - } - } - - // we are still holding the lock on the blocksLock so that we are sure - // that the targetNum has not changed - trackerBlocks := t.blockTracker.BlocksBlocked() - added := trackerBlocks[uint64(len(trackerBlocks))-1-(targetNum-origin):] - - evnt, err := t.doFilter(added, nil) - if err != nil { - return err - } - if evnt != nil { - t.emitEvent(evnt) - } - - // release the lock on the blocks - lock.Unlock() - return nil -} - -func (t *Tracker) removeLogs(number uint64, hash *ethgo.Hash) ([]*ethgo.Log, error) { - index, err := t.entry.LastIndex() - if err != nil { - return nil, err - } - if index == 0 { - return nil, nil - } - - var remove []*ethgo.Log - for { - elemIndex := index - 1 - - var log ethgo.Log - if err := t.entry.GetLog(elemIndex, &log); err != nil { - return nil, err - } - if log.BlockNumber == number { - if hash != nil && log.BlockHash != *hash { - break - } - } - if log.BlockNumber < number { - break - } - remove = append(remove, &log) - if elemIndex == 0 { - index = 0 - break - } - index = elemIndex - } - - if err := t.entry.RemoveLogs(index); err != nil { - return nil, err - } - return remove, nil -} - -func revertLogs(in []*ethgo.Log) (out []*ethgo.Log) { - for i := len(in) - 1; i >= 0; i-- { - out = append(out, in[i]) - } - return -} - -func (t *Tracker) handleBlockEvnt(blockEvnt *blocktracker.BlockEvent) error { - if blockEvnt == nil { - return nil - } - - // emit the block event - select { - case t.BlockCh <- blockEvnt: - default: - } - - if t.IsSynced() { - evnt, err := t.doFilter(blockEvnt.Added, blockEvnt.Removed) - if err != nil { - return err - } - if evnt != nil { - t.emitEvent(evnt) - } - } - return nil -} - -func (t *Tracker) doFilter(added []*ethgo.Block, removed []*ethgo.Block) (*Event, error) { - evnt := &Event{} - if len(removed) != 0 { - pivot := removed[0] - logs, err := t.removeLogs(pivot.Number, &pivot.Hash) - if err != nil { - return nil, err - } - evnt.Removed = append(evnt.Removed, revertLogs(logs)...) - } - - for _, block := range added { - // check logs for this blocks - query := t.config.Filter.getFilterSearch() - query.BlockHash = &block.Hash - - // We check the hash, we need to do a retry to let unsynced nodes get the block - var logs []*ethgo.Log - var err error - - for i := 0; i < 5; i++ { - logs, err = t.provider.GetLogs(query) - if err == nil { - break - } - time.Sleep(500 * time.Millisecond) - } - if err != nil { - return nil, err - } - - // add logs to the store - if err := t.entry.StoreLogs(logs); err != nil { - return nil, err - } - evnt.Added = append(evnt.Added, logs...) - } - - // store the last block as the new index - if err := t.storeLastBlock(added[len(added)-1]); err != nil { - return nil, err - } - return evnt, nil -} - -// EventType is the type of the event -type EventType int - -const ( - // EventAdd happens when a new event is included in the chain - EventAdd EventType = iota - // EventDel may happen when there is a reorg and a past event is deleted - EventDel -) - -// Event is an event emitted when a new log is included -type Event struct { - Type EventType - Added []*ethgo.Log - Removed []*ethgo.Log -} - -// BlockEvent is an event emitted when a new block is included -type BlockEvent struct { - Type EventType - Added []*ethgo.Block - Removed []*ethgo.Block -} - -func min(i, j uint64) uint64 { - if i < j { - return i - } - return j -} - -func parseUint64orHex(str string) (uint64, error) { - base := 10 - if strings.HasPrefix(str, "0x") { - str = str[2:] - base = 16 - } - return strconv.ParseUint(str, base, 64) -}