
Redis val fixes #2486

Merged: 11 commits, Jul 18, 2024
2 changes: 0 additions & 2 deletions pubsub/common.go
@@ -3,7 +3,6 @@ package pubsub
import (
"context"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
)

@@ -22,7 +21,6 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
return false
}
return got != nil
164 changes: 113 additions & 51 deletions pubsub/producer.go
@@ -13,6 +13,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

@@ -69,7 +72,7 @@ var DefaultProducerConfig = ProducerConfig{
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
EnableReproduce: false,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
@@ -99,34 +102,69 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if promise, found := p.promises[msg.ID]; found {
for _, msg := range msgIds {
if promise, found := p.promises[msg]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
delete(p.promises, msg)
}
}
}

// checkAndReproduce reproduces pending messages that were sent to consumers
// that are currently inactive.
func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
msgs, err := p.checkPending(ctx)
staleIds, err := p.checkPending(ctx)
if err != nil {
log.Error("Checking pending messages", "error", err)
return p.cfg.CheckPendingInterval
}
if len(msgs) == 0 {
if len(staleIds) == 0 {
return p.cfg.CheckPendingInterval
}
if !p.cfg.EnableReproduce {
p.errorPromisesFor(msgs)
return p.cfg.CheckPendingInterval
if p.cfg.EnableReproduce {
err = p.reproduceIds(ctx, staleIds)
if err != nil {
log.Warn("failed reproducing messages", "err", err)
}
} else {
p.errorPromisesFor(staleIds)
}
return p.cfg.CheckPendingInterval
}

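// reproduceIds claims the given stale pending entries for this producer,
// acks them, and re-adds them to the stream so an active consumer can pick
// them up again.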
func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Collaborator:

Would it be possible for two clients calling reproduceIds to claim the same ids, before either client acks them? Or is a message no longer claimable after one client claims it? I haven't fully figured out the redis streams model.

Collaborator (author):

First of all - this is called from the (single) producer and not from the consumers.
As far as I understand, what prevents race conditions in the claim is the "minIdle": if a message was claimed by anyone, the "idle" timer resets, so the next claim will fail.
So if the producer is trying to reproduce a message while the client is trying to write a response, one of them will "probably" fail (we shouldn't get there, because the producer will only try to claim a message if the worker keepalive stops).
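For illustration, a minimal sketch of that property (not part of this PR), assuming go-redis v8 and the same imports as this file, with placeholder stream, group, and entry-id names: XCLAIM resets the idle time of the entries it claims, so an immediate second claim with the same MinIdle comes back empty.

func claimRaceSketch(ctx context.Context, client redis.UniversalClient) {
	args := func(consumer string) *redis.XClaimArgs {
		return &redis.XClaimArgs{
			Stream:   "stream", // placeholder
			Group:    "group",  // placeholder
			Consumer: consumer,
			MinIdle:  time.Minute,
			Messages: []string{"1-0"}, // placeholder entry id
		}
	}
	// Claims the entry if it has been idle for at least MinIdle; the claim
	// itself resets the entry's idle time.
	first, _ := client.XClaim(ctx, args("producer-a")).Result()
	// Comes back empty: the entry is no longer idle long enough, so two
	// callers cannot both win the same message.
	second, _ := client.XClaim(ctx, args("producer-b")).Result()
	log.Info("claim race sketch", "first", len(first), "second", len(second))
}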

Collaborator (@PlasmaPower, Jul 16, 2024):

Oh, I was unaware the model of this was a single producer. We don't support running multiple stakers against a single redis stream?

Edit: just saw your later response on why we only support a single producer. I'll think about this some more.

Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: staleIds,
}).Result()
if err != nil {
return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
}
var messages []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return fmt.Errorf("casting request: %v to string", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return fmt.Errorf("unmarshaling value: %v, error: %w", msg.Values[messageKey], err)
}
messages = append(messages, &Message[Request]{
ID: msg.ID,
Value: req,
})
}

acked := make(map[string]Request)
for _, msg := range msgs {
for _, msg := range messages {
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
Collaborator:

Should we also continue if this returns zero, meaning it wasn't successfully ack'd?

Collaborator:

Checking in on this again, we seem to still be ignoring the int returned by XAck
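A possible shape for acting on that count (a suggestion only, not part of this change) - in go-redis v8, XAck returns the number of entries that were actually acknowledged:

ackCount, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result()
if err != nil {
	log.Error("ACKing message", "error", err)
	continue
}
if ackCount == 0 {
	// Nothing was acknowledged, e.g. the entry is no longer pending for this
	// group, so skip re-producing it instead of duplicating work.
	log.Warn("message was not acked, skipping reproduce", "id", msg.ID)
	continue
}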

log.Error("ACKing message", "error", err)
continue
@@ -140,29 +178,81 @@ func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) tim
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
Collaborator:

Couldn't we merge this acked loop into the previous loop? It looks like we can't merge all the loops together because the first loop might exit early to return an error, but these last two loops should be mergeable, I think, since neither exits early.

Also, while unlikely, couldn't a request get dropped between when it gets acknowledged and when it gets reproduced?

Collaborator (author):

Merging loops - done.

Message dropped - I fixed the edge case by avoiding the error in reproduce. There will be a new promise + request created that nobody will read, but it should behave OK.

}
}
return p.cfg.CheckPendingInterval
return nil
}

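// setMinIdInt parses a stream entry id of the form "<ms>-<seq>" and lowers
// min to it if it is smaller, comparing the timestamp part first and the
// sequence part second.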
func setMinIdInt(min *[2]uint64, id string) error {
idParts := strings.Split(id, "-")
if len(idParts) != 2 {
return errors.New("invalid id")
}
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
return fmt.Errorf("invalid id timestamp: %w", err)
}
if idTimeStamp > min[0] {
return nil
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid id serial: %w", err)
}
if idTimeStamp < min[0] {
min[0] = idTimeStamp
min[1] = idSerial
return nil
}
// idTimeStamp == min[0]
if idSerial < min[1] {
min[1] = idSerial
}
return nil
}
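For illustration only (hypothetical entry ids, not part of this PR), the helper tracks the smallest outstanding id:

minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
for _, id := range []string{"1700000000123-5", "1700000000123-2", "1700000000200-0"} {
	if err := setMinIdInt(&minIdInt, id); err != nil {
		log.Error("parsing entry id", "id", id, "err", err)
	}
}
// minIdInt is now {1700000000123, 2}: the oldest id still outstanding.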

// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
res, err := p.client.Get(ctx, id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", errSetId)
return p.cfg.CheckResultInterval
}
log.Error("Error reading value in redis", "key", id, "error", err)
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
}
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
continue
errored++
} else {
promise.Produce(resp)
responded++
}
promise.Produce(resp)
delete(p.promises, id)
}
var trimmed int64
var trimErr error
minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
Collaborator:

Won't this potentially prune others' requests because they're not in our p.promises map?

Collaborator (author):

Short answer: it would.
Long answer:
Generally, we only expect to use this for arbitrator validations, and we only expect to need one producer in the system - otherwise we would be wasting work.
The current approach is aligned with a single nitro node / multiple workers.
I have added the "prefix" field to allow multiple nitro nodes to use the same redis. That would require each to have its own pool of workers and its own queue - not great, but I don't think we'll really use it.

I think if we go for a multi-producer-multi-consumer queue, there are two main options:

  1. create a mechanism for syncing the multiple producers, marking which messages are done, walking the queue and checking what to trim.
  2. define the queue with a max number of entries, and add a mechanism for the producers to re-send a message if it was trimmed before it got to a worker.

Both add complexity; the 1st adds more complexity, the 2nd makes the redis size larger during work.

Considering the alternatives, I think the single-producer version makes sense, at least for now.
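For context, a minimal sketch of the trim semantics being discussed (go-redis v8, hypothetical stream name and entry ids): XTRIM with MINID drops every entry whose id is below the threshold, regardless of which producer added it, which is why this only works with a single producer per stream.

// Entries with ids below "2-0" (e.g. "1-0", "1-1") are removed; "2-0" and
// newer are kept. Another producer's older, still-pending entries would be
// removed as well.
removed, err := client.XTrimMinID(ctx, "hypothetical-stream", "2-0").Result()
if err != nil {
	log.Error("trimming stream", "err", err)
}
log.Trace("trimmed", "count", removed)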

} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}

Expand All @@ -184,15 +274,17 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
// taking the promisesLock before we send the XAdd makes sure promise ids will
// always be ascending
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
// This will happen if the old consumer became inactive but then ack_d
@@ -232,7 +324,7 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.redisStream,
Group: p.redisGroup,
@@ -265,35 +357,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
ids = append(ids, msg.ID)
}
if len(ids) == 0 {
log.Trace("There are no pending messages with inactive consumers")
return nil, nil
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
}).Result()
if err != nil {
return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
}
var res []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: req,
})
}
return res, nil
return ids, nil
}