diff --git a/examples/insert_keys.rs b/examples/insert_keys.rs index 2de3bd0..32889b6 100644 --- a/examples/insert_keys.rs +++ b/examples/insert_keys.rs @@ -1,24 +1,15 @@ use crabdis::error::Result; use crabdis::storage::value::Value; -use crabdis::CLI; use tokio::io::{AsyncWriteExt, BufReader}; use tokio::net::TcpStream; #[tokio::main] async fn main() -> Result<()> { - let cli = CLI { - address: [127, 0, 0, 1].into(), - port: 6379, - threads: 1, - }; - - let connect_address = format!("{}:{}", cli.address, cli.port); - - let mut stream = TcpStream::connect(connect_address).await?; + let mut stream = TcpStream::connect("localhost:6379").await?; let (mut reader, mut writer) = stream.split(); let mut bufreader = BufReader::new(&mut reader); - for i in 0..1000 { + for i in 0..1_000_000 { let req = Value::Multi( vec![ Value::String("SET".into()), @@ -34,6 +25,7 @@ async fn main() -> Result<()> { writer.flush().await?; + // can return none if the client has disconnected let Some(resp) = Value::from_resp(&mut bufreader).await? else { return Ok(()); }; diff --git a/src/commands/core/exists.rs b/src/commands/core/exists.rs index 5e38ffd..e2430ba 100644 --- a/src/commands/core/exists.rs +++ b/src/commands/core/exists.rs @@ -25,11 +25,14 @@ impl CommandTrait for Exists { let mut count = 0; while let Some(key) = args.pop_front() { match key { - Value::String(k) => { - if store.contains_key(&k) { + Value::String(k) => match store.get(&k) { + // Expired keys are not counted + Some(v) if v.expired() => {} + Some(_) => { count += 1; } - } + None => {} + }, _ => { return value_error!("Invalid key").to_resp(writer).await; diff --git a/src/commands/core/keys.rs b/src/commands/core/keys.rs index da4ae91..ff1801e 100644 --- a/src/commands/core/keys.rs +++ b/src/commands/core/keys.rs @@ -30,6 +30,7 @@ impl CommandTrait for Keys { }; let mut keys = VecDeque::new(); + for key in context.store.read().await.keys() { if pattern.matches(key) { keys.push_back(Value::String(key.clone())); diff --git a/src/commands/core/mset.rs b/src/commands/core/mset.rs index e4c1cbe..391efc3 100644 --- a/src/commands/core/mset.rs +++ b/src/commands/core/mset.rs @@ -21,6 +21,7 @@ impl CommandTrait for MSet { } let mut store = context.store.write().await; + while let Some(key) = args.pop_front() { match key { Value::String(k) => { diff --git a/src/commands/core/set.rs b/src/commands/core/set.rs index c62db32..859d1c6 100644 --- a/src/commands/core/set.rs +++ b/src/commands/core/set.rs @@ -22,15 +22,18 @@ impl CommandTrait for Set { let key = match args.pop_front() { Some(Value::String(key)) => key, - _ => { + Some(_) => { return value_error!("Invalid key").to_resp(writer).await; } + None => { + return value_error!("Missing key").to_resp(writer).await; + } }; let value = match args.pop_front() { Some(value) => value, _ => { - return value_error!("Invalid value").to_resp(writer).await; + return value_error!("Missing value").to_resp(writer).await; } }; diff --git a/src/commands/expire/expire.rs b/src/commands/expire/expire.rs new file mode 100644 index 0000000..2de78b4 --- /dev/null +++ b/src/commands/expire/expire.rs @@ -0,0 +1,66 @@ +use crate::prelude::*; + +pub struct Expire; + +#[async_trait] +impl CommandTrait for Expire { + fn name(&self) -> &str { + "EXPIRE" + } + + async fn handle_command( + &self, + writer: &mut WriteHalf, + args: &mut VecDeque, + context: ContextRef, + ) -> Result<()> { + if args.len() != 2 { + return value_error!("Invalid number of arguments") + .to_resp(writer) + .await; + } + + let key = match args.pop_front() { + Some(Value::String(key)) => key, + Some(_) => { + return value_error!("Invalid key").to_resp(writer).await; + } + None => { + return value_error!("Missing key").to_resp(writer).await; + } + }; + + let seconds = match args.pop_front() { + Some(Value::Integer(seconds)) => seconds, + Some(Value::String(seconds)) => seconds.parse::().unwrap_or(-1), + Some(_) => { + return value_error!("Invalid seconds").to_resp(writer).await; + } + None => { + return value_error!("Missing seconds").to_resp(writer).await; + } + }; + + if seconds < 0 { + return value_error!("Invalid seconds").to_resp(writer).await; + } + + let mut store = context.store.write().await; + + let value = match store.get_mut(&key) { + Some(Value::Expire((inner, _))) => inner, + Some(inner) => inner, + _ => { + return value_error!("Key not found").to_resp(writer).await; + } + }; + + *value = Value::Expire(( + Box::new(value.to_owned()), + tokio::time::Instant::now() + tokio::time::Duration::from_secs(seconds as u64), + )); + context.expire_keys.write().await.insert(key); + + Value::Ok.to_resp(writer).await + } +} diff --git a/src/commands/expire/mod.rs b/src/commands/expire/mod.rs new file mode 100644 index 0000000..771bb44 --- /dev/null +++ b/src/commands/expire/mod.rs @@ -0,0 +1,7 @@ +mod expire; +mod setex; +mod ttl; + +pub use expire::Expire; +pub use setex::SetEx; +pub use ttl::Ttl; diff --git a/src/commands/expire/setex.rs b/src/commands/expire/setex.rs new file mode 100644 index 0000000..371133c --- /dev/null +++ b/src/commands/expire/setex.rs @@ -0,0 +1,66 @@ +use crate::prelude::*; + +pub struct SetEx; + +#[async_trait] +impl CommandTrait for SetEx { + fn name(&self) -> &str { + "SETEX" + } + + async fn handle_command( + &self, + writer: &mut WriteHalf, + args: &mut VecDeque, + context: ContextRef, + ) -> Result<()> { + if args.len() != 3 { + return value_error!("Invalid number of arguments") + .to_resp(writer) + .await; + } + + let key = match args.pop_front() { + Some(Value::String(key)) => key, + Some(_) => { + return value_error!("Invalid key").to_resp(writer).await; + } + None => { + return value_error!("Missing key").to_resp(writer).await; + } + }; + + let seconds = match args.pop_front() { + Some(Value::Integer(seconds)) => seconds, + Some(Value::String(seconds)) => seconds.parse::().unwrap_or(-1), + Some(_) => { + return value_error!("Invalid seconds").to_resp(writer).await; + } + None => { + return value_error!("Missing seconds").to_resp(writer).await; + } + }; + + if seconds < 0 { + return value_error!("Invalid seconds").to_resp(writer).await; + } + + let value = match args.pop_front() { + Some(value) => value, + _ => { + return value_error!("Missing value").to_resp(writer).await; + } + }; + + context.store.write().await.insert( + key.clone(), + Value::Expire(( + Box::new(value), + tokio::time::Instant::now() + tokio::time::Duration::from_secs(seconds as u64), + )), + ); + context.expire_keys.write().await.insert(key); + + Value::Ok.to_resp(writer).await + } +} diff --git a/src/commands/expire/ttl.rs b/src/commands/expire/ttl.rs new file mode 100644 index 0000000..7deab7d --- /dev/null +++ b/src/commands/expire/ttl.rs @@ -0,0 +1,55 @@ +use crate::prelude::*; + +pub struct Ttl; + +#[async_trait] +impl CommandTrait for Ttl { + fn name(&self) -> &str { + "TTL" + } + + async fn handle_command( + &self, + writer: &mut WriteHalf, + args: &mut VecDeque, + context: ContextRef, + ) -> Result<()> { + if args.len() != 1 { + return value_error!("Invalid number of arguments") + .to_resp(writer) + .await; + } + + let key = match args.pop_front() { + Some(Value::String(key)) => key, + Some(_) => { + return value_error!("Invalid key").to_resp(writer).await; + } + None => { + return value_error!("Missing key").to_resp(writer).await; + } + }; + + let store = context.store.read().await; + + let duration = match store.get(&key) { + Some(Value::Expire((_, ttl))) => { + let duration = ttl.duration_since(tokio::time::Instant::now()).as_secs() as i64; + + if duration != 0 { + duration + } else { + -2 + } + } + + // non-expire keys should return -1 + Some(_) => -1, + + // not found keys should return -2 + None => -2, + }; + + Value::Integer(duration).to_resp(writer).await + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index e8221a8..d10a0c1 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,4 +1,5 @@ pub mod core; +pub mod expire; use std::sync::Arc; @@ -45,6 +46,8 @@ impl CommandHandler { core::Exists, core::FlushDB, ); + + register_commands!(self, expire::Expire, expire::Ttl, expire::SetEx); } async fn register_command(&mut self, command: C) @@ -82,42 +85,6 @@ impl CommandHandler { // ) -> Result { // let response = match command { -// "MSET" => { -// let mut data = HashMap::new(); - -// if args.len() % 2 != 0 { -// return Ok(value_error!("Invalid number of arguments")); -// } - -// for kv in args.iter().collect::>().chunks_exact(2) { -// let key = match kv[0].to_owned() { -// Value::String(key) => key, -// _ => { -// return Ok(value_error!("Invalid key")); -// } -// }; - -// data.insert(key, kv[1].to_owned()); -// } - -// store.mset(data).await; - -// Value::Ok -// } - -// "KEYS" => { -// if args.len() > 1 { -// return Ok(value_error!("Invalid number of arguments")); -// } - -// let _pattern = match args.pop_front() { -// Some(Value::String(pattern)) => Some(pattern), -// _ => None, -// }; - -// store.keys().await -// } - // "HGET" => { // if args.len() != 2 { // return Ok(value_error!("Invalid number of arguments")); diff --git a/src/context.rs b/src/context.rs index 7319a99..1fd8a02 100644 --- a/src/context.rs +++ b/src/context.rs @@ -2,11 +2,13 @@ use std::sync::Arc; use crate::commands::CommandHandler; use crate::prelude::*; +use crate::storage::ExpireKey; #[derive(Clone, Default)] pub struct Context { pub store: Store, pub commands: CommandHandler, + pub expire_keys: ExpireKey, } impl Context { @@ -15,7 +17,52 @@ impl Context { context.commands.register().await; - Arc::new(context) + let context = Arc::new(context); + + Self::expire_keys_task(context.clone()); + + context + } + + fn expire_keys_task(context: Arc) { + tokio::spawn(async move { + // run every 60 seconds + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); + + // skip the first tick + interval.tick().await; + + loop { + interval.tick().await; + + #[cfg(debug_assertions)] + log::debug!("Running expire keys task"); + + let now = tokio::time::Instant::now(); + + let mut keys_to_remove = Vec::new(); + + for key in context.expire_keys.read().await.iter() { + let expire_at = context.store.read().await; + let expire_at = match expire_at.get(key) { + Some(Value::Expire((_, expire_at))) => expire_at, + _ => continue, + }; + + if now > *expire_at { + keys_to_remove.push(key.clone()); + } + } + + #[cfg(debug_assertions)] + log::debug!("Removing keys: {keys_to_remove:?}"); + + for key in keys_to_remove { + context.store.write().await.remove(&key); + context.expire_keys.write().await.remove(&key); + } + } + }); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a8749aa..e225fcb 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,5 +1,6 @@ pub mod value; +use std::collections::HashSet; use std::sync::Arc; use tokio::sync::RwLock; @@ -7,3 +8,4 @@ use tokio::sync::RwLock; use crate::prelude::*; pub type Store = Arc>>; +pub type ExpireKey = Arc>>; diff --git a/src/storage/value.rs b/src/storage/value.rs index a056179..66a46cf 100644 --- a/src/storage/value.rs +++ b/src/storage/value.rs @@ -4,7 +4,7 @@ use std::io::Write; use std::pin::Pin; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; -use tokio::time::Duration; +use tokio::time::Instant; use crate::prelude::*; @@ -17,12 +17,11 @@ pub enum Value { Integer(i64), String(String), Multi(VecDeque), + Expire((Box, Instant)), // i promise i will implement this #[allow(dead_code)] Hashmap(HashMap), - #[allow(dead_code)] - Expire((Box, Duration)), } macro_rules! value_error { @@ -34,6 +33,13 @@ macro_rules! value_error { pub(crate) use value_error; impl Value { + pub fn expired(&self) -> bool { + match self { + Self::Expire((_, expires_at)) => Instant::now() > *expires_at, + _ => false, + } + } + pub fn to_resp<'a, T>( &'a self, writer: &'a mut T, @@ -99,7 +105,14 @@ impl Value { Value::Multi(values).to_resp(writer).await } - Self::Expire((v, _)) => v.to_resp(writer).await, + Self::Expire((v, _)) => { + // check if the value is expired + if Self::expired(&self) { + return Self::Nil.to_resp(writer).await; + } + + v.to_resp(writer).await + } } }) }