From b3ed247a96d4643fee7acac1924fb1f0d90cdd50 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Mon, 30 Oct 2023 11:03:04 +0100 Subject: [PATCH 1/6] Select require_alias for OS bulk inserts from ISM Policy This change requires an alias when writing to an aliased index. This avoids creation of an index without alias, when a previous existing alias and index was deleted. It increases robustness of DataPrepper's trace index against OS user interactions. Signed-off-by: Karsten Schnitter --- .../plugins/sink/opensearch/OpenSearchSink.java | 7 ++++--- .../sink/opensearch/index/AbstractIndexManager.java | 2 +- .../plugins/sink/opensearch/index/IndexManager.java | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 8820de4afe..ccb270db88 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -217,17 +217,18 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); + boolean requireAlias = indexManager.checkISMEnabled(); final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation(); - bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize, maxLocalCompressionsForEstimation); + bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation); } else if (isEstimateBulkSizeUsingCompression) { LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " + "Estimating bulk request size without compression."); - bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias)); } else { - bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); + bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias)); } final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index 2aca8b0275..07f66fde52 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -176,7 +176,7 @@ public static ZonedDateTime getCurrentUtcTime() { return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); } - final boolean checkISMEnabled() throws IOException { + public final boolean checkISMEnabled() throws IOException { final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder() .includeDefaults(true) .build(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java index ceb1c829a6..94768184d0 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java @@ -5,4 +5,5 @@ public interface IndexManager{ void setupIndex() throws IOException; String getIndexName(final String indexAlias) throws IOException; + boolean checkISMEnabled() throws IOException; } From d4a2f6aa42c3f22cf01d4b43bae0b6d2e1a28966 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Tue, 31 Oct 2023 14:30:24 +0100 Subject: [PATCH 2/6] 3342 Determine Alias Configuration from OS During OS sink initialization it is determined from OS, whether the configured index actually is an alias. If so, bulk request will require the index to always be an alias. The response is cached to avoid further requests. This also ensures, that the alias configuration is kept in the initially intended state. After all, this change is about to prevent an automatic index creation for a formerly existing alias. Signed-off-by: Karsten Schnitter --- .../sink/opensearch/OpenSearchSink.java | 2 +- .../index/AbstractIndexManager.java | 20 +++++++- .../sink/opensearch/index/IndexManager.java | 3 +- .../index/DefaultIndexManagerTests.java | 51 +++++++++++++++++-- 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ccb270db88..18ff42a4c8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -217,7 +217,7 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); - boolean requireAlias = indexManager.checkISMEnabled(); + boolean requireAlias = indexManager.isIndexAlias(); final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index 07f66fde52..038c5a6d8e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -14,6 +14,8 @@ import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest; import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse; import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.ExistsAliasRequest; +import org.opensearch.client.transport.endpoints.BooleanResponse; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; import org.slf4j.Logger; @@ -50,6 +52,7 @@ public abstract class AbstractIndexManager implements IndexManager { protected IsmPolicyManagementStrategy ismPolicyManagementStrategy; private final TemplateStrategy templateStrategy; protected String indexPrefix; + private Boolean isIndexAlias; private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class); @@ -112,6 +115,10 @@ public static String getIndexAliasWithDate(final String indexAlias) { return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix; } + private void initalizeIsIndexAlias(final String indexAlias) { + + } + private void initializeIndexPrefixAndSuffix(final String indexAlias){ final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); if (dateFormatter != null) { @@ -176,7 +183,18 @@ public static ZonedDateTime getCurrentUtcTime() { return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); } - public final boolean checkISMEnabled() throws IOException { + @Override + public boolean isIndexAlias() throws IOException { + if (isIndexAlias == null) { + String indexAlias = getIndexName(null); + ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(indexAlias).build(); + BooleanResponse response = openSearchClient.indices().existsAlias(request); + isIndexAlias = response.value() && checkISMEnabled(); + } + return isIndexAlias; + } + + final boolean checkISMEnabled() throws IOException { final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder() .includeDefaults(true) .build(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java index 94768184d0..b08428a659 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java @@ -3,7 +3,8 @@ import java.io.IOException; public interface IndexManager{ + void setupIndex() throws IOException; String getIndexName(final String indexAlias) throws IOException; - boolean checkISMEnabled() throws IOException; + boolean isIndexAlias() throws IOException; } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 6522a93e98..9a5cbf9180 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -21,11 +21,7 @@ import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest; import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse; import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient; -import org.opensearch.client.opensearch.indices.CreateIndexRequest; -import org.opensearch.client.opensearch.indices.ExistsRequest; -import org.opensearch.client.opensearch.indices.GetTemplateRequest; -import org.opensearch.client.opensearch.indices.GetTemplateResponse; -import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; +import org.opensearch.client.opensearch.indices.*; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.endpoints.BooleanResponse; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; @@ -313,6 +309,51 @@ void constructor_NullConfiguration() { verify(indexConfiguration).getIsmPolicyFile(); } + @Test + void isIndexAlias_True() throws IOException { + defaultIndexManager = indexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); + when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true)); + when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true"); + assertEquals(true, defaultIndexManager.isIndexAlias()); + verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); + verify(indexConfiguration).getIsmPolicyFile(); + verify(indexConfiguration).getIndexAlias(); + verify(openSearchClient).indices(); + verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class)); + verify(openSearchClient).cluster(); + verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); + } + + @Test + void isIndexAlias_False_NoAlias() throws IOException { + defaultIndexManager = indexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); + when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false)); + assertEquals(false, defaultIndexManager.isIndexAlias()); + verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); + verify(indexConfiguration).getIsmPolicyFile(); + verify(indexConfiguration).getIndexAlias(); + verify(openSearchClient).indices(); + verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class)); + } + + @Test + void isIndexAlias_False_NoISM() throws IOException { + defaultIndexManager = indexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); + when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true)); + when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false"); + assertEquals(false, defaultIndexManager.isIndexAlias()); + verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); + verify(indexConfiguration).getIsmPolicyFile(); + verify(indexConfiguration).getIndexAlias(); + verify(openSearchClient).indices(); + verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class)); + verify(openSearchClient).cluster(); + verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class)); + } + @Test void checkISMEnabled_True() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( From 05fcfe3200581c970dd3003a2c2acd7c9bef5685 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Thu, 2 Nov 2023 08:23:07 +0100 Subject: [PATCH 3/6] Fix imports for checkstyle Signed-off-by: Karsten Schnitter --- .../sink/opensearch/index/DefaultIndexManagerTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 9a5cbf9180..410af74afe 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -21,7 +21,12 @@ import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest; import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse; import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient; -import org.opensearch.client.opensearch.indices.*; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.ExistsAliasRequest; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.GetTemplateRequest; +import org.opensearch.client.opensearch.indices.GetTemplateResponse; +import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.endpoints.BooleanResponse; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; From 7bba08059c2360e902b16fb35130f329274b0495 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Tue, 7 Nov 2023 13:27:52 +0100 Subject: [PATCH 4/6] Fix integration tests The specific user used in some tests of OpenSerachSinkIT needs get permissions on all aliases to test for their existence. Another bug with determining the alias name is fixed as well. As a final result, the DataPrepper OpenSearch user requires write access to the indices and now additionally read access to the aliases. This can be a change for self-managed indices. Signed-off-by: Karsten Schnitter --- .../plugins/sink/opensearch/OpenSearchSecurityAccessor.java | 4 ++++ .../dataprepper/plugins/sink/opensearch/OpenSearchSink.java | 2 +- .../plugins/sink/opensearch/index/AbstractIndexManager.java | 5 ++--- .../plugins/sink/opensearch/index/IndexManager.java | 2 +- .../sink/opensearch/index/DefaultIndexManagerTests.java | 6 +++--- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java index 0e93d94d09..2f5cd9e682 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java @@ -37,6 +37,10 @@ private void createRole(final String role, final String indexPattern, final Stri .array("index_patterns", new String[]{indexPattern}) .array("allowed_actions", allowedActions) .endObject() + .startObject() + .array("index_patterns", new String[]{"*"}) + .array("allowed_actions", "indices:admin/aliases/get") + .endObject() .endArray() .endObject() ); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 18ff42a4c8..bd43d95b84 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -217,7 +217,7 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); - boolean requireAlias = indexManager.isIndexAlias(); + boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias); final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index 038c5a6d8e..c2f5d2ff64 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -184,10 +184,9 @@ public static ZonedDateTime getCurrentUtcTime() { } @Override - public boolean isIndexAlias() throws IOException { + public boolean isIndexAlias(final String dynamicIndexAlias) throws IOException { if (isIndexAlias == null) { - String indexAlias = getIndexName(null); - ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(indexAlias).build(); + ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build(); BooleanResponse response = openSearchClient.indices().existsAlias(request); isIndexAlias = response.value() && checkISMEnabled(); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java index b08428a659..030474db55 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java @@ -6,5 +6,5 @@ public interface IndexManager{ void setupIndex() throws IOException; String getIndexName(final String indexAlias) throws IOException; - boolean isIndexAlias() throws IOException; + boolean isIndexAlias(final String indexAlias) throws IOException; } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java index 410af74afe..d18c7cbb36 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DefaultIndexManagerTests.java @@ -320,7 +320,7 @@ void isIndexAlias_True() throws IOException { IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true)); when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true"); - assertEquals(true, defaultIndexManager.isIndexAlias()); + assertEquals(true, defaultIndexManager.isIndexAlias(INDEX_ALIAS)); verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); verify(indexConfiguration).getIsmPolicyFile(); verify(indexConfiguration).getIndexAlias(); @@ -335,7 +335,7 @@ void isIndexAlias_False_NoAlias() throws IOException { defaultIndexManager = indexManagerFactory.getIndexManager( IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false)); - assertEquals(false, defaultIndexManager.isIndexAlias()); + assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS)); verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); verify(indexConfiguration).getIsmPolicyFile(); verify(indexConfiguration).getIndexAlias(); @@ -349,7 +349,7 @@ void isIndexAlias_False_NoISM() throws IOException { IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy); when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true)); when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false"); - assertEquals(false, defaultIndexManager.isIndexAlias()); + assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS)); verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration(); verify(indexConfiguration).getIsmPolicyFile(); verify(indexConfiguration).getIndexAlias(); From 03fad8bcbcbc4f0b57adb1e9287b7d662645e4c0 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Tue, 7 Nov 2023 15:41:11 +0100 Subject: [PATCH 5/6] Fix Bulk Requests for older OD versions The `require_alias` parameter for bulk requests was only introduced with ES 7.10. Since DataPrepper needs to be compatible down to 6.8, the parameter should not be used in earlier OD versions. This change will apply the parameter only when OpenSearch is detected as target. Signed-off-by: Karsten Schnitter --- .../sink/opensearch/OpenSearchSink.java | 2 +- .../index/AbstractIndexManager.java | 21 ++++++++++++++----- .../sink/opensearch/index/IndexManager.java | 2 +- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index bd43d95b84..e7f12313c3 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -217,7 +217,7 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); - boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias); + final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias); final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java index c2f5d2ff64..86974bec13 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/AbstractIndexManager.java @@ -53,6 +53,7 @@ public abstract class AbstractIndexManager implements IndexManager { private final TemplateStrategy templateStrategy; protected String indexPrefix; private Boolean isIndexAlias; + private boolean isIndexAliasChecked; private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class); @@ -184,11 +185,21 @@ public static ZonedDateTime getCurrentUtcTime() { } @Override - public boolean isIndexAlias(final String dynamicIndexAlias) throws IOException { - if (isIndexAlias == null) { - ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build(); - BooleanResponse response = openSearchClient.indices().existsAlias(request); - isIndexAlias = response.value() && checkISMEnabled(); + public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException { + if (isIndexAliasChecked == false) { + try { + // Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support + // `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in + // ES 7.10. + openSearchClient.info(); + ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build(); + BooleanResponse response = openSearchClient.indices().existsAlias(request); + isIndexAlias = response.value() && checkISMEnabled(); + } catch (RuntimeException ex) { + isIndexAlias = null; + } finally { + isIndexAliasChecked = true; + } } return isIndexAlias; } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java index 030474db55..1e271b0e14 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManager.java @@ -6,5 +6,5 @@ public interface IndexManager{ void setupIndex() throws IOException; String getIndexName(final String indexAlias) throws IOException; - boolean isIndexAlias(final String indexAlias) throws IOException; + Boolean isIndexAlias(final String indexAlias) throws IOException; } From 63d6bbf536a3bf6b09610bd55a48a6f96260a3ad Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Tue, 7 Nov 2023 16:25:46 +0100 Subject: [PATCH 6/6] Add Permission to get Cluster Info For checking the OS version, the test user needs an additional permission. Signed-off-by: Karsten Schnitter --- .../plugins/sink/opensearch/OpenSearchSecurityAccessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java index 2f5cd9e682..1a8c6977a9 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSecurityAccessor.java @@ -32,6 +32,7 @@ private void createRole(final String role, final String indexPattern, final Stri final String createRoleJson = Strings.toString( XContentFactory.jsonBuilder() .startObject() + .array("cluster_permissions", "cluster:monitor/main") .startArray("index_permissions") .startObject() .array("index_patterns", new String[]{indexPattern})