Skip to content

Commit

Permalink
Merge branch 'main' into fix-reputation-rank-query
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava authored Feb 3, 2025
2 parents 486e060 + e06adcc commit a6ead60
Show file tree
Hide file tree
Showing 13 changed files with 736 additions and 185 deletions.
7 changes: 7 additions & 0 deletions proto/lavanet/lava/pairing/relay.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message ProbeRequest {
uint64 guid = 1;
string spec_id = 2;
string api_interface = 3;
bool with_verifications = 4;
}

message ProbeReply {
Expand All @@ -26,6 +27,12 @@ message ProbeReply {
bytes finalized_blocks_hashes = 3;
uint64 lava_epoch = 4;
uint64 lava_latest_block = 5;
repeated Verification verifications = 6;
}

message Verification {
string name = 1;
bool passed = 2;
}

message RelaySession {
Expand Down
64 changes: 58 additions & 6 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,73 @@ const (
ChainFetcherHeaderName = "X-LAVA-Provider"
)

type ChainFetcherIf interface {
type IChainFetcher interface {
FetchLatestBlockNum(ctx context.Context) (int64, error)
FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error)
FetchEndpoint() lavasession.RPCProviderEndpoint
Validate(ctx context.Context) error
GetVerificationsStatus() []*pairingtypes.Verification
CustomMessage(ctx context.Context, path string, data []byte, connectionType string, apiName string) ([]byte, error)
}

type ChainFetcher struct {
endpoint *lavasession.RPCProviderEndpoint
chainRouter ChainRouter
chainParser ChainParser
cache *performance.Cache
latestBlock int64
endpoint *lavasession.RPCProviderEndpoint
chainRouter ChainRouter
chainParser ChainParser
cache *performance.Cache
latestBlock int64
verificationsStatus common.SafeSyncMap[string, bool]
cachedVerifications atomic.Value // holds []*pairingtypes.Verification for faster access
cacheValid atomic.Bool
}

func (cf *ChainFetcher) GetVerificationsStatus() []*pairingtypes.Verification {
// Try to get from cache first
if cf.cacheValid.Load() {
value, ok := cf.cachedVerifications.Load().([]*pairingtypes.Verification)
if ok {
return value
} else {
utils.LavaFormatError("invalid usage of cachedVerifications, could not cast result into []*pairingtypes.Verification type", nil, utils.Attribute{Key: "cachedVerifications", Value: cf.cachedVerifications.Load()})
}
}

// If not in cache, create new slice
verifications := make([]*pairingtypes.Verification, 0)
cf.verificationsStatus.Range(func(name string, passed bool) bool {
verifications = append(verifications, &pairingtypes.Verification{
Name: name,
Passed: passed,
})
return true
})

// Store in cache
cf.cachedVerifications.Store(verifications)
cf.cacheValid.Store(true)
return verifications
}

// Add this method to invalidate cache when verification status changes
func (cf *ChainFetcher) invalidateVerificationsCache() {
cf.cacheValid.Store(false)
}

func (cf *ChainFetcher) FetchEndpoint() lavasession.RPCProviderEndpoint {
return *cf.endpoint
}

func (cf *ChainFetcher) getVerificationsKey(verification VerificationContainer, apiInterface string, chainId string) string {
key := chainId + "-" + apiInterface + "-" + verification.Name
if verification.Addon != "" {
key += "-" + verification.Addon
}
if verification.Extension != "" {
key += "-" + verification.Extension
}
return key
}

func (cf *ChainFetcher) Validate(ctx context.Context) error {
for _, url := range cf.endpoint.NodeUrls {
addons := url.Addons
Expand All @@ -67,6 +114,8 @@ func (cf *ChainFetcher) Validate(ctx context.Context) error {
if err != nil {
return err
}
// invalidating cache as value might change
defer cf.invalidateVerificationsCache()
for _, verification := range verifications {
if slices.Contains(url.SkipVerifications, verification.Name) {
utils.LavaFormatDebug("Skipping Verification", utils.LogAttr("verification", verification.Name))
Expand All @@ -81,9 +130,12 @@ func (cf *ChainFetcher) Validate(ctx context.Context) error {
}
}
if err != nil {
cf.verificationsStatus.Store(cf.getVerificationsKey(verification, cf.endpoint.ApiInterface, cf.endpoint.ChainID), false)
if verification.Severity == spectypes.ParseValue_Fail {
return utils.LavaFormatError("invalid Verification on provider startup", err, utils.Attribute{Key: "Addons", Value: addons}, utils.Attribute{Key: "verification", Value: verification.Name})
}
} else {
cf.verificationsStatus.Store(cf.getVerificationsKey(verification, cf.endpoint.ApiInterface, cf.endpoint.ChainID), true)
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions protocol/chainlib/chain_fetcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions protocol/common/safe_sync_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (ssm *SafeSyncMap[K, V]) Load(key K) (ret V, ok bool, err error) {
}
ret, ok = value.(V)
if !ok {
return ret, false, utils.LavaFormatError("invalid usage of syncmap, could not cast result into a PolicyUpdater", nil)
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into V type", nil)
}
return ret, true, nil
}
Expand All @@ -37,7 +37,7 @@ func (ssm *SafeSyncMap[K, V]) LoadOrStore(key K, value V) (ret V, loaded bool, e
var ok bool
ret, ok = actual.(V)
if !ok {
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into a PolicyUpdater", nil)
return ret, false, utils.LavaFormatError("invalid usage of sync map, could not cast result into V type", nil)
}
return ret, true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil, numberOfRetriesOnNodeErrorsProviderSide)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil, nil, numberOfRetriesOnNodeErrorsProviderSide)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
Expand Down
11 changes: 1 addition & 10 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,8 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno
}
// add block error probability config if the request block is positive
opts = append(opts, pairingtypes.WithBlockErrorProbability(po.CalculateProbabilityOfBlockError(requestedBlock, providerData)))
} else if requestedBlock == spectypes.EARLIEST_BLOCK {
// if the request block is earliest, we use the latest block as the requested block
} else { // all negative blocks (latest/earliest/pending/safe/finalized) will be considered as latest
requestedBlock = spectypes.LATEST_BLOCK
} else if requestedBlock != spectypes.LATEST_BLOCK && requestedBlock != spectypes.NOT_APPLICABLE {
// if the request block is not positive but not latest/not-applicable - return an error
utils.LavaFormatWarning("[Optimizer] cannot calculate selection tiers",
fmt.Errorf("could not configure block error probability, invalid requested block (must be >0 or -1 or -2)"),
utils.LogAttr("provider", providerAddress),
utils.LogAttr("requested_block", requestedBlock),
)
return NewSelectionTier(), Exploration{}, nil
}
score, err := qos.ComputeReputationFloat64(opts...)
if err != nil {
Expand Down
Loading

0 comments on commit a6ead60

Please sign in to comment.