adding batching options for RPC client #72

Open · wants to merge 7 commits into main

Changes from 1 commit
4 changes: 2 additions & 2 deletions internal/rpcserver/server.go
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -45,7 +45,7 @@ func NewServer(ctx context.Context, wallet ethsigner.Wallet) (ss Server, err err
return nil, err
}
s := &rpcServer{
backend: rpcbackend.NewRPCClient(httpClient),
backend: rpcbackend.NewRPCClient(ctx, httpClient),
apiServerDone: make(chan error),
wallet: wallet,
chainID: config.GetInt64(signerconfig.BackendChainID),
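The constructor now takes a context.Context; when batching is enabled, cancelling that context stops the background batch dispatcher added in backend.go below. A minimal sketch of an updated call site, assuming the firefly-signer module path (github.com/hyperledger/firefly-signer) and the go-resty/v2 client the package already uses; the endpoint URL is illustrative:

```go
package main

import (
	"context"

	"github.com/go-resty/resty/v2"
	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

func main() {
	// Cancelling this context stops the batch dispatcher goroutine once batching is enabled.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The resty client carries the JSON/RPC endpoint; this URL is illustrative only.
	httpClient := resty.New().SetBaseURL("http://localhost:8545")

	backend := rpcbackend.NewRPCClient(ctx, httpClient)
	_ = backend // use SyncRequest / CallRPC as before
}
```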
1 change: 1 addition & 0 deletions internal/signermsgs/en_error_messges.go
@@ -103,4 +103,5 @@ var (
MsgInvalidEIP1559Transaction = ffe("FF22084", "Transaction payload invalid (EIP-1559): %v")
MsgInvalidEIP155TransactionV = ffe("FF22085", "Invalid V value from EIP-155 transaction (chainId=%d)")
MsgInvalidChainID = ffe("FF22086", "Invalid chainId expected=%d actual=%d")
MsgRPCRequestBatchFailed = ffe("FF22087", "Received response doesn't match the number of batched requests.")
)
309 changes: 244 additions & 65 deletions pkg/rpcbackend/backend.go
@@ -50,12 +50,12 @@ type Backend interface {
}

// NewRPCClient Constructor
func NewRPCClient(client *resty.Client) Backend {
return NewRPCClientWithOption(client, RPCClientOptions{})
func NewRPCClient(ctx context.Context, client *resty.Client) Backend {
return NewRPCClientWithOption(ctx, client, RPCClientOptions{})
}

// NewRPCClientWithOption Constructor
func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend {
func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options RPCClientOptions) Backend {
rpcClient := &RPCClient{
client: client,
}
@@ -64,17 +64,44 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back
rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest)
}

if options.BatchOptions != nil {
batchDelay := 50 * time.Millisecond
batchSize := 500
batchWorkerCount := 50
if options.BatchOptions.BatchDelay != nil {
batchDelay = *options.BatchOptions.BatchDelay
}

if options.BatchOptions.BatchSize != 0 {
batchSize = options.BatchOptions.BatchSize
}
if options.BatchOptions.BatchWorkerCount != 0 {
batchWorkerCount = options.BatchOptions.BatchWorkerCount
}
rpcClient.requestBatchWorkerSlots = make(chan bool, batchWorkerCount)
rpcClient.startBatcher(ctx, batchDelay, batchSize)
}

return rpcClient
}

type RPCClient struct {
client *resty.Client
concurrencySlots chan bool
requestCounter int64
client *resty.Client
concurrencySlots chan bool
requestCounter int64
requestBatchQueue chan *batchRequest
requestBatchWorkerSlots chan bool
}

type RPCClientBatchOptions struct {
BatchDelay *time.Duration
BatchSize int
BatchWorkerCount int
}

type RPCClientOptions struct {
MaxConcurrentRequest int64
BatchOptions *RPCClientBatchOptions
}

type RPCRequest struct {
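Batching is only activated when BatchOptions is non-nil, and any field left unset falls back to the defaults above (50ms delay, 500 requests per batch, 50 batch workers). A minimal sketch of enabling it through NewRPCClientWithOption, assuming the github.com/hyperledger/firefly-signer module path; the endpoint URL and option values are illustrative:

```go
package main

import (
	"context"
	"time"

	"github.com/go-resty/resty/v2"
	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	batchDelay := 20 * time.Millisecond // flush a partial batch after this delay
	rpc := rpcbackend.NewRPCClientWithOption(ctx, resty.New().SetBaseURL("http://localhost:8545"), rpcbackend.RPCClientOptions{
		MaxConcurrentRequest: 50, // pre-existing option: bounds concurrent SyncRequest calls
		BatchOptions: &rpcbackend.RPCClientBatchOptions{
			BatchDelay:       &batchDelay,
			BatchSize:        100, // flush as soon as this many requests are queued
			BatchWorkerCount: 10,  // bounds concurrent batch HTTP POSTs
		},
	})
	_ = rpc
}
```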
@@ -141,11 +168,137 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str
return nil
}

// SyncRequest sends an individual RPC request to the backend (always over HTTP currently),
// and waits synchronously for the response, or an error.
//
// In all return paths *including error paths* the RPCResponse is populated
// so the caller has an RPC structure to send back to the front-end caller.
type batchRequest struct {
rpcReq *RPCRequest
rpcRes chan *RPCResponse
rpcErr chan error
}

func (rc *RPCClient) startBatcher(ctx context.Context, batchDelay time.Duration, batchSize int) {
requestQueue := make(chan *batchRequest)

go func() {
ticker := time.NewTicker(batchDelay)
defer ticker.Stop()

var batch []*batchRequest

for {
select {
case req := <-requestQueue:
batch = append(batch, req)
if len(batch) >= batchSize {
rc.sendBatch(ctx, batch)
batch = nil
}
case <-ticker.C:
if len(batch) > 0 {
rc.sendBatch(ctx, batch)
batch = nil
}
case <-ctx.Done():
return
}
}
}()

rc.requestBatchQueue = requestQueue
}

func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) {
select {
case rc.requestBatchWorkerSlots <- true:
// wait for the worker slot and continue
case <-ctx.Done():
for _, req := range batch {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed)
req.rpcErr <- err
}
return
}
go func() {
defer func() {
<-rc.requestBatchWorkerSlots
}()

batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano())
traceIDs := make([]string, len(batch))

var rpcReqs []*RPCRequest
for i, req := range batch {
// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *req.rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if req.rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID)
}
traceIDs[i] = rpcTraceID
rpcReqs = append(rpcReqs, &beReq)
}
log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs))

responses := make([]*RPCResponse, len(batch))
res, err := rc.client.R().
SetContext(ctx).
SetBody(rpcReqs).
SetResult(&responses).
SetError(&responses).
Post("")

if err != nil {
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err)
for _, req := range batch {
req.rpcErr <- err
}
return
}

if len(responses) != len(batch) {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed)
for _, req := range batch {
req.rpcErr <- err
}
return
}

for i, resp := range responses {
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(resp)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput)
}

// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) {
rpcMsg := ""
errLog := ""
if resp != nil {
rpcMsg = resp.Message()
errLog = rpcMsg
}
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
traceID := traceIDs[i]
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog)
batch[i].rpcErr <- fmt.Errorf(rpcMsg)
} else {
if resp.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
resp.Result = fftypes.JSONAnyPtr(fftypes.NullString)
}
batch[i].rpcRes <- resp

}
}
}()
}

func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) {
if rc.concurrencySlots != nil {
select {
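With batching enabled, SyncRequest hands each request to the dispatcher over a channel, so concurrent callers are coalesced into a single JSON/RPC batch (one HTTP POST carrying an array of requests) as soon as either the batch size or the batch delay is hit. A rough sketch of that calling pattern, assuming the module path above and that the Backend returned by the constructor exposes SyncRequest as the concrete *RPCClient does:

```go
package example

import (
	"context"
	"log"
	"sync"

	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

// queryConcurrently fires n parameter-less requests in parallel; with batching
// enabled the dispatcher groups them into one (or a few) batched JSON/RPC POSTs.
func queryConcurrently(ctx context.Context, rpc rpcbackend.Backend, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res, err := rpc.SyncRequest(ctx, &rpcbackend.RPCRequest{
				Method: "eth_blockNumber",
			})
			if err != nil {
				log.Printf("request failed: %s", err)
				return
			}
			log.Printf("block number: %s", res.Result)
		}()
	}
	wg.Wait()
}
```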
@@ -160,63 +313,89 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe
}()
}

// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID)
}
if rc.requestBatchQueue != nil {
req := &batchRequest{
rpcReq: rpcReq,
rpcRes: make(chan *RPCResponse, 1),
rpcErr: make(chan error, 1),
}

rpcRes = new(RPCResponse)
select {
case rc.requestBatchQueue <- req:
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
}

log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method)
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonInput, _ := json.Marshal(rpcReq)
log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput)
}
rpcStartTime := time.Now()
res, err := rc.client.R().
SetContext(ctx).
SetBody(beReq).
SetResult(&rpcRes).
SetError(rpcRes).
Post("")

// Restore the original ID
rpcRes.ID = rpcReq.ID
if err != nil {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err)
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err)
rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError)
return rpcRes, err
}
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(rpcRes)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput)
}
// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 {
rpcMsg := rpcRes.Message()
errLog := rpcMsg
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog)
err := fmt.Errorf(rpcMsg)
return rpcRes, err
}
log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond))
if rpcRes.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString)
select {
case rpcRes := <-req.rpcRes:
return rpcRes, nil
case err := <-req.rpcErr:
return nil, err
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
}
} else {
// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID)
}

rpcRes = new(RPCResponse)

log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method)
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonInput, _ := json.Marshal(rpcReq)
log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput)
}
rpcStartTime := time.Now()
res, err := rc.client.R().
SetContext(ctx).
SetBody(beReq).
SetResult(&rpcRes).
SetError(&rpcRes).
Post("")

// Restore the original ID
rpcRes.ID = rpcReq.ID
if err != nil {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err)
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err)
rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError)
return rpcRes, err
}
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(rpcRes)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput)
}
// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 {
rpcMsg := rpcRes.Message()
errLog := rpcMsg
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog)
err := fmt.Errorf(rpcMsg)
return rpcRes, err
}
log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond))
if rpcRes.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString)
}
return rpcRes, nil
}
return rpcRes, nil

}

func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse {
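One behavioural difference for callers: on the batched path a failed request comes back as (nil, err), whereas the direct path still returns a populated error RPCResponse alongside the error. Below is a sketch of result handling that works for both paths, using only the fields and methods visible in this diff (Error.Code, Message(), Result); treat it as illustrative rather than part of the change:

```go
package example

import (
	"context"
	"log"

	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

// chainID shows SyncRequest result handling: always check err first, and only
// inspect rpcRes when it is non-nil (the batched path returns nil on failure).
func chainID(ctx context.Context, rpc rpcbackend.Backend) error {
	rpcRes, err := rpc.SyncRequest(ctx, &rpcbackend.RPCRequest{Method: "eth_chainId"})
	if err != nil {
		if rpcRes != nil && rpcRes.Error != nil {
			log.Printf("JSON/RPC error %d: %s", rpcRes.Error.Code, rpcRes.Message())
		}
		return err
	}
	log.Printf("chainId: %s", rpcRes.Result)
	return nil
}
```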