Skip to content

Commit

Permalink
start block to qeury operator info must be provided to NewOperatorsIn…
Browse files Browse the repository at this point in the history
…foServiceInMemory
  • Loading branch information
not4x217 committed Sep 16, 2024
1 parent 7e4891d commit 5b4a125
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
23 changes: 17 additions & 6 deletions services/operatorsinfo/operatorsinfo_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package operatorsinfo

import (
"context"
"math/big"
"sync"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/avsregistry"
Expand Down Expand Up @@ -56,6 +57,8 @@ func NewOperatorsInfoServiceInMemory(
avsRegistrySubscriber avsregistry.AvsRegistrySubscriber,
avsRegistryReader avsregistry.AvsRegistryReader,
logger logging.Logger,
queryOperators bool,
queryOperatorStartBlock *big.Int,
) *OperatorsInfoServiceInMemory {
queryC := make(chan query)
pkcs := &OperatorsInfoServiceInMemory{
Expand All @@ -71,12 +74,18 @@ func NewOperatorsInfoServiceInMemory(
// which requires querying the past events of the pubkey registration contract
wg := sync.WaitGroup{}
wg.Add(1)
pkcs.startServiceInGoroutine(ctx, queryC, &wg)
pkcs.startServiceInGoroutine(ctx, queryC, &wg, queryOperators, queryOperatorStartBlock)
wg.Wait()
return pkcs
}

func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Context, queryC <-chan query, wg *sync.WaitGroup) {
func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(
ctx context.Context,
queryC <-chan query,
wg *sync.WaitGroup,
queryOperators bool,
queryOperatorStartBlock *big.Int,
) {
go func() {

// TODO(samlaf): we should probably save the service in the logger itself and add it automatically to all logs
Expand All @@ -92,7 +101,9 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con
ops.logger.Error("Fatal error opening websocket subscription for new socket registrations", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
ops.queryPastRegisteredOperatorEventsAndFillDb(ctx)
if queryOperators {
ops.queryPastRegisteredOperatorEventsAndFillDb(ctx, queryOperatorStartBlock)
}
// The constructor can return after we have backfilled the db by querying the events of operators that have registered with the blsApkRegistry
// before the block at which we started the ws subscription above
wg.Done()
Expand Down Expand Up @@ -151,17 +162,17 @@ func (ops *OperatorsInfoServiceInMemory) startServiceInGoroutine(ctx context.Con
}()
}

func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context) {
func (ops *OperatorsInfoServiceInMemory) queryPastRegisteredOperatorEventsAndFillDb(ctx context.Context, startBlock *big.Int) {
// Querying with nil startBlock and stopBlock will return all events. It doesn't matter if we query some events that we will receive again in the websocket,
// since we will just overwrite the pubkey dict with the same values.
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, nil, nil)
alreadyRegisteredOperatorAddrs, alreadyRegisteredOperatorPubkeys, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorPubKeys(ctx, startBlock, nil)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operators", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
}
ops.logger.Debug("List of queried operator registration events in blsApkRegistry", "alreadyRegisteredOperatorAddr", alreadyRegisteredOperatorAddrs, "service", "OperatorPubkeysServiceInMemory")

socketsMap, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, nil, nil)
socketsMap, err := ops.avsRegistryReader.QueryExistingRegisteredOperatorSockets(ctx, startBlock, nil)
if err != nil {
ops.logger.Error("Fatal error querying existing registered operator sockets", "err", err, "service", "OperatorPubkeysServiceInMemory")
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion services/operatorsinfo/operatorsinfo_inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestGetOperatorInfo(t *testing.T) {
tt.mocksInitializationFunc(mockAvsRegistrySubscriber, mockAvsReader, mockSubscription)
}
// Create a new instance of the operatorpubkeys service
service := NewOperatorsInfoServiceInMemory(context.Background(), mockAvsRegistrySubscriber, mockAvsReader, logger)
service := NewOperatorsInfoServiceInMemory(context.Background(), mockAvsRegistrySubscriber, mockAvsReader, logger, true, nil)
time.Sleep(2 * time.Second) // need to give it time to process the subscription events.. not sure if there's a better way to do this.

// Call the GetOperatorPubkeys method with the test operator address
Expand Down

0 comments on commit 5b4a125

Please sign in to comment.