Skip to content

Commit

Permalink
Merge branch 'main' into adaptive-rate-control
Browse files Browse the repository at this point in the history
  • Loading branch information
seankao-az committed Jan 9, 2025
2 parents 09b96e9 + e42c535 commit 40e2818
Show file tree
Hide file tree
Showing 25 changed files with 1,894 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:

- name: Upload test report
if: always() # Ensures the artifact is saved even if tests fail
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-reports
path: target/test-reports # Adjust this path if necessary
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `id_expression`: an expression string that generates an ID column to guarantee idempotency when index refresh job restart or any retry attempt during an index refresh. If an empty string is provided, no ID column will be generated.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'

Note that the index option name is case-sensitive. Here is an example:
Expand All @@ -406,6 +407,7 @@ WITH (
watermark_delay = '1 Second',
output_mode = 'complete',
index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
id_expression = "sha1(concat_ws('\0',startTime,status))",
extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
)
```
Expand Down
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Example PPL Queries

#### **AppendCol**
[See additional command details](ppl-appendcol-command.md)
- `source=employees | stats avg(age) as avg_age1 by dept | fields dept, avg_age1 | APPENDCOL [ stats avg(age) as avg_age2 by dept | fields avg_age2 ];` (To display multiple table statistics side by side)
- `source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ stats avg(age) as age ];` (When the override option is enabled, fields from the sub-query take precedence over fields in the main query in cases of field name conflicts)

#### **Comment**
[See additional command details](ppl-comment.md)
- `source=accounts | top gender // finds most common gender of all the accounts` (line comment)
Expand Down
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).

- [`expand commands`](ppl-expand-command.md)

- [`appendcol command`](ppl-appendcol-command.md)

* **Functions**

- [`Expressions`](functions/ppl-expressions.md)
Expand Down
120 changes: 120 additions & 0 deletions docs/ppl-lang/ppl-appendcol-command.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
## PPL `appendcol` command

### Description
Using `appendcol` command to append the result of a sub-search and attach it alongside with the input search results (The main search).

### Syntax - APPENDCOL
`APPENDCOL <override=?> [sub-search]...`

* <override=?>: optional boolean field to specify should result from main-result be overwritten in the case of column name conflict.
* sub-search: Executes PPL commands as a secondary search. The sub-search uses the same data specified in the source clause of the main search results as its input.


#### Example 1: To append the result of `stats avg(age) as AVG_AGE` into existing search result

The example append the result of sub-search `stats avg(age) as AVG_AGE` alongside with the main-search.

PPL query:

os> source=employees | FIELDS name, dept, age | APPENDCOL [ stats avg(age) as AVG_AGE ];
fetched rows / total rows = 9/9
+------+-------------+-----+------------------+
| name | dept | age | AVG_AGE |
+------+-------------+-----+------------------+
| Lisa | Sales | 35 | 31.2222222222222 |
| Fred | Engineering | 28 | NULL |
| Paul | Engineering | 23 | NULL |
| Evan | Sales | 38 | NULL |
| Chloe| Engineering | 25 | NULL |
| Tom | Engineering | 33 | NULL |
| Alex | Sales | 33 | NULL |
| Jane | Marketing | 28 | NULL |
| Jeff | Marketing | 38 | NULL |
+------+-------------+-----+------------------+


#### Example 2: To compare multiple stats commands with side by side with appendCol.

This example demonstrates a common use case: performing multiple statistical calculations and displaying the results side by side in a horizontal layout.

PPL query:

os> source=employees | stats avg(age) as avg_age1 by dept | fields dept, avg_age1 | APPENDCOL [ stats avg(age) as avg_age2 by dept | fields avg_age2 ];
fetched rows / total rows = 3/3
+-------------+-----------+----------+
| dept | avg_age1 | avg_age2 |
+-------------+-----------+----------+
| Engineering | 27.25 | 27.25 |
| Sales | 35.33 | 35.33 |
| Marketing | 33.00 | 33.00 |
+-------------+-----------+----------+


#### Example 3: Append multiple sub-search result

The example demonstrate multiple APPENCOL commands can be chained to provide one comprehensive view for user.

