Skip to content

Commit

Permalink
Merge branch 'master' into bold-review
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljordan committed Jul 18, 2024
2 parents a525bb1 + 16434a2 commit 0a078a7
Show file tree
Hide file tree
Showing 65 changed files with 1,092 additions and 805 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ jobs:
if: matrix.test-mode == 'defaults'
run: |
packages=`go list ./...`
stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 > >(stdbuf -oL tee full.log | grep -vE "INFO|seal")
stdbuf -oL gotestsum --format short-verbose --packages="$packages" --rerun-fails=1 --no-color=false -- ./... -coverprofile=coverage.txt -covermode=atomic -coverpkg=./...,./go-ethereum/... -timeout 20m -parallel=8 -tags=cionly > >(stdbuf -oL tee full.log | grep -vE "INFO|seal")
- name: run tests with race detection
if: matrix.test-mode == 'race'
Expand Down
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ COPY --from=node-builder /workspace/target/bin/nitro /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/nitro-val /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/seq-coordinator-manager /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/prover /usr/local/bin/
COPY --from=machine-versions /workspace/machines /home/user/target/machines
COPY ./scripts/validate-wasm-module-root.sh .
RUN ./validate-wasm-module-root.sh /home/user/target/machines /usr/local/bin/prover
USER root
RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
Expand Down
15 changes: 15 additions & 0 deletions arbitrator/prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ struct Opts {
#[structopt(long)]
/// print modules to the console
print_modules: bool,
#[structopt(long)]
/// print wasm module root to the console
print_wasmmoduleroot: bool,
/// profile output instead of generting proofs
#[structopt(short = "p", long)]
profile_run: bool,
Expand Down Expand Up @@ -122,6 +125,18 @@ const DELAYED_HEADER_LEN: usize = 112; // also in test-case's host-io.rs & contr
fn main() -> Result<()> {
let opts = Opts::from_args();

if opts.print_wasmmoduleroot {
match Machine::new_from_wavm(&opts.binary) {
Ok(mach) => {
println!("0x{}", mach.get_modules_root());
return Ok(());
}
Err(err) => {
eprintln!("Error loading binary: {err}");
return Err(err);
}
}
}
let mut inbox_contents = HashMap::default();
let mut inbox_position = opts.inbox_position;
let mut delayed_position = opts.delayed_inbox_position;
Expand Down
8 changes: 4 additions & 4 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type BatchPoster struct {
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
backlog uint64
backlog atomic.Uint64
lastHitL1Bounds time.Time // The last time we wanted to post a message but hit the L1 bounds

batchReverted atomic.Bool // indicates whether data poster batch was reverted
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if config.IgnoreBlobPrice {
use4844 = true
} else {
backlog := atomic.LoadUint64(&b.backlog)
backlog := b.backlog.Load()
// Logic to prevent switching from non-4844 batches to 4844 batches too often,
// so that blocks can be filled efficiently. The geth txpool rejects txs for
// accounts that already have the other type of txs in the pool with
Expand Down Expand Up @@ -1437,7 +1437,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
// Setting the backlog to 0 here ensures that we don't lower compression as a result.
backlog = 0
}
atomic.StoreUint64(&b.backlog, backlog)
b.backlog.Store(backlog)
b.building = nil

// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
Expand All @@ -1453,7 +1453,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

func (b *BatchPoster) GetBacklogEstimate() uint64 {
return atomic.LoadUint64(&b.backlog)
return b.backlog.Load()
}

func (b *BatchPoster) Start(ctxIn context.Context) {
Expand Down
16 changes: 8 additions & 8 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type InboxReader struct {
l1Reader *headerreader.HeaderReader

// Atomic
lastSeenBatchCount uint64
lastReadBatchCount uint64
lastSeenBatchCount atomic.Uint64
lastReadBatchCount atomic.Uint64
}

func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, l1Reader *headerreader.HeaderReader, firstMessageBlock *big.Int, delayedBridge *DelayedBridge, sequencerInbox *SequencerInbox, config InboxReaderConfigFetcher) (*InboxReader, error) {
Expand Down Expand Up @@ -240,7 +240,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
seenBatchCountStored := uint64(math.MaxUint64)
storeSeenBatchCount := func() {
if seenBatchCountStored != seenBatchCount {
atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount)
r.lastSeenBatchCount.Store(seenBatchCount)
seenBatchCountStored = seenBatchCount
}
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
if !r.caughtUp && readMode == "latest" {
r.caughtUp = true
Expand Down Expand Up @@ -526,7 +526,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
atomic.StoreUint64(&r.lastReadBatchCount, sequencerBatches[len(sequencerBatches)-1].SequenceNumber+1)
r.lastReadBatchCount.Store(sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1)
storeSeenBatchCount()
}
}
Expand All @@ -553,7 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
atomic.StoreUint64(&r.lastReadBatchCount, checkingBatchCount)
r.lastReadBatchCount.Store(checkingBatchCount)
storeSeenBatchCount()
}
}
Expand Down Expand Up @@ -625,15 +625,15 @@ func (r *InboxReader) GetSequencerMessageBytes(ctx context.Context, seqNum uint6
}

func (r *InboxReader) GetLastReadBatchCount() uint64 {
return atomic.LoadUint64(&r.lastReadBatchCount)
return r.lastReadBatchCount.Load()
}

// GetLastSeenBatchCount returns how many sequencer batches the inbox reader has read in from L1.
// Return values:
// >0 - last batchcount seen in run() - only written after lastReadBatchCount updated
// 0 - no batch seen, error
func (r *InboxReader) GetLastSeenBatchCount() uint64 {
return atomic.LoadUint64(&r.lastSeenBatchCount)
return r.lastSeenBatchCount.Load()
}

func (r *InboxReader) GetDelayBlocks() uint64 {
Expand Down
10 changes: 5 additions & 5 deletions arbnode/redislock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Simple struct {
stopwaiter.StopWaiter
client redis.UniversalClient
config SimpleCfgFetcher
lockedUntil int64
lockedUntil atomic.Int64
mutex sync.Mutex
stopping bool
readyToLock func() bool
Expand Down Expand Up @@ -239,12 +239,12 @@ func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error {
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}
12 changes: 6 additions & 6 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type SeqCoordinator struct {
prevChosenSequencer string
reportedWantsLockout bool

lockoutUntil int64 // atomic
lockoutUntil atomic.Int64 // atomic

wantsLockoutMutex sync.Mutex // manages access to acquireLockoutAndWriteMessage and generally the wants lockout key
avoidLockout int // If > 0, prevents acquiring the lockout but not extending the lockout if no alternative sequencer wants the lockout. Protected by chosenUpdateMutex.
Expand Down Expand Up @@ -191,14 +191,14 @@ func StandaloneSeqCoordinatorInvalidateMsgIndex(ctx context.Context, redisClient
return nil
}

func atomicTimeWrite(addr *int64, t time.Time) {
func atomicTimeWrite(addr *atomic.Int64, t time.Time) {
asint64 := t.UnixMilli()
atomic.StoreInt64(addr, asint64)
addr.Store(asint64)
}

// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter.
func atomicTimeRead(addr *int64) time.Time {
asint64 := atomic.LoadInt64(addr)
func atomicTimeRead(addr *atomic.Int64) time.Time {
asint64 := addr.Load()
return time.UnixMilli(asint64)
}

Expand Down Expand Up @@ -692,7 +692,7 @@ func (c *SeqCoordinator) DebugPrint() string {
return fmt.Sprint("Url:", c.config.Url(),
" prevChosenSequencer:", c.prevChosenSequencer,
" reportedWantsLockout:", c.reportedWantsLockout,
" lockoutUntil:", c.lockoutUntil,
" lockoutUntil:", c.lockoutUntil.Load(),
" redisErrors:", c.redisErrors)
}

Expand Down
18 changes: 9 additions & 9 deletions arbnode/seq_coordinator_atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
const messagesPerRound = 20

type CoordinatorTestData struct {
messageCount uint64
messageCount atomic.Uint64

sequencer []string
err error
mutex sync.Mutex

waitForCoords sync.WaitGroup
testStartRound int32
testStartRound atomic.Int32
}

func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *CoordinatorTestData) {
nextRound := int32(0)
for {
sequenced := make([]bool, messagesPerRound)
for atomic.LoadInt32(&data.testStartRound) < nextRound {
for data.testStartRound.Load() < nextRound {
if ctx.Err() != nil {
return
}
Expand All @@ -44,7 +44,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
nextRound++
var execError error
for {
messageCount := atomic.LoadUint64(&data.messageCount)
messageCount := data.messageCount.Load()
if messageCount >= messagesPerRound {
break
}
Expand All @@ -53,7 +53,7 @@ func coordinatorTestThread(ctx context.Context, coord *SeqCoordinator, data *Coo
err := coord.acquireLockoutAndWriteMessage(ctx, asIndex, asIndex+1, &arbostypes.EmptyTestMessageWithMetadata)
if err == nil {
sequenced[messageCount] = true
atomic.StoreUint64(&data.messageCount, messageCount+1)
data.messageCount.Store(messageCount + 1)
randNr := rand.Intn(20)
if randNr > 15 {
execError = coord.chosenOneRelease(ctx)
Expand Down Expand Up @@ -105,9 +105,9 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""
testData := CoordinatorTestData{
testStartRound: -1,
sequencer: make([]string, messagesPerRound),
sequencer: make([]string, messagesPerRound),
}
testData.testStartRound.Store(-1)
nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)

Expand All @@ -134,12 +134,12 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {

for round := int32(0); round < 10; round++ {
redisClient.Del(ctx, redisutil.CHOSENSEQ_KEY, redisutil.MSG_COUNT_KEY)
testData.messageCount = 0
testData.messageCount.Store(0)
for i := 0; i < messagesPerRound; i++ {
testData.sequencer[i] = ""
}
testData.waitForCoords.Add(NumOfThreads)
atomic.StoreInt32(&testData.testStartRound, round)
testData.testStartRound.Store(round)
testData.waitForCoords.Wait()
Require(t, testData.err)
seqList := ""
Expand Down
2 changes: 1 addition & 1 deletion arbnode/sequencer_inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (i *SequencerInbox) LookupBatchesInRange(ctx context.Context, from, to *big
seqNum := parsedLog.BatchSequenceNumber.Uint64()
if lastSeqNum != nil {
if seqNum != *lastSeqNum+1 {
return nil, fmt.Errorf("sequencer batches out of order; after batch %v got batch %v", lastSeqNum, seqNum)
return nil, fmt.Errorf("sequencer batches out of order; after batch %v got batch %v", *lastSeqNum, seqNum)
}
}
lastSeqNum = &seqNum
Expand Down
14 changes: 7 additions & 7 deletions arbnode/simple_redis_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const test_release_frac = 5
const test_delay = time.Millisecond
const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__"

func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) {
func attemptLock(ctx context.Context, s *redislock.Simple, flag *atomic.Int32, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < test_attempts; i++ {
if s.AttemptLock(ctx) {
atomic.AddInt32(flag, 1)
flag.Add(1)
} else if rand.Intn(test_release_frac) == 0 {
s.Release(ctx)
}
Expand Down Expand Up @@ -76,17 +76,17 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
<-time.After(time.Second)
}
wg := sync.WaitGroup{}
counters := make([]int32, test_threads)
counters := make([]atomic.Int32, test_threads)
for i, lock := range locks {
wg.Add(1)
go attemptLock(ctx, lock, &counters[i], &wg)
}
wg.Wait()
successful := -1
for i, counter := range counters {
if counter != 0 {
if counter != test_attempts {
t.Fatalf("counter %d value %d", i, counter)
for i := range counters {
if counters[i].Load() != 0 {
if counters[i].Load() != test_attempts {
t.Fatalf("counter %d value %d", i, counters[i].Load())
}
if successful > 0 {
t.Fatalf("counter %d and %d both positive", i, successful)
Expand Down
Loading

0 comments on commit 0a078a7

Please sign in to comment.