Skip to content

Commit

Permalink
Resolve merge conflicts (#8974)
Browse files Browse the repository at this point in the history
* Resolve merge conflicts

Signed-off-by: Fanit Kolchina <[email protected]>

* More merge conflicts

Signed-off-by: Fanit Kolchina <[email protected]>

* Fix links

Signed-off-by: Fanit Kolchina <[email protected]>

---------

Signed-off-by: Fanit Kolchina <[email protected]>
  • Loading branch information
kolchfa-aws authored Dec 19, 2024
1 parent 54d82a1 commit 239a867
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 6 deletions.
109 changes: 103 additions & 6 deletions _dashboards/management/accelerate-external-data.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
---
layout: default
title: Optimize query performance using OpenSearch indexing
title: Optimizing query performance using OpenSearch indexing
parent: Connecting Amazon S3 to OpenSearch
grand_parent: Data sources
nav_order: 15
has_children: false
canonical_url: https://opensearch.org/docs/latest/dashboards/management/accelerate-external-data/
---

# Optimize query performance using OpenSearch indexing
# Optimizing query performance using OpenSearch indexing
Introduced 2.11
{: .label .label-purple }

Expand Down Expand Up @@ -39,10 +39,107 @@ To get started with the **Accelerate performance** use case available in **Data

### Define covering index settings

1. Under **Index settings**, enter a valid index name. Note that each Amazon S3 table can have multiple covering indexes.
2. Once you have added the index name, define the covering index fields by selecting `(add fields here)` under **Covering index definition**.
3. Select the **Copy Query to Editor** button to apply your covering index settings.
4. View the covering index query details in the table pane and then select the **Run** button. Your index is added to the left-side navigation menu containing the list of your databases.
1. For **Index name**, enter a valid index name. Note that each table can have multiple covering indexes.
2. Choose a **Refresh type**. By default, OpenSearch automatically refreshes the index. Otherwise, you must manually trigger a refresh using a REFRESH statement.
3. Enter a **Checkpoint location**, which is a path for refresh job checkpoints. The location must be a path in a file system compatible with the Hadoop Distributed File System (HDFS). For more information, see [Starting streaming queries](https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#starting-streaming-queries).
4. Define the covering index fields by selecting **(add fields here)** under **Covering index definition**.
5. Select **Create acceleration** to apply your covering index settings.
6. View the covering index query details and then click **Run**. OpenSearch adds your index to the left navigation pane.

Alternately, you can manually create a covering index on your table using Query Workbench. Select your data source from the dropdown and run a query like the following:

```sql
CREATE INDEX vpc_covering_index
ON datasourcename.gluedatabasename.vpclogstable (version, account_id, interface_id,
srcaddr, dstaddr, srcport, dstport, protocol, packets,
bytes, start, action, log_status STRING,
`aws-account-id`, `aws-service`, `aws-region`, year,
month, day, hour )
WITH (
auto_refresh = true,
refresh_interval = '15 minute',
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)
```

## Materialized views

With _materialized views_, you can use complex queries, such as aggregations, to power Dashboards visualizations. Materialized views ingest a small amount of your data, depending on the query, into OpenSearch. OpenSearch then forms an index from the ingested data that you can use for visualizations. You can manage the materialized view index with Index State Management. For more information, see [Index State Management](https://opensearch.org/docs/latest/im-plugin/ism/index/).

### Define materialized view settings

1. For **Index name**, enter a valid index name. Note that each table can have multiple covering indexes.
2. Choose a **Refresh type**. By default, OpenSearch automatically refreshes the index. Otherwise, you must manually trigger a refresh using a `REFRESH` statement.
3. Enter a **Checkpoint location**, which is a path for refresh job checkpoints. The location must be a path in an HDFS compatible file system.
4. Enter a **Watermark delay**, which defines how late data can come and still be processed, such as 1 minute or 10 seconds.
5. Define the covering index fields under **Materialized view definition**.
6. Select **Create acceleration** to apply your materialized view index settings.
7. View the materialized view query details and then click **Run**. OpenSearch adds your index to the left navigation pane.

Alternately, you can manually create a materialized view index on your table using Query Workbench. Select your data source from the dropdown and run a query like the following:

```sql
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
cloud.account_uid AS `aws.vpc.cloud_account_uid`,
cloud.region AS `aws.vpc.cloud_region`,
cloud.zone AS `aws.vpc.cloud_zone`,
cloud.provider AS `aws.vpc.cloud_provider`,

CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
CASE
WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
THEN 'ingress'
ELSE 'egress'
END AS `aws.vpc.flow-direction`,

CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
status_code AS `aws.vpc.status_code`,

severity AS `aws.vpc.severity`,
class_name AS `aws.vpc.class_name`,
category_name AS `aws.vpc.category_name`,
activity_name AS `aws.vpc.activity_name`,
disposition AS `aws.vpc.disposition`,
type_name AS `aws.vpc.type_name`,

region AS `aws.vpc.region`,
accountid AS `aws.vpc.account-id`
FROM
datasourcename.gluedatabasename.vpclogstable
WITH (
auto_refresh = true,
refresh_interval = '15 Minute',
checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint',
watermark_delay = '1 Minute',
)
```

## Limitations

Expand Down
208 changes: 208 additions & 0 deletions _dashboards/management/scheduled-query-acceleration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
---
layout: default
title: Scheduled Query Acceleration
parent: Data sources
nav_order: 18
has_children: false
---

# Scheduled Query Acceleration
Introduced 2.17
{: .label .label-purple }

Scheduled Query Acceleration (SQA) is designed to optimize queries sent directly from OpenSearch to external data sources, such as Amazon Simple Storage Service (Amazon S3). It uses automation to address issues commonly encountered when managing and refreshing indexes, views, and data.

Query acceleration is facilitated by secondary indexes like skipping indexes, covering indexes, or materialized views. When queries run, they use these indexes instead of directly querying Amazon S3.

The secondary indexes need to be refreshed periodically in order to remain current with the Amazon S3 data. This refresh operation can be scheduled using either an internal scheduler (within Spark) or an external scheduler.

SQA provides the following benefits:

- **Cost reduction through optimized resource usage**: SQA reduces the operational load on driver nodes, lowering the costs associated with maintaining auto-refresh for indexes and views.

- **Improved observability of refresh operations**: SQA provides visibility into index states and refresh timing, offering insights into data processing and the current system state.

- **Better control over refresh scheduling**: SQA allows flexible scheduling of refresh intervals, helping you to manage resource usage and refresh frequency according to specific requirements.

- **Simplified index management**: SQA enables updates to index settings, such as refresh intervals, in a single query, which simplifies workflows.

## Concepts

Before configuring SQA, familiarize yourself with the following topics:

- [Optimizing query performance using OpenSearch indexing]({{site.url}}{{site.baseurl}}/dashboards/management/accelerate-external-data/)
- [Flint index refresh](https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#flint-index-refresh)
- [Index State Management](https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#index-state-transition-1)

## Prerequisites

Before configuring SQA, verify that the following requirements are met:

- Ensure you're running OpenSearch version 2.17 or later.
- Ensure you have the SQL plugin installed. The SQL plugin is included in most OpenSearch distributions. For more information, see [Installing plugins]({{site.url}}{{site.baseurl}}/install-and-configure/plugins/).
- Ensure you have configured a data source (in this example, Amazon S3): Configure a skipping index, covering index, or materialized view. These secondary data sources are additional data structures that improve query performance by optimizing queries sent to external data sources, such as Amazon S3. For more information, see [Optimizing query performance using OpenSearch indexing]({{site.url}}{{site.baseurl}}/dashboards/management/accelerate-external-data/).
- Configure Amazon EMR Serverless (needed for access to Apache Spark).

## Configuring SQA settings

If you want to override default configuration values, change the following cluster settings:

- **Enable asynchronous query execution**: Set `plugins.query.executionengine.async_query.enabled` to `true` (default value):
```json
PUT /_cluster/settings
{
"transient": {
"plugins.query.executionengine.async_query.enabled": "true"
}
}
```
{% include copy-curl.html %}

For more information, see [Settings](https://github.com/opensearch-project/sql/blob/main/docs/user/admin/settings.rst#pluginsqueryexecutionengineasync_queryenabled).

- **Configure the external scheduler interval for asynchronous queries**: This setting defines how often the external scheduler checks for tasks, allowing customization of refresh frequency. There is no default value for this setting: if this value is empty, the default comes from `opensearch-spark` and is `5 minutes`. Adjusting the interval based on workload volume can help you to optimize resources and manage costs:
```json
PUT /_cluster/settings
{
"transient": {
"plugins.query.executionengine.async_query.external_scheduler.interval": "10 minutes"
}
}
```
{% include copy-curl.html %}

For more information, see [Settings](https://github.com/opensearch-project/sql/blob/main/docs/user/admin/settings.rst#pluginsqueryexecutionengineasync_queryexternal_schedulerinterval).

## Running an accelerated query

You can run accelerated queries in [Query Workbench]({{site.url}}{{site.baseurl}}/dashboards/query-workbench/). To run an accelerated query, use the following syntax:

```sql
CREATE SKIPPING INDEX example_index
WITH (
auto_refresh = true,
refresh_interval = '15 minutes'
);
```
{% include copy.html %}

By default, the query uses an external scheduler. To use an internal scheduler, set `scheduler_mode` to `internal`:

```sql
CREATE SKIPPING INDEX example_index
WITH (
auto_refresh = true,
refresh_interval = '15 minutes',
scheduler_mode = 'internal'
);
```
{% include copy.html %}

## Parameters

When creating indexes using an accelerated query, you can specify the following parameters in the `WITH` clause to control refresh behavior, scheduling, and timing.

| Parameter | Description |
|:--- | :--- |
| `auto_refresh` | Enables automatic refresh for the index. If `true`, the index refreshes automatically at the specified interval. If `false`, the refresh operation must be triggered manually using the `REFRESH` statement. Default is `false`. |
| `refresh_interval` | Defines the amount of time between index refresh operations for the index, which determines how frequently new data is ingested into the index. This is applicable only when `auto_refresh` is enabled. The interval determines how frequently new data is integrated and can be specified in formats like `1 minute` or `10 seconds`. For valid time units, see [Time units](#time-units).|
| `scheduler_mode` | Specifies the scheduling mode for auto-refresh (internal or external scheduling). The external scheduler requires a `checkpoint_location` (a path for refresh job checkpoints) for state management. For more information, see [Starting streaming queries](https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#starting-streaming-queries). Valid values are `internal` and `external`.|

For more information and additional available parameters, see [Flint index refresh](https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#flint-index-refresh).

## Time units

You can specify the following time units when defining time intervals:

- Milliseconds: `ms`, `millisecond`, or `milliseconds`
- Seconds: `s`, `second`, or `seconds`
- Minutes: `m`, `minute`, or `minutes`
- Hours: `h`, `hour`, or `hours`
- Days: `d`, `day`, or `days`

## Monitoring index status

To monitor the status of an index, use the following statement:

```sql
SHOW FLINT INDEXES IN spark_catalog.default;
```
{% include copy.html %}

## Managing scheduled jobs

Use the following commands to manage scheduled jobs.

### Enabling jobs

To disable auto-refresh using an internal or external scheduler, set `auto_refresh` to `false`:

```sql
ALTER MATERIALIZED VIEW myglue_test.default.count_by_status_v9 WITH (auto_refresh = false);
```
{% include copy.html %}

### Updating schedules

To update the schedule and modify the refresh settings, specify the `refresh_interval` in the `WITH` clause:

```sql
ALTER INDEX example_index
WITH (refresh_interval = '30 minutes');
```
{% include copy.html %}

### Switching the scheduler mode

To switch the scheduler mode, specify the `scheduler_mode` in the `WITH` clause:

```sql
ALTER MATERIALIZED VIEW myglue_test.default.count_by_status_v9 WITH (scheduler_mode = 'internal');
```
{% include copy.html %}

### Inspecting scheduler metadata

To inspect scheduler metadata, use the following request:

```json
GET /.async-query-scheduler/_search
```
{% include copy-curl.html %}

## Best practices

We recommend the following best practices when using SQA.

### Performance optimization

- **Recommended refresh intervals**: Choosing the right refresh interval is crucial for balancing resource usage and system performance. Consider your workload requirements and the freshness of the data you need when setting intervals.

- **Concurrent job limits**: Limit the number of concurrent running jobs to avoid overloading system resources. Monitor system capacity and adjust job limits accordingly to ensure optimal performance.

- **Resource usage**: Efficient resource allocation is key to maximizing performance. Properly allocate memory, CPU, and I/O based on the workload and the type of queries you're running.

### Cost management

- **Use an external scheduler**: An external scheduler offloads refresh operations, reducing demand on core driver nodes.

- **Configure a refresh interval for your use case**: Longer refresh intervals lead to reduced costs but may impact data freshness.

- **Optimize the refresh schedule**: Adjust refresh intervals based on workload patterns to reduce unnecessary refresh operations.

- **Monitor costs**: Regularly monitor costs related to scheduled queries and refresh operations. Using observability tools can help you gain insights into resource usage and costs over time.

## Validating settings

You can validate your settings by running a test query and verifying the scheduler configuration:

```sql
SHOW FLINT INDEXES EXTENDED
```
{% include copy.html %}

For more information, see the [OpenSearch Spark documentation](https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#all-indexes).

## Troubleshooting

If the refresh operation is not triggering as expected, ensure that the `auto_refresh` setting is enabled and the refresh interval is properly configured.

0 comments on commit 239a867

Please sign in to comment.