diff --git a/go.mod b/go.mod index 9f16e3b357..676656d085 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,9 @@ require ( github.com/cosmos/cosmos-proto v1.0.0-beta.5 github.com/cosmos/gogoproto v1.4.10 github.com/cosmos/ibc-apps/middleware/packet-forward-middleware/v7 v7.1.3 + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 github.com/dgraph-io/badger/v4 v4.1.0 + github.com/dgraph-io/ristretto/v2 v2.0.1 github.com/fullstorydev/grpcurl v1.8.5 github.com/goccy/go-json v0.10.2 github.com/gogo/status v1.1.0 @@ -44,6 +46,7 @@ require ( github.com/joho/godotenv v1.3.0 github.com/newrelic/go-agent/v3 v3.20.4 github.com/praserx/ipconv v1.2.1 + github.com/sergi/go-diff v1.3.1 github.com/spf13/pflag v1.0.5 github.com/tidwall/gjson v1.16.0 github.com/tidwall/sjson v1.2.5 @@ -80,13 +83,10 @@ require ( github.com/cosmos/ics23/go v0.10.0 // indirect github.com/cosmos/rosetta-sdk-go v0.10.0 // indirect github.com/creachadair/taskgroup v0.4.2 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect - github.com/dgraph-io/ristretto/v2 v2.0.1 // indirect github.com/getsentry/sentry-go v0.23.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/googleapis v1.4.1 // indirect - github.com/golang/glog v1.2.0 // indirect github.com/google/flatbuffers v1.12.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.7 // indirect @@ -155,7 +155,7 @@ require ( github.com/deckarep/golang-set v1.8.0 github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect github.com/dgraph-io/badger/v2 v2.2007.4 // indirect - github.com/dgraph-io/ristretto v0.2.0 + github.com/dgraph-io/ristretto v0.2.0 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect diff --git a/go.sum b/go.sum index 4cc939a4c2..4adb899db5 100644 --- a/go.sum +++ b/go.sum @@ -336,8 +336,6 @@ github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= @@ -461,8 +459,6 @@ github.com/dgraph-io/badger/v4 v4.1.0 h1:E38jc0f+RATYrycSUf9LMv/t47XAy+3CApyYSq4 github.com/dgraph-io/badger/v4 v4.1.0/go.mod h1:P50u28d39ibBRmIJuQC/NSdBOg46HnHw7al2SW5QRHg= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE= github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU= github.com/dgraph-io/ristretto/v2 v2.0.1 h1:7W0LfEP+USCmtrUjJsk+Jv2jbhJmb72N4yRI7GrLdMI= @@ -621,8 +617,6 @@ github.com/golang-jwt/jwt/v4 v4.3.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= -github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -1176,6 +1170,8 @@ github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KR github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -1234,8 +1230,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -1622,12 +1616,9 @@ golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 00b03ea90d..525ed72abc 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -12,6 +12,7 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" "github.com/lavanet/lava/v4/protocol/common" + "github.com/lavanet/lava/v4/protocol/parser" "github.com/lavanet/lava/v4/utils" "github.com/lavanet/lava/v4/utils/lavaslices" "github.com/lavanet/lava/v4/utils/maps" @@ -35,25 +36,74 @@ type InternalPath struct { Addon string } +type ErrorPattern struct { + TooNewPattern string + TooOldPattern string +} + +func (bep *ErrorPattern) IsEmpty() bool { + return bep.TooNewPattern == "" && bep.TooOldPattern == "" +} + type BaseChainParser struct { - internalPaths map[string]InternalPath - taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer - spec spectypes.Spec - rwLock sync.RWMutex - serverApis map[ApiKey]ApiContainer - apiCollections map[CollectionKey]*spectypes.ApiCollection - headers map[ApiKey]*spectypes.Header - verifications map[VerificationKey]map[string][]VerificationContainer // map[VerificationKey]map[InternalPath][]VerificationContainer - allowedAddons map[string]bool - extensionParser extensionslib.ExtensionParser - active bool + internalPaths map[string]InternalPath + taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer + spec spectypes.Spec + rwLock sync.RWMutex + serverApis map[ApiKey]ApiContainer + apiCollections map[CollectionKey]*spectypes.ApiCollection + headers map[ApiKey]*spectypes.Header + verifications map[VerificationKey]map[string][]VerificationContainer // map[VerificationKey]map[InternalPath][]VerificationContainer + allowedAddons map[string]bool + extensionParser extensionslib.ExtensionParser + active bool + blockErrorPattern ErrorPattern +} + +// allows an optional kind to specify the type of error to identify +// LATEST is identifying too new error +// EARLIEST is identifying too old error +func (bcp *BaseChainParser) IdentifyNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64) { + bcp.rwLock.RLock() + defer bcp.rwLock.RUnlock() + if bcp.blockErrorPattern.IsEmpty() { + return false, 0 + } + if len(kind) == 0 || kind[0] == LATEST { + var success bool + success, blockHeight := parser.ParseNumberFromPattern(bcp.blockErrorPattern.TooNewPattern, message) + if success { + return true, blockHeight + } + } + if len(kind) == 0 || kind[0] == EARLIEST { + success, blockHeight := parser.ParseNumberFromPattern(bcp.blockErrorPattern.TooOldPattern, message) + if success { + return true, blockHeight + } + } + return false, 0 +} + +func (bcp *BaseChainParser) SetBlockErrorPattern(pattern string, kind DataKind) { + bcp.rwLock.Lock() + defer bcp.rwLock.Unlock() + if kind == EARLIEST { + bcp.blockErrorPattern.TooOldPattern = pattern + } else if kind == LATEST { + bcp.blockErrorPattern.TooNewPattern = pattern + } } func (bcp *BaseChainParser) Activate() { + bcp.rwLock.Lock() + defer bcp.rwLock.Unlock() bcp.active = true } func (bcp *BaseChainParser) Active() bool { + bcp.rwLock.RLock() + defer bcp.rwLock.RUnlock() return bcp.active } diff --git a/protocol/chainlib/chain_fetcher.go b/protocol/chainlib/chain_fetcher.go index 5a6ca38362..66e0b3c11f 100644 --- a/protocol/chainlib/chain_fetcher.go +++ b/protocol/chainlib/chain_fetcher.go @@ -43,7 +43,11 @@ type ChainFetcher struct { } func (cf *ChainFetcher) FetchEndpoint() lavasession.RPCProviderEndpoint { - return *cf.endpoint + endpoint := cf.endpoint + if endpoint == nil { + return lavasession.RPCProviderEndpoint{} + } + return *endpoint } func (cf *ChainFetcher) Validate(ctx context.Context) error { @@ -369,28 +373,56 @@ func (cf *ChainFetcher) constructRelayData(conectionType string, path string, da return relayData } -func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error) { +func findFormatDirective(template, data string) string { + for i := 0; i < len(template); i++ { + if template[i] != data[i] { + return template[i : i+2] // Assuming the directive is two characters long, like %x or %d + } + } + return "" +} + +func (cf *ChainFetcher) FetchBlock(ctx context.Context, blockNum int64) (response string, errorMessage string, format string, err error) { + parsing, _, _, _, data, chainMessage, reply, _, _, err := cf.fetchSpecificBlock(ctx, blockNum) + if err != nil { + return "", "", "", err + } + template := parsing.FunctionTemplate + format = findFormatDirective(template, string(data)) + _, errorMessage = chainMessage.CheckResponseError(reply.RelayReply.Data, reply.StatusCode) + return string(reply.RelayReply.Data), errorMessage, format, nil +} + +func (cf *ChainFetcher) fetchSpecificBlock(ctx context.Context, blockNum int64) (*spectypes.ParseDirective, string, spectypes.CollectionData, string, []byte, ChainMessageForSend, *RelayReplyWrapper, common.NodeUrl, string, error) { parsing, apiCollection, ok := cf.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM) tagName := spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM.String() if !ok { - return "", utils.LavaFormatError(tagName+" tag function not found", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + return nil, "", spectypes.CollectionData{}, "", nil, nil, nil, common.NodeUrl{}, "", utils.LavaFormatError(tagName+" tag function not found", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } collectionData := apiCollection.CollectionData if parsing.FunctionTemplate == "" { - return "", utils.LavaFormatError(tagName+" missing function template", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + return nil, "", spectypes.CollectionData{}, "", nil, nil, nil, common.NodeUrl{}, "", utils.LavaFormatError(tagName+" missing function template", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } path := parsing.ApiName data := []byte(fmt.Sprintf(parsing.FunctionTemplate, blockNum)) chainMessage, err := CraftChainMessage(parsing, collectionData.Type, cf.chainParser, &CraftData{Path: path, Data: data, ConnectionType: collectionData.Type}, cf.ChainFetcherMetadata()) if err != nil { - return "", utils.LavaFormatError(tagName+" failed CraftChainMessage on function template", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + return nil, "", spectypes.CollectionData{}, "", nil, nil, nil, common.NodeUrl{}, "", utils.LavaFormatError(tagName+" failed CraftChainMessage on function template", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) } start := time.Now() reply, _, _, proxyUrl, chainId, err := cf.chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) if err != nil { timeTaken := time.Since(start) - return "", utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "sendTime", Value: timeTaken}, {Key: "error", Value: err}, {Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + return nil, "", spectypes.CollectionData{}, "", nil, nil, nil, common.NodeUrl{}, "", utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "sendTime", Value: timeTaken}, {Key: "error", Value: err}, {Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...) + } + return parsing, tagName, collectionData, path, data, chainMessage, reply, proxyUrl, chainId, nil +} + +func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error) { + parsing, tagName, collectionData, path, data, chainMessage, reply, proxyUrl, chainId, err := cf.fetchSpecificBlock(ctx, blockNum) + if err != nil { + return "", err } parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage) if err != nil { diff --git a/protocol/chainlib/chain_message.go b/protocol/chainlib/chain_message.go index 39c0110c95..b5d211aa04 100644 --- a/protocol/chainlib/chain_message.go +++ b/protocol/chainlib/chain_message.go @@ -12,9 +12,14 @@ import ( spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +type OriginalRaw struct { + Path string + Data []byte +} + type updatableRPCInput interface { rpcInterfaceMessages.GenericMessage - UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) + UpdateLatestBlockInMessage(latestBlock uint64) (success bool) AppendHeader(metadata []pairingtypes.Metadata) SubscriptionIdExtractor(reply *rpcclient.JsonrpcMessage) string GetRawRequestHash() ([]byte, error) @@ -32,13 +37,17 @@ type baseChainMessageContainer struct { forceCacheRefresh bool parseDirective *spectypes.ParseDirective // setting the parse directive related to the api, can be nil usedDefaultValue bool - - inputHashCache []byte + originalRaw OriginalRaw + inputHashCache []byte // resultErrorParsingMethod passed by each api interface message to parse the result of the message // and validate it doesn't contain a node error resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string) } +func (bcmc *baseChainMessageContainer) GetOriginal() (path string, data []byte) { + return bcmc.originalRaw.Path, bcmc.originalRaw.Data +} + func (bcmc *baseChainMessageContainer) UpdateEarliestInMessage(incomingEarliest int64) bool { updatedSuccessfully := false if bcmc.earliestRequestedBlock != spectypes.EARLIEST_BLOCK { @@ -130,17 +139,14 @@ func (bcnc baseChainMessageContainer) GetRPCMessage() rpcInterfaceMessages.Gener return bcnc.msg } -func (bcnc *baseChainMessageContainer) UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) (modifiedOnLatestReq bool) { +func (bcnc *baseChainMessageContainer) UpdateLatestBlockInMessage(latestBlock int64) (modifiedOnLatestReq bool) { requestedBlock, _ := bcnc.RequestedBlock() - if latestBlock <= spectypes.NOT_APPLICABLE || requestedBlock != spectypes.LATEST_BLOCK { + // we disallow setting latest block to 0 or one of the wildcards as an override + if requestedBlock > latestBlock || latestBlock <= 0 { return false } - success := bcnc.msg.UpdateLatestBlockInMessage(uint64(latestBlock), modifyContent) - if success { - bcnc.latestRequestedBlock = latestBlock - return true - } - return false + bcnc.latestRequestedBlock = latestBlock + return true } func (bcnc *baseChainMessageContainer) GetExtensions() []*spectypes.Extension { diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 41c024ae04..2446765c6c 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -83,12 +83,14 @@ type ChainParser interface { ExtensionsParser() *extensionslib.ExtensionParser ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error) + SetBlockErrorPattern(string, DataKind) + IdentifyNodeError(message string, kind ...DataKind) (isBlockError bool, blockHeight int64) } type ChainMessage interface { SubscriptionIdExtractor(reply *rpcclient.JsonrpcMessage) string RequestedBlock() (latest int64, earliest int64) - UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) (modified bool) + UpdateLatestBlockInMessage(latestBlock int64) (modified bool) AppendHeader(metadata []pairingtypes.Metadata) GetExtensions() []*spectypes.Extension OverrideExtensions(extensionNames []string, extensionParser *extensionslib.ExtensionParser) @@ -113,6 +115,7 @@ type ChainMessageForSend interface { GetApiCollection() *spectypes.ApiCollection GetParseDirective() *spectypes.ParseDirective CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) + GetOriginal() (path string, data []byte) } type HealthReporter interface { diff --git a/protocol/chainlib/chainlib_mock.go b/protocol/chainlib/chainlib_mock.go index 3978d88b83..63fa4e57f5 100644 --- a/protocol/chainlib/chainlib_mock.go +++ b/protocol/chainlib/chainlib_mock.go @@ -320,6 +320,14 @@ func (m *MockChainMessage) CheckResponseError(data []byte, httpStatusCode int) ( return ret0, ret1 } +func (m *MockChainMessage) GetOriginal() (string,[]byte) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOriginal") + ret0, _ := ret[0].(string) + ret1, _ := ret[0].([]byte) + return ret0,ret1 +} + // CheckResponseError indicates an expected call of CheckResponseError. func (mr *MockChainMessageMockRecorder) CheckResponseError(data, httpStatusCode interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() @@ -525,17 +533,16 @@ func (mr *MockChainMessageMockRecorder) TimeoutOverride(arg0 ...interface{}) *go } // UpdateLatestBlockInMessage mocks base method. -func (m *MockChainMessage) UpdateLatestBlockInMessage(latestBlock int64, modifyContent bool) bool { +func (m *MockChainMessage) UpdateLatestBlockInMessage(latestBlock int64) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateLatestBlockInMessage", latestBlock, modifyContent) + ret := m.ctrl.Call(m, "UpdateLatestBlockInMessage", latestBlock) ret0, _ := ret[0].(bool) return ret0 } -// UpdateLatestBlockInMessage indicates an expected call of UpdateLatestBlockInMessage. -func (mr *MockChainMessageMockRecorder) UpdateLatestBlockInMessage(latestBlock, modifyContent interface{}) *gomock.Call { +func (mr *MockChainMessageMockRecorder) UpdateLatestBlockInMessage(latestBlock interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLatestBlockInMessage", reflect.TypeOf((*MockChainMessage)(nil).UpdateLatestBlockInMessage), latestBlock, modifyContent) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLatestBlockInMessage", reflect.TypeOf((*MockChainMessage)(nil).UpdateLatestBlockInMessage), latestBlock) } // MockChainMessageForSend is a mock of ChainMessageForSend interface. diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go index 0bdaa8173a..00abe3280b 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go @@ -82,11 +82,9 @@ func (gm GrpcMessage) GetParams() interface{} { return parsedData } -func (gm *GrpcMessage) UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) { - // return gm.SetLatestBlockWithHeader(latestBlock, modifyContent) - // disabled due to cosmos sdk inconsistency with the headers that needs to be handled +func (gm *GrpcMessage) UpdateLatestBlockInMessage(latestBlock uint64) (success bool) { + // return gm.SetLatestBlockWithHeader(latestBlock) return false - // when !done: we need a different setter } func (gm GrpcMessage) dynamicResolve() (interface{}, error) { diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage.go index 59148df36d..a8169bf90f 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage.go @@ -59,6 +59,11 @@ func (jm JsonrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (ha if result.Error == nil { // no error return false, "" } + if result.Error.Data != nil { + if st, ok := result.Error.Data.(string); ok && st != "" { + return true, result.Error.Message + ",data: " + st + } + } return result.Error.Message != "", result.Error.Message } @@ -110,7 +115,7 @@ func ConvertBatchElement(batchElement rpcclient.BatchElemWithId) (JsonrpcMessage return msg, nil } -func (jm *JsonrpcMessage) UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) { +func (jm *JsonrpcMessage) UpdateLatestBlockInMessage(latestBlock uint64) (success bool) { return false } @@ -191,7 +196,7 @@ func (jbm JsonrpcBatchMessage) GetRawRequestHash() ([]byte, error) { return nil, WontCalculateBatchHash } -func (jbm *JsonrpcBatchMessage) UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) { +func (jbm *JsonrpcBatchMessage) UpdateLatestBlockInMessage(latestBlock uint64) (success bool) { return false } diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go index 8d3da122fd..71b08a8caa 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go @@ -84,11 +84,10 @@ func (rm RestMessage) GetParams() interface{} { return parameters } -func (rm *RestMessage) UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) { - // return rm.SetLatestBlockWithHeader(latestBlock, modifyContent) +func (rm *RestMessage) UpdateLatestBlockInMessage(latestBlock uint64) (success bool) { + // return rm.SetLatestBlockWithHeader(latestBlock) // removed until behavior inconsistency with the cosmos sdk header is solved return false - // if !done else we need a different setter } // GetResult will be deprecated after we remove old client diff --git a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go index f370f00cd0..4f4560bcb4 100644 --- a/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go +++ b/protocol/chainlib/chainproxy/rpcInterfaceMessages/tendermintRPCMessage.go @@ -78,6 +78,11 @@ func (jm TendermintrpcMessage) CheckResponseError(data []byte, httpStatusCode in } return false, "" } + if result.Error.Data != nil { + if st, ok := result.Error.Data.(string); ok && st != "" { + return true, result.Error.Message + ",data: " + st + } + } return result.Error.Message != "", result.Error.Message } diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 08a7374b4a..14e80257e2 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -43,6 +43,13 @@ var ( NoActiveSubscriptionFound = sdkerrors.New("failed finding an active subscription on provider side", 1016, "no active subscriptions for hashed params.") ) +type DataKind int + +const ( + EARLIEST DataKind = iota + LATEST +) + type RelayReplyWrapper struct { StatusCode int RelayReply *pairingtypes.RelayReply diff --git a/protocol/chainlib/common_test.go b/protocol/chainlib/common_test.go index 762f05f74d..4d3c4f48b6 100644 --- a/protocol/chainlib/common_test.go +++ b/protocol/chainlib/common_test.go @@ -323,7 +323,7 @@ func (m *mockRPCInput) GetResult() json.RawMessage { return nil } -func (m *mockRPCInput) UpdateLatestBlockInMessage(uint64, bool) bool { +func (m *mockRPCInput) UpdateLatestBlockInMessage(uint64) bool { return false } diff --git a/protocol/chainlib/common_test_utils.go b/protocol/chainlib/common_test_utils.go index 9b05bdd6c8..1d602d187b 100644 --- a/protocol/chainlib/common_test_utils.go +++ b/protocol/chainlib/common_test_utils.go @@ -148,6 +148,24 @@ func CreateChainLibMocks( Geolocation: 1, NodeUrls: []common.NodeUrl{}, } + policy := &plantypes.Policy{ + ChainPolicies: []plantypes.ChainPolicy{ + { + ChainId: spec.Index, + Apis: []string{}, + Requirements: []plantypes.ChainRequirement{ + { + Collection: spectypes.CollectionData{ + ApiInterface: apiInterface, + }, + Extensions: services, + Mixed: true, + }, + }, + }, + }, + } + chainParser.SetPolicy(policy, spec.Index, apiInterface) addons, extensions, err := chainParser.SeparateAddonsExtensions(services) if err != nil { return nil, nil, nil, nil, nil, err @@ -171,7 +189,10 @@ func CreateChainLibMocks( endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: addons}) allCombinations := generateCombinations(extensions) for _, extensionsList := range allCombinations { - endpoint.NodeUrls = append(endpoint.NodeUrls, common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)}) + nodeUrl := common.NodeUrl{Url: lis.Addr().String(), Addons: append(addons, extensionsList...)} + // this is used to identify this header in the handler + nodeUrl.AuthConfig = common.AuthConfig{AuthHeaders: map[string]string{"Addon": strings.Join(extensionsList, ",")}} + endpoint.NodeUrls = append(endpoint.NodeUrls, nodeUrl) } go func() { service := myServiceImplementation{serverCallback: httpServerCallback} diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 081941df2b..1912f5587f 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -117,7 +117,7 @@ func (apip *GrpcChainParser) CraftMessage(parsing *spectypes.ParseDirective, con } parsedInput := &parser.ParsedInput{} parsedInput.SetBlock(spectypes.NOT_APPLICABLE) - return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection), nil + return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection, OriginalRaw{Path: grpcMessage.Path, Data: grpcMessage.Msg}), nil } // ParseMsg parses message data into chain message object @@ -172,12 +172,12 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st } } - nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection) + nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection, OriginalRaw{Data: data, Path: url}) apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo) return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer { +func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer { requestedBlock := parsedInput.GetBlock() requestedHashes, _ := parsedInput.GetBlockHashes() nodeMsg := &baseChainMessageContainer{ @@ -189,6 +189,7 @@ func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser. resultErrorParsingMethod: grpcMessage.CheckResponseError, parseDirective: GetParseDirective(api, apiCollection), usedDefaultValue: parsedInput.UsedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/grpc_test.go b/protocol/chainlib/grpc_test.go index cd3404b2b6..102ecaa0b9 100644 --- a/protocol/chainlib/grpc_test.go +++ b/protocol/chainlib/grpc_test.go @@ -279,7 +279,7 @@ func TestSettingBlocksHeadersGrpc(t *testing.T) { require.NoError(t, err) requestedBlock, _ := chainMessage.RequestedBlock() require.Equal(t, test.requestedBlock, requestedBlock) - chainMessage.UpdateLatestBlockInMessage(test.block, true) // will update the request only if it's latest + chainMessage.UpdateLatestBlockInMessage(test.block) // will update the request only if it's latest requestedBlock, _ = chainMessage.RequestedBlock() require.Equal(t, test.block, requestedBlock) reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index f19dc71d95..f12f20d5b4 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -93,7 +93,12 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective, if err != nil { return nil, err } - return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false), nil + originalRaw := OriginalRaw{Path: "", Data: []byte{}} + data, err := json.Marshal(msg) + if err == nil { + originalRaw.Data = data + } + return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false, originalRaw), nil } // this func parses message data into chain message object @@ -216,9 +221,9 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType var nodeMsg *baseChainMessageContainer if len(msgs) == 1 { - nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault) + nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data}) } else { - nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault) + nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Path: url, Data: data}) if err != nil { return nil, err } @@ -227,7 +232,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) { +func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) { batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs) if err != nil { return nil, err @@ -242,11 +247,12 @@ func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, reque resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch, parseDirective: nil, usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg, err } -func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer { +func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer { nodeMsg := &baseChainMessageContainer{ api: serviceApi, apiCollection: apiCollection, @@ -256,6 +262,7 @@ func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedB resultErrorParsingMethod: msg.CheckResponseError, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/protocol_message.go b/protocol/chainlib/protocol_message.go index 9a054d456d..c8c012c5cc 100644 --- a/protocol/chainlib/protocol_message.go +++ b/protocol/chainlib/protocol_message.go @@ -15,11 +15,17 @@ type UserData struct { DappId string } +type hashedRequestResponse struct { + hashed []byte + outputFormatter func([]byte) []byte +} + type BaseProtocolMessage struct { ChainMessage directiveHeaders map[string]string relayRequestData *pairingtypes.RelayPrivateData userData common.UserData + hashedRequest hashedRequestResponse } func (bpm *BaseProtocolMessage) GetUserData() common.UserData { @@ -35,7 +41,17 @@ func (bpm *BaseProtocolMessage) RelayPrivateData() *pairingtypes.RelayPrivateDat } func (bpm *BaseProtocolMessage) HashCacheRequest(chainId string) ([]byte, func([]byte) []byte, error) { - return HashCacheRequest(bpm.relayRequestData, chainId) + if bpm == nil { + return nil, nil, nil + } + if bpm.hashedRequest.hashed != nil && bpm.hashedRequest.outputFormatter != nil { + return bpm.hashedRequest.hashed, bpm.hashedRequest.outputFormatter, nil + } + hash, formatter, err := HashCacheRequest(bpm.relayRequestData, chainId) + if err == nil { + bpm.hashedRequest = hashedRequestResponse{hash, formatter} + } + return hash, formatter, err } // addMissingExtensions adds any extensions from updatedProtocolExtensions that are not in currentPrivateDataExtensions diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index f9fa524106..ba4f718de2 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -75,7 +75,7 @@ func (apip *RestChainParser) CraftMessage(parsing *spectypes.ParseDirective, con } parsedInput := parser.NewParsedInput() parsedInput.SetBlock(spectypes.NOT_APPLICABLE) - return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection), nil + return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection, OriginalRaw{Path: restMessage.Path, Data: restMessage.Msg}), nil } // ParseMsg parses message data into chain message object @@ -132,12 +132,12 @@ func (apip *RestChainParser) ParseMsg(urlPath string, data []byte, connectionTyp } } - nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection) + nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection, OriginalRaw{Data: data, Path: urlPath}) apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo) return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer { +func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection, originalRaw OriginalRaw) *baseChainMessageContainer { requestedBlock := parsedInput.GetBlock() requestedHashes, _ := parsedInput.GetBlockHashes() nodeMsg := &baseChainMessageContainer{ @@ -149,6 +149,7 @@ func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser. resultErrorParsingMethod: restMessage.CheckResponseError, parseDirective: GetParseDirective(api, apiCollection), usedDefaultValue: parsedInput.UsedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index 18370af01a..0069239e55 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -284,7 +284,7 @@ func TestSettingRequestedBlocksHeadersRest(t *testing.T) { require.NoError(t, err) latestReqBlock, _ := chainMessage.RequestedBlock() require.Equal(t, test.requestedBlock, latestReqBlock) - chainMessage.UpdateLatestBlockInMessage(test.block, true) // will update the block only if it's a latest request + chainMessage.UpdateLatestBlockInMessage(test.block) // will update the block only if it's a latest request latestReqBlock, _ = chainMessage.RequestedBlock() require.Equal(t, test.block, latestReqBlock) // expected behavior is that it doesn't change the original requested block reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil) diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 3bb8867095..18e0055e3b 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -90,7 +90,12 @@ func (apip *TendermintChainParser) CraftMessage(parsing *spectypes.ParseDirectiv return nil, err } tenderMsg := rpcInterfaceMessages.TendermintrpcMessage{JsonrpcMessage: msg, Path: parsing.ApiName} - return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false), nil + originalRaw := OriginalRaw{Path: tenderMsg.Path, Data: []byte{}} + data, err := json.Marshal(msg) + if err == nil { + originalRaw.Data = data + } + return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, &tenderMsg, apiCollection, false, originalRaw), nil } // ParseMsg parses message data into chain message object @@ -243,10 +248,10 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect if !isJsonrpc { tenderMsg.Path = urlPath // add path } - nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault) + nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &tenderMsg, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath}) } else { var err error - nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault) + nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault, OriginalRaw{Data: data, Path: urlPath}) if err != nil { return nil, err } @@ -256,7 +261,7 @@ func (apip *TendermintChainParser) ParseMsg(urlPath string, data []byte, connect return nodeMsg, apip.BaseChainParser.Validate(nodeMsg) } -func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) { +func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) (*baseChainMessageContainer, error) { batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs) if err != nil { return nil, err @@ -271,6 +276,7 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg, err } @@ -281,7 +287,7 @@ func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request) return url, data, "", metadata, err } -func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer { +func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool, originalRaw OriginalRaw) *baseChainMessageContainer { nodeMsg := &baseChainMessageContainer{ api: serviceApi, apiCollection: apiCollection, @@ -291,6 +297,7 @@ func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, request resultErrorParsingMethod: msg.CheckResponseError, parseDirective: GetParseDirective(serviceApi, apiCollection), usedDefaultValue: usedDefaultValue, + originalRaw: originalRaw, } return nodeMsg } diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index c1a50554a6..3fe07c03f4 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -12,7 +12,6 @@ import ( "os" "strconv" "strings" - "sync" "sync/atomic" "testing" "time" @@ -301,15 +300,16 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc } type rpcProviderOptions struct { - consumerAddress string - specId string - apiInterface string - listenAddress string - account sigs.Account - lavaChainID string - addons []string - providerUniqueId string - cacheListenAddress string + consumerAddress string + specId string + apiInterface string + listenAddress string + account sigs.Account + lavaChainID string + addons []string + providerUniqueId string + cacheListenAddress string + providerSideInterceptor func(input interface{}) // this function will be called on each relay request } func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpcProviderOptions) (*rpcprovider.RPCProviderServer, *lavasession.RPCProviderEndpoint, *ReplySetter, *MockChainFetcher, *MockReliabilityManager) { @@ -404,8 +404,14 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil, numberOfRetriesOnNodeErrorsProviderSide) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") - err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) - require.NoError(t, err) + if rpcProviderOptions.providerSideInterceptor != nil { + relayReceiverInterceptor := rpcprovider.NewRelayReceiverInterceptor(rpcProviderServer, rpcProviderOptions.providerSideInterceptor) + err = listener.RegisterReceiver(relayReceiverInterceptor, rpcProviderEndpoint) + require.NoError(t, err) + } else { + err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) + require.NoError(t, err) + } chainParser.Activate() chainTracker.RegisterForBlockTimeUpdates(chainParser) providerUp := checkGrpcServerStatusWithTimeout(rpcProviderEndpoint.NetworkAddress.Address, time.Millisecond*261) @@ -1409,14 +1415,12 @@ func TestSameProviderConflictReport(t *testing.T) { lavaChainID: lavaChainID, } rpcConsumerOut := createRpcConsumer(t, ctx, rpcConsumerOptions) - + conflict := make(chan bool) conflictSent := false - wg := sync.WaitGroup{} - wg.Add(1) txConflictDetectionMock := func(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error { if finalizationConflict == nil { - wg.Done() require.FailNow(t, "Finalization conflict should not be nil") + conflict <- true return nil } utils.LavaFormatDebug("@@@@@@@@@@@@@@@ Called conflict mock tx", utils.LogAttr("provider0", finalizationConflict.RelayFinalization_0.RelaySession.Provider), utils.LogAttr("provider0", finalizationConflict.RelayFinalization_1.RelaySession.Provider)) @@ -1428,8 +1432,8 @@ func TestSameProviderConflictReport(t *testing.T) { if finalizationConflict.RelayFinalization_0.RelaySession.Provider != providers[0].account.Addr.String() { require.FailNow(t, "Finalization conflict provider address is not the provider address") } - wg.Done() conflictSent = true + conflict <- false return nil } rpcConsumerOut.mockConsumerStateTracker.SetTxConflictDetectionWrapper(txConflictDetectionMock) @@ -1459,7 +1463,11 @@ func TestSameProviderConflictReport(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) // conflict calls happen concurrently, therefore we need to wait the call. - wg.Wait() + select { + case <-time.After(3 * time.Minute): + require.FailNow(t, "timeout waiting for conflict") + case <-conflict: + } require.True(t, conflictSent) }) @@ -2163,3 +2171,153 @@ func TestArchiveProvidersRetryOnParsedHash(t *testing.T) { }) } } + +func TestUnconfiguredApiWithArchiveRequest(t *testing.T) { + playbook := []struct { + name string + apiInterface string + errorFormat string + managedToParseBlock bool + reqFormat string + }{ + { + name: "tendermint", + apiInterface: spectypes.APIInterfaceTendermintRPC, + errorFormat: `{"jsonrpc":"2.0","id":1,"error":{"code":-32603,"message":"Internal error","data":"height %d must be less than or equal to the current blockchain height %d"}}`, + managedToParseBlock: true, + reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`, + }, + { + name: "tendermint_empty", + apiInterface: spectypes.APIInterfaceTendermintRPC, + errorFormat: `{"jsonrpc":"2.0","id":1,"result":null}}`, + managedToParseBlock: false, + reqFormat: `{"jsonrpc":"2.0","method":"block_undefined","params":[123],"id":1}`, + }, + // {"code":2,"message":"height 1 is not available, lowest height is 340778","details":[]} + } + for _, play := range playbook { + t.Run("unconfiguredApiWithArchiveRequest", func(t *testing.T) { + ctx := context.Background() + // can be any spec and api interface + specId := "LAV1" + apiInterface := play.apiInterface + epoch := uint64(100) + lavaChainID := "lava" + numProviders := 2 + + consumerListenAddress := addressGen.GetAddress() + + type providerData struct { + account sigs.Account + endpoint *lavasession.RPCProviderEndpoint + server *rpcprovider.RPCProviderServer + replySetter *ReplySetter + mockChainFetcher *MockChainFetcher + mockReliabilityManager *MockReliabilityManager + } + providers := []providerData{} + + for i := 0; i < numProviders; i++ { + account := sigs.GenerateDeterministicFloatingKey(randomizer) + providerDataI := providerData{account: account} + providers = append(providers, providerDataI) + } + consumerAccount := sigs.GenerateDeterministicFloatingKey(randomizer) + archiveCalled := make(chan bool, 10) + for i := 0; i < numProviders; i++ { + ctx := context.Background() + providerDataI := providers[i] + listenAddress := addressGen.GetAddress() + addons := []string(nil) + if i == 0 { + // one provider will have archive + addons = []string{"archive"} + } + + rpcProviderOptions := rpcProviderOptions{ + consumerAddress: consumerAccount.Addr.String(), + specId: specId, + apiInterface: apiInterface, + listenAddress: listenAddress, + account: providerDataI.account, + lavaChainID: lavaChainID, + addons: addons, + providerUniqueId: fmt.Sprintf("provider%d", i), + } + providers[i].server, providers[i].endpoint, providers[i].replySetter, providers[i].mockChainFetcher, providers[i].mockReliabilityManager = createRpcProvider(t, ctx, rpcProviderOptions) + providers[i].replySetter.handler = func(req []byte, header http.Header) (data []byte, status int) { + if bytes.Equal(req, []byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"block\",\"params\":[\"9223372036854775807\"]}")) { + return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775807, 1000)), 500 + } + if bytes.Equal(req, []byte("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"block\",\"params\":[\"9223372036854775806\"]}")) { + return []byte(fmt.Sprintf(play.errorFormat, 9223372036854775806, 1001)), 500 + } + for key, val := range header { + if key == "Addon" { + if val[0] == "archive" { + archiveCalled <- true + } + } + } + return []byte(`{"jsonrpc":"2.0","id":1,"error":{"code":-32603,"message":"Internal error","data":"height 1 must be less than or equal to the current blockchain height 1002"}}`), 500 + } + } + + pairingList := map[uint64]*lavasession.ConsumerSessionsWithProvider{} + for i := 0; i < numProviders; i++ { + extensions := map[string]struct{}{} + if i == 0 { + extensions = map[string]struct{}{"archive": {}} + } + pairingList[uint64(i)] = &lavasession.ConsumerSessionsWithProvider{ + PublicLavaAddress: providers[i].account.Addr.String(), + + Endpoints: []*lavasession.Endpoint{ + { + NetworkAddress: providers[i].endpoint.NetworkAddress.Address, + Enabled: true, + Geolocation: 1, + Extensions: extensions, + }, + }, + Sessions: map[int64]*lavasession.SingleConsumerSession{}, + MaxComputeUnits: 10000, + UsedComputeUnits: 0, + PairingEpoch: epoch, + } + } + + rpcConsumerOptions := rpcConsumerOptions{ + specId: specId, + apiInterface: apiInterface, + account: consumerAccount, + consumerListenAddress: consumerListenAddress, + epoch: epoch, + pairingList: pairingList, + requiredResponses: 1, + lavaChainID: lavaChainID, + } + rpcConsumerOut := createRpcConsumer(t, ctx, rpcConsumerOptions) + require.NotNil(t, rpcConsumerOut.rpcConsumerServer) + success := rpcConsumerOut.rpcConsumerServer.ExtractNodeData(ctx, chainlib.LATEST) + require.Equal(t, play.managedToParseBlock, success) + if success { + start := time.Now() + isError, height := rpcConsumerOut.rpcConsumerServer.ChainParser.IdentifyNodeError("Internal error,data: height 777 must be less than or equal to the current blockchain height 1001", chainlib.LATEST) + elapsed := time.Since(start) + require.LessOrEqual(t, elapsed.Microseconds(), int64(5000), "IdentifyNodeError took too long", elapsed) + require.True(t, isError) + require.Equal(t, int64(777), height) + isError, height = rpcConsumerOut.rpcConsumerServer.ChainParser.IdentifyNodeError("Internal error,data: height 778 must be less than or equal to the current blockchain height 5555555555551001", chainlib.LATEST) + require.True(t, isError) + require.Equal(t, int64(778), height) + isError, _ = rpcConsumerOut.rpcConsumerServer.ChainParser.IdentifyNodeError("Internal error,data: height 778 must be less than Banana or equal to the current blockchain height 5555555555551002", chainlib.LATEST) + require.False(t, isError) + } else { + isError, _ := rpcConsumerOut.rpcConsumerServer.ChainParser.IdentifyNodeError("Internal error,data: height 777 must be less than or equal to the current blockchain height 1001", chainlib.LATEST) + require.False(t, isError) + } + }) + } +} diff --git a/protocol/parser/misc.go b/protocol/parser/misc.go index 8e69e321ed..fc81be293e 100644 --- a/protocol/parser/misc.go +++ b/protocol/parser/misc.go @@ -1,8 +1,130 @@ package parser +import ( + "fmt" + "strings" + + "github.com/lavanet/lava/v4/utils" + "github.com/sergi/go-diff/diffmatchpatch" +) + +const ( + MinLettersForPattern = 10 + wildcard = "%*s" +) + func CapStringLen(inp string) string { if len(inp) > 250 { return inp[:150] + "...Truncated..." + inp[len(inp)-100:] } return inp } + +func isDigitOrLetter(r rune) bool { + return (r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') +} + +func ParseNumberFromPattern(pattern string, input string) (fitsPattern bool, parsedHeight int64) { + if len(pattern) > 0 { + percentIndex := strings.Index(pattern, "%") + if percentIndex == -1 { + return false, 0 + } + if pattern[:percentIndex] != input[:percentIndex] { + return false, 0 + } + } else { + // too short to be a pattern + return false, 0 + } + dmp := diffmatchpatch.New() + nextOneFormat := "" + diffs := dmp.DiffMain(pattern, CapStringLen(input), false) + for _, diff := range diffs { + if diff.Type == diffmatchpatch.DiffEqual { + nextOneFormat = "" + continue + } + if diff.Type == diffmatchpatch.DiffDelete { + if (len(diff.Text) == 2 || len(diff.Text) == 3) && diff.Text[0] == '%' { + nextOneFormat = diff.Text + } else { + // something is different other than %s + return false, 0 + } + } else if diff.Type == diffmatchpatch.DiffInsert { + if nextOneFormat == "%*s" { + // skip these + nextOneFormat = "" + } else if nextOneFormat != "" { + var err error + _, err = fmt.Sscanf(diff.Text, nextOneFormat, &parsedHeight) + if err != nil { + utils.LavaFormatError("invalid height in pattern", err, utils.Attribute{Key: "diff.Text", Value: diff.Text}, utils.Attribute{Key: "pattern", Value: pattern}, utils.Attribute{Key: "input", Value: input}) + return false, 0 + } + nextOneFormat = "" + } else { + // something is different other than %s + return false, 0 + } + } + } + if parsedHeight == 0 { + return false, 0 + } + return true, parsedHeight +} + +func LongestCommonSubsequenceWithFormat(s1, s2 string) (string, int) { + lettersDiff := 0 + dmp := diffmatchpatch.New() + diffs := dmp.DiffMain(s1, s2, false) + stringsArr := []string{} + + for _, diff := range diffs { + if diff.Type == diffmatchpatch.DiffEqual { + text := diff.Text + // trim digit or number elements right after a wildcard + if len(stringsArr) > 0 { + last_element := stringsArr[len(stringsArr)-1] + if last_element == wildcard { + trim := 0 + for _, element := range text { + if !isDigitOrLetter(element) { + break + } + trim++ + } + if trim > 0 { + text = text[trim:] + } + } + } + stringsArr = append(stringsArr, text) + lettersDiff += len(text) + } else { + if len(stringsArr) > 0 { + last_element := stringsArr[len(stringsArr)-1] + // ignore duplicate wildcards + if last_element == wildcard { + continue + } + // trim digit or number elements right before the wildcard + trim := 0 + for i := len(last_element) - 1; i >= 0; i-- { + if !isDigitOrLetter(rune(last_element[i])) { + break + } + trim++ + } + if trim > 0 { + stringsArr[len(stringsArr)-1] = last_element[:len(last_element)-trim] + lettersDiff -= trim + } + } + stringsArr = append(stringsArr, "%*s") + } + } + return strings.Join(stringsArr, ""), lettersDiff +} diff --git a/protocol/parser/misc_test.go b/protocol/parser/misc_test.go new file mode 100644 index 0000000000..9140dc6a1b --- /dev/null +++ b/protocol/parser/misc_test.go @@ -0,0 +1,97 @@ +package parser + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLongestCommonSubsequenceWithFormat(t *testing.T) { + playbook := []struct { + s1 string + s2 string + expected string + len int + }{ + { + s1: "123", + s2: "123", + expected: "123", + len: 3, + }, + { + s1: "123 4", + s2: "123 ", + expected: "123 %*s", + len: 4, + }, + { + s1: "123 ", + s2: "123 4", + expected: "123 %*s", + len: 4, + }, + { + s1: "123 5", + s2: "123 4", + expected: "123 %*s", + len: 4, + }, + { // make sure the entire word is marked as a wildcard + s1: "1235", + s2: "1234", + expected: "%*s", + len: 0, + }, + { // make sure partial matches don't break it + s1: "123 456 789", + s2: "123 457 789", + expected: "123 %*s 789", + len: 8, + }, + { + s1: "height 1234567 is too high current height is 01230123", + s2: "height 1234566 is too high current height is 01230124", + expected: "height %*s is too high current height is %*s", + len: 38, + }, + { + s1: "1234567 is too high current height is 01230123 bro", + s2: "1234566 is too high current height is 01230124 bro", + expected: "%*s is too high current height is %*s bro", + len: 35, + }, + { + s1: "1234567 is too high current height is 01230123 bro", + s2: "1234566 is too high current height is 01230124 nisan", + expected: "%*s is too high current height is %*s %*s", + len: 32, + }, + { + s1: "yes5no", + s2: "yes6no", + expected: "%*s", + len: 0, + }, + { + s1: "yes5no ", + s2: "yes6no ", + expected: "%*s ", + len: 1, + }, + { + s1: "yes 5.no ", + s2: "yes 6.no ", + expected: "yes %*s.no ", + len: 8, + }, + } + for idx, play := range playbook { + t.Run("Test "+strconv.Itoa(idx), func(t *testing.T) { + expected, len := LongestCommonSubsequenceWithFormat(play.s1, play.s2) + require.Equal(t, play.expected, expected) + require.Equal(t, play.len, len) + }) + } +} diff --git a/protocol/rpcconsumer/consumer_relay_state_machine.go b/protocol/rpcconsumer/consumer_relay_state_machine.go index b45e82a502..5bff10c9de 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine.go @@ -44,6 +44,14 @@ type ConsumerRelaySender interface { consumerIp string, metadata []pairingtypes.Metadata, ) (protocolMessage chainlib.ProtocolMessage, err error) + GetEndpoint() *lavasession.RPCEndpoint + UpdateProtocolMessageIfNeededWithNewData( + ctx context.Context, + relayState *RelayState, + protocolMessage chainlib.ProtocolMessage, + newEarliestBlockRequested int64, + dataKind chainlib.DataKind, + ) chainlib.ProtocolMessage } type tickerMetricSetterInf interface { @@ -133,9 +141,9 @@ func (crsm *ConsumerRelayStateMachine) stateTransition(relayState *RelayState, n batchNumber := crsm.usedProviders.BatchNumber() var nextState *RelayState if relayState == nil { // initial state - nextState = NewRelayState(crsm.ctx, crsm.protocolMessage, 0, crsm.relayRetriesManager, crsm.relaySender, &ArchiveStatus{}) + nextState = NewRelayState(crsm.ctx, crsm.protocolMessage, 0, crsm.relayRetriesManager, crsm.relaySender, &ArchiveStatus{}, TransitionData{}) } else { - nextState = NewRelayState(crsm.ctx, crsm.GetProtocolMessage(), relayState.GetStateNumber()+1, crsm.relayRetriesManager, crsm.relaySender, relayState.archiveStatus.Copy()) + nextState = NewRelayState(crsm.ctx, crsm.GetProtocolMessage(), relayState.GetStateNumber()+1, crsm.relayRetriesManager, crsm.relaySender, relayState.archiveStatus.Copy(), relayState.GetTransitionData()) nextState.upgradeToArchiveIfNeeded(batchNumber, numberOfNodeErrors) } crsm.appendRelayState(nextState) diff --git a/protocol/rpcconsumer/consumer_relay_state_machine_test.go b/protocol/rpcconsumer/consumer_relay_state_machine_test.go index a6804c56fc..d45d20315b 100644 --- a/protocol/rpcconsumer/consumer_relay_state_machine_test.go +++ b/protocol/rpcconsumer/consumer_relay_state_machine_test.go @@ -44,6 +44,47 @@ func (a PolicySt) GetSupportedExtensions(string) ([]epochstoragetypes.EndpointSe type ConsumerRelaySenderMock struct { retValue error tickerValue time.Duration + ChainParser chainlib.ChainParser +} + +func (crsm *ConsumerRelaySenderMock) UpdateProtocolMessageIfNeededWithNewData( + ctx context.Context, + relayState *RelayState, + protocolMessage chainlib.ProtocolMessage, + newEarliestBlockRequested int64, + dataKind chainlib.DataKind, +) chainlib.ProtocolMessage { + if newEarliestBlockRequested != spectypes.NOT_APPLICABLE { + if !relayState.GetIsEarliestUsed() || dataKind == chainlib.LATEST { + // we can't make changes in the protocol message without creating a new one + if dataKind == chainlib.EARLIEST { + // if We got a earliest block data from cache, we need to create a new protocol message with the new earliest block hash parsed + // and update the extension rules with the new earliest block data as it might be archive. + // Setting earliest used to attempt this only once. + relayState.SetIsEarliestUsed() + } + relayRequestData := protocolMessage.RelayPrivateData() + if dataKind == chainlib.EARLIEST { + addon := chainlib.GetAddon(protocolMessage) + if crsm.ChainParser == nil { + panic("ChainParser is nil") + } + extensionAdded := protocolMessage.UpdateEarliestAndValidateExtensionRules(crsm.ChainParser.ExtensionsParser(), newEarliestBlockRequested, addon, relayRequestData.SeenBlock) + if extensionAdded && relayState.CheckIsArchive(protocolMessage.RelayPrivateData()) { + relayState.SetIsArchive(true) + } + } else if dataKind == chainlib.LATEST { + protocolMessage.UpdateLatestBlockInMessage(newEarliestBlockRequested) + } + relayState.SetProtocolMessage(protocolMessage) + return protocolMessage + } + } + return protocolMessage +} + +func (crsm *ConsumerRelaySenderMock) GetEndpoint() *lavasession.RPCEndpoint { + return nil } func (crsm *ConsumerRelaySenderMock) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) { @@ -288,7 +329,7 @@ func TestConsumerStateMachineArchiveRetry(t *testing.T) { require.False(t, task.IsDone()) usedProviders.AddUsed(consumerSessionsMap, nil) relayProcessor.UpdateBatch(nil) - sendNodeErrorJsonRpc(relayProcessor, "lava2@test", time.Millisecond*1) + sendNodeErrorJsonRpc(relayProcessor, "lava2@test", time.Millisecond*1, nil) case 1: require.False(t, task.IsDone()) require.True(t, @@ -314,3 +355,91 @@ func TestConsumerStateMachineArchiveRetry(t *testing.T) { } }) } + +func TestConsumerStateMachineTransitionData(t *testing.T) { + ctx := context.Background() + serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + w.WriteHeader(http.StatusOK) + }) + specId := "NEAR" + chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, specId, spectypes.APIInterfaceJsonRPC, serverHandler, nil, "../../", nil) + + require.NoError(t, err) + + params, _ := json.Marshal(map[string]interface{}{"block_id": 123}) + id, _ := json.Marshal(1) + reqBody := rpcclient.JsonrpcMessage{ + Version: "2.0", + Method: "block", // Query latest block + Params: params, // Use "final" to get the latest final block + ID: id, + } + + // Convert request to JSON + jsonData, err := json.Marshal(reqBody) + if err != nil { + log.Fatalf("Error marshalling request: %v", err) + } + + chainMsg, err := chainParser.ParseMsg("", jsonData, http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + require.NoError(t, err) + dappId := "dapp" + consumerIp := "123.11" + reqBlock, _ := chainMsg.RequestedBlock() + var seenBlock int64 = 0 + relayRequestData := lavaprotocol.NewRelayData(ctx, http.MethodPost, "", jsonData, seenBlock, reqBlock, spectypes.APIInterfaceJsonRPC, chainMsg.GetRPCMessage().GetHeaders(), chainlib.GetAddon(chainMsg), common.GetExtensionNames(chainMsg.GetExtensions())) + protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, relayRequestData, dappId, consumerIp) + consistency := NewConsumerConsistency(specId) + usedProviders := lavasession.NewUsedProviders(nil) + relayProcessor := NewRelayProcessor( + ctx, + 1, + consistency, + relayProcessorMetrics, + relayProcessorMetrics, + relayRetriesManagerInstance, + NewRelayStateMachine( + ctx, + usedProviders, + &ConsumerRelaySenderMock{retValue: nil, tickerValue: 100 * time.Second}, + protocolMessage, + nil, + false, + relayProcessorMetrics, + )) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + canUse := usedProviders.TryLockSelection(ctx) + require.NoError(t, ctx.Err()) + require.Nil(t, canUse) + require.Zero(t, usedProviders.CurrentlyUsed()) + require.Zero(t, usedProviders.SessionsLatestBatch()) + const transitionDataBlock = 1234 + consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} + relayTaskChannel, err := relayProcessor.GetRelayTaskChannel() + require.NoError(t, err) + taskNumber := 0 + for task := range relayTaskChannel { + switch taskNumber { + case 0: + // first task is to return a node error with an updated transition data + require.False(t, task.IsDone()) + usedProviders.AddUsed(consumerSessionsMap, nil) + relayProcessor.UpdateBatch(nil) // successfully sent messages + task.relayState.SetBlockTransitionData(transitionDataBlock) + sendNodeErrorJsonRpc(relayProcessor, "lava2@test", time.Millisecond*1, nil) + case 1: + require.False(t, task.IsDone()) + // verify this was updated + latest, _ := relayProcessor.GetProtocolMessage().RequestedBlock() + require.Equal(t, int64(transitionDataBlock), latest) + return + } + taskNumber++ + } + if closeServer != nil { + defer closeServer() + } +} diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 1c7c60205c..2f23666b9a 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -122,13 +122,13 @@ func sendNodeError(relayProcessor *RelayProcessor, provider string, delay time.D relayProcessor.SetResponse(response) } -func sendNodeErrorJsonRpc(relayProcessor *RelayProcessor, provider string, delay time.Duration) { +func sendNodeErrorJsonRpc(relayProcessor *RelayProcessor, provider string, delay time.Duration, data []byte) { time.Sleep(delay) id, _ := json.Marshal(1) res := rpcclient.JsonrpcMessage{ Version: "2.0", ID: id, - Error: &rpcclient.JsonError{Code: 1, Message: "test"}, + Error: &rpcclient.JsonError{Code: 1, Message: "test", Data: data}, } resBytes, _ := json.Marshal(res) diff --git a/protocol/rpcconsumer/relay_state.go b/protocol/rpcconsumer/relay_state.go index 37264979ea..89187e07f2 100644 --- a/protocol/rpcconsumer/relay_state.go +++ b/protocol/rpcconsumer/relay_state.go @@ -9,11 +9,14 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib" "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" common "github.com/lavanet/lava/v4/protocol/common" + lavasession "github.com/lavanet/lava/v4/protocol/lavasession" "github.com/lavanet/lava/v4/utils" slices "github.com/lavanet/lava/v4/utils/lavaslices" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" ) +const NumberOfRetriesToUpgradeToArchive = 4 + type RetryHashCacheInf interface { CheckHashInCache(hash string) bool AddHashToCache(hash string) @@ -29,6 +32,14 @@ type RelayParserInf interface { consumerIp string, metadata []pairingtypes.Metadata, ) (protocolMessage chainlib.ProtocolMessage, err error) + GetEndpoint() *lavasession.RPCEndpoint + UpdateProtocolMessageIfNeededWithNewData( + ctx context.Context, + relayState *RelayState, + protocolMessage chainlib.ProtocolMessage, + newEarliestBlockRequested int64, + dataKind chainlib.DataKind, + ) chainlib.ProtocolMessage } type ArchiveStatus struct { @@ -47,6 +58,10 @@ func (as *ArchiveStatus) Copy() *ArchiveStatus { return archiveStatus } +type TransitionData struct { + block int64 +} + type RelayState struct { archiveStatus *ArchiveStatus stateNumber int @@ -55,6 +70,19 @@ type RelayState struct { relayParser RelayParserInf ctx context.Context lock sync.RWMutex + transitionData TransitionData +} + +func (rs *RelayState) GetTransitionData() TransitionData { + rs.lock.RLock() + defer rs.lock.RUnlock() + return rs.transitionData +} + +func (rs *RelayState) SetBlockTransitionData(blockNumber int64) { + rs.lock.Lock() + defer rs.lock.Unlock() + rs.transitionData.block = blockNumber } func GetEmptyRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage) *RelayState { @@ -67,7 +95,7 @@ func GetEmptyRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMe } } -func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage, stateNumber int, cache RetryHashCacheInf, relayParser RelayParserInf, archiveStatus *ArchiveStatus) *RelayState { +func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage, stateNumber int, cache RetryHashCacheInf, relayParser RelayParserInf, archiveStatus *ArchiveStatus, transitionData TransitionData) *RelayState { relayRequestData := protocolMessage.RelayPrivateData() if archiveStatus == nil { utils.LavaFormatError("misuse detected archiveStatus is nil", nil, utils.Attribute{Key: "protocolMessage.GetApi", Value: protocolMessage.GetApi()}) @@ -80,8 +108,22 @@ func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage cache: cache, relayParser: relayParser, archiveStatus: archiveStatus, + transitionData: TransitionData{}, // we set an empty transition for the next state } rs.archiveStatus.isArchive.Store(rs.CheckIsArchive(relayRequestData)) + // if we have transition data, we need to update the protocol message with the new earliest block + if transitionData.block > 0 { + latest, earliest := protocolMessage.RequestedBlock() + if transitionData.block < earliest { + // allows overwriting earliest requested block + rs.archiveStatus.isEarliestUsed.Store(false) + protocolMessage = relayParser.UpdateProtocolMessageIfNeededWithNewData(ctx, rs, protocolMessage, transitionData.block, chainlib.EARLIEST) + } + if latest > 0 && transitionData.block > latest { + protocolMessage = relayParser.UpdateProtocolMessageIfNeededWithNewData(ctx, rs, protocolMessage, transitionData.block, chainlib.LATEST) + } + rs.protocolMessage = protocolMessage + } return rs } @@ -153,6 +195,7 @@ func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numb if rs == nil || rs.archiveStatus == nil { return } + protocolMessage := rs.GetProtocolMessage() hashes := rs.GetProtocolMessage().GetRequestedBlocksHashes() // If we got upgraded and we still got a node error (>= 2) we know upgrade didn't work if rs.archiveStatus.isUpgraded.Load() && numberOfNodeErrors >= 2 { @@ -164,6 +207,16 @@ func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numb // 1. We can remove the archive, return to the original protocol message, // 2. Set all hashes as irrelevant for future queries. if !rs.archiveStatus.isHashCached.Load() { + if len(hashes) == 0 { + // we got here because we had a lot of failed retries and we got upgraded without hashes, cache the relay + endpoint := rs.relayParser.GetEndpoint() + if endpoint != nil { + hash, _, err := protocolMessage.HashCacheRequest(endpoint.ChainID) + if err == nil { + hashes = []string{string(hash)} + } + } + } for _, hash := range hashes { rs.cache.AddHashToCache(hash) } @@ -171,36 +224,45 @@ func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numb } return } - if !rs.archiveStatus.isArchive.Load() && len(hashes) > 0 { - // Launch archive only on the second retry attempt. + if !rs.archiveStatus.isArchive.Load() && (len(hashes) > 0 || numberOfRetriesLaunched == NumberOfRetriesToUpgradeToArchive) { + // Launch archive only on the second retry attempt or if we have a lot of failed relays. if numberOfRetriesLaunched == 1 { // Iterate over all hashes found in relay, if we don't have them in the cache we can try retry on archive. // If we are familiar with all, we don't want to allow archive. for _, hash := range hashes { if !rs.cache.CheckHashInCache(hash) { - // If we didn't find the hash in the cache we can try archive relay. - protocolMessage := rs.GetProtocolMessage() - relayRequestData := protocolMessage.RelayPrivateData() - // We need to set archive. - // Create a new relay private data containing the extension. - userData := protocolMessage.GetUserData() - // add all existing extensions including archive split by "," so the override will work - existingExtensionsPlusArchive := strings.Join(append(relayRequestData.Extensions, extensionslib.ArchiveExtension), ",") - metaDataForArchive := []pairingtypes.Metadata{{Name: common.EXTENSION_OVERRIDE_HEADER_NAME, Value: existingExtensionsPlusArchive}} - newProtocolMessage, err := rs.relayParser.ParseRelay(rs.ctx, relayRequestData.ApiUrl, string(relayRequestData.Data), relayRequestData.ConnectionType, userData.DappId, userData.ConsumerIp, metaDataForArchive) - if err != nil { - utils.LavaFormatError("Failed converting to archive message in shouldRetry", err, utils.LogAttr("relayRequestData", relayRequestData), utils.LogAttr("metadata", metaDataForArchive)) - } else { - // Creating an archive protocol message, and set it to current protocol message - rs.SetProtocolMessage(newProtocolMessage) - // for future batches. - rs.archiveStatus.isUpgraded.Store(true) - rs.archiveStatus.isArchive.Store(true) - } + rs.upgradeToArchive() break } } // We had node error, and we have a hash parsed. + } else if NumberOfRetriesToUpgradeToArchive == 5 { + endpoint := rs.relayParser.GetEndpoint() + hash, _, err := protocolMessage.HashCacheRequest(endpoint.ChainID) + if err == nil && !rs.cache.CheckHashInCache(string(hash)) { + rs.upgradeToArchive() + } } } } + +func (rs *RelayState) upgradeToArchive() { + protocolMessage := rs.GetProtocolMessage() + relayRequestData := protocolMessage.RelayPrivateData() + // We need to set archive. + // Create a new relay private data containing the extension. + userData := protocolMessage.GetUserData() + // add all existing extensions including archive split by "," so the override will work + existingExtensionsPlusArchive := strings.Join(append(relayRequestData.Extensions, extensionslib.ArchiveExtension), ",") + metaDataForArchive := []pairingtypes.Metadata{{Name: common.EXTENSION_OVERRIDE_HEADER_NAME, Value: existingExtensionsPlusArchive}} + newProtocolMessage, err := rs.relayParser.ParseRelay(rs.ctx, relayRequestData.ApiUrl, string(relayRequestData.Data), relayRequestData.ConnectionType, userData.DappId, userData.ConsumerIp, metaDataForArchive) + if err != nil { + utils.LavaFormatError("Failed converting to archive message in shouldRetry", err, utils.LogAttr("relayRequestData", relayRequestData), utils.LogAttr("metadata", metaDataForArchive)) + } else { + // Creating an archive protocol message, and set it to current protocol message + rs.SetProtocolMessage(newProtocolMessage) + // for future batches. + rs.archiveStatus.isUpgraded.Store(true) + rs.archiveStatus.isArchive.Store(true) + } +} diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 8592da8408..1ea78dbaa3 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math" "net/http" + "regexp" "strconv" "strings" "sync" @@ -26,6 +28,7 @@ import ( "github.com/lavanet/lava/v4/protocol/lavaprotocol/protocolerrors" "github.com/lavanet/lava/v4/protocol/lavasession" "github.com/lavanet/lava/v4/protocol/metrics" + "github.com/lavanet/lava/v4/protocol/parser" "github.com/lavanet/lava/v4/protocol/performance" "github.com/lavanet/lava/v4/protocol/statetracker" "github.com/lavanet/lava/v4/protocol/upgrade" @@ -60,7 +63,7 @@ type CancelableContextHolder struct { // implements Relay Sender interfaced and uses an ChainListener to get it called type RPCConsumerServer struct { consumerProcessGuid string - chainParser chainlib.ChainParser + ChainParser chainlib.ChainParser consumerSessionManager *lavasession.ConsumerSessionManager listenEndpoint *lavasession.RPCEndpoint rpcConsumerLogs *metrics.RPCConsumerLogs @@ -121,7 +124,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.lavaChainID = lavaChainID rpccs.rpcConsumerLogs = rpcConsumerLogs rpccs.privKey = privKey - rpccs.chainParser = chainParser + rpccs.ChainParser = chainParser rpccs.finalizationConsensus = finalizationConsensus rpccs.ConsumerAddress = consumerAddress rpccs.consumerConsistency = consumerConsistency @@ -161,6 +164,10 @@ func (rpccs *RPCConsumerServer) SetConsistencySeenBlock(blockSeen int64, key str rpccs.consumerConsistency.SetSeenBlockFromKey(blockSeen, key) } +func (rpccs *RPCConsumerServer) GetEndpoint() *lavasession.RPCEndpoint { + return rpccs.listenEndpoint +} + func (rpccs *RPCConsumerServer) GetListeningAddress() string { return rpccs.chainListener.GetListeningAddress() } @@ -174,6 +181,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo if success { rpccs.initialized.Store(true) } + go rpccs.tryExtractNodeData(context.Background()) return success, err } @@ -207,7 +215,7 @@ func (rpccs *RPCConsumerServer) waitForPairing() { } func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay *pairingtypes.RelayPrivateData, chainMessage chainlib.ChainMessage, err error) { - parsing, apiCollection, ok := rpccs.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM) + parsing, apiCollection, ok := rpccs.ChainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM) if !ok { return false, nil, nil, utils.LavaFormatWarning("did not send initial relays because the spec does not contain "+spectypes.FUNCTION_TAG_GET_BLOCKNUM.String(), nil, utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), @@ -218,7 +226,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay path := parsing.ApiName data := []byte(parsing.FunctionTemplate) - chainMessage, err = rpccs.chainParser.ParseMsg(path, data, collectionData.Type, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + chainMessage, err = rpccs.ChainParser.ParseMsg(path, data, collectionData.Type, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) if err != nil { return false, nil, nil, utils.LavaFormatError("failed creating chain message in rpc consumer init relays", err, utils.LogAttr("chainID", rpccs.listenEndpoint.ChainID), @@ -303,7 +311,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelays(retries int, initialRelays boo ctx := utils.WithUniqueIdentifier(context.Background(), utils.GenerateUniqueIdentifier()) ok, relay, chainMessage, err := rpccs.craftRelay(ctx) if !ok { - enabled, _ := rpccs.chainParser.DataReliabilityParams() + enabled, _ := rpccs.ChainParser.DataReliabilityParams() // if DR is disabled it's okay to not have GET_BLOCKNUM if !enabled { return true, nil @@ -315,7 +323,7 @@ func (rpccs *RPCConsumerServer) sendCraftedRelays(retries int, initialRelays boo } func (rpccs *RPCConsumerServer) getLatestBlock() uint64 { - latestKnownBlock, numProviders := rpccs.finalizationConsensus.GetExpectedBlockHeight(rpccs.chainParser) + latestKnownBlock, numProviders := rpccs.finalizationConsensus.GetExpectedBlockHeight(rpccs.ChainParser) if numProviders > 0 && latestKnownBlock > 0 { return uint64(latestKnownBlock) } @@ -356,7 +364,7 @@ func (rpccs *RPCConsumerServer) ParseRelay( // remove lava directive headers metadata, directiveHeaders := rpccs.LavaDirectiveHeaders(metadata) - chainMessage, err := rpccs.chainParser.ParseMsg(url, []byte(req), connectionType, metadata, rpccs.getExtensionsFromDirectiveHeaders(directiveHeaders)) + chainMessage, err := rpccs.ChainParser.ParseMsg(url, []byte(req), connectionType, metadata, rpccs.getExtensionsFromDirectiveHeaders(directiveHeaders)) if err != nil { return nil, err } @@ -395,7 +403,7 @@ func (rpccs *RPCConsumerServer) SendParsedRelay( } // Handle Data Reliability - enabled, dataReliabilityThreshold := rpccs.chainParser.DataReliabilityParams() + enabled, dataReliabilityThreshold := rpccs.ChainParser.DataReliabilityParams() // check if data reliability is enabled and relay processor allows us to perform data reliability if enabled && !relayProcessor.getSkipDataReliability() { // new context is needed for data reliability as some clients cancel the context they provide when the relay returns @@ -684,7 +692,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( addon := chainlib.GetAddon(protocolMessage) reqBlock = rpccs.resolveRequestedBlock(reqBlock, protocolMessage.RelayPrivateData().SeenBlock, latestBlockHashRequested, protocolMessage) // check whether we need a new protocol message with the new earliest block hash requested - protocolMessage = rpccs.updateProtocolMessageIfNeededWithNewEarliestData(ctx, relayState, protocolMessage, earliestBlockHashRequested, addon) + protocolMessage = rpccs.UpdateProtocolMessageIfNeededWithNewData(ctx, relayState, protocolMessage, earliestBlockHashRequested, chainlib.EARLIEST) // consumerEmergencyTracker always use latest virtual epoch virtualEpoch := rpccs.consumerTxSender.GetLatestVirtualEpoch() @@ -850,7 +858,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } // get here only if performed a regular relay successfully - expectedBH, numOfProviders := rpccs.finalizationConsensus.GetExpectedBlockHeight(rpccs.chainParser) + expectedBH, numOfProviders := rpccs.finalizationConsensus.GetExpectedBlockHeight(rpccs.ChainParser) pairingAddressesLen := rpccs.consumerSessionManager.GetAtomicPairingAddressesLength() latestBlock := localRelayResult.Reply.LatestBlock if expectedBH-latestBlock > 1000 { @@ -876,8 +884,17 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } errResponse = rpccs.consumerSessionManager.OnSessionDone(singleConsumerSession, latestBlock, chainlib.GetComputeUnits(protocolMessage), relayLatency, singleConsumerSession.CalculateExpectedLatency(expectedRelayTimeoutForQOS), expectedBH, numOfProviders, pairingAddressesLen, protocolMessage.GetApi().Category.HangingApi, extensions) // session done successfully - isNodeError, _ := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode) + isNodeError, errorMessage := protocolMessage.CheckResponseError(localRelayResult.Reply.Data, localRelayResult.StatusCode) localRelayResult.IsNodeError = isNodeError + if isNodeError { + // if it's a node error we might be able to extract a block number from the error message + blockError, blockNumber := rpccs.ChainParser.IdentifyNodeError(errorMessage) + if blockError { + // we identified a block number in the error message, meaning we requested a specific block + // we can't modify the chain message here, only on the creation of a new state so store this in the state for the transition to use + relayState.SetBlockTransitionData(blockNumber) + } + } if rpccs.debugRelays { utils.LavaFormatDebug("Result Code", utils.LogAttr("isNodeError", isNodeError), utils.LogAttr("StatusCode", localRelayResult.StatusCode)) } @@ -927,7 +944,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( new_ctx := context.Background() new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease) defer cancel() - _, averageBlockTime, _, _ := rpccs.chainParser.ChainBlockStats() + _, averageBlockTime, _, _ := rpccs.ChainParser.ChainBlockStats() err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ RequestHash: hashKey, ChainId: chainId, @@ -1099,10 +1116,10 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe // Update relay request requestedBlock to the provided one in case it was arbitrary lavaprotocol.UpdateRequestedBlock(relayRequest.RelayData, reply) - _, _, blockDistanceForFinalizedData, blocksInFinalizationProof := rpccs.chainParser.ChainBlockStats() + _, _, blockDistanceForFinalizedData, blocksInFinalizationProof := rpccs.ChainParser.ChainBlockStats() isFinalized := spectypes.IsFinalizedBlock(relayRequest.RelayData.RequestBlock, reply.LatestBlock, int64(blockDistanceForFinalizedData)) - filteredHeaders, _, ignoredHeaders := rpccs.chainParser.HandleHeaders(reply.Metadata, chainMessage.GetApiCollection(), spectypes.Header_pass_reply) + filteredHeaders, _, ignoredHeaders := rpccs.ChainParser.HandleHeaders(reply.Metadata, chainMessage.GetApiCollection(), spectypes.Header_pass_reply) reply.Metadata = filteredHeaders // check the signature on the reply @@ -1116,7 +1133,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe reply.Metadata = append(reply.Metadata, ignoredHeaders...) // TODO: response data sanity, check its under an expected format add that format to spec - enabled, _ := rpccs.chainParser.DataReliabilityParams() + enabled, _ := rpccs.ChainParser.DataReliabilityParams() if enabled && !singleConsumerSession.StaticProvider { // TODO: allow static providers to detect hash mismatches, // triggering conflict with them is impossible so we skip this for now, but this can be used to block malicious providers @@ -1337,7 +1354,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context for i := 0; i < len(results)-1; i++ { relayResult := results[i] relayResultDataReliability := results[i+1] - conflict := lavaprotocol.VerifyReliabilityResults(ctx, &relayResult, &relayResultDataReliability, protocolMessage.GetApiCollection(), rpccs.chainParser) + conflict := lavaprotocol.VerifyReliabilityResults(ctx, &relayResult, &relayResultDataReliability, protocolMessage.GetApiCollection(), rpccs.ChainParser) if conflict != nil { // TODO: remove this check when we fix the missing extensions information on conflict detection transaction if len(protocolMessage.GetExtensions()) == 0 { @@ -1358,7 +1375,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context } func (rpccs *RPCConsumerServer) getProcessingTimeout(chainMessage chainlib.ChainMessage) (processingTimeout time.Duration, relayTimeout time.Duration) { - _, averageBlockTime, _, _ := rpccs.chainParser.ChainBlockStats() + _, averageBlockTime, _, _ := rpccs.ChainParser.ChainBlockStats() relayTimeout = chainlib.GetRelayTimeout(chainMessage, averageBlockTime) processingTimeout = common.GetTimeoutForProcessing(relayTimeout, chainlib.GetTimeoutInfo(chainMessage)) return processingTimeout, relayTimeout @@ -1382,7 +1399,7 @@ func (rpccs *RPCConsumerServer) getExtensionsFromDirectiveHeaders(directiveHeade extensionsStr, ok := directiveHeaders[common.EXTENSION_OVERRIDE_HEADER_NAME] if ok { extensions := strings.Split(extensionsStr, ",") - _, extensions, _ = rpccs.chainParser.SeparateAddonsExtensions(extensions) + _, extensions, _ = rpccs.ChainParser.SeparateAddonsExtensions(extensions) if len(extensions) == 1 && extensions[0] == "none" { // none eliminates existing extensions return extensionslib.ExtensionInfo{LatestBlock: rpccs.getLatestBlock(), ExtensionOverride: []string{}} @@ -1578,7 +1595,7 @@ func (rpccs *RPCConsumerServer) RoundTrip(req *http.Request) (*http.Response, er defer cancel() guid := utils.GenerateUniqueIdentifier() ctx = utils.WithUniqueIdentifier(ctx, guid) - url, data, connectionType, metadata, err := rpccs.chainParser.ExtractDataFromRequest(req) + url, data, connectionType, metadata, err := rpccs.ChainParser.ExtractDataFromRequest(req) if err != nil { return nil, err } @@ -1586,37 +1603,168 @@ func (rpccs *RPCConsumerServer) RoundTrip(req *http.Request) (*http.Response, er if err != nil { return nil, err } - resp, err := rpccs.chainParser.SetResponseFromRelayResult(relayResult) + resp, err := rpccs.ChainParser.SetResponseFromRelayResult(relayResult) rpccs.rpcConsumerLogs.SetLoLResponse(err == nil) return resp, err } -func (rpccs *RPCConsumerServer) updateProtocolMessageIfNeededWithNewEarliestData( +func (rpccs *RPCConsumerServer) UpdateProtocolMessageIfNeededWithNewData( ctx context.Context, relayState *RelayState, protocolMessage chainlib.ProtocolMessage, - earliestBlockHashRequested int64, - addon string, + newEarliestBlockRequested int64, + dataKind chainlib.DataKind, ) chainlib.ProtocolMessage { - if !relayState.GetIsEarliestUsed() && earliestBlockHashRequested != spectypes.NOT_APPLICABLE { - // We got a earliest block data from cache, we need to create a new protocol message with the new earliest block hash parsed - // and update the extension rules with the new earliest block data as it might be archive. - // Setting earliest used to attempt this only once. - relayState.SetIsEarliestUsed() - relayRequestData := protocolMessage.RelayPrivateData() - userData := protocolMessage.GetUserData() - newProtocolMessage, err := rpccs.ParseRelay(ctx, relayRequestData.ApiUrl, string(relayRequestData.Data), relayRequestData.ConnectionType, userData.DappId, userData.ConsumerIp, nil) - if err != nil { - utils.LavaFormatError("Failed copying protocol message in sendRelayToProvider", err) - return protocolMessage + if newEarliestBlockRequested != spectypes.NOT_APPLICABLE { + if !relayState.GetIsEarliestUsed() || dataKind == chainlib.LATEST { + // we can't make changes in the protocol message without creating a new one + if dataKind == chainlib.EARLIEST { + // if We got a earliest block data from cache, we need to create a new protocol message with the new earliest block hash parsed + // and update the extension rules with the new earliest block data as it might be archive. + // Setting earliest used to attempt this only once. + relayState.SetIsEarliestUsed() + } + relayRequestData := protocolMessage.RelayPrivateData() + userData := protocolMessage.GetUserData() + newProtocolMessage, err := rpccs.ParseRelay(ctx, relayRequestData.ApiUrl, string(relayRequestData.Data), relayRequestData.ConnectionType, userData.DappId, userData.ConsumerIp, nil) + if err != nil { + utils.LavaFormatError("Failed copying protocol message in sendRelayToProvider", err) + return protocolMessage + } + if dataKind == chainlib.EARLIEST { + addon := chainlib.GetAddon(protocolMessage) + extensionAdded := newProtocolMessage.UpdateEarliestAndValidateExtensionRules(rpccs.ChainParser.ExtensionsParser(), newEarliestBlockRequested, addon, relayRequestData.SeenBlock) + if extensionAdded && relayState.CheckIsArchive(newProtocolMessage.RelayPrivateData()) { + relayState.SetIsArchive(true) + } + } else if dataKind == chainlib.LATEST { + newProtocolMessage.UpdateLatestBlockInMessage(newEarliestBlockRequested) + } + relayState.SetProtocolMessage(newProtocolMessage) + return newProtocolMessage } + } + return protocolMessage +} + +// we implement rpcConsumerServer as a chain router so we can use it in a chainFetcher +// SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality +// ExtensionsSupported(internalPath string, extensions []string) bool +func (rpccs *RPCConsumerServer) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { + ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) + url, data := chainMessage.GetOriginal() + userData := common.UserData{DappId: initRelaysDappId, ConsumerIp: initRelaysConsumerIp} + collectionData := chainMessage.GetApiCollection().CollectionData + metadata := chainMessage.GetRPCMessage().GetHeaders() + relayResult, err := rpccs.SendRelay(ctx, url, string(data), collectionData.Type, userData.DappId, userData.ConsumerIp, nil, metadata) + if relayResult == nil { + return nil, "", nil, proxyUrl, chainId, err + } + return &chainlib.RelayReplyWrapper{StatusCode: relayResult.StatusCode, RelayReply: relayResult.Reply}, "", nil, proxyUrl, chainId, err +} - extensionAdded := newProtocolMessage.UpdateEarliestAndValidateExtensionRules(rpccs.chainParser.ExtensionsParser(), earliestBlockHashRequested, addon, relayRequestData.SeenBlock) - if extensionAdded && relayState.CheckIsArchive(newProtocolMessage.RelayPrivateData()) { - relayState.SetIsArchive(true) +func (rpccs *RPCConsumerServer) ExtensionsSupported(internalPath string, extensions []string) bool { + configuredExtensions := rpccs.ChainParser.ExtensionsParser().GetConfiguredExtensions() + configured := map[string]struct{}{} + for _, extension := range configuredExtensions { + configured[extension.Name] = struct{}{} + } + for _, extension := range extensions { + if _, found := configured[extension]; !found { + return false } - relayState.SetProtocolMessage(newProtocolMessage) - return newProtocolMessage } - return protocolMessage + return true +} + +const ( + RetryInterval = 10 * time.Minute // time for retrying on failure + RefreshInterval = 3 * 24 * time.Hour // time for refreshing on success, so if something changes lava adapts +) + +func (rpccs *RPCConsumerServer) tryExtractNodeData(ctx context.Context) { + successTooNew := false + for { + // identify the pattern for node errors with node too new + successTooNew = rpccs.ExtractNodeData(ctx, chainlib.LATEST) + // successTooOld := rpccs.ExtractNodeData(ctx, chainlib.EARLIEST) // TODO: add support for earliest + var timer *time.Timer + // if successTooNew && successTooOld { // TODO: when earliest is supported, consider separate timers + if successTooNew { + timer = time.NewTimer(RefreshInterval) + } else { + timer = time.NewTimer(RetryInterval) + } + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + timer.Stop() + } + } +} + +func (rpccs *RPCConsumerServer) ExtractNodeData(ctx context.Context, kind chainlib.DataKind) bool { + endpoint := &lavasession.RPCProviderEndpoint{ChainID: rpccs.listenEndpoint.ChainID, ApiInterface: rpccs.listenEndpoint.ApiInterface, NodeUrls: []common.NodeUrl{{ + Url: "Internal", + }}} + chainFetcher := chainlib.NewChainFetcher(ctx, &chainlib.ChainFetcherOptions{ + ChainRouter: rpccs, + ChainParser: rpccs.ChainParser, + Endpoint: endpoint, + Cache: nil, + }) + // we want a block that will surely fail + fetchBlock := int64(0) + if kind == chainlib.LATEST { + fetchBlock = math.MaxInt64 + } else if kind == chainlib.EARLIEST { + // TODO: make it earliest and make sure it fails + // add a directive not to add archive + // add a directive not to do retry + // should fetch block 2 + return false + } + + // blockError contains static error data, dynamic data and the requested block replaced + // this shouldn't happen if the block exists in the response + blockError, success := fetchFormattedBlock(ctx, chainFetcher, fetchBlock) + if !success { + return false + } + fetchBlock-- // we now ask a different block + blockError2, success := fetchFormattedBlock(ctx, chainFetcher, fetchBlock) + if !success { + return false + } + // now we need the common part between blockError and blockError2 + blockErrorFormat, lettersDiff := parser.LongestCommonSubsequenceWithFormat(parser.CapStringLen(blockError), parser.CapStringLen(blockError2)) + if lettersDiff < parser.MinLettersForPattern { + return false + } + utils.LavaFormatInfo("[+] identified pattern for node errors, setting in chain parser", utils.LogAttr("pattern", blockError)) + rpccs.ChainParser.SetBlockErrorPattern(blockErrorFormat, kind) + return true +} + +func fetchFormattedBlock(ctx context.Context, chainFetcher *chainlib.ChainFetcher, fetchBlock int64) (blockError string, success bool) { + _, responseErrorMessage, format, err := chainFetcher.FetchBlock(ctx, fetchBlock) + if err != nil { + utils.LavaFormatError("[-] failed sending a fault block fetch to parse errors", err) + return "", false + } + if responseErrorMessage == "" { + return "", false + } + responseErrorMessage = parser.CapStringLen(responseErrorMessage) + formatted := fmt.Sprintf(format, fetchBlock) + // make sure we capture only the right thing + re := regexp.MustCompile(fmt.Sprintf(`(^|\W)(%s)(\W|$)`, regexp.QuoteMeta(formatted))) + // re := regexp.MustCompile(fmt.Sprintf(`%s`, regexp.QuoteMeta(formatted))) + blockError = re.ReplaceAllString(responseErrorMessage, "$1"+format+"$3") + if blockError == responseErrorMessage { + return "", false + } + return blockError, true } diff --git a/protocol/rpcprovider/provider_listener.go b/protocol/rpcprovider/provider_listener.go index 79388f2beb..41562efdbf 100644 --- a/protocol/rpcprovider/provider_listener.go +++ b/protocol/rpcprovider/provider_listener.go @@ -175,3 +175,27 @@ func (rs *relayServer) findReceiver(apiInterface string, specID string) (RelayRe } return *relayReceiver.relayReceiver, nil } + +type RelayReceiverInterceptor struct { + interceptFunction func(input any) + originalReceiver RelayReceiver +} + +func (rs *RelayReceiverInterceptor) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { + rs.interceptFunction(request) + return rs.originalReceiver.Relay(ctx, request) +} + +func (rs *RelayReceiverInterceptor) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) { + rs.interceptFunction(probeReq) + return rs.originalReceiver.Probe(ctx, probeReq) +} + +func (rs *RelayReceiverInterceptor) RelaySubscribe(request *pairingtypes.RelayRequest, srv pairingtypes.Relayer_RelaySubscribeServer) error { + rs.interceptFunction(request) + return rs.originalReceiver.RelaySubscribe(request, srv) +} + +func NewRelayReceiverInterceptor(originalReceiver RelayReceiver, interceptFunction func(input any)) RelayReceiver { + return &RelayReceiverInterceptor{interceptFunction: interceptFunction, originalReceiver: originalReceiver} +} diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index ab0e15559d..9c635156a0 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -380,21 +380,12 @@ func (rpcps *RPCProviderServer) ValidateRequest(chainMessage chainlib.ChainMessa // the consumer either configured an invalid value or is modifying the requested block as part of a data reliability message // see if this modification is supported providerRequestedBlockPreUpdate := reqBlock - chainMessage.UpdateLatestBlockInMessage(request.RelayData.RequestBlock, true) + chainMessage.UpdateLatestBlockInMessage(request.RelayData.RequestBlock) // if after UpdateLatestBlockInMessage it's not aligned we have a problem reqBlock, _ = chainMessage.RequestedBlock() if reqBlock != request.RelayData.RequestBlock { - utils.LavaFormatDebug("requested block mismatch between consumer and provider", - utils.LogAttr("request data", request.RelayData.Data), - utils.LogAttr("request path", request.RelayData.ApiUrl), - utils.LogAttr("method", chainMessage.GetApi().Name), - utils.Attribute{Key: "provider_parsed_block_pre_update", Value: providerRequestedBlockPreUpdate}, - utils.Attribute{Key: "provider_requested_block", Value: reqBlock}, - utils.Attribute{Key: "consumer_requested_block", Value: request.RelayData.RequestBlock}, - utils.Attribute{Key: "GUID", Value: ctx}) - // TODO, we need to return an error here, this was disabled so relays will pass, but it will cause data reliability issues. - // once we understand the issue return the error. - utils.LavaFormatError("requested block mismatch between consumer and provider", nil, + // this can happen if the consumer has additional information regarding the requested block + utils.LavaFormatWarning("requested block mismatch between consumer and provider", nil, utils.LogAttr("request data", string(request.RelayData.Data)), utils.LogAttr("request path", request.RelayData.ApiUrl), utils.LogAttr("method", chainMessage.GetApi().Name), @@ -1000,7 +991,7 @@ func (rpcps *RPCProviderServer) GetParametersForRelayDataReliability( } // TODO: take latestBlock and lastSeenBlock and put the greater one of them - updatedChainMessage = chainMsg.UpdateLatestBlockInMessage(latestBlock, true) + updatedChainMessage = chainMsg.UpdateLatestBlockInMessage(latestBlock) modifiedReqBlock = lavaprotocol.ReplaceRequestedBlock(request.RelayData.RequestBlock, latestBlock) if modifiedReqBlock != request.RelayData.RequestBlock { diff --git a/testutil/e2e/proxy/mockMaps/eth.json b/testutil/e2e/proxy/mockMaps/eth.json index 325d8d0bf8..d79d0a9b67 100644 --- a/testutil/e2e/proxy/mockMaps/eth.json +++ b/testutil/e2e/proxy/mockMaps/eth.json @@ -1,5 +1,7 @@ { "{\"jsonrpc\":\"2.0\",\"method\":\"debug_getRawHeader\",\"params\":[\"latest\"],\"id\":1}": "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"debug_works\"}\n", + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_getBlockByNumber\",\"params\":[\"0x7fffffffffffffff\",false]}":"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}", + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_getBlockByNumber\",\"params\":[\"0x7ffffffffffffffe\",false]}":"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}", "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_getBlockByNumber\",\"params\":[\"0x0\",true]}": "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"difficulty\":\"0x400000000\",\"extraData\":\"0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa\",\"gasLimit\":\"0x1388\",\"gasUsed\":\"0x0\",\"hash\":\"0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3\",\"logsBloom\":\"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\",\"miner\":\"0x0000000000000000000000000000000000000000\",\"mixHash\":\"0x0000000000000000000000000000000000000000000000000000000000000000\",\"nonce\":\"0x0000000000000042\",\"number\":\"0x0\",\"parentHash\":\"0x0000000000000000000000000000000000000000000000000000000000000000\",\"receiptsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"sha3Uncles\":\"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\",\"size\":\"0x21c\",\"stateRoot\":\"0xd7f8974fb5ac78d9ac099b9ad5018bedc2ce0a72dad1827a1709da30580f0544\",\"timestamp\":\"0x0\",\"totalDifficulty\":\"0x400000000\",\"transactions\":[],\"transactionsRoot\":\"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421\",\"uncles\":[]}}\n", "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_getCode\",\"params\":[\"0x0000000000000000000000000000000000000000\",\"latest\"]}": "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x\"}\n", "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"eth_chainId\",\"params\":[]}": "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x1\"}\n",