diff --git a/arbos/block_processor.go b/arbos/block_processor.go
index 77475856ac..a06034f905 100644
--- a/arbos/block_processor.go
+++ b/arbos/block_processor.go
@@ -115,11 +115,12 @@ func createNewHeader(prevHeader *types.Header, l1info *L1Info, state *arbosState
 type ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions
 
 type SequencingHooks struct {
-	TxErrors                []error
-	DiscardInvalidTxsEarly  bool
-	PreTxFilter             func(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error
-	PostTxFilter            func(*types.Header, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult) error
-	ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions
+	TxErrors                []error // This can be unset
+	DiscardInvalidTxsEarly  bool    // This can be unset
+	PreTxFilter             func(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error // This has to be set
+	PostTxFilter            func(*types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult) error // This has to be set
+	BlockFilter             func(*types.Header, *state.StateDB, types.Transactions, types.Receipts) error // This can be unset
+	ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions // This can be unset
 }
 
 func NoopSequencingHooks() *SequencingHooks {
@@ -129,10 +130,11 @@ func NoopSequencingHooks() *SequencingHooks {
 		func(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error {
 			return nil
 		},
-		func(*types.Header, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult) error {
+		func(*types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, common.Address, uint64, *core.ExecutionResult) error {
 			return nil
 		},
 		nil,
+		nil,
 	}
 }
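The new comments on SequencingHooks make the contract explicit: PreTxFilter and PostTxFilter must always be non-nil (NoopSequencingHooks supplies no-ops for both), while the new BlockFilter is optional and is only consulted when set. A minimal sketch of opting into the block-level hook, starting from the no-op defaults; the helper name and the tx-count cap are hypothetical, not part of this change:

	package example // illustrative sketch only

	import (
		"errors"

		"github.com/ethereum/go-ethereum/core/state"
		"github.com/ethereum/go-ethereum/core/types"

		"github.com/offchainlabs/nitro/arbos"
	)

	// newCappedHooks starts from the no-op defaults (which satisfy the two
	// "has to be set" fields) and opts into the optional BlockFilter.
	func newCappedHooks(maxBlockTxs int) *arbos.SequencingHooks {
		hooks := arbos.NoopSequencingHooks()
		hooks.BlockFilter = func(_ *types.Header, _ *state.StateDB, txes types.Transactions, _ types.Receipts) error {
			// any non-nil error rejects the whole block (see ProduceBlockAdvanced below)
			if len(txes) > maxBlockTxs {
				return errors.New("too many txs in block")
			}
			return nil
		}
		return hooks
	}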
@@ -172,7 +174,7 @@ func ProduceBlockAdvanced(
 	runMode core.MessageRunMode,
 ) (*types.Block, types.Receipts, error) {
 
-	state, err := arbosState.OpenSystemArbosState(statedb, nil, true)
+	arbState, err := arbosState.OpenSystemArbosState(statedb, nil, true)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -189,11 +191,11 @@ func ProduceBlockAdvanced(
 		l1Timestamp: l1Header.Timestamp,
 	}
 
-	header := createNewHeader(lastBlockHeader, l1Info, state, chainConfig)
+	header := createNewHeader(lastBlockHeader, l1Info, arbState, chainConfig)
 	signer := types.MakeSigner(chainConfig, header.Number, header.Time)
 	// Note: blockGasLeft will diverge from the actual gas left during execution in the event of invalid txs,
 	// but it's only used as block-local representation limiting the amount of work done in a block.
-	blockGasLeft, _ := state.L2PricingState().PerBlockGasLimit()
+	blockGasLeft, _ := arbState.L2PricingState().PerBlockGasLimit()
 	l1BlockNum := l1Info.l1BlockNumber
 
 	// Prepend a tx before all others to touch up the state (update the L1 block num, pricing pools, etc)
@@ -226,7 +228,7 @@ func ProduceBlockAdvanced(
 			if !ok {
 				return nil, nil, errors.New("retryable tx is somehow not a retryable")
 			}
-			retryable, _ := state.RetryableState().OpenRetryable(retry.TicketId, time)
+			retryable, _ := arbState.RetryableState().OpenRetryable(retry.TicketId, time)
 			if retryable == nil {
 				// retryable was already deleted
 				continue
@@ -263,22 +265,22 @@ func ProduceBlockAdvanced(
 				return nil, nil, err
 			}
 
-			if err = hooks.PreTxFilter(chainConfig, header, statedb, state, tx, options, sender, l1Info); err != nil {
+			if err = hooks.PreTxFilter(chainConfig, header, statedb, arbState, tx, options, sender, l1Info); err != nil {
 				return nil, nil, err
 			}
 
 			// Additional pre-transaction validity check
-			if err = extraPreTxFilter(chainConfig, header, statedb, state, tx, options, sender, l1Info); err != nil {
+			if err = extraPreTxFilter(chainConfig, header, statedb, arbState, tx, options, sender, l1Info); err != nil {
 				return nil, nil, err
 			}
 
 			if basefee.Sign() > 0 {
 				dataGas = math.MaxUint64
-				brotliCompressionLevel, err := state.BrotliCompressionLevel()
+				brotliCompressionLevel, err := arbState.BrotliCompressionLevel()
 				if err != nil {
 					return nil, nil, fmt.Errorf("failed to get brotli compression level: %w", err)
 				}
-				posterCost, _ := state.L1PricingState().GetPosterInfo(tx, poster, brotliCompressionLevel)
+				posterCost, _ := arbState.L1PricingState().GetPosterInfo(tx, poster, brotliCompressionLevel)
 				posterCostInL2Gas := arbmath.BigDiv(posterCost, basefee)
 
 				if posterCostInL2Gas.IsUint64() {
@@ -322,18 +324,20 @@ func ProduceBlockAdvanced(
 			vm.Config{},
 			runMode,
 			func(result *core.ExecutionResult) error {
-				return hooks.PostTxFilter(header, state, tx, sender, dataGas, result)
+				return hooks.PostTxFilter(header, statedb, arbState, tx, sender, dataGas, result)
 			},
 		)
 		if err != nil {
 			// Ignore this transaction if it's invalid under the state transition function
 			statedb.RevertToSnapshot(snap)
+			statedb.ClearTxFilter()
 			return nil, nil, err
 		}
 
 		// Additional post-transaction validity check
-		if err = extraPostTxFilter(chainConfig, header, statedb, state, tx, options, sender, l1Info, result); err != nil {
+		if err = extraPostTxFilter(chainConfig, header, statedb, arbState, tx, options, sender, l1Info, result); err != nil {
 			statedb.RevertToSnapshot(snap)
+			statedb.ClearTxFilter()
 			return nil, nil, err
 		}
 
@@ -363,13 +367,13 @@ func ProduceBlockAdvanced(
 
 		if tx.Type() == types.ArbitrumInternalTxType {
 			// ArbOS might have upgraded to a new version, so we need to refresh our state
-			state, err = arbosState.OpenSystemArbosState(statedb, nil, true)
+			arbState, err = arbosState.OpenSystemArbosState(statedb, nil, true)
 			if err != nil {
 				return nil, nil, err
 			}
 			// Update the ArbOS version in the header (if it changed)
 			extraInfo := types.DeserializeHeaderExtraInformation(header)
-			extraInfo.ArbOSFormatVersion = state.ArbOSVersion()
+			extraInfo.ArbOSFormatVersion = arbState.ArbOSVersion()
 			extraInfo.UpdateHeaderWithInfo(header)
 		}
 
@@ -455,6 +459,16 @@ func ProduceBlockAdvanced(
 		}
 	}
 
+	if statedb.IsTxFiltered() {
+		return nil, nil, state.ErrArbTxFilter
+	}
+
+	if sequencingHooks.BlockFilter != nil {
+		if err = sequencingHooks.BlockFilter(header, statedb, complete, receipts); err != nil {
+			return nil, nil, err
+		}
+	}
+
 	binary.BigEndian.PutUint64(header.Nonce[:], delayedMessagesRead)
 
 	FinalizeBlock(header, complete, statedb, chainConfig)
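Renaming the local state to arbState frees the identifier state to refer to the core/state package, whose new ErrArbTxFilter sentinel is returned above when a block still has the filter flag set. A sketch of the assumed filter-flag lifecycle; FilterTx, IsTxFiltered, and ClearTxFilter live on the forked StateDB (see the go-ethereum submodule bump below), and their semantics are inferred from the call sites in this diff:

	package example // illustrative sketch only

	import "github.com/ethereum/go-ethereum/core/state"

	// A filter (e.g. a PreTxFilter) flags the StateDB; the flag is checked
	// after execution; and it must be cleared whenever the offending tx is
	// reverted, so it cannot leak into the next tx. That pairing is why
	// ClearTxFilter sits next to each RevertToSnapshot call above.
	func demoFilterLifecycle(statedb *state.StateDB) error {
		statedb.FilterTx() // flag the currently executing tx
		if statedb.IsTxFiltered() {
			statedb.ClearTxFilter()
			return state.ErrArbTxFilter
		}
		return nil
	}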
diff --git a/cmd/datool/datool.go b/cmd/datool/datool.go
index 06f94dc952..67998880e0 100644
--- a/cmd/datool/datool.go
+++ b/cmd/datool/datool.go
@@ -92,6 +92,7 @@ type ClientStoreConfig struct {
 	SigningWallet         string `koanf:"signing-wallet"`
 	SigningWalletPassword string `koanf:"signing-wallet-password"`
 	MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
+	EnableChunkedStore    bool   `koanf:"enable-chunked-store"`
 }
 
 func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
@@ -104,6 +105,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
 	f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
 	f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
 	f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
+	f.Bool("enable-chunked-store", true, "enable data to be sent to DAS in chunks instead of all at once")
 
 	k, err := confighelpers.BeginCommonParse(f, args)
 	if err != nil {
@@ -152,7 +154,7 @@ func startClientStore(args []string) error {
 		}
 	}
 
-	client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
+	client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize, config.EnableChunkedStore)
 	if err != nil {
 		return err
 	}
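The flag defaults to true, so chunked store remains the default behavior; disabling it makes the client skip chunk-size validation and send each payload via the legacy single-shot RPC (see the dasRpcClient.go change below). A sketch of the equivalent programmatic wiring; the helper is hypothetical, and the nil signer relies on the nilSigner fallback inside NewDASRPCClient:

	package example // illustrative sketch only

	import "github.com/offchainlabs/nitro/das"

	// newLegacyStoreClient builds a DAS RPC client with chunking disabled,
	// mirroring what startClientStore does when enable-chunked-store=false.
	func newLegacyStoreClient(url string) (*das.DASRPCClient, error) {
		// 512*1024 is the default max-store-chunk-body-size; it is
		// effectively ignored when the final argument is false.
		return das.NewDASRPCClient(url, nil, 512*1024, false)
	}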
diff --git a/das/aggregator.go b/das/aggregator.go
index 85fccb078f..4b1653a687 100644
--- a/das/aggregator.go
+++ b/das/aggregator.go
@@ -41,12 +41,14 @@ type AggregatorConfig struct {
 	AssumedHonest         int               `koanf:"assumed-honest"`
 	Backends              BackendConfigList `koanf:"backends"`
 	MaxStoreChunkBodySize int               `koanf:"max-store-chunk-body-size"`
+	EnableChunkedStore    bool              `koanf:"enable-chunked-store"`
 }
 
 var DefaultAggregatorConfig = AggregatorConfig{
 	AssumedHonest:         0,
 	Backends:              nil,
 	MaxStoreChunkBodySize: 512 * 1024,
+	EnableChunkedStore:    true,
 }
 
 var parsedBackendsConf BackendConfigList
@@ -56,6 +58,7 @@ func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
 	f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
 	f.Var(&parsedBackendsConf, prefix+".backends", "JSON RPC backend configuration. This can be specified on the command line as a JSON array, eg: [{\"url\": \"...\", \"pubkey\": \"...\"},...], or as a JSON array in the config file.")
 	f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
+	f.Bool(prefix+".enable-chunked-store", DefaultAggregatorConfig.EnableChunkedStore, "enable data to be sent to DAS in chunks instead of all at once")
 }
 
 type Aggregator struct {
diff --git a/das/aggregator_test.go b/das/aggregator_test.go
index 217315eef0..b14c2961ce 100644
--- a/das/aggregator_test.go
+++ b/das/aggregator_test.go
@@ -50,7 +50,7 @@ func TestDAS_BasicAggregationLocal(t *testing.T) {
 		backends = append(backends, *details)
 	}
 
-	aggregator, err := NewAggregator(ctx, DataAvailabilityConfig{RPCAggregator: AggregatorConfig{AssumedHonest: 1}, ParentChainNodeURL: "none"}, backends)
+	aggregator, err := NewAggregator(ctx, DataAvailabilityConfig{RPCAggregator: AggregatorConfig{AssumedHonest: 1, EnableChunkedStore: true}, ParentChainNodeURL: "none"}, backends)
 	Require(t, err)
 
 	rawMsg := []byte("It's time for you to see the fnords.")
@@ -207,7 +207,7 @@ func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) {
 	aggregator, err := NewAggregator(
 		ctx,
 		DataAvailabilityConfig{
-			RPCAggregator:      AggregatorConfig{AssumedHonest: assumedHonest},
+			RPCAggregator:      AggregatorConfig{AssumedHonest: assumedHonest, EnableChunkedStore: true},
 			ParentChainNodeURL: "none",
 			RequestTimeout:     time.Millisecond * 2000,
 		}, backends)
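For reference, the chunk size in the NewDASRPCClient change below is whatever remains of max-store-chunk-body-size after the fixed JSON-RPC envelope and an estimated 512 bytes of HTTP headers, halved to leave room for the encoded byte-array expansion. A worked example with the 512KiB default:

	package main // illustrative sketch only

	import "fmt"

	func main() {
		// same 74-byte constant as das/dasRpcClient.go
		const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"
		maxStoreChunkBodySize := 512 * 1024 // the default

		// mirrors the derivation in NewDASRPCClient
		chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
		fmt.Println(chunkSize) // (524288 - 74 - 512) / 2 = 261851
	}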
diff --git a/das/dasRpcClient.go b/das/dasRpcClient.go
index 3ea6c4e2c6..5d4ca0dc93 100644
--- a/das/dasRpcClient.go
+++ b/das/dasRpcClient.go
@@ -35,10 +35,11 @@ var (
 )
 
 type DASRPCClient struct { // implements DataAvailabilityService
-	clnt      *rpc.Client
-	url       string
-	signer    signature.DataSignerFunc
-	chunkSize uint64
+	clnt               *rpc.Client
+	url                string
+	signer             signature.DataSignerFunc
+	chunkSize          uint64
+	enableChunkedStore bool
 }
 
 func nilSigner(_ []byte) ([]byte, error) {
@@ -47,7 +48,7 @@ func nilSigner(_ []byte) ([]byte, error) {
 
 const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"
 
-func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
+func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int, enableChunkedStore bool) (*DASRPCClient, error) {
 	clnt, err := rpc.Dial(target)
 	if err != nil {
 		return nil, err
@@ -56,18 +57,23 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChu
 		signer = nilSigner
 	}
 
+	client := &DASRPCClient{
+		clnt:               clnt,
+		url:                target,
+		signer:             signer,
+		enableChunkedStore: enableChunkedStore,
+	}
+
 	// Byte arrays are encoded in base64
-	chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
-	if chunkSize <= 0 {
-		return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+	if enableChunkedStore {
+		chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
+		if chunkSize <= 0 {
+			return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+		}
+		client.chunkSize = uint64(chunkSize)
 	}
 
-	return &DASRPCClient{
-		clnt:      clnt,
-		url:       target,
-		signer:    signer,
-		chunkSize: uint64(chunkSize),
-	}, nil
+	return client, nil
 }
 
 func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
@@ -83,6 +89,11 @@ func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64
 		rpcClientStoreDurationHistogram.Update(time.Since(start).Nanoseconds())
 	}()
 
+	if !c.enableChunkedStore {
+		log.Debug("Legacy store is being force-used by the DAS client", "url", c.url)
+		return c.legacyStore(ctx, message, timeout)
+	}
+
 	// #nosec G115
 	timestamp := uint64(start.Unix())
 	nChunks := uint64(len(message)) / c.chunkSize
diff --git a/das/rpc_aggregator.go b/das/rpc_aggregator.go
index 916637aac6..1c9e2eecab 100644
--- a/das/rpc_aggregator.go
+++ b/das/rpc_aggregator.go
@@ -110,7 +110,7 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]
 		}
 		metricName := metricsutil.CanonicalizeMetricName(url.Hostname())
 
-		service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize)
+		service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize, config.EnableChunkedStore)
 		if err != nil {
 			return nil, err
 		}
diff --git a/das/rpc_test.go b/das/rpc_test.go
index ebc4b736d5..c4ee71aa4f 100644
--- a/das/rpc_test.go
+++ b/das/rpc_test.go
@@ -84,6 +84,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool) {
 			AssumedHonest:         1,
 			Backends:              beConfigs,
 			MaxStoreChunkBodySize: (chunkSize * 2) + len(sendChunkJSONBoilerplate),
+			EnableChunkedStore:    true,
 		},
 		RequestTimeout: time.Minute,
 	}
diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go
index 598af1cd37..aba8faae33 100644
--- a/execution/gethexec/sequencer.go
+++ b/execution/gethexec/sequencer.go
@@ -650,7 +650,10 @@ func (s *Sequencer) preTxFilter(_ *params.ChainConfig, header *types.Header, sta
 	return nil
 }
 
-func (s *Sequencer) postTxFilter(header *types.Header, _ *arbosState.ArbosState, tx *types.Transaction, sender common.Address, dataGas uint64, result *core.ExecutionResult) error {
+func (s *Sequencer) postTxFilter(header *types.Header, statedb *state.StateDB, _ *arbosState.ArbosState, tx *types.Transaction, sender common.Address, dataGas uint64, result *core.ExecutionResult) error {
+	if statedb.IsTxFiltered() {
+		return state.ErrArbTxFilter
+	}
 	if result.Err != nil && result.UsedGas > dataGas && result.UsedGas-dataGas <= s.config().MaxRevertGasReject {
 		return arbitrum.NewRevertReason(result)
 	}
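Taken together with the block_processor.go changes, there are now two ways a filter rejection surfaces to a SequenceTransactions caller, and that distinction is what the new system test below exercises. A sketch of both paths; the helper is hypothetical and the behavior is inferred from the test expectations in system_tests/seq_filter_test.go:

	package example // illustrative sketch only

	import (
		"github.com/ethereum/go-ethereum/core/state"
		"github.com/ethereum/go-ethereum/core/types"

		"github.com/offchainlabs/nitro/arbos"
		"github.com/offchainlabs/nitro/arbos/arbostypes"
		"github.com/offchainlabs/nitro/execution/gethexec"
	)

	// filteredTxs returns the txs that were dropped by per-tx filtering,
	// or the block-level filter error if the whole block was rejected.
	func filteredTxs(engine *gethexec.ExecutionEngine, header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (types.Transactions, error) {
		block, err := engine.SequenceTransactions(header, txes, hooks)
		if block == nil {
			// block-level rejection: BlockFilter (or a still-set filter
			// flag) failed the whole block, and err carries the filter error
			return nil, err
		}
		// per-tx rejection: the block is still produced, the offending tx is
		// dropped, and its error is recorded positionally in hooks.TxErrors
		var dropped types.Transactions
		for i, txErr := range hooks.TxErrors {
			if txErr != nil && txErr.Error() == state.ErrArbTxFilter.Error() {
				dropped = append(dropped, txes[i])
			}
		}
		return dropped, nil
	}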
diff --git a/go-ethereum b/go-ethereum
index 17286562eb..5bf9a75f04 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit 17286562eb2f0dcd02dafdaf5da0fce5ff1f6096
+Subproject commit 5bf9a75f04bc59137e06816e065b8494f796ba2d
diff --git a/staker/bold/bold_staker.go b/staker/bold/bold_staker.go
index 1a8eed80fa..063f7b9719 100644
--- a/staker/bold/bold_staker.go
+++ b/staker/bold/bold_staker.go
@@ -244,20 +244,27 @@ func (b *BOLDStaker) Start(ctxIn context.Context) {
 		if err != nil {
 			log.Warn("error updating latest wasm module root", "err", err)
 		}
+		confirmedMsgCount, confirmedGlobalState, err := b.getLatestState(ctx, true)
+		if err != nil {
+			log.Error("staker: error checking latest confirmed", "err", err)
+		}
+
 		agreedMsgCount, agreedGlobalState, err := b.getLatestState(ctx, false)
 		if err != nil {
 			log.Error("staker: error checking latest agreed", "err", err)
 		}
 
+		if agreedGlobalState == nil {
+			// If we don't have a latest agreed global state, we should fall back to
+			// using the latest confirmed global state.
+			agreedGlobalState = confirmedGlobalState
+			agreedMsgCount = confirmedMsgCount
+		}
 		if agreedGlobalState != nil {
 			for _, notifier := range b.stakedNotifiers {
 				notifier.UpdateLatestStaked(agreedMsgCount, *agreedGlobalState)
 			}
 		}
-		confirmedMsgCount, confirmedGlobalState, err := b.getLatestState(ctx, true)
-		if err != nil {
-			log.Error("staker: error checking latest confirmed", "err", err)
-		}
 
 		if confirmedGlobalState != nil {
 			for _, notifier := range b.confirmedNotifiers {
diff --git a/system_tests/das_test.go b/system_tests/das_test.go
index 52703c879d..ba50dcfff2 100644
--- a/system_tests/das_test.go
+++ b/system_tests/das_test.go
@@ -90,6 +90,7 @@ func aggConfigForBackend(backendConfig das.BackendConfig) das.AggregatorConfig {
 		AssumedHonest:         1,
 		Backends:              das.BackendConfigList{backendConfig},
 		MaxStoreChunkBodySize: 512 * 1024,
+		EnableChunkedStore:    true,
 	}
 }
 
diff --git a/system_tests/seq_filter_test.go b/system_tests/seq_filter_test.go
new file mode 100644
index 0000000000..fdd0c96d13
--- /dev/null
+++ b/system_tests/seq_filter_test.go
@@ -0,0 +1,146 @@
+package arbtest
+
+import (
+	"context"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/arbitrum_types"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/params"
+
+	"github.com/offchainlabs/nitro/arbos"
+	"github.com/offchainlabs/nitro/arbos/arbosState"
+	"github.com/offchainlabs/nitro/arbos/arbostypes"
+	"github.com/offchainlabs/nitro/arbos/l1pricing"
+	"github.com/offchainlabs/nitro/util/arbmath"
+)
+
+func TestSequencerTxFilter(t *testing.T) {
+	t.Parallel()
+
+	builder, header, txes, hooks, cleanup := setupSequencerFilterTest(t, false)
+	defer cleanup()
+
+	block, err := builder.L2.ExecNode.ExecEngine.SequenceTransactions(header, txes, hooks)
+	Require(t, err) // There shouldn't be any error in block generation
+	if block == nil {
+		t.Fatal("block should be generated as second tx should pass")
+	}
+	if len(block.Transactions()) != 2 {
+		t.Fatalf("expecting two txs, found: %d", len(block.Transactions()))
+	}
+	if block.Transactions()[1].Hash() != txes[1].Hash() {
+		t.Fatal("tx hash mismatch, expecting second tx to be present in the block")
+	}
+	if len(hooks.TxErrors) != 2 {
+		t.Fatalf("expected 2 txErrors in hooks, found: %d", len(hooks.TxErrors))
+	}
+	if hooks.TxErrors[0].Error() != state.ErrArbTxFilter.Error() {
+		t.Fatalf("expected ErrArbTxFilter, found: %s", hooks.TxErrors[0].Error())
+	}
+	if hooks.TxErrors[1] != nil {
+		t.Fatalf("found a non-nil error for second transaction: %v", hooks.TxErrors[1])
+	}
+}
+
+func TestSequencerBlockFilterReject(t *testing.T) {
+	t.Parallel()
+
+	builder, header, txes, hooks, cleanup := setupSequencerFilterTest(t, true)
+	defer cleanup()
+
+	block, err := builder.L2.ExecNode.ExecEngine.SequenceTransactions(header, txes, hooks)
+	if block != nil {
+		t.Fatal("block shouldn't be generated when all txes have failed")
+	}
+	if err == nil {
+		t.Fatal("expected ErrArbTxFilter but found nil")
+	}
+	if err.Error() != state.ErrArbTxFilter.Error() {
+		t.Fatalf("expected ErrArbTxFilter, found: %s", err.Error())
+	}
+}
+
+func TestSequencerBlockFilterAccept(t *testing.T) {
+	t.Parallel()
+
+	builder, header, txes, hooks, cleanup := setupSequencerFilterTest(t, true)
+	defer cleanup()
+
+	block, err := builder.L2.ExecNode.ExecEngine.SequenceTransactions(header, txes[1:], hooks)
+	Require(t, err)
+	if block == nil {
+		t.Fatal("block should be generated as the tx should pass")
+	}
+	if len(block.Transactions()) != 2 {
+		t.Fatalf("expecting two txs, found: %d", len(block.Transactions()))
+	}
+	if block.Transactions()[1].Hash() != txes[1].Hash() {
+		t.Fatal("tx hash mismatch, expecting second tx to be present in the block")
+	}
+}
+
+func setupSequencerFilterTest(t *testing.T, isBlockFilter bool) (*NodeBuilder, *arbostypes.L1IncomingMessageHeader, types.Transactions, *arbos.SequencingHooks, func()) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	builder := NewNodeBuilder(ctx).DefaultConfig(t, false)
+	builder.isSequencer = true
+	builderCleanup := builder.Build(t)
+
+	builder.L2Info.GenerateAccount("User")
+	var latestL2 uint64
+	var err error
+	for i := 0; latestL2 < 3; i++ {
+		_, _ = builder.L2.TransferBalance(t, "Owner", "User", big.NewInt(1e18), builder.L2Info)
+		latestL2, err = builder.L2.Client.BlockNumber(ctx)
+		Require(t, err)
+	}
+
+	header := &arbostypes.L1IncomingMessageHeader{
+		Kind:        arbostypes.L1MessageType_L2Message,
+		Poster:      l1pricing.BatchPosterAddress,
+		BlockNumber: 1,
+		Timestamp:   arbmath.SaturatingUCast[uint64](time.Now().Unix()),
+		RequestId:   nil,
+		L1BaseFee:   nil,
+	}
+
+	var txes types.Transactions
+	txes = append(txes, builder.L2Info.PrepareTx("Owner", "User", builder.L2Info.TransferGas, big.NewInt(1e12), []byte{1, 2, 3}))
+	txes = append(txes, builder.L2Info.PrepareTx("User", "Owner", builder.L2Info.TransferGas, big.NewInt(1e12), nil))
+
+	hooks := arbos.NoopSequencingHooks()
+	if isBlockFilter {
+		hooks.BlockFilter = func(_ *types.Header, _ *state.StateDB, txes types.Transactions, _ types.Receipts) error {
+			if len(txes[1].Data()) > 0 {
+				return state.ErrArbTxFilter
+			}
+			return nil
+		}
+	} else {
+		hooks.PreTxFilter = func(_ *params.ChainConfig, _ *types.Header, statedb *state.StateDB, _ *arbosState.ArbosState, tx *types.Transaction, _ *arbitrum_types.ConditionalOptions, _ common.Address, _ *arbos.L1Info) error {
+			if len(tx.Data()) > 0 {
+				statedb.FilterTx()
+			}
+			return nil
+		}
+		hooks.PostTxFilter = func(_ *types.Header, statedb *state.StateDB, _ *arbosState.ArbosState, tx *types.Transaction, _ common.Address, _ uint64, _ *core.ExecutionResult) error {
+			if statedb.IsTxFiltered() {
+				return state.ErrArbTxFilter
+			}
+			return nil
+		}
+	}
+
+	cleanup := func() {
+		builderCleanup()
+		cancel()
+	}
+
+	return builder, header, txes, hooks, cleanup
+}