Skip to content

Commit

Permalink
Add temporary error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Sneagan committed Aug 11, 2022
1 parent e86b39c commit 2368408
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 64 deletions.
13 changes: 2 additions & 11 deletions kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

batgo_kafka "github.com/brave-intl/bat-go/utils/kafka"
"github.com/brave-intl/challenge-bypass-server/server"
"github.com/brave-intl/challenge-bypass-server/utils"
uuid "github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/segmentio/kafka-go"
Expand All @@ -19,7 +20,7 @@ var brokers []string

// Processor is an interface that represents functions which can be used to process kafka
// messages in our pipeline.
type Processor func([]byte, *kafka.Writer, *server.Server, *zerolog.Logger) error
type Processor func([]byte, *kafka.Writer, *server.Server, *zerolog.Logger) *utils.ProcessingError

// TopicMapping represents a kafka topic, how to process it, and where to emit the result.
type TopicMapping struct {
Expand Down Expand Up @@ -122,16 +123,6 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
}
}
}

// The below block will close the producer connection when the error threshold is reached.
// @TODO: Test to determine if this Close() impacts the other goroutines that were passed
// the same topicMappings before re-enabling this block.
//for _, topicMapping := range topicMappings {
// logger.Trace().Msg(fmt.Sprintf("Closing producer connection %v", topicMapping))
// if err := topicMapping.ResultProducer.Close(); err != nil {
// logger.Error().Msg(fmt.Sprintf("Failed to close writer: %e", err))
// }
//}
}(topicMappings)
}

