Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Oct 9, 2023
1 parent a905e18 commit aad1454
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 3 deletions.
47 changes: 45 additions & 2 deletions src/preprocessor/kafka_schema_registry_prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,61 @@ impl Preprocessor for SchemaRegistryPrefix {
) -> Result<Vec<(Vec<u8>, Value<'static>)>> {
use std::io::Cursor;
if let Some(d) = data.get(8..) {
let magic = Cursor::new(data).read_u32::<BigEndian>()?;
let mut c = Cursor::new(data);
let magic = c.read_u32::<BigEndian>()?;
if magic != 0 {
return Err(format!(
"Invalid magic bytes (0x00000000) for kafka wire format: {magic}"
)
.into());
}
let schema = Cursor::new(data).read_u32::<BigEndian>()?;
let schema = c.read_u32::<BigEndian>()?;
meta.insert("schema_id", schema)?;
Ok(vec![(d.to_vec(), meta)])
} else {
Err("Kafka schema registry Preprocessor: < 8 byte".into())
}
}
}

#[cfg(test)]
mod test {
use super::*;
use value_trait::ValueAccess;

/// Tests if the preprocessor errors on data that's less then 8 bytes
#[test]
fn test_preprocessor_less_then_8_bytes() {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 0, 0, 0, 0];
let meta = Value::object();
let res = pp.process(&mut ingest_ns, &data, meta);
assert!(res.is_err());
}

/// Tests if `schema_id` is added to the meta data properly
#[test]
fn test_preprocessor_schema_id() -> Result<()> {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 0, 0, 0, 0, 1, 42];
let meta = Value::object();
let mut res = pp.process(&mut ingest_ns, &data, meta)?;
let (rest, meta) = res.pop().expect("no result");
assert_eq!(meta.get_u8("schema_id"), Some(1));
assert_eq!(rest, vec![42]);
Ok(())
}

/// Tests if the preprocessor errors on invalid magic bytes
#[test]
fn test_preprocessor_invalid_magic_bytes() {
let mut pp = SchemaRegistryPrefix::default();
let mut ingest_ns = 0;
let data = vec![0, 0, 0, 1, 0, 0, 0, 1];
let meta = Value::object();
let res = pp.process(&mut ingest_ns, &data, meta);
assert!(res.is_err());
}
}
31 changes: 31 additions & 0 deletions tremor-codec/src/codec/binflux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,35 @@ mod test {
"Invalid BInflux Line Protocol data: Unknown type as influx line value: Array"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn encode_decode() {
let mut c = BInflux::default();

let value = literal!({
"measurement": "m",
"tags": {
"t1": "v1",
"t2": "v2"
},
"fields": {
"f1": 42,
"f2": 42.0,
"f3": true,
"f4": false,
"f5": "snot"
},
"timestamp": 42
});
let mut e = c
.encode(&value, &Value::const_null())
.await

Check warning on line 288 in tremor-codec/src/codec/binflux.rs

View check run for this annotation

Codecov / codecov/patch

tremor-codec/src/codec/binflux.rs#L288

Added line #L288 was not covered by tests
.expect("encode");
let (d, _meta) = c
.decode(e.as_mut_slice(), 42, Value::const_null())
.await

Check warning on line 292 in tremor-codec/src/codec/binflux.rs

View check run for this annotation

Codecov / codecov/patch

tremor-codec/src/codec/binflux.rs#L292

Added line #L292 was not covered by tests
.expect("decode")
.expect("decode");
assert_eq!(value, d);
}
}
2 changes: 1 addition & 1 deletion tremor-script/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ impl<'script> StringLit<'script> {
} else if let Some(_f) = r.as_f64() {
"42".to_string()
} else {
crate::utils::sorted_serialize(&r)?
String::from_utf8(tremor_value::utils::sorted_serialize(&r)?)?
}
}
};
Expand Down

0 comments on commit aad1454

Please sign in to comment.