Commit

Add e2e tiled test

caiocamatta-stripe committed Nov 30, 2023
1 parent 9183f8b commit 1ef8f29
Showing 2 changed files with 39 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -3,7 +3,7 @@ package ai.chronon.flink
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.{Constants, DataModel, Query, StructType => ChrononStructType}
import ai.chronon.flink.window.TimestampedTile
-import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed, KVStore}
+import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed}
import ai.chronon.online.KVStore.PutRequest
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
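// (note on the change above: the blanket KVStore import is dropped as unused --
// PutRequest is already imported directly via ai.chronon.online.KVStore.PutRequest)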
@@ -44,7 +44,7 @@ class FlinkJobIntegrationTest {
val flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(8)
-.setNumberTaskManagers(1)
+.setNumberTaskManagers(2)
.build)

@Before
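// Context (not part of this commit): MiniClusterWithClientResource is typically
// registered as a JUnit @Rule so that env.execute() in the tests below submits
// jobs to this embedded cluster; the rule wiring sits outside the visible hunk.
// With 2 TaskManagers at 8 slots each, up to 16 task slots are available, so the
// multi-task, out-of-order behaviour the tests account for can actually occur.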
@@ -122,6 +122,43 @@ class FlinkJobIntegrationTest {

env.execute("FlinkJobIntegrationTest")

// collect the events written out by the job (their 'created' timestamps are checked below)
val writeEventCreatedDS = CollectSink.values.asScala

assert(writeEventCreatedDS.size == elements.size)
// check that the timestamps of the written-out events match the input events;
// a Set is used because events can arrive out of order when the job runs across multiple tasks
assertEquals(writeEventCreatedDS.map(_.putRequest.tsMillis).map(_.get).toSet, elements.map(_.created).toSet)
// check that all the writes were successful
assertEquals(writeEventCreatedDS.map(_.status), Seq(true, true, true))
}
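
// Context sketch (not part of this commit): CollectSink, used via
// CollectSink.values above and addSink(new CollectSink) below, is the standard
// Flink MiniCluster test pattern -- a SinkFunction that appends to static,
// synchronized state, since sink instances are serialized out to parallel tasks.
// The "Sketch" names and the response fields (putRequest, status) are inferred
// from the assertions above, not taken from the real test sources.

import java.util.Collections
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import ai.chronon.online.KVStore.PutRequest

case class WriteResponseSketch(putRequest: PutRequest, status: Boolean)

class CollectSinkSketch extends SinkFunction[WriteResponseSketch] {
  override def invoke(value: WriteResponseSketch, context: SinkFunction.Context): Unit =
    CollectSinkSketch.values.add(value) // gather results from every parallel sink task
}

object CollectSinkSketch {
  // JVM-static, synchronized list: safe to append from multiple task threads
  val values: java.util.List[WriteResponseSketch] =
    Collections.synchronizedList(new java.util.ArrayList[WriteResponseSketch]())
}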

@Test
def testTiledFlinkJobEndToEnd(): Unit = {
implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val elements = Seq(
E2ETestEvent("test1", 12, 1.5, 1L),
E2ETestEvent("test2", 13, 1.6, 2L),
E2ETestEvent("test3", 14, 1.7, 3L)
)

val source = new E2EEventSource(elements)
val groupBy = FlinkTestUtils.makeGroupBy(Seq("id"))
val encoder = Encoders.product[E2ETestEvent]

val outputSchema = new SparkExpressionEvalFn(encoder, groupBy).getOutputSchema
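// (SparkExpressionEvalFn applies the GroupBy's Spark SQL transformations to the
// event stream; here it is instantiated only to derive the post-eval output schema)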

val groupByServingInfoParsed = makeTestGroupByServingInfoParsed(groupBy, encoder.schema, outputSchema)
val mockApi = mock[Api](withSettings().serializable())
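// the mock is made serializable, presumably so Flink can ship it to task slots
// inside the writer function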
val writerFn = new MockAsyncKVStoreWriter(Seq(true), mockApi, "testFG")
val job = new FlinkJob[E2ETestEvent](source, writerFn, groupByServingInfoParsed, encoder, 2)
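// (the trailing 2 is read here as the job's parallelism, matching the two
// TaskManagers configured above -- an inference from context, not confirmed)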

job.runTiledGroupByJob(env).addSink(new CollectSink)

env.execute("FlinkJobIntegrationTest")

// collect the events written out by the job (their 'created' timestamps are checked below)
val writeEventCreatedDS = CollectSink.values.asScala

