diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 95a2b8adfce..a4e49b1c267 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -12,7 +12,7 @@ env: # Deny warnings in CI RUSTFLAGS: "-D warnings" # The Nightly version used for cargo-udeps, might need updating from time to time. - PINNED_NIGHTLY: nightly-2021-06-09 + PINNED_NIGHTLY: nightly-2021-12-01 jobs: target-branch-check: name: target-branch-check @@ -54,6 +54,12 @@ jobs: run: npm install -g ganache-cli - name: Install make run: choco install -y make + - uses: KyleMayes/install-llvm-action@v1 + with: + version: "13.0" + directory: ${{ runner.temp }}/llvm + - name: Set LIBCLANG_PATH + run: echo "LIBCLANG_PATH=$((gcm clang).source -replace "clang.exe")" >> $env:GITHUB_ENV - name: Run tests in release run: make test-release beacon-chain-tests: diff --git a/Cargo.lock b/Cargo.lock index a4dbfc92ece..4fe2b3573fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.45" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7" +checksum = "38d9ff5d688f1c13395289f67db01d4826b46dd694e7580accdc3e8430f2d98e" [[package]] name = "arbitrary" @@ -378,6 +378,25 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453c49e5950bb0eb63bb3df640e31618846c89d5b7faa54040d76e98e0134375" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -610,6 +629,15 @@ version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +[[package]] +name = "cexpr" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db507a7679252d2276ed0dd8113c6875ec56d3089f9225b2b42c30cc1f8e5c89" +dependencies = [ + "nom 6.1.2", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -663,6 +691,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "clang-sys" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa66045b9cb23c2e9c1520732030608b02ee07e5cfaa5a521ec15ded7fa24c90" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "2.33.3" @@ -1210,9 +1249,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4620d40f6d2601794401d6dd95a5cf69b6c157852539470eeda433a99b3c0efc" +checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816" dependencies = [ "signature", ] @@ -2382,9 +2421,9 @@ checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" [[package]] name = "httpdate" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "humantime" @@ -2666,6 +2705,12 @@ dependencies = [ "spin", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "lcli" version = "2.0.1" @@ -2748,12 +2793,38 @@ dependencies = [ "rle-decode-fast", ] +[[package]] +name = "libloading" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afe203d669ec979b7128619bae5a63b7b42e9203c1b29146079ee05e2f604b52" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" +[[package]] +name = "libmdbx" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75aa79307892c0000dd0a8169c4db5529d32ca2302587d552870903109b46925" +dependencies = [ + "bitflags", + "byteorder", + "derive_more", + "indexmap", + "libc", + "mdbx-sys", + "parking_lot", + "thiserror", +] + [[package]] name = "libp2p" version = "0.41.0" @@ -3298,28 +3369,6 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" -[[package]] -name = "lmdb" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" -dependencies = [ - "bitflags", - "libc", - "lmdb-sys", -] - -[[package]] -name = "lmdb-sys" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "lock_api" version = "0.4.5" @@ -3428,6 +3477,18 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "mdbx-sys" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6fb0496b0bc2274db9ae3ee92cf97bb29bf40e51b96ec1087a6374c4a42a05d" +dependencies = [ + "bindgen", + "cc", + "cmake", + "libc", +] + [[package]] name = "memchr" version = "2.4.1" @@ -3748,6 +3809,18 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" +[[package]] +name = "nom" +version = "6.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2" +dependencies = [ + "bitvec 0.19.5", + "funty", + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -4036,6 +4109,12 @@ dependencies = [ "crypto-mac 0.11.1", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -4310,7 +4389,7 @@ checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" dependencies = [ "byteorder", "libc", - "nom", + "nom 2.2.1", "rustc_version 0.2.3", ] @@ -5140,9 +5219,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e277c495ac6cd1a01a58d0a0c574568b4d1ddf14f59965c6a58b8d96400b54f3" +checksum = "063bf466a64011ac24040a49009724ee60a57da1b437617ceb32e53ad61bfb19" dependencies = [ "itoa", "ryu", @@ -5231,6 +5310,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -5284,10 +5369,10 @@ dependencies = [ "filesystem", "flate2", "lazy_static", + "libmdbx", "lighthouse_metrics", - "lmdb", - "lmdb-sys", "logging", + "lru", "maplit", "parking_lot", "rand 0.7.3", diff --git a/Cross.toml b/Cross.toml index 050f2bdbd75..2db39924648 100644 --- a/Cross.toml +++ b/Cross.toml @@ -2,3 +2,14 @@ passthrough = [ "RUSTFLAGS", ] + +# These custom images are required to work around the lack of Clang in the default `cross` images. +# We need Clang to run `bindgen` for MDBX, and the `BINDGEN_EXTRA_CLANG_ARGS` flags must also be set +# while cross-compiling for ARM to prevent bindgen from attempting to include headers from the host. +# +# For more information see https://github.com/rust-embedded/cross/pull/608 +[target.x86_64-unknown-linux-gnu] +image = "michaelsproul/cross-clang:x86_64-latest" + +[target.aarch64-unknown-linux-gnu] +image = "michaelsproul/cross-clang:aarch64-latest" diff --git a/Dockerfile b/Dockerfile index 81aff883457..5ca8cbc964c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -FROM rust:1.53.0 AS builder -RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake +FROM rust:1.56.1-bullseye AS builder +RUN apt-get update && apt-get -y upgrade && apt-get install -y cmake libclang-dev COPY . lighthouse ARG PORTABLE ENV PORTABLE $PORTABLE diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9b97e3c7dcf..567e0cdb722 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -830,11 +830,7 @@ fn block_gossip_verification() { fn verify_block_for_gossip_slashing_detection() { let slasher_dir = tempdir().unwrap(); let slasher = Arc::new( - Slasher::open( - SlasherConfig::new(slasher_dir.path().into()).for_testing(), - test_logger(), - ) - .unwrap(), + Slasher::open(SlasherConfig::new(slasher_dir.path().into()), test_logger()).unwrap(), ); let inner_slasher = slasher.clone(); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index afcb125c274..0b2cda91ef4 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -515,12 +515,20 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("slasher-max-db-size") .long("slasher-max-db-size") .help( - "Maximum size of the LMDB database used by the slasher." + "Maximum size of the MDBX database used by the slasher." ) .value_name("GIGABYTES") .requires("slasher") .takes_value(true) ) + .arg( + Arg::with_name("slasher-att-cache-size") + .long("slasher-att-cache-size") + .help("Set the maximum number of attestation roots for the slasher to cache") + .value_name("COUNT") + .requires("slasher") + .takes_value(true) + ) .arg( Arg::with_name("slasher-chunk-size") .long("slasher-chunk-size") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index e9e3e2cd5b7..ce2f65e70b4 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -510,6 +510,12 @@ pub fn get_config( slasher_config.max_db_size_mbs = max_db_size_gbs * 1024; } + if let Some(attestation_cache_size) = + clap_utils::parse_optional(cli_args, "slasher-att-cache-size")? + { + slasher_config.attestation_root_cache_size = attestation_cache_size; + } + if let Some(chunk_size) = clap_utils::parse_optional(cli_args, "slasher-chunk-size")? { slasher_config.chunk_size = chunk_size; } diff --git a/book/src/installation-source.md b/book/src/installation-source.md index 864e647ad7e..4b977f5222f 100644 --- a/book/src/installation-source.md +++ b/book/src/installation-source.md @@ -1,85 +1,107 @@ -# Installation: Build from Source +# Build from Source -Lighthouse builds on Linux, macOS, and Windows (native Windows support in -BETA, we also support Windows via [WSL][]). +Lighthouse builds on Linux, macOS, and Windows. Install the [Dependencies](#dependencies) using +the instructions below, and then proceed to [Building Lighthouse](#build-lighthouse). -Compilation should be easy. In fact, if you already have Rust and the build -dependencies installed, all you need is: +## Dependencies -- `git clone https://github.com/sigp/lighthouse.git` -- `cd lighthouse` -- `git checkout stable` -- `make` +First, **install Rust** using [rustup](https://rustup.rs/). The rustup installer provides an easy way +to update the Rust compiler, and works on all platforms. -If this doesn't work or is not clear enough, see the [Detailed -Instructions](#detailed-instructions) below. If you have further issues, see -[Troubleshooting](#troubleshooting). If you'd prefer to use Docker, see the -[Docker Guide](./docker.md). +With Rust installed, follow the instructions below to install dependencies relevant to your +operating system. -## Updating lighthouse +#### Ubuntu -You can update Lighthouse to a specific version by running the commands below. The `lighthouse` -directory will be the location you cloned Lighthouse to during the installation process. -`${VERSION}` will be the version you wish to build in the format `vX.X.X`. +Install the following packages: -- `cd lighthouse` -- `git fetch` -- `git checkout ${VERSION}` -- `make` +```bash +sudo apt install -y git gcc g++ make cmake pkg-config llvm-dev libclang-dev clang +``` +#### macOS -## Detailed Instructions +1. Install the [Homebrew][] package manager. +1. Install CMake using Homebrew: -1. Install the build dependencies for your platform - - Check the [Dependencies](#dependencies) section for additional - information. -1. Clone the Lighthouse repository. - - Run `$ git clone https://github.com/sigp/lighthouse.git` - - Change into the newly created directory with `$ cd lighthouse` -1. Build Lighthouse with `$ make`. -1. Installation was successful if `$ lighthouse --help` displays the command-line documentation. +``` +brew install cmake +``` -> First time compilation may take several minutes. If you experience any -> failures, please reach out on [discord](https://discord.gg/cyAszAh) or -> [create an issue](https://github.com/sigp/lighthouse/issues/new). +[Homebrew]: https://brew.sh/ +#### Windows -## Dependencies +1. Install [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). +1. Install the [Chocolatey](https://chocolatey.org/install) package manager for Windows. +1. Install Make, CMake and LLVM using Chocolatey: -#### Installing Rust +``` +choco install make +``` -The best way to install Rust (regardless of platform) is usually with [rustup](https://rustup.rs/) -- Use the `stable` toolchain (it's the default). +``` +choco install cmake --installargs 'ADD_CMAKE_TO_PATH=System' +``` -#### Windows Support +``` +choco install llvm +``` -These instructions are for compiling or running Lighthouse natively on Windows, which is currently in -BETA testing. Lighthouse can also run successfully under the [Windows Subsystem for Linux (WSL)][WSL]. -If using Ubuntu under WSL, you should follow the instructions for Ubuntu listed in the -[Dependencies (Ubuntu)](#ubuntu) section. +These dependencies are for compiling Lighthouse natively on Windows, which is currently in beta +testing. Lighthouse can also run successfully under the [Windows Subsystem for Linux (WSL)][WSL]. +If using Ubuntu under WSL, you should follow the instructions for Ubuntu listed in the [Dependencies +(Ubuntu)](#ubuntu) section. [WSL]: https://docs.microsoft.com/en-us/windows/wsl/about -1. Install [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) -1. Install [Chocolatey](https://chocolatey.org/install) Package Manager for Windows - - Install `make` via `choco install make` - - Install `cmake` via `choco install cmake --installargs 'ADD_CMAKE_TO_PATH=System'` +## Build Lighthouse -#### Ubuntu +Once you have Rust and the build dependencies you're ready to build Lighthouse: -Several dependencies may be required to compile Lighthouse. The following -packages may be required in addition a base Ubuntu Server installation: +``` +git clone https://github.com/sigp/lighthouse.git +``` -```bash -sudo apt install -y git gcc g++ make cmake pkg-config +``` +cd lighthouse ``` -#### macOS +``` +git checkout stable +``` + +``` +make +``` + +Compilation may take around 10 minutes. Installation was successful if `lighthouse --help` displays +the command-line documentation. + +If you run into any issues, please check the [Troubleshooting](#troubleshooting) section, or reach +out to us on [Discord](https://discord.gg/cyAszAh). + +## Update Lighthouse + +You can update Lighthouse to a specific version by running the commands below. The `lighthouse` +directory will be the location you cloned Lighthouse to during the installation process. +`${VERSION}` will be the version you wish to build in the format `vX.X.X`. -You will need `cmake`. You can install via homebrew: +``` +cd lighthouse +``` - brew install cmake +``` +git fetch +``` +``` +git checkout ${VERSION} +``` + +``` +make +``` ## Troubleshooting @@ -93,12 +115,12 @@ See ["Configuring the `PATH` environment variable" ### Compilation error -Make sure you are running the latest version of Rust. If you have installed Rust using rustup, simply type `$ rustup update`. +Make sure you are running the latest version of Rust. If you have installed Rust using rustup, simply type `rustup update`. If compilation fails with `(signal: 9, SIGKILL: kill)`, this could mean your machine ran out of memory during compilation. If you are on a resource-constrained device you can -look into [cross compilation](./cross-compiling.md). +look into [cross compilation](./cross-compiling.md), or use a [pre-built +binary](./installation-binaries.md). If compilation fails with `error: linking with cc failed: exit code: 1`, try running `cargo clean`. -[WSL]: https://docs.microsoft.com/en-us/windows/wsl/about diff --git a/book/src/slasher.md b/book/src/slasher.md index 126573c556a..05107238c39 100644 --- a/book/src/slasher.md +++ b/book/src/slasher.md @@ -12,7 +12,6 @@ of the immaturity of the slasher UX and the extra resources required. * Quad-core CPU * 16 GB RAM * 256 GB solid state storage (in addition to space for the beacon node DB) -* ⚠️ **If you are running natively on Windows**: LMDB will pre-allocate the entire 256 GB for the slasher database ## How to Run @@ -66,24 +65,29 @@ changed after initialization. * Argument: maximum size of the database in gigabytes * Default: 256 GB -The slasher uses LMDB as its backing store, and LMDB will consume up to the maximum amount of disk -space allocated to it. By default the limit is set to accomodate the default history length and -around 150K validators but you can set it lower if running with a reduced history length. The space -required scales approximately linearly in validator count and history length, i.e. if you halve -either you can halve the space required. +The slasher uses MDBX as its backing store, which places a hard limit on the size of the database +file. You can use the `--slasher-max-db-size` flag to set this limit. It can be adjusted after +initialization if the limit is reached. -If you want a better estimate you can use this formula: +By default the limit is set to accomodate the default history length and around 300K validators but +you can set it lower if running with a reduced history length. The space required scales +approximately linearly in validator count and history length, i.e. if you halve either you can halve +the space required. + +If you want an estimate of the database size you can use this formula: ``` -360 * V * N + (16 * V * N)/(C * K) + 15000 * N +4.56 GB * (N / 256) * (V / 250000) ``` -where +where `V` is the validator count and `N` is the history length. + +You should set the maximum size higher than the estimate to allow room for growth in the validator +count. -* `V` is the validator count -* `N` is the history length -* `C` is the chunk size -* `K` is the validator chunk size +> NOTE: In Lighthouse v2.1.0 the slasher database was switched from LMDB to MDBX. Unlike LMDB, MDBX +> does garbage collection of free pages and is capable of shrinking the database file and preventing +> it from growing indefinitely. ### Update Period @@ -138,6 +142,19 @@ about [how the slasher works][design-notes], and/or reading the source code. [design-notes]: https://hackmd.io/@sproul/min-max-slasher +### Attestation Root Cache Size + +* Flag: `--slasher-att-cache-size COUNT` +* Argument: number of attestations +* Default: 100,000 + +The number of attestation data roots to cache in memory. The cache is an LRU cache used to map +indexed attestation IDs to the tree hash roots of their attestation data. The cache prevents reading +whole indexed attestations from disk to determine whether they are slashable. + +Each value is very small (38 bytes) so the entire cache should fit in around 4 MB of RAM. Decreasing +the cache size is not recommended, and the size is set so as to be large enough for future growth. + ### Short-Range Example If you would like to run a lightweight slasher that just checks blocks and attestations within diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index b8dd31beb5f..73d5a20657d 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -670,7 +670,6 @@ fn no_reconstruct_historic_states_flag() { fn slasher_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .run_with_zero_port() .with_config_and_dir(|config, dir| { if let Some(slasher_config) = &config.slasher { @@ -689,7 +688,6 @@ fn slasher_dir_flag() { CommandLineTest::new() .flag("slasher", None) .flag("slasher-dir", dir.path().as_os_str().to_str()) - .flag("slasher-max-db-size", Some("16")) .run_with_zero_port() .with_config(|config| { if let Some(slasher_config) = &config.slasher { @@ -703,7 +701,6 @@ fn slasher_dir_flag() { fn slasher_update_period_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .flag("slasher-update-period", Some("100")) .run_with_zero_port() .with_config(|config| { @@ -715,21 +712,21 @@ fn slasher_update_period_flag() { }); } #[test] -fn slasher_slot_offset() { - // TODO: check that the offset is actually stored, once the config is un-hacked - // See: https://github.com/sigp/lighthouse/pull/2767#discussion_r741610402 +fn slasher_slot_offset_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .flag("slasher-slot-offset", Some("11.25")) - .run(); + .run() + .with_config(|config| { + let slasher_config = config.slasher.as_ref().unwrap(); + assert_eq!(slasher_config.slot_offset, 11.25); + }); } #[test] #[should_panic] -fn slasher_slot_offset_nan() { +fn slasher_slot_offset_nan_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .flag("slasher-slot-offset", Some("NaN")) .run(); } @@ -737,7 +734,6 @@ fn slasher_slot_offset_nan() { fn slasher_history_length_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .flag("slasher-history-length", Some("2048")) .run_with_zero_port() .with_config(|config| { @@ -763,11 +759,24 @@ fn slasher_max_db_size_flag() { }); } #[test] +fn slasher_attestation_cache_size_flag() { + CommandLineTest::new() + .flag("slasher", None) + .flag("slasher-att-cache-size", Some("10000")) + .run() + .with_config(|config| { + let slasher_config = config + .slasher + .as_ref() + .expect("Unable to parse Slasher config"); + assert_eq!(slasher_config.attestation_root_cache_size, 10000); + }); +} +#[test] fn slasher_chunk_size_flag() { CommandLineTest::new() .flag("slasher", None) .flag("slasher-chunk-size", Some("32")) - .flag("slasher-max-db-size", Some("16")) .run_with_zero_port() .with_config(|config| { let slasher_config = config @@ -781,7 +790,6 @@ fn slasher_chunk_size_flag() { fn slasher_validator_chunk_size_flag() { CommandLineTest::new() .flag("slasher", None) - .flag("slasher-max-db-size", Some("16")) .flag("slasher-validator-chunk-size", Some("512")) .run_with_zero_port() .with_config(|config| { @@ -797,7 +805,6 @@ fn slasher_broadcast_flag() { CommandLineTest::new() .flag("slasher", None) .flag("slasher-broadcast", None) - .flag("slasher-max-db-size", Some("16")) .run_with_zero_port() .with_config(|config| { let slasher_config = config diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index 7fd51ff920d..01beda7e9c0 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -13,8 +13,8 @@ flate2 = { version = "1.0.14", features = ["zlib"], default-features = false } lazy_static = "1.4.0" lighthouse_metrics = { path = "../common/lighthouse_metrics" } filesystem = { path = "../common/filesystem" } -lmdb = "0.8" -lmdb-sys = "0.8" +mdbx = { package = "libmdbx", version = "0.1.0" } +lru = "0.6.6" parking_lot = "0.11.0" rand = "0.7.3" safe_arith = { path = "../consensus/safe_arith" } diff --git a/slasher/service/src/service.rs b/slasher/service/src/service.rs index 510ed6cd98c..88feff0bbc4 100644 --- a/slasher/service/src/service.rs +++ b/slasher/service/src/service.rs @@ -128,7 +128,7 @@ impl SlasherService { log, "Error during scheduled slasher processing"; "epoch" => current_epoch, - "error" => format!("{:?}", e) + "error" => ?e, ); None } @@ -136,13 +136,13 @@ impl SlasherService { drop(batch_timer); // Prune the database, even in the case where batch processing failed. - // If the LMDB database is full then pruning could help to free it up. + // If the database is full then pruning could help to free it up. if let Err(e) = slasher.prune_database(current_epoch) { error!( log, "Error during slasher database pruning"; "epoch" => current_epoch, - "error" => format!("{:?}", e), + "error" => ?e, ); continue; }; diff --git a/slasher/src/array.rs b/slasher/src/array.rs index 545c0b7e6f9..d9f1fab819c 100644 --- a/slasher/src/array.rs +++ b/slasher/src/array.rs @@ -1,8 +1,9 @@ use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED}; +use crate::RwTransaction; use crate::{AttesterSlashingStatus, Config, Error, IndexedAttesterRecord, SlasherDB}; use flate2::bufread::{ZlibDecoder, ZlibEncoder}; -use lmdb::{RwTransaction, Transaction}; use serde_derive::{Deserialize, Serialize}; +use std::borrow::{Borrow, Cow}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::convert::TryFrom; use std::io::Read; @@ -146,7 +147,10 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn fn next_start_epoch(start_epoch: Epoch, config: &Config) -> Epoch; - fn select_db(db: &SlasherDB) -> lmdb::Database; + fn select_db<'txn, E: EthSpec>( + db: &SlasherDB, + txn: &'txn RwTransaction<'txn>, + ) -> Result, Error>; fn load( db: &SlasherDB, @@ -156,13 +160,13 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn config: &Config, ) -> Result, Error> { let disk_key = config.disk_key(validator_chunk_index, chunk_index); - let chunk_bytes = match txn.get(Self::select_db(db), &disk_key.to_be_bytes()) { - Ok(chunk_bytes) => chunk_bytes, - Err(lmdb::Error::NotFound) => return Ok(None), - Err(e) => return Err(e.into()), - }; + let chunk_bytes: Cow<[u8]> = + match txn.get(&Self::select_db(db, txn)?, &disk_key.to_be_bytes())? { + Some(chunk_bytes) => chunk_bytes, + None => return Ok(None), + }; - let chunk = bincode::deserialize_from(ZlibDecoder::new(chunk_bytes))?; + let chunk = bincode::deserialize_from(ZlibDecoder::new(chunk_bytes.borrow()))?; Ok(Some(chunk)) } @@ -185,7 +189,7 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn metrics::set_float_gauge(&SLASHER_COMPRESSION_RATIO, compression_ratio); txn.put( - Self::select_db(db), + &Self::select_db(db, txn)?, &disk_key.to_be_bytes(), &compressed_value, SlasherDB::::write_flags(), @@ -292,8 +296,11 @@ impl TargetArrayChunk for MinTargetChunk { start_epoch / chunk_size * chunk_size - 1 } - fn select_db(db: &SlasherDB) -> lmdb::Database { - db.min_targets_db + fn select_db<'txn, E: EthSpec>( + db: &SlasherDB, + txn: &'txn RwTransaction<'txn>, + ) -> Result, Error> { + db.min_targets_db(txn) } } @@ -391,8 +398,11 @@ impl TargetArrayChunk for MaxTargetChunk { (start_epoch / chunk_size + 1) * chunk_size } - fn select_db(db: &SlasherDB) -> lmdb::Database { - db.max_targets_db + fn select_db<'txn, E: EthSpec>( + db: &SlasherDB, + txn: &'txn RwTransaction<'txn>, + ) -> Result, Error> { + db.max_targets_db(txn) } } diff --git a/slasher/src/attester_record.rs b/slasher/src/attester_record.rs index 310118e1ae9..498e8d49f07 100644 --- a/slasher/src/attester_record.rs +++ b/slasher/src/attester_record.rs @@ -1,17 +1,53 @@ +use crate::{database::IndexedAttestationId, Error}; use ssz_derive::{Decode, Encode}; -use std::sync::Arc; +use std::borrow::Cow; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use tree_hash::TreeHash as _; use tree_hash_derive::TreeHash; use types::{AggregateSignature, EthSpec, Hash256, IndexedAttestation, VariableList}; -#[derive(Debug, Clone, Copy, Encode, Decode)] +#[derive(Debug, Clone, Copy)] pub struct AttesterRecord { - /// The hash of the attestation data, for checking double-voting. + /// The hash of the attestation data, for de-duplication. pub attestation_data_hash: Hash256, /// The hash of the indexed attestation, so it can be loaded. pub indexed_attestation_hash: Hash256, } +#[derive(Debug, Clone, Copy)] +pub struct CompactAttesterRecord { + /// The ID of the `IndexedAttestation` signed by this validator. + pub indexed_attestation_id: IndexedAttestationId, +} + +impl CompactAttesterRecord { + pub fn new(indexed_attestation_id: IndexedAttestationId) -> Self { + Self { + indexed_attestation_id, + } + } + + pub fn null() -> Self { + Self::new(IndexedAttestationId::null()) + } + + pub fn parse(bytes: Cow<[u8]>) -> Result { + let id = IndexedAttestationId::parse(bytes)?; + Ok(Self::new(IndexedAttestationId::new(id))) + } + + pub fn is_null(&self) -> bool { + self.indexed_attestation_id.is_null() + } + + pub fn as_bytes(&self) -> &[u8] { + self.indexed_attestation_id.as_ref() + } +} + /// Bundling of an `IndexedAttestation` with an `AttesterRecord`. /// /// This struct gets `Arc`d and passed around between each stage of queueing and processing. @@ -19,11 +55,26 @@ pub struct AttesterRecord { pub struct IndexedAttesterRecord { pub indexed: IndexedAttestation, pub record: AttesterRecord, + pub indexed_attestation_id: AtomicU64, } impl IndexedAttesterRecord { pub fn new(indexed: IndexedAttestation, record: AttesterRecord) -> Arc { - Arc::new(IndexedAttesterRecord { indexed, record }) + Arc::new(IndexedAttesterRecord { + indexed, + record, + indexed_attestation_id: AtomicU64::new(0), + }) + } + + pub fn set_id(&self, id: u64) { + self.indexed_attestation_id + .compare_exchange(0, id, Ordering::Relaxed, Ordering::Relaxed) + .expect("IDs should only be initialized once"); + } + + pub fn get_id(&self) -> u64 { + self.indexed_attestation_id.load(Ordering::Relaxed) } } diff --git a/slasher/src/config.rs b/slasher/src/config.rs index f8fcc1c02bf..81aa4b597db 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -9,14 +9,11 @@ pub const DEFAULT_HISTORY_LENGTH: usize = 4096; pub const DEFAULT_UPDATE_PERIOD: u64 = 12; pub const DEFAULT_SLOT_OFFSET: f64 = 10.5; pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB +pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: usize = 100_000; pub const DEFAULT_BROADCAST: bool = false; -/// Database size to use for tests. -/// -/// Mostly a workaround for Windows due to a bug in LMDB, see: -/// -/// https://github.com/sigp/lighthouse/issues/2342 -pub const TESTING_MAX_DB_SIZE: usize = 16; // MiB +pub const MAX_HISTORY_LENGTH: usize = 1 << 16; +pub const MDBX_GROWTH_STEP: isize = 256 * (1 << 20); // 256 MiB #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -28,16 +25,21 @@ pub struct Config { /// Update frequency in seconds. pub update_period: u64, /// Offset from the start of the slot to begin processing. - #[serde(skip, default = "default_slot_offset")] pub slot_offset: f64, - /// Maximum size of the LMDB database in megabytes. + /// Maximum size of the database in megabytes. pub max_db_size_mbs: usize, + /// Maximum size of the in-memory cache for attestation roots. + pub attestation_root_cache_size: usize, /// Whether to broadcast slashings found to the network. pub broadcast: bool, } -fn default_slot_offset() -> f64 { - DEFAULT_SLOT_OFFSET +/// Immutable configuration parameters which are stored on disk and checked for consistency. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DiskConfig { + pub chunk_size: usize, + pub validator_chunk_size: usize, + pub history_length: usize, } impl Config { @@ -50,16 +52,11 @@ impl Config { update_period: DEFAULT_UPDATE_PERIOD, slot_offset: DEFAULT_SLOT_OFFSET, max_db_size_mbs: DEFAULT_MAX_DB_SIZE, + attestation_root_cache_size: DEFAULT_ATTESTATION_ROOT_CACHE_SIZE, broadcast: DEFAULT_BROADCAST, } } - /// Use a smaller max DB size for testing. - pub fn for_testing(mut self) -> Self { - self.max_db_size_mbs = TESTING_MAX_DB_SIZE; - self - } - pub fn validate(&self) -> Result<(), Error> { if self.chunk_size == 0 || self.validator_chunk_size == 0 @@ -74,15 +71,22 @@ impl Config { chunk_size: self.chunk_size, history_length: self.history_length, }) + } else if self.history_length > MAX_HISTORY_LENGTH { + Err(Error::ConfigInvalidHistoryLength { + history_length: self.history_length, + max_history_length: MAX_HISTORY_LENGTH, + }) } else { Ok(()) } } - pub fn is_compatible(&self, other: &Config) -> bool { - self.chunk_size == other.chunk_size - && self.validator_chunk_size == other.validator_chunk_size - && self.history_length == other.history_length + pub fn disk_config(&self) -> DiskConfig { + DiskConfig { + chunk_size: self.chunk_size, + validator_chunk_size: self.validator_chunk_size, + history_length: self.history_length, + } } pub fn chunk_index(&self, epoch: Epoch) -> usize { diff --git a/slasher/src/database.rs b/slasher/src/database.rs index 7576d18483d..653eccfa722 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -1,26 +1,41 @@ +use crate::config::MDBX_GROWTH_STEP; use crate::{ - utils::{TxnMapFull, TxnOptional}, - AttesterRecord, AttesterSlashingStatus, Config, Error, ProposerSlashingStatus, + metrics, utils::TxnMapFull, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, + Config, Environment, Error, ProposerSlashingStatus, RwTransaction, }; use byteorder::{BigEndian, ByteOrder}; -use lmdb::{Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags}; -use serde::Deserialize; +use lru::LruCache; +use mdbx::{Database, DatabaseFlags, Geometry, WriteFlags}; +use parking_lot::Mutex; +use serde::de::DeserializeOwned; +use slog::{info, Logger}; use ssz::{Decode, Encode}; +use std::borrow::{Borrow, Cow}; use std::marker::PhantomData; +use std::ops::Range; +use std::path::Path; use std::sync::Arc; +use tree_hash::TreeHash; use types::{ Epoch, EthSpec, Hash256, IndexedAttestation, ProposerSlashing, SignedBeaconBlockHeader, Slot, }; /// Current database schema version, to check compatibility of on-disk DB with software. -pub const CURRENT_SCHEMA_VERSION: u64 = 2; +pub const CURRENT_SCHEMA_VERSION: u64 = 3; /// Metadata about the slashing database itself. const METADATA_DB: &str = "metadata"; -/// Map from `(target_epoch, validator_index)` to `AttesterRecord`. +/// Map from `(target_epoch, validator_index)` to `CompactAttesterRecord`. const ATTESTERS_DB: &str = "attesters"; -/// Map from `(target_epoch, indexed_attestation_hash)` to `IndexedAttestation`. +/// Companion database for the attesters DB mapping `validator_index` to largest `target_epoch` +/// stored for that validator in the attesters DB. +/// +/// Used to implement wrap-around semantics for target epochs modulo the history length. +const ATTESTERS_MAX_TARGETS_DB: &str = "attesters_max_targets"; +/// Map from `indexed_attestation_id` to `IndexedAttestation`. const INDEXED_ATTESTATION_DB: &str = "indexed_attestations"; +/// Map from `(target_epoch, indexed_attestation_hash)` to `indexed_attestation_id`. +const INDEXED_ATTESTATION_ID_DB: &str = "indexed_attestation_ids"; /// Table of minimum targets for every source epoch within range. const MIN_TARGETS_DB: &str = "min_targets"; /// Table of maximum targets for every source epoch within range. @@ -32,31 +47,31 @@ const CURRENT_EPOCHS_DB: &str = "current_epochs"; /// Map from `(slot, validator_index)` to `SignedBeaconBlockHeader`. const PROPOSERS_DB: &str = "proposers"; -/// The number of DBs for LMDB to use (equal to the number of DBs defined above). -const LMDB_MAX_DBS: u32 = 7; +/// The number of DBs for MDBX to use (equal to the number of DBs defined above). +const MAX_NUM_DBS: usize = 9; + +/// Filename for the legacy (LMDB) database file, so that it may be deleted. +const LEGACY_DB_FILENAME: &str = "data.mdb"; +const LEGACY_DB_LOCK_FILENAME: &str = "lock.mdb"; /// Constant key under which the schema version is stored in the `metadata_db`. const METADATA_VERSION_KEY: &[u8] = &[0]; /// Constant key under which the slasher configuration is stored in the `metadata_db`. const METADATA_CONFIG_KEY: &[u8] = &[1]; -const ATTESTER_KEY_SIZE: usize = 16; +const ATTESTER_KEY_SIZE: usize = 7; const PROPOSER_KEY_SIZE: usize = 16; const CURRENT_EPOCH_KEY_SIZE: usize = 8; -const INDEXED_ATTESTATION_KEY_SIZE: usize = 40; +const INDEXED_ATTESTATION_ID_SIZE: usize = 6; +const INDEXED_ATTESTATION_ID_KEY_SIZE: usize = 40; const MEGABYTE: usize = 1 << 20; #[derive(Debug)] pub struct SlasherDB { pub(crate) env: Environment, - pub(crate) indexed_attestation_db: Database, - pub(crate) attesters_db: Database, - pub(crate) min_targets_db: Database, - pub(crate) max_targets_db: Database, - pub(crate) current_epochs_db: Database, - pub(crate) proposers_db: Database, - pub(crate) metadata_db: Database, - config: Arc, + /// LRU cache mapping indexed attestation IDs to their attestation data roots. + attestation_root_cache: Mutex>, + pub(crate) config: Arc, _phantom: PhantomData, } @@ -64,27 +79,27 @@ pub struct SlasherDB { /// /// Stored as big-endian `(target_epoch, validator_index)` to enable efficient iteration /// while pruning. +/// +/// The target epoch is stored in 2 bytes modulo the `history_length`. +/// +/// The validator index is stored in 5 bytes (validator registry limit is 2^40). #[derive(Debug)] pub struct AttesterKey { data: [u8; ATTESTER_KEY_SIZE], } impl AttesterKey { - pub fn new(validator_index: u64, target_epoch: Epoch) -> Self { + pub fn new(validator_index: u64, target_epoch: Epoch, config: &Config) -> Self { let mut data = [0; ATTESTER_KEY_SIZE]; - data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes()); - data[8..ATTESTER_KEY_SIZE].copy_from_slice(&validator_index.to_be_bytes()); - AttesterKey { data } - } - pub fn parse(data: &[u8]) -> Result<(Epoch, u64), Error> { - if data.len() == ATTESTER_KEY_SIZE { - let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8])); - let validator_index = BigEndian::read_u64(&data[8..]); - Ok((target_epoch, validator_index)) - } else { - Err(Error::AttesterKeyCorrupt { length: data.len() }) - } + BigEndian::write_uint( + &mut data[..2], + target_epoch.as_u64() % config.history_length as u64, + 2, + ); + BigEndian::write_uint(&mut data[2..], validator_index, 5); + + AttesterKey { data } } } @@ -111,7 +126,7 @@ impl ProposerKey { ProposerKey { data } } - pub fn parse(data: &[u8]) -> Result<(Slot, u64), Error> { + pub fn parse(data: Cow<[u8]>) -> Result<(Slot, u64), Error> { if data.len() == PROPOSER_KEY_SIZE { let slot = Slot::new(BigEndian::read_u64(&data[..8])); let validator_index = BigEndian::read_u64(&data[8..]); @@ -148,93 +163,213 @@ impl AsRef<[u8]> for CurrentEpochKey { } /// Key containing an epoch and an indexed attestation hash. -pub struct IndexedAttestationKey { - target_and_root: [u8; INDEXED_ATTESTATION_KEY_SIZE], +pub struct IndexedAttestationIdKey { + target_and_root: [u8; INDEXED_ATTESTATION_ID_KEY_SIZE], } -impl IndexedAttestationKey { +impl IndexedAttestationIdKey { pub fn new(target_epoch: Epoch, indexed_attestation_root: Hash256) -> Self { - let mut data = [0; INDEXED_ATTESTATION_KEY_SIZE]; + let mut data = [0; INDEXED_ATTESTATION_ID_KEY_SIZE]; data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes()); - data[8..INDEXED_ATTESTATION_KEY_SIZE].copy_from_slice(indexed_attestation_root.as_bytes()); + data[8..INDEXED_ATTESTATION_ID_KEY_SIZE] + .copy_from_slice(indexed_attestation_root.as_bytes()); Self { target_and_root: data, } } - pub fn parse(data: &[u8]) -> Result<(Epoch, Hash256), Error> { - if data.len() == INDEXED_ATTESTATION_KEY_SIZE { + pub fn parse(data: Cow<[u8]>) -> Result<(Epoch, Hash256), Error> { + if data.len() == INDEXED_ATTESTATION_ID_KEY_SIZE { let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8])); let indexed_attestation_root = Hash256::from_slice(&data[8..]); Ok((target_epoch, indexed_attestation_root)) } else { - Err(Error::IndexedAttestationKeyCorrupt { length: data.len() }) + Err(Error::IndexedAttestationIdKeyCorrupt { length: data.len() }) } } } -impl AsRef<[u8]> for IndexedAttestationKey { +impl AsRef<[u8]> for IndexedAttestationIdKey { fn as_ref(&self) -> &[u8] { &self.target_and_root } } +/// Key containing a 6-byte indexed attestation ID. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct IndexedAttestationId { + id: [u8; INDEXED_ATTESTATION_ID_SIZE], +} + +impl IndexedAttestationId { + pub fn new(id: u64) -> Self { + let mut data = [0; INDEXED_ATTESTATION_ID_SIZE]; + BigEndian::write_uint(&mut data, id, INDEXED_ATTESTATION_ID_SIZE); + Self { id: data } + } + + pub fn parse(data: Cow<[u8]>) -> Result { + if data.len() == INDEXED_ATTESTATION_ID_SIZE { + Ok(BigEndian::read_uint( + data.borrow(), + INDEXED_ATTESTATION_ID_SIZE, + )) + } else { + Err(Error::IndexedAttestationIdCorrupt { length: data.len() }) + } + } + + pub fn null() -> Self { + Self::new(0) + } + + pub fn is_null(&self) -> bool { + self.id == [0, 0, 0, 0, 0, 0] + } + + pub fn as_u64(&self) -> u64 { + BigEndian::read_uint(&self.id, INDEXED_ATTESTATION_ID_SIZE) + } +} + +impl AsRef<[u8]> for IndexedAttestationId { + fn as_ref(&self) -> &[u8] { + &self.id + } +} + +/// Bincode deserialization specialised to `Cow<[u8]>`. +fn bincode_deserialize(bytes: Cow<[u8]>) -> Result { + Ok(bincode::deserialize(bytes.borrow())?) +} + +fn ssz_decode(bytes: Cow<[u8]>) -> Result { + Ok(T::from_ssz_bytes(bytes.borrow())?) +} + impl SlasherDB { - pub fn open(config: Arc) -> Result { + pub fn open(config: Arc, log: Logger) -> Result { + // Delete any legacy LMDB database. + Self::delete_legacy_file(&config.database_path, LEGACY_DB_FILENAME, &log)?; + Self::delete_legacy_file(&config.database_path, LEGACY_DB_LOCK_FILENAME, &log)?; + std::fs::create_dir_all(&config.database_path)?; + let env = Environment::new() - .set_max_dbs(LMDB_MAX_DBS) - .set_map_size(config.max_db_size_mbs * MEGABYTE) + .set_max_dbs(MAX_NUM_DBS) + .set_geometry(Self::geometry(&config)) .open_with_permissions(&config.database_path, 0o600)?; - let indexed_attestation_db = - env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; - let attesters_db = env.create_db(Some(ATTESTERS_DB), Self::db_flags())?; - let min_targets_db = env.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; - let max_targets_db = env.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; - let current_epochs_db = env.create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?; - let proposers_db = env.create_db(Some(PROPOSERS_DB), Self::db_flags())?; - let metadata_db = env.create_db(Some(METADATA_DB), Self::db_flags())?; + + let txn = env.begin_rw_txn()?; + txn.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; + txn.create_db(Some(INDEXED_ATTESTATION_ID_DB), Self::db_flags())?; + txn.create_db(Some(ATTESTERS_DB), Self::db_flags())?; + txn.create_db(Some(ATTESTERS_MAX_TARGETS_DB), Self::db_flags())?; + txn.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; + txn.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; + txn.create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?; + txn.create_db(Some(PROPOSERS_DB), Self::db_flags())?; + txn.create_db(Some(METADATA_DB), Self::db_flags())?; + txn.commit()?; #[cfg(windows)] { use filesystem::restrict_file_permissions; - let data = config.database_path.join("data.mdb"); - let lock = config.database_path.join("lock.mdb"); + let data = config.database_path.join("mdbx.dat"); + let lock = config.database_path.join("mdbx.lck"); restrict_file_permissions(data).map_err(Error::DatabasePermissionsError)?; restrict_file_permissions(lock).map_err(Error::DatabasePermissionsError)?; } - let db = Self { + let attestation_root_cache = Mutex::new(LruCache::new(config.attestation_root_cache_size)); + + let mut db = Self { env, - indexed_attestation_db, - attesters_db, - min_targets_db, - max_targets_db, - current_epochs_db, - proposers_db, - metadata_db, + attestation_root_cache, config, _phantom: PhantomData, }; - let mut txn = db.begin_rw_txn()?; - - db.migrate(&mut txn)?; + db = db.migrate()?; + let mut txn = db.begin_rw_txn()?; if let Some(on_disk_config) = db.load_config(&mut txn)? { - if !db.config.is_compatible(&on_disk_config) { + let current_disk_config = db.config.disk_config(); + if current_disk_config != on_disk_config { return Err(Error::ConfigIncompatible { on_disk_config, - config: (*db.config).clone(), + config: current_disk_config, }); } } - db.store_config(&db.config, &mut txn)?; txn.commit()?; Ok(db) } + fn delete_legacy_file(slasher_dir: &Path, filename: &str, log: &Logger) -> Result<(), Error> { + let path = slasher_dir.join(filename); + + if path.is_file() { + info!( + log, + "Deleting legacy slasher DB"; + "file" => ?path.display(), + ); + std::fs::remove_file(&path)?; + } + Ok(()) + } + + fn open_db<'a>(&self, txn: &'a RwTransaction<'a>, name: &str) -> Result, Error> { + Ok(txn.open_db(Some(name))?) + } + + pub fn indexed_attestation_db<'a>( + &self, + txn: &'a RwTransaction<'a>, + ) -> Result, Error> { + self.open_db(txn, INDEXED_ATTESTATION_DB) + } + + pub fn indexed_attestation_id_db<'a>( + &self, + txn: &'a RwTransaction<'a>, + ) -> Result, Error> { + self.open_db(txn, INDEXED_ATTESTATION_ID_DB) + } + + pub fn attesters_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, ATTESTERS_DB) + } + + pub fn attesters_max_targets_db<'a>( + &self, + txn: &'a RwTransaction<'a>, + ) -> Result, Error> { + self.open_db(txn, ATTESTERS_MAX_TARGETS_DB) + } + + pub fn min_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, MIN_TARGETS_DB) + } + + pub fn max_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, MAX_TARGETS_DB) + } + + pub fn current_epochs_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, CURRENT_EPOCHS_DB) + } + + pub fn proposers_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, PROPOSERS_DB) + } + + pub fn metadata_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { + self.open_db(txn, METADATA_DB) + } + pub fn db_flags() -> DatabaseFlags { DatabaseFlags::default() } @@ -247,17 +382,24 @@ impl SlasherDB { Ok(self.env.begin_rw_txn()?) } + pub fn geometry(config: &Config) -> Geometry> { + Geometry { + size: Some(0..config.max_db_size_mbs * MEGABYTE), + growth_step: Some(MDBX_GROWTH_STEP), + shrink_threshold: None, + page_size: None, + } + } + pub fn load_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result, Error> { - Ok(txn - .get(self.metadata_db, &METADATA_VERSION_KEY) - .optional()? - .map(bincode::deserialize) - .transpose()?) + txn.get(&self.metadata_db(txn)?, METADATA_VERSION_KEY)? + .map(bincode_deserialize) + .transpose() } pub fn store_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { txn.put( - self.metadata_db, + &self.metadata_db(txn)?, &METADATA_VERSION_KEY, &bincode::serialize(&CURRENT_SCHEMA_VERSION)?, Self::write_flags(), @@ -269,20 +411,18 @@ impl SlasherDB { /// /// This is generic in order to allow loading of configs for different schema versions. /// Care should be taken to ensure it is only called for `Config`-like `T`. - pub fn load_config<'a, T: Deserialize<'a>>( + pub fn load_config( &self, - txn: &'a mut RwTransaction<'_>, + txn: &mut RwTransaction<'_>, ) -> Result, Error> { - Ok(txn - .get(self.metadata_db, &METADATA_CONFIG_KEY) - .optional()? - .map(bincode::deserialize) - .transpose()?) + txn.get(&self.metadata_db(txn)?, METADATA_CONFIG_KEY)? + .map(bincode_deserialize) + .transpose() } pub fn store_config(&self, config: &Config, txn: &mut RwTransaction<'_>) -> Result<(), Error> { txn.put( - self.metadata_db, + &self.metadata_db(txn)?, &METADATA_CONFIG_KEY, &bincode::serialize(config)?, Self::write_flags(), @@ -290,19 +430,70 @@ impl SlasherDB { Ok(()) } + pub fn get_attester_max_target( + &self, + validator_index: u64, + txn: &mut RwTransaction<'_>, + ) -> Result, Error> { + txn.get( + &self.attesters_max_targets_db(txn)?, + CurrentEpochKey::new(validator_index).as_ref(), + )? + .map(ssz_decode) + .transpose() + } + + pub fn update_attester_max_target( + &self, + validator_index: u64, + previous_max_target: Option, + max_target: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + // Don't update maximum if new target is less than or equal to previous. In the case of + // no previous we *do* want to update. + if previous_max_target.map_or(false, |prev_max| max_target <= prev_max) { + return Ok(()); + } + + // Zero out attester DB entries which are now older than the history length. + // Avoid writing the whole array on initialization (`previous_max_target == None`), and + // avoid overwriting the entire attesters array more than once. + if let Some(previous_max_target) = previous_max_target { + let start_epoch = std::cmp::max( + previous_max_target.as_u64() + 1, + (max_target.as_u64() + 1).saturating_sub(self.config.history_length as u64), + ); + for target_epoch in (start_epoch..max_target.as_u64()).map(Epoch::new) { + txn.put( + &self.attesters_db(txn)?, + &AttesterKey::new(validator_index, target_epoch, &self.config), + &CompactAttesterRecord::null().as_bytes(), + Self::write_flags(), + )?; + } + } + + txn.put( + &self.attesters_max_targets_db(txn)?, + &CurrentEpochKey::new(validator_index), + &max_target.as_ssz_bytes(), + Self::write_flags(), + )?; + Ok(()) + } + pub fn get_current_epoch_for_validator( &self, validator_index: u64, txn: &mut RwTransaction<'_>, ) -> Result, Error> { - Ok(txn - .get( - self.current_epochs_db, - &CurrentEpochKey::new(validator_index), - ) - .optional()? - .map(Epoch::from_ssz_bytes) - .transpose()?) + txn.get( + &self.current_epochs_db(txn)?, + CurrentEpochKey::new(validator_index).as_ref(), + )? + .map(ssz_decode) + .transpose() } pub fn update_current_epoch_for_validator( @@ -312,7 +503,7 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, ) -> Result<(), Error> { txn.put( - self.current_epochs_db, + &self.current_epochs_db(txn)?, &CurrentEpochKey::new(validator_index), ¤t_epoch.as_ssz_bytes(), Self::write_flags(), @@ -320,41 +511,128 @@ impl SlasherDB { Ok(()) } + fn get_indexed_attestation_id( + &self, + txn: &mut RwTransaction<'_>, + key: &IndexedAttestationIdKey, + ) -> Result, Error> { + txn.get(&self.indexed_attestation_id_db(txn)?, key.as_ref())? + .map(IndexedAttestationId::parse) + .transpose() + } + + fn put_indexed_attestation_id( + &self, + txn: &mut RwTransaction<'_>, + key: &IndexedAttestationIdKey, + value: IndexedAttestationId, + ) -> Result<(), Error> { + txn.put( + &self.indexed_attestation_id_db(txn)?, + key, + &value, + Self::write_flags(), + )?; + Ok(()) + } + + /// Store an indexed attestation and return its ID. + /// + /// If the attestation is already stored then the existing ID will be returned without a write. pub fn store_indexed_attestation( &self, txn: &mut RwTransaction<'_>, indexed_attestation_hash: Hash256, indexed_attestation: &IndexedAttestation, - ) -> Result<(), Error> { - let key = IndexedAttestationKey::new( + ) -> Result { + // Look-up ID by hash. + let id_key = IndexedAttestationIdKey::new( indexed_attestation.data.target.epoch, indexed_attestation_hash, ); + + if let Some(indexed_att_id) = self.get_indexed_attestation_id(txn, &id_key)? { + return Ok(indexed_att_id); + } + + // Store the new indexed attestation at the end of the current table. + let mut cursor = txn.cursor(&self.indexed_attestation_db(txn)?)?; + + let indexed_att_id = match cursor.last::<_, ()>()? { + // First ID is 1 so that 0 can be used to represent `null` in `CompactAttesterRecord`. + None => 1, + Some((key_bytes, _)) => IndexedAttestationId::parse(key_bytes)? + 1, + }; + + let attestation_key = IndexedAttestationId::new(indexed_att_id); let data = indexed_attestation.as_ssz_bytes(); - txn.put( - self.indexed_attestation_db, - &key, - &data, - Self::write_flags(), - )?; - Ok(()) + cursor.put(attestation_key.as_ref(), &data, Self::write_flags())?; + drop(cursor); + + // Update the (epoch, hash) to ID mapping. + self.put_indexed_attestation_id(txn, &id_key, attestation_key)?; + + Ok(indexed_att_id) } pub fn get_indexed_attestation( &self, txn: &mut RwTransaction<'_>, - target_epoch: Epoch, - indexed_attestation_hash: Hash256, + indexed_attestation_id: IndexedAttestationId, ) -> Result, Error> { - let key = IndexedAttestationKey::new(target_epoch, indexed_attestation_hash); let bytes = txn - .get(self.indexed_attestation_db, &key) - .optional()? + .get( + &self.indexed_attestation_db(txn)?, + indexed_attestation_id.as_ref(), + )? .ok_or(Error::MissingIndexedAttestation { - root: indexed_attestation_hash, + id: indexed_attestation_id.as_u64(), })?; - Ok(IndexedAttestation::from_ssz_bytes(bytes)?) + ssz_decode(bytes) + } + + fn get_attestation_data_root( + &self, + txn: &mut RwTransaction<'_>, + indexed_id: IndexedAttestationId, + ) -> Result<(Hash256, Option>), Error> { + metrics::inc_counter(&metrics::SLASHER_NUM_ATTESTATION_ROOT_QUERIES); + + // If the value already exists in the cache, return it. + let mut cache = self.attestation_root_cache.lock(); + if let Some(attestation_data_root) = cache.get(&indexed_id) { + metrics::inc_counter(&metrics::SLASHER_NUM_ATTESTATION_ROOT_HITS); + return Ok((*attestation_data_root, None)); + } + + // Otherwise, load the indexed attestation, compute the root and cache it. + let indexed_attestation = self.get_indexed_attestation(txn, indexed_id)?; + let attestation_data_root = indexed_attestation.data.tree_hash_root(); + + cache.put(indexed_id, attestation_data_root); + + Ok((attestation_data_root, Some(indexed_attestation))) + } + + pub fn cache_attestation_data_root( + &self, + indexed_attestation_id: IndexedAttestationId, + attestation_data_root: Hash256, + ) { + let mut cache = self.attestation_root_cache.lock(); + cache.put(indexed_attestation_id, attestation_data_root); + } + + fn delete_attestation_data_roots(&self, ids: impl IntoIterator) { + let mut cache = self.attestation_root_cache.lock(); + for indexed_id in ids { + cache.pop(&indexed_id); + } + } + + pub fn attestation_root_cache_size(&self) -> usize { + self.attestation_root_cache.lock().len() } pub fn check_and_update_attester_record( @@ -362,41 +640,57 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, validator_index: u64, attestation: &IndexedAttestation, - record: AttesterRecord, + record: &AttesterRecord, + indexed_attestation_id: IndexedAttestationId, ) -> Result, Error> { // See if there's an existing attestation for this attester. let target_epoch = attestation.data.target.epoch; + + let prev_max_target = self.get_attester_max_target(validator_index, txn)?; + if let Some(existing_record) = - self.get_attester_record(txn, validator_index, target_epoch)? + self.get_attester_record(txn, validator_index, target_epoch, prev_max_target)? { - // If the existing attestation data is identical, then this attestation is not + // If the existing indexed attestation is identical, then this attestation is not // slashable and no update is required. - if existing_record.attestation_data_hash == record.attestation_data_hash { + let existing_att_id = existing_record.indexed_attestation_id; + if existing_att_id == indexed_attestation_id { return Ok(AttesterSlashingStatus::NotSlashable); } - // Otherwise, load the indexed attestation so we can confirm that it's slashable. - let existing_attestation = self.get_indexed_attestation( - txn, - target_epoch, - existing_record.indexed_attestation_hash, - )?; + // Otherwise, load the attestation data root and check slashability via a hash root + // comparison. + let (existing_data_root, opt_existing_att) = + self.get_attestation_data_root(txn, existing_att_id)?; + + if existing_data_root == record.attestation_data_hash { + return Ok(AttesterSlashingStatus::NotSlashable); + } + + // If we made it this far, then the attestation is slashable. Ensure that it's + // loaded, double-check the slashing condition and return. + let existing_attestation = opt_existing_att + .map_or_else(|| self.get_indexed_attestation(txn, existing_att_id), Ok)?; + if attestation.is_double_vote(&existing_attestation) { Ok(AttesterSlashingStatus::DoubleVote(Box::new( existing_attestation, ))) } else { - Err(Error::AttesterRecordInconsistentRoot) + Err(Error::InconsistentAttestationDataRoot) } } // If no attestation exists, insert a record for this validator. else { + self.update_attester_max_target(validator_index, prev_max_target, target_epoch, txn)?; + txn.put( - self.attesters_db, - &AttesterKey::new(validator_index, target_epoch), - &record.as_ssz_bytes(), + &self.attesters_db(txn)?, + &AttesterKey::new(validator_index, target_epoch, &self.config), + &indexed_attestation_id, Self::write_flags(), )?; + Ok(AttesterSlashingStatus::NotSlashable) } } @@ -407,13 +701,15 @@ impl SlasherDB { validator_index: u64, target_epoch: Epoch, ) -> Result, Error> { + let max_target = self.get_attester_max_target(validator_index, txn)?; + let record = self - .get_attester_record(txn, validator_index, target_epoch)? + .get_attester_record(txn, validator_index, target_epoch, max_target)? .ok_or(Error::MissingAttesterRecord { validator_index, target_epoch, })?; - self.get_indexed_attestation(txn, target_epoch, record.indexed_attestation_hash) + self.get_indexed_attestation(txn, record.indexed_attestation_id) } pub fn get_attester_record( @@ -421,13 +717,18 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, validator_index: u64, target: Epoch, - ) -> Result, Error> { - let attester_key = AttesterKey::new(validator_index, target); + prev_max_target: Option, + ) -> Result, Error> { + if prev_max_target.map_or(true, |prev_max| target > prev_max) { + return Ok(None); + } + + let attester_key = AttesterKey::new(validator_index, target, &self.config); Ok(txn - .get(self.attesters_db, &attester_key) - .optional()? - .map(AttesterRecord::from_ssz_bytes) - .transpose()?) + .get(&self.attesters_db(txn)?, attester_key.as_ref())? + .map(CompactAttesterRecord::parse) + .transpose()? + .filter(|record| !record.is_null())) } pub fn get_block_proposal( @@ -437,11 +738,9 @@ impl SlasherDB { slot: Slot, ) -> Result, Error> { let proposer_key = ProposerKey::new(proposer_index, slot); - Ok(txn - .get(self.proposers_db, &proposer_key) - .optional()? - .map(SignedBeaconBlockHeader::from_ssz_bytes) - .transpose()?) + txn.get(&self.proposers_db(txn)?, proposer_key.as_ref())? + .map(ssz_decode) + .transpose() } pub fn check_or_insert_block_proposal( @@ -465,7 +764,7 @@ impl SlasherDB { } } else { txn.put( - self.proposers_db, + &self.proposers_db(txn)?, &ProposerKey::new(proposer_index, slot), &block_header.as_ssz_bytes(), Self::write_flags(), @@ -491,7 +790,6 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, ) -> Result<(), Error> { self.prune_proposers(current_epoch, txn)?; - self.prune_attesters(current_epoch, txn)?; self.prune_indexed_attestations(current_epoch, txn)?; Ok(()) } @@ -506,33 +804,22 @@ impl SlasherDB { .saturating_sub(self.config.history_length) .start_slot(E::slots_per_epoch()); - let mut cursor = txn.open_rw_cursor(self.proposers_db)?; + let mut cursor = txn.cursor(&self.proposers_db(txn)?)?; // Position cursor at first key, bailing out if the database is empty. - if cursor - .get(None, None, lmdb_sys::MDB_FIRST) - .optional()? - .is_none() - { + if cursor.first::<(), ()>()?.is_none() { return Ok(()); } loop { - let key_bytes = cursor - .get(None, None, lmdb_sys::MDB_GET_CURRENT)? - .0 - .ok_or(Error::MissingProposerKey)?; + let (key_bytes, ()) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?; let (slot, _) = ProposerKey::parse(key_bytes)?; if slot < min_slot { cursor.del(Self::write_flags())?; // End the loop if there is no next entry. - if cursor - .get(None, None, lmdb_sys::MDB_NEXT) - .optional()? - .is_none() - { + if cursor.next::<(), ()>()?.is_none() { break; } } else { @@ -543,7 +830,7 @@ impl SlasherDB { Ok(()) } - fn prune_attesters( + fn prune_indexed_attestations( &self, current_epoch: Epoch, txn: &mut RwTransaction<'_>, @@ -552,86 +839,46 @@ impl SlasherDB { .saturating_add(1u64) .saturating_sub(self.config.history_length as u64); - let mut cursor = txn.open_rw_cursor(self.attesters_db)?; + // Collect indexed attestation IDs to delete. + let mut indexed_attestation_ids = vec![]; + + let mut cursor = txn.cursor(&self.indexed_attestation_id_db(txn)?)?; // Position cursor at first key, bailing out if the database is empty. - if cursor - .get(None, None, lmdb_sys::MDB_FIRST) - .optional()? - .is_none() - { + if cursor.first::<(), ()>()?.is_none() { return Ok(()); } loop { - let key_bytes = cursor - .get(None, None, lmdb_sys::MDB_GET_CURRENT)? - .0 - .ok_or(Error::MissingAttesterKey)?; + let (key_bytes, value) = cursor + .get_current()? + .ok_or(Error::MissingIndexedAttestationIdKey)?; - let (target_epoch, _) = AttesterKey::parse(key_bytes)?; + let (target_epoch, _) = IndexedAttestationIdKey::parse(key_bytes)?; if target_epoch < min_epoch { + indexed_attestation_ids.push(IndexedAttestationId::new( + IndexedAttestationId::parse(value)?, + )); + cursor.del(Self::write_flags())?; - // End the loop if there is no next entry. - if cursor - .get(None, None, lmdb_sys::MDB_NEXT) - .optional()? - .is_none() - { + if cursor.next::<(), ()>()?.is_none() { break; } } else { break; } } + drop(cursor); - Ok(()) - } - - fn prune_indexed_attestations( - &self, - current_epoch: Epoch, - txn: &mut RwTransaction<'_>, - ) -> Result<(), Error> { - let min_epoch = current_epoch - .saturating_add(1u64) - .saturating_sub(self.config.history_length as u64); - - let mut cursor = txn.open_rw_cursor(self.indexed_attestation_db)?; - - // Position cursor at first key, bailing out if the database is empty. - if cursor - .get(None, None, lmdb_sys::MDB_FIRST) - .optional()? - .is_none() - { - return Ok(()); - } - - loop { - let key_bytes = cursor - .get(None, None, lmdb_sys::MDB_GET_CURRENT)? - .0 - .ok_or(Error::MissingAttesterKey)?; - - let (target_epoch, _) = IndexedAttestationKey::parse(key_bytes)?; - - if target_epoch < min_epoch { - cursor.del(Self::write_flags())?; - - if cursor - .get(None, None, lmdb_sys::MDB_NEXT) - .optional()? - .is_none() - { - break; - } - } else { - break; - } + // Delete the indexed attestations. + // Optimisation potential: use a cursor here. + let indexed_attestation_db = self.indexed_attestation_db(txn)?; + for indexed_attestation_id in &indexed_attestation_ids { + txn.del(&indexed_attestation_db, indexed_attestation_id, None)?; } + self.delete_attestation_data_roots(indexed_attestation_ids); Ok(()) } diff --git a/slasher/src/error.rs b/slasher/src/error.rs index d40a54f7144..7e689022e41 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -1,10 +1,10 @@ -use crate::Config; +use crate::config::{Config, DiskConfig}; use std::io; -use types::{Epoch, Hash256}; +use types::Epoch; #[derive(Debug)] pub enum Error { - DatabaseError(lmdb::Error), + DatabaseError(mdbx::Error), DatabaseIOError(io::Error), DatabasePermissionsError(filesystem::Error), SszDecodeError(ssz::DecodeError), @@ -19,12 +19,16 @@ pub enum Error { chunk_size: usize, history_length: usize, }, + ConfigInvalidHistoryLength { + history_length: usize, + max_history_length: usize, + }, ConfigInvalidZeroParameter { config: Config, }, ConfigIncompatible { - on_disk_config: Config, - config: Config, + on_disk_config: DiskConfig, + config: DiskConfig, }, ConfigMissing, DistanceTooLarge, @@ -43,22 +47,26 @@ pub enum Error { ProposerKeyCorrupt { length: usize, }, - IndexedAttestationKeyCorrupt { + IndexedAttestationIdKeyCorrupt { + length: usize, + }, + IndexedAttestationIdCorrupt { length: usize, }, MissingIndexedAttestation { - root: Hash256, + id: u64, }, MissingAttesterKey, MissingProposerKey, - MissingIndexedAttestationKey, - AttesterRecordInconsistentRoot, + MissingIndexedAttestationId, + MissingIndexedAttestationIdKey, + InconsistentAttestationDataRoot, } -impl From for Error { - fn from(e: lmdb::Error) -> Self { +impl From for Error { + fn from(e: mdbx::Error) -> Self { match e { - lmdb::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), + mdbx::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), _ => Error::DatabaseError(e), } } diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs index 10427ba2f08..184e3080e55 100644 --- a/slasher/src/lib.rs +++ b/slasher/src/lib.rs @@ -16,14 +16,18 @@ mod utils; pub use crate::slasher::Slasher; pub use attestation_queue::{AttestationBatch, AttestationQueue, SimpleBatch}; -pub use attester_record::{AttesterRecord, IndexedAttesterRecord}; +pub use attester_record::{AttesterRecord, CompactAttesterRecord, IndexedAttesterRecord}; pub use block_queue::BlockQueue; pub use config::Config; -pub use database::SlasherDB; +pub use database::{IndexedAttestationId, SlasherDB}; pub use error::Error; use types::{AttesterSlashing, EthSpec, IndexedAttestation, ProposerSlashing}; +/// LMDB-to-MDBX compatibility shims. +pub type Environment = mdbx::Environment; +pub type RwTransaction<'env> = mdbx::Transaction<'env, mdbx::RW, mdbx::NoWriteMap>; + #[derive(Debug, PartialEq)] pub enum AttesterSlashingStatus { NotSlashable, diff --git a/slasher/src/metrics.rs b/slasher/src/metrics.rs index 6b21fb013a3..b11d21d4b5a 100644 --- a/slasher/src/metrics.rs +++ b/slasher/src/metrics.rs @@ -4,7 +4,7 @@ pub use lighthouse_metrics::*; lazy_static! { pub static ref SLASHER_DATABASE_SIZE: Result = try_create_int_gauge( "slasher_database_size", - "Size of the LMDB database backing the slasher, in bytes" + "Size of the database backing the slasher, in bytes" ); pub static ref SLASHER_RUN_TIME: Result = try_create_histogram( "slasher_process_batch_time", @@ -40,4 +40,17 @@ lazy_static! { "slasher_compression_ratio", "Compression ratio for min-max array chunks (higher is better)" ); + pub static ref SLASHER_NUM_ATTESTATION_ROOT_QUERIES: Result = + try_create_int_counter( + "slasher_num_attestation_root_queries", + "Number of requests for an attestation data root", + ); + pub static ref SLASHER_NUM_ATTESTATION_ROOT_HITS: Result = try_create_int_counter( + "slasher_num_attestation_root_hits", + "Number of requests for an attestation data root that hit the LRU cache", + ); + pub static ref SLASHER_ATTESTATION_ROOT_CACHE_SIZE: Result = try_create_int_gauge( + "slasher_attestation_root_cache_size", + "Number of attestation data roots cached in memory" + ); } diff --git a/slasher/src/migrate.rs b/slasher/src/migrate.rs index 020c7aaf9ac..674ab9c132d 100644 --- a/slasher/src/migrate.rs +++ b/slasher/src/migrate.rs @@ -1,79 +1,29 @@ -use crate::{ - config::{DEFAULT_BROADCAST, DEFAULT_SLOT_OFFSET}, - database::CURRENT_SCHEMA_VERSION, - Config, Error, SlasherDB, -}; -use lmdb::RwTransaction; -use serde_derive::{Deserialize, Serialize}; -use std::path::PathBuf; +use crate::{database::CURRENT_SCHEMA_VERSION, Error, SlasherDB}; use types::EthSpec; -/// Config from schema version 1, for migration to version 2+. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ConfigV1 { - database_path: PathBuf, - chunk_size: usize, - validator_chunk_size: usize, - history_length: usize, - update_period: u64, - max_db_size_mbs: usize, -} - -type ConfigV2 = Config; - -impl Into for ConfigV1 { - fn into(self) -> ConfigV2 { - Config { - database_path: self.database_path, - chunk_size: self.chunk_size, - validator_chunk_size: self.validator_chunk_size, - history_length: self.history_length, - update_period: self.update_period, - slot_offset: DEFAULT_SLOT_OFFSET, - max_db_size_mbs: self.max_db_size_mbs, - broadcast: DEFAULT_BROADCAST, - } - } -} - impl SlasherDB { /// If the database exists, and has a schema, attempt to migrate it to the current version. - pub fn migrate(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { - if let Some(schema_version) = self.load_schema_version(txn)? { + pub fn migrate(self) -> Result { + let mut txn = self.begin_rw_txn()?; + let schema_version = self.load_schema_version(&mut txn)?; + drop(txn); + + if let Some(schema_version) = schema_version { match (schema_version, CURRENT_SCHEMA_VERSION) { - // The migration from v1 to v2 is a bit messy because v1.0.5 silently - // changed the schema to v2, so a v1 schema could have either a v1 or v2 - // config. - (1, 2) => { - match self.load_config::(txn) { - Ok(Some(config_v1)) => { - // Upgrade to v2 config and store on disk. - let config_v2 = config_v1.into(); - self.store_config(&config_v2, txn)?; - } - Ok(None) => { - // Impossible to have schema version and no config. - return Err(Error::ConfigMissing); - } - Err(_) => { - // If loading v1 config failed, ensure loading v2 config succeeds. - // No further action is needed. - let _config_v2 = self.load_config::(txn)?; - } - } - } - (x, y) if x == y => {} - (_, _) => { - return Err(Error::IncompatibleSchemaVersion { - database_schema_version: schema_version, - software_schema_version: CURRENT_SCHEMA_VERSION, - }); - } + // Schema v3 changed the underlying database from LMDB to MDBX. Unless the user did + // some manual hacking it should be impossible to read an MDBX schema version < 3. + (from, _) if from < 3 => Err(Error::IncompatibleSchemaVersion { + database_schema_version: schema_version, + software_schema_version: CURRENT_SCHEMA_VERSION, + }), + (x, y) if x == y => Ok(self), + (_, _) => Err(Error::IncompatibleSchemaVersion { + database_schema_version: schema_version, + software_schema_version: CURRENT_SCHEMA_VERSION, + }), } + } else { + Ok(self) } - - // If the migration succeeded, update the schema version on-disk. - self.store_schema_version(txn)?; - Ok(()) } } diff --git a/slasher/src/slasher.rs b/slasher/src/slasher.rs index 122ed439a4d..066c8d63d98 100644 --- a/slasher/src/slasher.rs +++ b/slasher/src/slasher.rs @@ -6,9 +6,8 @@ use crate::metrics::{ }; use crate::{ array, AttestationBatch, AttestationQueue, AttesterRecord, BlockQueue, Config, Error, - ProposerSlashingStatus, SimpleBatch, SlasherDB, + IndexedAttestationId, ProposerSlashingStatus, RwTransaction, SimpleBatch, SlasherDB, }; -use lmdb::{RwTransaction, Transaction}; use parking_lot::Mutex; use slog::{debug, error, info, Logger}; use std::collections::HashSet; @@ -32,7 +31,7 @@ impl Slasher { pub fn open(config: Config, log: Logger) -> Result { config.validate()?; let config = Arc::new(config); - let db = SlasherDB::open(config.clone())?; + let db = SlasherDB::open(config.clone(), log.clone())?; let attester_slashings = Mutex::new(HashSet::new()); let proposer_slashings = Mutex::new(HashSet::new()); let attestation_queue = AttestationQueue::default(); @@ -159,11 +158,19 @@ impl Slasher { let mut num_stored = 0; for weak_record in &batch.attestations { if let Some(indexed_record) = weak_record.upgrade() { - self.db.store_indexed_attestation( + let indexed_attestation_id = self.db.store_indexed_attestation( txn, indexed_record.record.indexed_attestation_hash, &indexed_record.indexed, )?; + indexed_record.set_id(indexed_attestation_id); + + // Prime the attestation data root LRU cache. + self.db.cache_attestation_data_root( + IndexedAttestationId::new(indexed_attestation_id), + indexed_record.record.attestation_data_hash, + ); + num_stored += 1; } } @@ -184,6 +191,12 @@ impl Slasher { for (subqueue_id, subqueue) in grouped_attestations.subqueues.into_iter().enumerate() { self.process_batch(txn, subqueue_id, subqueue, current_epoch)?; } + + metrics::set_gauge( + &metrics::SLASHER_ATTESTATION_ROOT_CACHE_SIZE, + self.db.attestation_root_cache_size() as i64, + ); + Ok(AttestationStats { num_processed }) } @@ -197,11 +210,13 @@ impl Slasher { ) -> Result<(), Error> { // First, check for double votes. for attestation in &batch { + let indexed_attestation_id = IndexedAttestationId::new(attestation.get_id()); match self.check_double_votes( txn, subqueue_id, &attestation.indexed, - attestation.record, + &attestation.record, + indexed_attestation_id, ) { Ok(slashings) => { if !slashings.is_empty() { @@ -262,7 +277,8 @@ impl Slasher { txn: &mut RwTransaction<'_>, subqueue_id: usize, attestation: &IndexedAttestation, - attester_record: AttesterRecord, + attester_record: &AttesterRecord, + indexed_attestation_id: IndexedAttestationId, ) -> Result>, Error> { let mut slashings = HashSet::new(); @@ -275,6 +291,7 @@ impl Slasher { validator_index, attestation, attester_record, + indexed_attestation_id, )?; if let Some(slashing) = slashing_status.into_slashing(attestation) { diff --git a/slasher/src/utils.rs b/slasher/src/utils.rs index 9c9eceaa14a..ccd31e74e24 100644 --- a/slasher/src/utils.rs +++ b/slasher/src/utils.rs @@ -1,20 +1,5 @@ use crate::Error; -/// Mix-in trait for loading values from LMDB that may or may not exist. -pub trait TxnOptional { - fn optional(self) -> Result, E>; -} - -impl TxnOptional for Result { - fn optional(self) -> Result, Error> { - match self { - Ok(x) => Ok(Some(x)), - Err(lmdb::Error::NotFound) => Ok(None), - Err(e) => Err(e.into()), - } - } -} - /// Transform a transaction that would fail with a `MapFull` error into an optional result. pub trait TxnMapFull { fn allow_map_full(self) -> Result, E>; @@ -24,7 +9,7 @@ impl TxnMapFull for Result { fn allow_map_full(self) -> Result, Error> { match self { Ok(x) => Ok(Some(x)), - Err(Error::DatabaseError(lmdb::Error::MapFull)) => Ok(None), + Err(Error::DatabaseError(mdbx::Error::MapFull)) => Ok(None), Err(e) => Err(e), } } diff --git a/slasher/tests/attester_slashings.rs b/slasher/tests/attester_slashings.rs index 987853077aa..a2abbc55b15 100644 --- a/slasher/tests/attester_slashings.rs +++ b/slasher/tests/attester_slashings.rs @@ -171,7 +171,7 @@ fn slasher_test( should_process_after: impl Fn(usize) -> bool, ) { let tempdir = tempdir().unwrap(); - let config = Config::new(tempdir.path().into()).for_testing(); + let config = Config::new(tempdir.path().into()); let slasher = Slasher::open(config, test_logger()).unwrap(); let current_epoch = Epoch::new(current_epoch); @@ -200,7 +200,7 @@ fn parallel_slasher_test( current_epoch: u64, ) { let tempdir = tempdir().unwrap(); - let config = Config::new(tempdir.path().into()).for_testing(); + let config = Config::new(tempdir.path().into()); let slasher = Slasher::open(config, test_logger()).unwrap(); let current_epoch = Epoch::new(current_epoch); diff --git a/slasher/tests/proposer_slashings.rs b/slasher/tests/proposer_slashings.rs index 13a9422feda..e8b052e664a 100644 --- a/slasher/tests/proposer_slashings.rs +++ b/slasher/tests/proposer_slashings.rs @@ -9,7 +9,7 @@ use types::{Epoch, EthSpec}; #[test] fn empty_pruning() { let tempdir = tempdir().unwrap(); - let config = Config::new(tempdir.path().into()).for_testing(); + let config = Config::new(tempdir.path().into()); let slasher = Slasher::::open(config, test_logger()).unwrap(); slasher.prune_database(Epoch::new(0)).unwrap(); } @@ -19,7 +19,7 @@ fn block_pruning() { let slots_per_epoch = E::slots_per_epoch(); let tempdir = tempdir().unwrap(); - let mut config = Config::new(tempdir.path().into()).for_testing(); + let mut config = Config::new(tempdir.path().into()); config.chunk_size = 2; config.history_length = 2; diff --git a/slasher/tests/random.rs b/slasher/tests/random.rs index 22ae26d1355..7ff7fe58501 100644 --- a/slasher/tests/random.rs +++ b/slasher/tests/random.rs @@ -41,7 +41,7 @@ fn random_test(seed: u64, test_config: TestConfig) { let tempdir = tempdir().unwrap(); - let mut config = Config::new(tempdir.path().into()).for_testing(); + let mut config = Config::new(tempdir.path().into()); config.validator_chunk_size = 1 << rng.gen_range(1, 4); let chunk_size_exponent = rng.gen_range(1, 4); diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs index 47054ebc669..b256840ee50 100644 --- a/slasher/tests/wrap_around.rs +++ b/slasher/tests/wrap_around.rs @@ -1,12 +1,12 @@ use logging::test_logger; -use slasher::{test_utils::indexed_att, Config, Error, Slasher}; +use slasher::{test_utils::indexed_att, Config, Slasher}; use tempfile::tempdir; use types::Epoch; #[test] fn attestation_pruning_empty_wrap_around() { let tempdir = tempdir().unwrap(); - let mut config = Config::new(tempdir.path().into()).for_testing(); + let mut config = Config::new(tempdir.path().into()); config.validator_chunk_size = 1; config.chunk_size = 16; config.history_length = 16; @@ -35,53 +35,3 @@ fn attestation_pruning_empty_wrap_around() { )); slasher.process_queued(current_epoch).unwrap(); } - -// Test that pruning can recover from a `MapFull` error -#[test] -fn pruning_with_map_full() { - let tempdir = tempdir().unwrap(); - let mut config = Config::new(tempdir.path().into()).for_testing(); - config.validator_chunk_size = 1; - config.chunk_size = 16; - config.history_length = 1024; - config.max_db_size_mbs = 1; - - let slasher = Slasher::open(config, test_logger()).unwrap(); - - let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; - - let mut current_epoch = Epoch::new(0); - - loop { - slasher.accept_attestation(indexed_att( - v.clone(), - (current_epoch - 1).as_u64(), - current_epoch.as_u64(), - 0, - )); - if let Err(Error::DatabaseError(lmdb::Error::MapFull)) = - slasher.process_queued(current_epoch) - { - break; - } - current_epoch += 1; - } - - loop { - slasher.prune_database(current_epoch).unwrap(); - - slasher.accept_attestation(indexed_att( - v.clone(), - (current_epoch - 1).as_u64(), - current_epoch.as_u64(), - 0, - )); - match slasher.process_queued(current_epoch) { - Ok(_) => break, - Err(Error::DatabaseError(lmdb::Error::MapFull)) => { - current_epoch += 1; - } - Err(e) => panic!("{:?}", e), - } - } -}