Skip to content

Commit

Permalink
pass MessageWithMetadataAndBlockHashes to writeMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
diegoximenes committed May 10, 2024
1 parent 8d903c5 commit 1f99ca9
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 70 deletions.
20 changes: 11 additions & 9 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestTransactionStreamer(t *testing.T) {
}
state.balances = newBalances

var messages []arbostypes.MessageWithMetadata
var messages []arbostypes.MessageWithMetadataAndBlockHash
// TODO replay a random amount of messages too
numMessages := rand.Int() % 5
for j := 0; j < numMessages; j++ {
Expand All @@ -154,16 +154,18 @@ func TestTransactionStreamer(t *testing.T) {
l2Message = append(l2Message, arbmath.U256Bytes(value)...)
var requestId common.Hash
binary.BigEndian.PutUint64(requestId.Bytes()[:8], uint64(i))
messages = append(messages, arbostypes.MessageWithMetadata{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: source,
RequestId: &requestId,
messages = append(messages, arbostypes.MessageWithMetadataAndBlockHash{
Message: arbostypes.MessageWithMetadata{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: source,
RequestId: &requestId,
},
L2msg: l2Message,
},
L2msg: l2Message,
DelayedMessagesRead: 1,
},
DelayedMessagesRead: 1,
})
state.balances[source].Sub(state.balances[source], value)
if state.balances[dest] == nil {
Expand Down
9 changes: 6 additions & 3 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
pos++
}

var messages []arbostypes.MessageWithMetadata
var messages []arbostypes.MessageWithMetadataAndBlockHash
backend := &multiplexerBackend{
batchSeqNum: batches[0].SequenceNumber,
batches: batches,
Expand All @@ -673,7 +673,10 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
if err != nil {
return err
}
messages = append(messages, *msg)
msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
Message: *msg,
}
messages = append(messages, msgWithBlockHash)
batchMessageCounts[batchSeqNum] = currentpos
currentpos += 1
}
Expand Down Expand Up @@ -733,7 +736,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
}
var latestTimestamp uint64
if len(messages) > 0 {
latestTimestamp = messages[len(messages)-1].Message.Header.Timestamp
latestTimestamp = messages[len(messages)-1].Message.Message.Header.Timestamp
}
log.Info(
"InboxTracker",
Expand Down
7 changes: 5 additions & 2 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
var messages []arbostypes.MessageWithMetadata
var messages []arbostypes.MessageWithMetadataAndBlockHash
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
Expand Down Expand Up @@ -592,7 +592,10 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
DelayedMessagesRead: lastDelayedMsg,
}
}
messages = append(messages, message)
msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
Message: message,
}
messages = append(messages, msgWithBlockHash)
msgToRead++
}
if len(messages) > 0 {
Expand Down
77 changes: 42 additions & 35 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type TransactionStreamer struct {

nextAllowedFeedReorgLog time.Time

broadcasterQueuedMessages []arbostypes.MessageWithMetadata
broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash
broadcasterQueuedMessagesPos uint64
broadcasterQueuedMessagesActiveReorg bool

Expand Down Expand Up @@ -371,7 +371,7 @@ func deleteFromRange(ctx context.Context, db ethdb.Database, prefix []byte, star

// The insertion mutex must be held. This acquires the reorg mutex.
// Note: oldMessages will be empty if reorgHook is nil
func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata) error {
func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash) error {
if count == 0 {
return errors.New("cannot reorg out init message")
}
Expand Down Expand Up @@ -465,14 +465,14 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
return err
}

messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
for i := 0; i < len(messagesResults); i++ {
messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
Message: newMessages[i],
messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
Message: newMessages[i].Message,
BlockHash: &messagesResults[i].BlockHash,
})
}
s.broadcastMessages(messagesWithBlockHash, count)
s.broadcastMessages(messagesWithComputedBlockHash, count)

