Skip to content

Commit

Permalink
chore: continue message processing after error
Browse files Browse the repository at this point in the history
  • Loading branch information
maharifu committed Sep 23, 2024
1 parent cf4f09b commit 93710fe
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 18 deletions.
8 changes: 8 additions & 0 deletions relayer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ func handleProcessError(ctx context.Context, err error) error {
logger.WithError(err).Error("error returned in process loop")
return nil
}

// isFatal returns true if the error is unrecoverable and the caller should
// return it immediately
func isFatal(err error) bool {
return goerrors.Is(err, context.Canceled) ||
goerrors.Is(err, context.DeadlineExceeded) ||
errors.IsUnrecoverable(err)
}
17 changes: 12 additions & 5 deletions relayer/message_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
})

messagesInQueue, err := r.palomaClient.QueryMessagesForAttesting(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to attest")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -52,10 +60,6 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to attest")
return err
}

if len(messagesInQueue) > 0 {
logger := logger.WithFields(log.Fields{
Expand All @@ -75,7 +79,10 @@ func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Process
Error(ctx); err != nil {
logger.WithError(err).Error("failed to send Paloma status update")
}
return err

if isFatal(err) {
return err
}
}
}
}
Expand Down
22 changes: 16 additions & 6 deletions relayer/message_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
})

messagesInQueue, err := r.palomaClient.QueryMessagesForEstimating(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to estimate")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -51,10 +59,6 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to estimate")
return err
}

if len(messagesInQueue) > 0 {
logger := logger.WithFields(log.Fields{
Expand All @@ -66,7 +70,11 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
estimates, err := p.EstimateMessages(ctx, queue.FromString(queueName), messagesInQueue)
if err != nil {
logger.WithError(err).Error("error estimating messages")
return err
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

filteredEstimates := make([]chain.MessageWithEstimate, 0, len(estimates))
Expand All @@ -93,7 +101,9 @@ func (r *Relayer) estimateMessages(ctx context.Context, processors []chain.Proce
err = r.palomaClient.AddMessagesGasEstimate(ctx, queueName, filteredEstimates...)
if err != nil {
logger.WithError(err).Error("failed to send estimates to Paloma")
return err
if isFatal(err) {
return err
}
}
}

Expand Down
16 changes: 11 additions & 5 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
})

messagesInQueue, err := r.palomaClient.QueryMessagesForRelaying(ctx, queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to relay")
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
"message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 {
Expand All @@ -78,10 +86,6 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
})

logger.Debug("got ", len(messagesInQueue), " messages from ", queueName)
if err != nil {
logger.WithError(err).Error("couldn't get messages to relay")
return err
}

for _, v := range messagesInQueue {
r.msgCache.records[v.ID] = struct{}{}
Expand All @@ -103,7 +107,9 @@ func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processo
return msg.ID
}),
}).Error("error relaying messages")
return err
if isFatal(err) {
return err
}
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions relayer/message_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor
messagesForSigning, err := r.palomaClient.QueryMessagesForSigning(ctx, queueName)
if err != nil {
logger.Error("failed getting messages to sign")
return err
if isFatal(err) {
return err
}
// Move on to the next queue on the same chain
continue
}

logger = logger.WithFields(log.Fields{
Expand All @@ -65,6 +69,8 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor
signedMessages, err := p.SignMessages(ctx, messagesForSigning...)
if err != nil {
logger.WithError(err).Error("unable to sign messages")
// If we fail to sign this batch, we will fail to sign them
// all, so might as well return now
return err
}
logger = logger.WithFields(log.Fields{
Expand All @@ -78,7 +84,9 @@ func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor

if err = r.broadcastSignatures(ctx, queueName, signedMessages); err != nil {
logger.WithError(err).Error("couldn't broadcast signatures and process attestation")
return err
if isFatal(err) {
return err
}
}
}

Expand Down

0 comments on commit 93710fe

Please sign in to comment.