Skip to content

Commit

Permalink
feat: expire family
Browse files Browse the repository at this point in the history
  • Loading branch information
pxseu committed Mar 31, 2024
1 parent 1cb8ad9 commit 90ed6e9
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 57 deletions.
14 changes: 3 additions & 11 deletions examples/insert_keys.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
use crabdis::error::Result;
use crabdis::storage::value::Value;
use crabdis::CLI;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
let cli = CLI {
address: [127, 0, 0, 1].into(),
port: 6379,
threads: 1,
};

let connect_address = format!("{}:{}", cli.address, cli.port);

let mut stream = TcpStream::connect(connect_address).await?;
let mut stream = TcpStream::connect("localhost:6379").await?;
let (mut reader, mut writer) = stream.split();
let mut bufreader = BufReader::new(&mut reader);

for i in 0..1000 {
for i in 0..1_000_000 {
let req = Value::Multi(
vec![
Value::String("SET".into()),
Expand All @@ -34,6 +25,7 @@ async fn main() -> Result<()> {

writer.flush().await?;

// can return none if the client has disconnected
let Some(resp) = Value::from_resp(&mut bufreader).await? else {
return Ok(());
};
Expand Down
9 changes: 6 additions & 3 deletions src/commands/core/exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ impl CommandTrait for Exists {
let mut count = 0;
while let Some(key) = args.pop_front() {
match key {
Value::String(k) => {
if store.contains_key(&k) {
Value::String(k) => match store.get(&k) {
// Expired keys are not counted
Some(v) if v.expired() => {}
Some(_) => {
count += 1;
}
}
None => {}
},

_ => {
return value_error!("Invalid key").to_resp(writer).await;
Expand Down
1 change: 1 addition & 0 deletions src/commands/core/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl CommandTrait for Keys {
};

let mut keys = VecDeque::new();

for key in context.store.read().await.keys() {
if pattern.matches(key) {
keys.push_back(Value::String(key.clone()));
Expand Down
1 change: 1 addition & 0 deletions src/commands/core/mset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl CommandTrait for MSet {
}

let mut store = context.store.write().await;

while let Some(key) = args.pop_front() {
match key {
Value::String(k) => {
Expand Down
7 changes: 5 additions & 2 deletions src/commands/core/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ impl CommandTrait for Set {

let key = match args.pop_front() {
Some(Value::String(key)) => key,
_ => {
Some(_) => {
return value_error!("Invalid key").to_resp(writer).await;
}
None => {
return value_error!("Missing key").to_resp(writer).await;
}
};

let value = match args.pop_front() {
Some(value) => value,
_ => {
return value_error!("Invalid value").to_resp(writer).await;
return value_error!("Missing value").to_resp(writer).await;
}
};

Expand Down
66 changes: 66 additions & 0 deletions src/commands/expire/expire.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::prelude::*;

pub struct Expire;

#[async_trait]
impl CommandTrait for Expire {
fn name(&self) -> &str {
"EXPIRE"
}

async fn handle_command(
&self,
writer: &mut WriteHalf,
args: &mut VecDeque<Value>,
context: ContextRef,
) -> Result<()> {
if args.len() != 2 {
return value_error!("Invalid number of arguments")
.to_resp(writer)
.await;
}

let key = match args.pop_front() {
Some(Value::String(key)) => key,
Some(_) => {
return value_error!("Invalid key").to_resp(writer).await;
}
None => {
return value_error!("Missing key").to_resp(writer).await;
}
};

let seconds = match args.pop_front() {
Some(Value::Integer(seconds)) => seconds,
Some(Value::String(seconds)) => seconds.parse::<i64>().unwrap_or(-1),
Some(_) => {
return value_error!("Invalid seconds").to_resp(writer).await;
}
None => {
return value_error!("Missing seconds").to_resp(writer).await;
}
};

if seconds < 0 {
return value_error!("Invalid seconds").to_resp(writer).await;
}

let mut store = context.store.write().await;

let value = match store.get_mut(&key) {
Some(Value::Expire((inner, _))) => inner,
Some(inner) => inner,
_ => {
return value_error!("Key not found").to_resp(writer).await;
}
};

*value = Value::Expire((
Box::new(value.to_owned()),
tokio::time::Instant::now() + tokio::time::Duration::from_secs(seconds as u64),
));
context.expire_keys.write().await.insert(key);

Value::Ok.to_resp(writer).await
}
}
7 changes: 7 additions & 0 deletions src/commands/expire/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod expire;
mod setex;
mod ttl;

pub use expire::Expire;
pub use setex::SetEx;
pub use ttl::Ttl;
66 changes: 66 additions & 0 deletions src/commands/expire/setex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::prelude::*;

pub struct SetEx;

#[async_trait]
impl CommandTrait for SetEx {
fn name(&self) -> &str {
"SETEX"
}

async fn handle_command(
&self,
writer: &mut WriteHalf,
args: &mut VecDeque<Value>,
context: ContextRef,
) -> Result<()> {
if args.len() != 3 {
return value_error!("Invalid number of arguments")
.to_resp(writer)
.await;
}

let key = match args.pop_front() {
Some(Value::String(key)) => key,
Some(_) => {
return value_error!("Invalid key").to_resp(writer).await;
}
None => {
return value_error!("Missing key").to_resp(writer).await;
}
};

let seconds = match args.pop_front() {
Some(Value::Integer(seconds)) => seconds,
Some(Value::String(seconds)) => seconds.parse::<i64>().unwrap_or(-1),
Some(_) => {
return value_error!("Invalid seconds").to_resp(writer).await;
}
None => {
return value_error!("Missing seconds").to_resp(writer).await;
}
};

if seconds < 0 {
return value_error!("Invalid seconds").to_resp(writer).await;
}

let value = match args.pop_front() {
Some(value) => value,
_ => {
return value_error!("Missing value").to_resp(writer).await;
}
};

context.store.write().await.insert(
key.clone(),
Value::Expire((
Box::new(value),
tokio::time::Instant::now() + tokio::time::Duration::from_secs(seconds as u64),
)),
);
context.expire_keys.write().await.insert(key);

Value::Ok.to_resp(writer).await
}
}
55 changes: 55 additions & 0 deletions src/commands/expire/ttl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::prelude::*;

pub struct Ttl;

#[async_trait]
impl CommandTrait for Ttl {
fn name(&self) -> &str {
"TTL"
}

async fn handle_command(
&self,
writer: &mut WriteHalf,
args: &mut VecDeque<Value>,
context: ContextRef,
) -> Result<()> {
if args.len() != 1 {
return value_error!("Invalid number of arguments")
.to_resp(writer)
.await;
}

let key = match args.pop_front() {
Some(Value::String(key)) => key,
Some(_) => {
return value_error!("Invalid key").to_resp(writer).await;
}
None => {
return value_error!("Missing key").to_resp(writer).await;
}
};

let store = context.store.read().await;

let duration = match store.get(&key) {
Some(Value::Expire((_, ttl))) => {
let duration = ttl.duration_since(tokio::time::Instant::now()).as_secs() as i64;

if duration != 0 {
duration
} else {
-2
}
}

// non-expire keys should return -1
Some(_) => -1,

// not found keys should return -2
None => -2,
};

Value::Integer(duration).to_resp(writer).await
}
}
39 changes: 3 additions & 36 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod core;
pub mod expire;

use std::sync::Arc;

Expand Down Expand Up @@ -45,6 +46,8 @@ impl CommandHandler {
core::Exists,
core::FlushDB,
);

register_commands!(self, expire::Expire, expire::Ttl, expire::SetEx);
}

async fn register_command<C>(&mut self, command: C)
Expand Down Expand Up @@ -82,42 +85,6 @@ impl CommandHandler {
// ) -> Result<Value> {
// let response = match command {

// "MSET" => {
// let mut data = HashMap::new();

// if args.len() % 2 != 0 {
// return Ok(value_error!("Invalid number of arguments"));
// }

// for kv in args.iter().collect::<Vec<_>>().chunks_exact(2) {
// let key = match kv[0].to_owned() {
// Value::String(key) => key,
// _ => {
// return Ok(value_error!("Invalid key"));
// }
// };

// data.insert(key, kv[1].to_owned());
// }

// store.mset(data).await;

// Value::Ok
// }

// "KEYS" => {
// if args.len() > 1 {
// return Ok(value_error!("Invalid number of arguments"));
// }

// let _pattern = match args.pop_front() {
// Some(Value::String(pattern)) => Some(pattern),
// _ => None,
// };

// store.keys().await
// }

// "HGET" => {
// if args.len() != 2 {
// return Ok(value_error!("Invalid number of arguments"));
Expand Down
Loading

0 comments on commit 90ed6e9

Please sign in to comment.