diff --git a/Cargo.lock b/Cargo.lock index 8291726c..ffca2403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,9 +169,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.7" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd2405b3ac1faab2990b74d728624cd9fd115651fcecc7c2d8daf01376275ba" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" dependencies = [ "anstyle", "anstyle-parse", @@ -245,7 +245,7 @@ dependencies = [ "quote", "rayon", "revm", - "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdf)", + "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858)", "serde", "serde_json", "syn 2.0.43", @@ -288,7 +288,7 @@ dependencies = [ "rand", "rand_distr", "revm", - "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdf)", + "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858)", "serde", "serde_json", "statrs", @@ -316,7 +316,6 @@ dependencies = [ "anyhow", "arbiter-bindings", "arbiter-core", - "async-broadcast", "async-stream", "async-trait", "crossbeam-channel", @@ -498,9 +497,9 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.13" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00ad3f3a942eee60335ab4342358c161ee296829e0d16ff42fc1d6cb07815467" +checksum = "88903cb14723e4d4003335bb7f8a14f27691649105346a0f0957466c096adfe6" dependencies = [ "anstyle", "bstr", @@ -517,17 +516,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-broadcast" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "334d75cf09b33bede6cbc20e52515853ae7bee3d4eadd9540e13ce92af983d34" -dependencies = [ - "event-listener", - "event-listener-strategy", - "futures-core", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -675,7 +663,7 @@ version = "0.66.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "cexpr", "clang-sys", "lazy_static", @@ -715,9 +703,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" dependencies = [ "serde", ] @@ -981,9 +969,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.18" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", "clap_derive", @@ -991,9 +979,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.18" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -1089,15 +1077,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "config" version = "0.13.4" @@ -1231,7 +1210,7 @@ version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "crossterm_winapi", "libc", "parking_lot", @@ -1532,9 +1511,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" dependencies = [ "log", ] @@ -1883,27 +1862,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c" -[[package]] -name = "event-listener" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c97b4e30ea7e4b7e7b429d6e2d8510433ba8cee4e70dfb3243794e539d29fd" -dependencies = [ - "event-listener", - "pin-project-lite", -] - [[package]] name = "eyre" version = "0.6.11" @@ -1955,9 +1913,9 @@ dependencies = [ [[package]] name = "figment" -version = "0.10.13" +version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7629b8c7bcd214a072c2c88b263b5bb3ceb54c34365d8c41c1665461aeae0993" +checksum = "2b6e5bc7bd59d60d0d45a6ccab6cf0f4ce28698fb4e81e750ddf229c9b824026" dependencies = [ "atomic", "pear", @@ -2266,9 +2224,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -2340,9 +2298,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" [[package]] name = "hex" @@ -2612,9 +2570,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" dependencies = [ "wasm-bindgen", ] @@ -2660,9 +2618,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f6d5ed8676d904364de097082f4e7d240b571b67989ced0240f08b7f966f940" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" dependencies = [ "cpufeatures", ] @@ -2812,7 +2770,7 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "libc", "redox_syscall", ] @@ -2825,9 +2783,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -3138,7 +3096,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 3.0.0", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.43", @@ -3238,12 +3196,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.12.1" @@ -3532,9 +3484,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "planus" @@ -3614,7 +3566,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f5efe734b6cbe5f97ea769be8360df5324fade396f1f3f5ad7fe9360ca4a23" dependencies = [ "ahash 0.8.7", - "bitflags 2.4.1", + "bitflags 2.4.2", "bytemuck", "chrono", "comfy-table", @@ -3716,7 +3668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d7105b40905bb38e8fc4a7fd736594b7491baa12fad3ac492969ca221a1b5d5" dependencies = [ "ahash 0.8.7", - "bitflags 2.4.1", + "bitflags 2.4.2", "glob", "once_cell", "polars-arrow", @@ -3919,13 +3871,12 @@ checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" [[package]] name = "predicates" -version = "3.0.4" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" dependencies = [ "anstyle", "difflib", - "itertools 0.11.0", "predicates-core", ] @@ -3990,9 +3941,9 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.0.0" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b2685dd208a3771337d8d386a89840f0f43cd68be8dae90a5f8c2384effc9cd" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ "toml_edit 0.21.0", ] @@ -4051,7 +4002,7 @@ checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf" dependencies = [ "bit-set", "bit-vec", - "bitflags 2.4.1", + "bitflags 2.4.2", "lazy_static", "num-traits", "rand", @@ -4292,7 +4243,7 @@ dependencies = [ [[package]] name = "revm" version = "3.5.0" -source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdf#30bbcdfe81446c9d1e9b37acc95f208943ddf858" +source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858#30bbcdfe81446c9d1e9b37acc95f208943ddf858" dependencies = [ "auto_impl", "ethers-core", @@ -4308,22 +4259,22 @@ dependencies = [ [[package]] name = "revm-interpreter" version = "1.3.0" -source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdf#30bbcdfe81446c9d1e9b37acc95f208943ddf858" +source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858#30bbcdfe81446c9d1e9b37acc95f208943ddf858" dependencies = [ - "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdf)", + "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858)", "serde", ] [[package]] name = "revm-precompile" version = "2.2.0" -source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdf#30bbcdfe81446c9d1e9b37acc95f208943ddf858" +source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858#30bbcdfe81446c9d1e9b37acc95f208943ddf858" dependencies = [ "aurora-engine-modexp", "c-kzg", "k256", "once_cell", - "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdf)", + "revm-primitives 1.3.0 (git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858)", "ripemd", "secp256k1", "sha2", @@ -4339,7 +4290,7 @@ dependencies = [ "alloy-primitives 0.4.2", "alloy-rlp", "auto_impl", - "bitflags 2.4.1", + "bitflags 2.4.2", "bitvec", "enumn", "hashbrown 0.14.3", @@ -4349,12 +4300,12 @@ dependencies = [ [[package]] name = "revm-primitives" version = "1.3.0" -source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdf#30bbcdfe81446c9d1e9b37acc95f208943ddf858" +source = "git+https://github.com/bluealloy/revm.git?rev=30bbcdfe81446c9d1e9b37acc95f208943ddf858#30bbcdfe81446c9d1e9b37acc95f208943ddf858" dependencies = [ "alloy-primitives 0.5.4", "alloy-rlp", "auto_impl", - "bitflags 2.4.1", + "bitflags 2.4.2", "bitvec", "c-kzg", "derive_more", @@ -4524,11 +4475,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -4900,9 +4851,9 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.13.7" +version = "0.13.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a9511d2aa0b26dce65ea3860321cd680a8daeb6808b04f1e94429e0389ad952" +checksum = "2faf8f101b9bc484337a6a6b0409cf76c139f2fb70a9e3aee6b6774be7bfbf76" dependencies = [ "ahash 0.8.7", "getrandom", @@ -4951,9 +4902,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "smartstring" @@ -5699,9 +5650,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -5835,9 +5786,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -5845,9 +5796,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" dependencies = [ "bumpalo", "log", @@ -5860,9 +5811,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" dependencies = [ "cfg-if", "js-sys", @@ -5872,9 +5823,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5882,9 +5833,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" dependencies = [ "proc-macro2", "quote", @@ -5895,15 +5846,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" [[package]] name = "web-sys" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" +checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 9462f735..7c540f48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,11 @@ path = "bin/main.rs" [workspace.dependencies] arbiter-bindings = { version = "*", path = "./arbiter-bindings" } arbiter-core = { version = "*", path = "./arbiter-core" } -ethers = { version = "2.0.10" } +ethers = { version = "2.0.11" } serde = { version = "1.0.193", features = ["derive"] } serde_json = { version = "=1.0.108" } -revm = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf", features = [ "ethersdb", "std", "serde"] } -revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdf" } +revm = { git = "https://github.com/bluealloy/revm.git", features = [ "ethersdb", "std", "serde"], rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" } +revm-primitives = { git = "https://github.com/bluealloy/revm.git", rev = "30bbcdfe81446c9d1e9b37acc95f208943ddf858" } thiserror = { version = "1.0.55" } syn = { version = "2.0.43" } quote = { version = "=1.0.33" } @@ -44,7 +44,7 @@ async-stream = "0.3.5" arbiter-core.workspace = true # Command line and config -clap = { version = "=4.4.18", features = ["derive"] } +clap = { version = "=4.4.14", features = ["derive"] } serde.workspace = true serde_json.workspace = true config = { version = "=0.13.4" } @@ -65,8 +65,8 @@ thiserror.workspace = true # Dependencies for the test build and development [dev-dependencies] tokio.workspace = true -assert_cmd = { version = "=2.0.13" } -rayon = { version = "1.8.1" } +assert_cmd = { version = "=2.0.12" } +rayon = { version = "1.8.0" } revm-primitives.workspace = true # Release profile diff --git a/arbiter-core/src/data_collection.rs b/arbiter-core/src/data_collection.rs index e17fd1dd..e659b3b9 100644 --- a/arbiter-core/src/data_collection.rs +++ b/arbiter-core/src/data_collection.rs @@ -20,7 +20,7 @@ use std::{ collections::BTreeMap, fmt::Debug, io::BufWriter, marker::PhantomData, mem::transmute, - pin::Pin, sync::Arc, + sync::Arc, }; use ethers::{ @@ -38,6 +38,7 @@ use polars::{ }; use serde::Serialize; use serde_json::Value; +use tokio::{sync::broadcast::Receiver as BroadcastReceiver, task::JoinHandle}; use super::*; use crate::{ @@ -63,8 +64,8 @@ pub(crate) type FilterDecoder = /// traits, and has a static lifetime. pub struct EventLogger { decoder: FilterDecoder, - receiver: Option>, - shutdown_sender: Option>, + receiver: Option>, + // shutdown_sender: Option>, output_file_type: Option, directory: Option, file_name: Option, @@ -103,7 +104,7 @@ impl EventLogger { file_name: None, decoder: BTreeMap::new(), receiver: None, - shutdown_sender: None, + // shutdown_sender: None, output_file_type: None, metadata: None, } @@ -138,15 +139,7 @@ impl EventLogger { ); let connection = middleware.provider().as_ref(); if self.receiver.is_none() { - let (event_sender, event_receiver) = crossbeam_channel::unbounded::(); - let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded::<()>(1); - connection - .event_broadcaster - .lock() - .unwrap() - .add_sender(event_sender, Some(shutdown_receiver)); - self.receiver = Some(event_receiver); - self.shutdown_sender = Some(shutdown_sender); + self.receiver = Some(connection.event_sender.subscribe()); } debug!("`EventLogger` now provided with event labeled: {:?}", name); self @@ -253,15 +246,15 @@ impl EventLogger { /// /// This function will return an error if there is a problem creating the /// directories or files, or writing to the files. - pub fn run(self) -> Result<(), RevmMiddlewareError> { - let receiver = self.receiver.unwrap(); + pub fn run(self) -> Result, RevmMiddlewareError> { + let mut receiver = self.receiver.unwrap(); let dir = self.directory.unwrap_or("./data".into()); let file_name = self.file_name.unwrap_or("output".into()); let file_type = self.output_file_type.unwrap_or(OutputFileType::JSON); let metadata = self.metadata.clone(); - std::thread::spawn(move || { + let task = tokio::spawn(async move { let mut events: BTreeMap>> = BTreeMap::new(); - while let Ok(broadcast) = receiver.recv() { + while let Ok(broadcast) = receiver.recv().await { match broadcast { Broadcast::StopSignal => { debug!("`EventLogger` has seen a stop signal"); @@ -288,7 +281,6 @@ impl EventLogger { } let data = OutputData { events, metadata }; serde_json::to_writer(writer, &data).expect("Unable to write data"); - self.shutdown_sender.unwrap().send(()).unwrap(); } OutputFileType::CSV => { // Write the DataFrame to a CSV file @@ -301,7 +293,6 @@ impl EventLogger { writer.finish(&mut df).unwrap_or_else(|_| { panic!("Error writing to csv file"); }); - self.shutdown_sender.unwrap().send(()).unwrap(); } OutputFileType::Parquet => { // Write the DataFrame to a parquet file @@ -314,7 +305,6 @@ impl EventLogger { writer.finish(&mut df).unwrap_or_else(|_| { panic!("Error writing to parquet file"); }); - self.shutdown_sender.unwrap().send(()).unwrap(); } } break; @@ -361,36 +351,38 @@ impl EventLogger { } } }); - Ok(()) + Ok(task) } /// Returns a stream of the serialized events. - pub fn stream(self) -> Pin + Send + 'static>> { - let receiver = self.receiver.clone().unwrap(); - - let stream = async_stream::stream! { - while let Ok(broadcast) = receiver.recv() { - match broadcast { - Broadcast::StopSignal => { - trace!("`EventLogger` has seen a stop signal"); - break; - } - Broadcast::Event(event) => { - trace!("`EventLogger` received an event"); - let ethers_logs = revm_logs_to_ethers_logs(event); - for log in ethers_logs { - for (_id, (filter, decoder)) in self.decoder.iter() { - if filter.filter_address(&log) && filter.filter_topics(&log) { - yield decoder(&log.clone().into()); + pub fn stream(mut self) -> Option + Send> { + if let Some(mut receiver) = self.receiver.take() { + let stream = async_stream::stream! { + while let Ok(broadcast) = receiver.recv().await { + match broadcast { + Broadcast::StopSignal => { + trace!("`EventLogger` has seen a stop signal"); + break; + } + Broadcast::Event(event) => { + trace!("`EventLogger` received an event"); + let ethers_logs = revm_logs_to_ethers_logs(event); + for log in ethers_logs { + for (_id, (filter, decoder)) in self.decoder.iter() { + if filter.filter_address(&log) && filter.filter_topics(&log) { + yield decoder(&log.clone().into()); + } } } } } } - } - }; + }; - Box::pin(stream) + Some(stream) + } else { + None + } } } diff --git a/arbiter-core/src/environment/mod.rs b/arbiter-core/src/environment/mod.rs index 3f91f7e7..fc9b8e31 100644 --- a/arbiter-core/src/environment/mod.rs +++ b/arbiter-core/src/environment/mod.rs @@ -45,6 +45,7 @@ use revm::{ }; use serde::{Deserialize, Serialize}; use thiserror::Error; +use tokio::sync::broadcast::{channel, Sender as BroadcastSender}; use super::*; #[cfg_attr(doc, doc(hidden))] @@ -84,14 +85,6 @@ pub(crate) type OutcomeSender = Sender>; /// emitted from transactions. pub(crate) type OutcomeReceiver = Receiver>; -/// Alias for the sender used in the [`EventBroadcaster`] that transmits -/// contract events via [`Log`]. -pub(crate) type EventSender = Sender; - -/// Alias for the receiver used in the [`EventBroadcaster`] that accepts -/// shutdown signals from child processes. -pub(crate) type ShutDownReceiver = Receiver<()>; - /// Represents a sandboxed EVM environment. /// /// ## Communication @@ -162,10 +155,11 @@ impl Environment { db: Option>, ) -> Self { let (instruction_sender, instruction_receiver) = unbounded(); + let (event_broadcaster, _) = channel(512); let socket = Socket { instruction_sender: Arc::new(instruction_sender), instruction_receiver, - event_broadcaster: Arc::new(Mutex::new(EventBroadcaster::new())), + event_broadcaster, }; let db = db.map(|db| ArbiterDB(Arc::new(RwLock::new(db)))); @@ -540,15 +534,19 @@ impl Environment { // update transaction count for sender - let event_broadcaster = event_broadcaster - .lock() - .map_err(|e| EnvironmentError::Communication(e.to_string()))?; let receipt_data = ReceiptData { block_number, transaction_index: transaction_index.into(), cumulative_gas_per_block, }; - event_broadcaster.broadcast(Some(execution_result.logs()), false)?; + match event_broadcaster.send(Broadcast::Event(execution_result.logs())) { + Ok(_) => {} + Err(_) => { + warn!( + "Event was not sent to any listeners. Are there any listeners?" + ) + } + } outcome_sender .send(Ok(Outcome::TransactionCompleted( execution_result, @@ -649,10 +647,15 @@ impl Environment { .map_err(|e| EnvironmentError::Communication(e.to_string()))?; } Instruction::Stop(outcome_sender) => { + match event_broadcaster.send(Broadcast::StopSignal) { + Ok(_) => {} + Err(_) => { + warn!("Stop signal was not sent to any listeners. Are there any listeners?") + } + } outcome_sender .send(Ok(Outcome::StopCompleted(evm.db.unwrap()))) .map_err(|e| EnvironmentError::Communication(e.to_string()))?; - event_broadcaster.lock().unwrap().broadcast(None, true)?; break; } } @@ -718,7 +721,7 @@ impl Environment { pub(crate) struct Socket { pub(crate) instruction_sender: Arc, pub(crate) instruction_receiver: InstructionReceiver, - pub(crate) event_broadcaster: Arc>, + pub(crate) event_broadcaster: BroadcastSender, } /// Enum representing the types of broadcasts that can be sent. @@ -729,7 +732,7 @@ pub(crate) struct Socket { /// Variants: /// * `StopSignal`: Represents a signal to stop the event logger process. /// * `Event(Vec)`: Represents a broadcast of a vector of Ethereum logs. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub enum Broadcast { /// Represents a signal to stop the event logger process. StopSignal, @@ -737,58 +740,6 @@ pub enum Broadcast { Event(Vec), } -/// Responsible for broadcasting Ethereum logs to subscribers. -/// -/// Maintains a list of senders to which logs are sent whenever they are -/// produced by the EVM. -#[derive(Clone, Debug)] -pub(crate) struct EventBroadcaster(Vec<(EventSender, Option)>); - -impl EventBroadcaster { - /// Called only when creating a new [`Environment`] - fn new() -> Self { - Self(vec![]) - } - - /// Called from [`RevmMiddleware`] implementation when setting up a new - /// `FilterWatcher` as each watcher will need their own sender - pub(crate) fn add_sender( - &mut self, - sender: EventSender, - shutdown_receiver: Option, - ) { - debug!("Sender added for `EventBroadcaster`"); - self.0.push((sender, shutdown_receiver)); - } - - /// Loop through each sender and send `Vec` emitted from a transaction - /// downstream to any and all receivers - fn broadcast(&self, logs: Option>, stop_signal: bool) -> Result<(), EnvironmentError> { - if stop_signal { - for (sender, receiver) in &self.0 { - sender.send(Broadcast::StopSignal)?; - debug!("Broadcasted stop signal to listener"); - if let Some(receiver) = receiver { - receiver - .recv() - .map_err(|_| EnvironmentError::ShutDownReceiverError)?; - debug!("Blocked on shutdown receiver signal"); - } - } - return Ok(()); - } else { - if logs.is_none() { - unreachable!(); - } - for (sender, _) in &self.0 { - sender.send(Broadcast::Event(logs.clone().unwrap()))?; - trace!("Broadcasting event to all listeners") - } - } - Ok(()) - } -} - /// Convert a U256 to a U64, discarding the higher bits if the number is larger /// than 2^64 # Arguments /// * `input` - The U256 to convert. diff --git a/arbiter-core/src/middleware/connection.rs b/arbiter-core/src/middleware/connection.rs index 670c5d43..f4c11e95 100644 --- a/arbiter-core/src/middleware/connection.rs +++ b/arbiter-core/src/middleware/connection.rs @@ -3,20 +3,19 @@ use std::{ collections::HashMap, fmt::Debug, pin::Pin, - sync::{Arc, Mutex, Weak}, - task::Poll, + sync::{Arc, Weak}, }; -use crossbeam_channel::TryRecvError; -use futures_util::{stream, Stream}; +use futures_util::Stream; use serde_json::value::RawValue; +use tokio::sync::broadcast::{Receiver as BroadcastReceiver, Sender as BroadcastSender}; use super::{cast::revm_logs_to_ethers_logs, *}; -use crate::environment::{EventBroadcaster, InstructionSender, OutcomeReceiver, OutcomeSender}; +use crate::environment::{InstructionSender, OutcomeReceiver, OutcomeSender}; /// Represents a connection to the EVM contained in the corresponding /// [`Environment`]. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Connection { /// Used to send calls and transactions to the [`Environment`] to be /// executed by `revm`. @@ -32,9 +31,7 @@ pub struct Connection { /// call/transact. pub(crate) outcome_receiver: OutcomeReceiver, - /// A reference to the [`EventBroadcaster`] so that more receivers of the - /// broadcast can be taken from it. - pub(crate) event_broadcaster: Arc>, + pub(crate) event_sender: BroadcastSender, /// A collection of `FilterReceiver`s that will receive outgoing logs /// generated by `revm` and output by the [`Environment`]. @@ -49,7 +46,7 @@ impl From<&Environment> for Connection { instruction_sender: Arc::downgrade(instruction_sender), outcome_sender, outcome_receiver, - event_broadcaster: Arc::clone(&environment.socket.event_broadcaster), + event_sender: environment.socket.event_broadcaster.clone(), filter_receivers: Arc::new(Mutex::new(HashMap::new())), } } @@ -103,22 +100,24 @@ impl JsonRpcClient for Connection { ))?; let mut logs = vec![]; let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone())); - if let Ok(broadcast) = filter_receiver.receiver.try_recv() { - match broadcast { - Broadcast::Event(received_logs) => { - let ethers_logs = revm_logs_to_ethers_logs(received_logs); - for log in ethers_logs { - if filtered_params.filter_address(&log) - && filtered_params.filter_topics(&log) - { - logs.push(log); + if let Some(receiver) = filter_receiver.receiver.as_mut() { + if let Ok(broadcast) = receiver.try_recv() { + match broadcast { + Broadcast::Event(received_logs) => { + let ethers_logs = revm_logs_to_ethers_logs(received_logs); + for log in ethers_logs { + if filtered_params.filter_address(&log) + && filtered_params.filter_topics(&log) + { + logs.push(log); + } } } - } - Broadcast::StopSignal => { - return Err(ProviderError::CustomError( - "The `EventBroadcaster` has stopped!".to_string(), - )) + Broadcast::StopSignal => { + return Err(ProviderError::CustomError( + "The `EventBroadcaster` has stopped!".to_string(), + )); + } } } } @@ -145,12 +144,22 @@ impl PubsubClient for Connection { let id = id.into(); debug!("Subscribing to filter with ID: {:?}", id); - let filter_receiver = self.filter_receivers.lock().unwrap().get(&id).cloned(); - match filter_receiver { - Some(filter_receiver) => { - let stream = stream::poll_fn(move |cx| { - match filter_receiver.receiver.try_recv() { - Ok(Broadcast::Event(logs)) => { + let mut filter_receiver = self + .filter_receivers + .lock() + .unwrap() + .remove(&id) + .take() + .unwrap(); + + let mut receiver = filter_receiver.receiver.take().unwrap(); + let stream = async_stream::stream! { + while let Ok(broadcast) = receiver.recv().await { + match broadcast { + Broadcast::StopSignal => { + break; + } + Broadcast::Event(logs) => { let filtered_params = FilteredParams::new(Some(filter_receiver.filter.clone())); let ethers_logs = revm_logs_to_ethers_logs(logs); @@ -173,39 +182,23 @@ impl PubsubClient for Connection { continue; } }; - return Poll::Ready(Some(raw_log)); + yield raw_log; } } - Poll::Ready(None) - } - Ok(Broadcast::StopSignal) => { - eprintln!("The `EventBroadcaster` has stopped!"); - Poll::Ready(None) - } - Err(TryRecvError::Empty) => { - // No logs available yet, so we'll try again later - cx.waker().wake_by_ref(); - Poll::Pending - } - Err(TryRecvError::Disconnected) => { - eprintln!("The `EventBroadcaster` has been disconnected!"); - Poll::Ready(None) + } - } - }); - Ok(Box::pin(stream)) + } } - None => Err(ProviderError::CustomError( - "The filter ID does not seem to match any that this client owns!".to_string(), - )), - } + }; + + Ok(Box::pin(stream)) } + // TODO: At the moment, this won't actually drop the stream. fn unsubscribe>(&self, id: T) -> Result<(), Self::Error> { let id = id.into(); debug!("Unsubscribing from filter with ID: {:?}", id); - let mut filter_receivers = self.filter_receivers.lock().unwrap(); - if filter_receivers.remove(&id).is_some() { + if self.filter_receivers.lock().unwrap().remove(&id).is_some() { Ok(()) } else { Err(ProviderError::CustomError( @@ -218,7 +211,7 @@ impl PubsubClient for Connection { /// Packages together a [`crossbeam_channel::Receiver>`] along with a /// [`Filter`] for events. Allows the client to have a stream of filtered /// events. -#[derive(Clone, Debug)] +#[derive(Debug)] pub(crate) struct FilterReceiver { /// The filter definition used for this receiver. /// Comes from the `ethers-rs` crate. @@ -226,5 +219,5 @@ pub(crate) struct FilterReceiver { /// The receiver for the channel that receives logs from the broadcaster. /// These are filtered upon reception. - pub(crate) receiver: crossbeam_channel::Receiver, + pub(crate) receiver: Option>, } diff --git a/arbiter-core/src/middleware/mod.rs b/arbiter-core/src/middleware/mod.rs index 8d8130f9..19b7ddc2 100644 --- a/arbiter-core/src/middleware/mod.rs +++ b/arbiter-core/src/middleware/mod.rs @@ -94,7 +94,7 @@ pub mod nonce_middleware; /// Use a seed like `Some("test_label")` for maintaining a /// consistent address across simulations and client labeling. Seeding is be /// useful for debugging and post-processing. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct RevmMiddleware { provider: Provider, wallet: EOA, @@ -222,7 +222,7 @@ impl RevmMiddleware { instruction_sender: Arc::downgrade(instruction_sender), outcome_sender, outcome_receiver: outcome_receiver.clone(), - event_broadcaster: Arc::clone(&environment.socket.event_broadcaster), + event_sender: environment.socket.event_broadcaster.clone(), filter_receivers: Arc::new(Mutex::new(HashMap::new())), }; let provider = Provider::new(connection); @@ -687,22 +687,11 @@ impl Middleware for RevmMiddleware { hasher.update(serde_json::to_string(&args).map_err(RevmMiddlewareError::Json)?); let hash = hasher.finalize(); let id = ethers::types::U256::from(ethers::types::H256::from_slice(&hash).as_bytes()); - let (event_sender, event_receiver) = crossbeam_channel::unbounded::(); + let event_receiver = self.provider().as_ref().event_sender.subscribe(); let filter_receiver = FilterReceiver { filter, - receiver: event_receiver, + receiver: Some(event_receiver), }; - self.provider() - .as_ref() - .event_broadcaster - .lock() - .map_err(|e| { - RevmMiddlewareError::EventBroadcaster(format!( - "Failed to gain lock on the `Connection`'s `event_broadcaster` due to {:?} ", - e - )) - })? - .add_sender(event_sender, None); self.provider() .as_ref() .filter_receivers diff --git a/arbiter-core/src/tests/data_collection_integration.rs b/arbiter-core/src/tests/data_collection_integration.rs index de895e05..4bce2684 100644 --- a/arbiter-core/src/tests/data_collection_integration.rs +++ b/arbiter-core/src/tests/data_collection_integration.rs @@ -19,7 +19,7 @@ async fn generate_events( lex: LiquidExchange, client: Arc, ) -> Result<(), RevmMiddlewareError> { - for _ in 0..5 { + for _ in 0..2 { arbx.approve(client.address(), U256::from(1)) .send() .await @@ -41,12 +41,13 @@ async fn generate_events( #[tokio::test] async fn data_capture() { + // let (env, client) = startup_user_controlled().unwrap(); let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); println!("Deployed contracts"); // default_listener - EventLogger::builder() + let logger_task = EventLogger::builder() .add(arbx.events(), "arbx") .add(arby.events(), "arby") .add(lex.events(), "lex") @@ -90,6 +91,8 @@ async fn data_capture() { let _ = env.stop(); + logger_task.await.unwrap(); + std::thread::sleep(std::time::Duration::from_secs(1)); assert!(Path::new("./data/output.csv").exists()); assert!(Path::new("./data/output.parquet").exists()); assert!(Path::new("./data/output.json").exists()); @@ -98,26 +101,37 @@ async fn data_capture() { #[tokio::test] async fn data_stream() { - std::env::set_var("RUST_LOG", "trace"); - tracing_subscriber::fmt::init(); + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); let (env, client) = startup_user_controlled().unwrap(); let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap(); - println!("Deployed contracts"); + println!( + "Deployed +contracts" + ); // default_listener - let streamer = EventLogger::builder() - .add_stream(arbx.events()) - .add_stream(arby.events()) - .add_stream(lex.events()) - .stream(); + let mut streamer = Box::pin( + EventLogger::builder() + .add_stream(arbx.events()) + .add_stream(lex.events()) + .stream() + .unwrap(), + ); generate_events(arbx, arby, lex, client.clone()) .await .unwrap_or_else(|e| { panic!("Error generating events: {}", e); }); - panic!("This test is not complete"); - let stream_buffer = streamer.enumerate().collect::>().await; - println!("Buffer: {:?}", stream_buffer); + let mut idx = 0; + while let Some(item) = streamer.next().await { + println!("item: {}", item); + idx += 1; + if idx == 4 { + break; + } + } let _ = env.stop(); + assert_eq!(idx, 4); } diff --git a/arbiter-engine/Cargo.toml b/arbiter-engine/Cargo.toml index 471a89bc..fef73219 100644 --- a/arbiter-engine/Cargo.toml +++ b/arbiter-engine/Cargo.toml @@ -21,8 +21,7 @@ tokio.workspace = true async-stream.workspace = true anyhow = { version = "=1.0.79" } tracing.workspace = true -tokio-stream = "0.1.14" -async-broadcast = "0.6.0" +tokio-stream = "0.1.14" futures = "0.3.30" crossbeam-channel.workspace = true arbiter-core.workspace = true diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index 19c81f58..9ad86c7f 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -1,5 +1,7 @@ // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // TODO: Notes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// * Maybe we just use tokio for everything (like `select`) so that we don't mix +// futures and tokio together in ways that may be weird. // When we start running an agent, we should have their messager start producing // events that can be used by any and all behaviors the agent has that takes in // messages as an event. Similarly, we should have agents start up any streams @@ -21,10 +23,14 @@ use std::{fmt::Debug, pin::Pin, sync::Arc}; use arbiter_core::{data_collection::EventLogger, middleware::RevmMiddleware}; use ethers::contract::{EthLogDecode, Event}; use futures::stream::{Stream, StreamExt}; -use futures_util::future::{join_all, JoinAll}; +use futures_util::future::join_all; use serde::de::DeserializeOwned; -use tokio::task::JoinHandle; +use tokio::{ + sync::broadcast::{channel, Receiver as BroadcastReceiver, Sender as BroadcastSender}, + task::JoinHandle, +}; +use self::machine::MachineInstruction; use super::*; use crate::{ machine::{Behavior, Engine, State, StateMachine}, @@ -72,7 +78,7 @@ pub struct Agent { /// The messager the agent uses to send and receive messages from other /// agents. - pub messager: Messager, + pub messager: Option, /// The client the agent uses to interact with the blockchain. pub client: Arc, @@ -85,141 +91,127 @@ pub struct Agent { /// events. behavior_engines: Option>>, - /// The tasks that represent the agent running a specific state transition. - behavior_tasks: Option>>>, - /// The pipeline for yielding events from the centralized event streamer /// (for both messages and Ethereum events) to agents. - distributor: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), + pub(crate) distributor: (BroadcastSender, BroadcastReceiver), + + broadcast_task: Option + Send>>>>, } impl Agent { /// Produces a new agent with the given identifier. - pub(crate) fn connect(id: &str, world: &World) -> Self { + pub fn new(id: &str, world: &World) -> Self { let messager = world.messager.for_agent(id); let client = RevmMiddleware::new(&world.environment, Some(id)).unwrap(); - let distributor = async_broadcast::broadcast(512); + let distributor = channel(512); Self { id: id.to_owned(), state: State::Uninitialized, - messager, + messager: Some(messager), client, event_streamer: Some(EventLogger::builder()), behavior_engines: None, distributor, - behavior_tasks: None, + broadcast_task: None, } } /// Adds an Ethereum event to the agent's event streamer. - pub fn add_event( - &mut self, + pub fn with_event( + mut self, event: Event, RevmMiddleware, D>, - ) { + ) -> Self { self.event_streamer = Some(self.event_streamer.take().unwrap().add_stream(event)); + self } /// Adds a behavior to the agent that it will run. - pub fn add_behavior( - &mut self, + pub fn with_behavior( + mut self, behavior: impl Behavior + 'static, - ) { - let event_receiver = self.distributor.0.new_receiver(); + ) -> Self { + let event_receiver = self.distributor.0.subscribe(); let engine = Engine::new(behavior, event_receiver); if let Some(engines) = &mut self.behavior_engines { engines.push(Box::new(engine)); } else { self.behavior_engines = Some(vec![Box::new(engine)]); - } + }; + self } - // TODO: This is unused for now, but we will use it in the future for the event - // pipelining. - #[allow(unused)] - pub(crate) fn start_event_stream(&mut self) -> Pin + Send + '_>> { - let event_stream = self.event_streamer.take().unwrap().stream(); - let message_stream = self - .messager - .stream() - .map(|msg| serde_json::to_string(&msg).unwrap_or_else(|e| e.to_string())); - - Box::pin(futures::stream::select(event_stream, message_stream)) + pub(crate) async fn run(&mut self, instruction: MachineInstruction) { + let behavior_engines = self.behavior_engines.take().unwrap(); + let behavior_tasks = join_all(behavior_engines.into_iter().map(|mut engine| { + tokio::spawn(async move { + engine.execute(instruction).await; + engine + }) + })); + self.behavior_engines = Some( + behavior_tasks + .await + .into_iter() + .map(|res| res.unwrap()) + .collect::>(), + ); } } #[async_trait::async_trait] impl StateMachine for Agent { - fn run_state(&mut self, state: State) { - match state { - State::Uninitialized => { - unimplemented!("This never gets called.") - } - State::Syncing => { - self.state = state; - trace!("Agent is syncing."); - let mut behavior_engines = self.behavior_engines.take().unwrap(); - for engine in behavior_engines.iter_mut() { - engine.run_state(state); - } - self.behavior_tasks = - Some(join_all(behavior_engines.into_iter().map(|mut engine| { - tokio::spawn(async move { - engine.transition().await; - engine - }) - }))); + #[tracing::instrument(skip(self), fields(id = self.id))] + async fn execute(&mut self, instruction: MachineInstruction) { + match instruction { + MachineInstruction::Sync => { + debug!("Agent is syncing."); + self.state = State::Syncing; + self.run(instruction).await; } - State::Startup => { - trace!("Agent is starting up."); - self.state = state; - let mut behavior_engines = self.behavior_engines.take().unwrap(); - for engine in behavior_engines.iter_mut() { - engine.run_state(state); - } - self.behavior_tasks = - Some(join_all(behavior_engines.into_iter().map(|mut engine| { - tokio::spawn(async move { - engine.transition().await; - engine - }) - }))); + MachineInstruction::Start => { + debug!("Agent is starting up."); + self.run(instruction).await; } - State::Processing => { - trace!("Agent is processing."); - self.state = state; - let mut behavior_engines = self.behavior_engines.take().unwrap(); - for engine in behavior_engines.iter_mut() { - engine.run_state(state); - } - self.behavior_tasks = - Some(join_all(behavior_engines.into_iter().map(|mut engine| { - tokio::spawn(async move { - engine.transition().await; - engine - }) - }))); + MachineInstruction::Process => { + debug!("Agent is processing."); + self.state = State::Processing; + let messager = self.messager.take().unwrap(); + let message_stream = messager + .stream() + .map(|msg| serde_json::to_string(&msg).unwrap_or_else(|e| e.to_string())); + + let eth_event_stream = self.event_streamer.take().unwrap().stream(); + + let mut event_stream: Pin + Send + '_>> = + if let Some(event_stream) = eth_event_stream { + trace!("Merging event streams."); + // Convert the individual streams into a Vec + let all_streams = vec![ + Box::pin(message_stream) as Pin + Send>>, + Box::pin(event_stream), + ]; + // Use select_all to combine them + Box::pin(futures::stream::select_all(all_streams)) + } else { + trace!("Agent only sees message stream."); + Box::pin(message_stream) + }; + + let sender = self.distributor.0.clone(); + self.broadcast_task = Some(tokio::spawn(async move { + while let Some(event) = event_stream.next().await { + sender.send(event).unwrap(); + } + event_stream + })); + self.run(instruction).await; } - State::Stopped => { - todo!() + MachineInstruction::Stop => { + unreachable!("This is never explicitly called on an agent.") } } } - - async fn transition(&mut self) { - self.behavior_engines = Some( - self.behavior_tasks - .take() - .unwrap() - .await - .into_iter() - .map(|res| res.unwrap()) - .collect::>(), - ); - } } #[cfg(test)] @@ -229,20 +221,14 @@ mod tests { use super::*; use crate::messager::Message; - #[ignore] - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn streaming() { - std::env::set_var("RUST_LOG", "trace"); - tracing_subscriber::fmt::init(); - - let mut world = World::new("world"); - let messager = world.messager.clone(); - println!( - "Receiver count: {:?}", - messager.broadcast_sender.receiver_count() - ); + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); - let agent = world.create_agent("agent"); + let world = World::new("world"); + let agent = Agent::new("agent", &world); let arb = ArbiterToken::deploy( agent.client.clone(), @@ -253,28 +239,73 @@ mod tests { .await .unwrap(); - agent.add_event(arb.events()); + let mut agent = agent.with_event(arb.events()); let address = agent.client.address(); - let mut streamer = agent.start_event_stream(); - - for _ in 0..5 { - messager - .send(Message { - from: "me".to_string(), - to: messager::To::All, - data: "hello".to_string(), - }) - .await; - arb.approve(address, U256::from(1)) - .send() - .await - .unwrap() - .await - .unwrap(); - } - while let Some(msg) = streamer.next().await { - println!("Printing message in test: {:?}", msg); - } + // TODO: (START BLOCK) It would be nice to get this block to be a single + // function that isn't copy and pasted from above. + let messager = agent.messager.take().unwrap(); + let message_stream = messager + .stream() + .map(|msg| serde_json::to_string(&msg).unwrap_or_else(|e| e.to_string())); + let eth_event_stream = agent.event_streamer.take().unwrap().stream(); + + let mut event_stream: Pin + Send + '_>> = + if let Some(event_stream) = eth_event_stream { + trace!("Merging event streams."); + let all_streams = vec![ + Box::pin(message_stream) as Pin + Send>>, + Box::pin(event_stream), + ]; + Box::pin(futures::stream::select_all(all_streams)) + } else { + trace!("Agent only sees message stream."); + Box::pin(message_stream) + }; + // TODO: (END BLOCK) + + let outside_messager = world.messager.join_with_id(None); + let message_task = tokio::spawn(async move { + for _ in 0..5 { + outside_messager + .send(Message { + from: "god".to_string(), + to: messager::To::All, + data: "hello".to_string(), + }) + .await; + } + }); + + let eth_event_task = tokio::spawn(async move { + for i in 0..5 { + if i == 0 { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + arb.approve(address, U256::from(1)) + .send() + .await + .unwrap() + .await + .unwrap(); + } + }); + + let mut idx = 0; + let print_task = tokio::spawn(async move { + while let Some(msg) = event_stream.next().await { + println!("Printing message in test: {:?}", msg); + if idx < 5 { + assert_eq!(msg, "{\"from\":\"god\",\"to\":\"All\",\"data\":\"hello\"}"); + } else { + assert_eq!(msg, "{\"ApprovalFilter\":{\"owner\":\"0xe7a46f3d9f0e9b9c02f58f95e3bcee2db54050b0\",\"spender\":\"0xe7a46f3d9f0e9b9c02f58f95e3bcee2db54050b0\",\"amount\":\"0x1\"}}"); + } + idx += 1; + if idx == 10 { + break; + } + } + }); + join_all(vec![message_task, eth_event_task, print_task]).await; } } diff --git a/arbiter-engine/src/examples/timed_message.rs b/arbiter-engine/src/examples/timed_message.rs index 919ad041..74d95748 100644 --- a/arbiter-engine/src/examples/timed_message.rs +++ b/arbiter-engine/src/examples/timed_message.rs @@ -2,8 +2,14 @@ const AGENT_ID: &str = "agent"; +use std::time::Duration; + +use tokio::time::timeout; + +use self::machine::MachineHalt; use super::*; use crate::{ + agent::Agent, machine::{Behavior, Engine, State, StateMachine}, messager::To, world::World, @@ -11,22 +17,35 @@ use crate::{ struct TimedMessage { delay: u64, - message: Message, + receive_data: String, + send_data: String, + messager: Messager, + count: u64, + max_count: Option, } #[async_trait::async_trait] impl Behavior for TimedMessage { - async fn process(&mut self, event: Message) { + async fn process(&mut self, event: Message) -> Option { trace!("Processing event."); - let message = Message { - from: "agent".to_owned(), - to: To::Agent("agent".to_owned()), - data: "Hello, world!".to_owned(), - }; + if event.data == self.receive_data { + trace!("Event matches message. Sending a new message."); + let message = Message { + from: self.messager.id.clone().unwrap(), + to: To::All, + data: self.send_data.clone(), + }; + self.messager.send(message).await; + self.count += 1; + } + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; - // TODO: send a message trace!("Processed event."); + None } async fn sync(&mut self) { @@ -42,50 +61,194 @@ impl Behavior for TimedMessage { } } -// // TODO: Can we combine the `world.run().await` through the `for task in -// tasks // {task.await}` step to make this DEVX super easy TODO: Having -// something like // an automatic impl of Start and Stop for all behaviors -// would be nice or load // that in as a default behavior of agents or -// something. -#[ignore = "This is a work in progress and does not work and does not ever terminate."] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn echoer() { - std::env::set_var("RUST_LOG", "trace"); - tracing_subscriber::fmt::init(); + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); let mut world = World::new("world"); - let agent = world.create_agent(AGENT_ID); - + let agent = Agent::new(AGENT_ID, &world); let behavior = TimedMessage { - delay: 2, - message: Message { - from: "agent".to_owned(), - to: To::Agent("agent".to_owned()), - data: "Hello, world!".to_owned(), - }, + delay: 1, + receive_data: "Hello, world!".to_owned(), + send_data: "Hello, world!".to_owned(), + messager: agent + .messager + .as_ref() + .unwrap() + .join_with_id(Some(AGENT_ID.to_owned())), + count: 0, + max_count: Some(2), }; - agent.add_behavior(behavior); + world.add_agent(agent.with_behavior(behavior)); - tracing::debug!("Starting world."); - let messager = world.messager.clone(); - world.run_state(State::Syncing); - world.transition().await; + let messager = world.messager.join_with_id(Some("god".to_owned())); + let task = world.run(); - world.run_state(State::Startup); - world.transition().await; + let message = Message { + from: "god".to_owned(), + to: To::Agent("agent".to_owned()), + data: "Hello, world!".to_owned(), + }; + messager.send(message).await; + task.await; + + let mut stream = Box::pin(messager.stream()); + let mut idx = 0; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 2 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } +} - world.run_state(State::Processing); +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn ping_pong() { + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); - let message = Message { - from: "agent".to_owned(), + let mut world = World::new("world"); + + let agent = Agent::new(AGENT_ID, &world); + let behavior_ping = TimedMessage { + delay: 1, + receive_data: "pong".to_owned(), + send_data: "ping".to_owned(), + messager: agent + .messager + .as_ref() + .unwrap() + .join_with_id(Some(AGENT_ID.to_owned())), + count: 0, + max_count: Some(2), + }; + let behavior_pong = TimedMessage { + delay: 1, + receive_data: "ping".to_owned(), + send_data: "pong".to_owned(), + messager: agent + .messager + .as_ref() + .unwrap() + .join_with_id(Some(AGENT_ID.to_owned())), + count: 0, + max_count: Some(2), + }; + + world.add_agent( + agent + .with_behavior(behavior_ping) + .with_behavior(behavior_pong), + ); + + let messager = world.messager.join_with_id(Some("god".to_owned())); + let task = world.run(); + + let init_message = Message { + from: "god".to_owned(), to: To::Agent("agent".to_owned()), - data: "Start".to_owned(), + data: "ping".to_owned(), + }; + messager.send(init_message).await; + + task.await; + + let mut stream = Box::pin(messager.stream()); + let mut idx = 0; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 4 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn ping_pong_two_agent() { + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); + + let mut world = World::new("world"); + + let agent_ping = Agent::new("agent_ping", &world); + let behavior_ping = TimedMessage { + delay: 1, + receive_data: "pong".to_owned(), + send_data: "ping".to_owned(), + messager: agent_ping + .messager + .as_ref() + .unwrap() + .join_with_id(Some("agent_ping".to_owned())), + count: 0, + max_count: Some(2), + }; + + let agent_pong = Agent::new("agent_pong", &world); + let behavior_pong = TimedMessage { + delay: 1, + receive_data: "ping".to_owned(), + send_data: "pong".to_owned(), + messager: agent_pong + .messager + .as_ref() + .unwrap() + .join_with_id(Some("agent_pong".to_owned())), + count: 0, + max_count: Some(2), }; - let send_result = messager.send(message).await; - tracing::debug!("Start message sent {:?}", send_result); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + world.add_agent(agent_ping.with_behavior(behavior_ping)); + world.add_agent(agent_pong.with_behavior(behavior_pong)); + + let messager = world.messager.join_with_id(Some("god".to_owned())); + let task = world.run(); - world.transition().await; + let init_message = Message { + from: "god".to_owned(), + to: To::All, + data: "ping".to_owned(), + }; + + messager.send(init_message).await; + + task.await; + + let mut stream = Box::pin(messager.stream()); + let mut idx = 0; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 5 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } } diff --git a/arbiter-engine/src/examples/token_minter.rs b/arbiter-engine/src/examples/token_minter.rs index 65886fe9..f75f7ef4 100644 --- a/arbiter-engine/src/examples/token_minter.rs +++ b/arbiter-engine/src/examples/token_minter.rs @@ -1,335 +1,384 @@ -// use artemis_core::executors::mempool_executor::MempoolExecutor; -// use ethers::types::{transaction::request, Filter}; -// use tracing::error; - -// use super::*; -// use crate::{agent::BehaviorBuilder, messager::To}; - -// const TOKEN_ADMIN_ID: &str = "token_admin"; -// const REQUESTER_ID: &str = "requester"; -// const TOKEN_NAME: &str = "Arbiter Token"; -// const TOKEN_SYMBOL: &str = "ARB"; -// const TOKEN_DECIMALS: u8 = 18; - -// /// The token admin is responsible for handling token minting requests. -// #[derive(Clone, Debug)] -// pub struct TokenAdmin { -// /// The identifier of the token admin. -// pub id: String, // TODO: The strategies should not really need an ID. - -// pub token_data: HashMap, - -// /// The tokens that the token admin has control over. -// /// These will be deployed when we call `sync_state()` -// pub tokens: Option>>, - -// pub client: Arc, -// } - -// impl TokenAdmin { -// // TODO: I don't think we should pass in a client like this, probably, -// doing it // for testing purposes. Also using RevmMiddleware for testing -// purposes, // although this strategy should never be deployed -// /// Creates a new token admin with the given identifier. -// pub fn new(client: Arc) -> Self { -// Self { -// id: TOKEN_ADMIN_ID.to_owned(), -// token_data: HashMap::new(), -// tokens: None, -// client, -// } -// } - -// /// Adds a token to the token admin. -// pub fn add_token(&mut self, token_data: TokenData) { -// self.token_data.insert(token_data.name.clone(), token_data); -// } -// } - -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub struct TokenData { -// pub name: String, -// pub symbol: String, -// pub decimals: u8, -// pub address: Option
, -// } - -// /// Used as an action to ask what tokens are available. -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub enum TokenAdminQuery { -// /// Get the address of the token. -// AddressOf(String), - -// /// Mint tokens. -// MintRequest(MintRequest), -// } - -// /// Used as an action to mint tokens. -// #[derive(Clone, Debug, Deserialize, Serialize)] -// pub struct MintRequest { -// /// The token to mint. -// pub token: String, - -// /// The address to mint to. -// pub mint_to: Address, - -// /// The amount to mint. -// pub mint_amount: u64, -// } - -// #[async_trait::async_trait] -// impl Strategy for TokenAdmin { -// #[tracing::instrument(skip(self), fields(id = %self.id))] -// async fn sync_state(&mut self) -> Result<()> { -// debug!("Syncing state for `TokenAdmin`."); -// for token_data in self.token_data.values_mut() { -// let token = ArbiterToken::deploy( -// self.client.clone(), -// ( -// token_data.name.clone(), -// token_data.symbol.clone(), -// token_data.decimals, -// ), -// ) -// .unwrap() -// .send() -// .await -// .unwrap(); -// token_data.address = Some(token.address()); -// self.tokens -// .get_or_insert_with(HashMap::new) -// .insert(token_data.name.clone(), token.clone()); -// trace!("Deployed token: {:?}", token); -// } -// Ok(()) -// } - -// #[tracing::instrument(skip(self, event), fields(id = %self.id))] -// async fn process_event(&mut self, event: Message) -> Vec { -// trace!("Processing event for `TokenAdmin` {:?}.", event); -// if self.tokens.is_none() { -// error!("There were no tokens to deploy! You must add tokens to -// the token admin before running the simulation."); } - -// let query: TokenAdminQuery = -// serde_json::from_str(&event.data).unwrap(); trace!("Got query: {:?}", -// query); match query { -// TokenAdminQuery::AddressOf(token_name) => { -// trace!( -// "Getting address of token with name: {:?}", -// token_name.clone() -// ); -// let token_data = self.token_data.get(&token_name).unwrap(); -// let message = Message { -// from: self.id.to_owned(), -// to: To::Agent(event.from.clone()), // Reply back to -// sender data: serde_json::to_string(token_data).unwrap(), -// }; -// vec![MessageOrTx::Message(message)] -// } -// TokenAdminQuery::MintRequest(mint_request) => { -// trace!("Minting tokens: {:?}", mint_request); -// let token = self -// .tokens -// .as_ref() -// .unwrap() -// .get(&mint_request.token) -// .unwrap(); -// let tx = token -// .mint(mint_request.mint_to, -// U256::from(mint_request.mint_amount)) .tx; - -// vec![MessageOrTx::Tx(tx)] -// } -// } -// } -// } - -// /// The token requester is responsible for requesting tokens from the token -// /// admin. This agents is purely for testing purposes as far as I can tell. -// #[derive(Clone, Debug)] -// pub struct TokenRequester { -// /// The identifier of the token requester. -// pub id: String, - -// /// The tokens that the token requester has requested. -// pub token_data: TokenData, - -// /// The agent ID to request tokens to. -// pub request_to: String, - -// /// Client to have an address to receive token mint to and check balance -// pub client: Arc, -// } - -// impl TokenRequester { -// pub fn new(id: &str, client: Arc) -> Self { -// Self { -// id: id.to_owned(), -// token_data: TokenData { -// name: TOKEN_NAME.to_owned(), -// symbol: TOKEN_SYMBOL.to_owned(), -// decimals: TOKEN_DECIMALS, -// address: None, -// }, -// request_to: TOKEN_ADMIN_ID.to_owned(), -// client, -// } -// } -// } - -// #[async_trait::async_trait] -// impl Strategy for TokenRequester { -// #[tracing::instrument(skip(self), fields(id = %self.id))] -// async fn sync_state(&mut self) -> Result<()> { -// trace!("Syncing state for `TokenRequester` startup."); -// Ok(()) -// } - -// #[tracing::instrument(skip(self, event), fields(id = %self.id))] -// async fn process_event(&mut self, event: Message) -> Vec { -// trace!("Processing event for `TokenRequester` startup {:?}.", event); - -// if event.data == "Start" { -// trace!("Requesting address of token: {:?}", -// self.token_data.name); let message = Message { -// from: self.id.to_owned(), -// to: To::Agent(self.request_to.clone()), -// data: serde_json::to_string(&TokenAdminQuery::AddressOf( -// self.token_data.name.clone(), -// )) -// .unwrap(), -// }; -// vec![message] -// } else if event.data == "Mint" { -// trace!("Requesting mint of token: {:?}", self.token_data.name); -// let message = Message { -// from: self.id.to_owned(), -// to: To::Agent(self.request_to.clone()), -// data: -// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { -// token: self.token_data.name.clone(), mint_to: -// self.client.address(), mint_amount: 1, -// })) -// .unwrap(), -// }; -// vec![message] -// } else { -// vec![] -// } -// } -// } - -// #[async_trait::async_trait] -// impl Strategy for TokenRequester { -// #[tracing::instrument(skip(self), fields(id = %self.id))] -// async fn sync_state(&mut self) -> Result<()> { -// trace!("Syncing state for `TokenRequester` logger."); -// Ok(()) -// } - -// #[tracing::instrument(skip(self, event), fields(id = %self.id))] -// async fn process_event(&mut self, event: Log) -> Vec { -// trace!("Got event for `TokenRequester` logger: {:?}", event); -// std::thread::sleep(std::time::Duration::from_secs(1)); -// let message = Message { -// from: self.id.clone(), -// to: To::Agent(self.request_to.clone()), -// data: -// serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { -// token: self.token_data.name.clone(), mint_to: -// self.client.address(), mint_amount: 1, -// })) -// .unwrap(), -// }; -// vec![message] -// } -// } - -// #[ignore] -// #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -// async fn token_minter_simulation() { -// // TODO: Test outline, requester requests 1, 2, 3 tokens, and the test -// stops // when the balance is checked to be 6. -// std::env::set_var("RUST_LOG", "trace"); -// tracing_subscriber::fmt::init(); - -// let environment = EnvironmentBuilder::new().build(); -// let connection = Connection::from(&environment); -// let provider = Provider::new(connection); -// let mut world = World::new("test_world", provider); - -// // Create the token admin agent -// let token_admin_agent = world.create_agent(TOKEN_ADMIN_ID); -// let token_admin_client = RevmMiddleware::new(&environment, -// Some(TOKEN_ADMIN_ID)).unwrap(); let mut token_admin_strategy = -// TokenAdmin::new(token_admin_client.clone()); token_admin_strategy. -// add_token(TokenData { name: TOKEN_NAME.to_owned(), -// symbol: TOKEN_SYMBOL.to_owned(), -// decimals: TOKEN_DECIMALS, -// address: None, -// }); - -// let message_and_mempool_executor = MessageAndTransactionExecutor { -// messager: token_admin_agent.messager.clone(), -// transactor: Transactor { -// client: token_admin_client.clone(), -// }, -// }; - -// let token_admin_behavior = BehaviorBuilder::new() -// .add_collector(token_admin_agent.messager.clone()) -// .add_executor(message_and_mempool_executor) -// .add_strategy(token_admin_strategy.clone()) -// .build(); -// token_admin_agent.add_behavior(token_admin_behavior); - -// // Create the token requester agent -// let requester_agent = world.create_agent(REQUESTER_ID); -// let requester_client = RevmMiddleware::new(&environment, -// Some(REQUESTER_ID)).unwrap(); let token_requester = -// TokenRequester::new(REQUESTER_ID, requester_client.clone()); -// let query_behavior = BehaviorBuilder::new() -// .add_collector(requester_agent.messager.clone()) -// .add_executor(requester_agent.messager.clone()) -// .add_strategy(token_requester.clone()) -// .build(); -// requester_agent.add_behavior(query_behavior); -// let mint_behavior = BehaviorBuilder::new() -// .add_collector(LogCollector::new( -// requester_client.clone(), -// Filter::default(), -// // token_admin_strategy -// // .tokens -// // .as_ref() -// // .unwrap() -// // .get(&TOKEN_NAME.to_owned()) -// // .unwrap() -// // .transfer_filter() -// // .filter, -// )) -// .add_executor(requester_agent.messager.clone()) -// .add_strategy(token_requester) -// .build(); -// requester_agent.add_behavior(mint_behavior); - -// // Run the world and send the start message - -// let message = Message { -// from: "host".to_owned(), -// to: To::Agent(REQUESTER_ID.to_owned()), -// data: "Start".to_owned(), -// }; -// // TODO: Messages like this could probably be put in the `world.run()` -// world.messager.execute(message).await; - -// let message = Message { -// from: "host".to_owned(), -// to: To::Agent(REQUESTER_ID.to_owned()), -// data: "Mint".to_owned(), -// }; -// world.messager.execute(message).await; - -// let tasks = world.run().await; -// world.join().await; -// } +use std::{str::FromStr, time::Duration}; + +use anyhow::Context; +use arbiter_bindings::bindings::arbiter_token; +use arbiter_core::data_collection::EventLogger; +use ethers::{ + abi::token, + types::{transaction::request, Filter}, +}; +use tokio::time::timeout; +use tracing::error; + +use self::machine::MachineHalt; +use super::*; +use crate::{ + agent::Agent, + machine::{Behavior, MachineInstruction, StateMachine}, + messager::To, + world::World, +}; + +const TOKEN_ADMIN_ID: &str = "token_admin"; +const REQUESTER_ID: &str = "requester"; +const TOKEN_NAME: &str = "Arbiter Token"; +const TOKEN_SYMBOL: &str = "ARB"; +const TOKEN_DECIMALS: u8 = 18; + +/// The token admin is responsible for handling token minting requests. +#[derive(Debug)] +pub struct TokenAdmin { + /// The identifier of the token admin. + pub token_data: HashMap, + + pub tokens: Option>>, + + // TODO: We should not have to have a client or a messager put here + // explicitly, they should come from the Agent the behavior is given to. + pub client: Arc, + pub messager: Messager, + + count: u64, + + max_count: Option, +} + +impl TokenAdmin { + pub fn new( + client: Arc, + messager: Messager, + count: u64, + max_count: Option, + ) -> Self { + Self { + token_data: HashMap::new(), + tokens: None, + client, + messager, + count, + max_count, + } + } + + /// Adds a token to the token admin. + pub fn add_token(&mut self, token_data: TokenData) { + self.token_data.insert(token_data.name.clone(), token_data); + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct TokenData { + pub name: String, + pub symbol: String, + pub decimals: u8, + pub address: Option
, +} + +/// Used as an action to ask what tokens are available. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum TokenAdminQuery { + /// Get the address of the token. + AddressOf(String), + + /// Mint tokens. + MintRequest(MintRequest), +} + +/// Used as an action to mint tokens. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MintRequest { + /// The token to mint. + pub token: String, + + /// The address to mint to. + pub mint_to: Address, + + /// The amount to mint. + pub mint_amount: u64, +} + +#[async_trait::async_trait] +impl Behavior for TokenAdmin { + #[tracing::instrument(skip(self), fields(id = +self.messager.id.as_deref()))] + async fn sync(&mut self) { + for token_data in self.token_data.values_mut() { + let token = ArbiterToken::deploy( + self.client.clone(), + ( + token_data.name.clone(), + token_data.symbol.clone(), + token_data.decimals, + ), + ) + .unwrap() + .send() + .await + .unwrap(); + token_data.address = Some(token.address()); + self.tokens + .get_or_insert_with(HashMap::new) + .insert(token_data.name.clone(), token.clone()); + debug!("Deployed token: {:?}", token); + } + } + + #[tracing::instrument(skip(self), fields(id = +self.messager.id.as_deref()))] + async fn process(&mut self, event: Message) -> Option { + if self.tokens.is_none() { + error!( + "There were no tokens to deploy! You must add tokens to +the token admin before running the simulation." + ); + } + + let query: TokenAdminQuery = serde_json::from_str(&event.data).unwrap(); + trace!("Got query: {:?}", query); + match query { + TokenAdminQuery::AddressOf(token_name) => { + trace!( + "Getting address of token with name: {:?}", + token_name.clone() + ); + let token_data = self.token_data.get(&token_name).unwrap(); + let message = Message { + from: self.messager.id.clone().unwrap(), + to: To::Agent(event.from.clone()), // Reply back to sender + data: serde_json::to_string(token_data).unwrap(), + }; + self.messager.send(message).await; + } + TokenAdminQuery::MintRequest(mint_request) => { + trace!("Minting tokens: {:?}", mint_request); + let token = self + .tokens + .as_ref() + .unwrap() + .get(&mint_request.token) + .unwrap(); + token + .mint(mint_request.mint_to, U256::from(mint_request.mint_amount)) + .send() + .await + .unwrap() + .await + .unwrap(); + self.count += 1; + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } + } + } + None + } +} + +/// The token requester is responsible for requesting tokens from the token +/// admin. This agents is purely for testing purposes as far as I can tell. +#[derive(Debug)] +pub struct TokenRequester { + /// The tokens that the token requester has requested. + pub token_data: TokenData, + + /// The agent ID to request tokens to. + pub request_to: String, + + /// Client to have an address to receive token mint to and check balance + pub client: Arc, + + /// The messaging layer for the token requester. + pub messager: Messager, + + pub count: u64, + + pub max_count: Option, +} + +impl TokenRequester { + pub fn new( + client: Arc, + messager: Messager, + count: u64, + max_count: Option, + ) -> Self { + Self { + token_data: TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }, + request_to: TOKEN_ADMIN_ID.to_owned(), + client, + messager, + count, + max_count, + } + } +} + +#[async_trait::async_trait] +impl Behavior for TokenRequester { + #[tracing::instrument(skip(self), fields(id = +self.messager.id.as_deref()))] + async fn startup(&mut self) { + trace!("Requesting address of token: {:?}", self.token_data.name); + let message = Message { + from: self.messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::AddressOf(self.token_data.name.clone())) + .unwrap(), + }; + self.messager.send(message).await; + } + + #[tracing::instrument(skip(self), fields(id = +self.messager.id.as_deref()))] + async fn process(&mut self, event: Message) -> Option { + if let Ok(token_data) = serde_json::from_str::(&event.data) { + trace!( + "Got +token data: {:?}", + token_data + ); + trace!( + "Requesting first mint of +token: {:?}", + self.token_data.name + ); + let message = Message { + from: self.messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: self.client.address(), + mint_amount: 1, + })) + .unwrap(), + }; + self.messager.send(message).await; + } + Some(MachineHalt) + } +} + +#[async_trait::async_trait] +impl Behavior for TokenRequester { + #[tracing::instrument(skip(self), fields(id = +self.messager.id.as_deref()))] + async fn process(&mut self, event: arbiter_token::TransferFilter) -> Option { + trace!( + "Got event for +`TokenRequester` logger: {:?}", + event + ); + std::thread::sleep(std::time::Duration::from_secs(1)); + let message = Message { + from: self.messager.id.clone().unwrap(), + to: To::Agent(self.request_to.clone()), + data: serde_json::to_string(&TokenAdminQuery::MintRequest(MintRequest { + token: self.token_data.name.clone(), + mint_to: self.client.address(), + mint_amount: 1, + })) + .unwrap(), + }; + self.messager.send(message).await; + self.count += 1; + if self.count == self.max_count.unwrap_or(u64::MAX) { + warn!("Reached max count. Halting behavior."); + return Some(MachineHalt); + } + None + } +} + +#[ignore] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn token_minter_simulation() { + // std::env::set_var("RUST_LOG", "trace"); + // tracing_subscriber::fmt::init(); + + let mut world = World::new("test_world"); + + // Create the token admin agent + let token_admin = Agent::new(TOKEN_ADMIN_ID, &world); + let mut token_admin_behavior = TokenAdmin::new( + token_admin.client.clone(), + token_admin + .messager + .as_ref() + .unwrap() + .join_with_id(Some(TOKEN_ADMIN_ID.to_owned())), + 0, + Some(4), + ); + token_admin_behavior.add_token(TokenData { + name: TOKEN_NAME.to_owned(), + symbol: TOKEN_SYMBOL.to_owned(), + decimals: TOKEN_DECIMALS, + address: None, + }); + world.add_agent(token_admin.with_behavior(token_admin_behavior)); + + // Create the token requester agent + let token_requester = Agent::new(REQUESTER_ID, &world); + let token_requester_behavior = TokenRequester::new( + token_requester.client.clone(), + token_requester + .messager + .as_ref() + .unwrap() + .join_with_id(Some(REQUESTER_ID.to_owned())), + 0, + Some(4), + ); + let arb = ArbiterToken::new( + Address::from_str("0x240a76d4c8a7dafc6286db5fa6b589e8b21fc00f").unwrap(), + token_requester.client.clone(), + ); + let transfer_event = arb.transfer_filter(); + + let token_requester_behavior_again = TokenRequester::new( + token_requester.client.clone(), + token_requester + .messager + .as_ref() + .unwrap() + .join_with_id(Some(REQUESTER_ID.to_owned())), + 0, + Some(4), + ); + world.add_agent( + token_requester + .with_behavior::(token_requester_behavior) + .with_behavior::(token_requester_behavior_again) + .with_event(transfer_event), + ); + + let transfer_stream = EventLogger::builder() + .add_stream(arb.transfer_filter()) + .stream() + .unwrap(); + let mut stream = Box::pin(transfer_stream); + let mut idx = 0; + + world.run().await; + + loop { + match timeout(Duration::from_secs(1), stream.next()).await { + Ok(Some(event)) => { + println!("Event received in outside world: {:?}", event); + idx += 1; + if idx == 4 { + break; + } + } + _ => { + panic!("Timeout reached. Test failed."); + } + } + } +} diff --git a/arbiter-engine/src/machine.rs b/arbiter-engine/src/machine.rs index 6a9cb750..c16cd623 100644 --- a/arbiter-engine/src/machine.rs +++ b/arbiter-engine/src/machine.rs @@ -3,14 +3,38 @@ use std::fmt::Debug; -use async_broadcast::Receiver; use serde::de::DeserializeOwned; -use tokio::task::JoinHandle; +use tokio::sync::broadcast::Receiver; use super::*; +/// The instructions that can be sent to a [`StateMachine`]. +#[derive(Clone, Copy, Debug)] +pub enum MachineInstruction { + /// Used to make a [`StateMachine`] sync with the world. + Sync, + + /// Used to make a [`StateMachine`] start up. + Start, + + /// Used to make a [`StateMachine`] process events. + /// This will offload the process into a task that can be halted by sending + /// a [`MachineHalt`] message from the [`Messager`]. For our purposes, the + /// [`crate::world::World`] will handle this. + Process, + + /// Used to make a [`StateMachine`] stop. Only applicable for the + /// [`crate::world::World`] currently. + Stop, +} + +/// The message that can be used in a [`StateMachine`] to halt its processing. +/// Optionally returned by [`Behavior::process`] to close tasks. +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct MachineHalt; + /// The state used by any entity implementing [`StateMachine`]. -#[derive(Debug, Copy, Clone)] +#[derive(Clone, Copy, Debug)] pub enum State { /// The entity is not yet running any process. /// This is the state adopted by the entity when it is first created. @@ -26,7 +50,7 @@ pub enum State { /// This is where the entity can engage in its specific start up activities /// that it can do given the current state of the world. /// These are usually quick one-shot activities that are not repeated. - Startup, + Starting, /// The entity is processing. /// This is where the entity can engage in its specific processing @@ -47,23 +71,22 @@ pub enum State { pub trait Behavior: Send + Sync + 'static { /// Used to bring the agent back up to date with the latest state of the /// world. This could be used if the world was stopped and later restarted. - async fn sync(&mut self); + async fn sync(&mut self) {} /// Used to start the agent. /// This is where the agent can engage in its specific start up activities /// that it can do given the current state of the world. - async fn startup(&mut self); + async fn startup(&mut self) {} /// Used to process events. /// This is where the agent can engage in its specific processing /// of events that can lead to actions being taken. - async fn process(&mut self, event: E); + async fn process(&mut self, event: E) -> Option; } #[async_trait::async_trait] pub(crate) trait StateMachine: Send + Sync + 'static { - fn run_state(&mut self, state: State); - async fn transition(&mut self); + async fn execute(&mut self, instruction: MachineInstruction); } /// The idea of the [`Engine`] is that it drives the [`Behavior`] of a @@ -81,14 +104,13 @@ where /// The behavior the [`Engine`] runs. pub behavior: Option, - /// When the state of the [`Engine`] is modified, the tasks will be run (and - /// therefore `move`d) concurrently and stored here. - behavior_task: Option>, + /// The current state of the [`Engine`]. + pub state: State, /// The receiver of events that the [`Engine`] will process. /// The [`State::Processing`] stage will attempt a decode of the [`String`]s /// into the event type ``. - event_receiver: Receiver, + event_receiver: Option>, phantom: std::marker::PhantomData, } @@ -102,8 +124,8 @@ where pub(crate) fn new(behavior: B, event_receiver: Receiver) -> Self { Self { behavior: Some(behavior), - behavior_task: None, - event_receiver, + state: State::Uninitialized, + event_receiver: Some(event_receiver), phantom: std::marker::PhantomData, } } @@ -115,57 +137,63 @@ where B: Behavior, E: DeserializeOwned + Send + Sync + Debug + 'static, { - fn run_state(&mut self, state: State) { - match state { - State::Uninitialized => { - unimplemented!("This never gets called.") - } - State::Syncing => { + async fn execute(&mut self, instruction: MachineInstruction) { + match instruction { + MachineInstruction::Sync => { trace!("Behavior is syncing."); + self.state = State::Syncing; let mut behavior = self.behavior.take().unwrap(); - self.behavior_task = Some(tokio::spawn(async move { + let behavior_task = tokio::spawn(async move { behavior.sync().await; behavior - })); + }); + self.behavior = Some(behavior_task.await.unwrap()); } - State::Startup => { + MachineInstruction::Start => { trace!("Behavior is starting up."); + self.state = State::Starting; let mut behavior = self.behavior.take().unwrap(); - self.behavior_task = Some(tokio::spawn(async move { + let behavior_task = tokio::spawn(async move { behavior.startup().await; behavior - })); + }); + self.behavior = Some(behavior_task.await.unwrap()); } - State::Processing => { + MachineInstruction::Process => { trace!("Behavior is processing."); let mut behavior = self.behavior.take().unwrap(); - let mut receiver = self.event_receiver.clone(); // TODO Could use Option::take() if we don't want to clone. - self.behavior_task = Some(tokio::spawn(async move { + let mut receiver = self.event_receiver.take().unwrap(); + let behavior_task = tokio::spawn(async move { while let Ok(event) = receiver.recv().await { - trace!("Behavior has gotten event: {:?}", event); let decoding_result = serde_json::from_str::(&event); match decoding_result { - Ok(event) => behavior.process(event).await, - Err(e) => { - tracing::error!("Error decoding event: {:?}", e); - continue; + Ok(event) => { + let halt_option = behavior.process(event).await; + if halt_option.is_some() { + break; + } } + Err(_) => match serde_json::from_str::(&event) { + Ok(_) => { + warn!("Behavior received `MachineHalt` message. Breaking!"); + break; + } + Err(_) => { + trace!( + "Event received by behavior that could not be deserialized." + ); + continue; + } + }, } } behavior - })); + }); + self.behavior = Some(behavior_task.await.unwrap()); } - State::Stopped => { - todo!() + MachineInstruction::Stop => { + unreachable!("This is never explicitly called on an engine.") } } } - - /// Take the task and wait for it to finish. Then take the [`Behavior`] and - /// put it back into the engine. - async fn transition(&mut self) { - let behavior_task = self.behavior_task.take().unwrap(); - let behavior = behavior_task.await.unwrap(); - self.behavior = Some(behavior); - } } diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index c2973c8e..290fd6f3 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -1,9 +1,7 @@ //! The messager module contains the core messager layer for the Arbiter Engine. -use std::pin::Pin; - -use async_broadcast::{broadcast, Receiver as BroadcastReceiver, Sender as BroadcastSender}; use futures_util::Stream; +use tokio::sync::broadcast::{channel, Receiver, Sender}; use super::*; @@ -32,14 +30,14 @@ pub enum To { } /// A messager that can be used to send messages between agents. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Messager { /// The identifier of the entity that is using the messager. pub id: Option, - pub(crate) broadcast_sender: BroadcastSender, + pub(crate) broadcast_sender: Sender, - broadcast_receiver: BroadcastReceiver, + broadcast_receiver: Option>, } impl Messager { @@ -49,10 +47,10 @@ impl Messager { /// Creates a new messager with the given capacity. #[allow(clippy::new_without_default)] pub fn new() -> Self { - let (broadcast_sender, broadcast_receiver) = broadcast(512); + let (broadcast_sender, broadcast_receiver) = channel(512); Self { broadcast_sender, - broadcast_receiver, + broadcast_receiver: Some(broadcast_receiver), id: None, } } @@ -62,16 +60,16 @@ impl Messager { pub(crate) fn for_agent(&self, id: &str) -> Self { Self { broadcast_sender: self.broadcast_sender.clone(), - broadcast_receiver: self.broadcast_receiver.clone(), + broadcast_receiver: Some(self.broadcast_sender.subscribe()), id: Some(id.to_owned()), } } /// Returns a stream of messages that are either sent to [`To::All`] or to /// the agent via [`To::Agent(id)`]. - pub fn stream(&self) -> Pin + Send + '_>> { - let mut receiver = self.broadcast_receiver.clone(); - let stream = async_stream::stream! { + pub fn stream(mut self) -> impl Stream + Send { + let mut receiver = self.broadcast_receiver.take().unwrap(); + async_stream::stream! { while let Ok(message) = receiver.recv().await { match &message.to { To::All => { @@ -86,13 +84,22 @@ impl Messager { } } } - }; - Box::pin(stream) + } + } + + /// Returns a [`Messager`] interface connected to the same instance but with + /// the `id` provided. + pub fn join_with_id(&self, id: Option) -> Messager { + Messager { + broadcast_sender: self.broadcast_sender.clone(), + broadcast_receiver: Some(self.broadcast_sender.subscribe()), + id, + } } /// Sends a message to the messager. pub async fn send(&self, message: Message) { trace!("Sending message via messager."); - self.broadcast_sender.broadcast(message).await.unwrap(); + self.broadcast_sender.send(message).unwrap(); } } diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 63d8a5fa..13b65d3a 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -10,14 +10,17 @@ // it owns a clone of the same messager. // * The worlds now are just going to be revm worlds. We can generalize this // later. +// * Can we give the world an address book?? // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //! The world module contains the core world abstraction for the Arbiter Engine. use arbiter_core::environment::{builder::EnvironmentBuilder, Environment}; -use futures_util::future::{join_all, JoinAll}; -use tokio::task::JoinHandle; +use futures_util::future::join_all; +use tokio::sync::broadcast::Sender as BroadcastSender; +use tracing::info; +use self::machine::{MachineHalt, MachineInstruction}; use super::*; use crate::{ agent::Agent, @@ -55,18 +58,19 @@ pub struct World { /// The identifier of the world. pub id: String, + /// The state of the [`World`]. + pub state: State, + /// The agents in the world. pub agents: Option>, + agent_distributors: Option>>, + /// The environment for the world. pub environment: Environment, /// The messaging layer for the world. pub messager: Messager, - - /// Holds onto the tasks that represent the agent running a specific state - /// transition. - agent_tasks: Option>>, } impl World { @@ -74,8 +78,9 @@ impl World { pub fn new(id: &str) -> Self { Self { id: id.to_owned(), + state: State::Uninitialized, agents: Some(HashMap::new()), - agent_tasks: None, + agent_distributors: None, environment: EnvironmentBuilder::new().build(), messager: Messager::new(), } @@ -86,97 +91,126 @@ impl World { Self { id: id.to_owned(), agents: Some(HashMap::new()), - agent_tasks: None, + state: State::Uninitialized, + agent_distributors: None, environment, messager: Messager::new(), } } /// Adds an agent to the world. - pub fn create_agent(&mut self, id: &str) -> &mut Agent { - let agent = Agent::connect(id, self); + pub fn add_agent(&mut self, agent: Agent) { + let id = agent.id.clone(); let agents = self.agents.as_mut().unwrap(); agents.insert(id.to_owned(), agent); - agents.get_mut(id).unwrap() + } + + /// Runs the world through up to the [`State::Processing`] stage. + pub async fn run(&mut self) { + self.execute(MachineInstruction::Sync).await; + self.execute(MachineInstruction::Start).await; + self.execute(MachineInstruction::Process).await; + } + + /// Stops the world by stopping all the behaviors that each of the agents is + /// running. + pub async fn stop(&mut self) { + self.execute(MachineInstruction::Stop).await; } } +// TODO: Idea, when we enter the `State::Processing`, we should pass the task +// into the struct. When we call `MachineInstruction::Stop` we should do message +// passing that will kill the tasks so that they return. This will allow us to +// do graceful shutdowns. + +// TODO: Worth explaining how the process stage is offloaded so it is +// understandable. + +// Right now what we do is we send a HALT message via the agent's distributor +// which means all behaviors should receive this now. If those behaviors all see +// this HALT message and then exit their process, then the await should finish. +// Actually we can probably not have to get the distributors up this high, but +// let's work with this for now. + #[async_trait::async_trait] impl StateMachine for World { - fn run_state(&mut self, state: State) { - match state { - State::Uninitialized => { - unimplemented!("This never gets called.") - } - State::Syncing => { - trace!("World is syncing."); - let mut agents = self.agents.take().unwrap(); - for agent in agents.values_mut() { - agent.run_state(state); - } - self.agent_tasks = Some(join_all(agents.into_values().map(|mut agent| { + async fn execute(&mut self, instruction: MachineInstruction) { + match instruction { + MachineInstruction::Sync => { + info!("World is syncing."); + self.state = State::Syncing; + let agents = self.agents.take().unwrap(); + let agent_tasks = join_all(agents.into_values().map(|mut agent| { tokio::spawn(async move { - agent.transition().await; + agent.execute(instruction).await; agent }) - }))); + })); + self.agents = Some( + agent_tasks + .await + .into_iter() + .map(|res| { + let agent = res.unwrap(); + (agent.id.clone(), agent) + }) + .collect::>(), + ); } - State::Startup => { - trace!("World is starting up."); - let mut agents = self.agents.take().unwrap(); - for agent in agents.values_mut() { - agent.run_state(state); - } - self.agent_tasks = Some(join_all(agents.into_values().map(|mut agent| { + MachineInstruction::Start => { + info!("World is starting up."); + self.state = State::Starting; + let agents = self.agents.take().unwrap(); + let agent_tasks = join_all(agents.into_values().map(|mut agent| { tokio::spawn(async move { - agent.transition().await; + agent.execute(instruction).await; agent }) - }))); + })); + self.agents = Some( + agent_tasks + .await + .into_iter() + .map(|res| { + let agent = res.unwrap(); + (agent.id.clone(), agent) + }) + .collect::>(), + ); } - State::Processing => { - trace!("World is starting up."); - let mut agents = self.agents.take().unwrap(); - for agent in agents.values_mut() { - agent.run_state(state); - } - self.agent_tasks = Some(join_all(agents.into_values().map(|mut agent| { + MachineInstruction::Process => { + info!("World is processing."); + self.state = State::Processing; + let agents = self.agents.take().unwrap(); + let mut agent_distributors = vec![]; + let agent_processors = join_all(agents.into_values().map(|mut agent| { + agent_distributors.push(agent.distributor.0.clone()); tokio::spawn(async move { - agent.transition().await; + agent.execute(instruction).await; agent }) - }))); + })); + self.agent_distributors = Some(agent_distributors); + self.agents = Some( + agent_processors + .await + .into_iter() + .map(|res| { + let agent = res.unwrap(); + (agent.id.clone(), agent) + }) + .collect::>(), + ); } - State::Stopped => { - trace!("World is starting up."); - let mut agents = self.agents.take().unwrap(); - for agent in agents.values_mut() { - agent.run_state(state); + MachineInstruction::Stop => { + let halt = serde_json::to_string(&MachineHalt).unwrap(); + for tx in self.agent_distributors.take().unwrap() { + tx.send(halt.clone()).unwrap(); } - self.agent_tasks = Some(join_all(agents.into_values().map(|mut agent| { - tokio::spawn(async move { - agent.transition().await; - agent - }) - }))); } } } - - async fn transition(&mut self) { - self.agents = Some( - self.agent_tasks - .take() - .unwrap() - .await - .into_iter() - .map(|res| { - let agent = res.unwrap(); - (agent.id.clone(), agent) - }) - .collect::>(), - ); - } } #[cfg(test)]