From e1f3ffb6b175dd42e466b8ef30359052de466d12 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 15 Nov 2024 13:05:25 +0200 Subject: [PATCH 1/2] tests work --- Cargo.lock | 130 +++++++++++++++++++++------ Cargo.toml | 2 + mobile_config/Cargo.toml | 2 + mobile_config/src/gateway_info.rs | 94 ++++++++++++++----- mobile_config/src/gateway_service.rs | 6 +- 5 files changed, 182 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbcebc3ee..1edac19bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1228,7 +1228,7 @@ dependencies = [ "aws-smithy-http-tower 0.51.0", "aws-smithy-types 0.51.0", "bytes", - "fastrand", + "fastrand 1.8.0", "http 0.2.11", "http-body 0.4.5", "hyper 0.14.28", @@ -1251,7 +1251,7 @@ dependencies = [ "aws-smithy-http-tower 0.54.4", "aws-smithy-types 0.54.4", "bytes", - "fastrand", + "fastrand 1.8.0", "http 0.2.11", "http-body 0.4.5", "pin-project-lite", @@ -2762,7 +2762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.1", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core 0.9.8", @@ -3140,6 +3140,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -3173,6 +3183,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" + [[package]] name = "feature-probe" version = "0.1.1" @@ -3390,7 +3406,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" dependencies = [ - "fastrand", + "fastrand 1.8.0", "futures-core", "futures-io", "memchr", @@ -3669,9 +3685,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash 0.8.11", ] @@ -4258,7 +4274,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.5", "serde", ] @@ -4313,6 +4329,17 @@ dependencies = [ "triggered", ] +[[package]] +name = "inherent" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0122b7114117e64a63ac49f752a5ca4624d534c7b1c7de796ac196381cd2d947" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "inotify" version = "0.9.6" @@ -4728,9 +4755,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libflate" @@ -4827,6 +4854,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.10" @@ -4965,7 +4998,7 @@ checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.14.1", + "hashbrown 0.14.5", "metrics", "num_cpus", "quanta", @@ -5044,6 +5077,8 @@ dependencies = [ "prost", "rand 0.8.5", "retainer", + "sea-query", + "sea-query-binder", "serde", "serde_json", "solana-sdk", @@ -6060,7 +6095,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", - "heck 0.4.0", + "heck 0.5.0", "itertools", "log", "multimap", @@ -6469,15 +6504,6 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - [[package]] name = "rend" version = "0.4.2" @@ -6844,6 +6870,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.38.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" +dependencies = [ + "bitflags 2.5.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.20.9" @@ -6984,6 +7023,44 @@ dependencies = [ "untrusted 0.7.1", ] +[[package]] +name = "sea-query" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4fd043b8117af233e221f73e3ea8dfbc8e8c3c928017c474296db45c649105c" +dependencies = [ + "chrono", + "inherent", + "sea-query-derive", + "serde_json", +] + +[[package]] +name = "sea-query-binder" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "754965d4aee6145bec25d0898e5c931e6c22859789ce62fd85a42a15ed5a8ce3" +dependencies = [ + "chrono", + "sea-query", + "serde_json", + "sqlx", +] + +[[package]] +name = "sea-query-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9834af2c4bd8c5162f00c89f1701fb6886119a88062cf76fe842ea9e232b9839" +dependencies = [ + "darling 0.20.5", + "heck 0.4.0", + "proc-macro2", + "quote", + "syn 2.0.58", + "thiserror", +] + [[package]] name = "seahash" version = "4.1.0" @@ -8665,6 +8742,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", + "serde_json", "sha2 0.10.8", "sqlx-core", "sqlx-rt", @@ -8836,16 +8914,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.3.0" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", - "libc", - "redox_syscall 0.2.16", - "remove_dir_all", - "winapi", + "fastrand 2.2.0", + "rustix", + "windows-sys 0.52.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7388ff0ad..f3d47e8cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,8 @@ tokio-util = "0" uuid = { version = "1", features = ["v4", "serde"] } tower-http = { version = "0", features = ["trace"] } derive_builder = "0" +sea-query = { version = "=0.31", default-features = false, features = ["derive", "backend-postgres", "with-chrono", "with-json"] } +sea-query-binder = { version = "0", default-features = false, features = ["sqlx-postgres", "with-chrono", "with-json"] } [patch.crates-io] # v0.7.0-alpha.3 diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index f413f4cba..5f6d70a9e 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -46,6 +46,8 @@ solana-sdk = { workspace = true } custom-tracing = { path = "../custom_tracing", features = ["grpc"] } humantime-serde = { workspace = true } coverage-map = { path = "../coverage_map" } +sea-query = { workspace = true } +sea-query-binder = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index 30a49b9e4..f68ad1f18 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -150,9 +150,29 @@ pub(crate) mod db { use chrono::{DateTime, Utc}; use futures::stream::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; + use serde_json::json; use sqlx::{types::Json, PgExecutor, Row}; use std::str::FromStr; + use sea_query::*; + + #[derive(Iden)] + enum MobileHotspotInfos { + Table, + Asset, + Location, + DeviceType, + RefreshedAt, + CreatedAt, + } + + #[derive(Iden)] + enum KeyToAssets { + Table, + Asset, + EntityKey, + } + const GET_METADATA_SQL: &str = r#" select kta.entity_key, infos.location::bigint, infos.device_type, infos.refreshed_at, infos.created_at @@ -168,7 +188,6 @@ pub(crate) mod db { where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); - } pub async fn get_info( @@ -201,31 +220,60 @@ pub(crate) mod db { .boxed()) } + use sea_query::{Expr, Iden, PostgresQueryBuilder, Query}; + use sea_query_binder::{SqlxBinder, SqlxValues}; + + // Returns (sql query string, values) + pub fn prepare_all_info_stream_query( + device_types: &[DeviceType], + min_refreshed_at: DateTime, + ) -> (String, SqlxValues) { + let mut binding = Query::select(); + let query = binding + .expr(Expr::col(MobileHotspotInfos::Location).cast_as(Alias::new("bigint"))) + .column(MobileHotspotInfos::DeviceType) + .column(MobileHotspotInfos::RefreshedAt) + .column(MobileHotspotInfos::CreatedAt) + .column(KeyToAssets::EntityKey) + .from(MobileHotspotInfos::Table) + .and_where( + Expr::col(MobileHotspotInfos::RefreshedAt) + .gte(min_refreshed_at) + .or(Expr::col(MobileHotspotInfos::RefreshedAt) + .is_null() + .and(Expr::col(MobileHotspotInfos::CreatedAt).gte(min_refreshed_at))), + ) + .inner_join( + KeyToAssets::Table, + Expr::col((KeyToAssets::Table, KeyToAssets::Asset)) + .equals((MobileHotspotInfos::Table, MobileHotspotInfos::Asset)), + ); + let query = if device_types.is_empty() { + query + } else { + let device_types = device_types + .iter() + // The device_types field has a jsonb type but is being used as a string, + // which forces us to add quotes. + // .map(|v| json!(format!("{}", v))) + .map(|v| json!(v.to_string())) + .collect::>(); + dbg!(&device_types); + query.and_where(Expr::col(MobileHotspotInfos::DeviceType).is_in(device_types)) + }; + + query.build_sqlx(PostgresQueryBuilder) + } + pub fn all_info_stream<'a>( db: impl PgExecutor<'a> + 'a, - device_types: &'a [DeviceType], - min_refreshed_at: DateTime, + query: (&'a str, SqlxValues), ) -> impl Stream + 'a { - match device_types.is_empty() { - true => sqlx::query_as::<_, GatewayInfo>(&GET_METADATA_SQL_REFRESHED_AT) - .bind(min_refreshed_at) - .fetch(db) - .filter_map(|metadata| async move { metadata.ok() }) - .boxed(), - false => sqlx::query_as::<_, GatewayInfo>(&DEVICE_TYPES_METADATA_SQL) - .bind(min_refreshed_at) - .bind( - device_types - .iter() - // The device_types field has a jsonb type but is being used as a string, - // which forces us to add quotes. - .map(|v| format!("\"{}\"", v)) - .collect::>(), - ) - .fetch(db) - .filter_map(|metadata| async move { metadata.ok() }) - .boxed(), - } + println!("{}", &query.0); + sqlx::query_as_with::<_, GatewayInfo, _>(&query.0, query.1) + .fetch(db) + .filter_map(|metadata| async move { metadata.ok() }) + .boxed() } impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for GatewayInfo { diff --git a/mobile_config/src/gateway_service.rs b/mobile_config/src/gateway_service.rs index 9932f4341..a13b18fc9 100644 --- a/mobile_config/src/gateway_service.rs +++ b/mobile_config/src/gateway_service.rs @@ -1,5 +1,5 @@ use crate::{ - gateway_info::{self, DeviceType, GatewayInfo}, + gateway_info::{self, db::prepare_all_info_stream_query, DeviceType, GatewayInfo}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -176,7 +176,9 @@ impl mobile_config::Gateway for GatewayService { ); tokio::spawn(async move { - let stream = gateway_info::db::all_info_stream(&pool, &device_types, min_refreshed_at); + let (sql_query, values) = + prepare_all_info_stream_query(&device_types, min_refreshed_at); + let stream = gateway_info::db::all_info_stream(&pool, (&sql_query, values)); stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await }); From ffd99c64df97be52dc2889c998aee9a034fb52ba Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Fri, 15 Nov 2024 13:18:40 +0200 Subject: [PATCH 2/2] sea-query works --- mobile_config/src/gateway_info.rs | 52 +++++++++++++------------- mobile_config/tests/gateway_service.rs | 2 + 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/mobile_config/src/gateway_info.rs b/mobile_config/src/gateway_info.rs index f68ad1f18..80ea25c7c 100644 --- a/mobile_config/src/gateway_info.rs +++ b/mobile_config/src/gateway_info.rs @@ -180,14 +180,9 @@ pub(crate) mod db { join key_to_assets kta on infos.asset = kta.asset "#; const BATCH_SQL_WHERE_SNIPPET: &str = " where kta.entity_key = any($1::bytea[]) "; - const DEVICE_TYPES_AND_SNIPPET: &str = " and device_type::text = any($2) "; lazy_static::lazy_static! { static ref BATCH_METADATA_SQL: String = format!("{GET_METADATA_SQL} {BATCH_SQL_WHERE_SNIPPET}"); - static ref GET_METADATA_SQL_REFRESHED_AT: String = format!(r#"{GET_METADATA_SQL} - where ( infos.refreshed_at >= $1 OR (infos.refreshed_at IS NULL AND infos.created_at >= $1) ) "#); - - static ref DEVICE_TYPES_METADATA_SQL: String = format!("{} {}", *GET_METADATA_SQL_REFRESHED_AT, DEVICE_TYPES_AND_SNIPPET); } pub async fn get_info( @@ -229,25 +224,32 @@ pub(crate) mod db { min_refreshed_at: DateTime, ) -> (String, SqlxValues) { let mut binding = Query::select(); - let query = binding - .expr(Expr::col(MobileHotspotInfos::Location).cast_as(Alias::new("bigint"))) - .column(MobileHotspotInfos::DeviceType) - .column(MobileHotspotInfos::RefreshedAt) - .column(MobileHotspotInfos::CreatedAt) - .column(KeyToAssets::EntityKey) - .from(MobileHotspotInfos::Table) - .and_where( - Expr::col(MobileHotspotInfos::RefreshedAt) - .gte(min_refreshed_at) - .or(Expr::col(MobileHotspotInfos::RefreshedAt) + let query = + binding + .expr(Expr::col(MobileHotspotInfos::Location).cast_as(Alias::new("bigint"))) + .column((MobileHotspotInfos::Table, MobileHotspotInfos::DeviceType)) + .column((MobileHotspotInfos::Table, MobileHotspotInfos::RefreshedAt)) + .column((MobileHotspotInfos::Table, MobileHotspotInfos::CreatedAt)) + .column(KeyToAssets::EntityKey) + .from(MobileHotspotInfos::Table) + .and_where( + Expr::col((MobileHotspotInfos::Table, MobileHotspotInfos::RefreshedAt)) + .gte(min_refreshed_at) + .or(Expr::col(( + MobileHotspotInfos::Table, + MobileHotspotInfos::RefreshedAt, + )) .is_null() - .and(Expr::col(MobileHotspotInfos::CreatedAt).gte(min_refreshed_at))), - ) - .inner_join( - KeyToAssets::Table, - Expr::col((KeyToAssets::Table, KeyToAssets::Asset)) - .equals((MobileHotspotInfos::Table, MobileHotspotInfos::Asset)), - ); + .and( + Expr::col((MobileHotspotInfos::Table, MobileHotspotInfos::CreatedAt)) + .gte(min_refreshed_at), + )), + ) + .inner_join( + KeyToAssets::Table, + Expr::col((KeyToAssets::Table, KeyToAssets::Asset)) + .equals((MobileHotspotInfos::Table, MobileHotspotInfos::Asset)), + ); let query = if device_types.is_empty() { query } else { @@ -258,7 +260,6 @@ pub(crate) mod db { // .map(|v| json!(format!("{}", v))) .map(|v| json!(v.to_string())) .collect::>(); - dbg!(&device_types); query.and_where(Expr::col(MobileHotspotInfos::DeviceType).is_in(device_types)) }; @@ -269,8 +270,7 @@ pub(crate) mod db { db: impl PgExecutor<'a> + 'a, query: (&'a str, SqlxValues), ) -> impl Stream + 'a { - println!("{}", &query.0); - sqlx::query_as_with::<_, GatewayInfo, _>(&query.0, query.1) + sqlx::query_as_with::<_, GatewayInfo, _>(query.0, query.1) .fetch(db) .filter_map(|metadata| async move { metadata.ok() }) .boxed() diff --git a/mobile_config/tests/gateway_service.rs b/mobile_config/tests/gateway_service.rs index 0ffaa2cc5..b778685f1 100644 --- a/mobile_config/tests/gateway_service.rs +++ b/mobile_config/tests/gateway_service.rs @@ -218,6 +218,8 @@ async fn gateway_stream_info_refreshed_at_is_null(pool: PgPool) { assert!(stream.next().await.is_none()); } +// TODO two device_types test + #[sqlx::test] async fn gateway_stream_info_data_types(pool: PgPool) { let admin_key = make_keypair();