diff --git a/kuksa_apps/node-red/README.md b/kuksa_apps/node-red/README.md index e6f29cb08..8e490f7ac 100644 --- a/kuksa_apps/node-red/README.md +++ b/kuksa_apps/node-red/README.md @@ -41,7 +41,7 @@ By default, the mqtt flows will be configured in node-red. You can also use the Now you can view the example under [http://localhost:1880](http://localhost:1880/). - + To test the example, you can use [Kuksa Client](../../kuksa-client) or use the [gps feeder](https://github.com/eclipse/kuksa.val.feeders/tree/main/gps2val). In [`feeders.yml`](./feeders.yml), you can find the experimental [config](kuksa_config/gpsd_feeder.ini) for gps feeder container. You use the following command to also start containers of feeders: @@ -61,3 +61,7 @@ docker-compose -f docker-compose.yml -f feeders.yml up - [websocket-advanced.json](./websocket-advanced.json) implements a test client and uses secure connection with server ![screenshot](./node-red-screenshot.png) + +*Note*: Websocket node-red configs are using url **wss://127.0.0.1:8090** if docker-compose used for running demo **127.0.0.1** is not +available by docker container and in the flow dashboard state will be "disconnected". Compose is creating network between containers so check ip address of +kuksa-val container and change websocket node accordingly. diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 37b9b23d1..88a114be4 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -27,6 +27,8 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::SystemTime; +use crate::query::{CompiledQuery, ExecutionInput}; +use crate::types::ExecutionInputImplData; use tracing::{debug, info, warn}; use crate::glob; @@ -75,6 +77,7 @@ pub struct Datapoint { #[derive(Debug, Clone)] pub struct Entry { pub datapoint: Datapoint, + pub lag_datapoint: Datapoint, pub actuator_target: Option, pub metadata: Metadata, } @@ -556,9 +559,14 @@ impl Entry { } } + pub fn apply_lag_after_execute(&mut self) { + self.lag_datapoint = self.datapoint.clone(); + } + pub fn apply(&mut self, update: EntryUpdate) -> HashSet { let mut changed = HashSet::new(); if let Some(datapoint) = update.datapoint { + self.lag_datapoint = self.datapoint.clone(); self.datapoint = datapoint; changed.insert(Field::Datapoint); } @@ -598,11 +606,19 @@ impl Subscriptions { &self, changed: Option<&HashMap>>, db: &Database, - ) -> Result<(), NotificationError> { + ) -> Result>, NotificationError> { let mut error = None; + let mut lag_updates: HashMap = HashMap::new(); for sub in &self.query_subscriptions { match sub.notify(changed, db).await { - Ok(_) => {} + Ok(None) => {} + Ok(Some(input)) => { + for x in input.get_fields() { + if x.1.lag_value != x.1.value && !lag_updates.contains_key(x.0) { + lag_updates.insert(x.0.clone(), ()); + } + } + } Err(err) => error = Some(err), } } @@ -616,7 +632,13 @@ impl Subscriptions { match error { Some(err) => Err(err), - None => Ok(()), + None => { + if !lag_updates.is_empty() { + Ok(Some(lag_updates)) + } else { + Ok(None) + } + } } } @@ -757,45 +779,94 @@ impl ChangeSubscription { } impl QuerySubscription { - fn generate_input( + fn find_in_db_and_add( &self, - changed: Option<&HashMap>>, + name: &String, db: &DatabaseReadAccess, - ) -> Option { - let id_used_in_query = { - let mut query_uses_id = false; - match changed { - Some(changed) => { - for (id, fields) in changed { - if let Some(metadata) = db.get_metadata_by_id(*id) { - if self.query.input_spec.contains(&metadata.path) - && fields.contains(&Field::Datapoint) - { - query_uses_id = true; - break; + input: &mut query::ExecutionInputImpl, + ) { + match db.get_entry_by_path(name) { + Ok(entry) => { + input.add( + name.to_owned(), + ExecutionInputImplData { + value: entry.datapoint.value.to_owned(), + lag_value: entry.lag_datapoint.value.to_owned(), + }, + ); + } + Err(_) => { + // TODO: This should probably generate an error + input.add( + name.to_owned(), + ExecutionInputImplData { + value: DataValue::NotAvailable, + lag_value: DataValue::NotAvailable, + }, + ) + } + } + } + fn check_if_changes_match( + query: &CompiledQuery, + changed_origin: Option<&HashMap>>, + db: &DatabaseReadAccess, + ) -> bool { + match changed_origin { + Some(changed) => { + for (id, fields) in changed { + if let Some(metadata) = db.get_metadata_by_id(*id) { + if query.input_spec.contains(&metadata.path) + && fields.contains(&Field::Datapoint) + { + return true; + } + if !query.subquery.is_empty() { + for sub in query.subquery.iter() { + if QuerySubscription::check_if_changes_match( + sub, + changed_origin, + db, + ) { + return true; + } } } } } - None => { - // Always generate input if `changed` is None - query_uses_id = true; - } } - query_uses_id - }; + None => { + // Always generate input if `changed` is None + return true; + } + } + false + } + fn generate_input_list( + &self, + query: &CompiledQuery, + db: &DatabaseReadAccess, + input: &mut query::ExecutionInputImpl, + ) { + for name in query.input_spec.iter() { + self.find_in_db_and_add(name, db, input); + } + if !query.subquery.is_empty() { + for sub in query.subquery.iter() { + self.generate_input_list(sub, db, input) + } + } + } + fn generate_input( + &self, + changed: Option<&HashMap>>, + db: &DatabaseReadAccess, + ) -> Option { + let id_used_in_query = QuerySubscription::check_if_changes_match(&self.query, changed, db); if id_used_in_query { let mut input = query::ExecutionInputImpl::new(); - for name in self.query.input_spec.iter() { - match db.get_entry_by_path(name) { - Ok(entry) => input.add(name.to_owned(), entry.datapoint.value.to_owned()), - Err(_) => { - // TODO: This should probably generate an error - input.add(name.to_owned(), DataValue::NotAvailable) - } - } - } + self.generate_input_list(&self.query, db, &mut input); Some(input) } else { None @@ -806,8 +877,9 @@ impl QuerySubscription { &self, changed: Option<&HashMap>>, db: &Database, - ) -> Result<(), NotificationError> { + ) -> Result, NotificationError> { let db_read = db.authorized_read_access(&self.permissions); + match self.generate_input(changed, &db_read) { Some(input) => // Execute query (if anything queued) @@ -827,19 +899,19 @@ impl QuerySubscription { }) .await { - Ok(()) => Ok(()), + Ok(()) => Ok(Some(input)), Err(_) => Err(NotificationError {}), }, - None => Ok(()), + None => Ok(None), }, Err(e) => { // TODO: send error to subscriber debug!("{:?}", e); - Ok(()) // no cleanup needed + Ok(None) // no cleanup needed } } } - None => Ok(()), + None => Ok(None), } } } @@ -963,6 +1035,19 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { } } + pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> { + match self.db.path_to_id.get(path) { + Some(id) => match self.db.entries.get_mut(id) { + Some(entry) => { + entry.apply_lag_after_execute(); + Ok(()) + } + None => Err(UpdateError::NotFound), + }, + None => Err(UpdateError::NotFound), + } + } + pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result, UpdateError> { match self.db.entries.get_mut(&id) { Some(entry) => { @@ -1056,7 +1141,14 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { description, allowed, }, - datapoint: match datapoint { + datapoint: match datapoint.clone() { + Some(datapoint) => datapoint, + None => Datapoint { + ts: SystemTime::now(), + value: DataValue::NotAvailable, + }, + }, + lag_datapoint: match datapoint { Some(datapoint) => datapoint, None => Datapoint { ts: SystemTime::now(), @@ -1277,6 +1369,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let mut errors = Vec::new(); let mut db = self.broker.database.write().await; let mut db_write = db.authorized_write_access(self.permissions); + let mut lag_updates: HashMap = HashMap::new(); let cleanup_needed = { let changed = { @@ -1310,11 +1403,23 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .notify(Some(&changed), &db) .await { - Ok(()) => false, + Ok(None) => false, + Ok(Some(lag_updates_)) => { + lag_updates = lag_updates_.clone(); + false + } Err(_) => true, // Cleanup needed } }; + if !lag_updates.is_empty() { + let mut db = self.broker.database.write().await; + let mut db_write = db.authorized_write_access(self.permissions); + for x in lag_updates { + if db_write.update_entry_lag_to_be_equal(x.0.as_str()).is_ok() {} + } + } + // Cleanup closed subscriptions if cleanup_needed { self.broker.subscriptions.write().await.cleanup(); @@ -1367,6 +1472,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { ) -> Result, QueryError> { let db_read = self.broker.database.read().await; let db_read_access = db_read.authorized_read_access(self.permissions); + let compiled_query = query::compile(query, &db_read_access); match compiled_query { diff --git a/kuksa_databroker/databroker/src/query/compiler.rs b/kuksa_databroker/databroker/src/query/compiler.rs index 9712c15f9..dc41eff35 100644 --- a/kuksa_databroker/databroker/src/query/compiler.rs +++ b/kuksa_databroker/databroker/src/query/compiler.rs @@ -65,6 +65,7 @@ impl CompilationInput for CompilationInputImpl { } } +#[derive(Debug)] pub struct CompiledQuery { /// A single expression (tree) evaluating to either /// true or false. @@ -79,8 +80,11 @@ pub struct CompiledQuery { /// or as part of a condition. /// /// These needs to be provided in the `input` when - /// executing the query. + /// executing the query. pub input_spec: HashSet, // Needed datapoints (values) for execution + + /// Vector of subquery in SELECT query + pub subquery: Vec, } impl CompiledQuery { @@ -89,6 +93,7 @@ impl CompiledQuery { selection: None, projection: Vec::new(), input_spec: HashSet::new(), + subquery: Vec::new(), } } } @@ -132,6 +137,7 @@ pub fn compile_expr( Ok(Expr::Datapoint { name: name.clone(), data_type, + lag: false, }) } @@ -145,8 +151,44 @@ pub fn compile_expr( Ok(Expr::Datapoint { name: field, data_type, + lag: false, }) } + ast::Expr::Function(f) => { + let name = &f.name.to_string(); + if name == "LAG" { + let args = &f.args[0]; + match args { + ast::FunctionArg::Unnamed(e) => match e { + ast::FunctionArgExpr::Expr(e) => { + let function_expr = compile_expr(e, input, output)?; + match function_expr { + Expr::Datapoint { + name, data_type, .. + } => Ok(Expr::Datapoint { + name, + data_type, + lag: true, + }), + _ => Err(CompilationError::ParseError( + "Unable to create lag datapoint".to_string(), + )), + } + } + _ => Err(CompilationError::UnsupportedOperator( + "Unsupported function argument expression".to_string(), + )), + }, + _ => Err(CompilationError::UnsupportedOperator( + "Unsupported function argument".to_string(), + )), + } + } else { + Err(CompilationError::UnsupportedOperator(format!( + "Unsupported operator \"{name}\"" + ))) + } + } ast::Expr::BinaryOp { ref left, @@ -298,6 +340,32 @@ pub fn compile_expr( } ast::Expr::Nested(e) => compile_expr(e, input, output), + ast::Expr::Subquery(q) => { + let select_statement = match &q.body { + sqlparser::ast::SetExpr::Select(query) => Some(query.clone()), + _ => None, + }; + if let Some(select) = select_statement { + let compiled_query = compile_select_statement(&select, input); + match compiled_query { + Ok(compiled_query) => { + output.subquery.push(compiled_query); + Ok(Expr::Subquery { + index: (output.subquery.len() - 1) as u32, + }) + } + + _ => Err(CompilationError::UnsupportedOperator( + "Subquery failed to compile query".to_string(), + )), + } + } else { + Err(CompilationError::UnsupportedOperation( + "Subquery to parse".to_string(), + )) + } + } + ast::Expr::UnaryOp { ref op, ref expr } => match op { ast::UnaryOperator::Not => Ok(Expr::UnaryOperation { expr: Box::new(compile_expr(expr, input, output)?), @@ -404,6 +472,54 @@ fn resolve_literal( } } +fn compile_select_statement( + select: &ast::Select, + input: &impl CompilationInput, +) -> Result { + let mut query = CompiledQuery::new(); + + match &select.selection { + None => {} + Some(expr) => { + let condition = compile_expr(expr, input, &mut query)?; + if let Ok(data_type) = condition.get_type() { + if data_type != DataType::Bool { + return Err(CompilationError::TypeError( + "WHERE statement doesn't evaluate to a boolean expression".to_string(), + )); + } + } + + query.selection = Some(condition); + } + }; + + for c in &select.projection { + match c { + ast::SelectItem::UnnamedExpr(expr) => { + let expr = compile_expr(expr, input, &mut query)?; + query.projection.push(expr); + } + ast::SelectItem::ExprWithAlias { expr, alias } => { + let expr = compile_expr(expr, input, &mut query)?; + + let name = alias.value.clone(); + query.projection.push(Expr::Alias { + expr: Box::new(expr), + alias: name, + }); + } + _ => { + return Err(CompilationError::UnsupportedOperation( + "unrecognized entry in SELECT statement".to_string(), + )) + } + } + } + + Ok(query) +} + pub fn compile( sql: &str, input: &impl CompilationInput, @@ -419,50 +535,8 @@ pub fn compile( }, _ => None, }; - if let Some(select) = select_statement { - let mut query = CompiledQuery::new(); - - match &select.selection { - None => {} - Some(expr) => { - let condition = compile_expr(expr, input, &mut query)?; - if let Ok(data_type) = condition.get_type() { - if data_type != DataType::Bool { - return Err(CompilationError::TypeError( - "WHERE statement doesn't evaluate to a boolean expression" - .to_string(), - )); - } - } - - query.selection = Some(condition); - } - }; - - for c in &select.projection { - match c { - ast::SelectItem::UnnamedExpr(expr) => { - let expr = compile_expr(expr, input, &mut query)?; - query.projection.push(expr); - } - ast::SelectItem::ExprWithAlias { expr, alias } => { - let expr = compile_expr(expr, input, &mut query)?; - let name = alias.value.clone(); - query.projection.push(Expr::Alias { - expr: Box::new(expr), - alias: name, - }); - } - _ => { - return Err(CompilationError::UnsupportedOperation( - "unrecognized entry in SELECT statement".to_string(), - )) - } - } - } - - Ok(query) + compile_select_statement(&select, input) } else { Err(CompilationError::UnsupportedOperation( "unrecognized SELECT statement".to_string(), diff --git a/kuksa_databroker/databroker/src/query/executor.rs b/kuksa_databroker/databroker/src/query/executor.rs index 013fea6a6..997f143a5 100644 --- a/kuksa_databroker/databroker/src/query/executor.rs +++ b/kuksa_databroker/databroker/src/query/executor.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use super::compiler::CompiledQuery; use super::expr::*; -use crate::types::DataValue; +use crate::types::{DataValue, ExecutionInputImplData}; #[derive(Debug)] pub enum ExecutionError { @@ -27,20 +27,22 @@ pub enum ExecutionError { #[derive(Debug)] pub struct ExecutionInputImpl { - fields: HashMap, + fields: HashMap, } pub trait ExecutionInput { - fn lookup(&self, field: &str) -> DataValue; + fn lookup(&self, field: &str) -> &ExecutionInputImplData; + + fn get_fields(&self) -> &HashMap; } impl CompiledQuery { - pub fn execute( - &self, + fn execute_internal( + query: &CompiledQuery, input: &impl ExecutionInput, ) -> Result>, ExecutionError> { // Check condition - let condition_fulfilled = match &self.selection { + let condition_fulfilled = match &query.selection { Some(condition) => match condition.execute(input) { Ok(DataValue::Bool(b)) => b, Ok(_) => { @@ -55,30 +57,129 @@ impl CompiledQuery { }; if condition_fulfilled { + struct NameAndData { + name: String, + data: Option>, + } let mut fields = Vec::new(); - for (index, e) in self.projection.iter().enumerate() { - let name = match e { - Expr::Datapoint { name, data_type: _ } => name.clone(), - Expr::Alias { alias, .. } => alias.clone(), - _ => format!("field_{index}"), + let mut is_subquery = false; + for (index, e) in query.projection.iter().enumerate() { + let expr_info = match e { + Expr::Datapoint { + name, data_type: _, .. + } => NameAndData { + name: name.clone(), + data: None, + }, + Expr::Alias { alias, expr } => { + match expr.as_ref() { + Expr::Subquery { index } => { + is_subquery = true; + match CompiledQuery::execute_internal( + &query.subquery[*index as usize], + input, + ) { + Ok(f) => match f { + None => NameAndData { + name: alias.clone(), + data: None, + }, + Some(vec) => NameAndData { + name: alias.clone(), + data: Some(vec), + }, + }, + Err(_) => { + // Don't be rude and just return None + NameAndData { + name: alias.clone(), + data: None, + } + } + } + } + _ => NameAndData { + name: alias.clone(), + data: None, + }, + } + } + Expr::Subquery { index } => { + is_subquery = true; + match CompiledQuery::execute_internal( + &query.subquery[*index as usize], + input, + ) { + Ok(f) => match f { + None => NameAndData { + name: format!("subquery_{index}"), + data: None, + }, + Some(vec) => NameAndData { + name: format!("subquery_{index}"), + data: Some(vec), + }, + }, + Err(_) => { + // Don't be rude and just return None + NameAndData { + name: format!("subquery_{index}"), + data: None, + } + } + } + } + _ => NameAndData { + name: format!("field_{index}"), + data: None, + }, }; - match e.execute(input) { - Ok(value) => fields.push((name, value)), - Err(e) => return Err(e), + + match expr_info.data { + None => match e.execute(input) { + Ok(value) => { + if !is_subquery { + fields.push((expr_info.name, value)) + } + } + Err(e) => return Err(e), + }, + Some(mut vec) => fields.append(&mut vec), } } - Ok(Some(fields)) + if !fields.is_empty() { + Ok(Some(fields)) + } else { + Ok(None) + } } else { // Successful execution, but condition wasn't met Ok(None) } } + pub fn execute( + &self, + input: &impl ExecutionInput, + ) -> Result>, ExecutionError> { + CompiledQuery::execute_internal(self, input) + } } impl Expr { pub fn execute(&self, input: &impl ExecutionInput) -> Result { match &self { - Expr::Datapoint { name, data_type: _ } => Ok(input.lookup(name)), + Expr::Datapoint { + name, + data_type: _, + lag, + } => { + let field = input.lookup(name); + if *lag { + Ok(field.lag_value.clone()) + } else { + Ok(field.value.clone()) + } + } Expr::Alias { expr, .. } => expr.execute(input), Expr::BinaryOperation { left, @@ -111,6 +212,7 @@ impl Expr { "Unresolved literal found while executing query".to_string(), )) } + Expr::Subquery { index } => Ok(DataValue::Uint32(*index)), } } } @@ -272,7 +374,7 @@ impl ExecutionInputImpl { } } - pub fn add(&mut self, name: String, value: DataValue) { + pub fn add(&mut self, name: String, value: ExecutionInputImplData) { self.fields.insert(name, value); } } @@ -284,12 +386,19 @@ impl Default for ExecutionInputImpl { } impl ExecutionInput for ExecutionInputImpl { - fn lookup(&self, field: &str) -> DataValue { + fn lookup(&self, field: &str) -> &ExecutionInputImplData { match self.fields.get(field) { - Some(value) => value.to_owned(), - None => DataValue::NotAvailable, + Some(value) => value, + None => &ExecutionInputImplData { + value: DataValue::NotAvailable, + lag_value: DataValue::NotAvailable, + }, } } + + fn get_fields(&self) -> &HashMap { + &self.fields + } } #[cfg(test)] @@ -299,28 +408,9 @@ use crate::query::compiler; #[cfg(test)] use crate::types::DataType; -#[cfg(test)] -struct TestExecutionInput { - seat_pos: i32, - datapoint1: i32, - datapoint2: bool, -} - #[cfg(test)] struct TestCompilationInput {} -#[cfg(test)] -impl ExecutionInput for TestExecutionInput { - fn lookup(&self, field: &str) -> DataValue { - match field { - "Vehicle.Cabin.Seat.Row1.Pos1.Position" => DataValue::Int32(self.seat_pos), - "Vehicle.Datapoint1" => DataValue::Int32(self.datapoint1), - "Vehicle.Datapoint2" => DataValue::Bool(self.datapoint2), - _ => DataValue::NotAvailable, - } - } -} - #[cfg(test)] impl CompilationInput for TestCompilationInput { fn get_datapoint_type(&self, field: &str) -> Result { @@ -335,6 +425,19 @@ impl CompilationInput for TestCompilationInput { } } +#[cfg(test)] +fn assert_expected(res: Option>, expected: &Vec<(String, DataValue)>) { + assert!(res.is_some()); + if let Some(fields) = &res { + assert_eq!(fields.len(), expected.len()); + for (i, (name, value)) in fields.iter().enumerate() { + assert_eq!(name, &expected[i].0); + assert_eq!(value, &expected[i].1); + println!("{name}: {value:?}") + } + } +} + #[test] fn executor_test() { let sql = " @@ -360,11 +463,28 @@ fn executor_test() { assert!(&compiled_query.input_spec.contains("Vehicle.Datapoint2")); println!("EXECUTE"); - let execution_input1 = TestExecutionInput { - seat_pos: 230, - datapoint1: 61, - datapoint2: true, - }; + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint1".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(61), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint2".to_string(), + ExecutionInputImplData { + value: DataValue::Bool(true), + lag_value: DataValue::NotAvailable, + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); println!("RESULT: "); @@ -378,23 +498,150 @@ fn executor_test() { DataValue::NotAvailable, ), ]; - assert!(res.is_some()); - if let Some(fields) = &res { - assert_eq!(fields.len(), 2); - for (i, (name, value)) in fields.iter().enumerate() { - assert_eq!(name, &expected[i].0); - assert_eq!(value, &expected[i].1); - println!("{name}: {value:?}") + assert_expected(res, &expected); + + println!("EXECUTE"); + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint1".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(40), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint2".to_string(), + ExecutionInputImplData { + value: DataValue::Bool(true), + lag_value: DataValue::NotAvailable, + }, + ); + let res = compiled_query.execute(&execution_input1).unwrap(); + + assert!(res.is_none()); +} + +#[test] +fn executor_lag_test() { + let sql = " + SELECT + Vehicle.Cabin.Seat.Row1.Pos1.Position, + LAG(Vehicle.Cabin.Seat.Row1.Pos1.Position) as previousCabinSeatRow1PosPosition + "; + + let test_compilation_input = TestCompilationInput {}; + let compiled_query = compiler::compile(sql, &test_compilation_input).unwrap(); + if let Some(Expr::Alias { alias, expr }) = compiled_query.projection.get(1) { + assert_eq!(alias, "previousCabinSeatRow1PosPosition"); + if let Expr::Datapoint { lag, .. } = **expr { + assert!(lag); } } println!("EXECUTE"); - let execution_input1 = TestExecutionInput { - seat_pos: 230, - datapoint1: 40, // Condition not met - datapoint2: true, - }; + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(231), + }, + ); + let res = compiled_query.execute(&execution_input1).unwrap(); + assert!(res.is_some()); + let expected = vec![ + ( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_owned(), + DataValue::Int32(230), + ), + ( + "previousCabinSeatRow1PosPosition".to_owned(), + DataValue::Int32(231), + ), + ]; + assert_expected(res, &expected); +} + +#[test] +fn executor_lag_subquery_test() { + let sql = " + SELECT + (SELECT Vehicle.Cabin.Seat.Row1.Pos1.Position), + (SELECT LAG(Vehicle.Cabin.Seat.Row1.Pos1.Position) as previousCabinSeatRow1PosPosition) + "; + let test_compilation_input = TestCompilationInput {}; + let compiled_query = compiler::compile(sql, &test_compilation_input).unwrap(); + assert_eq!(compiled_query.subquery.len(), 2); + if let Some(subquery) = compiled_query.subquery.get(0) { + assert!(subquery + .input_spec + .contains("Vehicle.Cabin.Seat.Row1.Pos1.Position")); + } + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(231), + }, + ); + let res = compiled_query.execute(&execution_input1).unwrap(); + assert!(res.is_some()); + let expected = vec![ + ( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_owned(), + DataValue::Int32(230), + ), + ( + "previousCabinSeatRow1PosPosition".to_owned(), + DataValue::Int32(231), + ), + ]; + assert_expected(res, &expected); +} + +#[test] +fn executor_where_lag_subquery_test() { + let sql = " + SELECT + (SELECT Vehicle.Cabin.Seat.Row1.Pos1.Position + WHERE + LAG(Vehicle.Cabin.Seat.Row1.Pos1.Position) <> Vehicle.Cabin.Seat.Row1.Pos1.Position + ) + "; + let test_compilation_input = TestCompilationInput {}; + let compiled_query = compiler::compile(sql, &test_compilation_input).unwrap(); + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); + assert!(res.is_some()); + let expected = vec![( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_owned(), + DataValue::Int32(230), + )]; + assert_expected(res, &expected); + let mut execution_input1 = ExecutionInputImpl::new(); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(230), + }, + ); + let res = compiled_query.execute(&execution_input1).unwrap(); assert!(res.is_none()); } diff --git a/kuksa_databroker/databroker/src/query/expr.rs b/kuksa_databroker/databroker/src/query/expr.rs index 23b846d09..1e97028ee 100644 --- a/kuksa_databroker/databroker/src/query/expr.rs +++ b/kuksa_databroker/databroker/src/query/expr.rs @@ -18,6 +18,7 @@ pub enum Expr { Datapoint { name: String, data_type: DataType, + lag: bool, }, Alias { expr: Box, @@ -43,6 +44,9 @@ pub enum Expr { expr: Box, operator: UnaryOperator, }, + Subquery { + index: u32, + }, Between { expr: Box, negated: bool, @@ -76,10 +80,13 @@ pub enum UnresolvedLiteral { impl Expr { pub fn get_type(&self) -> Result { match self { - Expr::Datapoint { name: _, data_type } => Ok(data_type.clone()), + Expr::Datapoint { + name: _, data_type, .. + } => Ok(data_type.clone()), Expr::Alias { expr, alias: _ } => expr.get_type(), Expr::Cast { expr: _, data_type } => Ok(data_type.clone()), Expr::UnresolvedLiteral { raw } => Err(UnresolvedLiteral::Number(raw.clone())), + Expr::Subquery { index: _ } => Ok(DataType::Uint32), Expr::ResolvedLiteral { value: _, data_type, diff --git a/kuksa_databroker/databroker/src/types.rs b/kuksa_databroker/databroker/src/types.rs index 31961444c..6d9241fd9 100644 --- a/kuksa_databroker/databroker/src/types.rs +++ b/kuksa_databroker/databroker/src/types.rs @@ -553,11 +553,29 @@ impl DataValue { // TODO: Implement better floating point comparison Ok((value - other_value).abs() < f64::EPSILON) } + (DataValue::NotAvailable, DataValue::Int32(..)) + | (DataValue::NotAvailable, DataValue::Int64(..)) + | (DataValue::NotAvailable, DataValue::Uint32(..)) + | (DataValue::NotAvailable, DataValue::Uint64(..)) + | (DataValue::NotAvailable, DataValue::Float(..)) + | (DataValue::NotAvailable, DataValue::Double(..)) => Ok(false), + (DataValue::Int32(..), DataValue::NotAvailable) + | (DataValue::Int64(..), DataValue::NotAvailable) + | (DataValue::Uint32(..), DataValue::NotAvailable) + | (DataValue::Uint64(..), DataValue::NotAvailable) + | (DataValue::Float(..), DataValue::NotAvailable) + | (DataValue::Double(..), DataValue::NotAvailable) => Ok(false), _ => Err(CastError {}), } } } +#[derive(Debug)] +pub struct ExecutionInputImplData { + pub value: DataValue, + pub lag_value: DataValue, +} + #[test] fn test_string_greater_than() { assert!(DataValue::String("string".to_owned())