From 890930c778607af6584319b76b4e6ce997e9b98e Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Mon, 15 Jul 2024 09:04:52 -0700
Subject: [PATCH] Add scheduler_mode index option (#415)

* Add scheduler_mode index option

Signed-off-by: Louis Chu

* Add integ test

Signed-off-by: Louis Chu

* Delete flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/worksheet.sc

Signed-off-by: Louis Chu

* Fix CI

Signed-off-by: Louis Chu

* Resolve comment

Signed-off-by: Louis Chu

* Fix CI

Signed-off-by: Louis Chu

---------

Signed-off-by: Louis Chu
---
 docs/index.md                                 | 11 ++++-
 .../flint/spark/FlintSparkIndexOptions.scala  | 41 ++++++++++++++++++-
 .../spark/refresh/AutoIndexRefresh.scala      | 13 +++---
 .../refresh/FlintSparkIndexRefresh.scala      | 19 ++++++++-
 .../refresh/IncrementalIndexRefresh.scala     |  5 +--
 .../FlintSparkCoveringIndexAstBuilder.scala   |  6 ++-
 ...FlintSparkMaterializedViewAstBuilder.scala |  6 ++-
 .../FlintSparkSkippingIndexAstBuilder.scala   |  6 ++-
 .../spark/FlintSparkIndexOptionsSuite.scala   | 30 ++++++++++++++
 .../FlintSparkCoveringIndexITSuite.scala      | 23 +++++++++++
 .../FlintSparkCoveringIndexSqlITSuite.scala   | 30 ++++++++++++++
 .../FlintSparkIndexValidationITSuite.scala    | 23 ++++++++++-
 .../FlintSparkMaterializedViewITSuite.scala   |  3 +-
 ...FlintSparkMaterializedViewSqlITSuite.scala | 30 ++++++++++++++
 .../FlintSparkSkippingIndexITSuite.scala      | 36 +++++++++++++++-
 .../FlintSparkSkippingIndexSqlITSuite.scala   | 29 +++++++++++++
 .../spark/FlintSparkUpdateIndexITSuite.scala  |  1 +
 17 files changed, 291 insertions(+), 21 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 46ebd87eb..af6e54a3e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -30,7 +30,13 @@ Please see the following example in which Index Building Logic and Query Rewrite
 ### Flint Index Refresh

 - **Auto Refresh:**
-  - This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences.
+  - This feature allows the Flint Index to automatically refresh. Users can configure options such as the frequency of auto-refresh based on their preferences. There are two modes available for scheduling the auto-refresh:
+    - **Internal Scheduler:**
+      - Description: The data refresh is executed in micro-batch mode using the internal scheduler.
+      - Recommended Use-Case: This mode is ideal for low-latency use-cases where data needs to be refreshed frequently and quickly.
+    - **External Scheduler:**
+      - Description: The data refresh is executed using an external scheduler.
+      - Recommended Use-Case: This mode is suitable for scenarios where data responsiveness is less critical, helping to reduce the cost of maintaining a long-running Spark cluster.
 - **Manual Refresh:**
   - Users have the option to manually trigger a refresh for the Flint Index. This provides flexibility and control over when the refresh occurs.
 - **Full Refresh:**
@@ -369,7 +375,8 @@ fetched rows / total rows = 5/5

 User can provide the following options in `WITH` clause of create statement:

-+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
++ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
++ `scheduler_mode`: a mode string (`internal` or `external`) that describes how `auto_refresh` is scheduled. `checkpoint_location` is required for the external scheduler. See the example below.
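For illustration, here is a minimal sketch of the external scheduler flow (not part of this patch). It assumes a `SparkSession` with the Flint SQL extensions enabled; the table name `glue.default.http_logs`, the index name `name_and_age`, and the S3 paths are hypothetical:

```scala
// Create a covering index whose auto refresh is scheduled externally.
spark.sql("""
  CREATE INDEX name_and_age ON glue.default.http_logs (name, age)
  WITH (
    auto_refresh = true,
    scheduler_mode = 'external',
    checkpoint_location = 's3://test/checkpoints/'
  )
""")

// In external mode no long-running streaming job is started; each REFRESH
// statement (issued by the external scheduler) runs one incremental refresh.
spark.sql("REFRESH INDEX name_and_age ON glue.default.http_logs")
```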
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. + `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled. + `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 9107a8a66..8bf09caf9 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -8,8 +8,9 @@ package org.opensearch.flint.spark import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode /** * Flint Spark index configurable options. @@ -22,6 +23,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { implicit val formats: Formats = Serialization.formats(NoTypeHints) validateOptionNames(options) + validateOptionSchedulerModeValue() /** * Is Flint index auto refreshed or manual refreshed. @@ -31,6 +33,19 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean + /** + * The scheduler mode for the Flint index refresh. + * + * @return + * scheduler mode option value + */ + def schedulerMode(): SchedulerMode.Value = { + // TODO: Change default value to external once the external scheduler is enabled + val defaultMode = "internal" + val modeStr = getOptionValue(SCHEDULER_MODE).getOrElse(defaultMode) + SchedulerMode.fromString(modeStr) + } + /** * The refresh interval (only valid if auto refresh enabled). 
 *
@@ -112,6 +127,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
     if (!options.contains(AUTO_REFRESH.toString)) {
       map += (AUTO_REFRESH.toString -> autoRefresh().toString)
     }
+
+    // Add scheduler-related default options only when auto refresh is enabled
+    if (autoRefresh()) {
+      if (!options.contains(SCHEDULER_MODE.toString)) {
+        map += (SCHEDULER_MODE.toString -> schedulerMode().toString)
+      }
+
+      // The internal scheduler runs the query in micro-batch mode with no default
+      // interval; the external scheduler defaults to a 15 minutes refresh interval.
+      if (SchedulerMode.EXTERNAL == schedulerMode() && !options.contains(
+          REFRESH_INTERVAL.toString)) {
+        map += (REFRESH_INTERVAL.toString -> "15 minutes")
+      }
+    }
+
     if (!options.contains(INCREMENTAL_REFRESH.toString)) {
       map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString)
     }
@@ -127,6 +157,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
       .map(opt => (parse(opt) \ key).extract[Map[String, String]])
       .getOrElse(Map.empty)
   }
+
+  private def validateOptionSchedulerModeValue(): Unit = {
+    getOptionValue(SCHEDULER_MODE) match {
+      case Some(modeStr) =>
+        SchedulerMode.fromString(modeStr) // Throws IllegalArgumentException if the mode is invalid
+      case None => // No action needed when scheduler_mode is not set
+    }
+  }
 }

 object FlintSparkIndexOptions {
@@ -142,6 +180,7 @@
   object OptionName extends Enumeration {
     type OptionName = Value
     val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
+    val SCHEDULER_MODE: OptionName.Value = Value("scheduler_mode")
     val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
     val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh")
     val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
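For a quick illustration of the defaulting behavior above, here is a sketch that mirrors the unit tests later in this patch (illustrative only, not part of the change):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

// With auto_refresh enabled and scheduler_mode=external, optionsWithDefault
// fills in refresh_interval ("15 minutes") and incremental_refresh ("false").
val opts = FlintSparkIndexOptions(
  Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
assert(opts.schedulerMode() == SchedulerMode.EXTERNAL)
assert(
  opts.optionsWithDefault == Map(
    "auto_refresh" -> "true",
    "scheduler_mode" -> "external",
    "refresh_interval" -> "15 minutes",
    "incremental_refresh" -> "false"))

// An unrecognized mode fails fast in the constructor via
// validateOptionSchedulerModeValue(), e.g.:
//   FlintSparkIndexOptions(Map("scheduler_mode" -> "invalid_mode"))
//   => IllegalArgumentException: requirement failed: Scheduler mode invalid_mode is invalid.
```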
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
index db2f321a0..47097db60 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
@@ -10,6 +10,7 @@ import java.util.Collections
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
@@ -43,7 +44,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       !isTableProviderSupported(spark, index),
       "Index auto refresh doesn't support Hive table")

-    // Checkpoint location is required if mandatory option set
+    // Checkpoint location is required if mandatory option set or external scheduler is used
     val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
     val checkpointLocation = options.checkpointLocation()
     if (flintSparkConf.isCheckpointMandatory) {
@@ -90,7 +91,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
           .foreachBatch { (batchDF: DataFrame, _: Long) =>
             new FullIndexRefresh(indexName, index, Some(batchDF))
               .start(spark, flintSparkConf)
-            () // discard return value above and return unit to use right overridden method
+            () // discard return value above and return unit to use the right overridden method
           }
           .start()
         Some(job.id.toString)
@@ -103,10 +104,12 @@
     def addSinkOptions(
         options: FlintSparkIndexOptions,
         flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
+      // For external scheduler mode or incremental refresh, Trigger.AvailableNow() overrides the refresh_interval option.
      dataStream
         .addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
         .addRefreshInterval(options.refreshInterval())
-        .addAvailableNowTrigger(options.incrementalRefresh())
+        .addAvailableNowTrigger(
+          SchedulerMode.EXTERNAL == options.schedulerMode() || options.incrementalRefresh())
         .addOutputMode(options.outputMode())
         .options(options.extraSinkOptions())
     }
@@ -129,8 +132,8 @@
         .getOrElse(dataStream)
     }

-    def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
-      if (incrementalRefresh) {
+    def addAvailableNowTrigger(setAvailableNow: Boolean): DataStreamWriter[Row] = {
+      if (setAvailableNow) {
         dataStream.trigger(Trigger.AvailableNow())
       } else {
         dataStream
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
index 0c6adb0bd..2dff77e26 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
@@ -59,6 +59,20 @@ object FlintSparkIndexRefresh {
     val AUTO, FULL, INCREMENTAL = Value
   }

+  /** Index scheduler mode for auto refresh */
+  object SchedulerMode extends Enumeration {
+    type SchedulerMode = Value
+    val INTERNAL: SchedulerMode.Value = Value("internal")
+    val EXTERNAL: SchedulerMode.Value = Value("external")
+
+    def fromString(s: String): SchedulerMode.Value = {
+      require(
+        SchedulerMode.values.exists(_.toString.equalsIgnoreCase(s)),
+        s"Scheduler mode $s is invalid. Must be 'internal' or 'external'.")
+      SchedulerMode.values.find(_.toString.equalsIgnoreCase(s)).get
+    }
+  }
+
   /**
    * Create concrete index refresh implementation for the given index.
    *
@@ -71,7 +85,10 @@
    */
   def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresh = {
     val options = index.options
-    if (options.autoRefresh()) {
+    // TODO: Refactor the refresh class names and RefreshMode enum to distinguish Spark side physical trigger setup from the logical Flint index option.
+ if (SchedulerMode.EXTERNAL == options.schedulerMode()) { + new IncrementalIndexRefresh(indexName, index) + } else if (options.autoRefresh()) { new AutoIndexRefresh(indexName, index) } else if (options.incrementalRefresh()) { new IncrementalIndexRefresh(indexName, index) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 5ee7308f1..98f0d838f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -34,9 +34,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) // Checkpoint location is required regardless of mandatory option val options = index.options val checkpointLocation = options.checkpointLocation() - require( - options.checkpointLocation().nonEmpty, - "Checkpoint location is required by incremental refresh") + require(options.checkpointLocation().nonEmpty, "Checkpoint location is required") require( isCheckpointLocationAccessible(spark, checkpointLocation.get), s"No sufficient permission to access the checkpoint location ${checkpointLocation.get}") @@ -50,6 +48,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) new AutoIndexRefresh(indexName, index) .start(spark, flintSparkConf) + // Blocks the calling thread until the streaming query finishes spark.streams .get(jobId.get) .awaitTermination() diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index fc200aebf..daac87395 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql.covering import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -49,8 +50,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A .options(indexOptions) .create(ignoreIfExists) - // Trigger auto refresh if enabled - if (indexOptions.autoRefresh()) { + // Trigger auto refresh if enabled and not using external scheduler + if (indexOptions + .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) { val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) flint.refreshIndex(flintIndexName) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index cd4f84028..4ef0b003e 100644 --- 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -10,6 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -42,8 +43,9 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito .options(indexOptions) .create(ignoreIfExists) - // Trigger auto refresh if enabled - if (indexOptions.autoRefresh()) { + // Trigger auto refresh if enabled and not using external scheduler + if (indexOptions + .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) { val flintIndexName = getFlintIndexName(flint, ctx.mvName) flint.refreshIndex(flintIndexName) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 521898aa2..5c4613504 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -10,6 +10,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._ import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET} @@ -75,8 +76,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A .options(indexOptions) .create(ignoreIfExists) - // Trigger auto refresh if enabled - if (indexOptions.autoRefresh()) { + // Trigger auto refresh if enabled and not using external scheduler + if (indexOptions + .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) { val indexName = getSkippingIndexName(flint, ctx.tableName) flint.refreshIndex(indexName) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index 212d91e13..29ec885be 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ 
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.scalatest.matchers.should.Matchers

 import org.apache.spark.FlintSuite
@@ -14,6 +15,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
   test("should return lowercase name as option name") {
     AUTO_REFRESH.toString shouldBe "auto_refresh"
+    SCHEDULER_MODE.toString shouldBe "scheduler_mode"
     REFRESH_INTERVAL.toString shouldBe "refresh_interval"
     INCREMENTAL_REFRESH.toString shouldBe "incremental_refresh"
     CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
@@ -27,6 +29,7 @@
     val options = FlintSparkIndexOptions(
       Map(
         "auto_refresh" -> "true",
+        "scheduler_mode" -> "external",
         "refresh_interval" -> "1 Minute",
         "incremental_refresh" -> "true",
         "checkpoint_location" -> "s3://test/",
@@ -45,6 +48,7 @@
         |  }""".stripMargin))

     options.autoRefresh() shouldBe true
+    options.schedulerMode() shouldBe SchedulerMode.EXTERNAL
     options.refreshInterval() shouldBe Some("1 Minute")
     options.incrementalRefresh() shouldBe true
     options.checkpointLocation() shouldBe Some("s3://test/")
@@ -73,6 +77,7 @@
     val options = FlintSparkIndexOptions(Map.empty)

     options.autoRefresh() shouldBe false
+    options.schedulerMode() shouldBe SchedulerMode.INTERNAL
     options.refreshInterval() shouldBe empty
     options.checkpointLocation() shouldBe empty
     options.watermarkDelay() shouldBe empty
@@ -92,6 +97,27 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "refresh_interval" -> "1 Minute")
   }

+  test("should include default scheduler_mode option when auto refresh is set to true") {
+    val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))
+
+    options.optionsWithDefault shouldBe Map(
+      "auto_refresh" -> "true",
+      "scheduler_mode" -> "internal",
+      "incremental_refresh" -> "false")
+  }
+
+  test(
+    "should include default refresh_interval option with auto_refresh=true and scheduler_mode=external") {
+    val options =
+      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
+
+    options.optionsWithDefault shouldBe Map(
+      "auto_refresh" -> "true",
+      "scheduler_mode" -> "external",
+      "refresh_interval" -> "15 minutes",
+      "incremental_refresh" -> "false")
+  }
+
   test("should report error if any unknown option name") {
     the[IllegalArgumentException] thrownBy
       FlintSparkIndexOptions(Map("autoRefresh" -> "true"))
@@ -102,5 +128,9 @@
     the[IllegalArgumentException] thrownBy {
       FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test"))
     } should have message "requirement failed: option name indexSetting is invalid"
+
+    the[IllegalArgumentException] thrownBy {
+      FlintSparkIndexOptions(Map("scheduler_mode" -> "invalid_mode"))
+    } should have message "requirement failed: Scheduler mode invalid_mode is invalid. Must be 'internal' or 'external'."
} } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index e5aa7b4d1..31b5c14b1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -118,6 +118,29 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) } + test("auto refresh covering index successfully with external scheduler") { + withTempDir { checkpointDir => + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "scheduler_mode" -> "external", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + val jobId = flint.refreshIndex(testFlintIndex) + jobId shouldBe None + + val indexData = flint.queryIndex(testFlintIndex) + checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) + } + } + test("update covering index successfully") { // Create full refresh Flint index flint diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 371c6ca2f..db14e395b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -141,6 +141,36 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create covering index with external scheduler") { + withTempDir { checkpointDir => + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external', + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH INDEX $testIndex ON $testTable") + flint.queryIndex(testFlintIndex).count() shouldBe 2 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testFlintIndex).count() shouldBe 2 + + // New data is refreshed incrementally + sql(s"REFRESH INDEX $testIndex ON $testTable") + flint.queryIndex(testFlintIndex).count() shouldBe 3 + } + } + test("create covering index with incremental refresh") { withTempDir { checkpointDir => sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index 008b04078..af5e443df 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -88,6 +88,27 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup } } + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test( + s"should fail to create auto refresh Flint index if scheduler_mode is external and no checkpoint 
location: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external' + | ) + |""".stripMargin) + } should have message + "requirement failed: Checkpoint location is required" + } + } + } + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) .foreach { statement => test( @@ -103,7 +124,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup | ) |""".stripMargin) } should have message - "requirement failed: Checkpoint location is required by incremental refresh" + "requirement failed: Checkpoint location is required" } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 83fe1546c..f824aab73 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -78,7 +78,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "auto_refresh": "true", | "incremental_refresh": "false", | "checkpoint_location": "${checkpointDir.getAbsolutePath}", - | "watermark_delay": "30 Seconds" + | "watermark_delay": "30 Seconds", + | "scheduler_mode":"internal" | }, | "latestId": "$testLatestId", | "properties": {} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 3a17cb8b1..fc4cdbeac 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -82,6 +82,36 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("create materialized view with auto refresh and external scheduler") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external', + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver') + | """.stripMargin) + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data is refreshed incrementally + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 4 + } + } + test("create materialized view with streaming job options") { withTempDir { checkpointDir => sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index b2185a5a9..66e777dea 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ 
b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -159,6 +159,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { optionJson should matchJson(s""" | { | "auto_refresh": "true", + | "scheduler_mode": "internal", | "incremental_refresh": "false", | "refresh_interval": "1 Minute", | "checkpoint_location": "${checkpointDir.getAbsolutePath}", @@ -242,7 +243,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) .create() - } should have message "requirement failed: Checkpoint location is required by incremental refresh" + } should have message "requirement failed: Checkpoint location is required" } test("auto refresh skipping index successfully") { @@ -266,6 +267,39 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { indexData should have size 2 } + test("auto refresh skipping index successfully with external scheduler") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "scheduler_mode" -> "external", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Delete all index data intentionally and generate a new source file + openSearchClient.deleteByQuery( + new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), + RequestOptions.DEFAULT) + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Expect to only refresh the new file + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 1 + } + } + test("update skipping index successfully") { // Create full refresh Flint index flint diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index e10e6a29b..af497eb2b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -62,6 +62,35 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit indexData.count() shouldBe 2 } + test("create skipping index with auto refresh and external scheduler") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external', + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 2 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 3 + } + } + test("create skipping index with max size value set") { sql(s""" | CREATE SKIPPING INDEX ON $testTable diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 76da7e8c3..c42822f71 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -61,6 +61,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { optionJson should matchJson(s""" | { | "auto_refresh": "true", + | "scheduler_mode": "internal", | "incremental_refresh": "false", | "refresh_interval": "1 Minute", | "checkpoint_location": "${checkpointDir.getAbsolutePath}",