diff --git a/Cargo.lock b/Cargo.lock index 8faa458b6..befd47356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,13 +2,19 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.6", + "getrandom 0.2.7", "once_cell", "version_check", ] @@ -22,11 +28,57 @@ dependencies = [ "winapi", ] +[[package]] +name = "anyhow" +version = "1.0.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" + +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + +[[package]] +name = "async-compression" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345fd392ab01f746c717b1357165b76f0b67a60192007b234058c9045fdcf695" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" -version = "0.1.53" +version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" +checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" dependencies = [ "proc-macro2", "quote", @@ -35,9 +87,9 @@ dependencies = [ [[package]] name = "atoi" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" dependencies = [ "num-traits", ] @@ -61,9 +113,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.6" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d" +checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e" dependencies = [ "async-trait", "axum-core", @@ -73,7 +125,7 @@ dependencies = [ "http", "http-body", "hyper", - "itoa", + "itoa 1.0.2", "matchit", "memchr", "mime", @@ -92,9 +144,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.4" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17" +checksum = "e4f44a0e6200e9d11a1cdc989e4b358f6e3d354fbf48478f345a17f4e43f8635" dependencies = [ "async-trait", "bytes", @@ -149,6 +201,18 @@ dependencies = [ "sha2 0.9.9", ] +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bumpalo" version = "3.10.0" @@ -195,16 +259,16 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.18" +version = "3.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b" +checksum = "ab8b79fe3946ceb4a0b1c080b4018992b8d27e9ff363644c1c9b6387c854614d" dependencies = [ "atty", "bitflags", "clap_derive", "clap_lex", "indexmap", - "lazy_static", + "once_cell", "strsim", "termcolor", "textwrap", @@ -212,9 +276,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.1.18" +version = "3.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c" +checksum = "759bf187376e1afa7b85b959e6a664a3e7a95203415dba952ad19139e798f902" dependencies = [ "heck", "proc-macro-error", @@ -225,13 +289,22 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.2.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" dependencies = [ "os_str_bytes", ] +[[package]] +name = "cmake" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +dependencies = [ + "cc", +] + [[package]] name = "const-oid" version = "0.7.1" @@ -249,18 +322,27 @@ dependencies = [ [[package]] name = "crc" -version = "2.1.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3" dependencies = [ "crc-catalog", ] [[package]] name = "crc-catalog" -version = "1.1.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" + +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] [[package]] name = "crossbeam-queue" @@ -274,12 +356,12 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +checksum = "7d82ee10ce34d7bc12c2122495e7593a9c41347ecdd64185af4ecf72cb1a7f83" dependencies = [ "cfg-if", - "lazy_static", + "once_cell", ] [[package]] @@ -296,9 +378,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f" dependencies = [ "generic-array", "typenum", @@ -314,6 +396,28 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa 0.4.8", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "curve25519-dalek" version = "3.2.1" @@ -418,9 +522,9 @@ dependencies = [ [[package]] name = "either" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be" [[package]] name = "elliptic-curve" @@ -446,6 +550,15 @@ version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] + [[package]] name = "ff" version = "0.11.1" @@ -456,6 +569,22 @@ dependencies = [ "subtle", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -472,6 +601,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -488,6 +632,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-intrusive" version = "0.4.0" @@ -499,6 +654,23 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -517,11 +689,16 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -547,13 +724,13 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", "libc", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -567,20 +744,39 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" -version = "0.11.2" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022" dependencies = [ "ahash", ] [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ "hashbrown", ] @@ -609,6 +805,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=master#dd7143c0e1338591e44948290492d203efe01203" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_derive", + "serde_json", + "tonic", + "tonic-build", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -654,13 +865,13 @@ dependencies = [ [[package]] name = "http" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.2", ] [[package]] @@ -694,19 +905,20 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "hyper" -version = "0.14.19" +version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42dc3c131584288d375f2d07f822b0cb012d8c6fb899a5b9fdb3cb7eb9b6004f" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "httparse", "httpdate", - "itoa", + "itoa 1.0.2", "pin-project-lite", "socket2", "tokio", @@ -715,6 +927,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "idna" version = "0.2.3" @@ -728,9 +952,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.8.2" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", "hashbrown", @@ -754,6 +978,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.2" @@ -762,9 +992,9 @@ checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "js-sys" -version = "0.3.57" +version = "0.3.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" +checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27" dependencies = [ "wasm-bindgen", ] @@ -848,11 +1078,20 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", @@ -860,6 +1099,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "nom" version = "7.1.1" @@ -870,6 +1115,17 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -901,9 +1157,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opaque-debug" @@ -966,20 +1222,30 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +checksum = "78203e83c48cffbe01e4a2d35d566ca4de445d79a85372fc64e378bfc812a260" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +checksum = "710faf75e1b33345361201d36d04e98ac1ed8909151a017ed384700836104c74" dependencies = [ "proc-macro2", "quote", @@ -1002,22 +1268,35 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" name = "poc5g-server" version = "0.1.0" dependencies = [ + "async-compression", "axum", "base64", "chrono", "clap", + "csv", "dotenv", + "futures", + "futures-util", "helium-crypto", + "helium-proto", + "http", "hyper", + "lazy_static", + "once_cell", + "prost", + "rust_decimal", + "rust_decimal_macros", "serde", "serde_json", "sha2 0.10.2", "sqlx", "thiserror", "tokio", + "tonic", "tower-http", "tracing", "tracing-subscriber", + "triggered", ] [[package]] @@ -1026,6 +1305,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da6ffbe862780245013cb1c0a48c4e44b7d665548088f91f6b90876d0625e4c2" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1052,18 +1341,73 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.39" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" +checksum = "dd96a1e8ed2596c337f8eae5f24924ec83f5ad5ab21ea8e455d3566c69fbcaf7" dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae5a4388762d5815a9fc0dea33c56b021cdc8dde0c55e0c9ca57197254b0cab" +dependencies = [ + "bytes", + "cfg-if", + "cmake", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" +dependencies = [ + "bytes", + "prost", +] + [[package]] name = "quote" -version = "1.0.18" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" dependencies = [ "proc-macro2", ] @@ -1104,7 +1448,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.6", + "getrandom 0.2.7", ] [[package]] @@ -1122,16 +1466,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ - "getrandom 0.2.6", + "getrandom 0.2.7", "redox_syscall", "thiserror", ] [[package]] name = "regex" -version = "1.5.6" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" dependencies = [ "regex-syntax", ] @@ -1147,9 +1491,18 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.26" +version = "0.6.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] [[package]] name = "rfc6979" @@ -1177,19 +1530,48 @@ dependencies = [ "winapi", ] +[[package]] +name = "rust_decimal" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34a3bb58e85333f1ab191bf979104b586ebd77475bc6681882825f4532dfe87c" +dependencies = [ + "arrayvec", + "num-traits", + "serde", +] + +[[package]] +name = "rust_decimal_macros" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1467556c7c115165aa0346bcf45bc947203bcc880efad85a09ba24ea17926c4" +dependencies = [ + "quote", + "rust_decimal", +] + [[package]] name = "rustls" -version = "0.19.1" +version = "0.20.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" dependencies = [ - "base64", "log", "ring", "sct", "webpki", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.10" @@ -1204,9 +1586,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "sct" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ "ring", "untrusted", @@ -1226,18 +1608,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.137" +version = "1.0.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.137" +version = "1.0.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" dependencies = [ "proc-macro2", "quote", @@ -1246,11 +1628,11 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" dependencies = [ - "itoa", + "itoa 1.0.2", "ryu", "serde", ] @@ -1262,7 +1644,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa", + "itoa 1.0.2", "ryu", "serde", ] @@ -1330,11 +1712,17 @@ dependencies = [ "rand_core 0.6.3", ] +[[package]] +name = "slab" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" + [[package]] name = "smallvec" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" [[package]] name = "socket2" @@ -1365,9 +1753,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b" +checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4" dependencies = [ "sqlx-core", "sqlx-macros", @@ -1375,9 +1763,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5" +checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093" dependencies = [ "ahash", "atoi", @@ -1400,16 +1788,19 @@ dependencies = [ "hkdf", "hmac 0.12.1", "indexmap", - "itoa", + "itoa 1.0.2", "libc", "log", "md-5", "memchr", + "num-bigint", "once_cell", "paste", "percent-encoding", "rand", + "rust_decimal", "rustls", + "rustls-pemfile", "serde", "serde_json", "sha-1", @@ -1422,16 +1813,15 @@ dependencies = [ "tokio-stream", "url", "uuid", - "webpki", "webpki-roots", "whoami", ] [[package]] name = "sqlx-macros" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1" +checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f" dependencies = [ "dotenv", "either", @@ -1448,9 +1838,9 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae" +checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f" dependencies = [ "once_cell", "tokio", @@ -1481,9 +1871,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.95" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" +checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd" dependencies = [ "proc-macro2", "quote", @@ -1508,6 +1898,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempfile" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +dependencies = [ + "cfg-if", + "fastrand", + "libc", + "redox_syscall", + "remove_dir_all", + "winapi", +] + [[package]] name = "termcolor" version = "1.1.3" @@ -1554,11 +1958,12 @@ dependencies = [ [[package]] name = "time" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi", ] @@ -1579,10 +1984,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.2" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" +checksum = "57aec3cfa4c296db7255446efb4928a6be304b431a806216105542a67b6ca82e" dependencies = [ + "autocfg", "bytes", "libc", "memchr", @@ -1596,11 +2002,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" dependencies = [ "proc-macro2", "quote", @@ -1609,9 +2025,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.22.0" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls", "tokio", @@ -1620,9 +2036,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", "pin-project-lite", @@ -1631,27 +2047,76 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be9d60db39854b30b835107500cf0aca0b0d14d6e1c3de124217c23a29c2ddb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", ] [[package]] name = "tower" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", "tokio-util", "tower-layer", @@ -1661,9 +2126,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d342c6d58709c0a6d48d48dabbb62d4ef955cf5f0f3bbfd845838e7ae88dbae" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "base64", "bitflags", @@ -1688,15 +2153,15 @@ checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" [[package]] name = "tower-service" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" dependencies = [ "cfg-if", "log", @@ -1707,9 +2172,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.21" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" +checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" dependencies = [ "proc-macro2", "quote", @@ -1718,14 +2183,24 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.26" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7" dependencies = [ - "lazy_static", + "once_cell", "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.3" @@ -1739,13 +2214,13 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.11" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" +checksum = "3a713421342a5a666b7577783721d3117f1b69a393df803ee17bb73b1e122a59" dependencies = [ "ansi_term", - "lazy_static", "matchers", + "once_cell", "regex", "sharded-slab", "smallvec", @@ -1755,6 +2230,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triggered" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce148eae0d1a376c1b94ae651fc3261d9cb8294788b962b7382066376503a2d1" + [[package]] name = "try-lock" version = "0.2.3" @@ -1775,15 +2256,15 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-ident" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" +checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" [[package]] name = "unicode-normalization" -version = "0.1.19" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6" dependencies = [ "tinyvec", ] @@ -1826,9 +2307,9 @@ dependencies = [ [[package]] name = "uuid" -version = "0.8.2" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" [[package]] name = "valuable" @@ -1860,9 +2341,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasi" @@ -1872,9 +2353,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" +checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1882,9 +2363,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" +checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a" dependencies = [ "bumpalo", "lazy_static", @@ -1897,9 +2378,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" +checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1907,9 +2388,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" +checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" dependencies = [ "proc-macro2", "quote", @@ -1920,15 +2401,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" +checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be" [[package]] name = "web-sys" -version = "0.3.57" +version = "0.3.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" +checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90" dependencies = [ "js-sys", "wasm-bindgen", @@ -1936,9 +2417,9 @@ dependencies = [ [[package]] name = "webpki" -version = "0.21.4" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" dependencies = [ "ring", "untrusted", @@ -1946,13 +2427,24 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" dependencies = [ "webpki", ] +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static", + "libc", +] + [[package]] name = "whoami" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 9bad32663..08f4d6fce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ thiserror = "1" serde = {version = "1", features=["derive"]} serde_json = "1" clap = {version = "3", features = ["derive"]} -sqlx = {version = "0", features = ["postgres", "uuid", "chrono", "migrate", "macros", "runtime-tokio-rustls"]} +sqlx = {version = "0", features = ["postgres", "uuid", "decimal", "chrono", "migrate", "macros", "runtime-tokio-rustls"]} tokio = { version = "1", default-features=false, features=["fs", "macros", "signal", "rt", "process", "time"] } tracing = "0" tracing-subscriber = { version = "0", features = ["env-filter"] } @@ -20,6 +20,19 @@ axum = "0" hyper = "*" base64 = "0" sha2 = "*" +http = "*" +tonic = "0" +lazy_static = "*" chrono = {version = "0", features = ["serde"]} tower-http = {version = "*", features = ["auth", "trace"]} +triggered = "0" +futures = "*" +futures-util = "*" +prost = "*" +csv = "1" +once_cell = "1" +async-compression = {version = "0", features = ["tokio", "gzip"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} helium-crypto = {git = "https://github.com/helium/helium-crypto-rs", tag="v0.3.4"} +rust_decimal = "1" +rust_decimal_macros = "1" diff --git a/build.rs b/build.rs index 760959384..d5068697c 100644 --- a/build.rs +++ b/build.rs @@ -2,4 +2,4 @@ fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); -} \ No newline at end of file +} diff --git a/migrations/2_cell_attach_event copy.sql b/migrations/2_cell_attach_event copy.sql deleted file mode 100644 index 4c62cf162..000000000 --- a/migrations/2_cell_attach_event copy.sql +++ /dev/null @@ -1,9 +0,0 @@ -create table cell_attach_event ( - id uuid primary key not null default uuid_generate_v1mc(), - imsi text, - pubkey text, - timestamp timestamptz not null, - - created_at timestamptz default now() -); - diff --git a/migrations/2_follower.sql b/migrations/2_follower.sql new file mode 100644 index 000000000..17474c5f4 --- /dev/null +++ b/migrations/2_follower.sql @@ -0,0 +1,7 @@ +create table follower_meta ( + key text primary key not null, + value text +); + +insert into follower_meta (key, value) +values ('last_height', '995041') \ No newline at end of file diff --git a/migrations/3_cell_heartbeat.sql b/migrations/3_cell_heartbeat.sql deleted file mode 100644 index 23b761d99..000000000 --- a/migrations/3_cell_heartbeat.sql +++ /dev/null @@ -1,17 +0,0 @@ -create table cell_heartbeat ( - id uuid primary key not null default uuid_generate_v1mc(), - - pubkey text, - hotspot_type text, - cell_id integer, - timestamp timestamptz not null, - lon float, - lat float, - operation_mode boolean, - cbsd_category text, - - created_at timestamptz default now() -); - -create index cell_heartbeat_pubkey_idx on cell_heartbeat(pubkey); -create index cell_heartbeat_timestamp_idx on cell_heartbeat(timestamp); \ No newline at end of file diff --git a/migrations/3_gateway.sql b/migrations/3_gateway.sql new file mode 100644 index 000000000..c88d7c6b1 --- /dev/null +++ b/migrations/3_gateway.sql @@ -0,0 +1,11 @@ +create table gateway ( + address text primary key not null, + owner text not null, + location text, + + last_heartbeat timestamptz, + last_speedtest timestamptz, + last_attach timestamptz, + + created_at timestamptz default now() +); diff --git a/migrations/4_cell_speedtest.sql b/migrations/4_cell_speedtest.sql deleted file mode 100644 index 3ab685a74..000000000 --- a/migrations/4_cell_speedtest.sql +++ /dev/null @@ -1,15 +0,0 @@ -create table cell_speedtest ( - id uuid primary key not null default uuid_generate_v1mc(), - - pubkey text, - serial text, - timestamp timestamptz not null, - upload_speed bigint, - download_speed bigint, - latency integer, - - created_at timestamptz default now() -); - -create index cell_speedtest_pubkey_idx on cell_speedtest(pubkey); -create index cell_speedtest_timestamp_idx on cell_speedtest(timestamp); \ No newline at end of file diff --git a/src/api/attach_event.rs b/src/api/attach_event.rs new file mode 100644 index 000000000..73a1d49d6 --- /dev/null +++ b/src/api/attach_event.rs @@ -0,0 +1,54 @@ +use crate::{api::api_error, Error, Imsi, PublicKey, Result}; +use axum::{extract::Extension, http::StatusCode, Json}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use sqlx::{PgPool, Row}; + +pub async fn create_cell_attach_event( + Json(event): Json, + Extension(pool): Extension, +) -> std::result::Result, (StatusCode, String)> { + event + .insert_into(&pool) + .await + .map(|pubkey: PublicKey| { + json!({ + "pubkey": pubkey, + }) + }) + .map(Json) + .map_err(api_error) +} + +#[derive(sqlx::FromRow, Deserialize, Serialize)] +pub struct CellAttachEvent { + pub imsi: Imsi, + #[serde(alias = "publicAddress")] + pub pubkey: PublicKey, + #[serde(alias = "iso_timestamp")] + pub timestamp: DateTime, +} + +impl CellAttachEvent { + pub async fn insert_into<'e, 'c, E>(&self, executor: E) -> Result + where + E: 'e + sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query( + r#" + insert into gateways (pubkey, owner, payer, height, txn_hash, block_timestamp, last_heartbeat, last_speedtest, last_attach) + values ($1, NULL, NULL, 0, NULL, NULL, NULL, NULL, $2) + on conflict (pubkey) do update set + last_attach = EXCLUDED.last_attach + returning pubkey + "#, + ) + .bind(&self.pubkey) + .bind(&self.timestamp) + .fetch_one(executor) + .await + .and_then(|row| row.try_get("pubkey")) + .map_err(Error::from) + } +} diff --git a/src/api/attach_events.rs b/src/api/attach_events.rs deleted file mode 100644 index 49a2450b6..000000000 --- a/src/api/attach_events.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::{ - api::{internal_error, not_found_error, DatabaseConnection}, - CellAttachEvent, Uuid, -}; -use axum::{extract::Path, http::StatusCode, Json}; -use serde_json::{json, Value}; - -pub async fn create_cell_attach_event( - Json(event): Json, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - event - .insert_into(&mut conn) - .await - .map(|id: Uuid| { - json!({ - "id": id, - }) - }) - .map(Json) - .map_err(internal_error) -} - -pub async fn get_cell_attach_event( - Path(id): Path, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let event = CellAttachEvent::get(&mut conn, &id) - .await - .map_err(internal_error)?; - if let Some(event) = event { - let json = serde_json::to_value(event).map_err(internal_error)?; - Ok(Json(json)) - } else { - Err(not_found_error()) - } -} diff --git a/src/api/gateway.rs b/src/api/gateway.rs new file mode 100644 index 000000000..85e5db840 --- /dev/null +++ b/src/api/gateway.rs @@ -0,0 +1,304 @@ +use crate::{ + api::api_error, datetime_from_epoch, follower::FollowerService, Error, PublicKey, Result, +}; +use axum::{ + extract::{Extension, Path, Query}, + http::StatusCode, + Json, +}; +use chrono::{DateTime, Utc}; +use helium_proto::FollowerGatewayRespV1; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sqlx::PgPool; +use std::cmp::min; + +pub async fn get_gateway( + Path(pubkey): Path, + Extension(pool): Extension, +) -> std::result::Result, (StatusCode, String)> { + let event = Gateway::get(&pool, &pubkey).await.map_err(api_error)?; + if let Some(event) = event { + let json = serde_json::to_value(event).map_err(api_error)?; + Ok(Json(json)) + } else { + Err(Error::not_found(format!("Gateway {pubkey} not found")).into()) + } +} + +pub async fn get_gateways( + Query(after): Query, + Extension(pool): Extension, +) -> std::result::Result, (StatusCode, String)> { + let gateways = Gateway::list(&pool, &after).await.map_err(api_error)?; + let json = serde_json::to_value(gateways).map_err(api_error)?; + Ok(Json(json)) +} + +pub const DEFAULT_GATEWAY_COUNT: usize = 100; +pub const MAX_GATEWAY_COUNT: u32 = 1000; + +#[derive(sqlx::FromRow, Deserialize, Serialize, Debug)] +pub struct Gateway { + pub address: PublicKey, + pub owner: PublicKey, + pub location: Option, + + pub last_heartbeat: Option>, + pub last_speedtest: Option>, + pub last_attach: Option>, + + #[serde(skip_deserializing)] + pub created_at: Option>, +} + +impl TryFrom for Gateway { + type Error = Error; + fn try_from(value: FollowerGatewayRespV1) -> Result { + let location = if value.location.is_empty() { + None + } else { + Some(value.location) + }; + Ok(Self { + address: PublicKey::try_from(value.address.as_ref())?, + owner: PublicKey::try_from(value.owner.as_ref())?, + location, + last_heartbeat: None, + last_speedtest: None, + last_attach: None, + + created_at: None, + }) + } +} + +enum TimestampField { + Heartbeat, + SpeedTest, + Attach, +} + +impl TimestampField { + const UPDATE_LAST_HEARTBEAT: &'static str = r#" + update gateway set + last_heartbeat = $2 + where address = $1 + "#; + const UPDATE_LAST_SPEEDTEST: &'static str = r#" + update gateway set + last_speedtest = $2 + where address = $1 + "#; + const UPDATE_LAST_ATTACH: &'static str = r#" + update gateway set + last_attach = $2 + where address = $1 + "#; + + fn update_query(&self) -> &'static str { + match self { + Self::Heartbeat => Self::UPDATE_LAST_HEARTBEAT, + Self::SpeedTest => Self::UPDATE_LAST_SPEEDTEST, + Self::Attach => Self::UPDATE_LAST_ATTACH, + } + } + + fn update_gateway(&self, gw: &mut Gateway, timestamp: DateTime) { + match self { + Self::Heartbeat => gw.last_heartbeat = Some(timestamp), + Self::SpeedTest => gw.last_speedtest = Some(timestamp), + Self::Attach => gw.last_attach = Some(timestamp), + } + } +} + +impl Gateway { + pub async fn insert_into<'c, E>(&self, executor: E) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query( + r#" + insert into gateway ( + address, + owner, + location, + last_heartbeat, + last_speedtest, + last_attach + ) values ($1, $2, $3, $4, $5, $6) + on conflict (address) do update set + owner = EXCLUDED.owner, + location = EXCLUDED.location, + last_heartbeat = EXCLUDED.last_heartbeat, + last_attach = EXCLUDED.last_attach; + "#, + ) + .bind(&self.address) + .bind(&self.owner) + .bind(&self.location) + .bind(self.last_attach) + .bind(self.last_heartbeat) + .bind(self.last_attach) + .execute(executor) + .await + .map(|_| ()) + .map_err(Error::from) + } + + pub async fn update_owner<'c, 'q, E>( + executor: E, + address: &PublicKey, + owner: &PublicKey, + ) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + let rows_affected = sqlx::query( + r#" + update gateway set + owner = $2 + where address = $1 + "#, + ) + .bind(&address) + .bind(&owner) + .execute(executor) + .await + .map(|res| res.rows_affected()) + .map_err(Error::from)?; + if rows_affected == 0 { + Err(Error::not_found(format!("gateway {address} not found"))) + } else { + Ok(()) + } + } + + async fn _update_last_timestamp<'c, 'q, E>( + executor: E, + follower: &'q mut FollowerService, + field: TimestampField, + address: &'q PublicKey, + timestamp: &'q DateTime, + ) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone, + { + let rows_affected = sqlx::query(field.update_query()) + .bind(address) + .bind(timestamp) + .execute(executor.clone()) + .await + .map(|res| res.rows_affected()) + .map_err(Error::from)?; + if rows_affected == 0 { + let mut gw = Gateway::try_from(follower.find_gateway(address).await?)?; + field.update_gateway(&mut gw, *timestamp); + gw.insert_into(executor).await + } else { + Ok(()) + } + } + + pub async fn update_last_heartbeat<'c, 'q, E>( + executor: E, + follower: &'q mut FollowerService, + address: &'q PublicKey, + timestamp: &'q DateTime, + ) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone, + { + Self::_update_last_timestamp( + executor, + follower, + TimestampField::Heartbeat, + address, + timestamp, + ) + .await + } + + pub async fn update_last_speedtest<'c, 'q, E>( + executor: E, + follower: &'q mut FollowerService, + address: &'q PublicKey, + timestamp: &'q DateTime, + ) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone, + { + Self::_update_last_timestamp( + executor, + follower, + TimestampField::SpeedTest, + address, + timestamp, + ) + .await + } + + pub async fn update_last_attach<'c, E>( + executor: E, + follower: &'static mut FollowerService, + address: &'static PublicKey, + timestamp: &'static DateTime, + ) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres> + Clone, + { + Self::_update_last_timestamp( + executor, + follower, + TimestampField::Attach, + address, + timestamp, + ) + .await + } + + pub async fn get<'c, E>(executor: E, address: &PublicKey) -> Result> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query_as::<_, Self>( + r#" + select * from gateway + where address = $1 + "#, + ) + .bind(address) + .fetch_optional(executor) + .await + .map_err(Error::from) + } + + pub async fn list<'c, E>(executor: E, after: &After) -> Result> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query_as::<_, Self>( + r#" + select * from gateway + where created_at > $1 + order by created_at asc + limit $3 + "#, + ) + .bind(after.created_at.unwrap_or_else(|| datetime_from_epoch(0))) + .bind(min( + MAX_GATEWAY_COUNT as i32, + after.count.unwrap_or(DEFAULT_GATEWAY_COUNT) as i32, + )) + .fetch_all(executor) + .await + .map_err(Error::from) + } +} + +#[derive(Deserialize)] +pub struct After { + pub created_at: Option>, + pub count: Option, +} diff --git a/src/api/heartbeat.rs b/src/api/heartbeat.rs new file mode 100644 index 000000000..4146e666c --- /dev/null +++ b/src/api/heartbeat.rs @@ -0,0 +1,57 @@ +use crate::{ + api::{api_error, gateway::Gateway, Follower}, + Error, EventId, PublicKey, Result, +}; +use axum::{extract::Extension, http::StatusCode, Json}; +use chrono::{DateTime, Utc}; +use helium_proto::services::poc_mobile::CellHeartbeatReqV1; +use serde::Deserialize; +use serde_json::{json, Value}; +use sqlx::PgPool; + +pub async fn create_cell_heartbeat( + Json(event): Json, + Extension(pool): Extension, + Follower(mut follower): Follower, +) -> std::result::Result, (StatusCode, String)> { + Gateway::update_last_heartbeat(&pool, &mut follower, &event.pubkey, &event.timestamp) + .await + .and_then(|_| EventId::try_from(event)) + .map(|id| json!({ "id": id })) + .map(Json) + .map_err(api_error) +} + +#[derive(Deserialize)] +pub struct CellHeartbeat { + #[serde(alias = "pubKey")] + pub pubkey: PublicKey, + pub hotspot_type: String, + pub cell_id: u32, + pub timestamp: DateTime, + #[serde(alias = "longitude")] + pub lon: f64, + #[serde(alias = "latitude")] + pub lat: f64, + pub operation_mode: bool, + pub cbsd_category: String, + pub cbsd_id: String, +} + +impl TryFrom for CellHeartbeatReqV1 { + type Error = Error; + fn try_from(v: CellHeartbeat) -> Result { + Ok(Self { + pub_key: v.pubkey.to_vec(), + hotspot_type: v.hotspot_type, + cell_id: v.cell_id, + timestamp: v.timestamp.timestamp() as u64, + lon: v.lon, + lat: v.lat, + operation_mode: v.operation_mode, + cbsd_category: v.cbsd_category, + cbsd_id: v.cbsd_id, + signature: vec![], + }) + } +} diff --git a/src/api/heartbeats.rs b/src/api/heartbeats.rs deleted file mode 100644 index 0f7741335..000000000 --- a/src/api/heartbeats.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::{ - api::{internal_error, not_found_error, DatabaseConnection}, - pagination::Since, - CellHeartbeat, Uuid, -}; -use axum::{ - extract::{Path, Query}, - http::StatusCode, - Json, -}; -use serde_json::{json, Value}; - -pub async fn create_cell_heartbeat( - Json(event): Json, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - event - .insert_into(&mut conn) - .await - .map(|id: Uuid| { - json!({ - "id": id, - }) - }) - .map(Json) - .map_err(internal_error) -} - -pub async fn get_cell_hearbeat( - Path(id): Path, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let event = CellHeartbeat::get(&mut conn, &id) - .await - .map_err(internal_error)?; - if let Some(event) = event { - let json = serde_json::to_value(event).map_err(internal_error)?; - Ok(Json(json)) - } else { - Err(not_found_error()) - } -} - -pub async fn get_hotspot_cell_heartbeats( - Path(id): Path, - Query(since): Query, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let heartbeats = CellHeartbeat::for_hotspot_since(&mut conn, &id, &since) - .await - .map_err(internal_error)?; - let json = serde_json::to_value(heartbeats).map_err(internal_error)?; - Ok(Json(json)) -} - -pub async fn get_hotspot_last_cell_heartbeat( - Path(id): Path, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let heartbeat = CellHeartbeat::for_hotspot_last(&mut conn, &id) - .await - .map_err(internal_error)?; - if let Some(heartbeat) = heartbeat { - let json = serde_json::to_value(heartbeat).map_err(internal_error)?; - Ok(Json(json)) - } else { - Err(not_found_error()) - } -} diff --git a/src/api/mod.rs b/src/api/mod.rs index 29d2afdb0..190c41e7b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,45 +1,39 @@ -pub mod attach_events; -pub mod heartbeats; -pub mod speedtests; +pub mod attach_event; +pub mod gateway; +pub mod heartbeat; +pub mod server; +pub mod speedtest; +use crate::{follower::FollowerService, Error}; use axum::{ async_trait, extract::{Extension, FromRequest, RequestParts}, http::StatusCode, }; -use sqlx::postgres::PgPool; -/// Utility function returning a not found error -pub fn not_found_error() -> (StatusCode, String) { - (StatusCode::NOT_FOUND, "not found".to_string()) -} - -/// Utility function for mapping any error into a `500 Internal Server Error` -/// response. -pub fn internal_error(err: E) -> (StatusCode, String) +/// Utility function for mapping any error into an api error +pub fn api_error(err: E) -> (StatusCode, String) where E: std::error::Error, + Error: From, { - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) + Error::from(err).into() } -// A custom extractor that grabs a connection from the pool -pub struct DatabaseConnection(sqlx::pool::PoolConnection); +pub struct Follower(FollowerService); #[async_trait] -impl FromRequest for DatabaseConnection +impl FromRequest for Follower where B: Send, { type Rejection = (StatusCode, String); async fn from_request(req: &mut RequestParts) -> std::result::Result { - let Extension(pool) = Extension::::from_request(req) + let Extension(follower) = Extension::::from_request(req) .await - .map_err(internal_error)?; - - let conn = pool.acquire().await.map_err(internal_error)?; + .map_err(api_error)?; - Ok(Self(conn)) + Ok(Self(follower)) } } diff --git a/src/api/server.rs b/src/api/server.rs new file mode 100644 index 000000000..2e04473fe --- /dev/null +++ b/src/api/server.rs @@ -0,0 +1,171 @@ +use crate::{ + api::{ + attach_event, + gateway::{self, Gateway}, + heartbeat, + }, + datetime_from_epoch, + follower::FollowerService, + Error, EventId, PublicKey, Result, +}; +use axum::{ + extract::Extension, + routing::{get, post}, + Router, +}; +use futures_util::TryFutureExt; +use helium_proto::services::poc_mobile::{ + self, CellHeartbeatReqV1, CellHeartbeatRespV1, SpeedtestReqV1, SpeedtestRespV1, +}; +use sqlx::{Pool, Postgres}; +use std::{io, net::SocketAddr}; +use tonic::{metadata::MetadataValue, transport, Request, Response, Status}; +use tower_http::{auth::RequireAuthorizationLayer, trace::TraceLayer}; + +async fn empty_handler() {} + +pub async fn api_server(pool: Pool, shutdown: triggered::Listener) -> Result { + let api_addr = dotenv::var("API_SOCKET_ADDR").and_then(|v| { + v.parse::().map_err(|_| { + dotenv::Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid api socket address", + )) + }) + })?; + let api_token = dotenv::var("API_TOKEN")?; + let api_ro_token = dotenv::var("API_RO_TOKEN")?; + let follower = FollowerService::from_env()?; + + // build our application with some routes + let app = Router::new() + // health + .route("/health", get(empty_handler)) + // attach events + .route( + "/cell/attach-events", + post(attach_event::create_cell_attach_event) + .layer(RequireAuthorizationLayer::bearer(&api_token)), + ) + // heartbeats + .route( + "/cell/heartbeats", + post(heartbeat::create_cell_heartbeat) + .layer(RequireAuthorizationLayer::bearer(&api_token)), + ) + // hotspots + .route( + "/hotspots", + get(gateway::get_gateways).layer(RequireAuthorizationLayer::bearer(&api_ro_token)), + ) + .route( + "/hotspots/:pubkey", + get(gateway::get_gateway).layer(RequireAuthorizationLayer::bearer(&api_ro_token)), + ) + .layer(TraceLayer::new_for_http()) + .layer(Extension(pool)) + .layer(Extension(follower)); + tracing::info!("api listening on {}", api_addr); + + axum::Server::bind(&api_addr) + .serve(app.into_make_service()) + .with_graceful_shutdown(async move { + shutdown.await; + tracing::info!("stopping server") + }) + .map_err(Error::from) + .await +} + +pub type GrpcResult = std::result::Result, Status>; + +#[derive(Clone)] +pub struct GrpcServer { + pool: Pool, +} + +impl GrpcServer { + pub fn new(pool: Pool) -> Self { + Self { pool } + } +} + +fn decode_pubkey(pubkey: &[u8]) -> std::result::Result { + PublicKey::try_from(pubkey).map_err(|_err| Status::internal("Failed to decode public key")) +} + +#[tonic::async_trait] +impl poc_mobile::PocMobile for GrpcServer { + async fn submit_speedtest( + &self, + request: Request, + ) -> GrpcResult { + // TODO: Signature verify speedtest_req + let event = request.into_inner(); + let mut follower = FollowerService::from_env()?; + Gateway::update_last_speedtest( + &self.pool, + &mut follower, + &decode_pubkey(&event.pub_key)?, + &datetime_from_epoch(event.timestamp as i64), + ) + .await + // Encode event digest, encode and return as the id + .map(|_| EventId::from(event)) + .map(|id| Response::new(id.into())) + .map_err(Status::from) + } + + async fn submit_cell_heartbeat( + &self, + request: Request, + ) -> GrpcResult { + // TODO: Signature verify heartbeat_req + let event = request.into_inner(); + let pubkey = decode_pubkey(&event.pub_key)?; + let mut follower = FollowerService::from_env()?; + Gateway::update_last_heartbeat( + &self.pool, + &mut follower, + &pubkey, + &datetime_from_epoch(event.timestamp as i64), + ) + .await + // Encode event digest, encode and return as the id + .map(|_| EventId::from(event)) + .map(|id| Response::new(id.into())) + .map_err(Status::from) + } +} + +pub async fn grpc_server(pool: Pool, shutdown: triggered::Listener) -> Result { + let grpc_addr = dotenv::var("GRPC_SOCKET_ADDR").and_then(|v| { + v.parse::().map_err(|_| { + dotenv::Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid grpc socket address", + )) + }) + })?; + + let poc_mobile = GrpcServer::new(pool); + let api_token = dotenv::var("API_TOKEN").map(|token| { + format!("Bearer {}", token) + .parse::>() + .unwrap() + })?; + + tracing::info!("grpc listening on {}", grpc_addr); + + transport::Server::builder() + .add_service(poc_mobile::Server::with_interceptor( + poc_mobile, + move |req: Request<()>| match req.metadata().get("authorization") { + Some(t) if api_token == t => Ok(req), + _ => Err(Status::unauthenticated("No valid auth token")), + }, + )) + .serve_with_shutdown(grpc_addr, shutdown) + .map_err(Error::from) + .await +} diff --git a/src/api/speedtest.rs b/src/api/speedtest.rs new file mode 100644 index 000000000..594132e5c --- /dev/null +++ b/src/api/speedtest.rs @@ -0,0 +1,51 @@ +use crate::{ + api::{api_error, gateway::Gateway, Follower}, + Error, EventId, PublicKey, Result, +}; +use axum::{extract::Extension, http::StatusCode, Json}; +use chrono::{DateTime, Utc}; +use helium_proto::services::poc_mobile::SpeedtestReqV1; +use serde::Deserialize; +use serde_json::{json, Value}; +use sqlx::PgPool; + +pub async fn create_cell_speedtest( + Json(event): Json, + Extension(pool): Extension, + Follower(mut follower): Follower, +) -> std::result::Result, (StatusCode, String)> { + Gateway::update_last_speedtest(&pool, &mut follower, &event.pubkey, &event.timestamp) + .await + .and_then(|_| EventId::try_from(event)) + .map(|id| json!({ "id": id })) + .map(Json) + .map_err(api_error) +} + +#[derive(Deserialize)] +pub struct CellSpeedtest { + #[serde(alias = "pubKey")] + pub pubkey: PublicKey, + pub serial: String, + pub timestamp: DateTime, + #[serde(alias = "uploadSpeed")] + pub upload_speed: u64, + #[serde(alias = "downloadSpeed")] + pub download_speed: u64, + pub latency: u32, +} + +impl TryFrom for SpeedtestReqV1 { + type Error = Error; + fn try_from(v: CellSpeedtest) -> Result { + Ok(SpeedtestReqV1 { + pub_key: v.pubkey.to_vec(), + serial: v.serial, + timestamp: v.timestamp.timestamp() as u64, + upload_speed: v.upload_speed, + download_speed: v.download_speed, + latency: v.latency, + signature: vec![], + }) + } +} diff --git a/src/api/speedtests.rs b/src/api/speedtests.rs deleted file mode 100644 index c29fd56f6..000000000 --- a/src/api/speedtests.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::{ - api::{internal_error, not_found_error, DatabaseConnection}, - pagination::Since, - CellSpeedtest, Uuid, -}; -use axum::{ - extract::{Path, Query}, - http::StatusCode, - Json, -}; -use serde_json::{json, Value}; - -pub async fn create_cell_speedtest( - Json(event): Json, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - event - .insert_into(&mut conn) - .await - .map(|id: Uuid| { - json!({ - "id": id, - }) - }) - .map(Json) - .map_err(internal_error) -} - -pub async fn get_cell_speedtest( - Path(id): Path, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let event = CellSpeedtest::get(&mut conn, &id) - .await - .map_err(internal_error)?; - if let Some(event) = event { - let json = serde_json::to_value(event).map_err(internal_error)?; - Ok(Json(json)) - } else { - Err(not_found_error()) - } -} - -pub async fn get_hotspot_cell_speedtests( - Path(id): Path, - Query(since): Query, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let heartbeats = CellSpeedtest::for_hotspot_since(&mut conn, &id, &since) - .await - .map_err(internal_error)?; - let json = serde_json::to_value(heartbeats).map_err(internal_error)?; - Ok(Json(json)) -} - -pub async fn get_hotspot_last_cell_speedtest( - Path(id): Path, - DatabaseConnection(mut conn): DatabaseConnection, -) -> std::result::Result, (StatusCode, String)> { - let heartbeat = CellSpeedtest::for_hotspot_last(&mut conn, &id) - .await - .map_err(internal_error)?; - if let Some(heartbeat) = heartbeat { - let json = serde_json::to_value(heartbeat).map_err(internal_error)?; - Ok(Json(json)) - } else { - Err(not_found_error()) - } -} diff --git a/src/attach_event.rs b/src/attach_event.rs deleted file mode 100644 index b40503a96..000000000 --- a/src/attach_event.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::{Error, Imsi, PublicKey, Result, Uuid}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use sqlx::{PgConnection, Row}; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -pub struct CellAttachEvent { - pub imsi: Imsi, - #[serde(alias = "publicAddress")] - pub pubkey: PublicKey, - #[serde(alias = "iso_timestamp")] - pub timestamp: DateTime, - - #[serde(skip_deserializing)] - pub id: Uuid, - #[serde(skip_deserializing)] - pub created_at: Option>, -} - -impl CellAttachEvent { - pub async fn insert_into(&self, conn: &mut PgConnection) -> Result { - sqlx::query( - r#" - insert into cell_attach_event (pubkey, imsi, timestamp) - values ($1, $2, $3) - returning id - "#, - ) - .bind(&self.pubkey) - .bind(&self.imsi) - .bind(self.timestamp) - .fetch_one(conn) - .await - .and_then(|row| row.try_get("id")) - .map_err(Error::from) - } - - pub async fn get(conn: &mut PgConnection, id: &Uuid) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_attach_event - where id = $1::uuid - "#, - ) - .bind(id) - .fetch_optional(conn) - .await - .map_err(Error::from) - } -} diff --git a/src/cell_type.rs b/src/cell_type.rs new file mode 100644 index 000000000..77520d0c5 --- /dev/null +++ b/src/cell_type.rs @@ -0,0 +1,41 @@ +use rust_decimal::Decimal; +use rust_decimal_macros::dec; + +#[derive(Debug, Eq, Hash, PartialEq)] +pub enum CellType { + Nova436H, + Nova430I, + Neutrino430, + SercommIndoor, + SercommOutdoor, +} + +impl CellType { + pub fn fcc_id(&self) -> &'static str { + match self { + Self::Nova436H => "2AG32PBS3101S", + Self::Nova430I => "2AG32PBS3101S", + Self::Neutrino430 => "2AG32PBS31010", + Self::SercommIndoor => "P27-SCE4255W", + Self::SercommOutdoor => "P27-SCO4255PA10", + } + } + + pub fn reward_weight(&self) -> Decimal { + match self { + Self::Nova436H => dec!(2.0), + Self::Nova430I => dec!(1.5), + Self::Neutrino430 => dec!(1.0), + Self::SercommIndoor => dec!(1.0), + Self::SercommOutdoor => dec!(1.5), + } + } + + pub fn reward_shares(&self, units: u64) -> Decimal { + self.reward_weight() * Decimal::from(units) + } + + pub fn rewards(&self, base_rewards: Decimal) -> Decimal { + base_rewards * self.reward_weight() + } +} diff --git a/src/cli/gateway.rs b/src/cli/gateway.rs new file mode 100644 index 000000000..bea884012 --- /dev/null +++ b/src/cli/gateway.rs @@ -0,0 +1,49 @@ +use crate::{ + api::gateway::Gateway, + cli::{mk_db_pool, print_json}, + PublicKey, Result, +}; +use serde_json::json; + +/// Import eligible gateways to align with the blockchain-node this follower is +/// connected to +#[derive(Debug, clap::Parser)] +pub struct Cmd { + #[clap(subcommand)] + cmd: GatewayCmd, +} + +impl Cmd { + pub async fn run(&self) -> Result { + self.cmd.run().await + } +} + +#[derive(Debug, clap::Subcommand)] +pub enum GatewayCmd { + Get(Get), +} + +impl GatewayCmd { + pub async fn run(&self) -> Result { + match self { + Self::Get(cmd) => cmd.run().await, + } + } +} + +/// Get gateway information for a given address +#[derive(Debug, clap::Args)] +pub struct Get { + address: PublicKey, +} + +impl Get { + pub async fn run(&self) -> Result { + let pool = mk_db_pool(1).await?; + + let gateway = Gateway::get(&pool, &self.address).await?; + + print_json(&json!({ "gateway": gateway })) + } +} diff --git a/src/cli/maker.rs b/src/cli/maker.rs new file mode 100644 index 000000000..59099d5b0 --- /dev/null +++ b/src/cli/maker.rs @@ -0,0 +1,38 @@ +use crate::{cli::print_json, maker, Result}; + +/// Add or remove eligible makers +#[derive(Debug, clap::Parser)] +pub struct Cmd { + #[clap(subcommand)] + cmd: MakerCmd, +} + +impl Cmd { + pub async fn run(&self) -> Result { + self.cmd.run().await + } +} + +#[derive(Debug, clap::Subcommand)] +pub enum MakerCmd { + List(List), +} + +/// Commands on makers and their descriptions +#[derive(Debug, clap::Args)] +pub struct List {} + +impl MakerCmd { + pub async fn run(&self) -> Result { + match self { + Self::List(cmd) => cmd.run().await, + } + } +} + +impl List { + pub async fn run(&self) -> Result { + let list = maker::allowed(); + print_json(&list) + } +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs new file mode 100644 index 000000000..529c71c42 --- /dev/null +++ b/src/cli/mod.rs @@ -0,0 +1,20 @@ +pub mod gateway; +pub mod maker; +pub mod server; + +use crate::Result; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; + +pub(crate) fn print_json(value: &T) -> Result { + println!("{}", serde_json::to_string_pretty(value)?); + Ok(()) +} + +pub(crate) async fn mk_db_pool(size: u32) -> Result> { + let db_connection_str = dotenv::var("DATABASE_URL")?; + let pool = PgPoolOptions::new() + .max_connections(size) + .connect(&db_connection_str) + .await?; + Ok(pool) +} diff --git a/src/cli/server.rs b/src/cli/server.rs new file mode 100644 index 000000000..0632a177f --- /dev/null +++ b/src/cli/server.rs @@ -0,0 +1,43 @@ +use crate::{api::server, cli, Follower, Result}; +use tokio::{signal, sync::broadcast}; + +/// Starts the server +#[derive(Debug, clap::Args)] +pub struct Cmd {} + +impl Cmd { + pub async fn run(&self) -> Result { + // Create database pool + let pool = cli::mk_db_pool(10).await?; + sqlx::migrate!().run(&pool).await?; + + // configure shutdown trigger + let (shutdown_trigger, shutdown_listener) = triggered::trigger(); + tokio::spawn(async move { + let _ = signal::ctrl_c().await; + shutdown_trigger.trigger() + }); + + // api server + let api_server = server::api_server(pool.clone(), shutdown_listener.clone()); + + // grpc server + let grpc_server = server::grpc_server(pool.clone(), shutdown_listener.clone()); + + // chain follower + let (trigger_sender, _trigger_receiver) = broadcast::channel(2); + let mut follower = Follower::new(pool.clone(), trigger_sender).await?; + + // reward server + // let mut reward_server = RewardServer::new(follower_uri, base_path, trigger_receiver.clone(), shutdown_listener.clone()); + + tokio::try_join!( + api_server, + grpc_server, + follower.run(shutdown_listener.clone()), + // reward_server.run(shutdown_listener.clone()) + )?; + + Ok(()) + } +} diff --git a/src/error.rs b/src/error.rs index 5a7770d84..18a62078f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,8 +8,64 @@ pub enum Error { DotEnv(#[from] dotenv::Error), #[error("sql error")] Sql(#[from] sqlx::Error), + #[error("io error")] + Io(#[from] std::io::Error), #[error("migration error")] Migrate(#[from] sqlx::migrate::MigrateError), - #[error("http error")] - Http(#[from] hyper::Error), + #[error("http server error")] + Server(#[from] hyper::Error), + #[error("http server extension error")] + ServerExtension(#[from] axum::extract::rejection::ExtensionRejection), + #[error("grpc {}", .0.message())] + Grpc(#[from] tonic::Status), + #[error("service error")] + Service(#[from] helium_proto::services::Error), + #[error("uri error")] + Uri(#[from] http::uri::InvalidUri), + #[error("crypto error")] + Crypto(#[from] helium_crypto::Error), + #[error("json error")] + Json(#[from] serde_json::Error), + #[error("csv error")] + Csv(#[from] csv::Error), + #[error("datetime error")] + Chrono(#[from] chrono::ParseError), + #[error("not found")] + NotFound(String), + #[error("invalid decimals in {0}, only 8 allowed")] + Decimals(String), + #[error("unexpected or invalid number {0}")] + Number(String), +} + +impl Error { + pub fn not_found(msg: E) -> Self { + Self::NotFound(msg.to_string()) + } + + pub fn decimals(value: &str) -> Self { + Self::Decimals(value.to_string()) + } + + pub fn number(value: &str) -> Self { + Self::Number(value.to_string()) + } +} + +impl From for tonic::Status { + fn from(v: Error) -> Self { + match v { + Error::NotFound(msg) => tonic::Status::not_found(msg), + _other => tonic::Status::internal("internal error"), + } + } +} + +impl From for (http::StatusCode, String) { + fn from(v: Error) -> Self { + match v { + Error::NotFound(msg) => (http::StatusCode::NOT_FOUND, msg), + err => (http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), + } + } } diff --git a/src/event_id.rs b/src/event_id.rs new file mode 100644 index 000000000..b7b892c2f --- /dev/null +++ b/src/event_id.rs @@ -0,0 +1,60 @@ +use crate::{ + api::{heartbeat::CellHeartbeat, speedtest::CellSpeedtest}, + Error, Result, +}; +use helium_proto::services::poc_mobile::{ + CellHeartbeatReqV1, CellHeartbeatRespV1, SpeedtestReqV1, SpeedtestRespV1, +}; +use serde::Serialize; +use sha2::{Digest, Sha256}; + +pub struct EventId(String); + +impl Serialize for EventId { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.0) + } +} + +impl ToString for EventId { + fn to_string(&self) -> String { + self.0.to_string() + } +} + +impl From for EventId { + fn from(event: M) -> Self { + Self(base64::encode(Sha256::digest(event.encode_to_vec()))) + } +} + +impl TryFrom for EventId { + type Error = Error; + fn try_from(event: CellHeartbeat) -> Result { + let req = CellHeartbeatReqV1::try_from(event)?; + Ok(Self::from(req)) + } +} + +impl TryFrom for EventId { + type Error = Error; + fn try_from(event: CellSpeedtest) -> Result { + let req = SpeedtestReqV1::try_from(event)?; + Ok(Self::from(req)) + } +} + +impl From for CellHeartbeatRespV1 { + fn from(v: EventId) -> Self { + Self { id: v.0 } + } +} + +impl From for SpeedtestRespV1 { + fn from(v: EventId) -> Self { + Self { id: v.0 } + } +} diff --git a/src/follower/client.rs b/src/follower/client.rs new file mode 100644 index 000000000..2a6dc8dfa --- /dev/null +++ b/src/follower/client.rs @@ -0,0 +1,62 @@ +use crate::{env_var, PublicKey, Result}; +use helium_proto::{ + services::{Channel, Endpoint}, + FollowerGatewayReqV1, FollowerGatewayRespV1, FollowerTxnStreamReqV1, FollowerTxnStreamRespV1, +}; +use http::Uri; +use std::time::Duration; +use tonic::Streaming; + +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +const RPC_TIMEOUT: Duration = Duration::from_secs(5); +pub const DEFAULT_URI: &str = "http://127.0.0.1:8080"; + +type FollowerClient = helium_proto::follower_client::FollowerClient; + +#[derive(Debug, Clone)] +pub struct FollowerService { + client: FollowerClient, +} + +impl FollowerService { + pub fn from_env() -> Result { + let uri = env_var("FOLLOWER_URI", Uri::from_static(DEFAULT_URI))?; + Self::new(uri) + } + + pub fn new(uri: Uri) -> Result { + let channel = Endpoint::from(uri) + .connect_timeout(CONNECT_TIMEOUT) + .timeout(RPC_TIMEOUT) + .connect_lazy(); + Ok(Self { + client: FollowerClient::new(channel), + }) + } + + pub async fn find_gateway(&mut self, address: &PublicKey) -> Result { + let req = FollowerGatewayReqV1 { + address: address.to_vec(), + }; + let res = self.client.find_gateway(req).await?.into_inner(); + Ok(res) + } + + pub async fn txn_stream( + &mut self, + height: Option, + txn_hash: &[u8], + txn_types: &[T], + ) -> Result> + where + T: ToString, + { + let req = FollowerTxnStreamReqV1 { + height, + txn_hash: txn_hash.to_vec(), + txn_types: txn_types.iter().map(|e| e.to_string()).collect(), + }; + let res = self.client.txn_stream(req).await?.into_inner(); + Ok(res) + } +} diff --git a/src/follower/mod.rs b/src/follower/mod.rs new file mode 100644 index 000000000..c724c7e35 --- /dev/null +++ b/src/follower/mod.rs @@ -0,0 +1,199 @@ +pub mod client; +pub use client::FollowerService; + +use crate::{api::gateway::Gateway, env_var, rewards, Error, PublicKey, Result}; +use helium_proto::{ + blockchain_txn::Txn, BlockchainTokenTypeV1, BlockchainTxn, BlockchainTxnSubnetworkRewardsV1, + FollowerTxnStreamRespV1, +}; +use sqlx::{Pool, Postgres}; +use tokio::{sync::broadcast, time}; +use tonic::Streaming; + +/// First block that 5G hotspots were introduced (FreedomFi) +pub const DEFAULT_START_BLOCK: i64 = 995041; + +pub const TXN_TYPES: &[&str] = &[ + "blockchain_txn_transfer_hotspot_v1", + "blockchain_txn_transfer_hotspot_v2", + "blockchain_txn_consensus_group_v1", + "blockchain_txn_subnetwork_rewards_v1", +]; + +pub struct Follower { + pool: Pool, + service: FollowerService, + start_block: i64, + trigger: broadcast::Sender, +} + +impl Follower { + pub async fn new( + pool: Pool, + trigger: broadcast::Sender, + ) -> Result { + let start_block = env_var("FOLLOWER_START_BLOCK", DEFAULT_START_BLOCK)?; + let service = FollowerService::from_env()?; + Ok(Self { + service, + pool, + start_block, + trigger, + }) + } + + async fn last_height<'c, E>(executor: E, start_block: i64) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + let height = sqlx::query_scalar::<_, String>( + r#" + select value from follower_meta + where key = 'last_height' + "#, + ) + .fetch_optional(executor) + .await? + .and_then(|v| v.parse::().map_or_else(|_| None, Some)) + .unwrap_or(start_block); + Ok(height) + } + + async fn update_last_height<'c, E>(executor: E, height: i64) -> Result + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + let _ = sqlx::query( + r#" + insert into follower_meta (key, value) + values ('last_height', $1) + on conflict (key) do update set + value = EXCLUDED.value + "#, + ) + .bind(height.to_string()) + .execute(executor) + .await?; + Ok(()) + } + + pub async fn run(&mut self, shutdown: triggered::Listener) -> Result { + tracing::info!("starting follower"); + + loop { + if shutdown.is_triggered() { + tracing::info!("stopping follower"); + return Ok(()); + } + let height = Self::last_height(&self.pool, self.start_block).await? as u64; + tracing::info!("connecting to txn stream at height {height}"); + tokio::select! { + _ = shutdown.clone() => (), + stream_result = self.service.txn_stream(Some(height), &[], TXN_TYPES) => match stream_result { + Ok(txn_stream) => { + tracing::info!("connected to txn stream"); + self.run_with_txn_stream(txn_stream, shutdown.clone()).await? + } + Err(err) => { + tracing::warn!("failed to connect to txn stream: {err}"); + self.reconnect_wait(shutdown.clone()).await + } + } + } + } + } + + async fn reconnect_wait(&mut self, shutdown: triggered::Listener) { + let timer = time::sleep(time::Duration::from_secs(5)); + tokio::select! { + _ = timer => (), + _ = shutdown => (), + } + } + + async fn run_with_txn_stream( + &mut self, + mut txn_stream: Streaming, + shutdown: triggered::Listener, + ) -> Result { + loop { + tokio::select! { + msg = txn_stream.message() => match msg { + Ok(Some(txn)) => { + let height = txn.height as i64; + self.process_txn_entry(txn).await?; + Self::update_last_height(&self.pool, height).await?; + } + Ok(None) => { + tracing::warn!("txn stream disconnected"); + return Ok(()); + } + Err(err) => { + tracing::warn!("txn stream error {err:?}"); + return Ok(()); + } + }, + _ = shutdown.clone() => return Ok(()) + } + } + } + + async fn process_txn_entry(&mut self, entry: FollowerTxnStreamRespV1) -> Result { + let txn = match entry.txn { + Some(BlockchainTxn { txn: Some(ref txn) }) => txn, + _ => { + tracing::warn!("ignoring missing txn in stream"); + return Ok(()); + } + }; + match txn { + Txn::TransferHotspot(txn) => { + self.process_transfer_gateway(txn.gateway.as_ref(), txn.buyer.as_ref()) + .await + } + Txn::TransferHotspotV2(txn) => { + self.process_transfer_gateway(txn.gateway.as_ref(), txn.new_owner.as_ref()) + .await + } + Txn::SubnetworkRewards(txn) => self.process_subnet_rewards(&entry, txn).await, + Txn::ConsensusGroup(_) => self.process_consensus_group(&entry).await, + _ => Ok(()), + } + } + + async fn process_transfer_gateway(&mut self, gateway: &[u8], owner: &[u8]) -> Result { + let gateway = PublicKey::try_from(gateway)?; + let owner = PublicKey::try_from(owner)?; + tracing::info!("processing transfer hotspot for {gateway} to {owner}"); + match Gateway::update_owner(&self.pool, &gateway, &owner).await { + Ok(()) => Ok(()), + Err(Error::NotFound(_)) => Ok(()), + Err(err) => Err(err), + } + } + + async fn process_subnet_rewards( + &mut self, + _envelope: &FollowerTxnStreamRespV1, + txn: &BlockchainTxnSubnetworkRewardsV1, + ) -> Result { + if txn.token_type() != BlockchainTokenTypeV1::Mobile { + return Ok(()); + } + Ok(()) + } + + async fn process_consensus_group(&mut self, envelope: &FollowerTxnStreamRespV1) -> Result { + tracing::info!( + "processing consensus group at {height}", + height = envelope.height + ); + match self.trigger.send(rewards::Trigger::new(envelope.height)) { + Ok(_) => Ok(()), + Err(_) => { + tracing::error!("failed to send reward trigger"); + Ok(()) + } + } + } +} diff --git a/src/heartbeat.rs b/src/heartbeat.rs deleted file mode 100644 index 91b712750..000000000 --- a/src/heartbeat.rs +++ /dev/null @@ -1,108 +0,0 @@ -use crate::{pagination::Since, Error, PublicKey, Result, Uuid}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use sqlx::{PgConnection, Row}; -use std::{cmp::min, time::SystemTime}; - -pub const DEFAULT_HEARTBEAT_COUNT: usize = 100; -pub const MAX_HEARTBEAT_COUNT: u32 = 1000; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -pub struct CellHeartbeat { - #[serde(alias = "pubKey")] - pub pubkey: PublicKey, - pub hotspot_type: String, - pub cell_id: i32, - pub timestamp: DateTime, - #[serde(alias = "longitude")] - pub lon: f64, - #[serde(alias = "latitude")] - pub lat: f64, - pub operation_mode: bool, - pub cbsd_category: String, - - #[serde(skip_deserializing)] - pub id: Uuid, - #[serde(skip_deserializing)] - pub created_at: Option>, -} - -impl CellHeartbeat { - pub async fn insert_into(&self, conn: &mut PgConnection) -> Result { - sqlx::query( - r#" - insert into cell_heartbeat (pubkey, hotspot_type, cell_id, timestamp, lat, lon, operation_mode, cbsd_category) - values ($1, $2, $3, $4, $5, $6, $7, $8) - returning id - "#, - ) - .bind(&self.pubkey) - .bind(&self.hotspot_type) - .bind(&self.cell_id) - .bind(self.timestamp) - .bind(&self.lat) - .bind(&self.lon) - .bind(&self.operation_mode) - .bind(&self.cbsd_category) - .fetch_one(conn) - .await - .and_then(|row| row.try_get("id")) - .map_err(Error::from) - } - - pub async fn get(conn: &mut PgConnection, id: &Uuid) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_heartbeat - where id = $1::uuid - "#, - ) - .bind(id) - .fetch_optional(conn) - .await - .map_err(Error::from) - } - - pub async fn for_hotspot_since( - conn: &mut PgConnection, - id: &str, - since: &Since, - ) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_heartbeat - where pubkey = $1 and timestamp > $2 - order by timestamp asc - limit $3 - "#, - ) - .bind(id) - .bind( - since - .since - .unwrap_or_else(|| DateTime::::from(SystemTime::UNIX_EPOCH)), - ) - .bind(min( - MAX_HEARTBEAT_COUNT, - since.count.unwrap_or(DEFAULT_HEARTBEAT_COUNT) as u32, - )) - .fetch_all(conn) - .await - .map_err(Error::from) - } - - pub async fn for_hotspot_last(conn: &mut PgConnection, id: &str) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_heartbeat - where pubkey = $1 - order by timestamp desc - limit 1 - "#, - ) - .bind(id) - .fetch_optional(conn) - .await - .map_err(Error::from) - } -} diff --git a/src/lib.rs b/src/lib.rs index f6ffc46a2..b54448971 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,42 @@ pub mod api; -mod attach_event; +mod cell_type; +pub mod cli; mod error; -mod heartbeat; +mod event_id; +mod follower; mod imsi; -pub mod pagination; +pub mod maker; mod public_key; -mod speedtest; +pub mod rewards; +pub mod store; +pub mod util; mod uuid; -pub use attach_event::CellAttachEvent; +pub use cell_type::CellType; pub use error::{Error, Result}; -pub use heartbeat::CellHeartbeat; +pub use event_id::EventId; +pub use follower::Follower; pub use imsi::Imsi; pub use public_key::PublicKey; -pub use speedtest::CellSpeedtest; pub use uuid::Uuid; + +use chrono::{DateTime, NaiveDateTime, Utc}; +use std::io; + +pub fn datetime_from_epoch(secs: i64) -> DateTime { + DateTime::::from_utc(NaiveDateTime::from_timestamp(secs, 0), Utc) +} + +fn env_var(key: &str, default: T) -> Result +where + T: std::str::FromStr, + ::Err: std::fmt::Debug, +{ + match dotenv::var(key) { + Ok(v) => v + .parse::() + .map_err(|_err| Error::from(io::Error::from(io::ErrorKind::InvalidInput))), + Err(dotenv::Error::EnvVar(std::env::VarError::NotPresent)) => Ok(default), + Err(err) => Err(Error::from(err)), + } +} diff --git a/src/main.rs b/src/main.rs index 0bd044f19..96b0882b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,22 @@ -use axum::{ - extract::Extension, - routing::{get, post}, - Router, -}; -use poc5g_server::{ - api::{attach_events, heartbeats, speedtests}, - Result, -}; -use sqlx::postgres::PgPoolOptions; -use std::{io, net::SocketAddr, time::Duration}; -use tower_http::{auth::RequireAuthorizationLayer, trace::TraceLayer}; +use clap::Parser; +use poc5g_server::{cli, Result}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +#[derive(Debug, clap::Subcommand)] +pub enum Cmd { + Server(cli::server::Cmd), + Maker(Box), + Gateway(cli::gateway::Cmd), +} + +#[derive(Debug, clap::Parser)] +#[clap(version = env!("CARGO_PKG_VERSION"))] +#[clap(about = "PoC Mobile Token Server")] +pub struct Cli { + #[clap(subcommand)] + cmd: Cmd, +} + #[tokio::main] async fn main() -> Result { dotenv::dotenv()?; @@ -22,78 +27,11 @@ async fn main() -> Result { .with(tracing_subscriber::fmt::layer()) .init(); - let db_connection_str = dotenv::var("DATABASE_URL")?; - let addr = dotenv::var("SOCKET_ADDR").and_then(|v| { - v.parse::().map_err(|_| { - dotenv::Error::Io(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid socket address", - )) - }) - })?; - let api_token = dotenv::var("API_TOKEN")?; - let api_ro_token = dotenv::var("API_RO_TOKEN")?; - - let pool = PgPoolOptions::new() - .max_connections(10) - .connect_timeout(Duration::from_secs(3)) - .connect(&db_connection_str) - .await?; - - sqlx::migrate!().run(&pool).await?; - - // build our application with some routes - let app = Router::new() - .route( - "/cell/attach-events/:id", - get(attach_events::get_cell_attach_event), - ) - .route( - "/cell/attach-events", - post(attach_events::create_cell_attach_event) - .layer(RequireAuthorizationLayer::bearer(&api_token)), - ) - // heartbeats - .route( - "/cell/heartbeats/hotspots/:id/last", - get(heartbeats::get_hotspot_last_cell_heartbeat) - .layer(RequireAuthorizationLayer::bearer(&api_ro_token)), - ) - .route("/cell/heartbeats/:id", get(heartbeats::get_cell_hearbeat)) - .route( - "/cell/heartbeats", - post(heartbeats::create_cell_heartbeat) - .layer(RequireAuthorizationLayer::bearer(&api_token)), - ) - .route( - "/cell/heartbeats/hotspots/:id", - get(heartbeats::get_hotspot_cell_heartbeats) - .layer(RequireAuthorizationLayer::bearer(&api_ro_token)), - ) - // speedtests - .route("/cell/speedtests/:id", get(speedtests::get_cell_speedtest)) - .route( - "/cell/speedtests", - post(speedtests::create_cell_speedtest) - .layer(RequireAuthorizationLayer::bearer(&api_token)), - ) - .route( - "/cell/speedtests/hotspots/:id/last", - get(speedtests::get_hotspot_last_cell_speedtest) - .layer(RequireAuthorizationLayer::bearer(&api_ro_token)), - ) - .route( - "/cell/speedtests/hotspots/:id", - get(speedtests::get_hotspot_cell_speedtests) - .layer(RequireAuthorizationLayer::bearer(&api_ro_token)), - ) - .layer(TraceLayer::new_for_http()) - .layer(Extension(pool)); + let cli = Cli::parse(); - // run it with hyper - tracing::debug!("listening on {}", addr); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await?; - Ok(()) + match cli.cmd { + Cmd::Server(cmd) => cmd.run().await, + Cmd::Maker(cmd) => cmd.run().await, + Cmd::Gateway(cmd) => cmd.run().await, + } } diff --git a/src/maker.rs b/src/maker.rs new file mode 100644 index 000000000..8aaf0bd3f --- /dev/null +++ b/src/maker.rs @@ -0,0 +1,39 @@ +use crate::PublicKey; +use once_cell::sync::OnceCell; +use serde::Serialize; +use std::str::FromStr; + +#[derive(Serialize)] +pub struct Maker { + pub pubkey: PublicKey, + pub description: &'static str, +} + +impl Maker { + fn new(pubkey: &'static str, description: &'static str) -> Self { + Self { + pubkey: PublicKey::from_str(pubkey).expect("maker public key"), + description, + } + } +} + +pub fn allowed() -> &'static Vec { + static CELL: OnceCell> = OnceCell::new(); + CELL.get_or_init(|| { + vec![ + Maker::new( + "13y2EqUUzyQhQGtDSoXktz8m5jHNSiwAKLTYnHNxZq2uH5GGGym", + "FreedomFi", + ), + Maker::new( + "14gqqPV2HEs4PCNNUacKVG7XeAhCUkN553NcBVw4xfwSFcCjhXv", + "Bobcat 5G", + ), + ] + }) +} + +pub fn allows(pubkey: &PublicKey) -> bool { + allowed().iter().any(|maker| maker.pubkey.eq(pubkey)) +} diff --git a/src/pagination.rs b/src/pagination.rs deleted file mode 100644 index 3c50fe9e5..000000000 --- a/src/pagination.rs +++ /dev/null @@ -1,8 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde::Deserialize; - -#[derive(Deserialize)] -pub struct Since { - pub since: Option>, - pub count: Option, -} diff --git a/src/public_key.rs b/src/public_key.rs index 45600c7ae..f12b954b2 100644 --- a/src/public_key.rs +++ b/src/public_key.rs @@ -1,3 +1,4 @@ +use crate::{Error, Result}; use serde::{ de::{self, Deserializer}, ser::Serializer, @@ -7,12 +8,13 @@ use sqlx::{ decode::Decode, encode::{Encode, IsNull}, error::BoxDynError, - postgres::{PgArgumentBuffer, PgTypeInfo, PgValueRef, Postgres}, + postgres::{PgArgumentBuffer, PgRow, PgTypeInfo, PgValueRef, Postgres}, types::Type, + Row, }; use std::{ops::Deref, str::FromStr}; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PublicKey(helium_crypto::PublicKey); impl Deref for PublicKey { @@ -41,7 +43,7 @@ impl Encode<'_, Postgres> for PublicKey { } impl<'r> Decode<'r, Postgres> for PublicKey { - fn decode(value: PgValueRef<'r>) -> Result { + fn decode(value: PgValueRef<'r>) -> std::result::Result { let value = <&str as Decode>::decode(value)?; let key = helium_crypto::PublicKey::from_str(value)?; Ok(Self(key)) @@ -62,10 +64,36 @@ impl<'de> Deserialize<'de> for PublicKey { } impl Serialize for PublicKey { - fn serialize(&self, serializer: S) -> Result + fn serialize(&self, serializer: S) -> std::result::Result where S: Serializer, { serializer.serialize_str(&self.to_string()) } } + +impl TryFrom<&[u8]> for PublicKey { + type Error = Error; + fn try_from(value: &[u8]) -> Result { + Ok(Self(helium_crypto::PublicKey::try_from(value)?)) + } +} + +impl std::fmt::Display for PublicKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::str::FromStr for PublicKey { + type Err = Error; + fn from_str(s: &str) -> std::result::Result { + Ok(Self(helium_crypto::PublicKey::from_str(s)?)) + } +} + +impl<'r> sqlx::FromRow<'r, PgRow> for PublicKey { + fn from_row(row: &'r PgRow) -> std::result::Result { + row.try_get("pubkey") + } +} diff --git a/src/rewards/emissions.rs b/src/rewards/emissions.rs new file mode 100644 index 000000000..9ccc86681 --- /dev/null +++ b/src/rewards/emissions.rs @@ -0,0 +1,147 @@ +use crate::{cell_type::CellType, util::Mobile}; +use chrono::{DateTime, TimeZone, Utc}; +use lazy_static::lazy_static; +use rust_decimal::Decimal; +use std::collections::HashMap; + +// 100M genesis rewards per day +const GENESIS_REWARDS_PER_DAY: u64 = 100_000_000; + +lazy_static! { + static ref GENESIS_START: DateTime = Utc.ymd(2022, 7, 11).and_hms(0, 0, 0); +} + +pub fn get_emissions_per_model( + models: HashMap, + datetime: DateTime, +) -> HashMap { + let total_rewards = get_scheduled_tokens(datetime) + .expect("Failed to supply valid date on the emission schedule"); + + let nova436h_units = models.get(&CellType::Nova436H).unwrap_or(&0); + let nova430i_units = models.get(&CellType::Nova430I).unwrap_or(&0); + let sercommo_units = models.get(&CellType::SercommOutdoor).unwrap_or(&0); + let sercommi_units = models.get(&CellType::SercommIndoor).unwrap_or(&0); + let neut430_units = models.get(&CellType::Neutrino430).unwrap_or(&0); + + let nova436h_shares = CellType::Nova436H.reward_shares(*nova436h_units); + let nova430i_shares = CellType::Nova430I.reward_shares(*nova430i_units); + let sercommo_shares = CellType::SercommOutdoor.reward_shares(*sercommo_units); + let sercommi_shares = CellType::SercommIndoor.reward_shares(*sercommi_units); + let neut430_shares = CellType::Neutrino430.reward_shares(*neut430_units); + + let total_shares = + nova436h_shares + nova430i_shares + sercommo_shares + sercommi_shares + neut430_shares; + + let base_reward = total_rewards / total_shares; + + let nova436h_rewards = calc_rewards(CellType::Nova436H, base_reward, *nova436h_units); + let nova430i_rewards = calc_rewards(CellType::Nova430I, base_reward, *nova430i_units); + let sercommo_rewards = calc_rewards(CellType::SercommOutdoor, base_reward, *sercommo_units); + let sercommi_rewards = calc_rewards(CellType::SercommIndoor, base_reward, *sercommi_units); + let neut430_rewards = calc_rewards(CellType::Neutrino430, base_reward, *neut430_units); + + HashMap::from([ + (CellType::Nova436H, nova436h_rewards), + (CellType::Nova430I, nova430i_rewards), + (CellType::SercommOutdoor, sercommo_rewards), + (CellType::SercommIndoor, sercommi_rewards), + (CellType::Neutrino430, neut430_rewards), + ]) +} + +fn calc_rewards(cell_type: CellType, base_reward: Decimal, num_units: u64) -> Mobile { + if num_units > 0 { + Mobile::from(cell_type.rewards(base_reward) * Decimal::from(num_units)) + } else { + Mobile::from(0) + } +} + +fn get_scheduled_tokens(datetime: DateTime) -> Option { + if *GENESIS_START < datetime { + // 100M genesis rewards per day + Some(Decimal::from(GENESIS_REWARDS_PER_DAY)) + } else { + None + } +} + +#[cfg(test)] +mod test { + use super::*; + use rust_decimal_macros::dec; + + #[test] + fn genesis_reward() { + let expected = HashMap::from([ + ( + CellType::SercommOutdoor, + Mobile::from(dec!(132860.93888397)), + ), + (CellType::Nova430I, Mobile::from(dec!(17670504.87156776))), + (CellType::Nova436H, Mobile::from(dec!(177147.91851196))), + ( + CellType::SercommIndoor, + Mobile::from(dec!(81842338.35252436)), + ), + (CellType::Neutrino430, Mobile::from(dec!(177147.91851196))), + ]); + let date = Utc.ymd(2022, 7, 17).and_hms(0, 0, 0); + let input = HashMap::from([ + (CellType::SercommOutdoor, 1), + (CellType::Nova430I, 133), + (CellType::Nova436H, 1), + (CellType::SercommIndoor, 924), + (CellType::Neutrino430, 2), + ]); + let output = get_emissions_per_model(input, date); + assert_eq!(expected, output); + } + + // #[test] + // fn post_genesis_reward() { + // let expected = HashMap::from([ + // (CellModel::SercommOutdoor, 6111534 * BONES), + // (CellModel::Nova430I, 6111534 * BONES), + // (CellModel::Nova436H, 8148712 * BONES), + // (CellModel::SercommIndoor, 4074356 * BONES), + // (CellModel::Neutrino430, 4074356 * BONES), + // ]); + // let date = Utc.ymd(2023, 1, 1).and_hms(0, 0, 0); + // let input = HashMap::from([ + // (CellModel::SercommOutdoor, 20), + // (CellModel::Nova430I, 15), + // (CellModel::Nova436H, 10), + // (CellModel::SercommIndoor, 13), + // (CellModel::Neutrino430, 8), + // ]); + // assert_eq!(expected, get_emissions_per_model(input, date)) + // } + + #[test] + fn no_reporting_model_reward() { + let expected = HashMap::from([ + ( + CellType::SercommOutdoor, + Mobile::from(dec!(133096.71694765)), + ), + (CellType::Nova430I, Mobile::from(dec!(17701863.35403727))), + (CellType::Nova436H, Mobile::from(0)), + ( + CellType::SercommIndoor, + Mobile::from(dec!(81987577.63975155)), + ), + (CellType::Neutrino430, Mobile::from(dec!(177462.28926353))), + ]); + let date = Utc.ymd(2022, 7, 17).and_hms(0, 0, 0); + let input = HashMap::from([ + (CellType::SercommOutdoor, 1), + (CellType::Nova430I, 133), + (CellType::SercommIndoor, 924), + (CellType::Neutrino430, 2), + ]); + let output = get_emissions_per_model(input, date); + assert_eq!(expected, output) + } +} diff --git a/src/rewards/mod.rs b/src/rewards/mod.rs new file mode 100644 index 000000000..666b7d4b6 --- /dev/null +++ b/src/rewards/mod.rs @@ -0,0 +1,12 @@ +pub mod emissions; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Trigger { + pub block_height: u64, +} + +impl Trigger { + pub fn new(block_height: u64) -> Self { + Self { block_height } + } +} diff --git a/src/speedtest.rs b/src/speedtest.rs deleted file mode 100644 index eacc3b287..000000000 --- a/src/speedtest.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::{pagination::Since, Error, PublicKey, Result, Uuid}; -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use sqlx::{PgConnection, Row}; -use std::{cmp::min, time::SystemTime}; - -pub const DEFAULT_SPEEDTEST_COUNT: usize = 100; -pub const MAX_SPEEDTEST_COUNT: u32 = 1000; - -#[derive(sqlx::FromRow, Deserialize, Serialize)] -pub struct CellSpeedtest { - #[serde(alias = "pubKey")] - pub pubkey: PublicKey, - pub serial: String, - pub timestamp: DateTime, - #[serde(alias = "uploadSpeed")] - pub upload_speed: i64, - #[serde(alias = "downloadSpeed")] - pub download_speed: i64, - pub latency: i32, - - #[serde(skip_deserializing)] - pub id: Uuid, - #[serde(skip_deserializing)] - pub created_at: Option>, -} - -impl CellSpeedtest { - pub async fn insert_into(&self, conn: &mut PgConnection) -> Result { - sqlx::query( - r#" - insert into cell_speedtest (pubkey, serial, timestamp, upload_speed, download_speed, latency) - values ($1, $2, $3, $4, $5, $6) - returning id - "#, - ) - .bind(&self.pubkey) - .bind(&self.serial) - .bind(self.timestamp) - .bind(&self.upload_speed) - .bind(&self.download_speed) - .bind(&self.latency) - .fetch_one(conn) - .await - .and_then(|row| row.try_get("id")) - .map_err(Error::from) - } - - pub async fn get(conn: &mut PgConnection, id: &Uuid) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_speedtest - where id = $1::uuid - "#, - ) - .bind(id) - .fetch_optional(conn) - .await - .map_err(Error::from) - } - - pub async fn for_hotspot_since( - conn: &mut PgConnection, - id: &str, - since: &Since, - ) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_speedtest - where pubkey = $1 and timestamp > $2 - order by timestamp asc - limit $3 - "#, - ) - .bind(id) - .bind( - since - .since - .unwrap_or_else(|| DateTime::::from(SystemTime::UNIX_EPOCH)), - ) - .bind(min( - MAX_SPEEDTEST_COUNT, - since.count.unwrap_or(DEFAULT_SPEEDTEST_COUNT) as u32, - )) - .fetch_all(conn) - .await - .map_err(Error::from) - } - - pub async fn for_hotspot_last(conn: &mut PgConnection, id: &str) -> Result> { - sqlx::query_as::<_, Self>( - r#" - select * from cell_speedtest - where pubkey = $1 - order by timestamp desc - limit 1 - "#, - ) - .bind(id) - .fetch_optional(conn) - .await - .map_err(Error::from) - } -} diff --git a/src/store/file_writer.rs b/src/store/file_writer.rs new file mode 100644 index 000000000..8827fb853 --- /dev/null +++ b/src/store/file_writer.rs @@ -0,0 +1,108 @@ +use crate::{Error, Result}; +use async_compression::tokio::write::GzipEncoder; +use chrono::Utc; +use std::path::{Path, PathBuf}; +use tokio::{ + fs::{self, File, OpenOptions}, + io::AsyncWriteExt, +}; + +type Sink = GzipEncoder; + +pub struct FileWriter { + target_path: PathBuf, + tmp_path: PathBuf, + max_size: usize, + prefix: String, + + current_sink_size: usize, + current_sink_path: PathBuf, + current_sink: Option, +} + +impl FileWriter { + pub async fn new( + target_path: &Path, + tmp_path: &Path, + prefix: &str, + max_size: usize, + ) -> Result { + fs::create_dir_all(target_path).await?; + fs::create_dir_all(tmp_path).await?; + Ok(Self { + target_path: target_path.to_path_buf(), + tmp_path: tmp_path.to_path_buf(), + max_size, + prefix: prefix.to_string(), + current_sink_size: 0, + current_sink_path: PathBuf::new(), + current_sink: None, + }) + } + + async fn new_sink(&self) -> Result<(PathBuf, PathBuf, Sink)> { + let filename = format!("{}-{}.gz", self.prefix, Utc::now().timestamp_millis()); + let prev_path = self.current_sink_path.to_path_buf(); + let new_path = self.tmp_path.join(filename); + let new_sink = GzipEncoder::new( + OpenOptions::new() + .write(true) + .create(true) + .open(&new_path) + .await?, + ); + Ok((prev_path, new_path, new_sink)) + } + + async fn roll_sink(&mut self) -> Result { + let (prev_path, new_path, new_sink) = self.new_sink().await?; + if let Some(current_sink) = self.current_sink.as_mut() { + current_sink.shutdown().await?; + } + self.current_sink = Some(new_sink); + self.current_sink_path = new_path; + self.current_sink_size = 0; + Ok(prev_path) + } + + async fn deposit_sink(&self, sink_path: &Path) -> Result { + let target_filename = sink_path.file_name().ok_or_else(|| { + Error::from(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "expected sink filename", + )) + })?; + let target_path = self.target_path.join(&target_filename); + fs::rename(&sink_path, &target_path).await?; + Ok(()) + } + + pub async fn write(&mut self, item: T) -> Result { + let buf = item.encode_to_vec(); + self.write_all(&buf).await + } + + pub async fn write_all(&mut self, buf: &[u8]) -> Result { + if self.current_sink.is_none() { + let _ = self.roll_sink().await?; + }; + let prev_sink_path = if (self.current_sink_size + buf.len()) >= self.max_size { + Some(self.roll_sink().await?) + } else { + None + }; + + if let Some(prev_sink_path) = prev_sink_path { + self.deposit_sink(&prev_sink_path).await?; + } + + if let Some(sink) = self.current_sink.as_mut() { + Ok(sink.write(buf).await?) + } else { + Err(Error::from(std::io::Error::new( + std::io::ErrorKind::Other, + "sink not available", + ))) + } + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 000000000..6a9695d20 --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,3 @@ +mod file_writer; + +pub use file_writer::FileWriter; diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 000000000..e5e275a8d --- /dev/null +++ b/src/util.rs @@ -0,0 +1,112 @@ +use crate::{Error, Result}; +use core::fmt; +use rust_decimal::prelude::*; +use serde::{de::Deserializer, Deserialize, Serialize}; +use std::str::FromStr; + +macro_rules! decimal_scalar { + ($stype:ident, $scalar:literal, $scale:literal) => { + #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] + pub struct $stype(Decimal); + + impl FromStr for $stype { + type Err = Error; + + fn from_str(s: &str) -> Result { + match Decimal::from_str(s).or_else(|_| Decimal::from_scientific(s)) { + Ok(data) if data.scale() > 8 => Err(Error::decimals(s)), + Ok(data) => Ok(Self(data)), + Err(_) => Err(Error::decimals(s)), + } + } + } + + impl fmt::Display for $stype { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } + } + + impl $stype { + pub fn new(d: Decimal) -> Self { + Self(d) + } + + pub fn get_decimal(&self) -> Decimal { + self.0 + } + + pub fn deserialize<'de, D>(d: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let val = u64::deserialize(d)?; + Ok(Self::from(val)) + } + + pub fn deserialize_option<'de, D>(d: D) -> std::result::Result, D::Error> + where + D: Deserializer<'de>, + { + let v: Option = Option::deserialize(d)?; + if let Some(val) = v { + Ok(Some(Self::from(val))) + } else { + Ok(None) + } + } + } + + impl From for $stype { + fn from(v: u64) -> Self { + if let Some(mut data) = Decimal::from_u64(v) { + data.set_scale($scale).unwrap(); + return Self(data); + } + panic!("u64 could not be converted into Decimal") + } + } + + impl From for $stype { + fn from(mut v: Decimal) -> Self { + // rescale the decimal back to 8 digits of precision + v.rescale(8); + Self(v) + } + } + + impl From<$stype> for u64 { + fn from(v: $stype) -> Self { + if let Some(scaled_dec) = v.0.checked_mul($scalar.into()) { + if let Some(num) = scaled_dec.to_u64() { + return num; + } + } + panic!("Invalid scaled decimal construction") + } + } + + impl From for $stype { + fn from(v: i32) -> Self { + if let Some(mut data) = Decimal::from_i32(v) { + data.set_scale($scale).unwrap(); + return Self(data); + } + panic!("u64 could not be converted into Decimal") + } + } + + impl From<$stype> for i32 { + fn from(v: $stype) -> Self { + if let Some(scaled_dec) = v.0.checked_mul($scalar.into()) { + if let Some(num) = scaled_dec.to_i32() { + return num; + } + } + panic!("Invalid scaled decimal construction") + } + } + }; +} + +decimal_scalar!(Mobile, 100_000_000, 8); diff --git a/src/uuid.rs b/src/uuid.rs index 49e4ba8b5..1790d6ced 100644 --- a/src/uuid.rs +++ b/src/uuid.rs @@ -17,6 +17,12 @@ impl Deref for Uuid { } } +impl Uuid { + pub fn nil() -> Self { + Self(sqlx::types::Uuid::nil()) + } +} + impl<'de> Deserialize<'de> for Uuid { fn deserialize(deserializer: D) -> std::result::Result where