diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc5cd68a90..7fea04de4b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -145,7 +145,7 @@ jobs: if: matrix.test-mode == 'defaults' run: | packages=`go list ./...` - stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 > >(stdbuf -oL tee full.log | grep -vE "INFO|seal") + stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 -tags=cionly > >(stdbuf -oL tee full.log | grep -vE "INFO|seal") - name: run tests with race detection if: matrix.test-mode == 'race' diff --git a/Dockerfile b/Dockerfile index 69e688679b..2fced8fade 100644 --- a/Dockerfile +++ b/Dockerfile @@ -250,7 +250,10 @@ COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/nitro-val /usr/local/bin/ COPY --from=node-builder /workspace/target/bin/seq-coordinator-manager /usr/local/bin/ +COPY --from=node-builder /workspace/target/bin/prover /usr/local/bin/ COPY --from=machine-versions /workspace/machines /home/user/target/machines +COPY ./scripts/validate-wasm-module-root.sh . +RUN ./validate-wasm-module-root.sh /home/user/target/machines /usr/local/bin/prover USER root RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update && \ diff --git a/arbitrator/prover/src/main.rs b/arbitrator/prover/src/main.rs index 9ddd5020c8..e10a415389 100644 --- a/arbitrator/prover/src/main.rs +++ b/arbitrator/prover/src/main.rs @@ -41,6 +41,9 @@ struct Opts { #[structopt(long)] /// print modules to the console print_modules: bool, + #[structopt(long)] + /// print wasm module root to the console + print_wasmmoduleroot: bool, /// profile output instead of generting proofs #[structopt(short = "p", long)] profile_run: bool, @@ -122,6 +125,18 @@ const DELAYED_HEADER_LEN: usize = 112; // also in test-case's host-io.rs & contr fn main() -> Result<()> { let opts = Opts::from_args(); + if opts.print_wasmmoduleroot { + match Machine::new_from_wavm(&opts.binary) { + Ok(mach) => { + println!("0x{}", mach.get_modules_root()); + return Ok(()); + } + Err(err) => { + eprintln!("Error loading binary: {err}"); + return Err(err); + } + } + } let mut inbox_contents = HashMap::default(); let mut inbox_position = opts.inbox_position; let mut delayed_position = opts.delayed_inbox_position; diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 2617a9a629..ccf9b4aab7 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -112,7 +112,7 @@ type BatchPoster struct { // This is an atomic variable that should only be accessed atomically. // An estimate of the number of batches we want to post but haven't yet. // This doesn't include batches which we don't want to post yet due to the L1 bounds. - backlog uint64 + backlog atomic.Uint64 lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds batchReverted atomic.Bool // indicates whether data poster batch was reverted @@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if config.IgnoreBlobPrice { use4844 = true } else { - backlog := atomic.LoadUint64(&b.backlog) + backlog := b.backlog.Load() // Logic to prevent switching from non-4844 batches to 4844 batches too often, // so that blocks can be filled efficiently. The geth txpool rejects txs for // accounts that already have the other type of txs in the pool with @@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) // Setting the backlog to 0 here ensures that we don't lower compression as a result. backlog = 0 } - atomic.StoreUint64(&b.backlog, backlog) + b.backlog.Store(backlog) b.building = nil // If we aren't queueing up transactions, wait for the receipt before moving on to the next batch. @@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) } func (b *BatchPoster) GetBacklogEstimate() uint64 { - return atomic.LoadUint64(&b.backlog) + return b.backlog.Load() } func (b *BatchPoster) Start(ctxIn context.Context) { diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 3ba9aa78f3..78b4db1929 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -97,8 +97,8 @@ type InboxReader struct { l1Reader *headerreader.HeaderReader // Atomic - lastSeenBatchCount uint64 - lastReadBatchCount uint64 + lastSeenBatchCount atomic.Uint64 + lastReadBatchCount atomic.Uint64 } func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) { @@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { seenBatchCountStored := uint64(math.MaxUint64) storeSeenBatchCount := func() { if seenBatchCountStored != seenBatchCount { - atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount) + r.lastSeenBatchCount.Store(seenBatchCount) seenBatchCountStored = seenBatchCount } } @@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { // There's nothing to do from = arbmath.BigAddByUint(currentHeight, 1) blocksToFetch = config.DefaultBlocksToRead - atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) + r.lastReadBatchCount.Store(checkingBatchCount) storeSeenBatchCount() if !r.caughtUp && readMode == "latest" { r.caughtUp = true @@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if len(sequencerBatches) > 0 { readAnyBatches = true - atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1) + r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1) storeSeenBatchCount() } } @@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } if !readAnyBatches { - atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount) + r.lastReadBatchCount.Store(checkingBatchCount) storeSeenBatchCount() } } @@ -625,7 +625,7 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6 } func (r *InboxReader) GetLastReadBatchCount() uint64 { - return atomic.LoadUint64(&r.lastReadBatchCount) + return r.lastReadBatchCount.Load() } // GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1. @@ -633,7 +633,7 @@ func (r *InboxReader) GetLastReadBatchCount() uint64 { // >0 - last batchcount seen in run() - only written after lastReadBatchCount updated // 0 - no batch seen, error func (r *InboxReader) GetLastSeenBatchCount() uint64 { - return atomic.LoadUint64(&r.lastSeenBatchCount) + return r.lastSeenBatchCount.Load() } func (r *InboxReader) GetDelayBlocks() uint64 { diff --git a/arbnode/redislock/redis.go b/arbnode/redislock/redis.go index 09296e3c79..7e26010cae 100644 --- a/arbnode/redislock/redis.go +++ b/arbnode/redislock/redis.go @@ -21,7 +21,7 @@ type Simple struct { stopwaiter.StopWaiter client redis.UniversalClient config SimpleCfgFetcher - lockedUntil int64 + lockedUntil atomic.Int64 mutex sync.Mutex stopping bool readyToLock func() bool @@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error { } // notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter. -func atomicTimeRead(addr *int64) time.Time { - asint64 := atomic.LoadInt64(addr) +func atomicTimeRead(addr *atomic.Int64) time.Time { + asint64 := addr.Load() return time.UnixMilli(asint64) } -func atomicTimeWrite(addr *int64, t time.Time) { +func atomicTimeWrite(addr *atomic.Int64, t time.Time) { asint64 := t.UnixMilli() - atomic.StoreInt64(addr, asint64) + addr.Store(asint64) } diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index cdf1011b11..98c19ce361 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -48,7 +48,7 @@ type SeqCoordinator struct { prevChosenSequencer string reportedWantsLockout bool - lockoutUntil int64 // atomic + lockoutUntil atomic.Int64 // atomic wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex. @@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient return nil } -func atomicTimeWrite(addr *int64, t time.Time) { +func atomicTimeWrite(addr *atomic.Int64, t time.Time) { asint64 := t.UnixMilli() - atomic.StoreInt64(addr, asint64) + addr.Store(asint64) } // notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter. -func atomicTimeRead(addr *int64) time.Time { - asint64 := atomic.LoadInt64(addr) +func atomicTimeRead(addr *atomic.Int64) time.Time { + asint64 := addr.Load() return time.UnixMilli(asint64) } @@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string { return fmt.Sprint("Url:", c.config.Url(), " prevChosenSequencer:", c.prevChosenSequencer, " reportedWantsLockout:", c.reportedWantsLockout, - " lockoutUntil:", c.lockoutUntil, + " lockoutUntil:", c.lockoutUntil.Load(), " redisErrors:", c.redisErrors) } diff --git a/arbnode/seq_coordinator_atomic_test.go b/arbnode/seq_coordinator_atomic_test.go index 61468a3adb..9b9d9dea81 100644 --- a/arbnode/seq_coordinator_atomic_test.go +++ b/arbnode/seq_coordinator_atomic_test.go @@ -21,21 +21,21 @@ import ( const messagesPerRound = 20 type CoordinatorTestData struct { - messageCount uint64 + messageCount atomic.Uint64 sequencer []string err error mutex sync.Mutex waitForCoords sync.WaitGroup - testStartRound int32 + testStartRound atomic.Int32 } func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) { nextRound := int32(0) for { sequenced := make([]bool, messagesPerRound) - for atomic.LoadInt32(&data.testStartRound) < nextRound { + for data.testStartRound.Load() < nextRound { if ctx.Err() != nil { return } @@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo nextRound++ var execError error for { - messageCount := atomic.LoadUint64(&data.messageCount) + messageCount := data.messageCount.Load() if messageCount >= messagesPerRound { break } @@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata) if err == nil { sequenced[messageCount] = true - atomic.StoreUint64(&data.messageCount, messageCount+1) + data.messageCount.Store(messageCount + 1) randNr := rand.Intn(20) if randNr > 15 { execError = coord.chosenOneRelease(ctx) @@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true coordConfig.Signer.Symmetric.SigningKey = "" testData := CoordinatorTestData{ - testStartRound: -1, - sequencer: make([]string, messagesPerRound), + sequencer: make([]string, messagesPerRound), } + testData.testStartRound.Store(-1) nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil) Require(t, err) @@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) { for round := int32(0); round < 10; round++ { redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY) - testData.messageCount = 0 + testData.messageCount.Store(0) for i := 0; i < messagesPerRound; i++ { testData.sequencer[i] = "" } testData.waitForCoords.Add(NumOfThreads) - atomic.StoreInt32(&testData.testStartRound, round) + testData.testStartRound.Store(round) testData.waitForCoords.Wait() Require(t, testData.err) seqList := "" diff --git a/arbnode/sequencer_inbox.go b/arbnode/sequencer_inbox.go index 46e1edb78b..73e52ded53 100644 --- a/arbnode/sequencer_inbox.go +++ b/arbnode/sequencer_inbox.go @@ -232,7 +232,7 @@ func (i *SequencerInbox) LookupBatchesInRange(ctx context.Context, from, to *big seqNum := parsedLog.BatchSequenceNumber.Uint64() if lastSeqNum != nil { if seqNum != *lastSeqNum+1 { - return nil, fmt.Errorf("sequencer batches out of order; after batch %v got batch %v", lastSeqNum, seqNum) + return nil, fmt.Errorf("sequencer batches out of order; after batch %v got batch %v", *lastSeqNum, seqNum) } } lastSeqNum = &seqNum diff --git a/arbnode/simple_redis_lock_test.go b/arbnode/simple_redis_lock_test.go index c9dd576749..72f3dfa837 100644 --- a/arbnode/simple_redis_lock_test.go +++ b/arbnode/simple_redis_lock_test.go @@ -21,11 +21,11 @@ const test_release_frac = 5 const test_delay = time.Millisecond const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__" -func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) { +func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < test_attempts; i++ { if s.AttemptLock(ctx) { - atomic.AddInt32(flag, 1) + flag.Add(1) } else if rand.Intn(test_release_frac) == 0 { s.Release(ctx) } @@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo <-time.After(time.Second) } wg := sync.WaitGroup{} - counters := make([]int32, test_threads) + counters := make([]atomic.Int32, test_threads) for i, lock := range locks { wg.Add(1) go attemptLock(ctx, lock, &counters[i], &wg) } wg.Wait() successful := -1 - for i, counter := range counters { - if counter != 0 { - if counter != test_attempts { - t.Fatalf("counter %d value %d", i, counter) + for i := range counters { + if counters[i].Load() != 0 { + if counters[i].Load() != test_attempts { + t.Fatalf("counter %d value %d", i, counters[i].Load()) } if successful > 0 { t.Fatalf("counter %d and %d both positive", i, successful) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 5c02129ee6..22ecdd553c 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -62,7 +62,7 @@ type TransactionStreamer struct { nextAllowedFeedReorgLog time.Time broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash - broadcasterQueuedMessagesPos uint64 + broadcasterQueuedMessagesPos atomic.Uint64 broadcasterQueuedMessagesActiveReorg bool coordinator *SeqCoordinator @@ -491,14 +491,14 @@ func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreC } func (s *TransactionStreamer) FeedPendingMessageCount() arbutil.MessageIndex { - pos := atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + pos := s.broadcasterQueuedMessagesPos.Load() if pos == 0 { return 0 } s.insertionMutex.Lock() defer s.insertionMutex.Unlock() - pos = atomic.LoadUint64(&s.broadcasterQueuedMessagesPos) + pos = s.broadcasterQueuedMessagesPos.Load() if pos == 0 { return 0 } @@ -552,14 +552,14 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if len(s.broadcasterQueuedMessages) == 0 || (feedReorg && !s.broadcasterQueuedMessagesActiveReorg) { // Empty cache or feed different from database, save current feed messages until confirmed L1 messages catch up. s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } else { - broadcasterQueuedMessagesPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)) + broadcasterQueuedMessagesPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load()) if broadcasterQueuedMessagesPos >= broadcastStartPos { // Feed messages older than cache s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } else if broadcasterQueuedMessagesPos+arbutil.MessageIndex(len(s.broadcasterQueuedMessages)) == broadcastStartPos { // Feed messages can be added directly to end of cache @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe ) } s.broadcasterQueuedMessages = messages - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos)) s.broadcasterQueuedMessagesActiveReorg = feedReorg } } @@ -795,7 +795,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil var cacheClearLen int messagesAfterPos := messageStartPos + arbutil.MessageIndex(len(messages)) - broadcastStartPos := arbutil.MessageIndex(atomic.LoadUint64(&s.broadcasterQueuedMessagesPos)) + broadcastStartPos := arbutil.MessageIndex(s.broadcasterQueuedMessagesPos.Load()) if messagesAreConfirmed { var duplicates int @@ -903,10 +903,10 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Check if new messages were added at the end of cache, if they were, then dont remove those particular messages if len(s.broadcasterQueuedMessages) > cacheClearLen { s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[cacheClearLen:] - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, uint64(broadcastStartPos)+uint64(cacheClearLen)) + s.broadcasterQueuedMessagesPos.Store(uint64(broadcastStartPos) + uint64(cacheClearLen)) } else { s.broadcasterQueuedMessages = s.broadcasterQueuedMessages[:0] - atomic.StoreUint64(&s.broadcasterQueuedMessagesPos, 0) + s.broadcasterQueuedMessagesPos.Store(0) } s.broadcasterQueuedMessagesActiveReorg = false } diff --git a/arbos/programs/api.go b/arbos/programs/api.go index 65a58a47c2..0519cd9f01 100644 --- a/arbos/programs/api.go +++ b/arbos/programs/api.go @@ -137,16 +137,31 @@ func newApiClosures( startGas := am.SaturatingUSub(gasLeft, baseCost) * 63 / 64 gas := am.MinInt(startGas, gasReq) - // Tracing: emit the call (value transfer is done later in evm.Call) - if tracingInfo != nil { - tracingInfo.Tracer.CaptureState(0, opcode, startGas, baseCost+gas, scope, []byte{}, depth, nil) - } - // EVM rule: calls that pay get a stipend (opCall) if value.Sign() != 0 { gas = am.SaturatingUAdd(gas, params.CallStipend) } + // Tracing: emit the call (value transfer is done later in evm.Call) + if tracingInfo != nil { + var args []uint256.Int + args = append(args, *uint256.NewInt(gas)) // gas + args = append(args, *uint256.NewInt(0).SetBytes(contract.Bytes())) // to address + if opcode == vm.CALL { + args = append(args, *uint256.NewInt(0).SetBytes(value.Bytes())) // call value + } + args = append(args, *uint256.NewInt(0)) // memory offset + args = append(args, *uint256.NewInt(uint64(len(input)))) // memory length + args = append(args, *uint256.NewInt(0)) // return offset + args = append(args, *uint256.NewInt(0)) // return size + s := &vm.ScopeContext{ + Memory: util.TracingMemoryFromBytes(input), + Stack: util.TracingStackFromArgs(args...), + Contract: scope.Contract, + } + tracingInfo.Tracer.CaptureState(0, opcode, startGas, baseCost+gas, s, []byte{}, depth, nil) + } + var ret []byte var returnGas uint64 diff --git a/arbos/programs/native_api.go b/arbos/programs/native_api.go index 136f74c964..6fbb630ef3 100644 --- a/arbos/programs/native_api.go +++ b/arbos/programs/native_api.go @@ -34,7 +34,7 @@ import ( ) var apiObjects sync.Map -var apiIds uintptr // atomic and sequential +var apiIds atomic.Uintptr // atomic and sequential type NativeApi struct { handler RequestHandler @@ -49,7 +49,7 @@ func newApi( memoryModel *MemoryModel, ) NativeApi { handler := newApiClosures(interpreter, tracingInfo, scope, memoryModel) - apiId := atomic.AddUintptr(&apiIds, 1) + apiId := apiIds.Add(1) id := usize(apiId) api := NativeApi{ handler: handler, diff --git a/arbos/util/tracing.go b/arbos/util/tracing.go index f0f101bc20..f3564143c5 100644 --- a/arbos/util/tracing.go +++ b/arbos/util/tracing.go @@ -56,11 +56,8 @@ func (info *TracingInfo) RecordEmitLog(topics []common.Hash, data []byte) { for _, topic := range topics { args = append(args, HashToUint256(topic)) // topic: 32-byte value. Max topics count is 4 } - memory := vm.NewMemory() - memory.Resize(size) - memory.Set(0, size, data) scope := &vm.ScopeContext{ - Memory: memory, + Memory: TracingMemoryFromBytes(data), Stack: TracingStackFromArgs(args...), Contract: info.Contract, } diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index 2225341560..7d27c57fe9 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -133,7 +133,7 @@ type BroadcastClient struct { connMutex sync.Mutex conn net.Conn - retryCount int64 + retryCount atomic.Int64 retrying bool shuttingDown bool @@ -435,7 +435,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) { } func (bc *BroadcastClient) GetRetryCount() int64 { - return atomic.LoadInt64(&bc.retryCount) + return bc.retryCount.Load() } func (bc *BroadcastClient) isShuttingDown() bool { @@ -458,7 +458,7 @@ func (bc *BroadcastClient) retryConnect(ctx context.Context) io.Reader { case <-timer.C: } - atomic.AddInt64(&bc.retryCount, 1) + bc.retryCount.Add(1) earlyFrameData, err := bc.connect(ctx, bc.nextSeqNum) if err == nil { bc.retrying = false diff --git a/broadcastclients/broadcastclients.go b/broadcastclients/broadcastclients.go index 23c4bd7738..8cd124bfe0 100644 --- a/broadcastclients/broadcastclients.go +++ b/broadcastclients/broadcastclients.go @@ -49,7 +49,7 @@ type BroadcastClients struct { secondaryRouter *Router // Use atomic access - connected int32 + connected atomic.Int32 } func NewBroadcastClients( @@ -113,7 +113,7 @@ func NewBroadcastClients( } func (bcs *BroadcastClients) adjustCount(delta int32) { - connected := atomic.AddInt32(&bcs.connected, delta) + connected := bcs.connected.Add(delta) if connected <= 0 { log.Error("no connected feed") } diff --git a/das/reader_aggregator_strategies.go b/das/reader_aggregator_strategies.go index d20760bd5b..8e10d52c16 100644 --- a/das/reader_aggregator_strategies.go +++ b/das/reader_aggregator_strategies.go @@ -41,7 +41,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat // Exponentially growing Explore Exploit Strategy type simpleExploreExploitStrategy struct { - iterations uint32 + iterations atomic.Uint32 exploreIterations uint32 exploitIterations uint32 @@ -49,7 +49,7 @@ type simpleExploreExploitStrategy struct { } func (s *simpleExploreExploitStrategy) newInstance() aggregatorStrategyInstance { - iterations := atomic.AddUint32(&s.iterations, 1) + iterations := s.iterations.Add(1) readerSets := make([][]daprovider.DASReader, 0) s.RLock() diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 2bace9b677..c5e97342d2 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -52,8 +52,8 @@ var ( nonceFailureCacheOverflowCounter = metrics.NewRegisteredGauge("arb/sequencer/noncefailurecache/overflow", nil) blockCreationTimer = metrics.NewRegisteredTimer("arb/sequencer/block/creation", nil) successfulBlocksCounter = metrics.NewRegisteredCounter("arb/sequencer/block/successful", nil) - conditionalTxRejectedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/rejected", nil) - conditionalTxAcceptedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/condtionaltx/accepted", nil) + conditionalTxRejectedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/conditionaltx/rejected", nil) + conditionalTxAcceptedBySequencerCounter = metrics.NewRegisteredCounter("arb/sequencer/conditionaltx/accepted", nil) l1GasPriceGauge = metrics.NewRegisteredGauge("arb/sequencer/l1gasprice", nil) callDataUnitsBacklogGauge = metrics.NewRegisteredGauge("arb/sequencer/calldataunitsbacklog", nil) unusedL1GasChargeGauge = metrics.NewRegisteredGauge("arb/sequencer/unusedl1gascharge", nil) @@ -321,7 +321,7 @@ type Sequencer struct { onForwarderSet chan struct{} L1BlockAndTimeMutex sync.Mutex - l1BlockNumber uint64 + l1BlockNumber atomic.Uint64 l1Timestamp uint64 // activeMutex manages pauseChan (pauses execution) and forwarder @@ -356,7 +356,6 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead config: configFetcher, senderWhitelist: senderWhitelist, nonceCache: newNonceCache(config.NonceCacheSize), - l1BlockNumber: 0, l1Timestamp: 0, pauseChan: nil, onForwarderSet: make(chan struct{}, 1), @@ -900,7 +899,7 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) { timestamp := time.Now().Unix() s.L1BlockAndTimeMutex.Lock() - l1Block := s.l1BlockNumber + l1Block := s.l1BlockNumber.Load() l1Timestamp := s.l1Timestamp s.L1BlockAndTimeMutex.Unlock() @@ -1014,9 +1013,9 @@ func (s *Sequencer) updateLatestParentChainBlock(header *types.Header) { defer s.L1BlockAndTimeMutex.Unlock() l1BlockNumber := arbutil.ParentHeaderToL1BlockNumber(header) - if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber) { + if header.Time > s.l1Timestamp || (header.Time == s.l1Timestamp && l1BlockNumber > s.l1BlockNumber.Load()) { s.l1Timestamp = header.Time - s.l1BlockNumber = l1BlockNumber + s.l1BlockNumber.Store(l1BlockNumber) } } @@ -1082,7 +1081,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error { } if s.l1Reader != nil { - initialBlockNr := atomic.LoadUint64(&s.l1BlockNumber) + initialBlockNr := s.l1BlockNumber.Load() if initialBlockNr == 0 { return errors.New("sequencer not initialized") } diff --git a/execution/gethexec/tx_pre_checker.go b/execution/gethexec/tx_pre_checker.go index 1a48d75fda..dacfd32e81 100644 --- a/execution/gethexec/tx_pre_checker.go +++ b/execution/gethexec/tx_pre_checker.go @@ -23,10 +23,10 @@ import ( ) var ( - conditionalTxRejectedByTxPreCheckerCurrentStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/condtionaltx/currentstate/rejected", nil) - conditionalTxAcceptedByTxPreCheckerCurrentStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/condtionaltx/currentstate/accepted", nil) - conditionalTxRejectedByTxPreCheckerOldStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/condtionaltx/oldstate/rejected", nil) - conditionalTxAcceptedByTxPreCheckerOldStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/condtionaltx/oldstate/accepted", nil) + conditionalTxRejectedByTxPreCheckerCurrentStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/conditionaltx/currentstate/rejected", nil) + conditionalTxAcceptedByTxPreCheckerCurrentStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/conditionaltx/currentstate/accepted", nil) + conditionalTxRejectedByTxPreCheckerOldStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/conditionaltx/oldstate/rejected", nil) + conditionalTxAcceptedByTxPreCheckerOldStateCounter = metrics.NewRegisteredCounter("arb/txprechecker/conditionaltx/oldstate/accepted", nil) ) const TxPreCheckerStrictnessNone uint = 0 diff --git a/go-ethereum b/go-ethereum index 7c6e05f753..18256c2dfc 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 7c6e05f75337d59562a6e2b8e1bdf1e445ad14fb +Subproject commit 18256c2dfcce8fd567aa05e03fbc11a4c17aa550 diff --git a/scripts/validate-wasm-module-root.sh b/scripts/validate-wasm-module-root.sh new file mode 100755 index 0000000000..f62a46a8d7 --- /dev/null +++ b/scripts/validate-wasm-module-root.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +MACHINES_DIR=$1 +PROVER=$2 + +for machine in "$MACHINES_DIR"/*/ ; do + if [ -d "$machine" ]; then + expectedWasmModuleRoot=$(cat "$machine/module-root.txt") + actualWasmModuleRoot=$(cd "$machine" && "$PROVER" machine.wavm.br --print-wasmmoduleroot) + if [ "$expectedWasmModuleRoot" != "$actualWasmModuleRoot" ]; then + echo "Error: Expected module root $expectedWasmModuleRoot but found $actualWasmModuleRoot in $machine" + exit 1 + fi + fi +done diff --git a/staker/block_validator.go b/staker/block_validator.go index 94ee907da5..d7b5f4f6a2 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -64,9 +64,9 @@ type BlockValidator struct { // can be read (atomic.Load) by anyone holding reorg-read // written (atomic.Set) by appropriate thread or (any way) holding reorg-write - createdA uint64 - recordSentA uint64 - validatedA uint64 + createdA atomic.Uint64 + recordSentA atomic.Uint64 + validatedA atomic.Uint64 validations containers.SyncMap[arbutil.MessageIndex, *validationStatus] config BlockValidatorConfigFetcher @@ -208,19 +208,19 @@ const ( ) type validationStatus struct { - Status uint32 // atomic: value is one of validationStatus* + Status atomic.Uint32 // atomic: value is one of validationStatus* Cancel func() // non-atomic: only read/written to with reorg mutex Entry *validationEntry // non-atomic: only read if Status >= validationStatusPrepared Runs []validator.ValidationRun // if status >= ValidationSent } func (s *validationStatus) getStatus() valStatusField { - uintStat := atomic.LoadUint32(&s.Status) + uintStat := s.Status.Load() return valStatusField(uintStat) } func (s *validationStatus) replaceStatus(old, new valStatusField) bool { - return atomic.CompareAndSwapUint32(&s.Status, uint32(old), uint32(new)) + return s.Status.CompareAndSwap(uint32(old), uint32(new)) } func NewBlockValidator( @@ -283,12 +283,12 @@ func NewBlockValidator( return ret, nil } -func atomicStorePos(addr *uint64, val arbutil.MessageIndex) { - atomic.StoreUint64(addr, uint64(val)) +func atomicStorePos(addr *atomic.Uint64, val arbutil.MessageIndex) { + addr.Store(uint64(val)) } -func atomicLoadPos(addr *uint64) arbutil.MessageIndex { - return arbutil.MessageIndex(atomic.LoadUint64(addr)) +func atomicLoadPos(addr *atomic.Uint64) arbutil.MessageIndex { + return arbutil.MessageIndex(addr.Load()) } func (v *BlockValidator) created() arbutil.MessageIndex { @@ -582,9 +582,9 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, e return false, err } status := &validationStatus{ - Status: uint32(Created), - Entry: entry, + Entry: entry, } + status.Status.Store(uint32(Created)) v.validations.Store(pos, status) v.nextCreateStartGS = endGS v.nextCreatePrevDelayed = msg.DelayedMessagesRead @@ -962,13 +962,13 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalSt v.nextCreateStartGS = globalState v.nextCreatePrevDelayed = msg.DelayedMessagesRead v.nextCreateBatchReread = true - v.createdA = countUint64 + v.createdA.Store(countUint64) } // under the reorg mutex we don't need atomic access - if v.recordSentA < countUint64 { - v.recordSentA = countUint64 + if v.recordSentA.Load() < countUint64 { + v.recordSentA.Store(countUint64) } - v.validatedA = countUint64 + v.validatedA.Store(countUint64) v.valLoopPos = count validatorMsgCountValidatedGauge.Update(int64(countUint64)) err = v.writeLastValidated(globalState, nil) // we don't know which wasm roots were validated @@ -1027,13 +1027,13 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex) v.nextCreatePrevDelayed = msg.DelayedMessagesRead v.nextCreateBatchReread = true countUint64 := uint64(count) - v.createdA = countUint64 + v.createdA.Store(countUint64) // under the reorg mutex we don't need atomic access - if v.recordSentA > countUint64 { - v.recordSentA = countUint64 + if v.recordSentA.Load() > countUint64 { + v.recordSentA.Store(countUint64) } - if v.validatedA > countUint64 { - v.validatedA = countUint64 + if v.validatedA.Load() > countUint64 { + v.validatedA.Store(countUint64) validatorMsgCountValidatedGauge.Update(int64(countUint64)) err := v.writeLastValidated(v.nextCreateStartGS, nil) // we don't know which wasm roots were validated if err != nil { diff --git a/staker/challenge-cache/cache.go b/staker/challenge-cache/cache.go index 8cca4bb835..ed4fad6450 100644 --- a/staker/challenge-cache/cache.go +++ b/staker/challenge-cache/cache.go @@ -10,7 +10,7 @@ store them in a filesystem cache to avoid recomputing them and for hierarchical Each file contains a list of 32 byte hashes, concatenated together as bytes. Using this structure, we can namespace hashes by message number and by challenge level. -Once a validator receives a full list of computed machine hashes for the first time from a validatio node, +Once a validator receives a full list of computed machine hashes for the first time from a validation node, it will write the hashes to this filesystem hierarchy for fast access next time these hashes are needed. Example uses: @@ -18,7 +18,7 @@ Example uses: - Obtain all the hashes from step 100 to 101 at subchallenge level 1 for the execution of message num 70. wavm-module-root-0xab/ - rollup-block-hash-0x12...-message-num-70/ + message-num-70-rollup-block-hash-0x12.../ hashes.bin subchallenge-level-1-big-step-100/ hashes.bin @@ -31,18 +31,21 @@ package challengecache import ( "bufio" + "context" "errors" "fmt" "io" "os" "path/filepath" + "regexp" + "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) var ( - ErrNotFoundInCache = errors.New("no found in challenge cache") + ErrNotFoundInCache = errors.New("not found in challenge cache") ErrFileAlreadyExists = errors.New("file already exists") ErrNoHashes = errors.New("no hashes being written") hashesFileName = "hashes.bin" @@ -59,30 +62,46 @@ type HistoryCommitmentCacher interface { Put(lookup *Key, hashes []common.Hash) error } +// Key for cache lookups includes the wavm module root of a challenge, as well +// as the heights for messages and big steps as needed. +type Key struct { + RollupBlockHash common.Hash + WavmModuleRoot common.Hash + MessageHeight uint64 + StepHeights []uint64 +} + // Cache for history commitments on disk. type Cache struct { - baseDir string + baseDir string + tempWritesDir string } // New cache from a base directory path. func New(baseDir string) (*Cache, error) { - if _, err := os.Stat(baseDir); err != nil { - if err := os.MkdirAll(baseDir, os.ModePerm); err != nil { - return nil, fmt.Errorf("could not make base cache directory %s: %w", baseDir, err) - } - } return &Cache{ - baseDir: baseDir, + baseDir: baseDir, + tempWritesDir: "", }, nil } -// Key for cache lookups includes the wavm module root of a challenge, as well -// as the heights for messages and big steps as needed. -type Key struct { - RollupBlockHash common.Hash - WavmModuleRoot common.Hash - MessageHeight uint64 - StepHeights []uint64 +// Init a cache by verifying its base directory exists. +func (c *Cache) Init(_ context.Context) error { + if _, err := os.Stat(c.baseDir); err != nil { + if err := os.MkdirAll(c.baseDir, os.ModePerm); err != nil { + return fmt.Errorf("could not make initialize challenge cache directory %s: %w", c.baseDir, err) + } + } + // We create a temp directory to write our hashes to first when putting to the cache. + // Once writing succeeds, we rename in an atomic operation to the correct file name + // in the cache directory hierarchy in the `Put` function. All of these temporary writes + // will occur in a subdir of the base directory called temp. + tempWritesDir, err := os.MkdirTemp(c.baseDir, "temp") + if err != nil { + return err + } + c.tempWritesDir = tempWritesDir + return nil } // Get a list of hashes from the cache from index 0 up to a certain index. Hashes are saved as files in the directory @@ -122,24 +141,14 @@ func (c *Cache) Put(lookup *Key, hashes []common.Hash) error { if len(hashes) == 0 { return ErrNoHashes } - fName, err := determineFilePath(c.baseDir, lookup) - if err != nil { - return err + if c.tempWritesDir == "" { + return fmt.Errorf("cache not initialized by calling .Init(ctx)") } - // We create a tmp file to write our hashes to first. If writing fails, - // we don't want to leave a half-written file in our cache directory. - // Once writing succeeds, we rename in an atomic operation to the correct file name - // in the cache directory hierarchy. - tmp, err := os.MkdirTemp(c.baseDir, "tmpdir") + fName, err := determineFilePath(c.baseDir, lookup) if err != nil { return err } - tmpFName := filepath.Join(tmp, fName) - dir := filepath.Dir(tmpFName) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return fmt.Errorf("could not make tmp directory %s: %w", dir, err) - } - f, err := os.Create(tmpFName) + f, err := os.CreateTemp(c.tempWritesDir, fmt.Sprintf("%s-*", hashesFileName)) if err != nil { return err } @@ -154,11 +163,57 @@ func (c *Cache) Put(lookup *Key, hashes []common.Hash) error { if err := os.MkdirAll(filepath.Dir(fName), os.ModePerm); err != nil { return fmt.Errorf("could not make file directory %s: %w", fName, err) } - // If the file writing was successful, we rename the file from the tmp directory + // If the file writing was successful, we rename the file from the temp directory // into our cache directory. This is an atomic operation. // For more information on this atomic write pattern, see: // https://stackoverflow.com/questions/2333872/how-to-make-file-creation-an-atomic-operation - return os.Rename(tmpFName /*old */, fName /* new */) + return os.Rename(f.Name() /*old */, fName /* new */) +} + +// Prune all entries in the cache with a message number <= a specified value. +func (c *Cache) Prune(ctx context.Context, messageNumber uint64) error { + // Define a regex pattern to extract the message number + numPruned := 0 + messageNumPattern := fmt.Sprintf(`%s-(\d+)-`, messageNumberPrefix) + pattern := regexp.MustCompile(messageNumPattern) + pathsToDelete := make([]string, 0) + if err := filepath.WalkDir(c.baseDir, func(path string, info os.DirEntry, err error) error { + if ctx.Err() != nil { + return ctx.Err() + } + if err != nil { + return err + } + if info.IsDir() { + matches := pattern.FindStringSubmatch(info.Name()) + if len(matches) > 1 { + dirNameMessageNum, err := strconv.Atoi(matches[1]) + if err != nil { + return err + } + // Collect the directory path if the message number is <= the specified value. + if dirNameMessageNum <= int(messageNumber) { + pathsToDelete = append(pathsToDelete, path) + } + } + } + return nil + }); err != nil { + return err + } + // We delete separately from collecting the paths, as deleting while walking + // a dir can cause issues with the filepath.Walk function. + for _, path := range pathsToDelete { + if ctx.Err() != nil { + return ctx.Err() + } + if err := os.RemoveAll(path); err != nil { + return fmt.Errorf("could not prune directory with path %s: %w", path, err) + } + numPruned += 1 + } + log.Info("Pruned challenge cache", "numDirsPruned", numPruned, "messageNumber", messageNumPattern) + return nil } // Reads 32 bytes at a time from a reader up to a specified height. If none, then read all. @@ -217,7 +272,7 @@ for the data requested within the cache directory hierarchy. The folder structur for a given filesystem challenge cache will look as follows: wavm-module-root-0xab/ - rollup-block-hash-0x12...-message-num-70/ + message-num-70-rollup-block-hash-0x12.../ hashes.bin subchallenge-level-1-big-step-100/ hashes.bin @@ -225,7 +280,7 @@ for a given filesystem challenge cache will look as follows: func determineFilePath(baseDir string, lookup *Key) (string, error) { key := make([]string, 0) key = append(key, fmt.Sprintf("%s-%s", wavmModuleRootPrefix, lookup.WavmModuleRoot.Hex())) - key = append(key, fmt.Sprintf("%s-%s-%s-%d", rollupBlockHashPrefix, lookup.RollupBlockHash.Hex(), messageNumberPrefix, lookup.MessageHeight)) + key = append(key, fmt.Sprintf("%s-%d-%s-%s", messageNumberPrefix, lookup.MessageHeight, rollupBlockHashPrefix, lookup.RollupBlockHash.Hex())) for challengeLevel, height := range lookup.StepHeights { key = append(key, fmt.Sprintf( "%s-%d-%s-%d", diff --git a/staker/challenge-cache/cache_test.go b/staker/challenge-cache/cache_test.go index 6b15d62af7..af0a058f78 100644 --- a/staker/challenge-cache/cache_test.go +++ b/staker/challenge-cache/cache_test.go @@ -4,6 +4,7 @@ package challengecache import ( "bytes" + "context" "errors" "fmt" "io" @@ -17,19 +18,19 @@ import ( var _ HistoryCommitmentCacher = (*Cache)(nil) func TestCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() basePath := t.TempDir() if err := os.MkdirAll(basePath, os.ModePerm); err != nil { t.Fatal(err) } - t.Cleanup(func() { - if err := os.RemoveAll(basePath); err != nil { - t.Fatal(err) - } - }) cache, err := New(basePath) if err != nil { t.Fatal(err) } + if err = cache.Init(ctx); err != nil { + t.Fatal(err) + } key := &Key{ WavmModuleRoot: common.BytesToHash([]byte("foo")), MessageHeight: 0, @@ -69,6 +70,144 @@ func TestCache(t *testing.T) { } } +func TestPrune(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + basePath := t.TempDir() + if err := os.MkdirAll(basePath, os.ModePerm); err != nil { + t.Fatal(err) + } + cache, err := New(basePath) + if err != nil { + t.Fatal(err) + } + if err = cache.Init(ctx); err != nil { + t.Fatal(err) + } + key := &Key{ + WavmModuleRoot: common.BytesToHash([]byte("foo")), + MessageHeight: 20, + StepHeights: []uint64{0}, + } + if _, err = cache.Get(key, 0); !errors.Is(err, ErrNotFoundInCache) { + t.Fatal(err) + } + t.Run("pruning non-existent dirs does nothing", func(t *testing.T) { + if err = cache.Prune(ctx, key.MessageHeight); err != nil { + t.Error(err) + } + }) + t.Run("pruning single item", func(t *testing.T) { + want := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + err = cache.Put(key, want) + if err != nil { + t.Fatal(err) + } + items, err := cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + if err = cache.Prune(ctx, key.MessageHeight); err != nil { + t.Error(err) + } + if _, err = cache.Get(key, 3); !errors.Is(err, ErrNotFoundInCache) { + t.Error(err) + } + }) + t.Run("does not prune items with message number > N", func(t *testing.T) { + want := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + key.MessageHeight = 30 + err = cache.Put(key, want) + if err != nil { + t.Fatal(err) + } + items, err := cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + if err = cache.Prune(ctx, 20); err != nil { + t.Error(err) + } + items, err = cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + }) + t.Run("prunes many items with message number <= N", func(t *testing.T) { + moduleRoots := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + totalMessages := 10 + for _, root := range moduleRoots { + for i := 0; i < totalMessages; i++ { + hashes := []common.Hash{ + common.BytesToHash([]byte("a")), + common.BytesToHash([]byte("b")), + common.BytesToHash([]byte("c")), + } + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + if err = cache.Put(key, hashes); err != nil { + t.Fatal(err) + } + } + } + if err = cache.Prune(ctx, 5); err != nil { + t.Error(err) + } + for _, root := range moduleRoots { + // Expect that we deleted all entries with message number <= 5 + for i := 0; i <= 5; i++ { + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + if _, err = cache.Get(key, 3); !errors.Is(err, ErrNotFoundInCache) { + t.Error(err) + } + } + // But also expect that we kept all entries with message number > 5 + for i := 6; i < totalMessages; i++ { + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + items, err := cache.Get(key, 3) + if err != nil { + t.Error(err) + } + if len(items) != 3 { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", 3, len(items)) + } + } + } + }) +} + func TestReadWriteStatehashes(t *testing.T) { t.Run("read up to, but had empty reader", func(t *testing.T) { b := bytes.NewBuffer([]byte{}) @@ -255,7 +394,7 @@ func Test_determineFilePath(t *testing.T) { StepHeights: []uint64{50}, }, }, - want: "wavm-module-root-0x0000000000000000000000000000000000000000000000000000000000000000/rollup-block-hash-0x0000000000000000000000000000000000000000000000000000000000000000-message-num-100/subchallenge-level-1-big-step-50/hashes.bin", + want: "wavm-module-root-0x0000000000000000000000000000000000000000000000000000000000000000/message-num-100-rollup-block-hash-0x0000000000000000000000000000000000000000000000000000000000000000/subchallenge-level-1-big-step-50/hashes.bin", wantErr: false, }, } @@ -282,20 +421,20 @@ func Test_determineFilePath(t *testing.T) { } func BenchmarkCache_Read_32Mb(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() b.StopTimer() basePath := os.TempDir() if err := os.MkdirAll(basePath, os.ModePerm); err != nil { b.Fatal(err) } - b.Cleanup(func() { - if err := os.RemoveAll(basePath); err != nil { - b.Fatal(err) - } - }) cache, err := New(basePath) if err != nil { b.Fatal(err) } + if err = cache.Init(ctx); err != nil { + b.Fatal(err) + } key := &Key{ WavmModuleRoot: common.BytesToHash([]byte("foo")), MessageHeight: 0, diff --git a/system_tests/blocks_reexecutor_test.go b/system_tests/blocks_reexecutor_test.go index 66690d1427..c6a7181c46 100644 --- a/system_tests/blocks_reexecutor_test.go +++ b/system_tests/blocks_reexecutor_test.go @@ -2,39 +2,29 @@ package arbtest import ( "context" - "math/big" "testing" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/params" - "github.com/offchainlabs/nitro/arbnode" blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor" - "github.com/offchainlabs/nitro/execution/gethexec" ) func TestBlocksReExecutorModes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - execConfig := gethexec.ConfigDefaultTest() - Require(t, execConfig.Validate()) - l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, nil, t.TempDir(), params.ArbitrumDevTestChainConfig(), &execConfig.Caching) + builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + cleanup := builder.Build(t) + defer cleanup() - execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(ctx, stack, chainDb, blockchain, nil, execConfigFetcher) - Require(t, err) - - parentChainID := big.NewInt(1234) + l2info := builder.L2Info + client := builder.L2.Client + blockchain := builder.L2.ExecNode.Backend.ArbInterface().BlockChain() feedErrChan := make(chan error, 10) - node, err := arbnode.CreateNode(ctx, stack, execNode, arbDb, NewFetcherFromConfig(arbnode.ConfigDefaultL2Test()), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, parentChainID, nil) - Require(t, err) - err = node.TxStreamer.AddFakeInitMessage() - Require(t, err) - Require(t, node.Start(ctx)) - client := ClientForStack(t, stack) l2info.GenerateAccount("User2") - for i := 0; i < 100; i++ { + genesis, err := client.BlockNumber(ctx) + Require(t, err) + for i := genesis; i < genesis+100; i++ { tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, common.Big1, nil) err := client.SendTransaction(ctx, tx) Require(t, err) @@ -49,14 +39,9 @@ func TestBlocksReExecutorModes(t *testing.T) { success := make(chan struct{}) executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan) executorFull.Start(ctx, success) - select { case err := <-feedErrChan: - t.Errorf("error occurred: %v", err) - if node != nil { - node.StopAndWait() - } - t.FailNow() + t.Fatalf("error occurred: %v", err) case <-success: } @@ -66,14 +51,9 @@ func TestBlocksReExecutorModes(t *testing.T) { c.Mode = "random" executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan) executorRandom.Start(ctx, success) - select { case err := <-feedErrChan: - t.Errorf("error occurred: %v", err) - if node != nil { - node.StopAndWait() - } - t.FailNow() + t.Fatalf("error occurred: %v", err) case <-success: } } diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 16d6b2f131..ccee6aa8e4 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -88,6 +88,7 @@ type SecondNodeParams struct { stackConfig *node.Config dasConfig *das.DataAvailabilityConfig initData *statetransfer.ArbosInitializationInfo + addresses *chaininfo.RollupAddresses } type TestClient struct { @@ -167,6 +168,8 @@ type NodeBuilder struct { isSequencer bool takeOwnership bool withL1 bool + addresses *chaininfo.RollupAddresses + initMessage *arbostypes.ParsedInitMessage // Created nodes L1 *TestClient @@ -212,6 +215,30 @@ func (b *NodeBuilder) WithWasmRootDir(wasmRootDir string) *NodeBuilder { } func (b *NodeBuilder) Build(t *testing.T) func() { + b.CheckConfig(t) + if b.withL1 { + b.BuildL1(t) + return b.BuildL2OnL1(t) + } + return b.BuildL2(t) +} + +func (b *NodeBuilder) CheckConfig(t *testing.T) { + if b.chainConfig == nil { + b.chainConfig = params.ArbitrumDevTestChainConfig() + } + if b.nodeConfig == nil { + b.nodeConfig = arbnode.ConfigDefaultL1Test() + } + if b.execConfig == nil { + b.execConfig = gethexec.ConfigDefaultTest() + } + if b.L1Info == nil { + b.L1Info = NewL1TestInfo(t) + } + if b.L2Info == nil { + b.L2Info = NewArbTestInfo(t, b.chainConfig.ChainID) + } if b.execConfig.RPC.MaxRecreateStateDepth == arbitrum.UninitializedMaxRecreateStateDepth { if b.execConfig.Caching.Archive { b.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultArchiveNodeMaxRecreateStateDepth @@ -219,18 +246,69 @@ func (b *NodeBuilder) Build(t *testing.T) func() { b.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultNonArchiveNodeMaxRecreateStateDepth } } - if b.withL1 { - l1, l2 := NewTestClient(b.ctx), NewTestClient(b.ctx) - b.L2Info, l2.ConsensusNode, l2.Client, l2.Stack, b.L1Info, l1.L1Backend, l1.Client, l1.Stack = - createTestNodeWithL1(t, b.ctx, b.isSequencer, b.nodeConfig, b.execConfig, b.chainConfig, b.l2StackConfig, b.valnodeConfig, b.L2Info) - b.L1, b.L2 = l1, l2 - b.L1.cleanup = func() { requireClose(t, b.L1.Stack) } +} + +func (b *NodeBuilder) BuildL1(t *testing.T) { + b.L1 = NewTestClient(b.ctx) + b.L1Info, b.L1.Client, b.L1.L1Backend, b.L1.Stack = createTestL1BlockChain(t, b.L1Info) + locator, err := server_common.NewMachineLocator(b.valnodeConfig.Wasm.RootPath) + Require(t, err) + b.addresses, b.initMessage = DeployOnTestL1(t, b.ctx, b.L1Info, b.L1.Client, b.chainConfig, locator.LatestWasmModuleRoot()) + b.L1.cleanup = func() { requireClose(t, b.L1.Stack) } +} + +func (b *NodeBuilder) BuildL2OnL1(t *testing.T) func() { + if b.L1 == nil { + t.Fatal("must build L1 before building L2") + } + b.L2 = NewTestClient(b.ctx) + + var l2chainDb ethdb.Database + var l2arbDb ethdb.Database + var l2blockchain *core.BlockChain + _, b.L2.Stack, l2chainDb, l2arbDb, l2blockchain = createL2BlockChainWithStackConfig( + t, b.L2Info, b.dataDir, b.chainConfig, b.initMessage, b.l2StackConfig, &b.execConfig.Caching) + + var sequencerTxOptsPtr *bind.TransactOpts + var dataSigner signature.DataSignerFunc + if b.isSequencer { + sequencerTxOpts := b.L1Info.GetDefaultTransactOpts("Sequencer", b.ctx) + sequencerTxOptsPtr = &sequencerTxOpts + dataSigner = signature.DataSignerFromPrivateKey(b.L1Info.GetInfoWithPrivKey("Sequencer").PrivateKey) } else { - l2 := NewTestClient(b.ctx) - b.L2Info, l2.ConsensusNode, l2.Client = - createTestNode(t, b.ctx, b.L2Info, b.nodeConfig, b.execConfig, b.chainConfig, b.valnodeConfig, b.takeOwnership) - b.L2 = l2 + b.nodeConfig.BatchPoster.Enable = false + b.nodeConfig.Sequencer = false + b.nodeConfig.DelayedSequencer.Enable = false + b.execConfig.Sequencer.Enable = false + } + + var validatorTxOptsPtr *bind.TransactOpts + if b.nodeConfig.Staker.Enable { + validatorTxOpts := b.L1Info.GetDefaultTransactOpts("Validator", b.ctx) + validatorTxOptsPtr = &validatorTxOpts } + + AddDefaultValNode(t, b.ctx, b.nodeConfig, true, "", b.valnodeConfig.Wasm.RootPath) + + Require(t, b.execConfig.Validate()) + execConfig := b.execConfig + execConfigFetcher := func() *gethexec.Config { return execConfig } + execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, l2chainDb, l2blockchain, b.L1.Client, execConfigFetcher) + Require(t, err) + + fatalErrChan := make(chan error, 10) + b.L2.ConsensusNode, err = arbnode.CreateNode( + b.ctx, b.L2.Stack, execNode, l2arbDb, NewFetcherFromConfig(b.nodeConfig), l2blockchain.Config(), b.L1.Client, + b.addresses, validatorTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, big.NewInt(1337), nil) + Require(t, err) + + err = b.L2.ConsensusNode.Start(b.ctx) + Require(t, err) + + b.L2.Client = ClientForStack(t, b.L2.Stack) + + StartWatchChanErr(t, b.ctx, fatalErrChan, b.L2.ConsensusNode) + b.L2.ExecNode = getExecNode(t, b.L2.ConsensusNode) b.L2.cleanup = func() { b.L2.ConsensusNode.StopAndWait() } return func() { @@ -241,6 +319,93 @@ func (b *NodeBuilder) Build(t *testing.T) func() { } } +// L2 -Only. Enough for tests that needs no interface to L1 +// Requires precompiles.AllowDebugPrecompiles = true +func (b *NodeBuilder) BuildL2(t *testing.T) func() { + b.L2 = NewTestClient(b.ctx) + + AddDefaultValNode(t, b.ctx, b.nodeConfig, true, "", b.valnodeConfig.Wasm.RootPath) + + var chainDb ethdb.Database + var arbDb ethdb.Database + var blockchain *core.BlockChain + b.L2Info, b.L2.Stack, chainDb, arbDb, blockchain = createL2BlockChain( + t, b.L2Info, b.dataDir, b.chainConfig, &b.execConfig.Caching) + + Require(t, b.execConfig.Validate()) + execConfig := b.execConfig + execConfigFetcher := func() *gethexec.Config { return execConfig } + execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher) + Require(t, err) + + fatalErrChan := make(chan error, 10) + b.L2.ConsensusNode, err = arbnode.CreateNode( + b.ctx, b.L2.Stack, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), + nil, nil, nil, nil, nil, fatalErrChan, big.NewInt(1337), nil) + Require(t, err) + + // Give the node an init message + err = b.L2.ConsensusNode.TxStreamer.AddFakeInitMessage() + Require(t, err) + + err = b.L2.ConsensusNode.Start(b.ctx) + Require(t, err) + + b.L2.Client = ClientForStack(t, b.L2.Stack) + + if b.takeOwnership { + debugAuth := b.L2Info.GetDefaultTransactOpts("Owner", b.ctx) + + // make auth a chain owner + arbdebug, err := precompilesgen.NewArbDebug(common.HexToAddress("0xff"), b.L2.Client) + Require(t, err, "failed to deploy ArbDebug") + + tx, err := arbdebug.BecomeChainOwner(&debugAuth) + Require(t, err, "failed to deploy ArbDebug") + + _, err = EnsureTxSucceeded(b.ctx, b.L2.Client, tx) + Require(t, err) + } + + StartWatchChanErr(t, b.ctx, fatalErrChan, b.L2.ConsensusNode) + + b.L2.ExecNode = getExecNode(t, b.L2.ConsensusNode) + b.L2.cleanup = func() { b.L2.ConsensusNode.StopAndWait() } + return func() { b.L2.cleanup() } +} + +// L2 -Only. RestartL2Node shutdowns the existing l2 node and start it again using the same data dir. +func (b *NodeBuilder) RestartL2Node(t *testing.T) { + if b.L2 == nil { + t.Fatalf("L2 was not created") + } + b.L2.cleanup() + + l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, b.L2Info, b.dataDir, b.chainConfig, &b.execConfig.Caching) + + execConfigFetcher := func() *gethexec.Config { return b.execConfig } + execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher) + Require(t, err) + + feedErrChan := make(chan error, 10) + currentNode, err := arbnode.CreateNode(b.ctx, stack, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil) + Require(t, err) + + Require(t, currentNode.Start(b.ctx)) + client := ClientForStack(t, stack) + + StartWatchChanErr(t, b.ctx, feedErrChan, currentNode) + + l2 := NewTestClient(b.ctx) + l2.ConsensusNode = currentNode + l2.Client = client + l2.ExecNode = execNode + l2.cleanup = func() { b.L2.ConsensusNode.StopAndWait() } + + b.L2 = l2 + b.L2Info = l2info +} + func (b *NodeBuilder) Build2ndNode(t *testing.T, params *SecondNodeParams) (*TestClient, func()) { if b.L2 == nil { t.Fatal("builder did not previously build a L2 Node") @@ -265,6 +430,9 @@ func (b *NodeBuilder) Build2ndNode(t *testing.T, params *SecondNodeParams) (*Tes if params.execConfig == nil { params.execConfig = b.execConfig } + if params.addresses == nil { + params.addresses = b.addresses + } if params.execConfig.RPC.MaxRecreateStateDepth == arbitrum.UninitializedMaxRecreateStateDepth { if params.execConfig.Caching.Archive { params.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultArchiveNodeMaxRecreateStateDepth @@ -272,10 +440,13 @@ func (b *NodeBuilder) Build2ndNode(t *testing.T, params *SecondNodeParams) (*Tes params.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultNonArchiveNodeMaxRecreateStateDepth } } + if b.nodeConfig.BatchPoster.Enable && params.nodeConfig.BatchPoster.Enable && params.nodeConfig.BatchPoster.RedisUrl == "" { + t.Fatal("The batch poster must use Redis when enabled for multiple nodes") + } l2 := NewTestClient(b.ctx) l2.Client, l2.ConsensusNode = - Create2ndNodeWithConfig(t, b.ctx, b.L2.ConsensusNode, b.L1.Stack, b.L1Info, params.initData, params.nodeConfig, params.execConfig, params.stackConfig, b.valnodeConfig) + Create2ndNodeWithConfig(t, b.ctx, b.L2.ConsensusNode, b.L1.Stack, b.L1Info, params.initData, params.nodeConfig, params.execConfig, params.stackConfig, b.valnodeConfig, params.addresses, b.initMessage) l2.ExecNode = getExecNode(t, l2.ConsensusNode) l2.cleanup = func() { l2.ConsensusNode.StopAndWait() } return l2, func() { l2.cleanup() } @@ -327,7 +498,6 @@ func BridgeBalance( l2info.SetFullAccountInfo(account, &AccountInfo{ Address: l1acct.Address, PrivateKey: l1acct.PrivateKey, - Nonce: 0, }) } else { l2acct := l2info.GetInfoWithPrivKey(account) @@ -511,10 +681,6 @@ func (c *staticNodeConfigFetcher) Started() bool { return true } -func createTestL1BlockChain(t *testing.T, l1info info) (info, *ethclient.Client, *eth.Ethereum, *node.Node) { - return createTestL1BlockChainWithConfig(t, l1info, nil) -} - func createStackConfigForTest(dataDir string) *node.Config { stackConf := node.DefaultConfig stackConf.DataDir = dataDir @@ -639,13 +805,11 @@ func AddDefaultValNode(t *testing.T, ctx context.Context, nodeConfig *arbnode.Co configByValidationNode(nodeConfig, valStack) } -func createTestL1BlockChainWithConfig(t *testing.T, l1info info, stackConfig *node.Config) (info, *ethclient.Client, *eth.Ethereum, *node.Node) { +func createTestL1BlockChain(t *testing.T, l1info info) (info, *ethclient.Client, *eth.Ethereum, *node.Node) { if l1info == nil { l1info = NewL1TestInfo(t) } - if stackConfig == nil { - stackConfig = createStackConfigForTest(t.TempDir()) - } + stackConfig := createStackConfigForTest(t.TempDir()) l1info.GenerateAccount("Faucet") chainConfig := params.ArbitrumDevTestChainConfig() @@ -819,137 +983,6 @@ func ClientForStack(t *testing.T, backend *node.Node) *ethclient.Client { return ethclient.NewClient(rpcClient) } -// Create and deploy L1 and arbnode for L2 -func createTestNodeWithL1( - t *testing.T, - ctx context.Context, - isSequencer bool, - nodeConfig *arbnode.Config, - execConfig *gethexec.Config, - chainConfig *params.ChainConfig, - stackConfig *node.Config, - valnodeConfig *valnode.Config, - l2info_in info, -) ( - l2info info, currentNode *arbnode.Node, l2client *ethclient.Client, l2stack *node.Node, - l1info info, l1backend *eth.Ethereum, l1client *ethclient.Client, l1stack *node.Node, -) { - if nodeConfig == nil { - nodeConfig = arbnode.ConfigDefaultL1Test() - } - if execConfig == nil { - execConfig = gethexec.ConfigDefaultTest() - } - if chainConfig == nil { - chainConfig = params.ArbitrumDevTestChainConfig() - } - fatalErrChan := make(chan error, 10) - l1info, l1client, l1backend, l1stack = createTestL1BlockChain(t, nil) - var l2chainDb ethdb.Database - var l2arbDb ethdb.Database - var l2blockchain *core.BlockChain - l2info = l2info_in - if l2info == nil { - l2info = NewArbTestInfo(t, chainConfig.ChainID) - } - locator, err := server_common.NewMachineLocator(valnodeConfig.Wasm.RootPath) - Require(t, err) - addresses, initMessage := DeployOnTestL1(t, ctx, l1info, l1client, chainConfig, locator.LatestWasmModuleRoot()) - _, l2stack, l2chainDb, l2arbDb, l2blockchain = createL2BlockChainWithStackConfig(t, l2info, "", chainConfig, initMessage, stackConfig, &execConfig.Caching) - var sequencerTxOptsPtr *bind.TransactOpts - var dataSigner signature.DataSignerFunc - if isSequencer { - sequencerTxOpts := l1info.GetDefaultTransactOpts("Sequencer", ctx) - sequencerTxOptsPtr = &sequencerTxOpts - dataSigner = signature.DataSignerFromPrivateKey(l1info.GetInfoWithPrivKey("Sequencer").PrivateKey) - } - - if !isSequencer { - nodeConfig.BatchPoster.Enable = false - nodeConfig.Sequencer = false - nodeConfig.DelayedSequencer.Enable = false - execConfig.Sequencer.Enable = false - } - - var validatorTxOptsPtr *bind.TransactOpts - if nodeConfig.Staker.Enable { - validatorTxOpts := l1info.GetDefaultTransactOpts("Validator", ctx) - validatorTxOptsPtr = &validatorTxOpts - } - - AddDefaultValNode(t, ctx, nodeConfig, true, "", valnodeConfig.Wasm.RootPath) - - Require(t, execConfig.Validate()) - execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(ctx, l2stack, l2chainDb, l2blockchain, l1client, execConfigFetcher) - Require(t, err) - currentNode, err = arbnode.CreateNode( - ctx, l2stack, execNode, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client, - addresses, validatorTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, big.NewInt(1337), nil, - ) - Require(t, err) - - Require(t, currentNode.Start(ctx)) - - l2client = ClientForStack(t, l2stack) - - StartWatchChanErr(t, ctx, fatalErrChan, currentNode) - - return -} - -// L2 -Only. Enough for tests that needs no interface to L1 -// Requires precompiles.AllowDebugPrecompiles = true -func createTestNode( - t *testing.T, ctx context.Context, l2Info *BlockchainTestInfo, nodeConfig *arbnode.Config, execConfig *gethexec.Config, chainConfig *params.ChainConfig, valnodeConfig *valnode.Config, takeOwnership bool, -) (*BlockchainTestInfo, *arbnode.Node, *ethclient.Client) { - if nodeConfig == nil { - nodeConfig = arbnode.ConfigDefaultL2Test() - } - if execConfig == nil { - execConfig = gethexec.ConfigDefaultTest() - } - - feedErrChan := make(chan error, 10) - - AddDefaultValNode(t, ctx, nodeConfig, true, "", valnodeConfig.Wasm.RootPath) - - l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, l2Info, "", chainConfig, &execConfig.Caching) - - Require(t, execConfig.Validate()) - execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(ctx, stack, chainDb, blockchain, nil, execConfigFetcher) - Require(t, err) - - currentNode, err := arbnode.CreateNode(ctx, stack, execNode, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil) - Require(t, err) - - // Give the node an init message - err = currentNode.TxStreamer.AddFakeInitMessage() - Require(t, err) - - Require(t, currentNode.Start(ctx)) - client := ClientForStack(t, stack) - - if takeOwnership { - debugAuth := l2info.GetDefaultTransactOpts("Owner", ctx) - - // make auth a chain owner - arbdebug, err := precompilesgen.NewArbDebug(common.HexToAddress("0xff"), client) - Require(t, err, "failed to deploy ArbDebug") - - tx, err := arbdebug.BecomeChainOwner(&debugAuth) - Require(t, err, "failed to deploy ArbDebug") - - _, err = EnsureTxSucceeded(ctx, client, tx) - Require(t, err) - } - - StartWatchChanErr(t, ctx, feedErrChan, currentNode) - - return l2info, currentNode, client -} - func StartWatchChanErr(t *testing.T, ctx context.Context, feedErrChan chan error, node *arbnode.Node) { go func() { select { @@ -985,6 +1018,8 @@ func Create2ndNodeWithConfig( execConfig *gethexec.Config, stackConfig *node.Config, valnodeConfig *valnode.Config, + addresses *chaininfo.RollupAddresses, + initMessage *arbostypes.ParsedInitMessage, ) (*ethclient.Client, *arbnode.Node) { if nodeConfig == nil { nodeConfig = arbnode.ConfigDefaultL1NonSequencerTest() @@ -1018,7 +1053,6 @@ func Create2ndNodeWithConfig( firstExec := getExecNode(t, first) chainConfig := firstExec.ArbInterface.BlockChain().Config() - initMessage := getInitMessage(ctx, t, l1client, first.DeployInfo) coreCacheConfig := gethexec.DefaultCacheConfigFor(l2stack, &execConfig.Caching) l2blockchain, err := gethexec.WriteOrTestBlockChain(l2chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0) @@ -1032,7 +1066,7 @@ func Create2ndNodeWithConfig( currentExec, err := gethexec.CreateExecutionNode(ctx, l2stack, l2chainDb, l2blockchain, l1client, configFetcher) Require(t, err) - currentNode, err := arbnode.CreateNode(ctx, l2stack, currentExec, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client, first.DeployInfo, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil) + currentNode, err := arbnode.CreateNode(ctx, l2stack, currentExec, l2arbDb, NewFetcherFromConfig(nodeConfig), l2blockchain.Config(), l1client, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil) Require(t, err) err = currentNode.Start(ctx) diff --git a/system_tests/conditionaltx_test.go b/system_tests/conditionaltx_test.go index 4f800d9769..286060e666 100644 --- a/system_tests/conditionaltx_test.go +++ b/system_tests/conditionaltx_test.go @@ -58,7 +58,7 @@ func testConditionalTxThatShouldSucceed(t *testing.T, ctx context.Context, idx i func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int, l2info info, rpcClient *rpc.Client, options *arbitrum_types.ConditionalOptions, expectedErrorCode int) { t.Helper() accountInfo := l2info.GetInfoWithPrivKey("Owner") - nonce := accountInfo.Nonce + nonce := accountInfo.Nonce.Load() tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, big.NewInt(1e12), nil) err := arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options) if err == nil { @@ -77,7 +77,7 @@ func testConditionalTxThatShouldFail(t *testing.T, ctx context.Context, idx int, Fatal(t, "unexpected error type, err:", err) } } - accountInfo.Nonce = nonce // revert nonce as the tx failed + accountInfo.Nonce.Store(nonce) // revert nonce as the tx failed } func getEmptyOptions(address common.Address) []*arbitrum_types.ConditionalOptions { diff --git a/system_tests/das_test.go b/system_tests/das_test.go index 2332f4ee9e..76ec2d7493 100644 --- a/system_tests/das_test.go +++ b/system_tests/das_test.go @@ -19,22 +19,16 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/blsSignatures" - "github.com/offchainlabs/nitro/cmd/conf" "github.com/offchainlabs/nitro/cmd/genericconf" "github.com/offchainlabs/nitro/das" - "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/bridgegen" "github.com/offchainlabs/nitro/solgen/go/precompilesgen" "github.com/offchainlabs/nitro/util/headerreader" - "github.com/offchainlabs/nitro/util/signature" - "github.com/offchainlabs/nitro/validator/server_common" - "github.com/offchainlabs/nitro/validator/valnode" "golang.org/x/exp/slog" ) @@ -108,98 +102,68 @@ func TestDASRekey(t *testing.T) { defer cancel() // Setup L1 chain and contracts - chainConfig := params.ArbitrumDevTestDASChainConfig() - l1info, l1client, _, l1stack := createTestL1BlockChain(t, nil) - defer requireClose(t, l1stack) - feedErrChan := make(chan error, 10) - locator, err := server_common.NewMachineLocator("") - Require(t, err) - addresses, initMessage := DeployOnTestL1(t, ctx, l1info, l1client, chainConfig, locator.LatestWasmModuleRoot()) + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.BuildL1(t) // Setup DAS servers dasDataDir := t.TempDir() - nodeDir := t.TempDir() - dasRpcServerA, pubkeyA, backendConfigA, _, restServerUrlA := startLocalDASServer(t, ctx, dasDataDir, l1client, addresses.SequencerInbox) - l2info := NewArbTestInfo(t, chainConfig.ChainID) - l1NodeConfigA := arbnode.ConfigDefaultL1Test() + dasRpcServerA, pubkeyA, backendConfigA, _, restServerUrlA := startLocalDASServer(t, ctx, dasDataDir, builder.L1.Client, builder.addresses.SequencerInbox) l1NodeConfigB := arbnode.ConfigDefaultL1NonSequencerTest() - sequencerTxOpts := l1info.GetDefaultTransactOpts("Sequencer", ctx) - sequencerTxOptsPtr := &sequencerTxOpts - parentChainID := big.NewInt(1337) { - authorizeDASKeyset(t, ctx, pubkeyA, l1info, l1client) - - // Setup L2 chain - _, l2stackA, l2chainDb, l2arbDb, l2blockchain := createL2BlockChainWithStackConfig(t, l2info, nodeDir, chainConfig, initMessage, nil, nil) - l2info.GenerateAccount("User2") + authorizeDASKeyset(t, ctx, pubkeyA, builder.L1Info, builder.L1.Client) // Setup DAS config + builder.nodeConfig.DataAvailability.Enable = true + builder.nodeConfig.DataAvailability.RPCAggregator = aggConfigForBackend(t, backendConfigA) + builder.nodeConfig.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig + builder.nodeConfig.DataAvailability.RestAggregator.Enable = true + builder.nodeConfig.DataAvailability.RestAggregator.Urls = []string{restServerUrlA} + builder.nodeConfig.DataAvailability.ParentChainNodeURL = "none" - l1NodeConfigA.DataAvailability.Enable = true - l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, backendConfigA) - l1NodeConfigA.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig - l1NodeConfigA.DataAvailability.RestAggregator.Enable = true - l1NodeConfigA.DataAvailability.RestAggregator.Urls = []string{restServerUrlA} - l1NodeConfigA.DataAvailability.ParentChainNodeURL = "none" - execA, err := gethexec.CreateExecutionNode(ctx, l2stackA, l2chainDb, l2blockchain, l1client, gethexec.ConfigDefaultTest) - Require(t, err) - nodeA, err := arbnode.CreateNode(ctx, l2stackA, execA, l2arbDb, NewFetcherFromConfig(l1NodeConfigA), l2blockchain.Config(), l1client, addresses, sequencerTxOptsPtr, sequencerTxOptsPtr, nil, feedErrChan, parentChainID, nil) - Require(t, err) - Require(t, nodeA.Start(ctx)) - l2clientA := ClientForStack(t, l2stackA) + // Setup L2 chain + builder.L2Info.GenerateAccount("User2") + builder.BuildL2OnL1(t) + // Setup second node l1NodeConfigB.BlockValidator.Enable = false l1NodeConfigB.DataAvailability.Enable = true l1NodeConfigB.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig l1NodeConfigB.DataAvailability.RestAggregator.Enable = true l1NodeConfigB.DataAvailability.RestAggregator.Urls = []string{restServerUrlA} - l1NodeConfigB.DataAvailability.ParentChainNodeURL = "none" + nodeBParams := SecondNodeParams{ + nodeConfig: l1NodeConfigB, + initData: &builder.L2Info.ArbInitData, + } + l2B, cleanupB := builder.Build2ndNode(t, &nodeBParams) + checkBatchPosting(t, ctx, builder.L1.Client, builder.L2.Client, builder.L1Info, builder.L2Info, big.NewInt(1e12), l2B.Client) - l2clientB, nodeB := Create2ndNodeWithConfig(t, ctx, nodeA, l1stack, l1info, &l2info.ArbInitData, l1NodeConfigB, nil, nil, &valnode.TestValidationConfig) - checkBatchPosting(t, ctx, l1client, l2clientA, l1info, l2info, big.NewInt(1e12), l2clientB) - nodeA.StopAndWait() - nodeB.StopAndWait() + builder.L2.cleanup() + cleanupB() } - err = dasRpcServerA.Shutdown(ctx) + err := dasRpcServerA.Shutdown(ctx) Require(t, err) - dasRpcServerB, pubkeyB, backendConfigB, _, _ := startLocalDASServer(t, ctx, dasDataDir, l1client, addresses.SequencerInbox) + dasRpcServerB, pubkeyB, backendConfigB, _, _ := startLocalDASServer(t, ctx, dasDataDir, builder.L1.Client, builder.addresses.SequencerInbox) defer func() { err = dasRpcServerB.Shutdown(ctx) Require(t, err) }() - authorizeDASKeyset(t, ctx, pubkeyB, l1info, l1client) + authorizeDASKeyset(t, ctx, pubkeyB, builder.L1Info, builder.L1.Client) // Restart the node on the new keyset against the new DAS server running on the same disk as the first with new keys - - stackConfig := createStackConfigForTest(nodeDir) - l2stackA, err := node.New(stackConfig) - Require(t, err) - - l2chainDb, err := l2stackA.OpenDatabaseWithExtraOptions("l2chaindata", 0, 0, "l2chaindata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("l2chaindata")) - Require(t, err) - - l2arbDb, err := l2stackA.OpenDatabaseWithExtraOptions("arbitrumdata", 0, 0, "arbitrumdata/", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("arbitrumdata")) - Require(t, err) - - l2blockchain, err := gethexec.GetBlockChain(l2chainDb, nil, chainConfig, gethexec.ConfigDefaultTest().TxLookupLimit) - Require(t, err) - - execA, err := gethexec.CreateExecutionNode(ctx, l2stackA, l2chainDb, l2blockchain, l1client, gethexec.ConfigDefaultTest) - Require(t, err) - - l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, backendConfigB) - nodeA, err := arbnode.CreateNode(ctx, l2stackA, execA, l2arbDb, NewFetcherFromConfig(l1NodeConfigA), l2blockchain.Config(), l1client, addresses, sequencerTxOptsPtr, sequencerTxOptsPtr, nil, feedErrChan, parentChainID, nil) - Require(t, err) - Require(t, nodeA.Start(ctx)) - l2clientA := ClientForStack(t, l2stackA) - - l2clientB, nodeB := Create2ndNodeWithConfig(t, ctx, nodeA, l1stack, l1info, &l2info.ArbInitData, l1NodeConfigB, nil, nil, &valnode.TestValidationConfig) - checkBatchPosting(t, ctx, l1client, l2clientA, l1info, l2info, big.NewInt(2e12), l2clientB) - - nodeA.StopAndWait() - nodeB.StopAndWait() + builder.nodeConfig.DataAvailability.RPCAggregator = aggConfigForBackend(t, backendConfigB) + builder.l2StackConfig = createStackConfigForTest(builder.dataDir) + cleanup := builder.BuildL2OnL1(t) + defer cleanup() + + nodeBParams := SecondNodeParams{ + nodeConfig: l1NodeConfigB, + initData: &builder.L2Info.ArbInitData, + } + l2B, cleanup := builder.Build2ndNode(t, &nodeBParams) + defer cleanup() + checkBatchPosting(t, ctx, builder.L1.Client, builder.L2.Client, builder.L1Info, builder.L2Info, big.NewInt(2e12), l2B.Client) } func checkBatchPosting(t *testing.T, ctx context.Context, l1client, l2clientA *ethclient.Client, l1info, l2info info, expectedBalance *big.Int, l2ClientsToCheck ...*ethclient.Client) { @@ -240,20 +204,16 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { defer cancel() // Setup L1 chain and contracts - chainConfig := params.ArbitrumDevTestDASChainConfig() - l1info, l1client, _, l1stack := createTestL1BlockChain(t, nil) - defer requireClose(t, l1stack) - arbSys, _ := precompilesgen.NewArbSys(types.ArbSysAddress, l1client) - l1Reader, err := headerreader.New(ctx, l1client, func() *headerreader.Config { return &headerreader.TestConfig }, arbSys) + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + builder.chainConfig = params.ArbitrumDevTestDASChainConfig() + builder.BuildL1(t) + + arbSys, _ := precompilesgen.NewArbSys(types.ArbSysAddress, builder.L1.Client) + l1Reader, err := headerreader.New(ctx, builder.L1.Client, func() *headerreader.Config { return &headerreader.TestConfig }, arbSys) Require(t, err) l1Reader.Start(ctx) defer l1Reader.StopAndWait() - feedErrChan := make(chan error, 10) - locator, err := server_common.NewMachineLocator("") - Require(t, err) - addresses, initMessage := DeployOnTestL1(t, ctx, l1info, l1client, chainConfig, locator.LatestWasmModuleRoot()) - keyDir, fileDataDir, dbDataDir := t.TempDir(), t.TempDir(), t.TempDir() pubkey, _, err := das.GenerateAndStoreKeys(keyDir) Require(t, err) @@ -281,7 +241,7 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { // L1NodeURL: normally we would have to set this but we are passing in the already constructed client and addresses to the factory } - daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &addresses.SequencerInbox) + daReader, daWriter, signatureVerifier, daHealthChecker, lifecycleManager, err := das.CreateDAComponentsForDaserver(ctx, &serverConfig, l1Reader, &builder.addresses.SequencerInbox) Require(t, err) defer lifecycleManager.StopAndWaitUntil(time.Second) rpcLis, err := net.Listen("tcp", "localhost:0") @@ -291,13 +251,13 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { restLis, err := net.Listen("tcp", "localhost:0") Require(t, err) restServer, err := das.NewRestfulDasServerOnListener(restLis, genericconf.HTTPServerTimeoutConfigDefault, daReader, daHealthChecker) + Require(t, err) pubkeyA := pubkey - authorizeDASKeyset(t, ctx, pubkeyA, l1info, l1client) + authorizeDASKeyset(t, ctx, pubkeyA, builder.L1Info, builder.L1.Client) // - l1NodeConfigA := arbnode.ConfigDefaultL1Test() - l1NodeConfigA.DataAvailability = das.DataAvailabilityConfig{ + builder.nodeConfig.DataAvailability = das.DataAvailabilityConfig{ Enable: true, // AggregatorConfig set up below @@ -307,29 +267,17 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { URL: "http://" + rpcLis.Addr().String(), Pubkey: blsPubToBase64(pubkey), } - l1NodeConfigA.DataAvailability.RPCAggregator = aggConfigForBackend(t, beConfigA) - l1NodeConfigA.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig - l1NodeConfigA.DataAvailability.RestAggregator.Enable = true - l1NodeConfigA.DataAvailability.RestAggregator.Urls = []string{"http://" + restLis.Addr().String()} - l1NodeConfigA.DataAvailability.ParentChainNodeURL = "none" - - dataSigner := signature.DataSignerFromPrivateKey(l1info.Accounts["Sequencer"].PrivateKey) - - Require(t, err) + builder.nodeConfig.DataAvailability.RPCAggregator = aggConfigForBackend(t, beConfigA) + builder.nodeConfig.DataAvailability.RestAggregator = das.DefaultRestfulClientAggregatorConfig + builder.nodeConfig.DataAvailability.RestAggregator.Enable = true + builder.nodeConfig.DataAvailability.RestAggregator.Urls = []string{"http://" + restLis.Addr().String()} + builder.nodeConfig.DataAvailability.ParentChainNodeURL = "none" // Setup L2 chain - l2info, l2stackA, l2chainDb, l2arbDb, l2blockchain := createL2BlockChainWithStackConfig(t, nil, "", chainConfig, initMessage, nil, nil) - l2info.GenerateAccount("User2") - - execA, err := gethexec.CreateExecutionNode(ctx, l2stackA, l2chainDb, l2blockchain, l1client, gethexec.ConfigDefaultTest) - Require(t, err) - - sequencerTxOpts := l1info.GetDefaultTransactOpts("Sequencer", ctx) - sequencerTxOptsPtr := &sequencerTxOpts - nodeA, err := arbnode.CreateNode(ctx, l2stackA, execA, l2arbDb, NewFetcherFromConfig(l1NodeConfigA), l2blockchain.Config(), l1client, addresses, sequencerTxOptsPtr, sequencerTxOptsPtr, dataSigner, feedErrChan, big.NewInt(1337), nil) - Require(t, err) - Require(t, nodeA.Start(ctx)) - l2clientA := ClientForStack(t, l2stackA) + builder.L2Info = NewArbTestInfo(t, builder.chainConfig.ChainID) + builder.L2Info.GenerateAccount("User2") + cleanup := builder.BuildL2OnL1(t) + defer cleanup() // Create node to sync from chain l1NodeConfigB := arbnode.ConfigDefaultL1NonSequencerTest() @@ -348,12 +296,14 @@ func TestDASComplexConfigAndRestMirror(t *testing.T) { l1NodeConfigB.DataAvailability.RestAggregator.Enable = true l1NodeConfigB.DataAvailability.RestAggregator.Urls = []string{"http://" + restLis.Addr().String()} l1NodeConfigB.DataAvailability.ParentChainNodeURL = "none" - l2clientB, nodeB := Create2ndNodeWithConfig(t, ctx, nodeA, l1stack, l1info, &l2info.ArbInitData, l1NodeConfigB, nil, nil, &valnode.TestValidationConfig) - - checkBatchPosting(t, ctx, l1client, l2clientA, l1info, l2info, big.NewInt(1e12), l2clientB) + nodeBParams := SecondNodeParams{ + nodeConfig: l1NodeConfigB, + initData: &builder.L2Info.ArbInitData, + } + l2B, cleanupB := builder.Build2ndNode(t, &nodeBParams) + defer cleanupB() - nodeA.StopAndWait() - nodeB.StopAndWait() + checkBatchPosting(t, ctx, builder.L1.Client, builder.L2.Client, builder.L1Info, builder.L2Info, big.NewInt(1e12), l2B.Client) err = restServer.Shutdown() Require(t, err) diff --git a/system_tests/fees_test.go b/system_tests/fees_test.go index 4d8fbf43fd..ccca82e009 100644 --- a/system_tests/fees_test.go +++ b/system_tests/fees_test.go @@ -2,8 +2,10 @@ // For license information, see https://github.com/nitro/blob/master/LICENSE // these tests seems to consume too much memory with race detection -//go:build !race -// +build !race +// Test randomly fails with L1 gas price estimate should tend toward the basefee +// so skipping locally, but running on CI +//go:build !race && cionly +// +build !race,cionly package arbtest diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index b466e2db23..bf95c390ba 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -25,20 +25,15 @@ import ( "github.com/offchainlabs/nitro/arbcompress" "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos" - "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/cmd/chaininfo" - "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/solgen/go/challengegen" "github.com/offchainlabs/nitro/solgen/go/mocksgen" "github.com/offchainlabs/nitro/solgen/go/ospgen" "github.com/offchainlabs/nitro/solgen/go/yulgen" "github.com/offchainlabs/nitro/staker" - "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_common" - "github.com/offchainlabs/nitro/validator/valnode" ) func DeployOneStepProofEntry(t *testing.T, ctx context.Context, auth *bind.TransactOpts, client *ethclient.Client) common.Address { @@ -238,16 +233,6 @@ func setupSequencerInboxStub(ctx context.Context, t *testing.T, l1Info *Blockcha return bridgeAddr, seqInbox, seqInboxAddr } -func createL2Nodes(t *testing.T, ctx context.Context, conf *arbnode.Config, chainConfig *params.ChainConfig, l1Client arbutil.L1Interface, l2info *BlockchainTestInfo, rollupAddresses *chaininfo.RollupAddresses, initMsg *arbostypes.ParsedInitMessage, txOpts *bind.TransactOpts, signer signature.DataSignerFunc, fatalErrChan chan error) (*arbnode.Node, *gethexec.ExecutionNode) { - _, stack, l2ChainDb, l2ArbDb, l2Blockchain := createL2BlockChainWithStackConfig(t, l2info, "", chainConfig, initMsg, nil, nil) - execNode, err := gethexec.CreateExecutionNode(ctx, stack, l2ChainDb, l2Blockchain, l1Client, gethexec.ConfigDefaultTest) - Require(t, err) - consensusNode, err := arbnode.CreateNode(ctx, stack, execNode, l2ArbDb, NewFetcherFromConfig(conf), chainConfig, l1Client, rollupAddresses, txOpts, txOpts, signer, fatalErrChan, big.NewInt(1337), nil) - Require(t, err) - - return consensusNode, execNode -} - func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, challengeMsgIdx int64, wasmRootDir string) { glogger := log.NewGlogHandler( log.NewTerminalHandler(io.Writer(os.Stderr), false)) @@ -257,35 +242,32 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall ctx, cancel := context.WithCancel(context.Background()) defer cancel() + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) initialBalance := new(big.Int).Lsh(big.NewInt(1), 200) - l1Info := NewL1TestInfo(t) + l1Info := builder.L1Info l1Info.GenerateGenesisAccount("deployer", initialBalance) l1Info.GenerateGenesisAccount("asserter", initialBalance) l1Info.GenerateGenesisAccount("challenger", initialBalance) l1Info.GenerateGenesisAccount("sequencer", initialBalance) - chainConfig := params.ArbitrumDevTestChainConfig() - l1Info, l1Backend, _, _ := createTestL1BlockChain(t, l1Info) - conf := arbnode.ConfigDefaultL1Test() + chainConfig := builder.chainConfig + conf := builder.nodeConfig conf.BlockValidator.Enable = false conf.BatchPoster.Enable = false conf.InboxReader.CheckDelay = time.Second var valStack *node.Node var mockSpawn *mockSpawner - valNodeConfig := &valnode.TestValidationConfig - valNodeConfig.Wasm.RootPath = wasmRootDir + builder.valnodeConfig.Wasm.RootPath = wasmRootDir if useStubs { - mockSpawn, valStack = createMockValidationNode(t, ctx, &valNodeConfig.Arbitrator) + mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator) } else { - _, valStack = createTestValidationNode(t, ctx, valNodeConfig) + _, valStack = createTestValidationNode(t, ctx, builder.valnodeConfig) } configByValidationNode(conf, valStack) - fatalErrChan := make(chan error, 10) - locator, err := server_common.NewMachineLocator(wasmRootDir) - Require(t, err) - asserterRollupAddresses, initMessage := DeployOnTestL1(t, ctx, l1Info, l1Backend, chainConfig, locator.LatestWasmModuleRoot()) + builder.BuildL1(t) + l1Backend := builder.L1.Client deployerTxOpts := l1Info.GetDefaultTransactOpts("deployer", ctx) sequencerTxOpts := l1Info.GetDefaultTransactOpts("sequencer", ctx) @@ -295,20 +277,28 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall asserterBridgeAddr, asserterSeqInbox, asserterSeqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig) challengerBridgeAddr, challengerSeqInbox, challengerSeqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig) + asserterRollupAddresses := builder.addresses asserterRollupAddresses.Bridge = asserterBridgeAddr asserterRollupAddresses.SequencerInbox = asserterSeqInboxAddr - asserterL2Info := NewArbTestInfo(t, chainConfig.ChainID) - asserterL2, asserterExec := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, asserterL2Info, asserterRollupAddresses, initMessage, nil, nil, fatalErrChan) - err = asserterL2.Start(ctx) - Require(t, err) - challengerRollupAddresses := *asserterRollupAddresses + cleanup := builder.BuildL2OnL1(t) + defer cleanup() + asserterL2 := builder.L2.ConsensusNode + asserterL2Info := builder.L2Info + asserterExec := builder.L2.ExecNode + + challengerRollupAddresses := *builder.addresses challengerRollupAddresses.Bridge = challengerBridgeAddr challengerRollupAddresses.SequencerInbox = challengerSeqInboxAddr challengerL2Info := NewArbTestInfo(t, chainConfig.ChainID) - challengerL2, challengerExec := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, challengerL2Info, &challengerRollupAddresses, initMessage, nil, nil, fatalErrChan) - err = challengerL2.Start(ctx) - Require(t, err) + challengerParams := SecondNodeParams{ + addresses: &challengerRollupAddresses, + initData: &challengerL2Info.ArbInitData, + } + challenger, challengerCleanup := builder.Build2ndNode(t, &challengerParams) + defer challengerCleanup() + challengerL2 := challenger.ConsensusNode + challengerExec := challenger.ExecNode asserterL2Info.GenerateAccount("Destination") challengerL2Info.SetFullAccountInfo("Destination", asserterL2Info.GetInfoWithPrivKey("Destination")) @@ -343,6 +333,8 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall if useStubs { wasmModuleRoot = mockWasmModuleRoots[0] } else { + locator, err := server_common.NewMachineLocator(wasmRootDir) + Require(t, err) wasmModuleRoot = locator.LatestWasmModuleRoot() if (wasmModuleRoot == common.Hash{}) { Fatal(t, "latest machine not found") diff --git a/system_tests/nodeinterface_test.go b/system_tests/nodeinterface_test.go index 4eace8d514..17bfb18892 100644 --- a/system_tests/nodeinterface_test.go +++ b/system_tests/nodeinterface_test.go @@ -15,57 +15,47 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/params" - "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/arbos/util" "github.com/offchainlabs/nitro/solgen/go/node_interfacegen" - "github.com/offchainlabs/nitro/validator/server_common" ) func TestFindBatch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l1Info := NewL1TestInfo(t) + builder := NewNodeBuilder(ctx).DefaultConfig(t, true) + l1Info := builder.L1Info initialBalance := new(big.Int).Lsh(big.NewInt(1), 200) l1Info.GenerateGenesisAccount("deployer", initialBalance) l1Info.GenerateGenesisAccount("asserter", initialBalance) l1Info.GenerateGenesisAccount("challenger", initialBalance) l1Info.GenerateGenesisAccount("sequencer", initialBalance) - l1Info, l1Backend, _, _ := createTestL1BlockChain(t, l1Info) - conf := arbnode.ConfigDefaultL1Test() + conf := builder.nodeConfig conf.BlockValidator.Enable = false conf.BatchPoster.Enable = false - chainConfig := params.ArbitrumDevTestChainConfig() - fatalErrChan := make(chan error, 10) - locator, err := server_common.NewMachineLocator("") - Require(t, err) - rollupAddresses, initMsg := DeployOnTestL1(t, ctx, l1Info, l1Backend, chainConfig, locator.LatestWasmModuleRoot()) - - bridgeAddr, seqInbox, seqInboxAddr := setupSequencerInboxStub(ctx, t, l1Info, l1Backend, chainConfig) + builder.BuildL1(t) - callOpts := bind.CallOpts{Context: ctx} + bridgeAddr, seqInbox, seqInboxAddr := setupSequencerInboxStub(ctx, t, builder.L1Info, builder.L1.Client, builder.chainConfig) + builder.addresses.Bridge = bridgeAddr + builder.addresses.SequencerInbox = seqInboxAddr - rollupAddresses.Bridge = bridgeAddr - rollupAddresses.SequencerInbox = seqInboxAddr - l2Info := NewArbTestInfo(t, chainConfig.ChainID) - consensus, _ := createL2Nodes(t, ctx, conf, chainConfig, l1Backend, l2Info, rollupAddresses, initMsg, nil, nil, fatalErrChan) - err = consensus.Start(ctx) - Require(t, err) + cleanup := builder.BuildL2OnL1(t) + defer cleanup() - l2Client := ClientForStack(t, consensus.Stack) - nodeInterface, err := node_interfacegen.NewNodeInterface(types.NodeInterfaceAddress, l2Client) + nodeInterface, err := node_interfacegen.NewNodeInterface(types.NodeInterfaceAddress, builder.L2.Client) Require(t, err) - sequencerTxOpts := l1Info.GetDefaultTransactOpts("sequencer", ctx) + sequencerTxOpts := builder.L1Info.GetDefaultTransactOpts("sequencer", ctx) - l2Info.GenerateAccount("Destination") - makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1) - makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1) - makeBatch(t, consensus, l2Info, l1Backend, &sequencerTxOpts, seqInbox, seqInboxAddr, -1) + builder.L2Info.GenerateAccount("Destination") + const numBatches = 3 + for i := 0; i < numBatches; i++ { + makeBatch(t, builder.L2.ConsensusNode, builder.L2Info, builder.L1.Client, &sequencerTxOpts, seqInbox, seqInboxAddr, -1) + } for blockNum := uint64(0); blockNum < uint64(makeBatch_MsgsPerBatch)*3; blockNum++ { + callOpts := bind.CallOpts{Context: ctx} gotBatchNum, err := nodeInterface.FindBatchContainingBlock(&callOpts, blockNum) Require(t, err) expBatchNum := uint64(0) @@ -75,17 +65,17 @@ func TestFindBatch(t *testing.T) { if expBatchNum != gotBatchNum { Fatal(t, "wrong result from findBatchContainingBlock. blocknum ", blockNum, " expected ", expBatchNum, " got ", gotBatchNum) } - batchL1Block, err := consensus.InboxTracker.GetBatchParentChainBlock(gotBatchNum) + batchL1Block, err := builder.L2.ConsensusNode.InboxTracker.GetBatchParentChainBlock(gotBatchNum) Require(t, err) - blockHeader, err := l2Client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) + blockHeader, err := builder.L2.Client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) Require(t, err) blockHash := blockHeader.Hash() - minCurrentL1Block, err := l1Backend.BlockNumber(ctx) + minCurrentL1Block, err := builder.L1.Client.BlockNumber(ctx) Require(t, err) gotConfirmations, err := nodeInterface.GetL1Confirmations(&callOpts, blockHash) Require(t, err) - maxCurrentL1Block, err := l1Backend.BlockNumber(ctx) + maxCurrentL1Block, err := builder.L1.Client.BlockNumber(ctx) Require(t, err) if gotConfirmations > (maxCurrentL1Block-batchL1Block) || gotConfirmations < (minCurrentL1Block-batchL1Block) { diff --git a/system_tests/program_test.go b/system_tests/program_test.go index b05589a1bf..b748739cdc 100644 --- a/system_tests/program_test.go +++ b/system_tests/program_test.go @@ -1668,20 +1668,12 @@ func formatTime(duration time.Duration) string { return fmt.Sprintf("%.2f%s", span, units[unit]) } -func TestWasmRecreate(t *testing.T) { - builder, auth, cleanup := setupProgramTest(t, true) +func testWasmRecreate(t *testing.T, builder *NodeBuilder, storeTx *types.Transaction, loadTx *types.Transaction, want []byte) { ctx := builder.ctx l2info := builder.L2Info l2client := builder.L2.Client - defer cleanup() - - storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) - - zero := common.Hash{} - val := common.HexToHash("0x121233445566") // do an onchain call - store value - storeTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageWrite(zero, val)) Require(t, l2client.SendTransaction(ctx, storeTx)) _, err := EnsureTxSucceeded(ctx, l2client, storeTx) Require(t, err) @@ -1694,11 +1686,10 @@ func TestWasmRecreate(t *testing.T) { Require(t, err) // make sure reading 2nd value succeeds from 2nd node - loadTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageRead(zero)) result, err := arbutil.SendTxAsCall(ctx, nodeB.Client, loadTx, l2info.GetAddress("Owner"), nil, true) Require(t, err) - if common.BytesToHash(result) != val { - Fatal(t, "got wrong value") + if !bytes.Equal(result, want) { + t.Fatalf("got wrong value, got %x, want %x", result, want) } // close nodeB cleanupB() @@ -1723,8 +1714,8 @@ func TestWasmRecreate(t *testing.T) { // test nodeB - answers eth_call (requires reloading wasm) result, err = arbutil.SendTxAsCall(ctx, nodeB.Client, loadTx, l2info.GetAddress("Owner"), nil, true) Require(t, err) - if common.BytesToHash(result) != val { - Fatal(t, "got wrong value") + if !bytes.Equal(result, want) { + t.Fatalf("got wrong value, got %x, want %x", result, want) } // send new tx (requires wasm) and check nodeB sees it as well @@ -1743,7 +1734,46 @@ func TestWasmRecreate(t *testing.T) { Fatal(t, "not contents found before delete") } os.RemoveAll(wasmPath) +} + +func TestWasmRecreate(t *testing.T) { + builder, auth, cleanup := setupProgramTest(t, true) + ctx := builder.ctx + l2info := builder.L2Info + l2client := builder.L2.Client + defer cleanup() + + storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) + + zero := common.Hash{} + val := common.HexToHash("0x121233445566") + + storeTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageWrite(zero, val)) + loadTx := l2info.PrepareTxTo("Owner", &storage, l2info.TransferGas, nil, argsForStorageRead(zero)) + + testWasmRecreate(t, builder, storeTx, loadTx, val[:]) +} + +func TestWasmRecreateWithDelegatecall(t *testing.T) { + builder, auth, cleanup := setupProgramTest(t, true) + ctx := builder.ctx + l2info := builder.L2Info + l2client := builder.L2.Client + defer cleanup() + + storage := deployWasm(t, ctx, auth, l2client, rustFile("storage")) + multicall := deployWasm(t, ctx, auth, l2client, rustFile("multicall")) + + zero := common.Hash{} + val := common.HexToHash("0x121233445566") + + data := argsForMulticall(vm.DELEGATECALL, storage, big.NewInt(0), argsForStorageWrite(zero, val)) + storeTx := l2info.PrepareTxTo("Owner", &multicall, l2info.TransferGas, nil, data) + + data = argsForMulticall(vm.DELEGATECALL, storage, big.NewInt(0), argsForStorageRead(zero)) + loadTx := l2info.PrepareTxTo("Owner", &multicall, l2info.TransferGas, nil, data) + testWasmRecreate(t, builder, storeTx, loadTx, val[:]) } // createMapFromDb is used in verifying if wasm store rebuilding works diff --git a/system_tests/recreatestate_rpc_test.go b/system_tests/recreatestate_rpc_test.go index bf321808de..de123aee78 100644 --- a/system_tests/recreatestate_rpc_test.go +++ b/system_tests/recreatestate_rpc_test.go @@ -20,7 +20,6 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" - "github.com/offchainlabs/nitro/arbnode" "github.com/offchainlabs/nitro/execution/gethexec" "github.com/offchainlabs/nitro/util" ) @@ -330,7 +329,6 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx1, cancel1 := context.WithCancel(ctx) execConfig := gethexec.ConfigDefaultTest() execConfig.RPC.MaxRecreateStateDepth = maxRecreateStateDepth execConfig.Sequencer.MaxBlockSpeed = 0 @@ -340,28 +338,19 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig skipBlocks := execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving skipGas := execConfig.Caching.MaxAmountOfGasToSkipStateSaving - feedErrChan := make(chan error, 10) - l2info, stack, chainDb, arbDb, blockchain := createL2BlockChain(t, nil, t.TempDir(), params.ArbitrumDevTestChainConfig(), &execConfig.Caching) - - Require(t, execConfig.Validate()) - execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(ctx1, stack, chainDb, blockchain, nil, execConfigFetcher) - Require(t, err) + builder := NewNodeBuilder(ctx).DefaultConfig(t, false) + builder.execConfig = execConfig + cleanup := builder.Build(t) + defer cleanup() - parentChainID := big.NewInt(1337) - node, err := arbnode.CreateNode(ctx1, stack, execNode, arbDb, NewFetcherFromConfig(arbnode.ConfigDefaultL2Test()), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, parentChainID, nil) - Require(t, err) - err = node.TxStreamer.AddFakeInitMessage() + client := builder.L2.Client + l2info := builder.L2Info + genesis, err := client.BlockNumber(ctx) Require(t, err) - Require(t, node.Start(ctx1)) - client := ClientForStack(t, stack) - - StartWatchChanErr(t, ctx, feedErrChan, node) - dataDir := node.Stack.DataDir() l2info.GenerateAccount("User2") var txs []*types.Transaction - for i := 0; i < txCount; i++ { + for i := genesis; i < uint64(txCount)+genesis; i++ { tx := l2info.PrepareTx("Owner", "User2", l2info.TransferGas, common.Big1, nil) txs = append(txs, tx) err := client.SendTransaction(ctx, tx) @@ -372,8 +361,7 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig Fatal(t, "internal test error - tx got included in unexpected block number, have:", have, "want:", want) } } - bc := execNode.Backend.ArbInterface().BlockChain() - genesis := uint64(0) + bc := builder.L2.ExecNode.Backend.ArbInterface().BlockChain() currentHeader := bc.CurrentBlock() if currentHeader == nil { Fatal(t, "missing current block") @@ -385,24 +373,14 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig expectedBalance, err := client.BalanceAt(ctx, GetTestAddressForAccountName(t, "User2"), new(big.Int).SetUint64(lastBlock)) Require(t, err) - node.StopAndWait() - cancel1() - t.Log("stopped first node") - - l2info, stack, chainDb, arbDb, blockchain = createL2BlockChain(t, l2info, dataDir, params.ArbitrumDevTestChainConfig(), &execConfig.Caching) + builder.RestartL2Node(t) + t.Log("restarted the node") - execNode, err = gethexec.CreateExecutionNode(ctx1, stack, chainDb, blockchain, nil, execConfigFetcher) - Require(t, err) - - node, err = arbnode.CreateNode(ctx, stack, execNode, arbDb, NewFetcherFromConfig(arbnode.ConfigDefaultL2Test()), blockchain.Config(), nil, node.DeployInfo, nil, nil, nil, feedErrChan, parentChainID, nil) - Require(t, err) - Require(t, node.Start(ctx)) - client = ClientForStack(t, stack) - defer node.StopAndWait() - bc = execNode.Backend.ArbInterface().BlockChain() + client = builder.L2.Client + bc = builder.L2.ExecNode.Backend.ArbInterface().BlockChain() gas := skipGas blocks := skipBlocks - for i := genesis + 1; i <= genesis+uint64(txCount); i++ { + for i := genesis; i <= genesis+uint64(txCount); i++ { block := bc.GetBlockByNumber(i) if block == nil { Fatal(t, "header not found for block number:", i) diff --git a/system_tests/retryable_test.go b/system_tests/retryable_test.go index 1abf9f2162..268d59ac98 100644 --- a/system_tests/retryable_test.go +++ b/system_tests/retryable_test.go @@ -131,6 +131,38 @@ func TestRetryableNoExist(t *testing.T) { } } +func TestEstimateRetryableTicketWithNoFundsAndZeroGasPrice(t *testing.T) { + t.Parallel() + builder, _, _, ctx, teardown := retryableSetup(t) + defer teardown() + + user2Address := builder.L2Info.GetAddress("User2") + beneficiaryAddress := builder.L2Info.GetAddress("Beneficiary") + + deposit := arbmath.BigMul(big.NewInt(1e12), big.NewInt(1e12)) + callValue := big.NewInt(1e6) + + nodeInterface, err := node_interfacegen.NewNodeInterface(types.NodeInterfaceAddress, builder.L2.Client) + Require(t, err, "failed to deploy NodeInterface") + + builder.L2Info.GenerateAccount("zerofunds") + usertxoptsL2 := builder.L2Info.GetDefaultTransactOpts("zerofunds", ctx) + usertxoptsL2.NoSend = true + usertxoptsL2.GasMargin = 0 + usertxoptsL2.GasPrice = big.NewInt(0) + _, err = nodeInterface.EstimateRetryableTicket( + &usertxoptsL2, + usertxoptsL2.From, + deposit, + user2Address, + callValue, + beneficiaryAddress, + beneficiaryAddress, + []byte{}, + ) + Require(t, err, "failed to estimate retryable submission") +} + func TestSubmitRetryableImmediateSuccess(t *testing.T) { t.Parallel() builder, delayedInbox, lookupL2Tx, ctx, teardown := retryableSetup(t) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 43d55f40c9..1b8926a1b9 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -67,6 +67,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { createStartNode := func(nodeNum int) { builder.nodeConfig.SeqCoordinator.MyUrl = nodeNames[nodeNum] builder.L2Info = l2Info + builder.dataDir = t.TempDir() // set new data dir for each node builder.Build(t) testNodes[nodeNum] = builder.L2 } diff --git a/system_tests/seq_nonce_test.go b/system_tests/seq_nonce_test.go index f0e3dcffd7..72629e1978 100644 --- a/system_tests/seq_nonce_test.go +++ b/system_tests/seq_nonce_test.go @@ -67,7 +67,7 @@ func TestSequencerNonceTooHigh(t *testing.T) { cleanup := builder.Build(t) defer cleanup() - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++ + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1) before := time.Now() tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil) @@ -97,21 +97,21 @@ func TestSequencerNonceTooHighQueueFull(t *testing.T) { defer cleanup() count := 15 - var completed uint64 + var completed atomic.Uint64 for i := 0; i < count; i++ { - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce++ + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(1) tx := builder.L2Info.PrepareTx("Owner", "Owner", builder.L2Info.TransferGas, common.Big0, nil) go func() { err := builder.L2.Client.SendTransaction(ctx, tx) if err == nil { Fatal(t, "No error when nonce was too high") } - atomic.AddUint64(&completed, 1) + completed.Add(1) }() } for wait := 9; wait >= 0; wait-- { - got := int(atomic.LoadUint64(&completed)) + got := int(completed.Load()) expected := count - builder.execConfig.Sequencer.NonceFailureCacheSize if got == expected { break diff --git a/system_tests/seq_reject_test.go b/system_tests/seq_reject_test.go index 76bdfc2612..2dbdba6804 100644 --- a/system_tests/seq_reject_test.go +++ b/system_tests/seq_reject_test.go @@ -54,7 +54,7 @@ func TestSequencerRejection(t *testing.T) { } wg := sync.WaitGroup{} - var stopBackground int32 + var stopBackground atomic.Int32 for user := 0; user < 9; user++ { user := user name := fmt.Sprintf("User%v", user) @@ -78,12 +78,12 @@ func TestSequencerRejection(t *testing.T) { GasFeeCap: arbmath.BigMulByUint(builderSeq.L2Info.GasPrice, 100), Value: common.Big0, } - for atomic.LoadInt32(&stopBackground) == 0 { - txData.Nonce = info.Nonce + for stopBackground.Load() == 0 { + txData.Nonce = info.Nonce.Load() var expectedErr string if user%3 == 0 { txData.Data = noopId - info.Nonce += 1 + info.Nonce.Add(1) } else if user%3 == 1 { txData.Data = revertId expectedErr = "execution reverted: SOLIDITY_REVERTING" @@ -116,7 +116,7 @@ func TestSequencerRejection(t *testing.T) { } } - atomic.StoreInt32(&stopBackground, 1) + stopBackground.Store(1) wg.Wait() header1, err := builderSeq.L2.Client.HeaderByNumber(ctx, nil) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index ab30598b60..e3a98b4961 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -192,7 +192,7 @@ func testLyingSequencer(t *testing.T, dasModeStr string) { builder.L2Info.GenerateAccount("RealUser") fraudTx := builder.L2Info.PrepareTx("Owner", "FraudUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil) - builder.L2Info.GetInfoWithPrivKey("Owner").Nonce -= 1 // Use same l2info object for different l2s + builder.L2Info.GetInfoWithPrivKey("Owner").Nonce.Add(^uint64(0)) // Use same l2info object for different l2s realTx := builder.L2Info.PrepareTx("Owner", "RealUser", builder.L2Info.TransferGas, big.NewInt(1e12), nil) for i := 0; i < 10; i++ { diff --git a/system_tests/snap_sync_test.go b/system_tests/snap_sync_test.go index dd22bb027c..31d5ea8492 100644 --- a/system_tests/snap_sync_test.go +++ b/system_tests/snap_sync_test.go @@ -39,7 +39,13 @@ func TestSnapSync(t *testing.T) { // This node will be stopped in middle and arbitrumdata will be deleted. testDir := t.TempDir() nodeBStack := createStackConfigForTest(testDir) - nodeB, cleanupB := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: nodeBStack}) + nodeBConfig := builder.nodeConfig + nodeBConfig.BatchPoster.Enable = false + nodeBParams := &SecondNodeParams{ + stackConfig: nodeBStack, + nodeConfig: nodeBConfig, + } + nodeB, cleanupB := builder.Build2ndNode(t, nodeBParams) builder.BridgeBalance(t, "Faucet", big.NewInt(1).Mul(big.NewInt(params.Ether), big.NewInt(10000))) diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 4afe2e8ccd..52f16614f7 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -42,7 +42,7 @@ import ( func makeBackgroundTxs(ctx context.Context, builder *NodeBuilder) error { for i := uint64(0); ctx.Err() == nil; i++ { - builder.L2Info.Accounts["BackgroundUser"].Nonce = i + builder.L2Info.Accounts["BackgroundUser"].Nonce.Store(i) tx := builder.L2Info.PrepareTx("BackgroundUser", "BackgroundUser", builder.L2Info.TransferGas, common.Big0, nil) err := builder.L2.Client.SendTransaction(ctx, tx) if err != nil { diff --git a/system_tests/test_info.go b/system_tests/test_info.go index ee84564bdc..6313e392ca 100644 --- a/system_tests/test_info.go +++ b/system_tests/test_info.go @@ -28,7 +28,7 @@ var simulatedChainID = big.NewInt(1337) type AccountInfo struct { Address common.Address PrivateKey *ecdsa.PrivateKey - Nonce uint64 + Nonce atomic.Uint64 } type BlockchainTestInfo struct { @@ -92,7 +92,6 @@ func (b *BlockchainTestInfo) GenerateAccount(name string) { b.Accounts[name] = &AccountInfo{ PrivateKey: privateKey, Address: crypto.PubkeyToAddress(privateKey.PublicKey), - Nonce: 0, } log.Info("New Key ", "name", name, "Address", b.Accounts[name].Address) } @@ -138,8 +137,11 @@ func (b *BlockchainTestInfo) SetContract(name string, address common.Address) { } func (b *BlockchainTestInfo) SetFullAccountInfo(name string, info *AccountInfo) { - infoCopy := *info - b.Accounts[name] = &infoCopy + b.Accounts[name] = &AccountInfo{ + Address: info.Address, + PrivateKey: info.PrivateKey, + } + b.Accounts[name].Nonce.Store(info.Nonce.Load()) } func (b *BlockchainTestInfo) GetAddress(name string) common.Address { @@ -176,7 +178,7 @@ func (b *BlockchainTestInfo) GetDefaultTransactOpts(name string, ctx context.Con if err != nil { return nil, err } - atomic.AddUint64(&info.Nonce, 1) // we don't set Nonce, but try to keep track.. + info.Nonce.Add(1) // we don't set Nonce, but try to keep track.. return tx.WithSignature(b.Signer, signature) }, GasMargin: 2000, // adjust by 20% @@ -214,7 +216,7 @@ func (b *BlockchainTestInfo) PrepareTxTo( ) *types.Transaction { b.T.Helper() info := b.GetInfoWithPrivKey(from) - txNonce := atomic.AddUint64(&info.Nonce, 1) - 1 + txNonce := info.Nonce.Add(1) - 1 if value == nil { value = common.Big0 } diff --git a/system_tests/twonodeslong_test.go b/system_tests/twonodeslong_test.go index ce3244462f..83cd975dd8 100644 --- a/system_tests/twonodeslong_test.go +++ b/system_tests/twonodeslong_test.go @@ -120,7 +120,7 @@ func testTwoNodesLong(t *testing.T, dasModeStr string) { } } // create bad tx on delayed inbox - builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce = 10 + builder.L2Info.GetInfoWithPrivKey("ErrorTxSender").Nonce.Store(10) builder.L1.SendWaitTestTransactions(t, []*types.Transaction{ WrapL2ForDelayed(t, builder.L2Info.PrepareTx("ErrorTxSender", "DelayedReceiver", 30002, delayedFaucetNeeds, nil), builder.L1Info, "User", 100000), }) diff --git a/util/containers/promise.go b/util/containers/promise.go index a180c094eb..54f0df195d 100644 --- a/util/containers/promise.go +++ b/util/containers/promise.go @@ -20,7 +20,7 @@ type Promise[R any] struct { chanReady chan struct{} result R err error - produced uint32 + produced atomic.Bool cancel func() } @@ -67,7 +67,7 @@ func (p *Promise[R]) Cancel() { } func (p *Promise[R]) ProduceErrorSafe(err error) error { - if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) { + if !p.produced.CompareAndSwap(false, true) { return errors.New("cannot produce two values") } p.err = err @@ -83,7 +83,7 @@ func (p *Promise[R]) ProduceError(err error) { } func (p *Promise[R]) ProduceSafe(value R) error { - if !atomic.CompareAndSwapUint32(&p.produced, 0, 1) { + if !p.produced.CompareAndSwap(false, true) { return errors.New("cannot produce two values") } p.result = value diff --git a/util/containers/promise_test.go b/util/containers/promise_test.go index 61ee4f2c3a..3e41a6465d 100644 --- a/util/containers/promise_test.go +++ b/util/containers/promise_test.go @@ -25,8 +25,8 @@ func TestPromise(t *testing.T) { t.Fatal("unexpected Promise.Current when ready") } - cancelCalled := int64(0) - cancelFunc := func() { atomic.AddInt64(&cancelCalled, 1) } + var cancelCalled atomic.Int64 + cancelFunc := func() { cancelCalled.Add(1) } tempPromise = NewPromise[int](cancelFunc) res, err = tempPromise.Current() @@ -68,12 +68,12 @@ func TestPromise(t *testing.T) { t.Fatal("unexpected Promise.Current 2nd time") } - if atomic.LoadInt64(&cancelCalled) != 0 { + if cancelCalled.Load() != 0 { t.Fatal("cancel called by await/current when it shouldn't be") } tempPromise.Cancel() - if atomic.LoadInt64(&cancelCalled) != 0 { + if cancelCalled.Load() != 0 { t.Fatal("cancel called after error produced") } @@ -84,11 +84,11 @@ func TestPromise(t *testing.T) { if res != 0 || !errors.Is(err, context.DeadlineExceeded) { t.Fatal("unexpected Promise.Await with timeout") } - if atomic.LoadInt64(&cancelCalled) != 1 { + if cancelCalled.Load() != 1 { t.Fatal("cancel not called by await on timeout") } tempPromise.Cancel() - if atomic.LoadInt64(&cancelCalled) != 2 { + if cancelCalled.Load() != 2 { t.Fatal("cancel not called by promise.Cancel") } } diff --git a/util/iostat/iostat.go b/util/iostat/iostat.go index 9bc5ff800c..342a44100e 100644 --- a/util/iostat/iostat.go +++ b/util/iostat/iostat.go @@ -30,7 +30,9 @@ func RegisterAndPopulateMetrics(ctx context.Context, spawnInterval, maxDeviceCou if _, ok := deviceMetrics[stat.DeviceName]; !ok { // Register metrics for a maximum of maxDeviceCount (fail safe incase iostat command returns incorrect names indefinitely) if len(deviceMetrics) < maxDeviceCount { - baseMetricName := fmt.Sprintf("isotat/%s/", stat.DeviceName) + // Replace hyphens with underscores to avoid metric name issues + sanitizedDeviceName := strings.ReplaceAll(stat.DeviceName, "-", "_") + baseMetricName := fmt.Sprintf("iostat/%s/", sanitizedDeviceName) deviceMetrics[stat.DeviceName] = make(map[string]metrics.GaugeFloat64) deviceMetrics[stat.DeviceName]["readspersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"readspersecond", nil) deviceMetrics[stat.DeviceName]["writespersecond"] = metrics.NewRegisteredGaugeFloat64(baseMetricName+"writespersecond", nil) diff --git a/util/rpcclient/rpcclient.go b/util/rpcclient/rpcclient.go index 56aebef396..e171e6e99b 100644 --- a/util/rpcclient/rpcclient.go +++ b/util/rpcclient/rpcclient.go @@ -77,7 +77,7 @@ type RpcClient struct { config ClientConfigFetcher client *rpc.Client autoStack *node.Node - logId uint64 + logId atomic.Uint64 } func NewRpcClient(config ClientConfigFetcher, stack *node.Node) *RpcClient { @@ -154,7 +154,7 @@ func (c *RpcClient) CallContext(ctx_in context.Context, result interface{}, meth if c.client == nil { return errors.New("not connected") } - logId := atomic.AddUint64(&c.logId, 1) + logId := c.logId.Add(1) log.Trace("sending RPC request", "method", method, "logId", logId, "args", limitedArgumentsMarshal{int(c.config().ArgLogLimit), args}) var err error for i := 0; i < int(c.config().Retries)+1; i++ { diff --git a/util/rpcclient/rpcclient_test.go b/util/rpcclient/rpcclient_test.go index 8613671d37..ae7e4fc226 100644 --- a/util/rpcclient/rpcclient_test.go +++ b/util/rpcclient/rpcclient_test.go @@ -46,10 +46,13 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod stack, err := node.New(&stackConf) Require(t, err) + service := &testAPI{} + service.stuckCalls.Store(stuckOrFailed) + service.failedCalls.Store(stuckOrFailed) testAPIs := []rpc.API{{ Namespace: "test", Version: "1.0", - Service: &testAPI{stuckOrFailed, stuckOrFailed}, + Service: service, Public: true, Authenticated: false, }} @@ -67,12 +70,12 @@ func createTestNode(t *testing.T, ctx context.Context, stuckOrFailed int64) *nod } type testAPI struct { - stuckCalls int64 - failedCalls int64 + stuckCalls atomic.Int64 + failedCalls atomic.Int64 } func (t *testAPI) StuckAtFirst(ctx context.Context) error { - stuckRemaining := atomic.AddInt64(&t.stuckCalls, -1) + 1 + stuckRemaining := t.stuckCalls.Add(-1) + 1 if stuckRemaining <= 0 { return nil } @@ -81,7 +84,7 @@ func (t *testAPI) StuckAtFirst(ctx context.Context) error { } func (t *testAPI) FailAtFirst(ctx context.Context) error { - failedRemaining := atomic.AddInt64(&t.failedCalls, -1) + 1 + failedRemaining := t.failedCalls.Add(-1) + 1 if failedRemaining <= 0 { return nil } diff --git a/util/testhelpers/testhelpers.go b/util/testhelpers/testhelpers.go index 071429879e..b1b08708e7 100644 --- a/util/testhelpers/testhelpers.go +++ b/util/testhelpers/testhelpers.go @@ -11,6 +11,7 @@ import ( "math/rand" "os" "regexp" + "runtime/debug" "sync" "testing" @@ -24,6 +25,7 @@ import ( func RequireImpl(t *testing.T, err error, printables ...interface{}) { t.Helper() if err != nil { + t.Log(string(debug.Stack())) t.Fatal(colors.Red, printables, err, colors.Clear) } } diff --git a/validator/client/redis/producer.go b/validator/client/redis/producer.go index 4aa4031350..7594421f9a 100644 --- a/validator/client/redis/producer.go +++ b/validator/client/redis/producer.go @@ -58,7 +58,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { type ValidationClient struct { stopwaiter.StopWaiter name string - room int32 + room atomic.Int32 // producers stores moduleRoot to producer mapping. producers map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState] producerConfig pubsub.ProducerConfig @@ -75,14 +75,15 @@ func NewValidationClient(cfg *ValidationClientConfig) (*ValidationClient, error) if err != nil { return nil, err } - return &ValidationClient{ + validationClient := &ValidationClient{ name: cfg.Name, - room: cfg.Room, producers: make(map[common.Hash]*pubsub.Producer[*validator.ValidationInput, validator.GoGlobalState]), producerConfig: cfg.ProducerConfig, redisClient: redisClient, createStreams: cfg.CreateStreams, - }, nil + } + validationClient.room.Store(cfg.Room) + return validationClient, nil } func (c *ValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { @@ -114,8 +115,8 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) { } func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&c.room, -1) - defer atomic.AddInt32(&c.room, 1) + c.room.Add(-1) + defer c.room.Add(1) producer, found := c.producers[moduleRoot] if !found { errPromise := containers.NewReadyPromise(validator.GoGlobalState{}, fmt.Errorf("no validation is configured for wasm root %v", moduleRoot)) @@ -152,5 +153,5 @@ func (c *ValidationClient) Name() string { } func (c *ValidationClient) Room() int { - return int(c.room) + return int(c.room.Load()) } diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 949260002d..79ecc6bdf4 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -29,7 +29,7 @@ type ValidationClient struct { stopwaiter.StopWaiter client *rpcclient.RpcClient name string - room int32 + room atomic.Int32 wasmModuleRoots []common.Hash } @@ -40,12 +40,12 @@ func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) } func (c *ValidationClient) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&c.room, -1) + c.room.Add(-1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](c, func(ctx context.Context) (validator.GoGlobalState, error) { input := server_api.ValidationInputToJson(entry) var res validator.GoGlobalState err := c.client.CallContext(ctx, &res, server_api.Namespace+"_validate", input, moduleRoot) - atomic.AddInt32(&c.room, 1) + c.room.Add(1) return res, err }) return server_common.NewValRun(promise, moduleRoot) @@ -81,7 +81,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error { } else { log.Info("connected to validation server", "name", name, "room", room) } - atomic.StoreInt32(&c.room, int32(room)) + c.room.Store(int32(room)) c.wasmModuleRoots = moduleRoots c.name = name return nil @@ -106,7 +106,7 @@ func (c *ValidationClient) Name() string { } func (c *ValidationClient) Room() int { - room32 := atomic.LoadInt32(&c.room) + room32 := c.room.Load() if room32 < 0 { return 0 } diff --git a/validator/server_arb/execution_run.go b/validator/server_arb/execution_run.go index 8bdce145a2..d29a88d34d 100644 --- a/validator/server_arb/execution_run.go +++ b/validator/server_arb/execution_run.go @@ -24,7 +24,7 @@ type executionRun struct { close sync.Once } -// NewExecutionChallengeBackend creates a backend with the given arguments. +// NewExecutionRun creates a backend with the given arguments. // Note: machineCache may be nil, but if present, it must not have a restricted range. func NewExecutionRun( ctxIn context.Context, diff --git a/validator/server_arb/machine.go b/validator/server_arb/machine.go index cffd3db0ee..adca9695e2 100644 --- a/validator/server_arb/machine.go +++ b/validator/server_arb/machine.go @@ -59,7 +59,7 @@ type ArbitratorMachine struct { var _ MachineInterface = (*ArbitratorMachine)(nil) var preimageResolvers containers.SyncMap[int64, GoPreimageResolver] -var lastPreimageResolverId int64 // atomic +var lastPreimageResolverId atomic.Int64 // atomic // Any future calls to this machine will result in a panic func (m *ArbitratorMachine) Destroy() { @@ -382,7 +382,7 @@ func (m *ArbitratorMachine) SetPreimageResolver(resolver GoPreimageResolver) err if m.frozen { return errors.New("machine frozen") } - id := atomic.AddInt64(&lastPreimageResolverId, 1) + id := lastPreimageResolverId.Add(1) preimageResolvers.Store(id, resolver) m.contextId = &id runtime.SetFinalizer(m.contextId, freeContextId) diff --git a/validator/server_arb/validator_spawner.go b/validator/server_arb/validator_spawner.go index dca15c369e..7b9293f7bd 100644 --- a/validator/server_arb/validator_spawner.go +++ b/validator/server_arb/validator_spawner.go @@ -59,7 +59,7 @@ func DefaultArbitratorSpawnerConfigFetcher() *ArbitratorSpawnerConfig { type ArbitratorSpawner struct { stopwaiter.StopWaiter - count int32 + count atomic.Int32 locator *server_common.MachineLocator machineLoader *ArbMachineLoader config ArbitratorSpawnerConfigFecher @@ -176,9 +176,9 @@ func (v *ArbitratorSpawner) execute( } func (v *ArbitratorSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&v.count, 1) + v.count.Add(1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) { - defer atomic.AddInt32(&v.count, -1) + defer v.count.Add(-1) return v.execute(ctx, entry, moduleRoot) }) return server_common.NewValRun(promise, moduleRoot) diff --git a/validator/server_jit/spawner.go b/validator/server_jit/spawner.go index 703e761af5..eda74b2911 100644 --- a/validator/server_jit/spawner.go +++ b/validator/server_jit/spawner.go @@ -39,7 +39,7 @@ func JitSpawnerConfigAddOptions(prefix string, f *flag.FlagSet) { type JitSpawner struct { stopwaiter.StopWaiter - count int32 + count atomic.Int32 locator *server_common.MachineLocator machineLoader *JitMachineLoader config JitSpawnerConfigFecher @@ -91,9 +91,9 @@ func (s *JitSpawner) Name() string { } func (v *JitSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun { - atomic.AddInt32(&v.count, 1) + v.count.Add(1) promise := stopwaiter.LaunchPromiseThread[validator.GoGlobalState](v, func(ctx context.Context) (validator.GoGlobalState, error) { - defer atomic.AddInt32(&v.count, -1) + defer v.count.Add(-1) return v.execute(ctx, entry, moduleRoot) }) return server_common.NewValRun(promise, moduleRoot) diff --git a/wsbroadcastserver/clientconnection.go b/wsbroadcastserver/clientconnection.go index ba70756c98..7fb88bf9eb 100644 --- a/wsbroadcastserver/clientconnection.go +++ b/wsbroadcastserver/clientconnection.go @@ -51,7 +51,7 @@ type ClientConnection struct { requestedSeqNum arbutil.MessageIndex LastSentSeqNum atomic.Uint64 - lastHeardUnix int64 + lastHeardUnix atomic.Int64 out chan message backlog backlog.Backlog registered chan bool @@ -74,7 +74,7 @@ func NewClientConnection( delay time.Duration, bklg backlog.Backlog, ) *ClientConnection { - return &ClientConnection{ + clientConnection := &ClientConnection{ conn: conn, clientIp: connectingIP, desc: desc, @@ -82,7 +82,6 @@ func NewClientConnection( Name: fmt.Sprintf("%s@%s-%d", connectingIP, conn.RemoteAddr(), rand.Intn(10)), clientAction: clientAction, requestedSeqNum: requestedSeqNum, - lastHeardUnix: time.Now().Unix(), out: make(chan message, maxSendQueue), compression: compression, flateReader: NewFlateReader(), @@ -91,6 +90,8 @@ func NewClientConnection( registered: make(chan bool, 1), backlogSent: false, } + clientConnection.lastHeardUnix.Store(time.Now().Unix()) + return clientConnection } func (cc *ClientConnection) Age() time.Duration { @@ -287,7 +288,7 @@ func (cc *ClientConnection) RequestedSeqNum() arbutil.MessageIndex { } func (cc *ClientConnection) GetLastHeard() time.Time { - return time.Unix(atomic.LoadInt64(&cc.lastHeardUnix), 0) + return time.Unix(cc.lastHeardUnix.Load(), 0) } // Receive reads next message from client's underlying connection. @@ -307,7 +308,7 @@ func (cc *ClientConnection) readRequest(ctx context.Context, timeout time.Durati cc.ioMutex.Lock() defer cc.ioMutex.Unlock() - atomic.StoreInt64(&cc.lastHeardUnix, time.Now().Unix()) + cc.lastHeardUnix.Store(time.Now().Unix()) var data []byte var opCode ws.OpCode diff --git a/wsbroadcastserver/clientmanager.go b/wsbroadcastserver/clientmanager.go index a88716756a..4b7b7a2bcb 100644 --- a/wsbroadcastserver/clientmanager.go +++ b/wsbroadcastserver/clientmanager.go @@ -44,7 +44,7 @@ type ClientManager struct { stopwaiter.StopWaiter clientPtrMap map[*ClientConnection]bool - clientCount int32 + clientCount atomic.Int32 pool *gopool.Pool poller netpoll.Poller broadcastChan chan *m.BroadcastMessage @@ -85,7 +85,7 @@ func (cm *ClientManager) registerClient(ctx context.Context, clientConnection *C clientsCurrentGauge.Inc(1) clientsConnectCount.Inc(1) - atomic.AddInt32(&cm.clientCount, 1) + cm.clientCount.Add(1) cm.clientPtrMap[clientConnection] = true clientsTotalSuccessCounter.Inc(1) @@ -120,7 +120,7 @@ func (cm *ClientManager) removeClientImpl(clientConnection *ClientConnection) { clientsDurationHistogram.Update(clientConnection.Age().Microseconds()) clientsCurrentGauge.Dec(1) clientsDisconnectCount.Inc(1) - atomic.AddInt32(&cm.clientCount, -1) + cm.clientCount.Add(-1) } func (cm *ClientManager) removeClient(clientConnection *ClientConnection) { @@ -137,7 +137,7 @@ func (cm *ClientManager) removeClient(clientConnection *ClientConnection) { } func (cm *ClientManager) ClientCount() int32 { - return atomic.LoadInt32(&cm.clientCount) + return cm.clientCount.Load() } // Broadcast sends batch item to all clients.