diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 5c879743a4..fbd1dba96a 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -128,7 +128,7 @@ func TestTransactionStreamer(t *testing.T) { } state.balances = newBalances - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash // TODO replay a random amount of messages too numMessages := rand.Int() % 5 for j := 0; j < numMessages; j++ { @@ -154,16 +154,18 @@ func TestTransactionStreamer(t *testing.T) { l2Message = append(l2Message, arbmath.U256Bytes(value)...) var requestId common.Hash binary.BigEndian.PutUint64(requestId.Bytes()[:8], uint64(i)) - messages = append(messages, arbostypes.MessageWithMetadata{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: source, - RequestId: &requestId, + messages = append(messages, arbostypes.MessageWithMetadataAndBlockHash{ + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: source, + RequestId: &requestId, + }, + L2msg: l2Message, }, - L2msg: l2Message, + DelayedMessagesRead: 1, }, - DelayedMessagesRead: 1, }) state.balances[source].Sub(state.balances[source], value) if state.balances[dest] == nil { diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index ba1b875ec8..e2aa1d5e74 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -652,7 +652,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L pos++ } - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash backend := &multiplexerBackend{ batchSeqNum: batches[0].SequenceNumber, batches: batches, @@ -673,7 +673,10 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L if err != nil { return err } - messages = append(messages, *msg) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: *msg, + } + messages = append(messages, msgWithBlockHash) batchMessageCounts[batchSeqNum] = currentpos currentpos += 1 } @@ -733,7 +736,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L } var latestTimestamp uint64 if len(messages) > 0 { - latestTimestamp = messages[len(messages)-1].Message.Header.Timestamp + latestTimestamp = messages[len(messages)-1].Message.Message.Header.Timestamp } log.Info( "InboxTracker", diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index ecf38ddf42..0a27d89d40 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -533,7 +533,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { if readUntil > localMsgCount+c.config.MsgPerPoll { readUntil = localMsgCount + c.config.MsgPerPoll } - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash msgToRead := localMsgCount var msgReadErr error for msgToRead < readUntil { @@ -592,7 +592,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { DelayedMessagesRead: lastDelayedMsg, } } - messages = append(messages, message) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: message, + } + messages = append(messages, msgWithBlockHash) msgToRead++ } if len(messages) > 0 { diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index b8b35186b2..411eba965d 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -60,7 +60,7 @@ type TransactionStreamer struct { nextAllowedFeedReorgLog time.Time - broadcasterQueuedMessages []arbostypes.MessageWithMetadata + broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash broadcasterQueuedMessagesPos uint64 broadcasterQueuedMessagesActiveReorg bool @@ -371,7 +371,7 @@ func deleteFromRange(ctx context.Context, db ethdb.Database, prefix []byte, star // The insertion mutex must be held. This acquires the reorg mutex. // Note: oldMessages will be empty if reorgHook is nil -func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata) error { +func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash) error { if count == 0 { return errors.New("cannot reorg out init message") } @@ -465,14 +465,14 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde return err } - messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) + messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults)) for i := 0; i < len(messagesResults); i++ { - messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ - Message: newMessages[i], + messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{ + Message: newMessages[i].Message, BlockHash: &messagesResults[i].BlockHash, }) } - s.broadcastMessages(messagesWithBlockHash, count) + s.broadcastMessages(messagesWithComputedBlockHash, count) if s.validator != nil { err = s.validator.Reorg(s.GetContext(), count) @@ -555,7 +555,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex, return msgCount, nil } -func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata) error { +func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash) error { return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil) } @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe return nil } broadcastStartPos := feedMessages[0].SequenceNumber - var messages []arbostypes.MessageWithMetadata + var messages []arbostypes.MessageWithMetadataAndBlockHash broadcastAfterPos := broadcastStartPos for _, feedMessage := range feedMessages { if broadcastAfterPos != feedMessage.SequenceNumber { @@ -588,7 +588,11 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe if feedMessage.Message.Message == nil || feedMessage.Message.Message.Header == nil { return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber) } - messages = append(messages, feedMessage.Message) + msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: feedMessage.Message, + BlockHash: feedMessage.BlockHash, + } + messages = append(messages, msgWithBlockHash) broadcastAfterPos++ } @@ -607,7 +611,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe messages = messages[dups:] broadcastStartPos += arbutil.MessageIndex(dups) if oldMsg != nil { - s.logReorg(broadcastStartPos, oldMsg, &messages[0], false) + s.logReorg(broadcastStartPos, oldMsg, &messages[0].Message, false) } if len(messages) == 0 { // No new messages received @@ -681,16 +685,19 @@ func (s *TransactionStreamer) AddFakeInitMessage() error { } chainIdBytes := arbmath.U256Bytes(s.chainConfig.ChainID) msg := append(append(chainIdBytes, 0), chainConfigJson...) - return s.AddMessages(0, false, []arbostypes.MessageWithMetadata{{ - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_Initialize, - RequestId: &common.Hash{}, - L1BaseFee: common.Big0, + return s.AddMessages(0, false, []arbostypes.MessageWithMetadataAndBlockHash{{ + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_Initialize, + RequestId: &common.Hash{}, + L1BaseFee: common.Big0, + }, + L2msg: msg, }, - L2msg: msg, + DelayedMessagesRead: 1, }, - DelayedMessagesRead: 1, + BlockHash: nil, }}) } @@ -708,7 +715,7 @@ func endBatch(batch ethdb.Batch) error { return batch.Write() } -func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { if messagesAreConfirmed { // Trim confirmed messages from l1pricedataCache s.TrimCache(pos + arbutil.MessageIndex(len(messages))) @@ -748,7 +755,7 @@ func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) ( func (s *TransactionStreamer) countDuplicateMessages( pos arbutil.MessageIndex, - messages []arbostypes.MessageWithMetadata, + messages []arbostypes.MessageWithMetadataAndBlockHash, batch *ethdb.Batch, ) (int, bool, *arbostypes.MessageWithMetadata, error) { curMsg := 0 @@ -768,7 +775,7 @@ func (s *TransactionStreamer) countDuplicateMessages( if err != nil { return 0, false, nil, err } - nextMessage := messages[curMsg] + nextMessage := messages[curMsg].Message wantMessage, err := rlp.EncodeToBytes(nextMessage) if err != nil { return 0, false, nil, err @@ -842,7 +849,7 @@ func (s *TransactionStreamer) logReorg(pos arbutil.MessageIndex, dbMsg *arbostyp } -func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { var confirmedReorg bool var oldMsg *arbostypes.MessageWithMetadata var lastDelayedRead uint64 @@ -860,7 +867,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } @@ -898,13 +905,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil return err } if duplicates > 0 { - lastDelayedRead = messages[duplicates-1].DelayedMessagesRead + lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead messages = messages[duplicates:] messageStartPos += arbutil.MessageIndex(duplicates) } } if oldMsg != nil { - s.logReorg(messageStartPos, oldMsg, &messages[0], confirmedReorg) + s.logReorg(messageStartPos, oldMsg, &messages[0].Message, confirmedReorg) } if feedReorg { @@ -924,12 +931,12 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil // Validate delayed message counts of remaining messages for i, msg := range messages { msgPos := messageStartPos + arbutil.MessageIndex(i) - diff := msg.DelayedMessagesRead - lastDelayedRead + diff := msg.Message.DelayedMessagesRead - lastDelayedRead if diff != 0 && diff != 1 { - return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.DelayedMessagesRead, msgPos) + return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.Message.DelayedMessagesRead, msgPos) } - lastDelayedRead = msg.DelayedMessagesRead - if msg.Message == nil { + lastDelayedRead = msg.Message.DelayedMessagesRead + if msg.Message.Message == nil { return fmt.Errorf("attempted to insert nil message at position %v", msgPos) } } @@ -1007,14 +1014,14 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } } - if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil { - return err - } - msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ Message: msgWithMeta, BlockHash: &msgResult.BlockHash, } + + if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil { + return err + } s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos) return nil @@ -1059,12 +1066,12 @@ func (s *TransactionStreamer) broadcastMessages( // The mutex must be held, and pos must be the latest message count. // `batch` may be nil, which initializes a new batch. The batch is closed out in this function. -func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error { +func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error { if batch == nil { batch = s.db.NewBatch() } for i, msg := range messages { - err := s.writeMessage(pos+arbutil.MessageIndex(i), msg, batch) + err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.Message, batch) if err != nil { return err } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 38569f44ab..c4fbc04712 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -116,7 +116,7 @@ func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher { return s.consensus } -func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { +func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { if count == 0 { return nil, errors.New("cannot reorg out genesis") } @@ -149,9 +149,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost for i := range newMessages { var msgForPrefetch *arbostypes.MessageWithMetadata if i < len(newMessages)-1 { - msgForPrefetch = &newMessages[i] + msgForPrefetch = &newMessages[i].Message } - msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch) + msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].Message, msgForPrefetch) if err != nil { return nil, err } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index ae76b88530..458d6601c5 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -346,7 +346,7 @@ func (n *ExecutionNode) StopAndWait() { func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) { return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch) } -func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { +func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { return n.ExecEngine.Reorg(count, newMessages, oldMessages) } func (n *ExecutionNode) HeadMessageNumber() (arbutil.MessageIndex, error) { diff --git a/execution/interface.go b/execution/interface.go index d2a5b58fe5..66aefe9a5e 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -31,7 +31,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*MessageResult, error) - Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error) + Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error) HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error) diff --git a/system_tests/contract_tx_test.go b/system_tests/contract_tx_test.go index 7d66e516b4..d0f7b153f3 100644 --- a/system_tests/contract_tx_test.go +++ b/system_tests/contract_tx_test.go @@ -69,21 +69,23 @@ func TestContractTxDeploy(t *testing.T) { l2Msg = append(l2Msg, arbmath.U256Bytes(contractTx.Value)...) l2Msg = append(l2Msg, contractTx.Data...) - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadata{ + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadataAndBlockHash{ { - Message: &arbostypes.L1IncomingMessage{ - Header: &arbostypes.L1IncomingMessageHeader{ - Kind: arbostypes.L1MessageType_L2Message, - Poster: from, - BlockNumber: 0, - Timestamp: 0, - RequestId: &contractTx.RequestId, - L1BaseFee: &big.Int{}, + Message: arbostypes.MessageWithMetadata{ + Message: &arbostypes.L1IncomingMessage{ + Header: &arbostypes.L1IncomingMessageHeader{ + Kind: arbostypes.L1MessageType_L2Message, + Poster: from, + BlockNumber: 0, + Timestamp: 0, + RequestId: &contractTx.RequestId, + L1BaseFee: &big.Int{}, + }, + L2msg: l2Msg, + BatchGasCost: new(uint64), }, - L2msg: l2Msg, - BatchGasCost: new(uint64), + DelayedMessagesRead: delayedMessagesRead, }, - DelayedMessagesRead: delayedMessagesRead, }, }) Require(t, err) diff --git a/system_tests/reorg_resequencing_test.go b/system_tests/reorg_resequencing_test.go index b188504acb..6d5ecd5e6a 100644 --- a/system_tests/reorg_resequencing_test.go +++ b/system_tests/reorg_resequencing_test.go @@ -72,9 +72,11 @@ func TestReorgResequencing(t *testing.T) { }, L2msg: append(builder.L2Info.GetAddress("User4").Bytes(), arbmath.Uint64ToU256Bytes(params.Ether)...), } - err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadata{{ - Message: newMessage, - DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, + err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadataAndBlockHash{{ + Message: arbostypes.MessageWithMetadata{ + Message: newMessage, + DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1, + }, }}) Require(t, err) diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 886a0528c7..5e539a8812 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -91,7 +91,10 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { return false } Require(t, err) - Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadata{emptyMessage})) + emptyMessageWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{ + Message: emptyMessage, + } + Require(t, node.TxStreamer.AddMessages(curMsgs, false, []arbostypes.MessageWithMetadataAndBlockHash{emptyMessageWithBlockHash})) return true }