Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jans-cedarling): Make SparKV use generics, and update MemoryLogger to use those. #10593

Merged
merged 24 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a90c3af
feat(jans-cedarling): SparKV Generics
djellemah Nov 29, 2024
2c47a66
feat(jans-cedarling): In MemoryLogger, use serde_json::Value as SparK…
djellemah Jan 7, 2025
9e0024c
feat(jans-cedarling): SparKV key-value iterator
djellemah Jan 8, 2025
ae1d79f
feat(jans-cedarling): use SparKV key-value iterator in MemoryLogger f…
djellemah Jan 8, 2025
725fbc0
feat(jans-cedarling): SparKV implement drain similar to std::collections
djellemah Jan 8, 2025
a5841e2
feat(jans-cedarling): use SparKV drain in MemoryLogger for more effic…
djellemah Jan 8, 2025
a2025e7
chore(jans-cedarling): SparKV simplify clear_expired
djellemah Jan 8, 2025
6cd9930
chore(jans-cedarling): simplify some Loggable code
djellemah Jan 8, 2025
bef3c03
feat(jans-cedarling): SparKV add size function for non-byte types
djellemah Jan 8, 2025
5444773
chore(jans-cedarling): SparKV test for serde_json::Value storage
djellemah Jan 8, 2025
916fdf9
chore(jans-cedarling): SparKV small fixes
djellemah Jan 8, 2025
b31220c
chore(jans-cedarling): SparKV move tests to own file
djellemah Jan 8, 2025
647edb1
docs(jans-cedarling): SparKV documentation for generics
djellemah Jan 8, 2025
5c9c6b2
feat(jans-cedarling): some notes on custom value length function
djellemah Jan 8, 2025
cac8c86
chore(jans-cedarling): fallout from merge with wasm branch
djellemah Jan 10, 2025
a611856
chore(jans-cedarling): in MemoryLogger remove unnecessary serde_json:…
djellemah Jan 10, 2025
0094bb1
chore(jans-cedarling): SparKV rectify expiry tests, and use nicer Dur…
djellemah Jan 13, 2025
bbbb1c5
chore(jans-cedarling): SparKV clean up some test cases
djellemah Jan 13, 2025
447d886
chore(jans-cedarling): SparKV add copyright to tests files
djellemah Jan 13, 2025
50684c8
chore(jans-cedarling): SparKV cargo fmt -p sparkv
djellemah Jan 15, 2025
5cad82a
chore(jans-cedarling): SparKV rectify expiry test
djellemah Jan 15, 2025
927ffc9
feat(jans-cedarling): cargo fmt on log/memory_logger.rs
djellemah Jan 15, 2025
edc088e
feat(jans-cedarling): use fallback logger on failure in MemoryLogger
djellemah Jan 15, 2025
dc81dc8
Merge branch 'main' into jans-cedarling-10554
djellemah Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions jans-cedarling/cedarling/src/log/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) trait LogWriter {
pub(crate) trait Loggable: serde::Serialize {
/// get unique request ID
fn get_request_id(&self) -> Uuid;

/// get log level for entity
/// not all log entities have log level, only when `log_kind` == `System`
fn get_log_level(&self) -> Option<LogLevel>;
Expand All @@ -34,13 +35,8 @@ pub(crate) trait Loggable: serde::Serialize {
// is used to avoid boilerplate code
fn can_log(&self, logger_level: LogLevel) -> bool {
if let Some(entry_log_level) = self.get_log_level() {
if entry_log_level < logger_level {
// entry log level lower than logger level
false
} else {
// entry log higher or equal than logger level
true
}
// higher level is more important, ie closer to fatal
logger_level <= entry_log_level
} else {
// if `.get_log_level` return None
// it means that `log_kind` != `System` and we should log it
Expand Down
2 changes: 1 addition & 1 deletion jans-cedarling/cedarling/src/log/log_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl Loggable for &DecisionLogEntry<'_> {
// TODO: maybe using wasm we can use `js_sys::Date::now()`
// Static variable initialize only once at start of program and available during all program live cycle.
// Import inside function guarantee that it is used only inside function.
fn gen_uuid7() -> Uuid {
pub fn gen_uuid7() -> Uuid {
use std::sync::{LazyLock, Mutex};
use uuid7::V7Generator;

Expand Down
117 changes: 94 additions & 23 deletions jans-cedarling/cedarling/src/log/memory_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use super::interface::{LogStorage, LogWriter, Loggable};
use crate::bootstrap_config::log_config::MemoryLogConfig;

const STORAGE_MUTEX_EXPECT_MESSAGE: &str = "MemoryLogger storage mutex should unlock";
const STORAGE_JSON_PARSE_EXPECT_MESSAGE: &str =
"In MemoryLogger storage value should be valid LogEntry json string";

/// A logger that store logs in-memory.
pub(crate) struct MemoryLogger {
storage: Mutex<SparKV>,
storage: Mutex<SparKV<serde_json::Value>>,
log_level: LogLevel,
}

Expand All @@ -40,6 +38,44 @@ impl MemoryLogger {
}
}

/// In case of failure in MemoryLogger, log to stderr where supported.
/// On WASM, stderr is not supported, so log to whatever the wasm logger uses.
mod fallback {
use crate::LogLevel;

/// conform to Loggable requirement imposed by LogStrategy
#[derive(serde::Serialize)]
struct StrWrap<'a>(&'a str);

impl crate::log::interface::Loggable for StrWrap<'_> {
fn get_request_id(&self) -> uuid7::Uuid {
crate::log::log_entry::gen_uuid7()
}

fn get_log_level(&self) -> Option<LogLevel> {
// These must always be logged.
Some(LogLevel::TRACE)
}
}

/// Fetch the correct logger. That takes some work, and it's done on every
/// call. But this is a fallback logger, so it is not intended to be used
/// often, and in this case correctness and non-fallibility are far more
/// important than performance.
pub fn log(msg: &str) {
let log_config = crate::bootstrap_config::LogConfig{
log_type: crate::bootstrap_config::log_config::LogTypeConfig::StdOut,
// level is so that all messages passed here are logged.
log_level: LogLevel::TRACE,
};
// This should always be a LogStrategy::StdOut(StdOutLogger)
let log_strategy = crate::log::LogStrategy::new(&log_config);
use crate::log::interface::LogWriter;
// a string is always serializable
log_strategy.log_any(StrWrap(msg))
}
}

// Implementation of LogWriter
impl LogWriter for MemoryLogger {
fn log_any<T: Loggable>(&self, entry: T) {
Expand All @@ -48,43 +84,43 @@ impl LogWriter for MemoryLogger {
return;
}

let json_string = serde_json::json!(entry).to_string();
let json = match serde_json::to_value(&entry) {
olehbozhok marked this conversation as resolved.
Show resolved Hide resolved
Ok(json) => json,
Err(err) => {
fallback::log(&format!("could not serialize LogEntry to serde_json::Value: {err:?}"));
return;
},
};

let result = self
let set_result = self
.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.set(entry.get_request_id().to_string().as_str(), &json_string);
.set(&entry.get_request_id().to_string(), json);

if let Err(err) = result {
// log error to stderr
eprintln!("could not store LogEntry to memory: {err:?}");
if let Err(err) = set_result {
fallback::log(&format!("could not store LogEntry to memory: {err:?}"));
};
}
}

// Implementation of LogStorage
impl LogStorage for MemoryLogger {
fn pop_logs(&self) -> Vec<serde_json::Value> {
// TODO: implement more efficient implementation

let mut storage_guard = self.storage.lock().expect(STORAGE_MUTEX_EXPECT_MESSAGE);

let keys = storage_guard.get_keys();

keys.iter()
.filter_map(|key| storage_guard.pop(key))
// we call unwrap, because we know that the value is valid json
.map(|str_json| serde_json::from_str::<serde_json::Value>(str_json.as_str())
.expect(STORAGE_JSON_PARSE_EXPECT_MESSAGE))
self.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.drain()
.map(|(_k, value)| value)
.collect()
}

fn get_log_by_id(&self, id: &str) -> Option<serde_json::Value> {
self.storage.lock().expect(STORAGE_MUTEX_EXPECT_MESSAGE)
self.storage
.lock()
.expect(STORAGE_MUTEX_EXPECT_MESSAGE)
.get(id)
// we call unwrap, because we know that the value is valid json
.map(|str_json| serde_json::from_str::<serde_json::Value>(str_json.as_str()).expect(STORAGE_JSON_PARSE_EXPECT_MESSAGE))
.cloned()
}

fn get_log_ids(&self) -> Vec<String> {
Expand Down Expand Up @@ -211,4 +247,39 @@ mod tests {
"Logs were not fully popped"
);
}

#[test]
fn fallback_logger() {
struct FailSerialize;

impl serde::Serialize for FailSerialize {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
Err(serde::ser::Error::custom("this always fails"))
}
}

impl crate::log::interface::Loggable for FailSerialize {
fn get_request_id(&self) -> uuid7::Uuid {
crate::log::log_entry::gen_uuid7()
}

fn get_log_level(&self) -> Option<LogLevel> {
// These must always be logged.
Some(LogLevel::TRACE)
}
}

let logger = create_memory_logger();
logger.log_any(FailSerialize);

// There isn't a good way, in unit tests, to verify the output was
// actually written to stderr/json console.
//
// To eyeball-verify it:
// cargo test -- --nocapture fall
// and look in the output for
// "could not serialize LogEntry to serde_json::Value: Error(\"this always fails\", line: 0, column: 0)"
assert!(logger.pop_logs().is_empty(), "logger should be empty");
}
}
3 changes: 3 additions & 0 deletions jans-cedarling/sparkv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ homepage = "https://crates.io/crates/sparkv"
[dependencies]
thiserror = { workspace = true }
chrono = { workspace = true }

[dev-dependencies]
serde_json = "*"
2 changes: 1 addition & 1 deletion jans-cedarling/sparkv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ sparkv.set("your-key", "your-value"); // write
let value = sparkv.get("your-key").unwrap(); // read

// Write with unique TTL
sparkv.set_with_ttl("diff-ttl", "your-value", chrono::Duration::new(60, 0));
sparkv.set_with_ttl("diff-ttl", "your-value", chrono::Duration::seconds(60));
```

See `config.rs` for more configuration options.
Expand Down
14 changes: 4 additions & 10 deletions jans-cedarling/sparkv/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ impl Config {
Config {
max_items: 10_000,
max_item_size: 500_000,
max_ttl: Duration::new(60 * 60, 0).expect("a valid duration"),
default_ttl: Duration::new(5 * 60, 0).expect("a valid duration"), // 5 minutes
max_ttl: Duration::seconds(60 * 60),
default_ttl: Duration::seconds(5 * 60), // 5 minutes
auto_clear_expired: true,
}
}
Expand All @@ -43,14 +43,8 @@ mod tests {
let config: Config = Config::new();
assert_eq!(config.max_items, 10_000);
assert_eq!(config.max_item_size, 500_000);
assert_eq!(
config.max_ttl,
Duration::new(60 * 60, 0).expect("a valid duration")
);
assert_eq!(
config.default_ttl,
Duration::new(5 * 60, 0).expect("a valid duration")
);
assert_eq!(config.max_ttl, Duration::seconds(60 * 60));
assert_eq!(config.default_ttl, Duration::seconds(5 * 60));
assert!(config.auto_clear_expired);
}
}
24 changes: 10 additions & 14 deletions jans-cedarling/sparkv/src/expentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ pub struct ExpEntry {
}

impl ExpEntry {
pub fn new(key: &str, expiration: Duration) -> Self {
pub fn new<S: AsRef<str>>(key: S, expiration: Duration) -> Self {
let expired_at: DateTime<Utc> = Utc::now() + expiration;
Self {
key: String::from(key),
key: key.as_ref().into(),
expired_at,
}
}

pub fn from_kv_entry(kv_entry: &KvEntry) -> Self {
pub fn from_kv_entry<T>(kv_entry: &KvEntry<T>) -> Self {
Self {
key: kv_entry.key.clone(),
expired_at: kv_entry.expired_at,
Expand Down Expand Up @@ -59,35 +59,31 @@ mod tests {

#[test]
fn test_new() {
let item = ExpEntry::new("key", Duration::new(10, 0).expect("a valid duration"));
let item = ExpEntry::new("key", Duration::seconds(10));
assert_eq!(item.key, "key");
assert!(item.expired_at > Utc::now() + Duration::new(9, 0).expect("a valid duration"));
assert!(item.expired_at <= Utc::now() + Duration::new(10, 0).expect("a valid duration"));
assert!(item.expired_at > Utc::now() + Duration::seconds(9));
assert!(item.expired_at <= Utc::now() + Duration::seconds(10));
}

#[test]
fn test_from_kventry() {
let kv_entry = KvEntry::new(
"keyFromKV",
"value from KV",
Duration::new(10, 0).expect("a valid duration"),
);
let kv_entry = KvEntry::new("keyFromKV", "value from KV", Duration::seconds(10));
let exp_item = ExpEntry::from_kv_entry(&kv_entry);
assert_eq!(exp_item.key, "keyFromKV");
assert_eq!(exp_item.expired_at, kv_entry.expired_at);
}

#[test]
fn test_cmp() {
let item_small = ExpEntry::new("k1", Duration::new(10, 0).expect("a valid duration"));
let item_big = ExpEntry::new("k2", Duration::new(8000, 0).expect("a valid duration"));
let item_small = ExpEntry::new("k1", Duration::seconds(10));
let item_big = ExpEntry::new("k2", Duration::seconds(8000));
assert!(item_small > item_big); // reverse order
assert!(item_big < item_small); // reverse order
}

#[test]
fn test_is_expired() {
let item = ExpEntry::new("k1", Duration::new(0, 100).expect("a valid duration"));
let item = ExpEntry::new("k1", Duration::seconds(0));
std::thread::sleep(std::time::Duration::from_nanos(200));
assert!(item.is_expired());
}
Expand Down
22 changes: 9 additions & 13 deletions jans-cedarling/sparkv/src/kventry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use chrono::Duration;
use chrono::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvEntry {
pub struct KvEntry<T> {
pub key: String,
pub value: String,
pub value: T,
pub expired_at: DateTime<Utc>,
}

impl KvEntry {
pub fn new(key: &str, value: &str, expiration: Duration) -> Self {
impl<T> KvEntry<T> {
pub fn new<S: AsRef<str>>(key: S, value: T, expiration: Duration) -> Self {
let expired_at: DateTime<Utc> = Utc::now() + expiration;
Self {
key: String::from(key),
value: String::from(value),
key: key.as_ref().into(),
value,
expired_at,
}
}
Expand All @@ -31,14 +31,10 @@ mod tests {

#[test]
fn test_new() {
let item = KvEntry::new(
"key",
"value",
Duration::new(10, 0).expect("a valid duration"),
);
let item = KvEntry::<String>::new("key", "value".into(), Duration::seconds(10));
assert_eq!(item.key, "key");
assert_eq!(item.value, "value");
assert!(item.expired_at > Utc::now() + Duration::new(9, 0).expect("a valid duration"));
assert!(item.expired_at <= Utc::now() + Duration::new(10, 0).expect("a valid duration"));
assert!(item.expired_at > Utc::now() + Duration::seconds(9));
assert!(item.expired_at <= Utc::now() + Duration::seconds(10));
}
}
Loading
Loading