diff --git a/protocol/common/endpoints.go b/protocol/common/endpoints.go index 098b301b57..9075f918df 100644 --- a/protocol/common/endpoints.go +++ b/protocol/common/endpoints.go @@ -251,9 +251,9 @@ type ConflictHandlerInterface interface { } type ProviderInfo struct { - ProviderAddress string - ProviderQoSExcellenceSummery sdk.Dec // the number represents the average qos for this provider session - ProviderStake sdk.Coin + ProviderAddress string + ProviderReputationSummary sdk.Dec // the number represents the average qos for this provider session + ProviderStake sdk.Coin } type RelayResult struct { diff --git a/protocol/lavaprotocol/finalizationconsensus/finalization_consensus_test.go b/protocol/lavaprotocol/finalizationconsensus/finalization_consensus_test.go index 194265384c..4f1bd7b552 100644 --- a/protocol/lavaprotocol/finalizationconsensus/finalization_consensus_test.go +++ b/protocol/lavaprotocol/finalizationconsensus/finalization_consensus_test.go @@ -16,6 +16,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/v4/protocol/chainlib" "github.com/lavanet/lava/v4/protocol/lavasession" + "github.com/lavanet/lava/v4/protocol/qos" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" spectypes "github.com/lavanet/lava/v4/x/spec/types" "github.com/stretchr/testify/require" @@ -202,9 +203,10 @@ func TestConsensusHashesInsertion(t *testing.T) { func TestQoS(t *testing.T) { decToSet, _ := sdk.NewDecFromStr("0.05") // test values fit 0.05 Availability requirements - lavasession.AvailabilityPercentage = decToSet + qos.AvailabilityPercentage = decToSet rand.InitRandomSeed() chainsToTest := []string{"APT1", "LAV1", "ETH1"} + for i := 0; i < 10; i++ { for _, chainID := range chainsToTest { t.Run(chainID, func(t *testing.T) { @@ -282,54 +284,63 @@ func TestQoS(t *testing.T) { currentLatency := time.Millisecond expectedLatency := time.Millisecond latestServicedBlock := expectedBH - singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) - require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.AnsweredRelays) - require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.TotalRelays) - require.Equal(t, int64(1), singleConsumerSession.QoSInfo.SyncScoreSum) - require.Equal(t, int64(1), singleConsumerSession.QoSInfo.TotalSyncScore) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency) + singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) + require.Equal(t, uint64(1), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, uint64(1), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(1), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(1), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId)) + + lastQoSReport := singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + require.Equal(t, sdk.OneDec(), lastQoSReport.Availability) + require.Equal(t, sdk.OneDec(), lastQoSReport.Sync) + require.Equal(t, sdk.OneDec(), lastQoSReport.Latency) 
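The test above now reads every counter and report through the session's QoSManager, keyed by (epoch, session id), instead of the removed QoSInfo struct. A minimal sketch of the same accessor flow against the new protocol/qos package directly (assuming it is built inside this repo; the provider address argument is only used for logging):

package main

import (
	"fmt"
	"time"

	"github.com/lavanet/lava/v4/protocol/qos"
)

func main() {
	qosManager := qos.NewQoSManager()
	epoch, sessionId := uint64(10), int64(123)

	// one successful relay: latency equal to the expected latency, block height up to date
	qosManager.CalculateQoS(epoch, sessionId, "provider-address", time.Millisecond, time.Millisecond, 0, 3, 1)

	fmt.Println(qosManager.GetTotalRelays(epoch, sessionId))    // 1
	fmt.Println(qosManager.GetAnsweredRelays(epoch, sessionId)) // 1
	if report := qosManager.GetLastQoSReport(epoch, sessionId); report != nil {
		// availability, sync and latency are all 1.0 after a perfect first relay
		fmt.Println(report.Availability, report.Sync, report.Latency)
	}
}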
latestServicedBlock = expectedBH + 1 - singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) - require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.AnsweredRelays) - require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.TotalRelays) - require.Equal(t, int64(2), singleConsumerSession.QoSInfo.SyncScoreSum) - require.Equal(t, int64(2), singleConsumerSession.QoSInfo.TotalSyncScore) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency) - - singleConsumerSession.QoSInfo.TotalRelays++ // this is how we add a failure - singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) - require.Equal(t, uint64(3), singleConsumerSession.QoSInfo.AnsweredRelays) - require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.TotalRelays) - require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum) - require.Equal(t, int64(3), singleConsumerSession.QoSInfo.TotalSyncScore) - - require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0 - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Sync) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency) + singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) + require.Equal(t, uint64(2), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, uint64(2), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(2), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(2), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId)) + + lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + require.Equal(t, sdk.OneDec(), lastQoSReport.Availability) + require.Equal(t, sdk.OneDec(), lastQoSReport.Sync) + require.Equal(t, sdk.OneDec(), lastQoSReport.Latency) + + singleConsumerSession.QoSManager.AddFailedRelay(epoch, singleConsumerSession.SessionId) // this is how we add a failure + singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1) + require.Equal(t, uint64(3), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, uint64(4), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId)) + + lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0 + require.Equal(t, sdk.OneDec(), lastQoSReport.Sync) + require.Equal(t, sdk.OneDec(), lastQoSReport.Latency) 
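The availability figures asserted in this test come from the formula the PR moves into QoSMutatorRelaySuccess.calculateAvailabilityScore (see qos_mutator_relay_success.go below): downtime = (total - answered) / total, scaled score = max(0, (AvailabilityPercentage - downtime) / AvailabilityPercentage). A plain-float sketch of it, using the 0.05 AvailabilityPercentage this test sets:

package main

import "fmt"

// availabilityScore mirrors calculateAvailabilityScore with float64 instead of sdk.Dec:
// any downtime ratio above the allowed availabilityPercentage collapses the score to zero.
func availabilityScore(totalRelays, answeredRelays uint64, availabilityPercentage float64) float64 {
	downtime := float64(totalRelays-answeredRelays) / float64(totalRelays)
	score := (availabilityPercentage - downtime) / availabilityPercentage
	if score < 0 {
		return 0
	}
	return score
}

func main() {
	fmt.Println(availabilityScore(4, 3, 0.05))    // 0   — one failure out of four is far above the 5% budget
	fmt.Println(availabilityScore(100, 99, 0.05)) // 0.8 — matches the value asserted after the loop below
}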
latestServicedBlock = expectedBH - 1 // is one block below threshold - singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1) - require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.AnsweredRelays) - require.Equal(t, uint64(5), singleConsumerSession.QoSInfo.TotalRelays) - require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum) - require.Equal(t, int64(4), singleConsumerSession.QoSInfo.TotalSyncScore) - - require.Equal(t, sdk.ZeroDec(), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0 - require.Equal(t, sdk.MustNewDecFromStr("0.75"), singleConsumerSession.QoSInfo.LastQoSReport.Sync) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency) + singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1) + require.Equal(t, uint64(4), singleConsumerSession.QoSManager.GetAnsweredRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, uint64(5), singleConsumerSession.QoSManager.GetTotalRelays(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(3), singleConsumerSession.QoSManager.GetSyncScoreSum(epoch, singleConsumerSession.SessionId)) + require.Equal(t, int64(4), singleConsumerSession.QoSManager.GetTotalSyncScore(epoch, singleConsumerSession.SessionId)) + + lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + require.Equal(t, sdk.ZeroDec(), lastQoSReport.Availability) // because availability below 95% is 0 + require.Equal(t, sdk.MustNewDecFromStr("0.75"), lastQoSReport.Sync) + require.Equal(t, sdk.OneDec(), lastQoSReport.Latency) + latestServicedBlock = expectedBH + 1 // add in a loop so availability goes above 95% for i := 5; i < 100; i++ { - singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1) + singleConsumerSession.QoSManager.CalculateQoS(epoch, singleConsumerSession.SessionId, "", currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1) } - require.Equal(t, sdk.MustNewDecFromStr("0.8"), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0 - require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), singleConsumerSession.QoSInfo.LastQoSReport.Sync) - require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency) + + lastQoSReport = singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + require.Equal(t, sdk.MustNewDecFromStr("0.8"), lastQoSReport.Availability) // because availability below 95% is 0 + require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), lastQoSReport.Sync) + require.Equal(t, sdk.OneDec(), lastQoSReport.Latency) finalizationInsertionsSpreadBlocks := []finalizationTestInsertion{ finalizationInsertionForProviders(chainID, epoch, 200, 0, 1, true, "", blocksInFinalizationProof, blockDistanceForFinalizedData)[0], diff --git a/protocol/lavaprotocol/request_builder.go b/protocol/lavaprotocol/request_builder.go index 0c4ff6e60c..dff9090259 100644 --- a/protocol/lavaprotocol/request_builder.go +++ b/protocol/lavaprotocol/request_builder.go @@ -71,8 +71,8 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re return nil } - copiedQOS := 
copyQoSServiceReport(singleConsumerSession.QoSInfo.LastQoSReport) - copiedExcellenceQOS := copyQoSServiceReport(singleConsumerSession.QoSInfo.LastExcellenceQoSReport) + copiedQOS := copyQoSServiceReport(singleConsumerSession.QoSManager.GetLastQoSReport(uint64(epoch), singleConsumerSession.SessionId)) + copiedReputation := copyQoSServiceReport(singleConsumerSession.QoSManager.GetLastReputationQoSReport(uint64(epoch), singleConsumerSession.SessionId)) // copy reputation report for the node return &pairingtypes.RelaySession{ SpecId: chainID, @@ -87,7 +87,7 @@ func ConstructRelaySession(lavaChainID string, relayRequestData *pairingtypes.Re LavaChainId: lavaChainID, Sig: nil, Badge: nil, - QosExcellenceReport: copiedExcellenceQOS, + QosExcellenceReport: copiedReputation, } } diff --git a/protocol/lavaprotocol/response_builder_test.go b/protocol/lavaprotocol/response_builder_test.go index e8d545c706..7693035417 100644 --- a/protocol/lavaprotocol/response_builder_test.go +++ b/protocol/lavaprotocol/response_builder_test.go @@ -8,6 +8,7 @@ import ( "github.com/lavanet/lava/v4/protocol/lavaprotocol/finalizationverification" "github.com/lavanet/lava/v4/protocol/lavasession" + "github.com/lavanet/lava/v4/protocol/qos" "github.com/lavanet/lava/v4/utils/sigs" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" spectypes "github.com/lavanet/lava/v4/x/spec/types" @@ -29,7 +30,7 @@ func TestSignAndExtractResponse(t *testing.T) { singleConsumerSession := &lavasession.SingleConsumerSession{ CuSum: 20, LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 123, Parent: nil, RelayNum: 1, @@ -77,7 +78,7 @@ func TestSignAndExtractResponseLatest(t *testing.T) { singleConsumerSession := &lavasession.SingleConsumerSession{ CuSum: 20, LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 123, Parent: nil, RelayNum: 1, diff --git a/protocol/lavaprotocol/reuqest_builder_test.go b/protocol/lavaprotocol/reuqest_builder_test.go index 8f49fb9add..fd6c7c61b0 100644 --- a/protocol/lavaprotocol/reuqest_builder_test.go +++ b/protocol/lavaprotocol/reuqest_builder_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/lavanet/lava/v4/protocol/lavasession" + "github.com/lavanet/lava/v4/protocol/qos" "github.com/lavanet/lava/v4/utils/sigs" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" "github.com/stretchr/testify/require" @@ -18,7 +19,7 @@ func TestSignAndExtract(t *testing.T) { singleConsumerSession := &lavasession.SingleConsumerSession{ CuSum: 20, LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 123, Parent: nil, RelayNum: 1, diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go index 90f65e7b1e..ca7ebece6e 100644 --- a/protocol/lavasession/common.go +++ b/protocol/lavasession/common.go @@ -15,7 +15,6 @@ import ( sdkerrors "cosmossdk.io/errors" "golang.org/x/exp/slices" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gogo/status" "github.com/lavanet/lava/v4/protocol/chainlib/chainproxy" "github.com/lavanet/lava/v4/utils" @@ -48,14 +47,11 @@ const ( unixPrefix = "unix:" ) -var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(1, 1) // TODO move to params 
pairing const ( - PercentileToCalculateLatency = 0.9 - MinProvidersForSync = 0.6 - OptimizerPerturbation = 0.10 - LatencyThresholdStatic = 1 * time.Second - LatencyThresholdSlope = 1 * time.Millisecond - StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded + OptimizerPerturbation = 0.10 + LatencyThresholdStatic = 1 * time.Second + LatencyThresholdSlope = 1 * time.Millisecond + StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded ) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 1552041bd3..90264976e8 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -558,12 +558,12 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS ReportedProviders: reportedProviders, } - // adding qos summery for error parsing. + // adding qos summary for error parsing. // consumer session is locked here so its ok to read the qos report. - sessionInfo.QoSSummeryResult = consumerSession.getQosComputedResultOrZero() + sessionInfo.QoSSummaryResult = consumerSession.getQosComputedResultOrZero() sessions[providerAddress] = sessionInfo - qosReport, _ := csm.providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress) + qosReport, _ := csm.providerOptimizer.GetReputationReportForProvider(providerAddress) if csm.rpcEndpoint.Geolocation != uint64(endpoint.endpoint.Geolocation) { // rawQosReport is used only when building the relay payment message to be used to update // the provider's reputation on-chain. If the consumer and provider don't share geolocation @@ -936,7 +936,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu consumerSession.BlockListed = true } - consumerSession.QoSInfo.TotalRelays++ + consumerSession.QoSManager.AddFailedRelay(consumerSession.epoch, consumerSession.SessionId) consumerSession.ConsecutiveErrors = append(consumerSession.ConsecutiveErrors, errorReceived) // copy consecutive errors for report. 
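GetSessions and updateMetricsManager above now read reputation data through the renamed API: GetReputationReportForProvider on the optimizer and GetLastReputationQoSReport on the QoS manager. A small sketch of consuming that API outside the session manager, assuming the renamed ProviderOptimizer interface (see consumer_types.go below) and the renamed QualityOfServiceReport.ComputeReputation from this PR; the package name is hypothetical and error handling is trimmed:

package reputationdump

import (
	"fmt"

	"github.com/lavanet/lava/v4/protocol/lavasession"
)

// LogReputation prints each provider's reputation report and aggregate score,
// using the same calls the session manager and the optimizer tests rely on.
func LogReputation(po lavasession.ProviderOptimizer, addresses []string) {
	for _, addr := range addresses {
		report, lastUpdate := po.GetReputationReportForProvider(addr)
		if report == nil {
			continue // no relay data collected for this provider yet
		}
		score, err := report.ComputeReputation()
		if err != nil {
			continue // e.g. missing sync data, the same case getQosComputedResultOrZero logs
		}
		fmt.Println(addr, score.String(), "last update:", lastUpdate)
	}
}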
errorsForConsumerSession := consumerSession.ConsecutiveErrors @@ -1047,7 +1047,7 @@ func (csm *ConsumerSessionManager) OnSessionDone( consumerSession.ConsecutiveErrors = []error{} consumerSession.LatestBlock = latestServicedBlock // update latest serviced block // calculate QoS - consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount)) + consumerSession.QoSManager.CalculateQoS(csm.atomicReadCurrentEpoch(), consumerSession.SessionId, consumerSession.Parent.PublicLavaAddress, currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount)) if !isHangingApi { // append relay data only for non hanging apis go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, specComputeUnits, uint64(latestServicedBlock)) @@ -1066,21 +1066,25 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC info := csm.RPCEndpoint() apiInterface := info.ApiInterface chainId := info.ChainID + var lastQos *pairingtypes.QualityOfServiceReport - var lastQosExcellence *pairingtypes.QualityOfServiceReport - if consumerSession.QoSInfo.LastQoSReport != nil { - qos := *consumerSession.QoSInfo.LastQoSReport + lastQoSReport := consumerSession.QoSManager.GetLastQoSReport(csm.atomicReadCurrentEpoch(), consumerSession.SessionId) + if lastQoSReport != nil { + qos := *lastQoSReport lastQos = &qos } - if consumerSession.QoSInfo.LastExcellenceQoSReport != nil { - qosEx := *consumerSession.QoSInfo.LastExcellenceQoSReport - lastQosExcellence = &qosEx + + var lastReputation *pairingtypes.QualityOfServiceReport + lastReputationReport := consumerSession.QoSManager.GetLastReputationQoSReport(csm.atomicReadCurrentEpoch(), consumerSession.SessionId) + if lastReputationReport != nil { + qosRep := *lastReputationReport + lastReputation = &qosRep } publicProviderAddress := consumerSession.Parent.PublicLavaAddress publicProviderEndpoint := consumerSession.Parent.Endpoints[0].NetworkAddress go func() { - csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, publicProviderEndpoint, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful) + csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, publicProviderEndpoint, lastQos, lastReputation, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful) }() } diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 95041f7e05..b00b9cc35b 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -9,6 +9,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/v4/protocol/provideroptimizer" + "github.com/lavanet/lava/v4/protocol/qos" "github.com/lavanet/lava/v4/utils" "github.com/lavanet/lava/v4/utils/rand" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" @@ -62,7 +63,7 @@ type UsedProvidersInf interface { type SessionInfo struct { Session *SingleConsumerSession StakeSize sdk.Coin - QoSSummeryResult sdk.Dec // using ComputeQoS to get the total QOS + QoSSummaryResult sdk.Dec // using ComputeQoS to get the total QOS Epoch uint64 ReportedProviders []*pairingtypes.ReportedProvider } @@ -74,7 +75,7 @@ type ProviderOptimizer interface { AppendRelayFailure(providerAddress string) AppendRelayData(providerAddress string, latency time.Duration, cu, syncBlock uint64) 
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) - GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, time.Time) + GetReputationReportForProvider(string) (*pairingtypes.QualityOfServiceReport, time.Time) Strategy() provideroptimizer.Strategy UpdateWeights(map[string]int64, uint64) } @@ -84,16 +85,6 @@ type ignoredProviders struct { currentEpoch uint64 } -type QoSReport struct { - LastQoSReport *pairingtypes.QualityOfServiceReport - LastExcellenceQoSReport *pairingtypes.QualityOfServiceReport - LatencyScoreList []sdk.Dec - SyncScoreSum int64 - TotalSyncScore int64 - TotalRelays uint64 - AnsweredRelays uint64 -} - type DataReliabilitySession struct { SingleConsumerSession *SingleConsumerSession Epoch uint64 @@ -439,6 +430,8 @@ func (cswp *ConsumerSessionsWithProvider) GetConsumerSessionInstanceFromEndpoint EndpointConnection: endpointConnection, StaticProvider: cswp.StaticProvider, routerKey: NewRouterKey(nil), + epoch: cswp.PairingEpoch, + QoSManager: qos.NewQoSManager(), } consumerSession.TryUseSession() // we must lock the session so other requests wont get it. @@ -592,12 +585,6 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes return connected, endpointsList, cswp.PublicLavaAddress, nil } -func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, scaledAvailabilityScoreRet sdk.Dec) { - downtimePercentage := sdk.NewDecWithPrec(int64(qosReport.TotalRelays-qosReport.AnsweredRelays), 0).Quo(sdk.NewDecWithPrec(int64(qosReport.TotalRelays), 0)) - scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage)) - return downtimePercentage, scaledAvailabilityScore -} - func CalcWeightsByStake(providers map[uint64]*ConsumerSessionsWithProvider) (weights map[string]int64) { weights = make(map[string]int64) staticProviders := make([]*ConsumerSessionsWithProvider, 0) diff --git a/protocol/lavasession/single_consumer_session.go b/protocol/lavasession/single_consumer_session.go index 54f5a6f41b..395835c00b 100644 --- a/protocol/lavasession/single_consumer_session.go +++ b/protocol/lavasession/single_consumer_session.go @@ -1,12 +1,10 @@ package lavasession import ( - "math" - "sort" - "strconv" "time" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/v4/protocol/qos" "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" ) @@ -14,7 +12,7 @@ import ( type SingleConsumerSession struct { CuSum uint64 LatestRelayCu uint64 // set by GetSessions cuNeededForSession - QoSInfo QoSReport + QoSManager *qos.QoSManager SessionId int64 Parent *ConsumerSessionsWithProvider lock utils.LavaMutex @@ -29,6 +27,7 @@ type SingleConsumerSession struct { providerUniqueId string StaticProvider bool routerKey RouterKey + epoch uint64 } // returns the expected latency to a threshold. @@ -39,77 +38,26 @@ func (cs *SingleConsumerSession) CalculateExpectedLatency(timeoutGivenToRelay ti // cs should be locked here to use this method, returns the computed qos or zero if last qos is nil or failed to compute. 
func (cs *SingleConsumerSession) getQosComputedResultOrZero() sdk.Dec { - if cs.QoSInfo.LastExcellenceQoSReport != nil { - qosComputed, errComputing := cs.QoSInfo.LastExcellenceQoSReport.ComputeQoSExcellence() + lastReputationReport := cs.QoSManager.GetLastReputationQoSReport(cs.epoch, cs.SessionId) + if lastReputationReport != nil { + computedReputation, errComputing := lastReputationReport.ComputeReputation() if errComputing == nil { // if we failed to compute the qos will be 0 so this provider wont be picked to return the error in case we get it - return qosComputed + return computedReputation } - utils.LavaFormatDebug("Failed computing QoS used for error parsing, could happen if we have no sync data or one of the fields is zero", utils.LogAttr("Report", cs.QoSInfo.LastExcellenceQoSReport), utils.LogAttr("error", errComputing)) + utils.LavaFormatDebug("Failed computing QoS used for error parsing, could happen if we have no sync data or one of the fields is zero", + utils.LogAttr("Report", lastReputationReport), + utils.LogAttr("error", errComputing), + ) } return sdk.ZeroDec() } -func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) { - // Add current Session QoS - cs.QoSInfo.TotalRelays++ // increase total relays - cs.QoSInfo.AnsweredRelays++ // increase answered relays - - if cs.QoSInfo.LastQoSReport == nil { - cs.QoSInfo.LastQoSReport = &pairingtypes.QualityOfServiceReport{} - } - - downtimePercentage, scaledAvailabilityScore := CalculateAvailabilityScore(&cs.QoSInfo) - cs.QoSInfo.LastQoSReport.Availability = scaledAvailabilityScore - if sdk.OneDec().GT(cs.QoSInfo.LastQoSReport.Availability) { - utils.LavaFormatDebug("QoS Availability report", utils.Attribute{Key: "Availability", Value: cs.QoSInfo.LastQoSReport.Availability}, utils.Attribute{Key: "down percent", Value: downtimePercentage}) - } - - latencyScore := sdk.MinDec(sdk.OneDec(), sdk.NewDecFromInt(sdk.NewInt(int64(expectedLatency))).Quo(sdk.NewDecFromInt(sdk.NewInt(int64(latency))))) - - insertSorted := func(list []sdk.Dec, value sdk.Dec) []sdk.Dec { - index := sort.Search(len(list), func(i int) bool { - return list[i].GTE(value) - }) - if len(list) == index { // nil or empty slice or after last element - return append(list, value) - } - list = append(list[:index+1], list[index:]...) 
// index < len(a) - list[index] = value - return list - } - cs.QoSInfo.LatencyScoreList = insertSorted(cs.QoSInfo.LatencyScoreList, latencyScore) - cs.QoSInfo.LastQoSReport.Latency = cs.QoSInfo.LatencyScoreList[int(float64(len(cs.QoSInfo.LatencyScoreList))*PercentileToCalculateLatency)] - - // checking if we have enough information to calculate the sync score for the providers, if we haven't talked - // with enough providers we don't have enough information and we will wait to have more information before setting the sync score - shouldCalculateSyncScore := int64(numOfProviders) > int64(math.Ceil(float64(servicersToCount)*MinProvidersForSync)) - if shouldCalculateSyncScore { // - if blockHeightDiff <= 0 { // if the diff is bigger than 0 than the block is too old (blockHeightDiff = expected - allowedLag - blockHeight) and we don't give him the score - cs.QoSInfo.SyncScoreSum++ - } - cs.QoSInfo.TotalSyncScore++ - cs.QoSInfo.LastQoSReport.Sync = sdk.NewDec(cs.QoSInfo.SyncScoreSum).QuoInt64(cs.QoSInfo.TotalSyncScore) - if sdk.OneDec().GT(cs.QoSInfo.LastQoSReport.Sync) { - utils.LavaFormatDebug("QoS Sync report", - utils.Attribute{Key: "Sync", Value: cs.QoSInfo.LastQoSReport.Sync}, - utils.Attribute{Key: "block diff", Value: blockHeightDiff}, - utils.Attribute{Key: "sync score", Value: strconv.FormatInt(cs.QoSInfo.SyncScoreSum, 10) + "/" + strconv.FormatInt(cs.QoSInfo.TotalSyncScore, 10)}, - utils.Attribute{Key: "session_id", Value: cs.SessionId}, - utils.Attribute{Key: "provider", Value: cs.Parent.PublicLavaAddress}, - ) - } - } else { - // we prefer to give them a score of 1 when there is no other data, since otherwise we damage their payments - cs.QoSInfo.LastQoSReport.Sync = sdk.NewDec(1) - } -} - -func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error { +func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, reputationReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error { scs.LatestRelayCu = cuNeededForSession // set latestRelayCu scs.RelayNum += RelayNumberIncrement // increase relayNum if scs.RelayNum > 1 { - // we only set excellence for sessions with more than one successful relays, this guarantees data within the epoch exists - scs.QoSInfo.LastExcellenceQoSReport = qoSExcellenceReport + // we only set reputation for sessions with more than one successful relays, this guarantees data within the epoch exists + scs.QoSManager.SetLastReputationQoSReport(scs.epoch, scs.SessionId, reputationReport) } scs.usedProviders = usedProviders scs.routerKey = routerKey diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go index bb17287a47..838491d0c3 100644 --- a/protocol/metrics/consumer_metrics_manager.go +++ b/protocol/metrics/consumer_metrics_manager.go @@ -53,7 +53,7 @@ type ConsumerMetricsManager struct { blockMetric *prometheus.GaugeVec latencyMetric *prometheus.GaugeVec qosMetric *MappedLabelsGaugeVec - qosExcellenceMetric *MappedLabelsGaugeVec + providerReputationMetric *MappedLabelsGaugeVec LatestBlockMetric *MappedLabelsGaugeVec LatestProviderRelay *prometheus.GaugeVec virtualEpochMetric *prometheus.GaugeVec @@ -163,14 +163,14 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Labels: qosMetricLabels, }) - qosExcellenceMetricLabels := []string{"spec", "provider_address", "qos_metric"} + 
providerReputationMetricLabels := []string{"spec", "provider_address", "qos_metric"} if ShowProviderEndpointInMetrics { - qosExcellenceMetricLabels = append(qosExcellenceMetricLabels, "provider_endpoint") + providerReputationMetricLabels = append(providerReputationMetricLabels, "provider_endpoint") } - qosExcellenceMetric := NewMappedLabelsGaugeVec(MappedLabelsMetricOpts{ - Name: "lava_consumer_qos_excellence_metrics", - Help: "The QOS metrics per provider excellence", - Labels: qosExcellenceMetricLabels, + providerReputationMetric := NewMappedLabelsGaugeVec(MappedLabelsMetricOpts{ + Name: "lava_consumer_provider_reputation_metrics", + Help: "The provider reputation metrics per provider", + Labels: providerReputationMetricLabels, }) latestBlockMetricLabels := []string{"spec", "provider_address", "apiInterface"} @@ -289,7 +289,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM blockMetric: blockMetric, latencyMetric: latencyMetric, qosMetric: qosMetric, - qosExcellenceMetric: qosExcellenceMetric, + providerReputationMetric: providerReputationMetric, LatestBlockMetric: latestBlockMetric, LatestProviderRelay: latestProviderRelay, providerRelays: map[string]uint64{}, @@ -463,7 +463,7 @@ func (pme *ConsumerMetricsManager) getKeyForProcessingLatency(chainId string, ap return header + "_" + chainId + "_" + apiInterface } -func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, providerEndpoint string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64, relayLatency time.Duration, sessionSuccessful bool) { +func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, providerEndpoint string, qos *pairingtypes.QualityOfServiceReport, reputation *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64, relayLatency time.Duration, sessionSuccessful bool) { if pme == nil { return } @@ -521,7 +521,7 @@ func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface st } } setMetricsForQos(qos, pme.qosMetric, apiInterface, providerEndpoint) - setMetricsForQos(qosExcellence, pme.qosExcellenceMetric, "", providerEndpoint) // it's one api interface for all of them + setMetricsForQos(reputation, pme.providerReputationMetric, "", providerEndpoint) // it's one api interface for all of them labels := map[string]string{"spec": chainId, "provider_address": providerAddress, "apiInterface": apiInterface, "provider_endpoint": providerEndpoint} pme.LatestBlockMetric.WithLabelValues(labels).Set(float64(latestBlock)) @@ -565,7 +565,7 @@ func (pme *ConsumerMetricsManager) ResetSessionRelatedMetrics() { pme.lock.Lock() defer pme.lock.Unlock() pme.qosMetric.Reset() - pme.qosExcellenceMetric.Reset() + pme.providerReputationMetric.Reset() pme.providerRelays = map[string]uint64{} } diff --git a/protocol/metrics/consumer_reports_client_test.go b/protocol/metrics/consumer_reports_client_test.go index dfbc630290..570e81ac20 100644 --- a/protocol/metrics/consumer_reports_client_test.go +++ b/protocol/metrics/consumer_reports_client_test.go @@ -8,12 +8,15 @@ import ( "testing" "time" + "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" "github.com/stretchr/testify/require" ) func TestReportsClientFlows(t *testing.T) { t.Run("one-shot", func(t *testing.T) { + serverWaitGroup := utils.NewChanneledWaitGroup() + serverWaitGroup.Add(3) // 2 reports + 1 
conflict messages := []map[string]interface{}{} reqMap := []map[string]interface{}{} serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -26,6 +29,9 @@ func TestReportsClientFlows(t *testing.T) { reqMap = []map[string]interface{}{} w.WriteHeader(http.StatusOK) fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`) + for range messages { + serverWaitGroup.Done() + } }) mockServer := httptest.NewServer(serverHandle) @@ -45,7 +51,14 @@ func TestReportsClientFlows(t *testing.T) { SigBlocks: []byte{}, Metadata: []pairingtypes.Metadata{}, }, &pairingtypes.RelayRequest{}, &pairingtypes.RelayReply{})) - time.Sleep(110 * time.Millisecond) + + select { + case <-serverWaitGroup.Wait(): + // all done + case <-time.After(2 * time.Second): + t.Fatal("Timeout reached before reports were received") + } + require.Len(t, messages, 3) reports := 0 conflicts := 0 diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 77f64311c7..326c925118 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -23,9 +23,9 @@ import ( // The provider optimizer is a mechanism within the consumer that is responsible for choosing // the optimal provider for the consumer. -// The choice depends on the provider's QoS excellence metrics: latency, sync and availability. +// The choice depends on the provider's QoS reputation metrics: latency, sync and availability. // Providers are picked by selection tiers that take into account their stake amount and QoS -// excellence score. +// reputation score. const ( CacheMaxCost = 20000 // each item cost would be 1 @@ -229,7 +229,7 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno continue } - qos, lastUpdateTime := po.GetExcellenceQoSReportForProvider(providerAddress) + qos, lastUpdateTime := po.GetReputationReportForProvider(providerAddress) if qos == nil { utils.LavaFormatWarning("[Optimizer] cannot calculate selection tiers", fmt.Errorf("could not get QoS excellece report for provider"), @@ -265,7 +265,7 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno ) return NewSelectionTier(), Exploration{}, nil } - score, err := qos.ComputeQoSExcellenceFloat64(opts...) + score, err := qos.ComputeReputationFloat64(opts...) 
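The consumer reports test above replaces the fixed 110ms sleep with a wait group whose Wait() returns a channel, so completion can be raced against a timeout in a select. The same pattern using only the standard library (a hypothetical helper, not the actual utils.ChanneledWaitGroup implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitChannel adapts a sync.WaitGroup so its completion can be selected on,
// which is the idea behind utils.NewChanneledWaitGroup() used in the test.
func waitChannel(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

func main() {
	var wg sync.WaitGroup
	wg.Add(3) // e.g. 2 reports + 1 conflict, as in the test
	for i := 0; i < 3; i++ {
		go func() {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // simulate a report arriving
		}()
	}
	select {
	case <-waitChannel(&wg):
		fmt.Println("all messages received")
	case <-time.After(2 * time.Second):
		fmt.Println("timeout reached before reports were received")
	}
}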
if err != nil { utils.LavaFormatWarning("[Optimizer] cannot calculate selection tiers", err, utils.LogAttr("provider", providerAddress), @@ -588,7 +588,7 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme time.Duration, wan } } -func (po *ProviderOptimizer) GetExcellenceQoSReportForProvider(providerAddress string) (report *pairingtypes.QualityOfServiceReport, lastUpdateTime time.Time) { +func (po *ProviderOptimizer) GetReputationReportForProvider(providerAddress string) (report *pairingtypes.QualityOfServiceReport, lastUpdateTime time.Time) { providerData, found := po.getProviderData(providerAddress) if !found { utils.LavaFormatWarning("provider data not found, using default", nil, utils.LogAttr("address", providerAddress)) diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index a7a78969ef..acc3f58da2 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -342,9 +342,9 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { // add good latency probe relays, score should improve for i := 0; i < 10; i++ { // get current score - qos, _ := providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress) + qos, _ := providerOptimizer.GetReputationReportForProvider(providerAddress) require.NotNil(t, qos) - score, err := qos.ComputeQoSExcellence() + score, err := qos.ComputeReputation() require.NoError(t, err) // add good latency probe @@ -352,9 +352,9 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { time.Sleep(4 * time.Millisecond) // check score again and compare to the last score - qos, _ = providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress) + qos, _ = providerOptimizer.GetReputationReportForProvider(providerAddress) require.NotNil(t, qos) - newScore, err := qos.ComputeQoSExcellence() + newScore, err := qos.ComputeReputation() require.NoError(t, err) require.True(t, newScore.LT(score), "newScore: "+newScore.String()+", score: "+score.String()) } @@ -367,9 +367,9 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { // add good latency relays, score should improve for i := 0; i < 10; i++ { // get current score - qos, _ := providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress) + qos, _ := providerOptimizer.GetReputationReportForProvider(providerAddress) require.NotNil(t, qos) - score, err := qos.ComputeQoSExcellence() + score, err := qos.ComputeReputation() require.NoError(t, err) // add good latency relay @@ -377,9 +377,9 @@ func TestProviderOptimizerUpdatingLatency(t *testing.T) { time.Sleep(4 * time.Millisecond) // check score again and compare to the last score - qos, _ = providerOptimizer.GetExcellenceQoSReportForProvider(providerAddress) + qos, _ = providerOptimizer.GetReputationReportForProvider(providerAddress) require.NotNil(t, qos) - newScore, err := qos.ComputeQoSExcellence() + newScore, err := qos.ComputeReputation() require.NoError(t, err) require.True(t, newScore.LT(score), "newScore: "+newScore.String()+", score: "+score.String()) } @@ -583,7 +583,7 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) } -func TestExcellence(t *testing.T) { +func TestReputation(t *testing.T) { providerOptimizer := setupProviderOptimizer(1) providersCount := 5 providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount) @@ -597,10 +597,10 @@ func TestExcellence(t *testing.T) { } 
time.Sleep(4 * time.Millisecond) } - report, sampleTime1 := providerOptimizer.GetExcellenceQoSReportForProvider(providersGen.providersAddresses[0]) + report, sampleTime1 := providerOptimizer.GetReputationReportForProvider(providersGen.providersAddresses[0]) require.NotNil(t, report) require.True(t, sampleTime.Equal(sampleTime1)) - report2, sampleTime2 := providerOptimizer.GetExcellenceQoSReportForProvider(providersGen.providersAddresses[1]) + report2, sampleTime2 := providerOptimizer.GetReputationReportForProvider(providersGen.providersAddresses[1]) require.NotNil(t, report2) require.Equal(t, report, report2) require.True(t, sampleTime.Equal(sampleTime2)) @@ -1026,9 +1026,9 @@ func TestProviderOptimizerLatencySyncScore(t *testing.T) { // verify both providers have the same score scores := []math.LegacyDec{} for _, provider := range providersGen.providersAddresses { - qos, _ := providerOptimizer.GetExcellenceQoSReportForProvider(provider) + qos, _ := providerOptimizer.GetReputationReportForProvider(provider) require.NotNil(t, qos) - score, err := qos.ComputeQoSExcellence() + score, err := qos.ComputeReputation() require.NoError(t, err) scores = append(scores, score) } diff --git a/protocol/qos/common.go b/protocol/qos/common.go new file mode 100644 index 0000000000..231214afd6 --- /dev/null +++ b/protocol/qos/common.go @@ -0,0 +1,10 @@ +package qos + +import sdk "github.com/cosmos/cosmos-sdk/types" + +var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(1, 1) // TODO move to params pairing + +const ( + PercentileToCalculateLatency = 0.9 + MinProvidersForSync = 0.6 +) diff --git a/protocol/qos/qos_manager.go b/protocol/qos/qos_manager.go new file mode 100644 index 0000000000..89b5a39f44 --- /dev/null +++ b/protocol/qos/qos_manager.go @@ -0,0 +1,126 @@ +package qos + +import ( + "sync" + "time" + + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" +) + +type QoSManager struct { + qosReports map[uint64]map[int64]*QoSReport // first key is the epoch, second key is the session id + lock sync.RWMutex +} + +func NewQoSManager() *QoSManager { + qosManager := &QoSManager{} + qosManager.qosReports = make(map[uint64]map[int64]*QoSReport) + return qosManager +} + +func (qosManager *QoSManager) fetchOrSetSessionFromMap(epoch uint64, sessionId int64) *QoSReport { + qosManager.lock.Lock() + defer qosManager.lock.Unlock() + if qosManager.qosReports[epoch] == nil { + qosManager.qosReports[epoch] = make(map[int64]*QoSReport) + } + if qosManager.qosReports[epoch][sessionId] == nil { + qosManager.qosReports[epoch][sessionId] = &QoSReport{} + } + return qosManager.qosReports[epoch][sessionId] +} + +func (qosManager *QoSManager) createQoSMutatorBase(epoch uint64, sessionId int64) *QoSMutatorBase { + qosMutatorBase := &QoSMutatorBase{ + epoch: epoch, + sessionId: sessionId, + } + return qosMutatorBase +} + +func (qm *QoSManager) mutate(mutator Mutator) { + qosReport := qm.fetchOrSetSessionFromMap(mutator.GetEpochAndSessionId()) + qosReport.mutate(mutator) +} + +func (qosManager *QoSManager) CalculateQoS(epoch uint64, sessionId int64, providerAddress string, latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) { + qosManager.mutate(&QoSMutatorRelaySuccess{ + QoSMutatorBase: qosManager.createQoSMutatorBase(epoch, sessionId), + providerAddress: providerAddress, + latency: latency, + expectedLatency: expectedLatency, + blockHeightDiff: blockHeightDiff, + numOfProviders: numOfProviders, + servicersToCount: servicersToCount, + }) +} + +func (qosManager 
*QoSManager) AddFailedRelay(epoch uint64, sessionId int64) { + qosManager.mutate(&QoSMutatorRelayFailure{ + QoSMutatorBase: qosManager.createQoSMutatorBase(epoch, sessionId), + }) +} + +func (qosManager *QoSManager) SetLastReputationQoSReport(epoch uint64, sessionId int64, report *pairingtypes.QualityOfServiceReport) { + qosManager.mutate(&QoSMutatorSetReputation{ + QoSMutatorBase: qosManager.createQoSMutatorBase(epoch, sessionId), + report: report, + }) +} + +func (qosManager *QoSManager) getQoSReport(epoch uint64, sessionId int64) *QoSReport { + qosManager.lock.RLock() + defer qosManager.lock.RUnlock() + if qosManager.qosReports[epoch] == nil { + return nil + } + return qosManager.qosReports[epoch][sessionId] +} + +func (qosManager *QoSManager) GetLastQoSReport(epoch uint64, sessionId int64) *pairingtypes.QualityOfServiceReport { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return nil + } + return qosReport.getLastQoSReport() +} + +func (qosManager *QoSManager) GetLastReputationQoSReport(epoch uint64, sessionId int64) *pairingtypes.QualityOfServiceReport { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return nil + } + return qosReport.getLastReputationQoSReport() +} + +func (qosManager *QoSManager) GetAnsweredRelays(epoch uint64, sessionId int64) uint64 { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return 0 + } + return qosReport.getAnsweredRelays() +} + +func (qosManager *QoSManager) GetTotalRelays(epoch uint64, sessionId int64) uint64 { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return 0 + } + return qosReport.getTotalRelays() +} + +func (qosManager *QoSManager) GetSyncScoreSum(epoch uint64, sessionId int64) int64 { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return 0 + } + return qosReport.getSyncScoreSum() +} + +func (qosManager *QoSManager) GetTotalSyncScore(epoch uint64, sessionId int64) int64 { + qosReport := qosManager.getQoSReport(epoch, sessionId) + if qosReport == nil { + return 0 + } + return qosReport.getTotalSyncScore() +} diff --git a/protocol/qos/qos_manager_test.go b/protocol/qos/qos_manager_test.go new file mode 100644 index 0000000000..62567b9fc6 --- /dev/null +++ b/protocol/qos/qos_manager_test.go @@ -0,0 +1,306 @@ +package qos + +import ( + "math" + "sync" + "testing" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" + "github.com/stretchr/testify/require" +) + +func TestCalculateQoS(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + providerAddr := "provider1" + + // Test successful relay + qosManager.CalculateQoS( + epoch, + sessionID, + providerAddr, + 100*time.Millisecond, + 200*time.Millisecond, + 1, + 3, + 2, + ) + + report := qosManager.GetLastQoSReport(epoch, sessionID) + require.NotNil(t, report) + + totalRelays := qosManager.GetTotalRelays(epoch, sessionID) + require.Equal(t, uint64(1), totalRelays) + + answeredRelays := qosManager.GetAnsweredRelays(epoch, sessionID) + require.Equal(t, uint64(1), answeredRelays) +} + +func TestAddFailedRelay(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + + qosManager.AddFailedRelay(epoch, sessionID) + totalRelays := qosManager.GetTotalRelays(epoch, sessionID) + require.Equal(t, uint64(1), totalRelays) + + answeredRelays := qosManager.GetAnsweredRelays(epoch, sessionID) + 
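Every write in qos_manager.go above goes through mutate(), which resolves the (epoch, session id) pair to a QoSReport and applies a Mutator to it under the report's lock (the Mutator interface and the concrete mutators appear further down in this diff). A hypothetical extra mutator, not part of this PR, showing how another write operation would plug in; it has to live inside package qos because QoSReport's fields are unexported:

package qos

// QoSMutatorResetLatency is a hypothetical mutator that would discard the
// accumulated latency percentile data for a session.
type QoSMutatorResetLatency struct {
	*QoSMutatorBase
}

func (qoSMutatorResetLatency *QoSMutatorResetLatency) Mutate(report *QoSReport) {
	// drop the sorted latency scores; the next successful relay rebuilds the percentile
	report.latencyScoreList = nil
}

// ResetLatency would be the QoSManager entry point, mirroring AddFailedRelay.
func (qosManager *QoSManager) ResetLatency(epoch uint64, sessionId int64) {
	qosManager.mutate(&QoSMutatorResetLatency{
		QoSMutatorBase: qosManager.createQoSMutatorBase(epoch, sessionId),
	})
}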
require.Equal(t, uint64(0), answeredRelays) +} + +func TestSetLastReputationQoSReport(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + + testReport := &pairingtypes.QualityOfServiceReport{ + Latency: sdk.NewDec(95), + Availability: sdk.NewDec(100), + } + + qosManager.SetLastReputationQoSReport(epoch, sessionID, testReport) + report := qosManager.GetLastReputationQoSReport(epoch, sessionID) + require.NotNil(t, report) + require.Equal(t, testReport.Latency, report.Latency) + require.Equal(t, testReport.Availability, report.Availability) +} + +func TestMultipleEpochsAndSessions(t *testing.T) { + qosManager := NewQoSManager() + + // Test multiple epochs and sessions simultaneously + for epoch := uint64(1); epoch <= 3; epoch++ { + for sessionID := int64(1); sessionID <= 3; sessionID++ { + qosManager.CalculateQoS( + epoch, + sessionID, + "provider1", + 100*time.Millisecond, + 200*time.Millisecond, + 1, + 3, + 2, + ) + } + } + + // Verify each epoch/session combination + for epoch := uint64(1); epoch <= 3; epoch++ { + for sessionID := int64(1); sessionID <= 3; sessionID++ { + require.Equal(t, uint64(1), qosManager.GetTotalRelays(epoch, sessionID)) + require.NotNil(t, qosManager.GetLastQoSReport(epoch, sessionID)) + } + } +} + +func TestEdgeCaseLatencies(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + + testCases := []struct { + name string + latency time.Duration + expectedLatency time.Duration + }{ + {"Zero Latency", 0, 100 * time.Millisecond}, + {"Extremely High Latency", 24 * time.Hour, 100 * time.Millisecond}, + {"Negative Expected Latency", 100 * time.Millisecond, -100 * time.Millisecond}, + {"Equal Latencies", 100 * time.Millisecond, 100 * time.Millisecond}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + qosManager.CalculateQoS( + epoch, + sessionID, + "provider1", + tc.latency, + tc.expectedLatency, + 1, + 3, + 2, + ) + require.NotNil(t, qosManager.GetLastQoSReport(epoch, sessionID)) + }) + } +} + +func TestNilReportHandling(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + // Test setting nil report + qosManager.SetLastReputationQoSReport(epoch, sessionID, nil) + // Verify nil handling + report := qosManager.GetLastReputationQoSReport(epoch, sessionID) + require.Nil(t, report) + + // Test non-existent epoch/session + require.Nil(t, qosManager.GetLastQoSReport(999, 999)) + require.Equal(t, uint64(0), qosManager.GetTotalRelays(999, 999)) + require.Equal(t, uint64(0), qosManager.GetAnsweredRelays(999, 999)) +} + +func TestHighConcurrencyScenario(t *testing.T) { + qosManager := NewQoSManager() + numGoroutines := 10 + operationsPerGoroutine := 1000 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 3) // 3 different operation types + + // Launch multiple goroutines for CalculateQoS + for i := 0; i < numGoroutines; i++ { + go func(routineID int) { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + qosManager.CalculateQoS( + uint64(routineID), + int64(j), + "provider1", + 100*time.Millisecond, + 200*time.Millisecond, + 1, + 3, + 2, + ) + } + }(i) + } + + // Launch multiple goroutines for AddFailedRelay + for i := 0; i < numGoroutines; i++ { + go func(routineID int) { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + qosManager.AddFailedRelay(uint64(routineID), int64(j)) + } + }(i) + } + + // Launch multiple goroutines for SetLastReputationQoSReport + for i := 0; i < numGoroutines; i++ { + go 
func(routineID int) { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + report := &pairingtypes.QualityOfServiceReport{ + Latency: sdk.NewDec(95), + Availability: sdk.NewDec(100), + } + qosManager.SetLastReputationQoSReport(uint64(routineID), int64(j), report) + } + }(i) + } + + wg.Wait() + + // Verify some results + for i := 0; i < numGoroutines; i++ { + for j := 0; j < operationsPerGoroutine; j++ { + totalRelays := qosManager.GetTotalRelays(uint64(i), int64(j)) + require.Equal(t, uint64(2), totalRelays) // 1 successful + 1 failed relay + require.NotNil(t, qosManager.GetLastReputationQoSReport(uint64(i), int64(j))) + } + } +} + +func TestQoSParameterBoundaries(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + + testCases := []struct { + name string + latency time.Duration + expectedLatency time.Duration + blockHeightDiff int64 + numOfProviders int + servicersToCount int64 + }{ + {"Max Values", time.Duration(math.MaxInt64), time.Duration(math.MaxInt64), math.MaxInt, math.MaxInt, math.MaxInt}, + {"Min Values", 1, 1, 1, 1, 1}, + {"Zero Values", 0, 0, 0, 0, 0}, + {"Inverted Weights", 100 * time.Millisecond, 100 * time.Millisecond, 10, 5, 7}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + qosManager.CalculateQoS( + epoch, + sessionID, + "provider1", + tc.latency, + tc.expectedLatency, + tc.blockHeightDiff, + tc.numOfProviders, + tc.servicersToCount, + ) + // Verify that the manager doesn't panic and returns a report + report := qosManager.GetLastQoSReport(epoch, sessionID) + require.NotNil(t, report) + }) + } +} + +func TestSequentialOperations(t *testing.T) { + t.Run("Mixed Operations Sequence", func(t *testing.T) { + qosManager := NewQoSManager() + epoch := uint64(1) + sessionID := int64(1) + + // Sequence: Calculate -> Fail -> Calculate + qosManager.CalculateQoS( + epoch, + sessionID, + "provider1", + 100*time.Millisecond, + 200*time.Millisecond, + 1, 3, 2, + ) + qosManager.AddFailedRelay(epoch, sessionID) + qosManager.CalculateQoS( + epoch, + sessionID, + "provider1", + 100*time.Millisecond, + 200*time.Millisecond, + 1, 3, 2, + ) + require.Equal(t, uint64(3), qosManager.GetTotalRelays(epoch, sessionID)) + require.Equal(t, uint64(2), qosManager.GetAnsweredRelays(epoch, sessionID)) + }) +} + +// TODO: Enable this test when we register the QoSManager to epoch updater +// func TestMemoryManagement(t *testing.T) { +// qosManager := NewQoSManager() + +// // Create data for multiple epochs +// for epoch := uint64(1); epoch <= 100; epoch++ { +// doneChan := qosManager.CalculateQoS( +// epoch, +// 1, +// "provider1", +// 100*time.Millisecond, +// 200*time.Millisecond, +// 1, 3, 2, +// ) +// <-doneChan +// } + +// // Verify old data is not taking up memory (if cleanup is implemented) +// // Note: This test might need adjustment based on actual cleanup implementation +// t.Run("Memory Cleanup", func(t *testing.T) { +// // Add implementation-specific verification here +// // For example, verify that very old epochs are cleaned up +// veryOldEpoch := uint64(1) +// report := qosManager.GetLastQoSReport(veryOldEpoch, 1) +// require.Nil(t, report, "Old epoch data should be cleaned up") +// t.Log("Memory cleanup behavior should be verified based on implementation") +// }) +// } diff --git a/protocol/qos/qos_mutator_base.go b/protocol/qos/qos_mutator_base.go new file mode 100644 index 0000000000..894534cad1 --- /dev/null +++ b/protocol/qos/qos_mutator_base.go @@ -0,0 +1,18 @@ +package qos + +import "sync/atomic" + 
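The high-concurrency test above depends on the two levels of locking in the new package: QoSManager's RWMutex guards the epoch-to-session map, and each QoSReport carries its own RWMutex that mutate() takes before applying a Mutator. A compact sketch of the same concurrent usage, assuming the qos package added in this PR:

package main

import (
	"fmt"
	"sync"
	"time"
)

import "github.com/lavanet/lava/v4/protocol/qos"

func main() {
	qosManager := qos.NewQoSManager()
	var wg sync.WaitGroup

	// concurrent successes and failures on the same (epoch, session) pair
	for i := 0; i < 8; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			qosManager.CalculateQoS(1, 1, "provider1", 100*time.Millisecond, 200*time.Millisecond, 1, 3, 2)
		}()
		go func() {
			defer wg.Done()
			qosManager.AddFailedRelay(1, 1)
		}()
	}
	wg.Wait()

	fmt.Println(qosManager.GetTotalRelays(1, 1))    // 16: 8 successes + 8 failures
	fmt.Println(qosManager.GetAnsweredRelays(1, 1)) // 8: failed relays only bump the total
}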
+// Base interface for all mutators +type Mutator interface { + Mutate(report *QoSReport) + GetEpochAndSessionId() (epoch uint64, sessionId int64) +} + +type QoSMutatorBase struct { + epoch uint64 + sessionId int64 +} + +func (qoSMutatorBase *QoSMutatorBase) GetEpochAndSessionId() (epoch uint64, sessionId int64) { + return atomic.LoadUint64(&qoSMutatorBase.epoch), atomic.LoadInt64(&qoSMutatorBase.sessionId) +} diff --git a/protocol/qos/qos_mutator_relay_failure.go b/protocol/qos/qos_mutator_relay_failure.go new file mode 100644 index 0000000000..8e05a2b6a9 --- /dev/null +++ b/protocol/qos/qos_mutator_relay_failure.go @@ -0,0 +1,10 @@ +package qos + +// Mutator for relay failure +type QoSMutatorRelayFailure struct { + *QoSMutatorBase +} + +func (qoSMutatorRelayFailure *QoSMutatorRelayFailure) Mutate(report *QoSReport) { + report.totalRelays++ +} diff --git a/protocol/qos/qos_mutator_relay_success.go b/protocol/qos/qos_mutator_relay_success.go new file mode 100644 index 0000000000..1726a727bb --- /dev/null +++ b/protocol/qos/qos_mutator_relay_success.go @@ -0,0 +1,92 @@ +package qos + +import ( + "math" + "sort" + "strconv" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/v4/utils" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" +) + +// Mutator for relay success +type QoSMutatorRelaySuccess struct { + *QoSMutatorBase + latency time.Duration + expectedLatency time.Duration + blockHeightDiff int64 + numOfProviders int + servicersToCount int64 + providerAddress string +} + +func (qoSMutatorRelaySuccess *QoSMutatorRelaySuccess) calculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, scaledAvailabilityScoreRet sdk.Dec) { + downtimePercentage := sdk.NewDecWithPrec(int64(qosReport.totalRelays-qosReport.answeredRelays), 0).Quo(sdk.NewDecWithPrec(int64(qosReport.totalRelays), 0)) + scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage)) + return downtimePercentage, scaledAvailabilityScore +} + +func (qoSMutatorRelaySuccess *QoSMutatorRelaySuccess) Mutate(report *QoSReport) { + report.totalRelays++ + report.answeredRelays++ + + if report.lastQoSReport == nil { + report.lastQoSReport = &pairingtypes.QualityOfServiceReport{} + } + + downtimePercentage, scaledAvailabilityScore := qoSMutatorRelaySuccess.calculateAvailabilityScore(report) + report.lastQoSReport.Availability = scaledAvailabilityScore + if sdk.OneDec().GT(report.lastQoSReport.Availability) { + utils.LavaFormatDebug("QoS Availability report", + utils.LogAttr("availability", report.lastQoSReport.Availability), + utils.LogAttr("down_percent", downtimePercentage), + utils.LogAttr("session_id", qoSMutatorRelaySuccess.sessionId), + utils.LogAttr("provider", qoSMutatorRelaySuccess.providerAddress), + ) + } + + if qoSMutatorRelaySuccess.latency == 0 { + qoSMutatorRelaySuccess.latency = 1 * time.Microsecond + } + + latencyScore := sdk.MinDec(sdk.OneDec(), sdk.NewDecFromInt(sdk.NewInt(int64(qoSMutatorRelaySuccess.expectedLatency))).Quo(sdk.NewDecFromInt(sdk.NewInt(int64(qoSMutatorRelaySuccess.latency))))) + + insertSorted := func(list []sdk.Dec, value sdk.Dec) []sdk.Dec { + index := sort.Search(len(list), func(i int) bool { + return list[i].GTE(value) + }) + if len(list) == index { // nil or empty slice or after last element + return append(list, value) + } + list = append(list[:index+1], list[index:]...) 
// index < len(a) + list[index] = value + return list + } + report.latencyScoreList = insertSorted(report.latencyScoreList, latencyScore) + report.lastQoSReport.Latency = report.latencyScoreList[int(float64(len(report.latencyScoreList))*PercentileToCalculateLatency)] + + // checking if we have enough information to calculate the sync score for the providers, if we haven't talked + // with enough providers we don't have enough information and we will wait to have more information before setting the sync score + shouldCalculateSyncScore := int64(qoSMutatorRelaySuccess.numOfProviders) > int64(math.Ceil(float64(qoSMutatorRelaySuccess.servicersToCount)*MinProvidersForSync)) + if shouldCalculateSyncScore { // + if qoSMutatorRelaySuccess.blockHeightDiff <= 0 { // if the diff is bigger than 0 than the block is too old (blockHeightDiff = expected - allowedLag - blockHeight) and we don't give him the score + report.syncScoreSum++ + } + report.totalSyncScore++ + report.lastQoSReport.Sync = sdk.NewDec(report.syncScoreSum).QuoInt64(report.totalSyncScore) + if sdk.OneDec().GT(report.lastQoSReport.Sync) { + utils.LavaFormatDebug("QoS Sync report", + utils.LogAttr("sync", report.lastQoSReport.Sync), + utils.LogAttr("block_diff", qoSMutatorRelaySuccess.blockHeightDiff), + utils.LogAttr("sync_score", strconv.FormatInt(report.syncScoreSum, 10)+"/"+strconv.FormatInt(report.totalSyncScore, 10)), + utils.LogAttr("session_id", qoSMutatorRelaySuccess.sessionId), + utils.LogAttr("provider", qoSMutatorRelaySuccess.providerAddress), + ) + } + } else { + // we prefer to give them a score of 1 when there is no other data, since otherwise we damage their payments + report.lastQoSReport.Sync = sdk.NewDec(1) + } +} diff --git a/protocol/lavasession/consumer_types_test.go b/protocol/qos/qos_mutator_relay_success_test.go similarity index 56% rename from protocol/lavasession/consumer_types_test.go rename to protocol/qos/qos_mutator_relay_success_test.go index 931226cfed..309a501e37 100644 --- a/protocol/lavasession/consumer_types_test.go +++ b/protocol/qos/qos_mutator_relay_success_test.go @@ -1,4 +1,4 @@ -package lavasession +package qos import ( "testing" @@ -11,21 +11,20 @@ func TestCalculateAvailabilityScore(t *testing.T) { avialabilityAsFloat, err := AvailabilityPercentage.Float64() require.NoError(t, err) precision := uint64(10000) - qosReport := &QoSReport{ - TotalRelays: precision, - AnsweredRelays: precision - uint64(avialabilityAsFloat*float64(precision)), - } - downTime, availabilityScore := CalculateAvailabilityScore(qosReport) + + qosReport := QoSReport{} + qosReport.totalRelays = precision + qosReport.answeredRelays = precision - uint64(avialabilityAsFloat*float64(precision)) + qoSMutatorRelaySuccess := QoSMutatorRelaySuccess{} + downTime, availabilityScore := qoSMutatorRelaySuccess.calculateAvailabilityScore(&qosReport) downTimeFloat, err := downTime.Float64() require.NoError(t, err) require.Equal(t, downTimeFloat, avialabilityAsFloat) require.Zero(t, availabilityScore.BigInt().Uint64()) - qosReport = &QoSReport{ - TotalRelays: 2 * precision, - AnsweredRelays: 2*precision - uint64(avialabilityAsFloat*float64(precision)), - } - downTime, availabilityScore = CalculateAvailabilityScore(qosReport) + qosReport.totalRelays = 2 * precision + qosReport.answeredRelays = 2*precision - uint64(avialabilityAsFloat*float64(precision)) + downTime, availabilityScore = qoSMutatorRelaySuccess.calculateAvailabilityScore(&qosReport) downTimeFloat, err = downTime.Float64() require.NoError(t, err) halfDec, err := 
sdk.NewDecFromStr("0.5") diff --git a/protocol/qos/qos_mutator_set_reputation.go b/protocol/qos/qos_mutator_set_reputation.go new file mode 100644 index 0000000000..3961ad81a5 --- /dev/null +++ b/protocol/qos/qos_mutator_set_reputation.go @@ -0,0 +1,15 @@ +package qos + +import ( + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" +) + +// Mutator to set the reputation report for a session +type QoSMutatorSetReputation struct { + *QoSMutatorBase + report *pairingtypes.QualityOfServiceReport +} + +func (qoSMutatorSetReputation *QoSMutatorSetReputation) Mutate(report *QoSReport) { + report.lastReputationQoSReport = qoSMutatorSetReputation.report +} diff --git a/protocol/qos/qos_report.go b/protocol/qos/qos_report.go new file mode 100644 index 0000000000..33b5fd5364 --- /dev/null +++ b/protocol/qos/qos_report.go @@ -0,0 +1,61 @@ +package qos + +import ( + "sync" + + sdk "github.com/cosmos/cosmos-sdk/types" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" +) + +type QoSReport struct { + lastQoSReport *pairingtypes.QualityOfServiceReport + lastReputationQoSReport *pairingtypes.QualityOfServiceReport + latencyScoreList []sdk.Dec + syncScoreSum int64 + totalSyncScore int64 + totalRelays uint64 + answeredRelays uint64 + lock sync.RWMutex +} + +func (qr *QoSReport) mutate(mutator Mutator) { + qr.lock.Lock() + defer qr.lock.Unlock() + mutator.Mutate(qr) +} + +func (qr *QoSReport) getLastQoSReport() *pairingtypes.QualityOfServiceReport { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.lastQoSReport +} + +func (qr *QoSReport) getLastReputationQoSReport() *pairingtypes.QualityOfServiceReport { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.lastReputationQoSReport +} + +func (qr *QoSReport) getAnsweredRelays() uint64 { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.answeredRelays +} + +func (qr *QoSReport) getTotalRelays() uint64 { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.totalRelays +} + +func (qr *QoSReport) getSyncScoreSum() int64 { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.syncScoreSum +} + +func (qr *QoSReport) getTotalSyncScore() int64 { + qr.lock.RLock() + defer qr.lock.RUnlock() + return qr.totalSyncScore +} diff --git a/protocol/rpcconsumer/relay_errors.go b/protocol/rpcconsumer/relay_errors.go index 2f8edbdfbd..cfc39740c8 100644 --- a/protocol/rpcconsumer/relay_errors.go +++ b/protocol/rpcconsumer/relay_errors.go @@ -55,10 +55,10 @@ func (r *RelayErrors) GetBestErrorMessageForUser() RelayError { for idx, relayError := range r.relayErrors { errorMessage := r.sanitizeError(relayError.err) errorMap[errorMessage] = append(errorMap[errorMessage], idx) - if relayError.ProviderInfo.ProviderQoSExcellenceSummery.IsNil() || relayError.ProviderInfo.ProviderStake.Amount.IsNil() { + if relayError.ProviderInfo.ProviderReputationSummary.IsNil() || relayError.ProviderInfo.ProviderStake.Amount.IsNil() { continue } - currentResult := relayError.ProviderInfo.ProviderQoSExcellenceSummery.MulInt(relayError.ProviderInfo.ProviderStake.Amount) + currentResult := relayError.ProviderInfo.ProviderReputationSummary.MulInt(relayError.ProviderInfo.ProviderStake.Amount) if currentResult.GTE(bestResult) { // 0 or 1 here are valid replacements, so even 0 scores will return the error value bestResult.Set(currentResult) bestIndex = idx diff --git a/protocol/rpcconsumer/relay_errors_test.go b/protocol/rpcconsumer/relay_errors_test.go index 5e48db5975..7aefce983c 100644 --- a/protocol/rpcconsumer/relay_errors_test.go +++ b/protocol/rpcconsumer/relay_errors_test.go @@
-23,36 +23,36 @@ func TestRelayError(t *testing.T) { { err: fmt.Errorf("test1"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test2"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 20), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 20), }, }, { err: fmt.Errorf("test3"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 30), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 30), }, }, { err: fmt.Errorf("test4"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 40), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 40), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 50), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 50), }, }, }, @@ -66,50 +66,50 @@ func TestRelayError(t *testing.T) { { err: fmt.Errorf("test1"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.5"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.5"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test1"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.25"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.25"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test3"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.6"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.6"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test3"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.7"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.7"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test4"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.7"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.7"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("test4"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.7"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.7"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.MustNewDecFromStr("0.8"), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.MustNewDecFromStr("0.8"), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, }, @@ -123,36 +123,36 @@ func TestRelayError(t *testing.T) { { err: fmt.Errorf("test1"), ProviderInfo: common.ProviderInfo{ - 
ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 1000), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 1000), }, }, { err: fmt.Errorf("test2"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 1000), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 1000), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.ZeroDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 0), + ProviderReputationSummary: sdk.ZeroDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 0), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.ZeroDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 0), + ProviderReputationSummary: sdk.ZeroDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 0), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.ZeroDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 0), + ProviderReputationSummary: sdk.ZeroDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 0), }, }, }, @@ -166,36 +166,36 @@ func TestRelayError(t *testing.T) { { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 20), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 20), }, }, { err: fmt.Errorf("test3"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 30), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 30), }, }, { err: fmt.Errorf("test4"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 40), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 40), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, }, @@ -209,36 +209,36 @@ func TestRelayError(t *testing.T) { { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 20), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 20), }, }, { err: fmt.Errorf("test3"), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 30), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 30), }, }, { err: fmt.Errorf("test4"), ProviderInfo: common.ProviderInfo{ - 
ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 40), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 40), }, }, { err: fmt.Errorf("%s", expectedValue), ProviderInfo: common.ProviderInfo{ - ProviderQoSExcellenceSummery: sdk.OneDec(), - ProviderStake: sdk.NewInt64Coin("ulava", 10), + ProviderReputationSummary: sdk.OneDec(), + ProviderStake: sdk.NewInt64Coin("ulava", 10), }, }, }, diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index d68520a8a8..705a57e87b 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -314,10 +314,10 @@ func (rp *RelayProcessor) responsesQuorum(results []common.RelayResult, quorumSi if result.Reply != nil && result.Reply.Data != nil { countMap[string(result.Reply.Data)]++ if !deterministic { - if result.ProviderInfo.ProviderQoSExcellenceSummery.IsNil() || result.ProviderInfo.ProviderStake.Amount.IsNil() { + if result.ProviderInfo.ProviderReputationSummary.IsNil() || result.ProviderInfo.ProviderStake.Amount.IsNil() { continue } - currentResult := result.ProviderInfo.ProviderQoSExcellenceSummery.MulInt(result.ProviderInfo.ProviderStake.Amount) + currentResult := result.ProviderInfo.ProviderReputationSummary.MulInt(result.ProviderInfo.ProviderStake.Amount) if currentResult.GTE(bestQos) { bestQos.Set(currentResult) bestQosResult = result diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 8592da8408..dd8fde3397 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -735,7 +735,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( go func(providerPublicAddress string, sessionInfo *lavasession.SessionInfo) { // add ticker launch metrics localRelayResult := &common.RelayResult{ - ProviderInfo: common.ProviderInfo{ProviderAddress: providerPublicAddress, ProviderStake: sessionInfo.StakeSize, ProviderQoSExcellenceSummery: sessionInfo.QoSSummeryResult}, + ProviderInfo: common.ProviderInfo{ProviderAddress: providerPublicAddress, ProviderStake: sessionInfo.StakeSize, ProviderReputationSummary: sessionInfo.QoSSummaryResult}, Finalized: false, // setting the single consumer session as the conflict handler. // to be able to validate if we need to report this provider or not. 
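Editor's note: both GetBestErrorMessageForUser and responsesQuorum above pick a winner by multiplying each candidate's ProviderReputationSummary by its ProviderStake amount and keeping the highest product, skipping nil values. A minimal standalone sketch of that selection, using a hypothetical providerResult type rather than the real common.RelayResult:

package main

import (
	"fmt"

	sdk "github.com/cosmos/cosmos-sdk/types"
)

// providerResult is a simplified stand-in for the fields the selection reads
// from common.RelayResult (illustrative only).
type providerResult struct {
	address    string
	reputation sdk.Dec  // ProviderReputationSummary
	stake      sdk.Coin // ProviderStake
}

// pickBestByReputation scores each candidate by reputation * stake, skips
// entries with nil values, and returns the index of the highest score.
// GTE mirrors the logic above: ties keep the later entry, and all-zero scores
// still produce a winner. Returns -1 when no candidate is usable.
func pickBestByReputation(candidates []providerResult) int {
	bestIndex := -1
	bestScore := sdk.ZeroDec()
	for idx, candidate := range candidates {
		if candidate.reputation.IsNil() || candidate.stake.Amount.IsNil() {
			continue
		}
		score := candidate.reputation.MulInt(candidate.stake.Amount)
		if score.GTE(bestScore) {
			bestScore = score
			bestIndex = idx
		}
	}
	return bestIndex
}

func main() {
	candidates := []providerResult{
		{address: "providerA", reputation: sdk.MustNewDecFromStr("0.5"), stake: sdk.NewInt64Coin("ulava", 10)},
		{address: "providerB", reputation: sdk.OneDec(), stake: sdk.NewInt64Coin("ulava", 20)},
	}
	fmt.Println("selected:", candidates[pickBestByReputation(candidates)].address) // providerB (1.0*20 > 0.5*10)
}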
@@ -862,17 +862,20 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( utils.Attribute{Key: "providersCount", Value: pairingAddressesLen}, ) } - if rpccs.debugRelays && singleConsumerSession.QoSInfo.LastQoSReport != nil && - singleConsumerSession.QoSInfo.LastQoSReport.Sync.BigInt() != nil && - singleConsumerSession.QoSInfo.LastQoSReport.Sync.LT(sdk.MustNewDecFromStr("0.9")) { - utils.LavaFormatDebug("identified QoS mismatch", - utils.Attribute{Key: "expectedBH", Value: expectedBH}, - utils.Attribute{Key: "latestServicedBlock", Value: latestBlock}, - utils.Attribute{Key: "session_id", Value: singleConsumerSession.SessionId}, - utils.Attribute{Key: "provider_address", Value: singleConsumerSession.Parent.PublicLavaAddress}, - utils.Attribute{Key: "providersCount", Value: pairingAddressesLen}, - utils.Attribute{Key: "singleConsumerSession.QoSInfo", Value: singleConsumerSession.QoSInfo}, - ) + + if rpccs.debugRelays { + lastQoSReport := singleConsumerSession.QoSManager.GetLastQoSReport(epoch, singleConsumerSession.SessionId) + if lastQoSReport != nil && lastQoSReport.Sync.BigInt() != nil && + lastQoSReport.Sync.LT(sdk.MustNewDecFromStr("0.9")) { + utils.LavaFormatDebug("identified QoS mismatch", + utils.Attribute{Key: "expectedBH", Value: expectedBH}, + utils.Attribute{Key: "latestServicedBlock", Value: latestBlock}, + utils.Attribute{Key: "session_id", Value: singleConsumerSession.SessionId}, + utils.Attribute{Key: "provider_address", Value: singleConsumerSession.Parent.PublicLavaAddress}, + utils.Attribute{Key: "providersCount", Value: pairingAddressesLen}, + utils.Attribute{Key: "singleConsumerSession.QoSManager", Value: singleConsumerSession.QoSManager}, + ) + } } errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi, extensions) // session done successfully @@ -1051,7 +1054,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe utils.LogAttr("provider", relayRequest.RelaySession.Provider), utils.LogAttr("cuSum", relayRequest.RelaySession.CuSum), utils.LogAttr("QosReport", relayRequest.RelaySession.QosReport), - utils.LogAttr("QosReportExcellence", relayRequest.RelaySession.QosExcellenceReport), + utils.LogAttr("ReputationReport", relayRequest.RelaySession.QosExcellenceReport), utils.LogAttr("relayNum", relayRequest.RelaySession.RelayNum), utils.LogAttr("sessionId", relayRequest.RelaySession.SessionId), utils.LogAttr("latency", relayLatency), diff --git a/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go b/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go index 29ad44c3f6..45b4768f00 100644 --- a/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go +++ b/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go @@ -16,6 +16,7 @@ import ( "github.com/lavanet/lava/v4/protocol/lavaprotocol" "github.com/lavanet/lava/v4/protocol/lavaprotocol/finalizationverification" "github.com/lavanet/lava/v4/protocol/lavasession" + "github.com/lavanet/lava/v4/protocol/qos" "github.com/lavanet/lava/v4/protocol/rpcprovider/reliabilitymanager" "github.com/lavanet/lava/v4/protocol/statetracker" testkeeper "github.com/lavanet/lava/v4/testutil/keeper" @@ -47,7 +48,7 @@ func TestFullFlowReliabilityCompare(t *testing.T) { singleConsumerSession :=
&lavasession.SingleConsumerSession{ CuSum: 20, LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 123, Parent: nil, RelayNum: 1, @@ -58,7 +59,7 @@ func TestFullFlowReliabilityCompare(t *testing.T) { singleConsumerSession2 := &lavasession.SingleConsumerSession{ CuSum: 200, LatestRelayCu: 100, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 456, Parent: nil, RelayNum: 5, @@ -200,7 +201,7 @@ func TestFullFlowReliabilityConflict(t *testing.T) { singleConsumerSession := &lavasession.SingleConsumerSession{ CuSum: 20, LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 123, Parent: nil, RelayNum: 1, @@ -212,7 +213,7 @@ func TestFullFlowReliabilityConflict(t *testing.T) { singleConsumerSession2 := &lavasession.SingleConsumerSession{ CuSum: 200, LatestRelayCu: 100, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + QoSManager: qos.NewQoSManager(), SessionId: 456, Parent: consumerSessionWithProvider, RelayNum: 5, diff --git a/utils/channeled_wait_group.go b/utils/channeled_wait_group.go new file mode 100644 index 0000000000..a480fb2c8a --- /dev/null +++ b/utils/channeled_wait_group.go @@ -0,0 +1,22 @@ +package utils + +import "sync" + +type ChanneledWaitGroup struct { + sync.WaitGroup + doneChan chan struct{} +} + +func NewChanneledWaitGroup() *ChanneledWaitGroup { + return &ChanneledWaitGroup{ + doneChan: make(chan struct{}, 1), + } +} + +func (wg *ChanneledWaitGroup) Wait() <-chan struct{} { + go func() { + wg.WaitGroup.Wait() + wg.doneChan <- struct{}{} + }() + return wg.doneChan +} diff --git a/utils/channeled_wait_group_test.go b/utils/channeled_wait_group_test.go new file mode 100644 index 0000000000..75a430bb03 --- /dev/null +++ b/utils/channeled_wait_group_test.go @@ -0,0 +1,90 @@ +package utils + +import ( + "testing" + "time" +) + +func TestChanneledWaitGroup(t *testing.T) { + t.Run("basic functionality", func(t *testing.T) { + wg := NewChanneledWaitGroup() + wg.Add(2) + + go func() { + time.Sleep(50 * time.Millisecond) + wg.Done() + }() + + go func() { + time.Sleep(100 * time.Millisecond) + wg.Done() + }() + + select { + case <-wg.Wait(): + // Success + case <-time.After(200 * time.Millisecond): + t.Fatal("timeout waiting for goroutines") + } + }) + + t.Run("zero count should complete immediately", func(t *testing.T) { + wg := NewChanneledWaitGroup() + + select { + case <-wg.Wait(): + // Success + case <-time.After(100 * time.Millisecond): + t.Fatal("should complete immediately with zero count") + } + }) + + t.Run("multiple waits should all receive completion", func(t *testing.T) { + wg := NewChanneledWaitGroup() + wg.Add(1) + + // Start three goroutines waiting + for i := 0; i < 3; i++ { + go func() { + select { + case <-wg.Wait(): + // Success + case <-time.After(200 * time.Millisecond): + t.Error("timeout waiting for completion") + } + }() + } + + time.Sleep(50 * time.Millisecond) // Give waiters time to start + wg.Done() + time.Sleep(100 * time.Millisecond) // Give waiters time to complete + }) + + t.Run("reuse after completion", func(t *testing.T) { + wg := NewChanneledWaitGroup() + wg.Add(1) + 
wg.Done() + + // First wait should complete + select { + case <-wg.Wait(): + // Success + case <-time.After(100 * time.Millisecond): + t.Fatal("first wait should complete") + } + + // Reset and use again + wg.Add(1) + go func() { + time.Sleep(50 * time.Millisecond) + wg.Done() + }() + + select { + case <-wg.Wait(): + // Success + case <-time.After(100 * time.Millisecond): + t.Fatal("second wait should complete") + } + }) +} diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index df4fb3bd14..9c2f8a317a 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -501,7 +501,7 @@ func (k Keeper) aggregateReputationEpochQosScore(ctx sdk.Context, subscription s } syncFactor := k.ReputationLatencyOverSyncFactor(ctx) - score, err := relay.QosExcellenceReport.ComputeQoSExcellence(types.WithSyncFactor(syncFactor)) + score, err := relay.QosExcellenceReport.ComputeReputation(types.WithSyncFactor(syncFactor)) if err != nil { return utils.LavaFormatWarning("RelayPayment: could not compute qos excellence score", err, utils.LogAttr("consumer", subscription), diff --git a/x/pairing/types/qos_report.go b/x/pairing/types/qos_report.go index 01367e71ec..a1c5eb5ea4 100644 --- a/x/pairing/types/qos_report.go +++ b/x/pairing/types/qos_report.go @@ -100,7 +100,7 @@ func WithBlockErrorProbability(probability sdk.Dec) Option { } } -// ComputeQoSExcellence calculates a score from the QoS excellence report by the following formula: +// ComputeReputation calculates a score from the QoS excellence report by the following formula: // If the requested block is the latest block or "not applicable" (called from the node's code): // // score = latency + sync*syncFactor + ((1/availability) - 1) * FailureCost @@ -114,7 +114,7 @@ func WithBlockErrorProbability(probability sdk.Dec) Option { // Important: when using this function from the node's code, do not configure the block error probability // (in default mode, it's unused) // TODO: after the reputation feature is merged, use this method to calculate the QoS excellence score -func (qos *QualityOfServiceReport) ComputeQoSExcellence(opts ...Option) (sdk.Dec, error) { +func (qos *QualityOfServiceReport) ComputeReputation(opts ...Option) (sdk.Dec, error) { if err := qos.Validate(); err != nil { return sdk.ZeroDec(), err } @@ -138,8 +138,8 @@ func (qos *QualityOfServiceReport) ComputeQoSExcellence(opts ...Option) (sdk.Dec return latency.Add(sync).Add(availability), nil } -func (qos *QualityOfServiceReport) ComputeQoSExcellenceFloat64(opts ...Option) (float64, error) { - scoreDec, err := qos.ComputeQoSExcellence(opts...) +func (qos *QualityOfServiceReport) ComputeReputationFloat64(opts ...Option) (float64, error) { + scoreDec, err := qos.ComputeReputation(opts...) if err != nil { return 0, err } diff --git a/x/pairing/types/qos_report_test.go b/x/pairing/types/qos_report_test.go index 6f7eaaa0f1..74a937e45a 100644 --- a/x/pairing/types/qos_report_test.go +++ b/x/pairing/types/qos_report_test.go @@ -110,7 +110,7 @@ func TestQosCompute(t *testing.T) { for _, tt := range template { t.Run(tt.name, func(t *testing.T) { - score, err := qos.ComputeQoSExcellence(tt.opts...) + score, err := qos.ComputeReputation(tt.opts...) 
require.NoError(t, err) require.True(t, tt.expectedScore.Equal(score)) }) @@ -122,15 +122,15 @@ func TestQosFailureCost(t *testing.T) { qos := types.QualityOfServiceReport{Latency: sdk.OneDec(), Sync: sdk.OneDec(), Availability: sdk.NewDecWithPrec(5, 1)} failureCost, highFailureCost := int64(1), int64(3) - score, err := qos.ComputeQoSExcellence(types.WithFailureCost(failureCost)) + score, err := qos.ComputeReputation(types.WithFailureCost(failureCost)) require.NoError(t, err) - scoreHighFailure, err := qos.ComputeQoSExcellence(types.WithFailureCost(highFailureCost)) + scoreHighFailure, err := qos.ComputeReputation(types.WithFailureCost(highFailureCost)) require.NoError(t, err) require.True(t, scoreHighFailure.GT(score)) - scoreWithProb, err := qos.ComputeQoSExcellence(types.WithFailureCost(failureCost), types.WithBlockErrorProbability(sdk.OneDec())) + scoreWithProb, err := qos.ComputeReputation(types.WithFailureCost(failureCost), types.WithBlockErrorProbability(sdk.OneDec())) require.NoError(t, err) - scoreHighFailureWithProb, err := qos.ComputeQoSExcellence(types.WithFailureCost(highFailureCost), types.WithBlockErrorProbability(sdk.OneDec())) + scoreHighFailureWithProb, err := qos.ComputeReputation(types.WithFailureCost(highFailureCost), types.WithBlockErrorProbability(sdk.OneDec())) require.NoError(t, err) require.True(t, scoreHighFailureWithProb.GT(scoreWithProb)) } @@ -140,9 +140,9 @@ func TestQosSyncFactor(t *testing.T) { qos := types.QualityOfServiceReport{Latency: sdk.OneDec(), Sync: sdk.OneDec(), Availability: sdk.NewDecWithPrec(5, 1)} syncFactor, highSyncFactor := sdk.NewDecWithPrec(5, 1), sdk.NewDecWithPrec(8, 1) - score, err := qos.ComputeQoSExcellence(types.WithSyncFactor(syncFactor)) + score, err := qos.ComputeReputation(types.WithSyncFactor(syncFactor)) require.NoError(t, err) - scoreHighSyncFactor, err := qos.ComputeQoSExcellence(types.WithSyncFactor(highSyncFactor)) + scoreHighSyncFactor, err := qos.ComputeReputation(types.WithSyncFactor(highSyncFactor)) require.NoError(t, err) require.True(t, scoreHighSyncFactor.GT(score)) } @@ -156,18 +156,18 @@ func TestQosStrategyFactor(t *testing.T) { // we get the balancedScore with a balanced strategy and subtract the latency component of the balancedScore // this way, our balancedScore will only be syncFactor*sync (syncFactor = configuredSyncFactor * strategyFactor) - balancedScore, err := qos.ComputeQoSExcellence(types.WithStrategyFactor(types.BalancedStrategyFactor)) + balancedScore, err := qos.ComputeReputation(types.WithStrategyFactor(types.BalancedStrategyFactor)) require.NoError(t, err) balancedScore = balancedScore.Sub(sdk.OneDec()) // calculate score with latency strategy - sync component should be smaller than the component in balancedScore - latencyScore, err := qos.ComputeQoSExcellence(types.WithStrategyFactor(types.LatencyStrategyFactor)) + latencyScore, err := qos.ComputeReputation(types.WithStrategyFactor(types.LatencyStrategyFactor)) require.NoError(t, err) latencyScore = latencyScore.Sub(sdk.OneDec()) require.True(t, balancedScore.GT(latencyScore)) // calculate score with sync freshness strategy - sync component should be bigger than the component in balancedScore - syncScore, err := qos.ComputeQoSExcellence(types.WithStrategyFactor(types.SyncFreshnessStrategyFactor)) + syncScore, err := qos.ComputeReputation(types.WithStrategyFactor(types.SyncFreshnessStrategyFactor)) require.NoError(t, err) syncScore = syncScore.Sub(sdk.OneDec()) require.True(t, balancedScore.LT(syncScore)) @@ -178,9 +178,9 @@ func 
TestQosBlockErrorProbability(t *testing.T) { qos := types.QualityOfServiceReport{Latency: sdk.OneDec(), Sync: sdk.OneDec(), Availability: sdk.OneDec()} probabililty, highProbabililty := sdk.NewDecWithPrec(5, 1), sdk.NewDecWithPrec(8, 1) - score, err := qos.ComputeQoSExcellence(types.WithBlockErrorProbability(probabililty)) + score, err := qos.ComputeReputation(types.WithBlockErrorProbability(probabililty)) require.NoError(t, err) - scoreHighProbabililty, err := qos.ComputeQoSExcellence(types.WithBlockErrorProbability(highProbabililty)) + scoreHighProbabililty, err := qos.ComputeReputation(types.WithBlockErrorProbability(highProbabililty)) require.NoError(t, err) require.True(t, scoreHighProbabililty.GT(score)) } @@ -207,10 +207,10 @@ func TestQosReport(t *testing.T) { Sync: sdk.MustNewDecFromStr("0.5"), } - qos1Res, errQos1 := qos1.ComputeQoSExcellence() - qos2Res, errQos2 := qos2.ComputeQoSExcellence() - qos3Res, errQos3 := qos3.ComputeQoSExcellence() - qos4Res, errQos4 := qos4.ComputeQoSExcellence() + qos1Res, errQos1 := qos1.ComputeReputation() + qos2Res, errQos2 := qos2.ComputeReputation() + qos3Res, errQos3 := qos3.ComputeReputation() + qos4Res, errQos4 := qos4.ComputeReputation() require.NoError(t, errQos1) require.NoError(t, errQos2) require.NoError(t, errQos3)
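Editor's note: the renamed ComputeReputation keeps the formula documented in qos_report.go above; for the latest block, score = latency + sync*syncFactor + ((1/availability) - 1) * FailureCost. A small sketch of calling it the way the updated tests do (the report values and the 0.5 sync factor are arbitrary illustration, not production parameters):

package main

import (
	"fmt"

	sdk "github.com/cosmos/cosmos-sdk/types"
	pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)

func main() {
	// An example QoS excellence report; availability of 1 zeroes out the
	// failure-cost term, leaving latency plus the weighted sync component.
	report := pairingtypes.QualityOfServiceReport{
		Latency:      sdk.OneDec(),
		Sync:         sdk.MustNewDecFromStr("0.5"),
		Availability: sdk.OneDec(),
	}

	score, err := report.ComputeReputation(pairingtypes.WithSyncFactor(sdk.MustNewDecFromStr("0.5")))
	if err != nil {
		panic(err)
	}
	fmt.Println("reputation score:", score)
}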