diff --git a/CHANGELOG.md b/CHANGELOG.md index 6624daf802..19f77c4207 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ * Bump `MINIMUM_PLATFORM_VERSION` to `0.9.0` ([#1310](https://github.com/infinyon/fluvio/issues/1310)) * Fix owner reference type to work delete in K 1.20.0 ([#1342](https://github.com/infinyon/fluvio/issues/1342)) * Fix Upgrading K8 Cluster ([#1347](https://github.com/infinyon/fluvio/issues/1347)) +* Add Error Handling to SmartStreams ([#1198](https://github.com/infinyon/fluvio/pull/1198)) +* Finish SmartStream Map (`#[smartstream(map)]`) API ([#1174](https://github.com/infinyon/fluvio/pull/1174), [#1198](https://github.com/infinyon/fluvio/pull/1198)) ## Platform Version 0.8.5 - 2021-07-14 * Add unstable Admin Watch API for topics, partitions, and SPUs ([#1136](https://github.com/infinyon/fluvio/pull/1136)) diff --git a/src/smartstream/derive/src/generator/map.rs b/src/smartstream/derive/src/generator/map.rs index 1befe73e7e..76ce1bb59f 100644 --- a/src/smartstream/derive/src/generator/map.rs +++ b/src/smartstream/derive/src/generator/map.rs @@ -49,7 +49,7 @@ pub fn generate_map_smartstream(func: &SmartStreamFn) -> TokenStream { let error = fluvio_smartstream::dataplane::smartstream::SmartStreamRuntimeError::new( &record, smartstream_input.base_offset, - fluvio_smartstream::dataplane::smartstream::SmartStreamType::Filter, + fluvio_smartstream::dataplane::smartstream::SmartStreamType::Map, err, ); output.error = Some(error); diff --git a/src/spu/src/services/public/stream_fetch.rs b/src/spu/src/services/public/stream_fetch.rs index dbb0447836..48d51a39d2 100644 --- a/src/spu/src/services/public/stream_fetch.rs +++ b/src/spu/src/services/public/stream_fetch.rs @@ -1334,7 +1334,13 @@ mod test { let mut records: RecordSet = BatchProducer::builder() .records(10u16) - .record_generator(Arc::new(|i, _| Record::new(i.to_string()))) + .record_generator(Arc::new(|i, _| { + if i < 9 { + Record::new(i.to_string()) + } else { + Record::new("nine".to_string()) + } + })) .build() .expect("batch") .records(); @@ -1349,7 +1355,7 @@ mod test { assert_eq!(response.partition.records.batches.len(), 1); let records = response.partition.records.batches[0].records(); - assert_eq!(records.len(), 10); + assert_eq!(records.len(), 9); assert_eq!(records[0].value.as_ref(), "0".as_bytes()); assert_eq!(records[1].value.as_ref(), "2".as_bytes()); assert_eq!(records[2].value.as_ref(), "4".as_bytes()); @@ -1359,11 +1365,14 @@ mod test { assert_eq!(records[6].value.as_ref(), "12".as_bytes()); assert_eq!(records[7].value.as_ref(), "14".as_bytes()); assert_eq!(records[8].value.as_ref(), "16".as_bytes()); - assert_eq!(records[9].value.as_ref(), "18".as_bytes()); match &response.partition.error_code { - ErrorCode::None => (), - _ => panic!("should not get error"), + ErrorCode::SmartStreamError(SmartStreamError::Runtime(error)) => { + assert_eq!(error.offset, 9); + assert_eq!(error.kind, SmartStreamType::Map); + assert_eq!(error.record_value.as_ref(), "nine".as_bytes()); + } + _ => panic!("should get runtime error"), } drop(response);