Skip to content

Commit

Permalink
Move sorted serialize to tremor-value
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Oct 6, 2023
1 parent 82b0c07 commit a905e18
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 150 deletions.
4 changes: 2 additions & 2 deletions src/connectors/impls/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ use std::time::Duration;
use std::{fmt::Display, sync::atomic::AtomicBool};
use tokio::task;
use tremor_common::time::nanotime;
use tremor_script::utils::sorted_serialize;
use tremor_value::utils::sorted_serialize;
use tremor_value::value::StaticValue;
use value_trait::Mutable;

Expand Down Expand Up @@ -813,7 +813,7 @@ async fn handle_response(
} else {
return Err(Error::from(format!(
"Invalid Response from ES: No \"items\" or not an array: {}",
sorted_serialize(&response)?
String::from_utf8(sorted_serialize(&response)?)?
)));
}
// ack the event
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
common::v1::{any_value, AnyValue, InstrumentationLibrary},
resource::v1::Resource,
};
use tremor_script::utils::sorted_serialize;
use tremor_value::utils::sorted_serialize;

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ mod tests {
use tremor_otelapis::opentelemetry::proto::{
common::v1::InstrumentationLibrary, resource::v1::Resource,
};
use tremor_script::utils::sorted_serialize;
use tremor_value::utils::sorted_serialize;

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/otel/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ mod tests {
use tremor_otelapis::opentelemetry::proto::{
common::v1::InstrumentationLibrary, resource::v1::Resource,
};
use tremor_script::utils::sorted_serialize;
use tremor_value::utils::sorted_serialize;

use super::*;

Expand Down
6 changes: 4 additions & 2 deletions tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use serial_test::serial;
use tremor_runtime::errors::*;
use tremor_script::module::Manager;
use tremor_script::utils::*;
use tremor_value::utils::sorted_serialize;

fn to_pipe(query: String) -> Result<ExecutableGraph> {
let aggr_reg = tremor_script::aggr_registry();
Expand Down Expand Up @@ -85,9 +86,10 @@ macro_rules! test_cases {
assert_eq!(results.len(), out_json.len(), "Number of events differ error");
for (_, result) in results {
for value in result.value_iter() {
let serialized = sorted_serialize(value)?;
let serialized = String::from_utf8(sorted_serialize(value)?)?;
if let Some(expected) = out_json.pop() {
assert_eq!(serialized, sorted_serialize(&expected)?);
let expected = String::from_utf8(sorted_serialize(&expected)?)?;
assert_eq!(serialized, expected);
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions tests/query_runtime_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tremor_runtime::errors::*;
use tremor_script::highlighter::Dumb;
use tremor_script::module::Manager;
use tremor_script::utils::*;
use tremor_value::utils::sorted_serialize;

fn to_pipe(query: &str) -> Result<ExecutableGraph> {
let aggr_reg = tremor_script::aggr_registry();
Expand Down Expand Up @@ -110,9 +111,10 @@ macro_rules! test_cases {
assert_eq!(results.len(), out_json.len(), "Number of events differ error");
for (_, result) in results {
for value in result.value_iter() {
let serialized = sorted_serialize(value)?;
let serialized = String::from_utf8(sorted_serialize(value)?)?;
if let Some(expected) = out_json.pop() {
assert_eq!(serialized, sorted_serialize(&expected)?);
let expected = String::from_utf8(sorted_serialize(&expected)?)?;
assert_eq!(serialized, expected);
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions tests/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tremor_script::utils::*;
use tremor_script::{
highlighter::Dumb, module::Manager, AggrType, EventContext, Return, Script, FN_REGISTRY,
};
use tremor_value::utils::sorted_serialize;
use tremor_value::{Object, Value};

macro_rules! test_cases {
Expand Down Expand Up @@ -78,9 +79,11 @@ macro_rules! test_cases {
};
}
assert_eq!(results.len(), out_json.len());
for (i, value) in results.iter().enumerate() {
for (_, value) in results.iter().enumerate() {
let serialized = String::from_utf8(sorted_serialize(value)?)?;
if let Some(expected) = out_json.pop() {
assert_eq!(sorted_serialize(&value)?, sorted_serialize(&expected)?, "Input event #{} Expected `{}`, but got `{}`", i, sorted_serialize(&expected)?, sorted_serialize(&value)?);
let expected = String::from_utf8(sorted_serialize(&expected)?)?;
assert_eq!(serialized, expected);
}
}
Ok(())
Expand Down
64 changes: 3 additions & 61 deletions tremor-codec/src/codec/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,10 @@
//! The codec can be configured with a mode, either `sorted` or `unsorted`. The default is `unsorted` as it is singnificantly faster, `sorted` json is only needed in testing situations where the key order in maps matters for compairson.
use crate::prelude::*;
use std::{cmp::max, io::Write, marker::PhantomData};
use std::{cmp::max, marker::PhantomData};
use tremor_value::utils::sorted_serialize;
use tremor_value::AlignedBuf;

// TODO: this is a copy from tremor script, it's not nice

/// Serialize a Value in a sorted fashion to allow equality comparing the result
///
/// # Errors
/// on IO errors
pub fn sorted_serialize(j: &Value) -> Result<String> {
// ballpark size of a 'sensible' message
let mut w = Vec::with_capacity(512);
sorted_serialize_(j, &mut w)?;
Ok(std::str::from_utf8(&w)?.to_string())
}

fn sorted_serialize_<'v, W: Write>(j: &Value<'v>, w: &mut W) -> Result<()> {
match j {
Value::Static(_) | Value::String(_) | Value::Bytes(_) => {
write!(w, "{}", j.encode())?;
}
Value::Array(a) => {
let mut iter = a.iter();
write!(w, "[")?;

if let Some(e) = iter.next() {
sorted_serialize_(e, w)?;
}

for e in iter {
write!(w, ",")?;
sorted_serialize_(e, w)?;
}
write!(w, "]")?;
}
Value::Object(o) => {
let mut v: Vec<(String, Value<'v>)> =
o.iter().map(|(k, v)| (k.to_string(), v.clone())).collect();

v.sort_by_key(|(k, _)| k.to_string());
let mut iter = v.into_iter();

write!(w, "{{")?;

if let Some((k, v)) = iter.next() {
sorted_serialize_(&Value::from(k), w)?;
write!(w, ":")?;
sorted_serialize_(&v, w)?;
}

for (k, v) in iter {
write!(w, ",")?;
sorted_serialize_(&Value::from(k), w)?;
write!(w, ":")?;
sorted_serialize_(&v, w)?;
}
write!(w, "}}")?;
}
}
Ok(())
}

/// Sorting for JSON
pub trait Sorting: Sync + Send + Copy + Clone + 'static {
/// Is this codec sorted
Expand Down Expand Up @@ -176,7 +118,7 @@ impl<S: Sorting> Codec for Json<S> {
}
async fn encode(&mut self, data: &Value, _meta: &Value) -> Result<Vec<u8>> {
if S::SORTED {
Ok(sorted_serialize(data)?.into_bytes())
Ok(sorted_serialize(data)?)
} else {
data.write(&mut self.data_buf)?;
let v = self.data_buf.clone();
Expand Down
16 changes: 8 additions & 8 deletions tremor-codec/src/codec/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ mod test {
"timestamp": 1_616_099_400_123_000_000_u64
});
assert_eq!(
crate::codec::json::sorted_serialize(&expected)?,
crate::codec::json::sorted_serialize(&decoded)?
tremor_value::utils::sorted_serialize(&expected)?,
tremor_value::utils::sorted_serialize(&decoded)?
);
Ok(())
}
Expand All @@ -499,8 +499,8 @@ mod test {
"protocol": "RFC3164",
});
assert_eq!(
crate::codec::json::sorted_serialize(&expected)?,
crate::codec::json::sorted_serialize(&decoded)?
tremor_value::utils::sorted_serialize(&expected)?,
tremor_value::utils::sorted_serialize(&decoded)?
);
Ok(())
}
Expand Down Expand Up @@ -634,8 +634,8 @@ mod test {
"timestamp": 1_616_099_400_123_000_000_u64
});
assert_eq!(
crate::codec::json::sorted_serialize(&expected)?,
crate::codec::json::sorted_serialize(&decoded)?
tremor_value::utils::sorted_serialize(&expected)?,
tremor_value::utils::sorted_serialize(&decoded)?
);
Ok(())
}
Expand Down Expand Up @@ -664,8 +664,8 @@ mod test {
"timestamp": timestamp.timestamp_nanos_opt().unwrap_or_default()
});
assert_eq!(
crate::codec::json::sorted_serialize(&expected)?,
crate::codec::json::sorted_serialize(&decoded)?
tremor_value::utils::sorted_serialize(&expected)?,
tremor_value::utils::sorted_serialize(&decoded)?
);
Ok(())
}
Expand Down
8 changes: 3 additions & 5 deletions tremor-pipeline/src/op/trickle/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
mod test;

use super::window::{self, Group, Window};
use crate::op::prelude::trickle::window::{GroupWindow, SelectCtx, Trait};
use crate::op::prelude::*;
use crate::op::trickle::window::{GroupWindow, SelectCtx, Trait};
use crate::{errors::Result, SignalKind};
use crate::{Event, Operator};
use halfbrown::Entry;
use tremor_common::stry;

use tremor_script::{
self,
ast::{self, ImutExpr, RunConsts, SelectStmt},
errors::{err_generic, Result as TSResult},
interpreter::{Env, LocalStack},
prelude::*,
utils::sorted_serialize,
NO_AGGRS,
};
use tremor_value::Value;
use tremor_value::{utils::sorted_serialize, Value};

#[derive(Debug)]
pub(crate) struct Select {
Expand Down Expand Up @@ -252,7 +250,7 @@ impl Operator for Select {
// with the `each` grouping an event could be in more then one group, so we
// iterate over all groups we found
for group_value in group_values {
let group_str = stry!(sorted_serialize(&group_value));
let group_str = String::from_utf8(sorted_serialize(&group_value)?)?;

ctx.cardinality = groups.len();

Expand Down
9 changes: 6 additions & 3 deletions tremor-pipeline/src/op/trickle/select/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ fn select_single_win_with_script_on_signal() -> Result<()> {
eis = select.on_event(uid, &Port::In, &mut state, event)?;
assert!(eis.insights.is_empty());
assert_eq!(1, eis.events.len());
assert_eq!("1", sorted_serialize(eis.events[0].1.data.parts().0)?);
assert_eq!(&b"1"[..], sorted_serialize(eis.events[0].1.data.parts().0)?);
Ok(())
}

Expand Down Expand Up @@ -356,7 +356,7 @@ fn select_single_win_on_signal() -> Result<()> {
event_id.get_min_by_stream(uid.id(), 0)
);
assert_eq!(
r#"[{"g":"group"}]"#,
br#"[{"g":"group"}]"#[..],
sorted_serialize(eis.events[0].1.data.parts().0)?
);
assert!(!eis.events[0].1.transactional);
Expand Down Expand Up @@ -418,7 +418,10 @@ fn select_multiple_wins_on_signal() -> Result<()> {
assert!(eis.insights.is_empty());
assert_eq!(1, eis.events.len());
let (_port, event) = eis.events.remove(0);
assert_eq!(r#"[{"cat":42}]"#, sorted_serialize(event.data.parts().0)?);
assert_eq!(
br#"[{"cat":42}]"#[..],
sorted_serialize(event.data.parts().0)?
);
assert!(
event.id.is_tracking(&event_id1),
"Select result event {:?} is not tracking input event {:?}",
Expand Down
6 changes: 4 additions & 2 deletions tremor-script/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ impl ErrorKind {
CantSetWindowConst, CodecError, Common, CyclicUse, DecreasingRange,
DeployArtefactNotDefined, DeployRequiredArgDoesNotResolve, DivisionByZero, DoubleConst,
DoublePipelineCreate, DoubleStream, EmptyInterpolation, EmptyScript, ExtraToken,
Generic, Grok, InvalidAssign, InvalidBinary, InvalidBitshift, InvalidConst,
InvalidDefinitionalWithParam, InvalidDrop, InvalidEmit, InvalidExtractor,
FromUtf8Error, Generic, Grok, InvalidAssign, InvalidBinary, InvalidBitshift,
InvalidConst, InvalidDefinitionalWithParam, InvalidDrop, InvalidEmit, InvalidExtractor,
InvalidFloatLiteral, InvalidFn, InvalidHexLiteral, InvalidIntLiteral, InvalidPP,
InvalidRecur, InvalidToken, InvalidUnary, InvalidUtf8Sequence, Io, JsonError,
MergeTypeConflict, MissingEffectors, MissingFunction, MissingModule, ModuleNotFound,
Expand Down Expand Up @@ -351,6 +351,7 @@ impl ErrorKind {
| ParserError(_)
| Self::__Nonexhaustive { .. }
| Utf8Error(_)
| FromUtf8Error(_)
| ValueError(_) => (Some(Span::yolo()), None),
}
}
Expand Down Expand Up @@ -559,6 +560,7 @@ error_chain! {
ValueError(tremor_value::Error);
ParseIntError(num::ParseIntError);
Utf8Error(std::str::Utf8Error);
FromUtf8Error(std::string::FromUtf8Error);
NoObjectError(tremor_value::KnownKeyError);
AccessError(value_trait::AccessError);
CodecError(tremor_codec::Error);
Expand Down
4 changes: 3 additions & 1 deletion tremor-script/src/std_lib/chash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

use crate::prelude::*;
use crate::registry::Registry;
use crate::{tremor_const_fn, utils::sorted_serialize};
use crate::tremor_const_fn;
use tremor_value::utils::sorted_serialize;

pub fn load(registry: &mut Registry) {
registry.insert(
Expand All @@ -40,6 +41,7 @@ pub fn load(registry: &mut Registry) {
).insert(
tremor_const_fn!(chash|sorted_serialize(_context, _data) {
let ser = sorted_serialize(_data).map_err(|e| FunctionError::RuntimeError{mfa: this_mfa(), error: format!("Failed to serialize: {e}")})?;
let ser = String::from_utf8(ser).map_err(|e| FunctionError::RuntimeError{mfa: this_mfa(), error: format!("Failed to serialize: {e}")})?;
Ok(Value::from(ser))
}),
);
Expand Down
Loading

0 comments on commit a905e18

Please sign in to comment.