From 349d3516eb1d7bbc0a98131616c4e87b4aa5c91e Mon Sep 17 00:00:00 2001 From: parabala <115564000+parabala@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:39:00 +0800 Subject: [PATCH] Revert "Main kvector hybrid 202405" --- Cargo.lock | 17 +- ds/src/mem/ring_slice.rs | 3 - endpoint/src/topo.rs | 1 - endpoint/src/vector/batch.rs | 162 ------------- endpoint/src/vector/config.rs | 35 +-- endpoint/src/vector/mod.rs | 1 - endpoint/src/vector/strategy.rs | 132 ++--------- endpoint/src/vector/topo.rs | 301 ++++--------------------- protocol/Cargo.toml | 1 - protocol/src/callback.rs | 82 +------ protocol/src/flag.rs | 50 ++-- protocol/src/kv/common/constants.rs | 1 - protocol/src/kv/common/error/mod.rs | 2 - protocol/src/kv/common/query_result.rs | 1 - protocol/src/parser.rs | 62 +---- protocol/src/redis/packet.rs | 7 +- protocol/src/req.rs | 11 +- protocol/src/request.rs | 27 +-- protocol/src/vector/attachment.rs | 213 ----------------- protocol/src/vector/mod.rs | 124 ++++------ protocol/src/vector/mysql.rs | 136 +---------- protocol/src/vector/packet.rs | 70 +----- protocol/src/vector/query_result.rs | 45 ++-- protocol/src/vector/redis.rs | 9 +- protocol/src/vector/reqpacket.rs | 25 +- protocol/src/vector/rsppacket.rs | 14 +- stream/src/context.rs | 8 +- stream/src/handler.rs | 2 +- stream/src/pipeline.rs | 12 +- stream/src/topology.rs | 3 - tests/src/benches/redis.rs | 4 - tests/src/layout.rs | 6 +- tests/src/mq/protocol.rs | 67 +----- tests/src/proto_hook.rs | 4 - 34 files changed, 239 insertions(+), 1399 deletions(-) delete mode 100644 endpoint/src/vector/batch.rs delete mode 100644 protocol/src/vector/attachment.rs diff --git a/Cargo.lock b/Cargo.lock index 7442fea11..754e07b9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,9 +396,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.8" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" dependencies = [ "jobserver", "libc", @@ -2076,9 +2076,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.3" +version = "0.36.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" dependencies = [ "memchr", ] @@ -2394,7 +2394,6 @@ dependencies = [ "byteorder", "bytes", "chrono", - "chrono-tz", "ctor 0.1.26", "ds", "enum_dispatch", @@ -2831,9 +2830,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.205" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33aedb1a7135da52b7c21791455563facbbcc43d0f0f66165b42c21b3dfb150" +checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" dependencies = [ "serde_derive", ] @@ -2850,9 +2849,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.205" +version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692d6f5ac90220161d6774db30c662202721e64aed9058d2c394f451261420c1" +checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", diff --git a/ds/src/mem/ring_slice.rs b/ds/src/mem/ring_slice.rs index 630c9d663..aaa248b05 100644 --- a/ds/src/mem/ring_slice.rs +++ b/ds/src/mem/ring_slice.rs @@ -85,9 +85,6 @@ impl RingSlice { #[inline] pub fn try_str_num(&self, r: impl Range) -> Option { let (start, end) = r.range(self); - if start == end { - return None; - } let mut num = 0usize; for i in start..end { if !self[i].is_ascii_digit() { diff --git a/endpoint/src/topo.rs b/endpoint/src/topo.rs index 07d81da95..2f231f03c 100644 --- a/endpoint/src/topo.rs +++ b/endpoint/src/topo.rs @@ -36,7 +36,6 @@ procs::topology_dispatcher! { pub trait Topology : Endpoint + Hash{ fn exp_sec(&self) -> u32 {86400} - fn has_attach(&self) -> bool {false} } => where P:Protocol, E:Endpoint, R:Request, Topologies: Endpoint trait Inited { diff --git a/endpoint/src/vector/batch.rs b/endpoint/src/vector/batch.rs deleted file mode 100644 index e963326b1..000000000 --- a/endpoint/src/vector/batch.rs +++ /dev/null @@ -1,162 +0,0 @@ -use super::strategy::Postfix; -use chrono::{Datelike, NaiveDate}; -use chrono_tz::Tz; -use core::fmt::Write; -use ds::RingSlice; -use protocol::Error; -use sharding::{distribution::DBRange, hash::Hasher}; - -#[derive(Clone, Debug)] -pub struct Batch { - db_prefix: String, - table_prefix: String, - table_postfix: Postfix, - hasher: Hasher, - distribution: DBRange, - keys_name: Vec, - si_cols: Vec, - si: Si, -} - -impl Batch { - pub fn new_with_db( - db_prefix: String, - table_prefix: String, - db_count: u32, - shards: u32, - table_postfix: Postfix, - keys_name: Vec, - si_cols: Vec, - si_db_prefix: String, - si_db_count: u32, - si_table_prefix: String, - si_table_count: u32, - si_shards: u32, - ) -> Self { - Self { - db_prefix, - table_prefix, - table_postfix, - distribution: DBRange::new(db_count as usize, 1usize, shards as usize), - hasher: Hasher::from("crc32"), - keys_name, - si_cols, - si: Si::new( - si_db_prefix, - si_db_count, - si_table_prefix, - si_table_count, - si_shards, - ), - } - } - - pub fn distribution(&self) -> &DBRange { - &self.distribution - } - - pub fn si_distribution(&self) -> &DBRange { - self.si.distribution() - } - - pub fn hasher(&self) -> &Hasher { - &self.hasher - } - - pub fn get_date(&self, _: &[RingSlice]) -> Result { - let now = chrono::Utc::now().with_timezone(&Tz::Asia__Shanghai); - Ok(NaiveDate::from_ymd_opt(now.year(), now.month(), now.day()).unwrap()) - } - - pub fn write_dname_with_hash(&self, buf: &mut impl Write, hash: i64) { - let db_idx: usize = self.distribution.db_idx(hash); - let _ = write!(buf, "{}_{}", self.db_prefix, db_idx); - } - - pub fn write_tname_with_date(&self, buf: &mut impl Write, date: &NaiveDate) { - let (mut year, month, day) = (date.year(), date.month(), date.day()); - year %= 100; - match self.table_postfix { - Postfix::YYMM => { - let _ = write!(buf, "{}_{:02}{:02}", &self.table_prefix, year, month); - } - //Postfix::YYMMDD - _ => { - let _ = write!( - buf, - "{}_{:02}{:02}{:02}", - &self.table_prefix, year, month, day - ); - } - } - } - - pub fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) { - self.write_dname_with_hash(buf, hash); - let _ = buf.write_char('.'); - self.write_tname_with_date(buf, date) - } - - pub(crate) fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) { - self.si.write_database_table(buf, hash) - } - - pub(crate) fn condition_keys(&self) -> Box> + '_> { - Box::new(self.keys_name.iter().map(|x| Some(x))) - } - - pub(crate) fn keys(&self) -> &[String] { - &self.keys_name - } - - // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { - // if month == 1 { - // return NaiveDate::from_ymd_opt((year - 1).into(), 12, 1).unwrap(); - // } else { - // return NaiveDate::from_ymd_opt(year.into(), (month - 1).into(), 1).unwrap(); - // } - // } - - pub(crate) fn batch(&self, limit: u64, _: &protocol::vector::VectorCmd) -> u64 { - limit - } - - pub(crate) fn si_cols(&self) -> &[String] { - &self.si_cols - } -} - -#[derive(Clone, Debug)] -struct Si { - db_prefix: String, - table_prefix: String, - distribution: DBRange, -} - -impl Si { - fn new( - db_prefix: String, - db_count: u32, - table_prefix: String, - table_count: u32, - shards: u32, - ) -> Self { - Self { - db_prefix: db_prefix, - table_prefix: table_prefix, - distribution: DBRange::new(db_count as usize, table_count as usize, shards as usize), - } - } - fn distribution(&self) -> &DBRange { - &self.distribution - } - fn write_database_table(&self, buf: &mut impl Write, hash: i64) { - let db_idx = self.distribution.db_idx(hash); - let table_idx = self.distribution.table_idx(hash); - let _ = write!( - buf, - "{}_{}.{}_{}", - self.db_prefix, db_idx, self.table_prefix, table_idx - ); - } -} diff --git a/endpoint/src/vector/config.rs b/endpoint/src/vector/config.rs index fe9fe870f..82296a314 100644 --- a/endpoint/src/vector/config.rs +++ b/endpoint/src/vector/config.rs @@ -14,8 +14,6 @@ pub struct VectorNamespace { pub(crate) backends_flaten: Vec, #[serde(default)] pub(crate) backends: HashMap>, - #[serde(default)] - pub(crate) si_backends: Vec, } #[derive(Debug, Clone, Default, Deserialize, Serialize)] @@ -46,20 +44,6 @@ pub struct Basic { pub(crate) user: String, #[serde(default)] pub(crate) region_enabled: bool, - #[serde(default)] - pub(crate) si_db_name: String, - #[serde(default)] - pub(crate) si_table_name: String, - #[serde(default)] - pub(crate) si_db_count: u32, - #[serde(default)] - pub(crate) si_table_count: u32, - #[serde(default)] - pub(crate) si_user: String, - #[serde(default)] - pub(crate) si_password: String, - #[serde(default)] - pub(crate) si_cols: Vec, } impl VectorNamespace { @@ -82,7 +66,7 @@ impl VectorNamespace { } last_year = year.1; } - match ns.decrypt_password(ns.basic.password.as_bytes()) { + match ns.decrypt_password() { Ok(password) => ns.basic.password = password, Err(e) => { log::warn!("failed to decrypt password, e:{}", e); @@ -93,19 +77,6 @@ impl VectorNamespace { init.extend_from_slice(b.1); init }); - if ns.basic.si_password.len() > 0 { - match ns.decrypt_password(ns.basic.si_password.as_bytes()) { - Ok(password) => ns.basic.si_password = password, - Err(e) => { - log::warn!("failed to decrypt si password, e:{}", e); - return None; - } - } - } - if ns.si_backends.len() > 0 { - ns.backends_flaten - .extend_from_slice(ns.si_backends.as_slice()); - } Some(ns) } Err(e) => { @@ -116,9 +87,9 @@ impl VectorNamespace { } #[inline] - fn decrypt_password(&self, data: &[u8]) -> Result> { + fn decrypt_password(&self) -> Result> { let key_pem = fs::read_to_string(&context::get().key_path)?; - let encrypted_data = general_purpose::STANDARD.decode(data)?; + let encrypted_data = general_purpose::STANDARD.decode(self.basic.password.as_bytes())?; let decrypted_data = ds::decrypt::decrypt_password(&key_pem, &encrypted_data)?; let decrypted_string = String::from_utf8(decrypted_data)?; Ok(decrypted_string) diff --git a/endpoint/src/vector/mod.rs b/endpoint/src/vector/mod.rs index 842ec70e6..a947c7944 100644 --- a/endpoint/src/vector/mod.rs +++ b/endpoint/src/vector/mod.rs @@ -1,4 +1,3 @@ -mod batch; pub(crate) mod config; mod strategy; pub mod topo; diff --git a/endpoint/src/vector/strategy.rs b/endpoint/src/vector/strategy.rs index acc0deebf..9367afb7a 100644 --- a/endpoint/src/vector/strategy.rs +++ b/endpoint/src/vector/strategy.rs @@ -7,14 +7,12 @@ use protocol::Result; use sharding::distribution::DBRange; use sharding::hash::Hasher; -use super::batch::Batch; use super::config::VectorNamespace; use super::vectortime::VectorTime; #[derive(Debug, Clone)] pub enum Strategist { VectorTime(VectorTime), - Batch(Batch), } impl Default for Strategist { @@ -36,121 +34,51 @@ impl Default for Strategist { //1. 数据库表名的格式如 table_yymm //2. 库名表名后缀如何计算 impl Strategist { - pub fn try_from(ns: &VectorNamespace) -> Option { - Some(match ns.basic.strategy.as_str() { - "aggregation" => { - //至少需要date和count两个字段名 - if ns.basic.si_cols.len() < 2 || ns.basic.keys.len() != 1 { - log::warn!("len si_cols < 2 or len keys != 1"); - return None; - } - Self::Batch(Batch::new_with_db( - ns.basic.db_name.clone(), - ns.basic.table_name.clone(), - ns.basic.db_count, - //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 - ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), - ns.basic.keys.clone(), - ns.basic.si_cols.clone(), - ns.basic.si_db_name.clone(), - ns.basic.si_db_count, - ns.basic.si_table_name.clone(), - ns.basic.si_table_count, - ns.si_backends.len() as u32, - )) - } - _ => Self::VectorTime(VectorTime::new_with_db( - ns.basic.db_name.clone(), - ns.basic.table_name.clone(), - ns.basic.db_count, - //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 - ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), - ns.basic.keys.clone(), - )), - }) + pub fn try_from(ns: &VectorNamespace) -> Self { + Self::VectorTime(VectorTime::new_with_db( + ns.basic.db_name.clone(), + ns.basic.table_name.clone(), + ns.basic.db_count, + //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 + ns.backends.iter().next().unwrap().1.len() as u32, + ns.basic.table_postfix.as_str().into(), + ns.basic.keys.clone(), + )) } #[inline] pub fn distribution(&self) -> &DBRange { match self { Strategist::VectorTime(inner) => inner.distribution(), - Strategist::Batch(inner) => inner.distribution(), - } - } - #[inline] - pub fn si_distribution(&self) -> &DBRange { - match self { - Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.si_distribution(), } } #[inline] pub fn hasher(&self) -> &Hasher { match self { Strategist::VectorTime(inner) => inner.hasher(), - Strategist::Batch(inner) => inner.hasher(), } } #[inline] pub fn get_date(&self, keys: &[RingSlice]) -> Result { match self { Strategist::VectorTime(inner) => inner.get_date(keys), - Strategist::Batch(inner) => inner.get_date(keys), - } - } - // 请求成功后,是否有更多的数据需要请求 - #[inline] - pub fn more(&self) -> bool { - match self { - Strategist::VectorTime(_) => false, - Strategist::Batch(_) => true, } } - - // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { - // match self { - // Strategist::VectorTime(_) => panic!("VectorTime not support get_next_date"), - // Strategist::Batch(inner) => inner.get_next_date(year, month), - // } - // } } impl protocol::vector::Strategy for Strategist { fn keys(&self) -> &[String] { match self { Strategist::VectorTime(inner) => inner.keys(), - Strategist::Batch(inner) => inner.keys(), } } fn condition_keys(&self) -> Box> + '_> { match self { Strategist::VectorTime(inner) => inner.condition_keys(), - Strategist::Batch(inner) => inner.condition_keys(), } } fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) { match self { Strategist::VectorTime(inner) => inner.write_database_table(buf, date, hash), - Strategist::Batch(inner) => inner.write_database_table(buf, date, hash), - } - } - fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) { - match self { - Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.write_si_database_table(buf, hash), - } - } - fn batch(&self, limit: u64, vcmd: &protocol::vector::VectorCmd) -> u64 { - match self { - Strategist::VectorTime(_) => 0, - Strategist::Batch(inner) => inner.batch(limit, vcmd), - } - } - fn si_cols(&self) -> &[String] { - match self { - Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.si_cols(), } } } @@ -201,13 +129,6 @@ mod tests { password: Default::default(), user: Default::default(), region_enabled: Default::default(), - si_db_name: Default::default(), - si_table_name: Default::default(), - si_db_count: Default::default(), - si_table_count: Default::default(), - si_user: Default::default(), - si_password: Default::default(), - si_cols: Default::default(), }, backends_flaten: Default::default(), backends: HashMap::from([( @@ -217,9 +138,8 @@ mod tests { "127.0.0.1:8081,127.0.0.2:8081".into(), ], )]), - si_backends: Default::default(), }; - let strategy = Strategist::try_from(&ns).unwrap(); + let strategy = Strategist::try_from(&ns); let mut buf = String::new(); let buf = &mut buf; // vrange @@ -240,8 +160,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); @@ -265,8 +184,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -313,8 +231,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -356,8 +273,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -397,8 +313,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -451,8 +366,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -494,8 +408,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -523,8 +436,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -549,8 +461,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); @@ -597,8 +508,7 @@ mod tests { }; let hash = strategy.hasher().hash(&"id".as_bytes()); let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); - let builder = - SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); + let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy).unwrap(); buf.clear(); builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); diff --git a/endpoint/src/vector/topo.rs b/endpoint/src/vector/topo.rs index 0aa91289f..0b70eb087 100644 --- a/endpoint/src/vector/topo.rs +++ b/endpoint/src/vector/topo.rs @@ -1,13 +1,11 @@ use std::collections::HashMap; -use chrono::{Datelike, NaiveDate}; +use chrono::Datelike; use discovery::dns; use discovery::dns::IPPort; use discovery::TopologyWrite; use ds::MemGuard; use protocol::kv::{ContextStatus, MysqlBuilder}; -use protocol::vector::attachment::{VAttach, VecAttach}; -use protocol::vector::redis::parse_vector_detail; use protocol::Protocol; use protocol::Request; use protocol::ResOption; @@ -17,7 +15,7 @@ use sharding::hash::{Hash, HashKey}; use crate::dns::DnsConfig; use crate::Timeout; use crate::{Endpoint, Topology}; -use protocol::vector::mysql::{SiSqlBuilder, SqlBuilder}; +use protocol::vector::mysql::SqlBuilder; use super::config::VectorNamespace; use super::strategy::Strategist; @@ -27,7 +25,6 @@ use crate::shards::Shard; #[derive(Clone)] pub struct VectorService { shards: Shards, - si_shard: Vec>, strategist: Strategist, parser: P, cfg: Box>, @@ -39,7 +36,6 @@ impl From

for VectorService { Self { parser, shards: Default::default(), - si_shard: Default::default(), strategist: Default::default(), cfg: Default::default(), } @@ -63,164 +59,59 @@ where Req: Request, P: Protocol, { - fn has_attach(&self) -> bool { - self.strategist.more() - } } -impl VectorService +impl Endpoint for VectorService where E: Endpoint, Req: Request, P: Protocol, { - fn get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { - let (year, shard_idx) = if req.ctx_mut().runs == 0 { - let vcmd = parse_vector_detail(****req, req.flag())?; - //定位年库 - let date = self.strategist.get_date(&vcmd.keys)?; - let year = date.year() as u16; - - let shard_idx = self.shard_idx(req.hash()); - req.ctx_mut().year = year; - req.ctx_mut().shard_idx = shard_idx as u16; - - let vector_builder = SqlBuilder::new(&vcmd, req.hash(), date, &self.strategist, 0)?; - let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; - req.reshape(MemGuard::from_vec(cmd)); + type Item = Req; - (year, shard_idx) - } else { - (req.ctx_mut().year, req.ctx_mut().shard_idx as usize) - }; + fn send(&self, mut req: Self::Item) { + let shard = (|| -> Result<&Shard, protocol::Error> { + let (year, shard_idx) = if req.ctx_mut().runs == 0 { + let vcmd = protocol::vector::redis::parse_vector_detail(&req)?; + //定位年库 + let date = self.strategist.get_date(&vcmd.keys)?; + let year = date.year() as u16; - let shards = self.shards.get(year); - let shard = shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)?; - log::debug!( - "+++ mysql {} send {} year {} shards {:?} => {:?}", - self.cfg.service, - shard_idx, - year, - shards, - req - ); - Ok(shard) - } - fn more_get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { - req.attachment_mut() - .get_or_insert(VecAttach::default().to_attach()); - //分别代表请求的轮次和每轮重试次数 - let (round, runs) = (req.attach().round, req.ctx_mut().runs); - //runs == 0 表示第一轮第一次请求 - let shard = if runs == 0 || req.attach().rsp_ok { - let shard = if runs == 0 { - //请求si表 - assert_eq!(req.attach().left_count, 0); - assert_eq!(*req.context_mut(), 0); - let vcmd = parse_vector_detail(****req, req.flag())?; - if req.operation().is_retrival() { - req.retry_on_rsp_notok(true); - let limit = vcmd.limit(); - assert!(limit > 0, "{limit}"); - //需要在buildsql之前设置 - req.attach_mut().init(limit as u16); - } + let shard_idx = self.shard_idx(req.hash()); + req.ctx_mut().year = year; + req.ctx_mut().shard_idx = shard_idx as u16; - let si_sql = SiSqlBuilder::new(&vcmd, req.hash(), &self.strategist)?; - let cmd = MysqlBuilder::build_packets_for_vector(si_sql)?; + let vector_builder = SqlBuilder::new(&vcmd, req.hash(), date, &self.strategist)?; + let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; req.reshape(MemGuard::from_vec(cmd)); - let si_shard_idx = self.strategist.si_distribution().index(req.hash()); - req.ctx_mut().shard_idx = si_shard_idx as u16; - req.attach_mut().vcmd = vcmd; - req.set_last(false); - &self.si_shard[si_shard_idx] + (year, shard_idx) } else { - //根据round获取si - let si_items = req.attach().si(); - assert!(si_items.len() > 0, "si_items.len() = 0"); - assert!( - round <= si_items.len() as u16, - "round = {round}, si_items.len() = {}", - si_items.len() - ); - let si_item = &si_items[(round - 1) as usize]; - - let year = si_item.date.year as u16 + 2000; - //构建sql - let limit = req.attach().left_count.min(si_item.count); - assert!(si_item.count > 0, "{}", si_item.count); - assert!(req.attach().left_count > 0, "{}", req.attach().left_count); - - let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1) - else { - return Err(protocol::Error::ResponseInvalidMagic); - }; - let vector_builder = SqlBuilder::new( - &req.attach().vcmd, - req.hash(), - date, - &self.strategist, - limit as u64, - )?; - let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; - - //更新轮次信息 - if round == si_items.len() as u16 { - req.set_last(true); - } - - req.reshape(MemGuard::from_vec(cmd)); - //获取shard - let shard_idx = self.shard_idx(req.hash()); - req.ctx_mut().year = year; - req.ctx_mut().shard_idx = shard_idx as u16; - let shards = self.shards.get(year); - shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)? + (req.ctx_mut().year, req.ctx_mut().shard_idx as usize) }; - //重新发送后,视作新的请求,重置响应和runs - req.attach_mut().rsp_ok = false; - req.attach_mut().round += 1; - req.ctx_mut().runs = 0; - req.set_fitst_try(); - shard - } else { - if round - 1 == 0 { - //上一轮si表重试 - &self.si_shard[req.ctx_mut().shard_idx as usize] - } else { - let (year, shard_idx) = (req.ctx_mut().year, req.ctx_mut().shard_idx); - let shards = self.shards.get(year); - shards - .get(shard_idx as usize) - .ok_or(protocol::Error::TopInvalid)? + let shards = self.shards.get(year); + if shards.len() == 0 { + return Err(protocol::Error::TopInvalid); } - }; - - log::debug!( - "+++ mysql {} shards {:?} => {:?}", - self.cfg.service, - shard, - req - ); - Ok(shard) - } -} -impl Endpoint for VectorService -where - E: Endpoint, - Req: Request, - P: Protocol, -{ - type Item = Req; - - fn send(&self, mut req: Self::Item) { - let shard = if !self.strategist.more() { - self.get_shard(&mut req) - } else { - self.more_get_shard(&mut req) - }; + debug_assert!( + shard_idx < shards.len(), + "mysql: {}/{} req:{:?}", + shard_idx, + shards.len(), + req + ); + let shard = unsafe { shards.get_unchecked(shard_idx) }; + log::debug!( + "+++ mysql {} send {} year {} shards {:?} => {:?}", + self.cfg.service, + shard_idx, + year, + shards, + req + ); + Ok(shard) + })(); let shard = match shard { Ok(shard) => shard, Err(e) => { @@ -228,7 +119,6 @@ where protocol::Error::TopInvalid => ContextStatus::TopInvalid, _ => ContextStatus::ReqInvalid, }; - req.try_next(false); req.on_err(e); return; } @@ -272,18 +162,14 @@ where E: Endpoint, { fn need_load(&self) -> bool { - (self.shards.len() + self.si_shard.len()) != self.cfg.shards_url.len() - || self.cfg.need_load() + self.shards.len() != self.cfg.shards_url.len() || self.cfg.need_load() } fn load(&mut self) -> bool { self.cfg.load_guard().check_load(|| self.load_inner()) } fn update(&mut self, namespace: &str, cfg: &str) { if let Some(ns) = VectorNamespace::try_from(cfg) { - let Some(strategist) = Strategist::try_from(&ns) else { - return; - }; - self.strategist = strategist; + self.strategist = Strategist::try_from(&ns); self.cfg.update(namespace, ns); } } @@ -411,114 +297,13 @@ where } self.shards.push((interval, shards_per_interval)); } - if !self.load_inner_si() { - return false; - } - assert_eq!( - self.shards.len() + self.si_shard.len(), - self.cfg.shards_url.len() - ); - + assert_eq!(self.shards.len(), self.cfg.shards_url.len()); log::info!("{} load complete. dropping:{:?}", self.cfg.service, { old.retain(|_k, v| v.len() > 0); old.keys() }); true } - - #[inline] - fn load_inner_si(&mut self) -> bool { - // 所有的ip要都能解析出主从域名 - let mut addrs = Vec::with_capacity(self.cfg.si_backends.len()); - for shard in &self.cfg.si_backends { - let shard: Vec<&str> = shard.split(",").collect(); - if shard.len() < 2 { - log::warn!("{} si both master and slave required.", self.cfg.service); - return false; - } - let master_url = &shard[0]; - let mut master = String::new(); - dns::lookup_ips(master_url.host(), |ips| { - if ips.len() > 0 { - master = ips[0].to_string() + ":" + master_url.port(); - } - }); - let mut slaves = Vec::with_capacity(8); - for url_port in &shard[1..] { - let url = url_port.host(); - let port = url_port.port(); - use ds::vec::Add; - dns::lookup_ips(url, |ips| { - for ip in ips { - slaves.add(ip.to_string() + ":" + port); - } - }); - } - if master.len() == 0 || slaves.len() == 0 { - log::warn!( - "master:({}=>{}) or slave ({:?}=>{:?}) not looked up", - master_url, - master, - &shard[1..], - slaves - ); - return false; - } - addrs.push((master, slaves)); - } - - // 到这之后,所有的shard都能解析出ip - let mut old = HashMap::with_capacity(addrs.len()); - self.si_shard.split_off(0).into_iter().for_each(|shard| { - old.entry(shard.master.addr().to_string()) - .or_insert(Vec::new()) - .push(shard.master); - for endpoint in shard.slaves.into_inner() { - let addr = endpoint.addr().to_string(); - // 一个ip可能存在于多个域名中。 - old.entry(addr).or_insert(Vec::new()).push(endpoint); - } - }); - - // 遍历所有的shards_url - for (master_addr, slaves) in addrs { - assert_ne!(master_addr.len(), 0); - assert_ne!(slaves.len(), 0); - // 用户名和密码 - let res_option = ResOption { - token: self.cfg.basic.si_password.clone(), - username: self.cfg.basic.si_user.clone(), - }; - let master = self.take_or_build( - &mut old, - &master_addr, - self.cfg.timeout_master(), - res_option.clone(), - ); - // slave - let mut replicas = Vec::with_capacity(8); - for addr in slaves { - let slave = self.take_or_build( - &mut old, - &addr, - self.cfg.timeout_slave(), - res_option.clone(), - ); - replicas.push(slave); - } - - use crate::PerformanceTuning; - let shard = Shard::selector( - self.cfg.basic.selector.tuning_mode(), - master, - replicas, - self.cfg.basic.region_enabled, - ); - self.si_shard.push(shard); - } - - true - } } impl discovery::Inited for VectorService where @@ -529,7 +314,7 @@ where fn inited(&self) -> bool { // 每一个分片都有初始, 并且至少有一主一从。 self.shards.len() > 0 - && (self.shards.len() + self.si_shard.len()) == self.cfg.shards_url.len() + && self.shards.len() == self.cfg.shards_url.len() && self.shards.inited() } } diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 767987c18..df7a9b676 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -42,7 +42,6 @@ url = "2.1" percent-encoding = "2.1.0" seq-macro = "*" chrono = "0.4" -chrono-tz = { version = "0.5", default-features = false } paste = "1.0" [features] diff --git a/protocol/src/callback.rs b/protocol/src/callback.rs index f7471921e..5cd43460e 100644 --- a/protocol/src/callback.rs +++ b/protocol/src/callback.rs @@ -1,4 +1,5 @@ use std::{ + cell::OnceCell, mem::MaybeUninit, ptr::{self, NonNull}, sync::{ @@ -7,7 +8,7 @@ use std::{ }, }; -use crate::{Attachment, BackendQuota}; +use crate::BackendQuota; use ds::{time::Instant, AtomicWaker}; use crate::{request::Request, Command, Error, HashedCommand}; @@ -37,7 +38,7 @@ pub struct CallbackContext { pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试 pub(crate) retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试 pub(crate) write_back: bool, // 请求结束后,是否需要回写。 - pub(crate) max_tries: u8, // 最大重试次数 + pub(crate) max_tries: OnceCell, // 最大重试次数 first: bool, // 当前请求是否是所有子请求的第一个 last: bool, // 当前请求是否是所有子请求的最后一个 tries: AtomicU8, @@ -47,8 +48,6 @@ pub struct CallbackContext { waker: *const Arc, callback: CallbackPtr, quota: Option, - attachment: Option, // 附加数据,用于辅助请求和响应,目前只有kvector在使用 - drop_attach: Option>, } impl CallbackContext { @@ -61,7 +60,6 @@ impl CallbackContext { last: bool, retry_on_rsp_notok: bool, max_tries: u8, - drop_attach: Option>, ) -> Self { log::debug!("request prepared:{}", req); let now = Instant::now(); @@ -75,7 +73,7 @@ impl CallbackContext { try_next: false, retry_on_rsp_notok, write_back: false, - max_tries, + max_tries: OnceCell::from(max_tries), request: req, response: MaybeUninit::uninit(), callback: cb, @@ -83,8 +81,6 @@ impl CallbackContext { tries: 0.into(), waker, quota: None, - attachment: None, - drop_attach, } } @@ -118,46 +114,14 @@ impl CallbackContext { } } #[inline] - pub fn on_complete(&mut self, parser: &P, resp: Command) { + pub fn on_complete(&mut self, resp: Command) { log::debug!("on-complete:{} resp:{}", self, resp); // 异步请求不关注response。 if !self.async_mode { debug_assert!(!self.complete(), "{:?}", self); - if self.attachment.is_some() { - // vector聚合场景 - self.on_complete_aggregate(parser, resp); - } else { - self.swap_response(resp); - } - } - self.on_done(); - } - - #[inline] - pub fn on_complete_aggregate(&mut self, parser: &P, mut resp: Command) { - // 返回成功: - // 1. 第一轮获取si;若si获取失败(例如si为空),则终止请求 - // 2. 后续轮次更新attachment,并判断是否是最后一轮。 - // 返回失败,则终止请求。 - if resp.ok() { - // 更新attachment - let attach = self.attachment.as_mut().expect("attach"); - let last = parser.update_attachment(attach, &mut resp); - if last { - self.set_last(true); - } - // 更新attachment不成功,或者响应数足够,终止请求 - } else { - self.set_last(true); - } - if self.last() { - // 中间轮次的resp没有被使用,可忽略; self.swap_response(resp); - } else { - // 重置下一轮访问需要的变量 - self.try_next = true; // 可以进入下一轮访问 - self.set_fitst_try(); } + self.on_done(); } #[inline] @@ -178,7 +142,7 @@ impl CallbackContext { // 当前重试条件为 rsp == None || ("mc" && !rsp.ok()) if self.inited() { // 优先筛出正常的请求,便于理解 - // rsp.ok + // rsp.ok 不需要重试 if unsafe { self.unchecked_response().ok() } { return false; } @@ -187,8 +151,8 @@ impl CallbackContext { return false; } } - - self.try_next && self.tries.fetch_add(1, Release) < self.max_tries + let max_tries = *self.max_tries.get().expect("max tries"); + self.try_next && self.tries.fetch_add(1, Release) < max_tries } else { // write back请求 self.write_back @@ -208,10 +172,6 @@ impl CallbackContext { // 需要重试或回写 return self.goon(); } - - // 改到这里,不需要额外判断逻辑了 - self.set_last(true); - //markdone后,req标记为已完成,那么CallbackContext和CopyBidirectional都有可能被释放 //CopyBidirectional会提前释放,所以需要提前clone一份 //CallbackContext会提前释放,则需要在此clone到栈上 @@ -330,27 +290,6 @@ impl CallbackContext { pub fn quota(&mut self, quota: BackendQuota) { self.quota = Some(quota); } - #[inline] - pub fn attachment(&self) -> Option<&Attachment> { - self.attachment.as_ref() - } - #[inline] - pub fn attachment_mut(&mut self) -> &mut Option { - &mut self.attachment - } - #[inline] - pub fn set_last(&mut self, last: bool) { - // todo: 可优化为依据请求数或者响应数量判断可以设置last为true - self.last = last; - } - #[inline] - pub fn set_max_tries(&mut self, max_tries: u8) { - self.max_tries = max_tries; - } - #[inline] - pub fn set_fitst_try(&mut self) { - self.tries = 0.into(); - } } impl Drop for CallbackContext { @@ -361,9 +300,6 @@ impl Drop for CallbackContext { // 可以尝试检查double free // 在debug环境中,设置done为false debug_assert_eq!(*self.done.get_mut() = false, ()); - if let Some(attachment) = self.attachment.take() { - (self.drop_attach.as_ref().expect("should has drop_attach"))(attachment); - } } } diff --git a/protocol/src/flag.rs b/protocol/src/flag.rs index 4c8accf64..dda81ba37 100644 --- a/protocol/src/flag.rs +++ b/protocol/src/flag.rs @@ -79,20 +79,38 @@ impl Flag { } } -#[derive(Debug, Default)] -pub struct ResponseHeader { - pub(crate) header: Vec, - pub(crate) rows: u16, // 包含的响应行数 - pub(crate) columns: u16, // 每行的响应列数 -} +// TODO 暂时保留备查,2024.2后再考虑清理 fishermen +// #[derive(Debug, Clone)] +// pub enum TryNextType { +// NotTryNext = 0, +// TryNext = 1, +// // 去掉unknow类型,统一逻辑处理,测试稳定后清理,预计2024.1后清理 fishermen +// // Unkown = 2, +// } -impl ResponseHeader { - #[inline] - pub fn new(header: Vec, rows: u16, columns: u16) -> Self { - ResponseHeader { - header, - rows, - columns, - } - } -} +// // (1) 0: not try next(对add/replace生效); (2) 1: try next; (3) 2:unkown (仅对set生效,注意提前考虑cas) +// impl From for TryNextType { +// fn from(val: u8) -> Self { +// match val { +// 0 => TryNextType::NotTryNext, +// 1 => TryNextType::TryNext, +// // 2 => TryNextType::Unkown, +// _ => panic!("unknow try next type"), +// } +// } +// // TODO 暂时保留,线上稳定后清理,预计2024.2之后 fishermen +// // pub fn from(val: u8) -> Self { +// // match val { +// // 0 => TryNextType::NotTryNext, +// // 1 => TryNextType::TryNext, +// // // 2 => TryNextType::Unkown, +// // _ => panic!("unknow try next type"), +// // } +// // } +// } + +// impl Default for TryNextType { +// fn default() -> Self { +// TryNextType::TryNext +// } +// } diff --git a/protocol/src/kv/common/constants.rs b/protocol/src/kv/common/constants.rs index 2d401e4bd..29fda03df 100644 --- a/protocol/src/kv/common/constants.rs +++ b/protocol/src/kv/common/constants.rs @@ -640,7 +640,6 @@ impl ColumnType { | MYSQL_TYPE_INT24 | MYSQL_TYPE_LONG | MYSQL_TYPE_LONGLONG - | MYSQL_TYPE_NEWDECIMAL ) } diff --git a/protocol/src/kv/common/error/mod.rs b/protocol/src/kv/common/error/mod.rs index 6b731b557..371720428 100644 --- a/protocol/src/kv/common/error/mod.rs +++ b/protocol/src/kv/common/error/mod.rs @@ -63,9 +63,7 @@ pub enum Error { UrlError(UrlError), #[cfg(any(feature = "native-tls", feature = "rustls"))] TlsError(tls::TlsError), - #[allow(dead_code)] FromValueError(Value), - #[allow(dead_code)] FromRowError(Row), } diff --git a/protocol/src/kv/common/query_result.rs b/protocol/src/kv/common/query_result.rs index 2e41335c1..199cc24eb 100644 --- a/protocol/src/kv/common/query_result.rs +++ b/protocol/src/kv/common/query_result.rs @@ -64,7 +64,6 @@ pub enum SetIteratorState { /// Iterator is in a non-empty set. InSet(Arc<[Column]>), /// Iterator is in an empty set. - #[allow(dead_code)] InEmptySet(OkPacket), /// Iterator is in an errored result set. Errored(Error), diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 397f69470..97b1e7ddf 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -9,7 +9,7 @@ use crate::msgque::MsgQue; use crate::redis::Redis; use crate::uuid::Uuid; use crate::vector::Vector; -use crate::{Attachment, Error, Flag, OpCode, Operation, ResponseHeader, Result, Stream, Writer}; +use crate::{Error, Flag, OpCode, Operation, Result, Stream, Writer}; #[derive(Clone)] #[enum_dispatch(Proto)] @@ -128,17 +128,6 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn max_tries(&self, _req_op: Operation) -> u8 { 1_u8 } - - #[inline] - fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { - // 默认情况下,attachment应该为空 - assert!(false, "{:?} {response}", attachment); - false - } - #[inline] - fn drop_attach(&self, _att: Attachment) { - panic!("unreachable"); - } } pub trait RequestProcessor { @@ -150,11 +139,8 @@ pub trait RequestProcessor { fn process(&mut self, req: HashedCommand, last: bool); } -// TODO Command实质就是response,考虑直接用response? fishermen pub struct Command { ok: bool, - pub(crate) header: ResponseHeader, - count: u32, cmd: MemGuard, } @@ -170,24 +156,8 @@ pub struct HashedCommand { impl Command { #[inline] pub fn from(ok: bool, cmd: ds::MemGuard) -> Self { - Self { - ok, - header: Default::default(), - count: 0, - cmd, - } + Self { ok, cmd } } - #[inline] - pub fn with_assemble_pack(ok: bool, header: ResponseHeader, body: ds::MemGuard) -> Self { - let count = header.rows as u32; - Self { - ok, - header, - count, - cmd: body, - } - } - pub fn from_ok(cmd: ds::MemGuard) -> Self { Self::from(true, cmd) } @@ -195,18 +165,6 @@ impl Command { pub fn ok(&self) -> bool { self.ok } - #[inline] - - pub fn update_ok(&mut self, ok: bool) { - self.ok = ok; - } - pub fn count(&self) -> u32 { - self.count - } - #[inline] - pub fn set_count(&mut self, n: u32) { - self.count = n; - } } impl std::ops::Deref for Command { type Target = MemGuard; @@ -292,13 +250,14 @@ impl HashedCommand { } #[inline] pub fn reshape(&mut self, mut dest_cmd: MemGuard) { - if self.origin_cmd.is_none() { - // 将dest cmd设给cmd,并将换出的cmd保留在origin_cmd中 - mem::swap(&mut self.cmd, &mut dest_cmd); - self.origin_cmd = Some(dest_cmd); - } else { - self.cmd = dest_cmd; - } + assert!( + self.origin_cmd.is_none(), + "origin cmd should be none: {:?}", + self.origin_cmd + ); + // 将dest cmd设给cmd,并将换出的cmd保留在origin_cmd中 + mem::swap(&mut self.cmd, &mut dest_cmd); + self.origin_cmd = Some(dest_cmd); } } @@ -335,7 +294,6 @@ pub trait Commander, I: MetricItem> { fn request_shard(&self) -> usize; fn metric(&self) -> &M; fn ctx(&self) -> u64; - fn attachment(&self) -> Option<&Attachment>; } pub enum MetricName { diff --git a/protocol/src/redis/packet.rs b/protocol/src/redis/packet.rs index a21c589be..634ca0604 100644 --- a/protocol/src/redis/packet.rs +++ b/protocol/src/redis/packet.rs @@ -486,10 +486,7 @@ impl Packet { pub fn num(&self, oft: &mut usize) -> crate::Result { // 至少4个字节 if *oft + 4 <= self.len() { - debug_assert!( - self[*oft] == b'*' || self[*oft] == b'$' || self[*oft] == b':', - "packet:{self:?}" - ); + debug_assert!(self[*oft] == b'*' || self[*oft] == b'$', "packet:{self:?}"); let start = *oft; *oft += num_skips(self.at(*oft + 1)); let mut val: usize = 0; @@ -515,7 +512,7 @@ impl Packet { } if b.is_ascii_digit() { val = val * 10 + (b - b'0') as usize; - if val <= std::usize::MAX as usize { + if val <= std::u32::MAX as usize { continue; } } diff --git a/protocol/src/req.rs b/protocol/src/req.rs index d3b88a268..4577248bc 100644 --- a/protocol/src/req.rs +++ b/protocol/src/req.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use crate::{Command, HashedCommand}; pub type Context = u64; -pub type Attachment = [u8; 280]; + #[repr(transparent)] #[derive(Clone, Default)] pub struct BackendQuota { @@ -49,7 +49,7 @@ pub trait Request: fn start_at(&self) -> Instant; fn on_noforward(&mut self); fn on_sent(self) -> Option; - fn on_complete(self, parser: &P, resp: Command); + fn on_complete(self, resp: Command); fn on_err(self, err: crate::Error); #[inline] fn context_mut(&mut self) -> &mut Context { @@ -67,11 +67,4 @@ pub trait Request: fn retry_on_rsp_notok(&mut self, retry: bool); // 初始化quota fn quota(&mut self, quota: BackendQuota); - // 对request增加附加信息 - fn attachment_mut(&mut self) -> &mut Option; - // 获取附加信息 - fn attachment(&self) -> Option<&Attachment>; - fn set_max_tries(&mut self, max_tries: u8); - fn set_fitst_try(&mut self); - fn set_last(&mut self, last: bool); } diff --git a/protocol/src/request.rs b/protocol/src/request.rs index c66adbdf3..f376095d4 100644 --- a/protocol/src/request.rs +++ b/protocol/src/request.rs @@ -1,6 +1,4 @@ -use crate::{ - callback::CallbackContext, Attachment, BackendQuota, Command, Context, Error, HashedCommand, -}; +use crate::{callback::CallbackContext, BackendQuota, Command, Context, Error, HashedCommand}; use std::{ fmt::{self, Debug, Display, Formatter}, ptr::NonNull, @@ -28,8 +26,8 @@ impl crate::Request for Request { } } #[inline] - fn on_complete(self, parser: &P, resp: Command) { - self.ctx().on_complete(parser, resp); + fn on_complete(self, resp: Command) { + self.ctx().on_complete(resp); } #[inline] fn on_err(self, err: Error) { @@ -59,25 +57,6 @@ impl crate::Request for Request { fn quota(&mut self, quota: BackendQuota) { self.ctx().quota(quota); } - #[inline] - fn attachment(&self) -> Option<&Attachment> { - self.ctx().attachment() - } - #[inline] - fn attachment_mut(&mut self) -> &mut Option { - self.ctx().attachment_mut() - } - #[inline] - fn set_max_tries(&mut self, max_tries: u8) { - self.ctx().set_max_tries(max_tries); - } - #[inline] - fn set_fitst_try(&mut self) { - self.ctx().set_fitst_try(); - } - fn set_last(&mut self, last: bool) { - self.ctx().set_last(last); - } } impl Request { #[inline] diff --git a/protocol/src/vector/attachment.rs b/protocol/src/vector/attachment.rs deleted file mode 100644 index f45ec8a8a..000000000 --- a/protocol/src/vector/attachment.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::Attachment; -use crate::Command; -use crate::Packet; -use ds::RingSlice; - -use super::VectorCmd; -#[derive(Debug, Default)] -#[repr(C)] -pub struct VecAttach { - pub rsp_ok: bool, - // 查询的轮次,0代表si - pub round: u16, - // 待查询数量,不能超过u16::MAX - pub left_count: u16, - body_token_count: u16, - //本轮响应是否成功 - // header,*2 + column names - header: Vec, - body: Vec>, - // 查询响应的body中token数量 - si: Vec, // si表中查询到的数据, si字段信息在配置里存放 - pub vcmd: VectorCmd, -} - -#[derive(Debug, Default)] -pub struct SiItem { - pub date: VDate, - pub count: u16, -} -impl SiItem { - pub fn new(yy: u8, mm: u8, count: u16) -> Self { - Self { - date: VDate { - year: yy, - month: mm, - }, - count, - } - } -} -#[derive(Debug, Default)] -pub struct VDate { - pub year: u8, // year - pub month: u8, // month -} -impl VDate { - // 2024-11-01 -> {24, 11} - pub fn from(d: &RingSlice) -> Self { - if let Some(first) = d.find(0, b'-') { - let y = d.try_str_num(0..first).unwrap_or(0); - if let Some(second) = d.find(first + 1, b'-') { - let m = d.try_str_num(first + 1..second).unwrap_or(0); - if y > 0 && m > 0 { - return Self { - year: y.checked_rem(100).unwrap_or(0) as u8, - month: m as u8, - }; - } - } - } - Self { year: 0, month: 0 } - } - #[inline] - pub fn year(&self) -> u8 { - self.year - } - #[inline] - pub fn month(&self) -> u8 { - self.month - } - #[inline] - pub fn is_valid(&self) -> bool { - self.month > 0 && self.year > 0 - } -} - -pub trait VAttach { - fn attach(&self) -> &VecAttach; - fn attach_mut(&mut self) -> &mut VecAttach; -} - -impl VAttach for T { - #[inline(always)] - fn attach(&self) -> &VecAttach { - unsafe { std::mem::transmute(self.attachment().expect("attach is none")) } - } - #[inline(always)] - fn attach_mut(&mut self) -> &mut VecAttach { - unsafe { std::mem::transmute(self.attachment_mut().as_mut().expect("attach is none")) } - } -} - -impl VecAttach { - #[inline(always)] - pub fn from(att: Attachment) -> VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn attach(att: &Attachment) -> &VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn attach_mut(att: &mut Attachment) -> &mut VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn to_attach(self) -> Attachment { - unsafe { std::mem::transmute(self) } - } - #[inline] - pub fn init(&mut self, left_count: u16) { - *self = VecAttach { - round: 0, - left_count, - header: Vec::with_capacity(8), - body: Vec::with_capacity(12), - body_token_count: 0, - rsp_ok: false, - si: Vec::with_capacity(6), - vcmd: Default::default(), - }; - } - #[inline] - pub fn is_empty(&self) -> bool { - self.body.is_empty() - } - - pub fn attach_header(&mut self, header: Vec) { - self.header = header; - } - #[inline] - pub fn attach_body(&mut self, body_data: Vec, rows: u16, columns: u16) { - self.body.push(body_data); - self.body_token_count += rows * columns; - self.left_count = self.left_count.saturating_sub(rows); - } - - #[inline] - pub fn header(&self) -> &Vec { - &self.header - } - - #[inline] - pub fn body(&self) -> &Vec> { - &self.body - } - - #[inline] - pub fn body_token_count(&self) -> u16 { - self.body_token_count - } - // 从response中解析si - // 约定:si返回结果的结构: uid、date、count顺序排列 - #[inline] - pub fn attach_si(&mut self, response: &Command) -> bool { - let rows = response.header.rows; - let cols = response.header.columns; - debug_assert_eq!(cols, 3); - self.si.reserve(rows as usize); - let data = Packet::from(***response); - let mut oft: usize = 0; - while oft < data.len() { - if data.num(&mut oft).is_err() { - return false; - } - let d = data.bulk_string(&mut oft); - if d.is_err() { - return false; - } - if let Ok(count) = data.num(&mut oft) { - if count > 0 { - let date = VDate::from(&d.unwrap()); - if date.is_valid() { - let si_item = SiItem::new(date.year(), date.month(), count as u16); - self.si.push(si_item); - } - } - } - } - self.si.len() > 0 - } - #[inline] - pub fn has_si(&mut self) -> bool { - self.si.len() > 0 - } - #[inline] - pub fn si(&self) -> &Vec { - &self.si - } -} - -#[cfg(test)] -mod tests { - use ds::MemGuard; - - use crate::ResponseHeader; - - use super::*; - #[test] - fn test_attach_si() { - let header: ResponseHeader = ResponseHeader::new( - "*3\r\n$3\r\nuid\r\n$10\r\nstart_date\r\n$5\r\ncount\r\n".into(), - 246, - 3, - ); - let body: MemGuard = MemGuard::from_vec(":6351590999\r\n$10\r\n2024-06-01\r\n:674\r\n:6351590999\r\n$10\r\n2024-05-01\r\n:1113\r\n:6351590999\r\n$10\r\n2024-04-01\r\n:833\r\n:6351590999\r\n$10\r\n2024-03-01\r\n:45\r\n:6351590999\r\n$10\r\n2024-02-01\r\n:61\r\n:6351590999\r\n$10\r\n2024-01-01\r\n:59\r\n:6351590999\r\n$10\r\n2023-12-01\r\n:20\r\n:6351590999\r\n$10\r\n2023-11-01\r\n:9\r\n:6351590999\r\n$10\r\n2023-10-01\r\n:13\r\n:6351590999\r\n$10\r\n2023-09-01\r\n:50\r\n:6351590999\r\n$10\r\n2023-08-01\r\n:16\r\n:6351590999\r\n$10\r\n2023-07-01\r\n:61\r\n:6351590999\r\n$10\r\n2023-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2023-05-01\r\n:41\r\n:6351590999\r\n$10\r\n2023-04-01\r\n:54\r\n:6351590999\r\n$10\r\n2023-03-01\r\n:108\r\n:6351590999\r\n$10\r\n2023-02-01\r\n:213\r\n:6351590999\r\n$10\r\n2023-01-01\r\n:159\r\n:6351590999\r\n$10\r\n2022-12-01\r\n:26\r\n:6351590999\r\n$10\r\n2022-11-01\r\n:16\r\n:6351590999\r\n$10\r\n2022-10-01\r\n:14\r\n:6351590999\r\n$10\r\n2022-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2022-08-01\r\n:10\r\n:6351590999\r\n$10\r\n2022-07-01\r\n:9\r\n:6351590999\r\n$10\r\n2022-06-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-05-01\r\n:23\r\n:6351590999\r\n$10\r\n2022-04-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-03-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-02-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-01-01\r\n:5\r\n:6351590999\r\n$10\r\n2021-12-01\r\n:14\r\n:6351590999\r\n$10\r\n2021-11-01\r\n:4\r\n:6351590999\r\n$10\r\n2021-10-01\r\n:2\r\n:6351590999\r\n$10\r\n2021-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2021-08-01\r\n:25\r\n:6351590999\r\n$10\r\n2021-07-01\r\n:36\r\n:6351590999\r\n$10\r\n2021-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2021-05-01\r\n:18\r\n:6351590999\r\n$10\r\n2021-04-01\r\n:20\r\n:6351590999\r\n$10\r\n2021-03-01\r\n:21\r\n:6351590999\r\n$10\r\n2021-02-01\r\n:35\r\n:6351590999\r\n$10\r\n2021-01-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-12-01\r\n:55\r\n:6351590999\r\n$10\r\n2020-11-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-10-01\r\n:37\r\n:6351590999\r\n$10\r\n2020-09-01\r\n:33\r\n:6351590999\r\n$10\r\n2020-08-01\r\n:15\r\n:6351590999\r\n$10\r\n2020-07-01\r\n:12\r\n:6351590999\r\n$10\r\n2020-06-01\r\n:26\r\n:6351590999\r\n$10\r\n2020-05-01\r\n:54\r\n:6351590999\r\n$10\r\n2020-04-01\r\n:38\r\n:6351590999\r\n$10\r\n2020-03-01\r\n:27\r\n:6351590999\r\n$10\r\n2020-02-01\r\n:80\r\n:6351590999\r\n$10\r\n2020-01-01\r\n:99\r\n:6351590999\r\n$10\r\n2019-12-01\r\n:67\r\n:6351590999\r\n$10\r\n2019-11-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-10-01\r\n:80\r\n:6351590999\r\n$10\r\n2019-09-01\r\n:76\r\n:6351590999\r\n$10\r\n2019-08-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-07-01\r\n:140\r\n:6351590999\r\n$10\r\n2019-06-01\r\n:118\r\n:6351590999\r\n$10\r\n2019-05-01\r\n:146\r\n:6351590999\r\n$10\r\n2019-04-01\r\n:287\r\n:6351590999\r\n$10\r\n2019-03-01\r\n:83\r\n:6351590999\r\n$10\r\n2019-02-01\r\n:88\r\n:6351590999\r\n$10\r\n2019-01-01\r\n:262\r\n:6351590999\r\n$10\r\n2018-12-01\r\n:213\r\n:6351590999\r\n$10\r\n2018-11-01\r\n:251\r\n:6351590999\r\n$10\r\n2018-10-01\r\n:215\r\n:6351590999\r\n$10\r\n2018-09-01\r\n:192\r\n:6351590999\r\n$10\r\n2018-08-01\r\n:208\r\n:6351590999\r\n$10\r\n2018-07-01\r\n:339\r\n:6351590999\r\n$10\r\n2018-06-01\r\n:97\r\n:6351590999\r\n$10\r\n2018-05-01\r\n:162\r\n:6351590999\r\n$10\r\n2018-04-01\r\n:127\r\n:6351590999\r\n$10\r\n2018-03-01\r\n:147\r\n:6351590999\r\n$10\r\n2018-02-01\r\n:529\r\n:6351590999\r\n$10\r\n2018-01-01\r\n:702\r\n:6351590999\r\n$10\r\n2017-12-01\r\n:453\r\n:6351590999\r\n$10\r\n2017-11-01\r\n:70\r\n:6351590999\r\n$10\r\n2017-10-01\r\n:1\r\n:6351590999\r\n$10\r\n2017-08-01\r\n:1\r\n".into()); - let response: Command = Command::with_assemble_pack(true, header, body); - let mut att = VecAttach::default(); - att.init(1); - let r = att.attach_si(&response); - assert!(r); - } -} diff --git a/protocol/src/vector/mod.rs b/protocol/src/vector/mod.rs index 047d22cf1..65917f384 100644 --- a/protocol/src/vector/mod.rs +++ b/protocol/src/vector/mod.rs @@ -1,4 +1,3 @@ -pub mod attachment; mod command; pub(crate) mod error; pub mod flager; @@ -10,18 +9,15 @@ mod reqpacket; mod rsppacket; use std::fmt::Write; -use std::mem; use crate::{ - Attachment, Command, Commander, Error, HashedCommand, Metric, MetricItem, Protocol, - RequestProcessor, Result, Stream, Writer, + Command, Commander, Error, HashedCommand, Metric, MetricItem, Protocol, RequestProcessor, + Result, Stream, Writer, }; use chrono::NaiveDate; use ds::RingSlice; use sharding::hash::Hash; -use self::attachment::VecAttach; -use self::packet::RedisPack; use self::reqpacket::RequestPacket; use self::rsppacket::ResponsePacket; use crate::kv::client::Client; @@ -121,34 +117,13 @@ impl Protocol for Vector { w.write("-ERR ".as_bytes())?; w.write_slice(response, 0)?; // mysql返回的错误信息 w.write("\r\n".as_bytes())?; + return Ok(()); } else { - if ctx.attachment().is_some() { - // 有attachment: 组装rsp: header(vec[0]) + *body_tokens + vec[1..] - let attach = VecAttach::attach(ctx.attachment().unwrap()); - if attach.body_token_count() > 0 { - w.write(attach.header())?; - w.write(format!("*{}\r\n", attach.body_token_count()).as_bytes())?; - for b in attach.body() { - w.write(b.as_slice())?; - } - } else { - // 返回空 - w.write("$-1\r\n".as_bytes())?; - } - } else { - // 无attachment: response已封装为redis协议。正常响应有三种: - // 1. 只返回影响的行数 - // 2. 一行或多行数据 - // 3. 结果为空 - if response.header.rows > 0 { - w.write(response.header.header.as_ref())?; - w.write( - format!("*{}\r\n", response.header.rows * response.header.columns) - .as_bytes(), - )?; - } - w.write_slice(response, 0)?; // value - } + // response已封装为redis协议。正常响应有三种: + // 1. 只返回影响的行数 + // 2. 一行或多行数据 + // 3. 结果为空 + w.write_slice(response, 0)?; // value } return Ok(()); } @@ -174,42 +149,6 @@ impl Protocol for Vector { log::debug!("+++ send to client padding {:?}", ctx.request()); Ok(()) } - - // 将中间响应放到attachment中,方便后续继续查询 - // 先收集si信息,再收集body - // 返回值:是否需要继续查询 - #[inline] - fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { - assert!(response.ok()); - let attach = VecAttach::attach_mut(attachment); - //收到响应就算ok,响应有问题也不会发送到topo了 - attach.rsp_ok = true; - - if attach.is_empty() { - // TODO 先打通,此处的内存操作需要考虑优化 fishermen - let mut header_data = Vec::new(); - let header = &mut response.header; - mem::swap(&mut header_data, &mut header.header); - attach.attach_header(header_data); - } - - // TODO 先打通,此处的内存操作需要考虑优化 fishermen - match attach.has_si() { - true => { - if response.header.rows > 0 { - let header = &response.header; - attach.attach_body(response.data().0.to_vec(), header.rows, header.columns); - } - attach.left_count == 0 - } - // 按si解析响应: 未成功获取有效si信息或者解析si失败,并终止后续请求 - false => response.count() == 0 || !attach.attach_si(response), - } - } - #[inline] - fn drop_attach(&self, att: Attachment) { - let _ = VecAttach::from(att); - } } impl Vector { @@ -274,11 +213,41 @@ impl Vector { Ok(cmd) => Ok(cmd), Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { // 对于UnhandleResponseError,需要构建rsp,发给client - let cmd = rsp_packet.build_final_rsp_cmd(false, RedisPack::with_simple(emsg)); + let cmd = rsp_packet.build_final_rsp_cmd(false, emsg); Ok(cmd) } Err(e) => Err(e.into()), } + + // let meta = match rsp_packet.parse_result_set_meta() { + // Ok(meta) => meta, + // Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { + // // 对于UnhandleResponseError,需要构建rsp,发给client + // let cmd = rsp_packet.build_final_rsp_cmd(false, emsg); + // return Ok(cmd); + // } + // Err(e) => return Err(e.into()), + // }; + + // // 如果是只有meta的ok packet,直接返回影响的列数,如insert/delete/update + // if let Or::B(ok) = meta { + // let affected = ok.affected_rows(); + // let cmd = rsp_packet.build_final_affected_rows_rsp_cmd(affected); + // return Ok(cmd); + // } + + // // 解析meta后面的rows,返回列记录,如select + // // 有可能多行数据,直接build成 + // let mut query_result: QueryResult = QueryResult::new(rsp_packet, meta); + // match query_result.parse_rows_to_cmd() { + // Ok(cmd) => Ok(cmd), + // Err(crate::kv::error::Error::UnhandleResponseError(emsg)) => { + // // 对于UnhandleResponseError,需要构建rsp,发给client + // let cmd = query_result.build_final_rsp_cmd(false, emsg); + // Ok(cmd) + // } + // Err(e) => Err(e.into()), + // } } } @@ -303,8 +272,6 @@ pub(crate) const COND_ORDER: &[u8] = b"ORDER"; pub(crate) const COND_LIMIT: &[u8] = b"LIMIT"; pub(crate) const COND_GROUP: &[u8] = b"GROUP"; -const DEFAULT_LIMIT: usize = 15; - #[derive(Debug, Clone, Default)] pub struct Condition { pub field: RingSlice, @@ -359,16 +326,6 @@ pub struct VectorCmd { pub group_by: GroupBy, } -impl VectorCmd { - #[inline(always)] - pub fn limit(&self) -> usize { - match self.limit.limit.try_str_num(..) { - Some(limit) => limit, - None => DEFAULT_LIMIT, - } - } -} - /// field 字段的值,对于‘field’关键字,值是|分隔的field names,否则就是二进制value #[derive(Debug, Clone)] pub enum FieldVal { @@ -418,7 +375,4 @@ pub trait Strategy { //todo 通过代理类型实现 fn condition_keys(&self) -> Box> + '_>; fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64); - fn write_si_database_table(&self, buf: &mut impl Write, hash: i64); - fn batch(&self, limit: u64, vcmd: &VectorCmd) -> u64; - fn si_cols(&self) -> &[String]; } diff --git a/protocol/src/vector/mysql.rs b/protocol/src/vector/mysql.rs index 8c960f8dc..33315317a 100644 --- a/protocol/src/vector/mysql.rs +++ b/protocol/src/vector/mysql.rs @@ -157,12 +157,12 @@ impl<'a> Display for UpdateFields<'a> { } } -struct KeysAndCondsAndOrderAndLimit<'a, S>(&'a S, &'a VectorCmd, u64); +struct KeysAndCondsAndOrderAndLimit<'a, S>(&'a S, &'a VectorCmd); impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let &Self( strategy, - vcmd @ VectorCmd { + VectorCmd { cmd: _, keys, fields: _, @@ -171,7 +171,6 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { limit, group_by, }, - extra, ) = self; for (i, key) in (&mut strategy.condition_keys()).enumerate() { if let Some(key) = key { @@ -196,10 +195,7 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { VRingSlice(&order.order) ); } - let strategy_limit = strategy.batch(extra, vcmd); - if strategy_limit > 0 { - let _ = write!(f, " limit {}", strategy.batch(extra, vcmd),); - } else if limit.offset.len() != 0 { + if limit.offset.len() != 0 { let _ = write!( f, " limit {} offset {}", @@ -216,17 +212,10 @@ pub struct SqlBuilder<'a, S> { hash: i64, date: NaiveDate, strategy: &'a S, - limit: u64, } impl<'a, S: Strategy> SqlBuilder<'a, S> { - pub fn new( - vcmd: &'a VectorCmd, - hash: i64, - date: NaiveDate, - strategy: &'a S, - limit: u64, - ) -> Result { + pub fn new(vcmd: &'a VectorCmd, hash: i64, date: NaiveDate, strategy: &'a S) -> Result { if vcmd.keys.len() != strategy.keys().len() { Err(Error::RequestProtocolInvalid) } else { @@ -235,7 +224,6 @@ impl<'a, S: Strategy> SqlBuilder<'a, S> { hash, date, strategy, - limit, }) } } @@ -292,6 +280,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { } fn write_sql(&self, buf: &mut impl Write) { + // let cmd_type = vector::get_cmd_type(self.op).unwrap_or(vector::CommandType::Unknown); match self.vcmd.cmd { CommandType::VRange | CommandType::VGet => { let _ = write!( @@ -299,7 +288,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { "select {} from {} where {}", Select(self.vcmd.fields.get(0)), Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), ); } CommandType::VCard => { @@ -307,7 +296,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { buf, "select count(*) from {} where {}", Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), ); } CommandType::VAdd => { @@ -325,7 +314,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { "update {} set {} where {}", Table(self.strategy, &self.date, self.hash), UpdateFields(&self.vcmd.fields), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), ); } CommandType::VDel => { @@ -333,7 +322,7 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { buf, "delete from {} where {}", Table(self.strategy, &self.date, self.hash), - KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), + KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd), ); } _ => { @@ -343,110 +332,3 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { } } } - -pub struct SiSqlBuilder<'a, S> { - vcmd: &'a VectorCmd, - hash: i64, - strategy: &'a S, -} - -impl<'a, S: Strategy> SiSqlBuilder<'a, S> { - pub fn new(vcmd: &'a VectorCmd, hash: i64, strategy: &'a S) -> Result { - if vcmd.keys.len() != strategy.keys().len() { - Err(Error::RequestProtocolInvalid) - } else { - Ok(Self { - vcmd, - hash, - strategy, - }) - } - } -} - -impl<'a, S> MysqlBinary for SiSqlBuilder<'a, S> { - fn mysql_cmd(&self) -> Command { - Command::COM_QUERY - } -} - -impl<'a, S: Strategy> VectorSqlBuilder for SiSqlBuilder<'a, S> { - fn len(&self) -> usize { - 128 - } - - // (1) 根据object_type查用户的si数据 - // select uid, start_date as stat_date, sum(count) as count from $db$.$tb$ where uid=? and object_type in(?) group by uid, start_date order by start_date desc - - // (2) 查用户所有的si数据 - // select uid, start_date as stat_date, sum(count) as count from $db$.$tb$ where uid=? group by start_date order by start_date desc - - // select date,count字段名, - // 条件需要key,字段名 - fn write_sql(&self, buf: &mut impl Write) { - match self.vcmd.cmd { - CommandType::VRange => { - let _ = write!( - buf, - "select {} from {} where {}", - SiSelect(self.strategy.keys(), self.strategy.si_cols()), - SiTable(self.strategy, self.hash), - SiKeysAndCondsAndOrder(self.strategy, &self.vcmd), - ); - } - _ => { - //校验应该在parser_req出 - panic!("not support cmd_type:{:?}", self.vcmd.cmd); - } - } - } -} - -//keys, cols -struct SiSelect<'a>(&'a [String], &'a [String]); -impl<'a> Display for SiSelect<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // select key, start_date, sum(count) - write!( - f, - "{},{},sum({})", - self.0[0], - self.1[0], - self.1.last().unwrap() - ) - } -} - -struct SiKeysAndCondsAndOrder<'a, S>(&'a S, &'a VectorCmd); -impl<'a, S: Strategy> Display for SiKeysAndCondsAndOrder<'a, S> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let &Self(strategy, VectorCmd { keys, wheres, .. }) = self; - let key_name = &strategy.keys()[0]; - let cols = strategy.si_cols(); - let _ = write!(f, "`{}`={}", key_name, Val(&keys[0])); - for w in wheres { - //条件中和si相同的列写入条件 - for col in cols { - if w.field.equal(col.as_bytes()) { - let _ = write!(f, " and {}", ConditionDisplay(w)); - break; - } - } - } - //按key和日期group,按日期倒叙排 - let _ = write!( - f, - " group by {},{} order by {} desc", - key_name, cols[0], cols[0] - ); - Ok(()) - } -} - -struct SiTable<'a, S>(&'a S, i64); -impl<'a, S: Strategy> Display for SiTable<'a, S> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.write_si_database_table(f, self.1); - Ok(()) - } -} diff --git a/protocol/src/vector/packet.rs b/protocol/src/vector/packet.rs index 8d4898a9a..8a9302fd4 100644 --- a/protocol/src/vector/packet.rs +++ b/protocol/src/vector/packet.rs @@ -4,17 +4,14 @@ use std::{fmt::Display, num::NonZeroUsize}; use ds::{ByteOrder, RingSlice}; -use crate::{ - kv::{ - common::{ - constants::{CapabilityFlags, StatusFlags}, - error::Error::MySqlError, - packets::{ErrPacket, OkPacket, OkPacketDeserializer, OkPacketKind}, - ParseBuf, - }, - error::{Error, Result}, +use crate::kv::{ + common::{ + constants::{CapabilityFlags, StatusFlags}, + error::Error::MySqlError, + packets::{ErrPacket, OkPacket, OkPacketDeserializer, OkPacketKind}, + ParseBuf, }, - Command, ResponseHeader, + error::{Error, Result}, }; const HEADER_LEN: usize = 4; @@ -194,56 +191,3 @@ impl Display for MysqlPacket { write!(f, "seq: {}, payload:{:?}", self.seq, self.payload) } } - -#[derive(Debug)] -pub struct RedisPackAssemble { - pub(crate) header: Vec, - pub(crate) body: Vec, - pub(crate) rows: u16, - pub(crate) columns: u16, -} - -#[derive(Debug)] -pub struct RedisPackSimple { - pub(crate) packet: Vec, -} - -#[derive(Debug)] -pub enum RedisPack { - Assemble(RedisPackAssemble), - Simple(RedisPackSimple), -} - -impl RedisPack { - #[inline] - pub fn with_simple(packet: Vec) -> Self { - let pack = RedisPackSimple { packet }; - RedisPack::Simple(pack) - } - - #[inline] - pub fn with_assamble(header: Vec, body: Vec, rows: u16, columns: u16) -> Self { - let assemble = RedisPackAssemble { - header, - body, - rows, - columns, - }; - RedisPack::Assemble(assemble) - } - - #[inline] - pub fn to_cmd(self, ok: bool) -> Command { - match self { - RedisPack::Simple(pack) => { - let mem = ds::MemGuard::from_vec(pack.packet); - Command::from(ok, mem) - } - RedisPack::Assemble(assemble) => { - let body = ds::MemGuard::from_vec(assemble.body); - let header = ResponseHeader::new(assemble.header, assemble.rows, assemble.columns); - Command::with_assemble_pack(ok, header, body) - } - } - } -} diff --git a/protocol/src/vector/query_result.rs b/protocol/src/vector/query_result.rs index 994d86428..0b00872c8 100644 --- a/protocol/src/vector/query_result.rs +++ b/protocol/src/vector/query_result.rs @@ -4,7 +4,7 @@ use crate::kv::error::Result; pub use crate::kv::common::proto::Text; -use super::packet::{MysqlRawPacket, RedisPack}; +use super::packet::MysqlRawPacket; use crate::kv::common::{io::ParseBuf, packets::OkPacket, row::RowDeserializer}; use std::marker::PhantomData; @@ -125,7 +125,7 @@ impl QueryResult { /// 解析meta后面的rows #[inline(always)] - pub(crate) fn parse_rows_to_redis(&mut self, oft: &mut usize) -> Result { + pub(crate) fn parse_rows_to_redis(&mut self, oft: &mut usize) -> Result> { // 解析出mysql rows // 改为每次只处理本次的响应 let mut rows = Vec::with_capacity(8); @@ -182,12 +182,12 @@ impl QueryResult { } #[inline] -pub fn format_to_redis(rows: &Vec) -> RedisPack { +pub fn format_to_redis(rows: &Vec) -> Vec { let mut data = Vec::with_capacity(32 * rows.len()); // 响应为空,返回 if rows.len() == 0 { data.extend_from_slice("$-1\r\n".as_bytes()); - return RedisPack::with_simple(data); + return data; } let columns = rows.get(0).expect("columns unexists").columns_ref(); @@ -198,11 +198,12 @@ pub fn format_to_redis(rows: &Vec) -> RedisPack { const VCARD_NAME: &[u8] = b"count(*)"; if columns.len() == 1 && columns[0].name_ref().eq(VCARD_NAME) { format_for_vcard(rows, &mut data); - return RedisPack::with_simple(data); + return data; } // 至此,只有vrange了(select * ..),后续可能还有其他协议 - format_for_commons(rows, columns) + format_for_commons(rows, &mut data, columns); + data } fn format_for_vcard(rows: &Vec, data: &mut Vec) { @@ -217,37 +218,29 @@ fn format_for_vcard(rows: &Vec, data: &mut Vec) { } /// 为vrange等带column header + rows的指令构建响应 -fn format_for_commons(rows: &Vec, columns: &[Column]) -> RedisPack { - // TODO old 实现,测试完毕后清理 fishermen +fn format_for_commons(rows: &Vec, data: &mut Vec, columns: &[Column]) { // 构建 resp协议的header总array计数 以及 column的计数 - // header.put(&b"*2\r\n*"[..]); - - // header记录*2以及column name,最后发送时再拼装,其只拼装第一个包的header - let mut header = Vec::with_capacity(6 + 8 * columns.len()); - header.put(&b"*2\r\n*"[..]); - header.put(columns.len().to_string().as_bytes()); - header.put(CRLF); + data.put(&b"*2\r\n*"[..]); + data.put(columns.len().to_string().as_bytes()); + data.put(CRLF); // 构建columns 内容 for idx in 0..columns.len() { let col = columns.get(idx).expect("column"); - header.push(b'+'); - header.put(col.name_str().as_bytes()); - header.put(CRLF); + data.push(b'+'); + data.put(col.name_str().as_bytes()); + data.put(CRLF); } // 构建column values // 先构建value的header - // let val_count = columns.len() * rows.len(); - // data.put_u8(b'*'); - // data.put(val_count.to_string().as_bytes()); - // data.put(CRLF); + let val_count = columns.len() * rows.len(); + data.put_u8(b'*'); + data.put(val_count.to_string().as_bytes()); + data.put(CRLF); // 再写入每个row的val - - let mut body = Vec::with_capacity(64 * rows.len()); for ri in 0..rows.len() { let row = rows.get(ri).expect("row unexists"); - row.write_as_redis(&mut body); + row.write_as_redis(data); } - RedisPack::with_assamble(header, body, rows.len() as u16, columns.len() as u16) } diff --git a/protocol/src/vector/redis.rs b/protocol/src/vector/redis.rs index 2eb025183..aa6bc4f06 100644 --- a/protocol/src/vector/redis.rs +++ b/protocol/src/vector/redis.rs @@ -1,14 +1,15 @@ use super::{command::get_cfg, flager::KvFlager, *}; - -use crate::{Flag, Packet, Result}; +use crate::{HashedCommand, Packet, Result}; use ds::RingSlice; pub(crate) const FIELD_BYTES: &'static [u8] = b"FIELD"; + pub(crate) const KVECTOR_SEPARATOR: u8 = b','; /// 根据parse的结果,此处进一步获得kvector的detail/具体字段信息,以便进行sql构建 -pub fn parse_vector_detail(cmd: RingSlice, flag: &Flag) -> crate::Result { - let data = Packet::from(cmd); +pub fn parse_vector_detail(cmd: &HashedCommand) -> crate::Result { + let data = Packet::from(cmd.sub_slice(0, cmd.len())); + let flag = cmd.flag(); let mut vcmd: VectorCmd = Default::default(); vcmd.cmd = get_cfg(flag.op_code())?.cmd_type; diff --git a/protocol/src/vector/reqpacket.rs b/protocol/src/vector/reqpacket.rs index a8e05dd36..e00d3fddc 100644 --- a/protocol/src/vector/reqpacket.rs +++ b/protocol/src/vector/reqpacket.rs @@ -103,14 +103,10 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { assert!(cfg.has_key, "{:?}", self); let key = self.parse_key(flag)?; - if self.bulks == 0 { - return Ok(Some(self.main_key(&key))); - } - // 如果有field,则接下来解析fields,不管是否有field,统一先把where这个token消费掉,方便后续统一从condition位置解析 if cfg.can_hold_field { self.parse_fields(flag)?; - } else if cfg.can_hold_where_condition { + } else if self.bulks > 0 && cfg.can_hold_where_condition { // 如果还有bulks,且该cmd可以hold where condition,此处肯定是where token,直接读出skip掉 let token = self.next_bulk_string()?; if !token.equal_ignore_case(BYTES_WHERE) || self.bulks < 1 { @@ -132,7 +128,8 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { log::debug!("++++ after condition parsed oft:{}", self.oft); // 返回main-key,不带sub-key、ext-key - Ok(Some(self.main_key(&key))) + let main_key_len = key.find(0, KEY_SEPERATOR).map_or(key.len(), |len| len); + Ok(Some(key.sub_slice(0, main_key_len))) } #[inline] @@ -162,12 +159,16 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> { } } - /// 获取主key,不带sub-key、ext-key - #[inline] - fn main_key(&self, key: &RingSlice) -> RingSlice { - let main_key_len = key.find(0, KEY_SEPERATOR).map_or(key.len(), |len| len); - key.sub_slice(0, main_key_len) - } + // #[inline] + // fn parse_cmd_name(&mut self) -> Result<()> { + // // 第一个bulk是bulk-string类型的cmd + // let cmd = self.next_bulk_string()?; + // self.cmd_type = cmd.into(); + // if self.cmd_type.is_invalid() { + // return Err(Error::FlushOnClose(ERR_UNSUPPORT_CMD.into())); + // } + // Ok(()) + // } /// 读取下一个bulk string,bulks会减1 #[inline] diff --git a/protocol/src/vector/rsppacket.rs b/protocol/src/vector/rsppacket.rs index ba6e1bad9..1be3b8a2d 100644 --- a/protocol/src/vector/rsppacket.rs +++ b/protocol/src/vector/rsppacket.rs @@ -19,7 +19,6 @@ use crate::{Command, StreamContext}; use ds::RingSlice; use super::packet::MysqlRawPacket; -use super::packet::RedisPack; use super::query_result::{QueryResult, Text}; // const HEADER_LEN: usize = 4; @@ -93,7 +92,6 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> { let mut query_result: QueryResult = QueryResult::new(self.data.clone(), self.has_results, meta); // 解析出mysql rows - // let (redis_data, count) = query_result.parse_rows_to_redis(&mut self.oft)?; let redis_data = query_result.parse_rows_to_redis(&mut self.oft)?; // 构建响应 @@ -138,16 +136,10 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> { /// 构建最终的响应,并对已解析的内容进行take #[inline(always)] - pub(super) fn build_final_rsp_cmd(&mut self, ok: bool, redis_pack: RedisPack) -> Command { + pub(super) fn build_final_rsp_cmd(&mut self, ok: bool, rsp_data: Vec) -> Command { // 构建最终返回给client的响应内容 - let cmd = redis_pack.to_cmd(ok); - - // TODO 冲突,暂时注释掉 - // // 构建最终返回给client的响应内容 - // let mem = ds::MemGuard::from_vec(rsp_data); - // let mut cmd = Command::from(ok, mem); - // cmd.set_count(count); - + let mem = ds::MemGuard::from_vec(rsp_data); + let cmd = Command::from(ok, mem); log::debug!("+++ build kvector rsp, ok:{} => {:?}", ok, cmd); // 返回最终响应前,take走已经解析的数据 diff --git a/stream/src/context.rs b/stream/src/context.rs index 10df719b3..6987f406f 100644 --- a/stream/src/context.rs +++ b/stream/src/context.rs @@ -1,8 +1,8 @@ use std::{marker::PhantomData, ptr::NonNull, sync::Arc}; use protocol::{ - callback::CallbackContext, request::Request, Attachment, Command, Commander, HashedCommand, - Metric, MetricItem, Protocol, + callback::CallbackContext, request::Request, Command, Commander, HashedCommand, Metric, + MetricItem, Protocol, }; use crate::arena::CallbackContextArena; @@ -118,8 +118,4 @@ impl<'a, M: Metric, T: MetricItem, F: Fn(i64) -> usize> Commander fn ctx(&self) -> u64 { self.ctx.flag() } - #[inline] - fn attachment(&self) -> Option<&Attachment> { - self.ctx.attachment() - } } diff --git a/stream/src/handler.rs b/stream/src/handler.rs index 1659f0b02..c809d8d9a 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -145,7 +145,7 @@ where // 统计请求耗时。 self.rtt += start.elapsed(); self.parser.check(&*req, &cmd); - req.on_complete(&self.parser, cmd); + req.on_complete(cmd); continue; } if l == self.s.len() { diff --git a/stream/src/pipeline.rs b/stream/src/pipeline.rs index b7a9e6163..f0e8490c2 100644 --- a/stream/src/pipeline.rs +++ b/stream/src/pipeline.rs @@ -11,7 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use crate::topology::TopologyCheck; use ds::{time::Instant, AtomicWaker}; use endpoint::Topology; -use protocol::{Attachment, Error::FlushOnClose}; +use protocol::Error::FlushOnClose; use protocol::{HashedCommand, Protocol, Result, Stream}; use crate::{ @@ -124,7 +124,6 @@ where arena: &mut self.arena, retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok, parser: &self.parser, - has_attach: self.top.has_attach(), }; self.parser @@ -164,6 +163,7 @@ where *self.metrics.key() += 1; let mut response = ctx.take_response(); + self.parser.write_response( &mut ResponseContext::new(&mut ctx, &self.metrics, |hash| self.top.shard_idx(hash)), response.as_mut(), @@ -222,7 +222,6 @@ struct Visitor<'a, T, P> { first: &'a mut bool, arena: &'a mut CallbackContextArena, retry_on_rsp_notok: bool, - has_attach: bool, } impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::RequestProcessor @@ -236,12 +235,6 @@ impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::Req *self.first = last; let cb = self.top.callback(); let req_op = cmd.operation(); - let drop_attach: Option> = if self.has_attach { - let parser = self.parser.clone(); - Some(Box::new(move |att| parser.drop_attach(att))) - } else { - None - }; let ctx = self.arena.alloc(CallbackContext::new( cmd, self.waker, @@ -250,7 +243,6 @@ impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::Req last, self.retry_on_rsp_notok, self.parser.max_tries(req_op), - drop_attach, )); let mut ctx = CallbackContextPtr::from(ctx, self.arena); diff --git a/stream/src/topology.rs b/stream/src/topology.rs index a185c2125..dd08023df 100644 --- a/stream/src/topology.rs +++ b/stream/src/topology.rs @@ -83,7 +83,4 @@ impl Topology for CheckedTopology { fn exp_sec(&self) -> u32 { self.top.exp_sec() } - fn has_attach(&self) -> bool { - self.top.has_attach() - } } diff --git a/tests/src/benches/redis.rs b/tests/src/benches/redis.rs index 99031b731..5f681223a 100644 --- a/tests/src/benches/redis.rs +++ b/tests/src/benches/redis.rs @@ -173,10 +173,6 @@ mod proto_hook { fn ctx(&self) -> u64 { todo!() } - - fn attachment(&self) -> Option<&protocol::Attachment> { - todo!() - } } #[derive(Debug)] pub(crate) struct TestStream { diff --git a/tests/src/layout.rs b/tests/src/layout.rs index 1518bb3a0..293972061 100644 --- a/tests/src/layout.rs +++ b/tests/src/layout.rs @@ -49,10 +49,6 @@ fn checkout_basic() { assert_eq!(40, size_of::()); assert_eq!(192, size_of::()); assert_eq!(24, size_of::()); - assert_eq!( - size_of::(), - size_of::() - ); } // 如果要验证 layout-min模式,需要 --features layout-min --release --no-default-features @@ -64,7 +60,7 @@ fn check_layout_rx_buffer() { #[ignore] #[test] fn check_callback_ctx() { - assert_eq!(520, size_of::()); + assert_eq!(192, size_of::()); //assert_eq!(16, size_of::()); } //#[ignore] diff --git a/tests/src/mq/protocol.rs b/tests/src/mq/protocol.rs index f24612a8f..a3f225327 100644 --- a/tests/src/mq/protocol.rs +++ b/tests/src/mq/protocol.rs @@ -1,11 +1,11 @@ -use std::cell::UnsafeCell; - use crate::proto_hook; use protocol::{ msgque::{MsgQue, OP_GET, OP_QUIT, OP_SET, OP_STATS, OP_VERSION}, - Attachment, BufRead, Commander, Error, HashedCommand, Metric, Proto, + Error, Proto, }; +use protocol::BufRead; + /// 请求以任意长度发送 #[test] fn test_req_reenter() { @@ -228,67 +228,6 @@ fn test_rsp() { } } -#[allow(dead_code)] -struct TestCtx { - req: HashedCommand, - metric: TestMetric, -} - -impl TestCtx { - #[allow(dead_code)] - fn new(req: HashedCommand) -> Self { - Self { - req, - metric: TestMetric { - item: UnsafeCell::new(TestMetricItem {}), - }, - } - } -} - -struct TestMetricItem {} -impl std::ops::AddAssign for TestMetricItem { - fn add_assign(&mut self, _rhs: i64) {} -} -impl std::ops::AddAssign for TestMetricItem { - fn add_assign(&mut self, _rhs: bool) {} -} - -struct TestMetric { - item: UnsafeCell, -} -impl Metric for TestMetric { - fn get(&self, _name: protocol::MetricName) -> &mut TestMetricItem { - unsafe { &mut *self.item.get() } - } -} - -impl Commander for TestCtx { - fn request_mut(&mut self) -> &mut HashedCommand { - todo!() - } - - fn request(&self) -> &HashedCommand { - &self.req - } - - fn request_shard(&self) -> usize { - todo!() - } - - fn metric(&self) -> &TestMetric { - &self.metric - } - - fn ctx(&self) -> u64 { - todo!() - } - - fn attachment(&self) -> Option<&Attachment> { - todo!() - } -} - #[test] fn test_write_response() { let proto = MsgQue; diff --git a/tests/src/proto_hook.rs b/tests/src/proto_hook.rs index 22d3952ad..42ede040c 100644 --- a/tests/src/proto_hook.rs +++ b/tests/src/proto_hook.rs @@ -77,10 +77,6 @@ impl Commander for TestCtx { fn ctx(&self) -> u64 { todo!() } - - fn attachment(&self) -> Option<&protocol::Attachment> { - todo!() - } } #[derive(Debug)] pub(crate) struct TestStream {