diff --git a/foundation/rpc_server/rpc_server.go b/foundation/rpc_server/rpc_server.go index 3145d21..d06350a 100644 --- a/foundation/rpc_server/rpc_server.go +++ b/foundation/rpc_server/rpc_server.go @@ -37,8 +37,7 @@ type Server struct { readRetryCount int } -func NewServer(listenAddrGRPC, listenAddrHTTP string, logger *log.Logger, - qPool *qubic.Pool, maxTickFetchUrl string, readRetryCount int) *Server { +func NewServer(listenAddrGRPC, listenAddrHTTP string, logger *log.Logger, qPool *qubic.Pool, maxTickFetchUrl string, readRetryCount int) *Server { return &Server{ listenAddrGRPC: listenAddrGRPC, listenAddrHTTP: listenAddrHTTP, @@ -49,20 +48,22 @@ func NewServer(listenAddrGRPC, listenAddrHTTP string, logger *log.Logger, } } -func (s *Server) requestBalance(ctx context.Context, req *protobuff.GetBalanceRequest) (*protobuff.GetBalanceResponse, error) { - client, err := s.qPool.Get() - if err != nil { - return nil, errors.Wrap(err, "getting pool connection") - } +func (s *Server) GetBalance(ctx context.Context, req *protobuff.GetBalanceRequest) (*protobuff.GetBalanceResponse, error) { + + var identityInfo types.AddressInfo + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetIdentity(ctx, req.Id) + if err != nil { + return errors.Wrap(err, "getting identity info from node") + } - identityInfo, err := client.GetIdentity(ctx, req.Id) + identityInfo = res + return nil + }) if err != nil { - s.qPool.Close(client) - return nil, errors.Wrap(err, "getting identity info from node") + return nil, status.Errorf(codes.Internal, "getting identity info from node %v", err) } - s.qPool.Put(client) - balance := protobuff.Balance{ Id: req.Id, Balance: identityInfo.AddressData.IncomingAmount - identityInfo.AddressData.OutgoingAmount, @@ -76,34 +77,23 @@ func (s *Server) requestBalance(ctx context.Context, req *protobuff.GetBalanceRe } return &protobuff.GetBalanceResponse{Balance: &balance}, nil } -func (s *Server) GetBalance(ctx context.Context, req *protobuff.GetBalanceRequest) (*protobuff.GetBalanceResponse, error) { - retryCount := 0 - for { - data, err := s.requestBalance(ctx, req) + +func (s *Server) GetTickInfo(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetTickInfoResponse, error) { + + var tickInfo types.TickInfo + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetTickInfo(ctx) if err != nil { - if retryCount >= s.readRetryCount { - return nil, status.Errorf(codes.Internal, "getting balance: %v", err) - } - retryCount++ - continue + return errors.Wrap(err, "getting tick info from node") } - return data, nil - } -} -func (s *Server) requestTickInfo(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetTickInfoResponse, error) { - client, err := s.qPool.Get() + tickInfo = res + return nil + }) if err != nil { - return nil, errors.Wrap(err, "getting pool connection") + return nil, status.Errorf(codes.Internal, "getting tick info from node %v", err) } - tickInfo, err := client.GetTickInfo(ctx) - if err != nil { - s.qPool.Close(client) - return nil, errors.Wrap(err, "getting tick info from node") - } - - s.qPool.Put(client) return &protobuff.GetTickInfoResponse{TickInfo: &protobuff.TickInfo{ Tick: tickInfo.Tick, Duration: uint32(tickInfo.TickDuration), @@ -111,87 +101,75 @@ func (s *Server) requestTickInfo(ctx context.Context, _ *emptypb.Empty) (*protob InitialTick: tickInfo.InitialTick, }}, nil } -func (s *Server) GetTickInfo(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetTickInfoResponse, error) { - retryCount := 0 - for { - data, err := s.requestTickInfo(ctx, nil) + +func (s *Server) GetBlockHeight(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetBlockHeightResponse, error) { + + var tickInfo types.TickInfo + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetTickInfo(ctx) if err != nil { - if retryCount >= s.readRetryCount { - return nil, status.Errorf(codes.Internal, "getting tick info: %v", err) - } - retryCount++ - continue + return errors.Wrap(err, "getting tick info from node") } - return data, nil - } -} -func (s *Server) GetBlockHeight(ctx context.Context, _ *emptypb.Empty) (*protobuff.GetBlockHeightResponse, error) { - // This is the same request as GetTickInfo - data, err := s.GetTickInfo(ctx, nil) + tickInfo = res + return nil + }) if err != nil { - return nil, err + return nil, status.Errorf(codes.Internal, "getting tick info from node %v", err) } - return &protobuff.GetBlockHeightResponse{ - BlockHeight: data.TickInfo, - }, nil + return &protobuff.GetBlockHeightResponse{BlockHeight: &protobuff.TickInfo{ + Tick: tickInfo.Tick, + Duration: uint32(tickInfo.TickDuration), + Epoch: uint32(tickInfo.Epoch), + InitialTick: tickInfo.InitialTick, + }}, nil } -func (s *Server) requestSmartContract(ctx context.Context, req *protobuff.QuerySmartContractRequest) (*protobuff.QuerySmartContractResponse, error) { +func (s *Server) QuerySmartContract(ctx context.Context, req *protobuff.QuerySmartContractRequest) (*protobuff.QuerySmartContractResponse, error) { reqData, err := base64.StdEncoding.DecodeString(req.RequestData) if err != nil { return nil, status.Errorf(codes.FailedPrecondition, "failed to decode from base64 the request data: %s", req.RequestData) } - client, err := s.qPool.Get() - if err != nil { - return nil, status.Errorf(codes.Internal, "getting pool connection %v", err) - } + var scData types.SmartContractData - scData, err := client.QuerySmartContract(ctx, qubic.RequestContractFunction{ - ContractIndex: req.ContractIndex, - InputType: uint16(req.InputType), - InputSize: uint16(req.InputSize), - }, reqData) + err = WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.QuerySmartContract(ctx, qubic.RequestContractFunction{ + ContractIndex: req.ContractIndex, + InputType: uint16(req.InputType), + InputSize: uint16(req.InputSize), + }, reqData) + if err != nil { + return errors.Wrap(err, "getting smart contract from node") + } + + scData = res + return nil + }) if err != nil { - s.qPool.Close(client) return nil, status.Errorf(codes.Internal, "query smart contract %v", err) } - s.qPool.Put(client) - return &protobuff.QuerySmartContractResponse{ResponseData: base64.StdEncoding.EncodeToString(scData.Data)}, nil } -func (s *Server) QuerySmartContract(ctx context.Context, req *protobuff.QuerySmartContractRequest) (*protobuff.QuerySmartContractResponse, error) { - retryCount := 0 - for { - data, err := s.requestSmartContract(ctx, req) + +func (s *Server) GetIssuedAssets(ctx context.Context, req *protobuff.IssuedAssetsRequest) (*protobuff.IssuedAssetsResponse, error) { + + var assets types.IssuedAssets + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetIssuedAssets(ctx, req.Identity) if err != nil { - if retryCount >= s.readRetryCount { - return nil, errors.Wrap(err, "getting smart contract") - } - retryCount++ - continue + return errors.Wrap(err, "getting issued assets from node") } - return data, nil - } -} -func (s *Server) requestIssuedAssets(ctx context.Context, req *protobuff.IssuedAssetsRequest) (*protobuff.IssuedAssetsResponse, error) { - client, err := s.qPool.Get() + assets = res + return nil + }) if err != nil { - return nil, errors.Wrap(err, "getting pool connection") + return nil, status.Errorf(codes.Internal, "getting issued assets from node %v", err) } - assets, err := client.GetIssuedAssets(ctx, req.Identity) - if err != nil { - s.qPool.Close(client) - return nil, errors.Wrap(err, "getting issued assets from node") - } - - s.qPool.Put(client) - issuedAssets := make([]*protobuff.IssuedAsset, 0) for _, asset := range assets { @@ -226,35 +204,23 @@ func (s *Server) requestIssuedAssets(ctx context.Context, req *protobuff.IssuedA return &protobuff.IssuedAssetsResponse{IssuedAssets: issuedAssets}, nil } -func (s *Server) GetIssuedAssets(ctx context.Context, req *protobuff.IssuedAssetsRequest) (*protobuff.IssuedAssetsResponse, error) { - retryCount := 0 - for { - data, err := s.requestIssuedAssets(ctx, req) + +func (s *Server) GetOwnedAssets(ctx context.Context, req *protobuff.OwnedAssetsRequest) (*protobuff.OwnedAssetsResponse, error) { + + var assets types.OwnedAssets + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetOwnedAssets(ctx, req.Identity) if err != nil { - if retryCount >= s.readRetryCount { - return nil, status.Errorf(codes.Internal, "getting issued assets: %v", err) - } - retryCount++ - continue + return errors.Wrap(err, "getting owned assets from node") } - return data, nil - } -} -func (s *Server) requestOwnedAssets(ctx context.Context, req *protobuff.OwnedAssetsRequest) (*protobuff.OwnedAssetsResponse, error) { - client, err := s.qPool.Get() + assets = res + return nil + }) if err != nil { - return nil, errors.Wrap(err, "getting pool connection") + return nil, status.Errorf(codes.Internal, "getting owned assets from node %v", err) } - assets, err := client.GetOwnedAssets(ctx, req.Identity) - if err != nil { - s.qPool.Close(client) - return nil, errors.Wrap(err, "getting owned assets from node") - } - - s.qPool.Put(client) - ownedAssets := make([]*protobuff.OwnedAsset, 0) for _, asset := range assets { @@ -307,35 +273,23 @@ func (s *Server) requestOwnedAssets(ctx context.Context, req *protobuff.OwnedAss return &protobuff.OwnedAssetsResponse{OwnedAssets: ownedAssets}, nil } -func (s *Server) GetOwnedAssets(ctx context.Context, req *protobuff.OwnedAssetsRequest) (*protobuff.OwnedAssetsResponse, error) { - retryCount := 0 - for { - data, err := s.requestOwnedAssets(ctx, req) + +func (s *Server) GetPossessedAssets(ctx context.Context, req *protobuff.PossessedAssetsRequest) (*protobuff.PossessedAssetsResponse, error) { + + var assets types.PossessedAssets + err := WithRetry(ctx, s.qPool, s.readRetryCount, func(ctx context.Context, client *qubic.Client) error { + res, err := client.GetPossessedAssets(ctx, req.Identity) if err != nil { - if retryCount >= s.readRetryCount { - return nil, status.Errorf(codes.Internal, "getting owned assets: %v", err) - } - retryCount++ - continue + return errors.Wrap(err, "getting possessed assets from node") } - return data, nil - } -} - -func (s *Server) requestPossessedAssets(ctx context.Context, req *protobuff.PossessedAssetsRequest) (*protobuff.PossessedAssetsResponse, error) { - client, err := s.qPool.Get() - if err != nil { - return nil, errors.Wrap(err, "getting pool connection") - } - assets, err := client.GetPossessedAssets(ctx, req.Identity) + assets = res + return nil + }) if err != nil { - s.qPool.Close(client) - return nil, errors.Wrap(err, "getting possessed assets from node") + return nil, status.Errorf(codes.Internal, "getting possessed assets from node %v", err) } - s.qPool.Put(client) - possessedAssets := make([]*protobuff.PossessedAsset, 0) for _, asset := range assets { @@ -399,24 +353,11 @@ func (s *Server) requestPossessedAssets(ctx context.Context, req *protobuff.Poss } possessedAssets = append(possessedAssets, &possessedAsset) + } return &protobuff.PossessedAssetsResponse{PossessedAssets: possessedAssets}, nil } -func (s *Server) GetPossessedAssets(ctx context.Context, req *protobuff.PossessedAssetsRequest) (*protobuff.PossessedAssetsResponse, error) { - retryCount := 0 - for { - data, err := s.requestPossessedAssets(ctx, req) - if err != nil { - if retryCount >= s.readRetryCount { - return nil, status.Errorf(codes.Internal, "getting possessed assets: %v", err) - } - retryCount++ - continue - } - return data, nil - } -} func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.BroadcastTransactionRequest) (*protobuff.BroadcastTransactionResponse, error) { decodedTx, err := base64.StdEncoding.DecodeString(req.EncodedTransaction) @@ -482,39 +423,33 @@ func (s *Server) BroadcastTransaction(ctx context.Context, req *protobuff.Broadc }, nil } -type maxTickResponse struct { - MaxTick uint32 `json:"max_tick"` -} +type RetryableCall func(ctx context.Context, client *qubic.Client) error -func fetchMaxTick(ctx context.Context, maxTickFetchUrl string) (uint32, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, maxTickFetchUrl, nil) - if err != nil { - return 0, errors.Wrap(err, "creating new request") - } +func WithRetry(ctx context.Context, pool *qubic.Pool, maxRetries int, call RetryableCall) error { + var lastErr error - res, err := http.DefaultClient.Do(req) - if err != nil { - return 0, errors.Wrap(err, "performing request") - } - defer res.Body.Close() + for attempt := 0; attempt <= maxRetries; attempt++ { + client, err := pool.Get() + if err != nil { + lastErr = status.Errorf(codes.Internal, "failed to get client from pool: %v", err) + continue + } - var resp maxTickResponse - body, err := io.ReadAll(res.Body) - if err != nil { - return 0, errors.Wrap(err, "reading response body") - } - err = json.Unmarshal(body, &resp) - if err != nil { - return 0, errors.Wrap(err, "unmarshalling response") - } + err = call(ctx, client) - tick := resp.MaxTick + if err == nil { + pool.Put(client) + return nil + } - if tick == 0 { - return 0, errors.New("Fetched max tick is 0.") + lastErr = err + pool.Close(client) } + return lastErr +} - return tick, nil +type maxTickResponse struct { + MaxTick uint32 `json:"max_tick"` } func broadcastTxToMultiple(ctx context.Context, pool *qubic.Pool, decodedTx []byte) int { @@ -561,6 +496,37 @@ func int8ArrayToInt32Array(array []int8) []int32 { return ints } +func fetchMaxTick(ctx context.Context, maxTickFetchUrl string) (uint32, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, maxTickFetchUrl, nil) + if err != nil { + return 0, errors.Wrap(err, "creating new request") + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, errors.Wrap(err, "performing request") + } + defer res.Body.Close() + + var resp maxTickResponse + body, err := io.ReadAll(res.Body) + if err != nil { + return 0, errors.Wrap(err, "reading response body") + } + err = json.Unmarshal(body, &resp) + if err != nil { + return 0, errors.Wrap(err, "unmarshalling response") + } + + tick := resp.MaxTick + + if tick == 0 { + return 0, errors.New("Fetched max tick is 0.") + } + + return tick, nil +} + func (s *Server) Start() error { srv := grpc.NewServer( grpc.MaxRecvMsgSize(600*1024*1024),