PPL query:

os> source=employees | FIELDS name, dept, age | APPENDCOL [ stats avg(age) as AVG_AGE ] | APPENDCOL [ stats max(age) as MAX_AGE ];
fetched rows / total rows = 9/9
+------+-------------+-----+------------------+---------+
| name | dept | age | AVG_AGE | MAX_AGE |
+------+-------------+-----+------------------+---------+
| Lisa | Sales------ | 35 | 31.22222222222222| 38 |
| Fred | Engineering | 28 | NULL | NULL |
| Paul | Engineering | 23 | NULL | NULL |
| Evan | Sales------ | 38 | NULL | NULL |
| Chloe| Engineering | 25 | NULL | NULL |
| Tom | Engineering | 33 | NULL | NULL |
| Alex | Sales | 33 | NULL | NULL |
| Jane | Marketing | 28 | NULL | NULL |
| Jeff | Marketing | 38 | NULL | NULL |
+------+-------------+-----+------------------+---------+

#### Example 4: Over main-search in the case of column name conflict

The example demonstrate the usage of `OVERRIDE` option to overwrite the `age` column from the main-search,
when the option is set to true and column with same name `age` present on sub-search.

PPL query:

os> source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ stats avg(age) as age ];
fetched rows / total rows = 9/9
+------+-------------+------------------+
| name | dept | age |
+------+-------------+------------------+
| Lisa | Sales------ | 31.22222222222222|
| Fred | Engineering | NULL |
| Paul | Engineering | NULL |
| Evan | Sales------ | NULL |
| Chloe| Engineering | NULL |
| Tom | Engineering | NULL |
| Alex | Sales | NULL |
| Jane | Marketing | NULL |
| Jeff | Marketing | NULL |
+------+-------------+------------------+

#### Example 5: AppendCol command with duplicated columns

The example demonstrate what could happen when conflicted columns exist, with `override` set to false or absent.
In this particular case, average aggregation is being performed over column `age` with group-by `dept`, on main and sub query respectively.
As the result, `dept` and `avg_age1` will be returned by the main query, with `avg_age2` and `dept` for the sub-query,
and take into consideration `override` is absent, duplicated columns won't be dropped, hence all four columns will be displayed as the final result.

PPL query:

os> source=employees | stats avg(age) as avg_age1 by dept | APPENDCOL [ stats avg(age) as avg_age2 by dept ];
fetched rows / total rows = 3/3
+------------+--------------+------------+--------------+
| Avg Age 1 | Dept | Avg Age 2 | Dept |
+------------+--------------+------------+--------------+
| 35.33 | Sales | 35.33 | Sales |
| 27.25 | Engineering | 27.25 | Engineering |
| 33.00 | Marketing | 33.00 | Marketing |
+------------+--------------+------------+--------------+


### Limitation:
When override is set to true, only `FIELDS` and `STATS` commands are allowed as the final clause in a sub-search.
Otherwise, an IllegalStateException with the message `Not Supported operation: APPENDCOL should specify the output fields` will be thrown.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.FlintJsonHelper._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType

/**
Expand Down Expand Up @@ -62,7 +64,7 @@ trait FlintSparkIndex {
def build(spark: SparkSession, df: Option[DataFrame]): DataFrame
}

object FlintSparkIndex {
object FlintSparkIndex extends Logging {

/**
* Interface indicates a Flint index has custom streaming refresh capability other than foreach
Expand Down Expand Up @@ -117,6 +119,25 @@ object FlintSparkIndex {
s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
}

/**
* Generate an ID column using ID expression provided in the index option.
*
* @param df
* which DataFrame to generate ID column
* @param options
* Flint index options
* @return
* DataFrame with/without ID column
*/
def addIdColumn(df: DataFrame, options: FlintSparkIndexOptions): DataFrame = {
options.idExpression() match {
case Some(idExpr) if idExpr.nonEmpty =>
logInfo(s"Using user-provided ID expression: $idExpr")
df.withColumn(ID_COLUMN, expr(idExpr))
case _ => df
}
}

