Skip to content

Commit

Permalink
Select require_alias for OS bulk inserts from ISM Policy (#3560)
Browse files Browse the repository at this point in the history
* Select require_alias for OS bulk inserts from ISM Policy

This change requires an alias when writing to an aliased
index. This avoids creation of an index without alias, when
a previous existing alias and index was deleted. It increases
robustness of DataPrepper's trace index against OS user
interactions.

Signed-off-by: Karsten Schnitter <[email protected]>

* 3342 Determine Alias Configuration from OS

During OS sink initialization it is determined from OS, whether the
configured index actually is an alias. If so, bulk request will require
the index to always be an alias. The response is cached to avoid
further requests. This also ensures, that the alias configuration is
kept in the initially intended state. After all, this change is about to
prevent an automatic index creation for a formerly existing alias.

Signed-off-by: Karsten Schnitter <[email protected]>

* Fix imports for checkstyle

Signed-off-by: Karsten Schnitter <[email protected]>

* Fix integration tests

The specific user used in some tests of OpenSerachSinkIT
needs get permissions on all aliases to test for their existence.
Another bug with determining the alias name is fixed as well.

As a final result, the DataPrepper OpenSearch user requires
write access to the indices and now additionally read access to
the aliases. This can be a change for self-managed indices.

Signed-off-by: Karsten Schnitter <[email protected]>

* Fix Bulk Requests for older OD versions

The `require_alias` parameter for bulk requests was only introduced
with ES 7.10. Since DataPrepper needs to be compatible down to 6.8,
the parameter should not be used in earlier OD versions. This change
will apply the parameter only when OpenSearch is detected as target.

Signed-off-by: Karsten Schnitter <[email protected]>

* Add Permission to get Cluster Info

For checking the OS version, the test user needs an
additional permission.

Signed-off-by: Karsten Schnitter <[email protected]>

---------

Signed-off-by: Karsten Schnitter <[email protected]>
  • Loading branch information
KarstenSchnitter authored Nov 21, 2023
1 parent 30d88f9 commit 915e84d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ private void createRole(final String role, final String indexPattern, final Stri
final String createRoleJson = Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.array("cluster_permissions", "cluster:monitor/main")
.startArray("index_permissions")
.startObject()
.array("index_patterns", new String[]{indexPattern})
.array("allowed_actions", allowedActions)
.endObject()
.startObject()
.array("index_patterns", new String[]{"*"})
.array("allowed_actions", "indices:admin/aliases/get")
.endObject()
.endArray()
.endObject()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,18 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias);
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation();
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize, maxLocalCompressionsForEstimation);
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation);
} else if (isEstimateBulkSizeUsingCompression) {
LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " +
"Estimating bulk request size without compression.");
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
} else {
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
}

final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest;
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,6 +52,8 @@ public abstract class AbstractIndexManager implements IndexManager {
protected IsmPolicyManagementStrategy ismPolicyManagementStrategy;
private final TemplateStrategy templateStrategy;
protected String indexPrefix;
private Boolean isIndexAlias;
private boolean isIndexAliasChecked;

private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class);

Expand Down Expand Up @@ -112,6 +116,10 @@ public static String getIndexAliasWithDate(final String indexAlias) {
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix;
}

private void initalizeIsIndexAlias(final String indexAlias) {

}

private void initializeIndexPrefixAndSuffix(final String indexAlias){
final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
if (dateFormatter != null) {
Expand Down Expand Up @@ -176,6 +184,26 @@ public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}

@Override
public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException {
if (isIndexAliasChecked == false) {
try {
// Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support
// `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in
// ES 7.10.
openSearchClient.info();
ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
} catch (RuntimeException ex) {
isIndexAlias = null;
} finally {
isIndexAliasChecked = true;
}
}
return isIndexAlias;
}

final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;

public interface IndexManager{

void setupIndex() throws IOException;
String getIndexName(final String indexAlias) throws IOException;
Boolean isIndexAlias(final String indexAlias) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.GetTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateResponse;
Expand Down Expand Up @@ -313,6 +314,51 @@ void constructor_NullConfiguration() {
verify(indexConfiguration).getIsmPolicyFile();
}

@Test
void isIndexAlias_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true");
assertEquals(true, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void isIndexAlias_False_NoAlias() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false));
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
}

@Test
void isIndexAlias_False_NoISM() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false");
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkISMEnabled_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down

0 comments on commit 915e84d

Please sign in to comment.