Skip to content

Commit

Permalink
feat(smdk): support lookback in test (#3317)
Browse files Browse the repository at this point in the history
Added an ability to specify the lookback parameter and provide existing records to simulate data in the topic.

For example, here we run `test` and pass input record "1" and provide other records "2" and "3" as they would exist in the topic before:
```bash
smdk test --text "1" --lookback-last 1 --record "2" --record "3" 
```
the `look_back` function will receive record "3".

Closes #3315
  • Loading branch information
Alexander Galibey committed Jun 9, 2023
1 parent ee4dec1 commit fed60b0
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 9 deletions.
4 changes: 4 additions & 0 deletions crates/fluvio-smartengine/src/engine/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ impl SmartModuleConfig {
pub(crate) fn version(&self) -> i16 {
self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
}

pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
self.lookback = lookback;
}
}

#[cfg(feature = "transformation")]
Expand Down
2 changes: 1 addition & 1 deletion crates/smartmodule-development-kit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod publish;
mod hub;
mod set_public;

use std::path::{PathBuf};
use std::path::PathBuf;

use clap::Parser;
use anyhow::Result;
Expand Down
74 changes: 66 additions & 8 deletions crates/smartmodule-development-kit/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use fluvio_future::task::run_block_on;
use fluvio_sc_schema::smartmodule::SmartModuleApiClient;
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_smartengine::transformation::TransformationConfig;
use fluvio_smartengine::{SmartEngine, SmartModuleChainBuilder, SmartModuleConfig};
use fluvio_smartengine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleChainInstance, Lookback,
};
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
use fluvio_protocol::record::Record;
use fluvio_cli_common::user_input::{UserInputRecords, UserInputType};
Expand Down Expand Up @@ -70,6 +72,15 @@ pub struct TestCmd {
/// verbose output
#[arg(short = 'v', long = "verbose")]
verbose: bool,

/// Records which act as existing in the topic before the SmartModule starts processing. Useful
/// for testing `lookback`. Multiple values are allowed.
#[arg(long, short)]
record: Vec<String>,

/// Sets the lookback parameter to the last N records.
#[arg(long, short)]
lookback_last: Option<u64>,
}

fn parse_key_val(s: &str) -> Result<(String, String)> {
Expand All @@ -81,28 +92,43 @@ fn parse_key_val(s: &str) -> Result<(String, String)> {

impl TestCmd {
pub(crate) fn process(self) -> Result<()> {
run_block_on(self.process_async())
}

async fn process_async(self) -> Result<()> {
debug!("starting smartmodule test");

let lookback: Option<Lookback> = self.lookback_last.map(Lookback::Last);

let chain_builder = if let Some(transforms_file) = self.transforms_file {
let config = TransformationConfig::from_file(transforms_file)
.context("unable to read transformation config")?;
run_block_on(build_chain(config))?
build_chain(config, lookback).await?
} else if !self.transform.is_empty() {
let config = TransformationConfig::try_from(self.transform)
.context("unable to parse transform")?;
run_block_on(build_chain(config))?
build_chain(config, lookback).await?
} else if let Some(wasm_file) = self.wasm_file {
build_chain_ad_hoc(crate::read_bytes_from_path(&wasm_file)?, self.params)?
build_chain_ad_hoc(
crate::read_bytes_from_path(&wasm_file)?,
self.params,
lookback,
)?
} else {
let package_info = PackageInfo::from_options(&self.package.as_opt())?;
let wasm_file = package_info.target_wasm32_path()?;
build_chain_ad_hoc(crate::read_bytes_from_path(&wasm_file)?, self.params)?
build_chain_ad_hoc(
crate::read_bytes_from_path(&wasm_file)?,
self.params,
lookback,
)?
};

let engine = SmartEngine::new();
debug!("SmartModule chain created");

let mut chain = chain_builder.initialize(&engine)?;
look_back(&mut chain, self.record).await?;

let key = self.key.map(Bytes::from);

Expand Down Expand Up @@ -141,7 +167,34 @@ impl TestCmd {
}
}

async fn build_chain(config: TransformationConfig) -> Result<SmartModuleChainBuilder> {
async fn look_back(chain: &mut SmartModuleChainInstance, records: Vec<String>) -> Result<()> {
let records: Vec<Record> = records
.into_iter()
.map(|r| Record::new(r.as_str()))
.collect();
chain
.look_back(
|lookback| {
let res = match lookback {
fluvio_smartengine::Lookback::Last(n) => Ok(records
.clone()
.into_iter()
.rev()
.take(n as usize)
.rev()
.collect()),
};
async { res }
},
&Default::default(),
)
.await
}

async fn build_chain(
config: TransformationConfig,
lookback: Option<Lookback>,
) -> Result<SmartModuleChainBuilder> {
let client_config = FluvioConfig::load()?.try_into()?;
let api_client = SmartModuleApiClient::connect_with_config(client_config).await?;
let mut chain_builder = SmartModuleChainBuilder::default();
Expand All @@ -153,7 +206,8 @@ async fn build_chain(config: TransformationConfig) -> Result<SmartModuleChainBui
.ok_or_else(|| anyhow!("smartmodule {} not found", &transform.uses))?
.wasm
.as_raw_wasm()?;
let config = SmartModuleConfig::from(transform);
let mut config = SmartModuleConfig::from(transform);
config.set_lookback(lookback);
chain_builder.add_smart_module(config, wasm);
}
Ok(chain_builder)
Expand All @@ -162,10 +216,14 @@ async fn build_chain(config: TransformationConfig) -> Result<SmartModuleChainBui
fn build_chain_ad_hoc(
wasm: Vec<u8>,
params: Vec<(String, String)>,
lookback: Option<Lookback>,
) -> Result<SmartModuleChainBuilder> {
let params: BTreeMap<String, String> = params.into_iter().collect();
Ok(SmartModuleChainBuilder::from((
SmartModuleConfig::builder().params(params.into()).build()?,
SmartModuleConfig::builder()
.params(params.into())
.lookback(lookback)
.build()?,
wasm,
)))
}
19 changes: 19 additions & 0 deletions tests/cli/smdk_smoke_tests/smdk-basic.bats
Original file line number Diff line number Diff line change
Expand Up @@ -867,3 +867,22 @@ setup_file() {
assert_output --partial "Creating SmartModule: $SM_PACKAGE_NAME"
assert_success
}

@test "Test Lookback" {
# Test with smartmodule example with Lookback
cd "$(pwd)/smartmodule/examples/filter_look_back/"

# Build
run $SMDK_BIN build
refute_output --partial "could not compile"

# Test
run $SMDK_BIN test --text '111' --lookback-last '1' --record '222' --record '333'
refute_output --partial "111"
assert_success

run $SMDK_BIN test --text '444' --lookback-last '1' --record '222' --record '333'
assert_output --partial "444"
assert_success

}

0 comments on commit fed60b0

Please sign in to comment.