Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jan 24, 2025
1 parent 70daa15 commit 7f3d20d
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* Debug exporter for development purpose
*/
public final class DebugExporter implements QueryInsightsExporter {
public class DebugExporter implements QueryInsightsExporter {
/**
* Logger of the debug exporter
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,29 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -41,13 +39,12 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

/**
* Local index exporter for exporting query insights data to local OpenSearch indices.
*/
public final class LocalIndexExporter implements QueryInsightsExporter {
public class LocalIndexExporter implements QueryInsightsExporter {
/**
* Logger of the local index exporter
*/
Expand Down Expand Up @@ -110,7 +107,7 @@ public DateTimeFormatter getIndexPattern() {
*
* @param indexPattern index pattern
*/
void setIndexPattern(DateTimeFormatter indexPattern) {
public void setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
}

Expand Down Expand Up @@ -153,7 +150,8 @@ public void onResponse(CreateIndexResponse createIndexResponse) {

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ResourceAlreadyExistsException) {
try {
bulk(indexName, records);
} catch (IOException ex) {
Expand Down Expand Up @@ -222,34 +220,37 @@ public void setDeleteAfter(final int deleteAfter) {
}

/**
* Delete Top N local indices older than the configured data retention period
* Get local index exporter data retention period
*
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
* @return the number of days after which Top N local indices should be deleted
*/
public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetadataMap) {
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
TopQueriesService.deleteSingleIndex(indexName, client);
}
}
public int getDeleteAfter() {
return deleteAfter;
}

/**
* Generates a consistent 5-digit numeric hash based on the current UTC date.
* The generated hash is deterministic, meaning it will return the same result for the same date.
* Deletes the specified index and logs any failure that occurs during the operation.
*
* @return A 5-digit numeric string representation of the current date's hash.
* @param indexName The name of the index to delete.
* @param client The OpenSearch client used to perform the deletion.
*/
public static String generateLocalIndexDateHash() {
// Get the current date in UTC (yyyy-MM-dd format)
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());
public void deleteSingleIndex(String indexName, Client client) {
Logger logger = LogManager.getLogger();
client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() {
@Override
// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {}

// Generate a 5-digit numeric hash from the date's hashCode
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
@Override
public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
return;
}
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
logger.error("Failed to delete index '{}': ", indexName, e);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.opensearch.plugin.insights.core.reader;

import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash;
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.deleteSingleIndex;
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
Expand Down Expand Up @@ -426,7 +425,7 @@ private void setExporterAndReader(final SinkType sinkType, final Map<String, Ind
// This method is invoked when sink type is changed
// Clear local indices if exporter is of type LocalIndexExporter
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
deleteAllTopNIndices(client, indexMetadataMap);
deleteAllTopNIndices(client, indexMetadataMap, (LocalIndexExporter) topQueriesExporter);
}

if (sinkType != null) {
Expand Down Expand Up @@ -564,13 +563,23 @@ public QueryInsightsHealthStats getHealthStats() {
/**
* Delete Top N local indices older than the configured data retention period
*/
private void deleteExpiredTopNIndices() {
void deleteExpiredTopNIndices() {
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
threadPool.executor(QUERY_INSIGHTS_EXECUTOR)
.execute(
() -> ((LocalIndexExporter) topQueriesExporter).deleteExpiredTopNIndices(clusterService.state().metadata().indices())
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
localIndexExporter.deleteSingleIndex(indexName, client);
}
}
});
}
}

Expand All @@ -579,11 +588,15 @@ private void deleteExpiredTopNIndices() {
*
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
*/
void deleteAllTopNIndices(final Client client, final Map<String, IndexMetadata> indexMetadataMap) {
void deleteAllTopNIndices(
final Client client,
final Map<String, IndexMetadata> indexMetadataMap,
final LocalIndexExporter localIndexExporter
) {
indexMetadataMap.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> deleteSingleIndex(entry.getKey(), client));
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
Expand Down Expand Up @@ -520,27 +518,6 @@ static void validateExporterDeleteAfter(final int deleteAfter) {
}
}

/**
* Deletes the specified index and logs any failure that occurs during the operation.
*
* @param indexName The name of the index to delete.
* @param client The OpenSearch client used to perform the deletion.
*/
public static void deleteSingleIndex(String indexName, Client client) {
Logger logger = LogManager.getLogger();
client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() {
@Override
// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {}

@Override
public void onFailure(Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
logger.error("Failed to delete index '{}': ", indexName, e);
}
});
}

/**
* Validates if the input string is a Query Insights local index
* in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.opensearch.plugin.insights.core.utils;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/**
* Util functions for exporter and reader
*
*/
public class ExporterReaderUtils {

private ExporterReaderUtils() {}

/**
* Generates a consistent 5-digit numeric hash based on the current UTC date.
* The generated hash is deterministic, meaning it will return the same result for the same date.
*
* @return A 5-digit numeric string representation of the current date's hash.
*/
public static String generateLocalIndexDateHash() {
// Get the current date in UTC (yyyy-MM-dd format)
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());

// Generate a 5-digit numeric hash from the date's hashCode
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Util functions
*/
package org.opensearch.plugin.insights.core.utils;
Loading

0 comments on commit 7f3d20d

Please sign in to comment.