
Redis val fixes #2486

Merged
merged 11 commits on Jul 18, 2024
5 changes: 4 additions & 1 deletion pubsub/common.go
@@ -2,6 +2,7 @@ package pubsub

import (
"context"
"strings"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
@@ -22,7 +23,9 @@ func CreateStream(ctx context.Context, streamName string, client redis.Universal
func StreamExists(ctx context.Context, streamName string, client redis.UniversalClient) bool {
got, err := client.Do(ctx, "XINFO", "STREAM", streamName).Result()
if err != nil {
log.Error("Reading redis streams", "error", err)
if !strings.Contains(err.Error(), "no such key") {
log.Error("redis error", "err", err, "searching stream", streamName)
}
return false
}
return got != nil
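For readers unfamiliar with the check above: XINFO STREAM on a missing key fails with a "no such key" error, which the new code treats as "stream absent" rather than a real failure. A minimal standalone sketch of the same pattern follows (not part of the PR; the stream name and local Redis address are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/go-redis/redis/v8"
)

// streamExists mirrors the pattern above: only errors other than
// "no such key" are worth reporting.
func streamExists(ctx context.Context, client redis.UniversalClient, stream string) bool {
	got, err := client.Do(ctx, "XINFO", "STREAM", stream).Result()
	if err != nil {
		if !strings.Contains(err.Error(), "no such key") {
			fmt.Println("redis error:", err)
		}
		return false
	}
	return got != nil
}

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address
	fmt.Println(streamExists(ctx, client, "example-stream"))          // placeholder stream name
}
```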
182 changes: 121 additions & 61 deletions pubsub/producer.go
@@ -13,6 +13,9 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

@@ -59,27 +62,31 @@ type ProducerConfig struct {
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
// Interval duration for checking the result set by consumers.
CheckResultInterval time.Duration `koanf:"check-result-interval"`
CheckPendingItems int64 `koanf:"check-pending-items"`
}

var DefaultProducerConfig = ProducerConfig{
EnableReproduce: true,
CheckPendingInterval: time.Second,
KeepAliveTimeout: 5 * time.Minute,
CheckResultInterval: 5 * time.Second,
CheckPendingItems: 256,
}

var TestProducerConfig = ProducerConfig{
EnableReproduce: true,
EnableReproduce: false,
CheckPendingInterval: 10 * time.Millisecond,
KeepAliveTimeout: 100 * time.Millisecond,
CheckResultInterval: 5 * time.Millisecond,
CheckPendingItems: 256,
}

func ProducerAddConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable-reproduce", DefaultProducerConfig.EnableReproduce, "when enabled, messages with dead consumer will be re-inserted into the stream")
f.Duration(prefix+".check-pending-interval", DefaultProducerConfig.CheckPendingInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".check-result-interval", DefaultProducerConfig.CheckResultInterval, "interval in which producer checks pending messages whether consumer processing them is inactive")
f.Duration(prefix+".keepalive-timeout", DefaultProducerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.Int64(prefix+".check-pending-items", DefaultProducerConfig.CheckPendingItems, "items to screen during check-pending")
}
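As an illustration of how the new check-pending-items knob surfaces on the command line, here is a small standalone sketch (not from the PR); the flag prefix is a made-up example and only this one flag is registered by hand:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	f := pflag.NewFlagSet("example", pflag.ContinueOnError)
	// In the PR this registration happens inside ProducerAddConfigAddOptions;
	// the prefix "validation.producer-config" is only an example.
	f.Int64("validation.producer-config.check-pending-items", 256,
		"items to screen during check-pending")

	if err := f.Parse([]string{"--validation.producer-config.check-pending-items=512"}); err != nil {
		panic(err)
	}
	v, err := f.GetInt64("validation.producer-config.check-pending-items")
	fmt.Println(v, err) // 512 <nil>
}
```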

func NewProducer[Request any, Response any](client redis.UniversalClient, streamName string, cfg *ProducerConfig) (*Producer[Request, Response], error) {
@@ -99,70 +106,146 @@ func NewProducer[Request any, Response any](client redis.UniversalClient, stream
}, nil
}

func (p *Producer[Request, Response]) errorPromisesFor(msgs []*Message[Request]) {
func (p *Producer[Request, Response]) errorPromisesFor(msgIds []string) {
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
for _, msg := range msgs {
if promise, found := p.promises[msg.ID]; found {
for _, msg := range msgIds {
if promise, found := p.promises[msg]; found {
promise.ProduceError(fmt.Errorf("internal error, consumer died while serving the request"))
delete(p.promises, msg.ID)
delete(p.promises, msg)
}
}
}

// checkAndReproduce reproduces pending messages that were sent to consumers
// that are currently inactive.
func (p *Producer[Request, Response]) checkAndReproduce(ctx context.Context) time.Duration {
msgs, err := p.checkPending(ctx)
staleIds, err := p.checkPending(ctx)
if err != nil {
log.Error("Checking pending messages", "error", err)
return p.cfg.CheckPendingInterval
}
if len(msgs) == 0 {
if len(staleIds) == 0 {
return p.cfg.CheckPendingInterval
}
if !p.cfg.EnableReproduce {
p.errorPromisesFor(msgs)
return p.cfg.CheckPendingInterval
if p.cfg.EnableReproduce {
err = p.reproduceIds(ctx, staleIds)
if err != nil {
log.Warn("filed reproducing messages", "err", err)
}
} else {
p.errorPromisesFor(staleIds)
}
acked := make(map[string]Request)
for _, msg := range msgs {
return p.cfg.CheckPendingInterval
}

func (p *Producer[Request, Response]) reproduceIds(ctx context.Context, staleIds []string) error {
log.Info("Attempting to claim", "messages", staleIds)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: staleIds,
}).Result()
if err != nil {
return fmt.Errorf("claiming ownership on messages: %v, error: %w", staleIds, err)
}

Contributor: Would it be possible for two clients calling reproduceIds to claim the same ids before either client acks them? Or is a message no longer claimable after one client claims it? I haven't fully figured out the Redis streams model.

Collaborator (Author): First of all, this is called from the (single) producer, not from the consumers. As far as I understand, what prevents race conditions here is MinIdle: once a message is claimed by anyone, its idle timer resets, so the next claim will fail. So if the producer is trying to reproduce a message while a consumer is trying to write a response, one of them will "probably" fail (we shouldn't get there, because the producer only tries to claim a message after the worker's keepalive stops).

@PlasmaPower (Contributor, Jul 16, 2024): Oh, I was unaware this was a single-producer model. We don't support running multiple stakers against a single Redis stream?
Edit: just saw your later response on why we only support a single producer. I'll think about this some more.

for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
log.Error("redis producer reproduce: message not string", "id", msg.ID, "value", msg.Values[messageKey])
continue
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
log.Error("redis producer reproduce: message not a request", "id", msg.ID, "err", err, "value", msg.Values[messageKey])
continue
}
if _, err := p.client.XAck(ctx, p.redisStream, p.redisGroup, msg.ID).Result(); err != nil {
log.Error("ACKing message", "error", err)
log.Error("redis producer reproduce: could not ACK", "id", msg.ID, "err", err)
continue
}
acked[msg.ID] = msg.Value

Contributor: Should we also continue if this returns zero, meaning it wasn't successfully ack'd?

Contributor: Checking in on this again, we seem to still be ignoring the int returned by XAck.

}
for k, v := range acked {
// Only re-insert messages that were removed from the pending list first.
_, err := p.reproduce(ctx, v, k)
if err != nil {
log.Error("Re-inserting pending messages with inactive consumers", "error", err)
if _, err := p.reproduce(ctx, req, msg.ID); err != nil {
log.Error("redis producer reproduce: error", "err", err)
}
}
return p.cfg.CheckPendingInterval
return nil
}
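To make the MinIdle point from the thread above concrete, here is a standalone sketch (not from the PR): a successful XCLAIM resets the entry's idle time, so an immediately following competing claim with the same MinIdle finds nothing to take. The stream, group, consumer names, message id, and Redis address are placeholders, and MinIdle stands in for KeepAliveTimeout.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func claimStale(ctx context.Context, client redis.UniversalClient, consumer string, ids []string) []redis.XMessage {
	msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
		Stream:   "example-stream", // placeholder
		Group:    "example-group",  // placeholder
		Consumer: consumer,
		MinIdle:  5 * time.Minute, // plays the role of KeepAliveTimeout
		Messages: ids,
	}).Result()
	if err != nil {
		fmt.Println("claim error:", err)
	}
	return msgs
}

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder

	ids := []string{"1526569495631-0"} // placeholder pending-entry id
	first := claimStale(ctx, client, "producer-a", ids)
	// The first claim reset the idle timer, so this one comes back empty.
	second := claimStale(ctx, client, "producer-b", ids)
	fmt.Println(len(first), len(second))
}
```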

func setMinIdInt(min *[2]uint64, id string) error {
idParts := strings.Split(id, "-")
if len(idParts) != 2 {
return fmt.Errorf("invalid i.d: %v", id)
}
idTimeStamp, err := strconv.ParseUint(idParts[0], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d: %v err: %w", id, err)
}
if idTimeStamp > min[0] {
return nil
}
idSerial, err := strconv.ParseUint(idParts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid i.d serial: %v err: %w", id, err)
}
if idTimeStamp < min[0] {
min[0] = idTimeStamp
min[1] = idSerial
return nil
}
// idTimeStamp == min[0]
if idSerial < min[1] {
min[1] = idSerial
}
return nil
}
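A short illustrative test (not part of the PR, assumed to sit in the same pubsub package) pins down the intended ordering: a Redis stream id is "<millisecond timestamp>-<sequence>", and ids are compared by timestamp first, then by sequence.

```go
package pubsub

import (
	"fmt"
	"math"
	"testing"
)

func TestSetMinIdIntSketch(t *testing.T) {
	min := [2]uint64{math.MaxUint64, math.MaxUint64}
	for _, id := range []string{"1700000000000-5", "1699999999999-7", "1700000000000-0"} {
		if err := setMinIdInt(&min, id); err != nil {
			t.Fatal(err)
		}
	}
	// "1699999999999-7" has the smallest timestamp, so it wins regardless of
	// its larger sequence number.
	if got := fmt.Sprintf("%d-%d", min[0], min[1]); got != "1699999999999-7" {
		t.Fatalf("unexpected min id: %s", got)
	}
}
```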

// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
res, err := p.client.Get(ctx, id).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
continue
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", err)
return p.cfg.CheckResultInterval
}
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
}
log.Error("Error reading value in redis", "key", id, "error", err)
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
continue
errored++
} else {
promise.Produce(resp)
responded++
}
promise.Produce(resp)
delete(p.promises, id)
}
var trimmed int64
var trimErr error
minId := "+"
if minIdInt[0] < math.MaxUint64 {
minId = fmt.Sprintf("%d-%d", minIdInt[0], minIdInt[1])
trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
return p.cfg.CheckResultInterval
}

Contributor: Won't this potentially prune others' requests, because they're not in our p.promises map?

Collaborator (Author): Short answer: it would.
Long answer: Generally, we only expect to use this for arbitrator validations, and we only expect to need one producer in the system; anything more would be wasting work. The current approach is aligned with a single nitro node and multiple workers. I have added the "prefix" field to allow multiple nitro nodes to use the same redis; each would need its own pool of workers and its own queue. Not great, but I don't think we'll really use it.

If we go for a multi-producer, multi-consumer queue, there are two main options:

  1. Create a mechanism for syncing the multiple producers, marking which messages are done, walking the queue, and checking what to trim.
  2. Define the queue with a max number of entries and add a mechanism for the producers to re-send a message if it was trimmed before a worker picked it up.

Both add complexity; the first adds more complexity, the second makes the redis footprint larger during work.

Considering the alternatives, I think the single-producer version makes sense, at least for now.
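For reference, a standalone sketch (not from the PR) of the XTRIM MINID semantics relied on above: entries with ids strictly below the threshold are evicted, so passing the smallest id that still has an unresolved promise keeps that entry and everything newer. The stream name, id, and Redis address are placeholders.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder

	// Everything older than this id is removed; the entry itself survives.
	trimmed, err := client.XTrimMinID(ctx, "example-stream", "1699999999999-7").Result()
	if err != nil {
		fmt.Println("trim error:", err)
		return
	}
	fmt.Println("entries removed:", trimmed)
}
```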

@@ -184,20 +267,23 @@ func (p *Producer[Request, Response]) reproduce(ctx context.Context, value Reque
if err != nil {
return nil, fmt.Errorf("marshaling value: %w", err)
}
// Taking promisesLock before we send the XAdd ensures promise ids are
// always ascending.
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
id, err := p.client.XAdd(ctx, &redis.XAddArgs{
Stream: p.redisStream,
Values: map[string]any{messageKey: val},
}).Result()
if err != nil {
return nil, fmt.Errorf("adding values to redis: %w", err)
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
promise := p.promises[oldKey]
if oldKey != "" && promise == nil {
// This will happen if the old consumer became inactive but then ack'd
// the message afterwards.
return nil, fmt.Errorf("error reproducing the message, could not find existing one")
// don't error
log.Warn("tried reproducing a message but it wasn't found - probably got response", "oldKey", oldKey)
}
if oldKey == "" || promise == nil {
pr := containers.NewPromise[Response](nil)
@@ -232,13 +318,14 @@ func (p *Producer[Request, Response]) havePromiseFor(messageID string) bool {
return found
}

func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Message[Request], error) {
// returns ids of pending messages whose worker doesn't appear to be alive
func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]string, error) {
pendingMessages, err := p.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Start: "-",
End: "+",
Count: 100,
Count: p.cfg.CheckPendingItems,
}).Result()

if err != nil && !errors.Is(err, redis.Nil) {
@@ -247,6 +334,9 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
if len(pendingMessages) == 0 {
return nil, nil
}
if len(pendingMessages) >= int(p.cfg.CheckPendingItems) {
log.Warn("redis producer: many pending items found", "stream", p.redisStream, "check-pending-items", p.cfg.CheckPendingItems)
}
// IDs of the pending messages with inactive consumers.
var ids []string
active := make(map[string]bool)
@@ -265,35 +355,5 @@ func (p *Producer[Request, Response]) checkPending(ctx context.Context) ([]*Mess
}
ids = append(ids, msg.ID)
}
if len(ids) == 0 {
log.Trace("There are no pending messages with inactive consumers")
return nil, nil
}
log.Info("Attempting to claim", "messages", ids)
claimedMsgs, err := p.client.XClaim(ctx, &redis.XClaimArgs{
Stream: p.redisStream,
Group: p.redisGroup,
Consumer: p.id,
MinIdle: p.cfg.KeepAliveTimeout,
Messages: ids,
}).Result()
if err != nil {
return nil, fmt.Errorf("claiming ownership on messages: %v, error: %w", ids, err)
}
var res []*Message[Request]
for _, msg := range claimedMsgs {
data, ok := (msg.Values[messageKey]).(string)
if !ok {
return nil, fmt.Errorf("casting request: %v to bytes", msg.Values[messageKey])
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("marshaling value: %v, error: %w", msg.Values[messageKey], err)
}
res = append(res, &Message[Request]{
ID: msg.ID,
Value: req,
})
}
return res, nil
return ids, nil
}
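For context, a standalone sketch (not from the PR) of the XPENDING scan this function performs: list up to check-pending-items pending entries for the group and keep the ids whose consumer no longer looks alive. The liveness callback is a placeholder here; the PR determines liveness from consumer heartbeats.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func stalePendingIds(ctx context.Context, client redis.UniversalClient, stream, group string,
	limit int64, isAlive func(consumer string) bool) ([]string, error) {
	pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  "-",
		End:    "+",
		Count:  limit,
	}).Result()
	if err != nil && !errors.Is(err, redis.Nil) {
		return nil, fmt.Errorf("pending messages: %w", err)
	}
	var ids []string
	for _, msg := range pending {
		// Skip entries whose consumer still looks alive; only stale ones are
		// candidates for re-claiming.
		if isAlive(msg.Consumer) {
			continue
		}
		ids = append(ids, msg.ID)
	}
	return ids, nil
}

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder
	ids, err := stalePendingIds(ctx, client, "example-stream", "example-group", 256,
		func(string) bool { return false }) // placeholder: treat every consumer as dead
	fmt.Println(ids, err)
}
```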
114 changes: 46 additions & 68 deletions pubsub/pubsub_test.go
@@ -49,10 +49,12 @@ type configOpt interface {
apply(consCfg *ConsumerConfig, prodCfg *ProducerConfig)
}

type disableReproduce struct{}
type withReproduce struct {
reproduce bool
}

func (e *disableReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {
prodCfg.EnableReproduce = false
func (e *withReproduce) apply(_ *ConsumerConfig, prodCfg *ProducerConfig) {
prodCfg.EnableReproduce = e.reproduce
}

func producerCfg() *ProducerConfig {
@@ -61,6 +63,7 @@ func producerCfg() *ProducerConfig {
CheckPendingInterval: TestProducerConfig.CheckPendingInterval,
KeepAliveTimeout: TestProducerConfig.KeepAliveTimeout,
CheckResultInterval: TestProducerConfig.CheckResultInterval,
CheckPendingItems: TestProducerConfig.CheckPendingItems,
}
}

@@ -71,7 +74,7 @@ func consumerCfg() *ConsumerConfig {
}
}

func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (*Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt) (redis.UniversalClient, string, *Producer[testRequest, testResponse], []*Consumer[testRequest, testResponse]) {
t.Helper()
redisClient, err := redisutil.RedisClientFromURL(redisutil.CreateTestRedis(ctx, t))
if err != nil {
@@ -107,7 +110,7 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt)
log.Debug("Error deleting heartbeat keys", "error", err)
}
})
return producer, consumers
return redisClient, streamName, producer, consumers
}

func messagesMaps(n int) []map[string]string {
@@ -118,10 +121,14 @@ func messagesMaps(n int) []map[string]string {
return ret
}

func msgForIndex(idx int) string {
return fmt.Sprintf("msg: %d", idx)
}

func wantMessages(n int) []string {
var ret []string
for i := 0; i < n; i++ {
ret = append(ret, fmt.Sprintf("msg: %d", i))
ret = append(ret, msgForIndex(i))
}
sort.Strings(ret)
return ret
@@ -148,26 +155,25 @@ func produceMessages(ctx context.Context, msgs []string, producer *Producer[test
return promises, nil
}

func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, error) {
func awaitResponses(ctx context.Context, promises []*containers.Promise[testResponse]) ([]string, []int) {
var (
responses []string
errs []error
errs []int
)
for _, p := range promises {
for idx, p := range promises {
res, err := p.Await(ctx)
if err != nil {
errs = append(errs, err)
errs = append(errs, idx)
continue
}
responses = append(responses, res.Response)
}
return responses, errors.Join(errs...)
return responses, errs
}

// consume messages from every consumer except stopped ones.
func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse]) ([]map[string]string, [][]string) {
func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testRequest, testResponse], gotMessages []map[string]string) [][]string {
t.Helper()
gotMessages := messagesMaps(consumersCount)
wantResponses := make([][]string, consumersCount)
for idx := 0; idx < consumersCount; idx++ {
if consumers[idx].Stopped() {
@@ -199,7 +205,7 @@ func consume(ctx context.Context, t *testing.T, consumers []*Consumer[testReques
}
})
}
return gotMessages, wantResponses
return wantResponses
}

func TestRedisProduce(t *testing.T) {
@@ -208,43 +214,56 @@ func TestRedisProduce(t *testing.T) {
for _, tc := range []struct {
name string
killConsumers bool
autoRecover bool
}{
{
name: "all consumers are active",
killConsumers: false,
autoRecover: false,
},
{
name: "some consumers killed, others should take over their work",
killConsumers: true,
autoRecover: true,
},
{
name: "some consumers killed, should return failure",
killConsumers: true,
autoRecover: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
producer, consumers := newProducerConsumers(ctx, t)
redisClient, streamName, producer, consumers := newProducerConsumers(ctx, t, &withReproduce{tc.autoRecover})
producer.Start(ctx)
wantMsgs := wantMessages(messagesCount)
promises, err := produceMessages(ctx, wantMsgs, producer)
if err != nil {
t.Fatalf("Error producing messages: %v", err)
}
gotMessages := messagesMaps(len(consumers))
if tc.killConsumers {
// Consume messages in every third consumer but don't ack them to check
// that other consumers will claim ownership on those messages.
for i := 0; i < len(consumers); i += 3 {
consumers[i].Start(ctx)
if _, err := consumers[i].Consume(ctx); err != nil {
req, err := consumers[i].Consume(ctx)
if err != nil {
t.Errorf("Error consuming message: %v", err)
}
if !tc.autoRecover {
gotMessages[i][req.ID] = req.Value.Request
}
consumers[i].StopAndWait()
}

}
time.Sleep(time.Second)
gotMessages, wantResponses := consume(ctx, t, consumers)
gotResponses, err := awaitResponses(ctx, promises)
if err != nil {
t.Fatalf("Error awaiting responses: %v", err)
wantResponses := consume(ctx, t, consumers, gotMessages)
gotResponses, errIndexes := awaitResponses(ctx, promises)
if len(errIndexes) != 0 && tc.autoRecover {
t.Fatalf("Error awaiting responses: %v", errIndexes)
}
producer.StopAndWait()
for _, c := range consumers {
@@ -254,7 +273,6 @@ func TestRedisProduce(t *testing.T) {
if err != nil {
t.Fatalf("mergeMaps() unexpected error: %v", err)
}

if diff := cmp.Diff(wantMsgs, got); diff != "" {
t.Errorf("Unexpected diff (-want +got):\n%s\n", diff)
}
@@ -266,57 +284,17 @@ func TestRedisProduce(t *testing.T) {
if cnt := producer.promisesLen(); cnt != 0 {
t.Errorf("Producer still has %d unfullfilled promises", cnt)
}
msgs, err := redisClient.XRange(ctx, streamName, "-", "+").Result()
if err != nil {
t.Errorf("XRange failed: %v", err)
}
if len(msgs) != 0 {
t.Errorf("redis still has %v messages", len(msgs))
}
})
}
}

func TestRedisReproduceDisabled(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
producer, consumers := newProducerConsumers(ctx, t, &disableReproduce{})
producer.Start(ctx)
wantMsgs := wantMessages(messagesCount)
promises, err := produceMessages(ctx, wantMsgs, producer)
if err != nil {
t.Fatalf("Error producing messages: %v", err)
}

// Consumer messages in every third consumer but don't ack them to check
// that other consumers will claim ownership on those messages.
for i := 0; i < len(consumers); i += 3 {
consumers[i].Start(ctx)
if _, err := consumers[i].Consume(ctx); err != nil {
t.Errorf("Error consuming message: %v", err)
}
consumers[i].StopAndWait()
}

gotMessages, _ := consume(ctx, t, consumers)
gotResponses, err := awaitResponses(ctx, promises)
if err == nil {
t.Fatalf("All promises were fullfilled with reproduce disabled and some consumers killed")
}
producer.StopAndWait()
for _, c := range consumers {
c.StopWaiter.StopAndWait()
}
got, err := mergeValues(gotMessages)
if err != nil {
t.Fatalf("mergeMaps() unexpected error: %v", err)
}
wantMsgCnt := messagesCount - ((consumersCount + 2) / 3)
if len(got) != wantMsgCnt {
t.Fatalf("Got: %d messages, want %d", len(got), wantMsgCnt)
}
if len(gotResponses) != wantMsgCnt {
t.Errorf("Got %d responses want: %d\n", len(gotResponses), wantMsgCnt)
}
if cnt := producer.promisesLen(); cnt != 0 {
t.Errorf("Producer still has %d unfullfilled promises", cnt)
}
}

// mergeValues merges maps from the slice and returns their values.
// Returns and error if there exists duplicate key.
func mergeValues(messages []map[string]string) ([]string, error) {
18 changes: 10 additions & 8 deletions staker/block_validator.go
@@ -35,6 +35,8 @@ var (
validatorValidValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/valid", nil)
validatorFailedValidationsCounter = metrics.NewRegisteredCounter("arb/validator/validations/failed", nil)
validatorMsgCountCurrentBatch = metrics.NewRegisteredGauge("arb/validator/msg_count_current_batch", nil)
validatorMsgCountCreatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_created", nil)
validatorMsgCountRecordSentGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_record_sent", nil)
validatorMsgCountValidatedGauge = metrics.NewRegisteredGauge("arb/validator/msg_count_validated", nil)
)

@@ -283,8 +285,9 @@ func NewBlockValidator(
return ret, nil
}

func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) {
func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex, metr metrics.Gauge) {
addr.Store(uint64(val))
metr.Update(int64(val))
}
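A tiny standalone sketch (not from the PR) of the pattern: each position store is mirrored into a go-ethereum metrics gauge, so the new msg_count_* gauges always track the corresponding atomics. A plain uint64 stands in for arbutil.MessageIndex to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/metrics"
)

// atomicStorePos mirrors the helper above: store the position and update the
// gauge with the same value in one place.
func atomicStorePos(addr *atomic.Uint64, val uint64, gauge metrics.Gauge) {
	addr.Store(val)
	gauge.Update(int64(val))
}

func main() {
	var created atomic.Uint64
	gauge := metrics.NewRegisteredGauge("example/msg_count_created", nil) // placeholder metric name
	atomicStorePos(&created, 42, gauge)
	fmt.Println(created.Load()) // 42; the gauge holds the same value when metrics are enabled
}
```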

func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex {
@@ -588,7 +591,7 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e
v.validations.Store(pos, status)
v.nextCreateStartGS = endGS
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
atomicStorePos(&v.createdA, pos+1)
atomicStorePos(&v.createdA, pos+1, validatorMsgCountCreatedGauge)
log.Trace("create validation entry: created", "pos", pos)
return true, nil
}
@@ -667,7 +670,7 @@ func (v *BlockValidator) sendNextRecordRequests(ctx context.Context) (bool, erro
return false, err
}
pos += 1
atomicStorePos(&v.recordSentA, pos)
atomicStorePos(&v.recordSentA, pos, validatorMsgCountRecordSentGauge)
log.Trace("next record request: sent", "pos", pos)
}

@@ -778,11 +781,10 @@ validationsLoop:
log.Error("failed writing new validated to database", "pos", pos, "err", err)
}
go v.recorder.MarkValid(pos, v.lastValidGS.BlockHash)
atomicStorePos(&v.validatedA, pos+1)
atomicStorePos(&v.validatedA, pos+1, validatorMsgCountValidatedGauge)
v.validations.Delete(pos)
nonBlockingTrigger(v.createNodesChan)
nonBlockingTrigger(v.sendRecordChan)
validatorMsgCountValidatedGauge.Update(int64(pos + 1))
if v.testingProgressMadeChan != nil {
nonBlockingTrigger(v.testingProgressMadeChan)
}
@@ -1222,9 +1224,9 @@ func (v *BlockValidator) checkValidatedGSCaughtUp() (bool, error) {
v.nextCreateBatchReread = true
v.nextCreateStartGS = v.lastValidGS
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
atomicStorePos(&v.createdA, count)
atomicStorePos(&v.recordSentA, count)
atomicStorePos(&v.validatedA, count)
atomicStorePos(&v.createdA, count, validatorMsgCountCreatedGauge)
atomicStorePos(&v.recordSentA, count, validatorMsgCountRecordSentGauge)
atomicStorePos(&v.validatedA, count, validatorMsgCountValidatedGauge)
validatorMsgCountValidatedGauge.Update(int64(count))
v.chainCaughtUp = true
return true, nil
4 changes: 2 additions & 2 deletions system_tests/common_test.go
@@ -790,12 +790,12 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co
conf.Wasm.RootPath = wasmRootDir
// Enable redis streams when URL is specified
if redisURL != "" {
conf.Arbitrator.RedisValidationServerConfig = rediscons.DefaultValidationServerConfig
conf.Arbitrator.RedisValidationServerConfig = rediscons.TestValidationServerConfig
redisClient, err := redisutil.RedisClientFromURL(redisURL)
if err != nil {
t.Fatalf("Error creating redis coordinator: %v", err)
}
redisStream := server_api.RedisStreamForRoot(currentRootModule(t))
redisStream := server_api.RedisStreamForRoot(rediscons.TestValidationServerConfig.StreamPrefix, currentRootModule(t))
createRedisGroup(ctx, t, redisStream, redisClient)
conf.Arbitrator.RedisValidationServerConfig.RedisURL = redisURL
t.Cleanup(func() { destroyRedisGroup(ctx, t, redisStream, redisClient) })
34 changes: 15 additions & 19 deletions validator/client/redis/producer.go
@@ -20,6 +20,7 @@ import (

type ValidationClientConfig struct {
Name string `koanf:"name"`
StreamPrefix string `koanf:"stream-prefix"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
@@ -42,6 +43,7 @@ var TestValidationClientConfig = ValidationClientConfig{
Name: "test redis validation client",
Room: 2,
RedisURL: "",
StreamPrefix: "test-",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}
@@ -50,21 +52,20 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".name", DefaultValidationClientConfig.Name, "validation client name")
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}

// ValidationClient implements validation client through redis streams.
type ValidationClient struct {
stopwaiter.StopWaiter
name string
room atomic.Int32
config *ValidationClientConfig
room atomic.Int32
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
producerConfig pubsub.ProducerConfig
redisClient redis.UniversalClient
moduleRoots []common.Hash
createStreams bool
producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]
redisClient redis.UniversalClient
moduleRoots []common.Hash
}

func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) {
@@ -76,20 +77,18 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error)
return nil, err
}
validationClient := &ValidationClient{
name: cfg.Name,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
producerConfig: cfg.ProducerConfig,
redisClient: redisClient,
createStreams: cfg.CreateStreams,
config: cfg,
producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]),
redisClient: redisClient,
}
validationClient.room.Store(cfg.Room)
return validationClient, nil
}

func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
for _, mr := range moduleRoots {
if c.createStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
@@ -98,7 +97,7 @@ func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.
continue
}
p, err := pubsub.NewProducer[*validator.ValidationInput, validator.GoGlobalState](
c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig)
c.redisClient, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
@@ -146,10 +145,7 @@ func (c *ValidationClient) Stop() {
}

func (c *ValidationClient) Name() string {
if c.Started() {
return c.name
}
return "(not started)"
return c.config.Name
}

func (c *ValidationClient) Room() int {
4 changes: 2 additions & 2 deletions validator/server_api/json.go
@@ -45,8 +45,8 @@ func MachineStepResultFromJson(resultJson *MachineStepResultJson) (*validator.Ma
}, nil
}

func RedisStreamForRoot(moduleRoot common.Hash) string {
return fmt.Sprintf("stream:%s", moduleRoot.Hex())
func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string {
return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
}
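A small standalone sketch (not from the PR) of the resulting names, showing how the new prefix keeps test and production streams apart on a shared Redis:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// Mirrors RedisStreamForRoot so the sketch is self-contained.
func redisStreamForRoot(prefix string, moduleRoot common.Hash) string {
	return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex())
}

func main() {
	root := common.HexToHash("0x01")
	fmt.Println(redisStreamForRoot("", root))      // stream:0x0000...0001
	fmt.Println(redisStreamForRoot("test-", root)) // test-stream:0x0000...0001
}
```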

type Request struct {
6 changes: 5 additions & 1 deletion validator/valnode/redis/consumer.go
@@ -37,7 +37,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati
consumers := make(map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState])
for _, hash := range cfg.ModuleRoots {
mr := common.HexToHash(hash)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig)
c, err := pubsub.NewConsumer[*validator.ValidationInput, validator.GoGlobalState](redisClient, server_api.RedisStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig)
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
@@ -130,17 +130,20 @@ type ValidationServerConfig struct {
ModuleRoots []string `koanf:"module-roots"`
// Timeout on polling for existence of each redis stream.
StreamTimeout time.Duration `koanf:"stream-timeout"`
StreamPrefix string `koanf:"stream-prefix"`
}

var DefaultValidationServerConfig = ValidationServerConfig{
RedisURL: "",
StreamPrefix: "",
ConsumerConfig: pubsub.DefaultConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: 10 * time.Minute,
}

var TestValidationServerConfig = ValidationServerConfig{
RedisURL: "",
StreamPrefix: "test-",
ConsumerConfig: pubsub.TestConsumerConfig,
ModuleRoots: []string{},
StreamTimeout: time.Minute,
@@ -150,6 +153,7 @@ func ValidationServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes")
f.String(prefix+".redis-url", DefaultValidationServerConfig.RedisURL, "url of redis server")
f.String(prefix+".stream-prefix", DefaultValidationServerConfig.StreamPrefix, "prefix for stream name")
f.Duration(prefix+".stream-timeout", DefaultValidationServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")
}