Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select require_alias for OS bulk inserts from ISM Policy #3560

Merged
merged 6 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ private void createRole(final String role, final String indexPattern, final Stri
final String createRoleJson = Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.array("cluster_permissions", "cluster:monitor/main")
.startArray("index_permissions")
.startObject()
.array("index_patterns", new String[]{indexPattern})
.array("allowed_actions", allowedActions)
.endObject()
.startObject()
.array("index_patterns", new String[]{"*"})
.array("allowed_actions", "indices:admin/aliases/get")
.endObject()
.endArray()
.endObject()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,18 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias);
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation();
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize, maxLocalCompressionsForEstimation);
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation);
} else if (isEstimateBulkSizeUsingCompression) {
LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " +
"Estimating bulk request size without compression.");
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any situations in which the require_alias would vary from document to document? If so, we'd want this on the individual operations. I'm currently not setting any such situations, but I may be missing one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://opensearch.org/docs/2.11/api-reference/document-apis/bulk/#url-parameters, require_alias is a URL parameter of the bulk request. As such it cannot be changed for each document but only for the entire request.

If you wanted this setting to be depending on the document, you would have to group documents with different settings into different bulk requests. In any case, you would need a configuration on event level, that carries the alias configuration. That could either be the event or the record metadata. Both can easily be accessed in the OpenSearchSink#doOutput to create different bulk requests. This would be a much larger change, also requiring a decision on how to set the alias configuration for the event.

} else {
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
}

final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest;
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,6 +52,8 @@ public abstract class AbstractIndexManager implements IndexManager {
protected IsmPolicyManagementStrategy ismPolicyManagementStrategy;
private final TemplateStrategy templateStrategy;
protected String indexPrefix;
private Boolean isIndexAlias;
private boolean isIndexAliasChecked;

private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class);

Expand Down Expand Up @@ -112,6 +116,10 @@ public static String getIndexAliasWithDate(final String indexAlias) {
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix;
}

private void initalizeIsIndexAlias(final String indexAlias) {

}

private void initializeIndexPrefixAndSuffix(final String indexAlias){
final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
if (dateFormatter != null) {
Expand Down Expand Up @@ -176,6 +184,26 @@ public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}

@Override
public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException {
if (isIndexAliasChecked == false) {
try {
// Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support
// `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in
// ES 7.10.
openSearchClient.info();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable from this object, you can get the OpenSearch version as a String. For this feature, it is enough to detect OpenSearch. For other use-cases, you might want to extend this detection, so that you can ask for a required minimum version. But this requires parsing the version string.

ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
} catch (RuntimeException ex) {
isIndexAlias = null;
} finally {
isIndexAliasChecked = true;
}
}
return isIndexAlias;
}

final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;

public interface IndexManager{

void setupIndex() throws IOException;
String getIndexName(final String indexAlias) throws IOException;
Boolean isIndexAlias(final String indexAlias) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.GetTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateResponse;
Expand Down Expand Up @@ -313,6 +314,51 @@ void constructor_NullConfiguration() {
verify(indexConfiguration).getIsmPolicyFile();
}

@Test
void isIndexAlias_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true");
assertEquals(true, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void isIndexAlias_False_NoAlias() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false));
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
}

@Test
void isIndexAlias_False_NoISM() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false");
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkISMEnabled_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down
Loading