Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Exporters and Readers #210

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ A local index exporter allows you to export the top N queries to local OpenSearc
PUT _cluster/settings
{
"persistent" : {
"search.insights.top_queries.latency.exporter.type" : "local_index",
"search.insights.top_queries.latency.exporter.config.index" : "YYYY.MM.dd"
"search.insights.top_queries.exporter.type" : "local_index"
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,19 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
QueryInsightsSettings.TOP_N_EXPORTER_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
/**
* Debug exporter for development purpose
*/
public final class DebugExporter implements QueryInsightsExporter {
public class DebugExporter implements QueryInsightsExporter {
/**
* Logger of the debug exporter
*/
private final Logger logger = LogManager.getLogger();
private static final String EXPORTER_ID = "debug_exporter";

/**
* Constructor of DebugExporter
*/
private DebugExporter() {}

@Override
public String getId() {
return EXPORTER_ID;
}

private static class InstanceHolder {
private static final DebugExporter INSTANCE = new DebugExporter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,89 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.time.Instant;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

/**
* Local index exporter for exporting query insights data to local OpenSearch indices.
*/
public final class LocalIndexExporter implements QueryInsightsExporter {
public class LocalIndexExporter implements QueryInsightsExporter {
/**
* Logger of the local index exporter
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private final ClusterService clusterService;
private final String indexMapping;
private DateTimeFormatter indexPattern;
private int deleteAfter;
private final String id;
private static final int DEFAULT_NUMBER_OF_REPLICA = 1;
private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
private static final List<String> DEFAULT_SORTED_FIELDS = List.of(
"measurements.latency.number",
"measurements.cpu.number",
"measurements.memory.number"
);
private static final List<String> DEFAULT_SORTED_ORDERS = List.of("desc", "desc", "desc");

/**
* Constructor of LocalIndexExporter
*
* @param client OS client
* @param clusterService cluster service
* @param indexPattern the pattern of index to export to
* @param indexMapping the index mapping file
* @param id id of the exporter
*/
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
public LocalIndexExporter(
final Client client,
final ClusterService clusterService,
final DateTimeFormatter indexPattern,
final String indexMapping,
final String id
) {
this.indexPattern = indexPattern;
this.client = client;
this.clusterService = clusterService;
this.indexMapping = indexMapping;
this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
this.id = id;
}

@Override
public String getId() {
return id;
}

/**
Expand All @@ -73,7 +107,7 @@ public DateTimeFormatter getIndexPattern() {
*
* @param indexPattern index pattern
*/
void setIndexPattern(DateTimeFormatter indexPattern) {
public void setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
}

Expand All @@ -89,28 +123,76 @@ public void export(final List<SearchQueryRecord> records) {
}
try {
final String indexName = buildLocalIndexName();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
if (!checkIndexExists(indexName)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);

createIndexRequest.settings(
Settings.builder()
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
);
createIndexRequest.mapping(readIndexMappings());

client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
if (createIndexResponse.isAcknowledged()) {
try {
bulk(indexName, records);
} catch (IOException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
}
}

@Override
public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ResourceAlreadyExistsException) {
try {
bulk(indexName, records);
} catch (IOException ex) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
} else {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to create query insights index: ", e);
}
}
});
} else {
bulk(indexName, records);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES);
logger.error("Failed to execute bulk operation for query insights data: ", e);
}
});
} catch (final Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
}

private void bulk(final String indexName, final List<SearchQueryRecord> records) throws IOException {
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId())
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES);
logger.error("Failed to execute bulk operation for query insights data: ", e);
}
});
}

/**
* Close the exporter sink
*/
Expand All @@ -125,7 +207,8 @@ public void close() {
* @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234".
*/
String buildLocalIndexName() {
return indexPattern.format(ZonedDateTime.now(ZoneOffset.UTC)) + "-" + generateLocalIndexDateHash();
ZonedDateTime currentTime = ZonedDateTime.now(ZoneOffset.UTC);
return indexPattern.format(currentTime) + "-" + generateLocalIndexDateHash(currentTime.toLocalDate());
}

/**
Expand All @@ -138,33 +221,58 @@ public void setDeleteAfter(final int deleteAfter) {
}

/**
* Delete Top N local indices older than the configured data retention period
* Get local index exporter data retention period
*
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
* @return the number of days after which Top N local indices should be deleted
*/
public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetadataMap) {
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
TopQueriesService.deleteSingleIndex(indexName, client);
}
}
public int getDeleteAfter() {
return deleteAfter;
}

/**
* Generates a consistent 5-digit numeric hash based on the current UTC date.
* The generated hash is deterministic, meaning it will return the same result for the same date.
* Deletes the specified index and logs any failure that occurs during the operation.
*
* @return A 5-digit numeric string representation of the current date's hash.
* @param indexName The name of the index to delete.
* @param client The OpenSearch client used to perform the deletion.
*/
public void deleteSingleIndex(String indexName, Client client) {
Logger logger = LogManager.getLogger();
client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() {
@Override
// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {}

@Override
public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
return;
}
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
logger.error("Failed to delete index '{}': ", indexName, e);
}
});
}

/**
* check if index exists
* @return boolean
*/
public static String generateLocalIndexDateHash() {
// Get the current date in UTC (yyyy-MM-dd format)
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());
private boolean checkIndexExists(String indexName) {
ClusterState clusterState = clusterService.state();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be a corner scenario where the cluster state does not have up to date information? Are we handling this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, In theory the cluster state should be most up to date but there are scenarios when 2 threads are trying to create / delete the same index for the same time. If you look into the code I already handled with something like

Throwable cause = ExceptionsHelper.unwrapCause(e);
                        if (cause instanceof ResourceAlreadyExistsException) {

return clusterState.getRoutingTable().hasIndex(indexName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hope we don't need to handle any potential errors here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// Generate a 5-digit numeric hash from the date's hashCode
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
/**
* get correlation rule index mappings
* @return mappings of correlation rule index
* @throws IOException IOException
*/
private String readIndexMappings() throws IOException {
return new String(
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)).readAllBytes(),
Charset.defaultCharset()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface QueryInsightsExporter extends Closeable {
* @param records list of {@link SearchQueryRecord}
*/
void export(final List<SearchQueryRecord> records);

String getId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: good to add some comments here since this is an interface and the intent may get lost over time

}
Loading
Loading