Expand Down
109 changes: 88 additions & 21 deletions kafka/signed_blinded_token_issuer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated"
"github.com/brave-intl/challenge-bypass-server/btd"
cbpServer "github.com/brave-intl/challenge-bypass-server/server"
"github.com/brave-intl/challenge-bypass-server/utils"
"github.com/rs/zerolog"
"github.com/segmentio/kafka-go"
)
Expand All @@ -19,7 +20,7 @@ import (
// @TODO: It would be better for the Server implementation and the Kafka implementation of
// this behavior to share utility functions rather than passing an instance of the server
// as an argument here. That will require a bit of refactoring.
func SignedBlindedTokenIssuerHandler(data []byte, producer *kafka.Writer, server *cbpServer.Server, log *zerolog.Logger) error {
func SignedBlindedTokenIssuerHandler(data []byte, producer *kafka.Writer, server *cbpServer.Server, log *zerolog.Logger) *utils.ProcessingError {
const (
issuerOk = 0
issuerInvalid = 1
Expand All @@ -28,7 +29,11 @@ func SignedBlindedTokenIssuerHandler(data []byte, producer *kafka.Writer, server

blindedTokenRequestSet, err := avroSchema.DeserializeSigningRequestSet(bytes.NewReader(data))
if err != nil {
return fmt.Errorf("request %s: failed avro deserialization: %w", blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf(
"Request %s: Failed Avro deserialization",
blindedTokenRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, log)
}

logger := log.With().Str("request_id", blindedTokenRequestSet.Request_id).Logger()
Expand All @@ -37,9 +42,11 @@ func SignedBlindedTokenIssuerHandler(data []byte, producer *kafka.Writer, server
if len(blindedTokenRequestSet.Data) > 1 {
// NOTE: When we start supporting multiple requests we will need to review
// errors and return values as well.
return fmt.Errorf(`request %s: data array unexpectedly contained more than a single message. this array is
intended to make future extension easier, but no more than a single value is currently expected`,
blindedTokenRequestSet.Request_id)
message := fmt.Sprintf(
"Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.",
blindedTokenRequestSet.Request_id,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, &logger)
}

OUTER:
Expand Down Expand Up @@ -147,15 +154,28 @@ OUTER:

marshaledDLEQProof, err := DLEQProof.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal dleq proof: %w", blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf("request %s: could not marshal dleq proof: %s", blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}

var marshalledBlindedTokens []string
for _, token := range blindedTokensSlice {
marshaledToken, err := token.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal blinded token slice to bytes: %w",
blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}
marshalledBlindedTokens = append(marshalledBlindedTokens, string(marshaledToken[:]))
}
Expand All @@ -164,17 +184,29 @@ OUTER:
for _, token := range signedTokens {
marshaledToken, err := token.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal new tokens to bytes: %w",
blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf("request %s: could not marshal new tokens to bytes: %s", blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}
marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken[:]))
}

publicKey := signingKey.PublicKey()
marshaledPublicKey, err := publicKey.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal signing key: %w",
blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf("request %s: could not marshal signing key: %s", blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}

blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{
Expand Down Expand Up @@ -212,16 +244,29 @@ OUTER:

marshaledDLEQProof, err := DLEQProof.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal dleq proof: %w",
message := fmt.Sprintf("request %s: could not marshal dleq proof: %s",
blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}

var marshalledBlindedTokens []string
for _, token := range blindedTokens {
marshaledToken, err := token.MarshalText()
if err != nil {
return fmt.Errorf("request %s: could not marshal blinded token slice to bytes: %w",
blindedTokenRequestSet.Request_id, err)
message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}
marshalledBlindedTokens = append(marshalledBlindedTokens, string(marshaledToken[:]))
}
Expand All @@ -230,15 +275,29 @@ OUTER:
for _, token := range signedTokens {
marshaledToken, err := token.MarshalText()
if err != nil {
return fmt.Errorf("error could not marshal new tokens to bytes: %w", err)
message := fmt.Sprintf("error could not marshal new tokens to bytes: %s", err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}
marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken[:]))
}

publicKey := signingKey.PublicKey()
marshaledPublicKey, err := publicKey.MarshalText()
if err != nil {
return fmt.Errorf("error could not marshal signing key: %w", err)
message := fmt.Sprintf("error could not marshal signing key: %s", err)
temporary, backoff := utils.ErrorIsTemporary(err, &logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}

blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{
Expand All @@ -260,14 +319,22 @@ OUTER:
var resultSetBuffer bytes.Buffer
err = resultSet.Serialize(&resultSetBuffer)
if err != nil {
return fmt.Errorf("request %s: failed to serialize result set: %s: %w",
blindedTokenRequestSet.Request_id, resultSetBuffer.String(), err)
message := fmt.Sprintf(
"Request %s: Failed to serialize ResultSet: %+v",
blindedTokenRequestSet.Request_id,
resultSet,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, &logger)
}

err = Emit(producer, resultSetBuffer.Bytes(), log)
if err != nil {
return fmt.Errorf("request %s: failed to emit results to topic %s: %w",
blindedTokenRequestSet.Request_id, producer.Topic, err)
message := fmt.Sprintf(
"Request %s: Failed to emit results to topic %s",
blindedTokenRequestSet.Request_id,
producer.Topic,
)
return utils.ProcessingErrorFromErrorWithMessage(err, message, &logger)
}

return nil
Expand Down
51 changes: 34 additions & 17 deletions kafka/signed_token_redeem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated"
"github.com/brave-intl/challenge-bypass-server/btd"
cbpServer "github.com/brave-intl/challenge-bypass-server/server"
"github.com/brave-intl/challenge-bypass-server/utils"
"github.com/rs/zerolog"
kafka "github.com/segmentio/kafka-go"
)
Expand All @@ -21,7 +22,7 @@ func SignedTokenRedeemHandler(
producer *kafka.Writer,
server *cbpServer.Server,
logger *zerolog.Logger,
) error {
) *utils.ProcessingError {
const (
redeemOk = 0
redeemDuplicateRedemption = 1
Expand All @@ -30,7 +31,8 @@ func SignedTokenRedeemHandler(
)
tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data))
if err != nil {
return fmt.Errorf("request %s: failed avro deserialization: %w", tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Failed Avro deserialization", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}
defer func() {
if recover() != nil {
Expand All @@ -43,17 +45,19 @@ func SignedTokenRedeemHandler(
if len(tokenRedeemRequestSet.Data) > 1 {
// NOTE: When we start supporting multiple requests we will need to review
// errors and return values as well.
return fmt.Errorf("request %s: data array unexpectedly contained more than a single message. this array is intended to make future extension easier, but no more than a single value is currently expected", tokenRedeemRequestSet.Request_id)
message := fmt.Sprintf("Request %s: Data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected.", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}
issuers, err := server.FetchAllIssuers()
if err != nil {
return fmt.Errorf("request %s: failed to fetch all issuers: %w", tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Failed to fetch all issuers", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}
for _, request := range tokenRedeemRequestSet.Data {
var (
verified = false
verifiedIssuer = &cbpServer.Issuer{}
verifiedCohort int32 = 0
verified = false
verifiedIssuer = &cbpServer.Issuer{}
verifiedCohort int32
)
if request.Public_key == "" {
logger.Error().
Expand Down Expand Up @@ -84,14 +88,14 @@ func SignedTokenRedeemHandler(
tokenPreimage := crypto.TokenPreimage{}
err = tokenPreimage.UnmarshalText([]byte(request.Token_preimage))
if err != nil {
return fmt.Errorf("request %s: could not unmarshal text into preimage: %w",
tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Could not unmarshal text into preimage", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}
verificationSignature := crypto.VerificationSignature{}
err = verificationSignature.UnmarshalText([]byte(request.Signature))
if err != nil {
return fmt.Errorf("request %s: could not unmarshal text into verification signature: %w",
tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}
for _, issuer := range *issuers {
if !issuer.ExpiresAt.IsZero() && issuer.ExpiresAt.Before(time.Now()) {
Expand All @@ -117,9 +121,16 @@ func SignedTokenRedeemHandler(
// Only attempt token verification with the issuer that was provided.
issuerPublicKey := signingKey.PublicKey()
marshaledPublicKey, err := issuerPublicKey.MarshalText()
// Unmarshaling failure is a data issue and is probably permanent.
if err != nil {
return fmt.Errorf("request %s: could not unmarshal issuer public key into text: %w",
tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id)
temporary, backoff := utils.ErrorIsTemporary(err, logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}

logger.Trace().
Expand Down Expand Up @@ -200,14 +211,20 @@ func SignedTokenRedeemHandler(
var resultSetBuffer bytes.Buffer
err = resultSet.Serialize(&resultSetBuffer)
if err != nil {
return fmt.Errorf("request %s: failed to serialize result set: %w",
tokenRedeemRequestSet.Request_id, err)
message := fmt.Sprintf("Request %s: Could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id)
return utils.ProcessingErrorFromErrorWithMessage(err, message, logger)
}

err = Emit(producer, resultSetBuffer.Bytes(), logger)
if err != nil {
return fmt.Errorf("request %s: failed to emit results to topic %s: %w",
tokenRedeemRequestSet.Request_id, producer.Topic, err)
message := fmt.Sprintf("Request %s: Failed to emit results to topic %s", tokenRedeemRequestSet.Request_id, producer.Topic)
temporary, backoff := utils.ErrorIsTemporary(err, logger)
return &utils.ProcessingError{
OriginalError: err,
FailureMessage: message,
Temporary: temporary,
Backoff: backoff,
}
}
return nil
}
Loading

0 comments on commit 2368408

Please sign in to comment.