if s.validator != nil {
err = s.validator.Reorg(s.GetContext(), count)
Expand Down Expand Up @@ -555,7 +555,7 @@ func (s *TransactionStreamer) GetProcessedMessageCount() (arbutil.MessageIndex,
return msgCount, nil
}

func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata) error {
func (s *TransactionStreamer) AddMessages(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash) error {
return s.AddMessagesAndEndBatch(pos, messagesAreConfirmed, messages, nil)
}

Expand All @@ -579,7 +579,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
return nil
}
broadcastStartPos := feedMessages[0].SequenceNumber
var messages []arbostypes.MessageWithMetadata
var messages []arbostypes.MessageWithMetadataAndBlockHash
broadcastAfterPos := broadcastStartPos
for _, feedMessage := range feedMessages {
if broadcastAfterPos != feedMessage.SequenceNumber {
Expand All @@ -588,7 +588,11 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
if feedMessage.Message.Message == nil || feedMessage.Message.Message.Header == nil {
return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber)
}
messages = append(messages, feedMessage.Message)
msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
Message: feedMessage.Message,
BlockHash: feedMessage.BlockHash,
}
messages = append(messages, msgWithBlockHash)
broadcastAfterPos++
}

Expand All @@ -607,7 +611,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
messages = messages[dups:]
broadcastStartPos += arbutil.MessageIndex(dups)
if oldMsg != nil {
s.logReorg(broadcastStartPos, oldMsg, &messages[0], false)
s.logReorg(broadcastStartPos, oldMsg, &messages[0].Message, false)
}
if len(messages) == 0 {
// No new messages received
Expand Down Expand Up @@ -681,16 +685,19 @@ func (s *TransactionStreamer) AddFakeInitMessage() error {
}
chainIdBytes := arbmath.U256Bytes(s.chainConfig.ChainID)
msg := append(append(chainIdBytes, 0), chainConfigJson...)
return s.AddMessages(0, false, []arbostypes.MessageWithMetadata{{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_Initialize,
RequestId: &common.Hash{},
L1BaseFee: common.Big0,
return s.AddMessages(0, false, []arbostypes.MessageWithMetadataAndBlockHash{{
Message: arbostypes.MessageWithMetadata{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_Initialize,
RequestId: &common.Hash{},
L1BaseFee: common.Big0,
},
L2msg: msg,
},
L2msg: msg,
DelayedMessagesRead: 1,
},
DelayedMessagesRead: 1,
BlockHash: nil,
}})
}

Expand All @@ -708,7 +715,7 @@ func endBatch(batch ethdb.Batch) error {
return batch.Write()
}

func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
if messagesAreConfirmed {
// Trim confirmed messages from l1pricedataCache
s.TrimCache(pos + arbutil.MessageIndex(len(messages)))
Expand Down Expand Up @@ -748,7 +755,7 @@ func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (

func (s *TransactionStreamer) countDuplicateMessages(
pos arbutil.MessageIndex,
messages []arbostypes.MessageWithMetadata,
messages []arbostypes.MessageWithMetadataAndBlockHash,
batch *ethdb.Batch,
) (int, bool, *arbostypes.MessageWithMetadata, error) {
curMsg := 0
Expand All @@ -768,7 +775,7 @@ func (s *TransactionStreamer) countDuplicateMessages(
if err != nil {
return 0, false, nil, err
}
nextMessage := messages[curMsg]
nextMessage := messages[curMsg].Message
wantMessage, err := rlp.EncodeToBytes(nextMessage)
if err != nil {
return 0, false, nil, err
Expand Down Expand Up @@ -842,7 +849,7 @@ func (s *TransactionStreamer) logReorg(pos arbutil.MessageIndex, dbMsg *arbostyp

}

func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
var confirmedReorg bool
var oldMsg *arbostypes.MessageWithMetadata
var lastDelayedRead uint64
Expand All @@ -860,7 +867,7 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
return err
}
if duplicates > 0 {
lastDelayedRead = messages[duplicates-1].DelayedMessagesRead
lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead
messages = messages[duplicates:]
messageStartPos += arbutil.MessageIndex(duplicates)
}
Expand Down Expand Up @@ -898,13 +905,13 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
return err
}
if duplicates > 0 {
lastDelayedRead = messages[duplicates-1].DelayedMessagesRead
lastDelayedRead = messages[duplicates-1].Message.DelayedMessagesRead
messages = messages[duplicates:]
messageStartPos += arbutil.MessageIndex(duplicates)
}
}
if oldMsg != nil {
s.logReorg(messageStartPos, oldMsg, &messages[0], confirmedReorg)
s.logReorg(messageStartPos, oldMsg, &messages[0].Message, confirmedReorg)
}

if feedReorg {
Expand All @@ -924,12 +931,12 @@ func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil
// Validate delayed message counts of remaining messages
for i, msg := range messages {
msgPos := messageStartPos + arbutil.MessageIndex(i)
diff := msg.DelayedMessagesRead - lastDelayedRead
diff := msg.Message.DelayedMessagesRead - lastDelayedRead
if diff != 0 && diff != 1 {
return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.DelayedMessagesRead, msgPos)
return fmt.Errorf("attempted to insert jump from %v delayed messages read to %v delayed messages read at message index %v", lastDelayedRead, msg.Message.DelayedMessagesRead, msgPos)
}
lastDelayedRead = msg.DelayedMessagesRead
if msg.Message == nil {
lastDelayedRead = msg.Message.DelayedMessagesRead
if msg.Message.Message == nil {
return fmt.Errorf("attempted to insert nil message at position %v", msgPos)
}
}
Expand Down Expand Up @@ -1007,14 +1014,14 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
}
}

if err := s.writeMessages(pos, []arbostypes.MessageWithMetadata{msgWithMeta}, nil); err != nil {
return err
}

msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
Message: msgWithMeta,
BlockHash: &msgResult.BlockHash,
}

if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil {
return err
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)

return nil
Expand Down Expand Up @@ -1059,12 +1066,12 @@ func (s *TransactionStreamer) broadcastMessages(

// The mutex must be held, and pos must be the latest message count.
// `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
if batch == nil {
batch = s.db.NewBatch()
}
for i, msg := range messages {
err := s.writeMessage(pos+arbutil.MessageIndex(i), msg, batch)
err := s.writeMessage(pos+arbutil.MessageIndex(i), msg.Message, batch)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *ExecutionEngine) GetBatchFetcher() execution.BatchFetcher {
return s.consensus
}

func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
if count == 0 {
return nil, errors.New("cannot reorg out genesis")
}
Expand Down Expand Up @@ -149,9 +149,9 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
for i := range newMessages {
var msgForPrefetch *arbostypes.MessageWithMetadata
if i < len(newMessages)-1 {
msgForPrefetch = &newMessages[i]
msgForPrefetch = &newMessages[i].Message
}
msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].Message, msgForPrefetch)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (n *ExecutionNode) StopAndWait() {
func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) {
return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
}
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) {
return n.ExecEngine.Reorg(count, newMessages, oldMessages)
}
func (n *ExecutionNode) HeadMessageNumber() (arbutil.MessageIndex, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")
// always needed
type ExecutionClient interface {
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*MessageResult, error)
Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error)
Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*MessageResult, error)
HeadMessageNumber() (arbutil.MessageIndex, error)
HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error)
Expand Down
26 changes: 14 additions & 12 deletions system_tests/contract_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,23 @@ func TestContractTxDeploy(t *testing.T) {
l2Msg = append(l2Msg, arbmath.U256Bytes(contractTx.Value)...)
l2Msg = append(l2Msg, contractTx.Data...)

err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadata{
err = builder.L2.ConsensusNode.TxStreamer.AddMessages(pos, true, []arbostypes.MessageWithMetadataAndBlockHash{
{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: from,
BlockNumber: 0,
Timestamp: 0,
RequestId: &contractTx.RequestId,
L1BaseFee: &big.Int{},
Message: arbostypes.MessageWithMetadata{
Message: &arbostypes.L1IncomingMessage{
Header: &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: from,
BlockNumber: 0,
Timestamp: 0,
RequestId: &contractTx.RequestId,
L1BaseFee: &big.Int{},
},
L2msg: l2Msg,
BatchGasCost: new(uint64),
},
L2msg: l2Msg,
BatchGasCost: new(uint64),
DelayedMessagesRead: delayedMessagesRead,
},
DelayedMessagesRead: delayedMessagesRead,
},
})
Require(t, err)
Expand Down
8 changes: 5 additions & 3 deletions system_tests/reorg_resequencing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ func TestReorgResequencing(t *testing.T) {
},
L2msg: append(builder.L2Info.GetAddress("User4").Bytes(), arbmath.Uint64ToU256Bytes(params.Ether)...),
}
err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadata{{
Message: newMessage,
DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1,
err = builder.L2.ConsensusNode.TxStreamer.AddMessages(startMsgCount, true, []arbostypes.MessageWithMetadataAndBlockHash{{
Message: arbostypes.MessageWithMetadata{
Message: newMessage,
DelayedMessagesRead: prevMessage.DelayedMessagesRead + 1,
},
}})
Require(t, err)

Expand Down
Loading

0 comments on commit 1f99ca9

Please sign in to comment.