/**
* Populate environment variables to persist in Flint metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.{Collections, UUID}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
Expand Down Expand Up @@ -96,6 +96,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* An expression that generates unique value as index data row ID.
*
* @return
* ID expression
*/
def idExpression(): Option[String] = getOptionValue(ID_EXPRESSION)

/**
* Extra streaming source options that can be simply passed to DataStreamReader or
* Relation.options
Expand Down Expand Up @@ -187,6 +195,7 @@ object FlintSparkIndexOptions {
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
val ID_EXPRESSION: OptionName.Value = Value("id_expression")
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndex.{addIdColumn, flintIndexNamePrefix, generateSchema, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

Expand Down Expand Up @@ -71,10 +71,13 @@ case class FlintSparkCoveringIndex(
val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))

// Add optional filtering condition
filterCondition
.map(job.where)
.getOrElse(job)
.select(colNames.head, colNames.tail: _*)
val batchDf =
filterCondition
.map(job.where)
.getOrElse(job)
.select(colNames.head, colNames.tail: _*)

addIdColumn(batchDf, options)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala`
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchema, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.{addIdColumn, flintIndexNamePrefix, generateSchema, metadataBuilder, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
Expand Down Expand Up @@ -81,7 +81,8 @@ case class FlintSparkMaterializedView(
override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
require(df.isEmpty, "materialized view doesn't support reading from other data frame")

spark.sql(query)
val batchDf = spark.sql(query)
addIdColumn(batchDf, options)
}

override def buildStream(spark: SparkSession): DataFrame = {
Expand All @@ -99,7 +100,9 @@ case class FlintSparkMaterializedView(
case relation: UnresolvedRelation if !relation.isStreaming =>
relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation))
}
logicalPlanToDataFrame(spark, streamingPlan)

val streamingDf = logicalPlanToDataFrame(spark, streamingPlan)
addIdColumn(streamingDf, options)
}

private def watermark(timeCol: Attribute, child: LogicalPlan) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
package org.apache.spark

import org.opensearch.flint.spark.FlintSparkExtensions
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{Alias, CodegenObjectFactoryMode, Expression}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -68,4 +71,27 @@ trait FlintSuite extends SharedSparkSession {
setFlintSparkConf(METADATA_CACHE_WRITE, "false")
}
}

/**
* Implicit class to extend DataFrame functionality with additional utilities.
*
* @param df
* the DataFrame to which the additional methods are added
*/
protected implicit class DataFrameExtensions(val df: DataFrame) {

/**
* Retrieves the ID column expression from the logical plan of the DataFrame, if it exists.
*
* @return
* an `Option` containing the `Expression` for the ID column if present, or `None` otherwise
*/
def idColumn(): Option[Expression] = {
df.queryExecution.logical.collectFirst { case Project(projectList, _) =>
projectList.collectFirst { case Alias(child, ID_COLUMN) =>
child
}
}.flatten
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
Expand All @@ -22,6 +21,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
WATERMARK_DELAY.toString shouldBe "watermark_delay"
OUTPUT_MODE.toString shouldBe "output_mode"
INDEX_SETTINGS.toString shouldBe "index_settings"
ID_EXPRESSION.toString shouldBe "id_expression"
EXTRA_OPTIONS.toString shouldBe "extra_options"
}

Expand All @@ -36,6 +36,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"watermark_delay" -> "30 Seconds",
"output_mode" -> "complete",
"index_settings" -> """{"number_of_shards": 3}""",
"id_expression" -> """sha1(col("timestamp"))""",
"extra_options" ->
""" {
| "alb_logs": {
Expand All @@ -55,6 +56,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.watermarkDelay() shouldBe Some("30 Seconds")
options.outputMode() shouldBe Some("complete")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
options.idExpression() shouldBe Some("""sha1(col("timestamp"))""")
options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
}
Expand Down Expand Up @@ -83,6 +85,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.watermarkDelay() shouldBe empty
options.outputMode() shouldBe empty
options.indexSettings() shouldBe empty
options.idExpression() shouldBe empty
options.extraSourceOptions("alb_logs") shouldBe empty
options.extraSinkOptions() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
Expand Down
Loading

0 comments on commit 40e2818

Please sign in to comment.