Skip to content

Commit

Permalink
3342 Determine Alias Configuration from OS
Browse files Browse the repository at this point in the history
During OS sink initialization it is determined from OS, whether the
configured index actually is an alias. If so, bulk request will require
the index to always be an alias. The response is cached to avoid
further requests. This also ensures, that the alias configuration is
kept in the initially intended state. After all, this change is about to
prevent an automatic index creation for a formerly existing alias.

Signed-off-by: Karsten Schnitter <[email protected]>
  • Loading branch information
KarstenSchnitter committed Oct 31, 2023
1 parent 211b775 commit b9e3c79
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

boolean requireAlias = indexManager.checkISMEnabled();
boolean requireAlias = indexManager.isIndexAlias();
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest;
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,6 +50,7 @@ public abstract class AbstractIndexManager implements IndexManager {
protected IsmPolicyManagementStrategy ismPolicyManagementStrategy;
private final TemplateStrategy templateStrategy;
protected String indexPrefix;
private Boolean isIndexAlias;

private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class);

Expand Down Expand Up @@ -110,6 +113,10 @@ public static String getIndexAliasWithDate(final String indexAlias) {
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix;
}

private void initalizeIsIndexAlias(final String indexAlias) {

}

private void initializeIndexPrefixAndSuffix(final String indexAlias){
final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
if (dateFormatter != null) {
Expand Down Expand Up @@ -174,7 +181,18 @@ public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}

public final boolean checkISMEnabled() throws IOException {
@Override
public boolean isIndexAlias() throws IOException {
if (isIndexAlias == null) {
String indexAlias = getIndexName(null);
ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(indexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
}
return isIndexAlias;
}

final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import java.io.IOException;

public interface IndexManager{

void setupIndex() throws IOException;
String getIndexName(final String indexAlias) throws IOException;
boolean checkISMEnabled() throws IOException;
boolean isIndexAlias() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest;
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.GetTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateResponse;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient;
import org.opensearch.client.opensearch.indices.*;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
Expand Down Expand Up @@ -310,6 +306,51 @@ void constructor_NullConfiguration() {
verify(indexConfiguration).getIsmPolicyFile();
}

@Test
void isIndexAlias_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true");
assertEquals(true, defaultIndexManager.isIndexAlias());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void isIndexAlias_False_NoAlias() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false));
assertEquals(false, defaultIndexManager.isIndexAlias());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
}

@Test
void isIndexAlias_False_NoISM() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false");
assertEquals(false, defaultIndexManager.isIndexAlias());
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkISMEnabled_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down

0 comments on commit b9e3c79

Please sign in to comment.