From 8cb3565a963307f3189429c617ad0ee1610e32a1 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Mon, 9 Oct 2023 10:43:11 +0200 Subject: [PATCH] Tests Signed-off-by: Heinz N. Gies --- .../kafka_schema_registry_prefix.rs | 47 ++++++++++++++++++- tremor-codec/src/codec/binflux.rs | 31 ++++++++++++ tremor-script/src/ast.rs | 6 ++- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/src/preprocessor/kafka_schema_registry_prefix.rs b/src/preprocessor/kafka_schema_registry_prefix.rs index afce5bd824..906c50f30f 100644 --- a/src/preprocessor/kafka_schema_registry_prefix.rs +++ b/src/preprocessor/kafka_schema_registry_prefix.rs @@ -33,14 +33,15 @@ impl Preprocessor for SchemaRegistryPrefix { ) -> Result, Value<'static>)>> { use std::io::Cursor; if let Some(d) = data.get(8..) { - let magic = Cursor::new(data).read_u32::()?; + let mut c = Cursor::new(data); + let magic = c.read_u32::()?; if magic != 0 { return Err(format!( "Invalid magic bytes (0x00000000) for kafka wire format: {magic}" ) .into()); } - let schema = Cursor::new(data).read_u32::()?; + let schema = c.read_u32::()?; meta.insert("schema_id", schema)?; Ok(vec![(d.to_vec(), meta)]) } else { @@ -48,3 +49,45 @@ impl Preprocessor for SchemaRegistryPrefix { } } } + +#[cfg(test)] +mod test { + use super::*; + use value_trait::ValueAccess; + + /// Tests if the preprocessor errors on data that's less then 8 bytes + #[test] + fn test_preprocessor_less_then_8_bytes() { + let mut pp = SchemaRegistryPrefix::default(); + let mut ingest_ns = 0; + let data = vec![0, 0, 0, 0, 0, 0, 0]; + let meta = Value::object(); + let res = pp.process(&mut ingest_ns, &data, meta); + assert!(res.is_err()); + } + + /// Tests if `schema_id` is added to the meta data properly + #[test] + fn test_preprocessor_schema_id() -> Result<()> { + let mut pp = SchemaRegistryPrefix::default(); + let mut ingest_ns = 0; + let data = vec![0, 0, 0, 0, 0, 0, 0, 1, 42]; + let meta = Value::object(); + let mut res = pp.process(&mut ingest_ns, &data, meta)?; + let (rest, meta) = res.pop().expect("no result"); + assert_eq!(meta.get_u8("schema_id"), Some(1)); + assert_eq!(rest, vec![42]); + Ok(()) + } + + /// Tests if the preprocessor errors on invalid magic bytes + #[test] + fn test_preprocessor_invalid_magic_bytes() { + let mut pp = SchemaRegistryPrefix::default(); + let mut ingest_ns = 0; + let data = vec![0, 0, 0, 1, 0, 0, 0, 1]; + let meta = Value::object(); + let res = pp.process(&mut ingest_ns, &data, meta); + assert!(res.is_err()); + } +} diff --git a/tremor-codec/src/codec/binflux.rs b/tremor-codec/src/codec/binflux.rs index 0f70be89a7..0047962cef 100644 --- a/tremor-codec/src/codec/binflux.rs +++ b/tremor-codec/src/codec/binflux.rs @@ -263,4 +263,35 @@ mod test { "Invalid BInflux Line Protocol data: Unknown type as influx line value: Array" ); } + + #[tokio::test(flavor = "multi_thread")] + async fn encode_decode() { + let mut c = BInflux::default(); + + let value = literal!({ + "measurement": "m", + "tags": { + "t1": "v1", + "t2": "v2" + }, + "fields": { + "f1": 42, + "f2": 42.0, + "f3": true, + "f4": false, + "f5": "snot" + }, + "timestamp": 42 + }); + let mut e = c + .encode(&value, &Value::const_null()) + .await + .expect("encode"); + let (d, _meta) = c + .decode(e.as_mut_slice(), 42, Value::const_null()) + .await + .expect("decode") + .expect("decode"); + assert_eq!(value, d); + } } diff --git a/tremor-script/src/ast.rs b/tremor-script/src/ast.rs index 361bf85c8a..0e76ee08f8 100644 --- a/tremor-script/src/ast.rs +++ b/tremor-script/src/ast.rs @@ -860,7 +860,7 @@ impl<'script> StringLit<'script> { } else if let Some(_f) = r.as_f64() { "42".to_string() } else { - crate::utils::sorted_serialize(&r)? + String::from_utf8(tremor_value::utils::sorted_serialize(&r)?)? } } }; @@ -888,7 +888,9 @@ impl<'script> StringLit<'script> { } else if let Some(_f) = r.as_f64() { res.push_str("42"); } else { - res.push_str(&crate::utils::sorted_serialize(&r)?); + res.push_str(&String::from_utf8( + tremor_value::utils::sorted_serialize(&r)?, + )??); } } };