Skip to content

Commit

Permalink
Ignore closed indices for reindex (elastic#120244) (elastic#120562)
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Jan 21, 2025
1 parent 00dbf11 commit 43df7d7
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 50 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120244.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120244
summary: Ignore closed indices for reindex
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public static Predicate<Index> getReindexRequiredPredicate(Metadata metadata) {
public static boolean reindexRequired(IndexMetadata indexMetadata) {
return creationVersionBeforeMinimumWritableVersion(indexMetadata)
&& isNotSearchableSnapshot(indexMetadata)
&& isNotClosed(indexMetadata)
&& isNotVerifiedReadOnly(indexMetadata);
}

Expand All @@ -52,4 +53,8 @@ private static boolean creationVersionBeforeMinimumWritableVersion(IndexMetadata
return metadata.getCreationVersion().before(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
}

private static boolean isNotClosed(IndexMetadata indexMetadata) {
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE) == false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,56 +41,79 @@ public void testOldIndicesCheck() {
int oldIndexCount = randomIntBetween(1, 100);
int newIndexCount = randomIntBetween(1, 100);

List<Index> allIndices = new ArrayList<>();
Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
Set<String> expectedIndices = new HashSet<>();

for (int i = 0; i < oldIndexCount; i++) {
Settings.Builder settings = settings(IndexVersion.fromId(7170099));
DataStream dataStream = createTestDataStream(oldIndexCount, 0, newIndexCount, 0, nameToIndexMetadata, expectedIndices);

String indexName = "old-data-stream-index-" + i;
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
settings.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
} else {
expectedIndices.add(indexName);
}
Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();

Settings.Builder settingsBuilder = settings;
IndexMetadata oldIndexMetadata = IndexMetadata.builder(indexName)
.settings(settingsBuilder)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
allIndices.add(oldIndexMetadata.getIndex());
nameToIndexMetadata.put(oldIndexMetadata.getIndex().getName(), oldIndexMetadata);
}
DeprecationIssue expected = new DeprecationIssue(
DeprecationIssue.Level.CRITICAL,
"Old data stream with a compatibility version < 8.0",
"https://www.elastic.co/guide/en/elasticsearch/reference/current/migrating-8.0.html#breaking-changes-8.0",
"This data stream has backing indices that were created before Elasticsearch 8.0.0",
false,
ofEntries(
entry("reindex_required", true),
entry("total_backing_indices", oldIndexCount + newIndexCount),
entry("indices_requiring_upgrade_count", expectedIndices.size()),
entry("indices_requiring_upgrade", expectedIndices)
)
);

for (int i = 0; i < newIndexCount; i++) {
Settings.Builder settingsBuilder = settings(IndexVersion.current());
IndexMetadata newIndexMetadata = IndexMetadata.builder("new-data-stream-index-" + i)
.settings(settingsBuilder)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
allIndices.add(newIndexMetadata.getIndex());
nameToIndexMetadata.put(newIndexMetadata.getIndex().getName(), newIndexMetadata);
}
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));

DataStream dataStream = new DataStream(
randomAlphaOfLength(10),
allIndices,
randomNonNegativeLong(),
Map.of(),
randomBoolean(),
false,
false,
randomBoolean(),
randomFrom(IndexMode.values()),
null,
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
List.of(),
randomBoolean(),
null
assertThat(issues, equalTo(singletonList(expected)));
}

public void testOldIndicesCheckWithOnlyClosedOrNewIndices() {
// This tests what happens when any old indices that we have are closed. We expect no deprecation warning.
int oldClosedIndexCount = randomIntBetween(1, 100);
int newOpenIndexCount = randomIntBetween(0, 100);
int newClosedIndexCount = randomIntBetween(0, 100);

Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
Set<String> expectedIndices = new HashSet<>();

DataStream dataStream = createTestDataStream(
0,
oldClosedIndexCount,
newOpenIndexCount,
newClosedIndexCount,
nameToIndexMetadata,
expectedIndices
);

Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();

List<DeprecationIssue> issues = DeprecationChecks.filterChecks(DATA_STREAM_CHECKS, c -> c.apply(dataStream, clusterState));

assertThat(issues.size(), equalTo(0));
}

public void testOldIndicesCheckWithClosedAndOpenIndices() {
/*
* This tests what happens when a data stream has old indices, and some are open and some are closed. We expect a deprecation
* warning that includes information about the old ones only.
*/
int oldOpenIndexCount = randomIntBetween(1, 100);
int oldClosedIndexCount = randomIntBetween(1, 100);
int newOpenIndexCount = randomIntBetween(0, 100);
int newClosedIndexCount = randomIntBetween(0, 100);

Map<String, IndexMetadata> nameToIndexMetadata = new HashMap<>();
Set<String> expectedIndices = new HashSet<>();

DataStream dataStream = createTestDataStream(
oldOpenIndexCount,
oldClosedIndexCount,
newOpenIndexCount,
newClosedIndexCount,
nameToIndexMetadata,
expectedIndices
);

Metadata metadata = Metadata.builder().indices(nameToIndexMetadata).build();
Expand All @@ -104,7 +127,7 @@ public void testOldIndicesCheck() {
false,
ofEntries(
entry("reindex_required", true),
entry("total_backing_indices", oldIndexCount + newIndexCount),
entry("total_backing_indices", oldOpenIndexCount + oldClosedIndexCount + newOpenIndexCount + newClosedIndexCount),
entry("indices_requiring_upgrade_count", expectedIndices.size()),
entry("indices_requiring_upgrade", expectedIndices)
)
Expand All @@ -115,4 +138,90 @@ public void testOldIndicesCheck() {
assertThat(issues, equalTo(singletonList(expected)));
}

/*
* This creates a test DataStream with the given counts. The nameToIndexMetadata Map and the expectedIndices Set are mutable collections
* that will be populated by this method.
*/
private DataStream createTestDataStream(
int oldOpenIndexCount,
int oldClosedIndexCount,
int newOpenIndexCount,
int newClosedIndexCount,
Map<String, IndexMetadata> nameToIndexMetadata,
Set<String> expectedIndices
) {
List<Index> allIndices = new ArrayList<>();

for (int i = 0; i < oldOpenIndexCount; i++) {
allIndices.add(createOldIndex(i, false, nameToIndexMetadata, expectedIndices));
}
for (int i = 0; i < oldClosedIndexCount; i++) {
allIndices.add(createOldIndex(i, true, nameToIndexMetadata, null));
}
for (int i = 0; i < newOpenIndexCount; i++) {
allIndices.add(createNewIndex(i, false, nameToIndexMetadata));
}
for (int i = 0; i < newClosedIndexCount; i++) {
allIndices.add(createNewIndex(i, true, nameToIndexMetadata));
}

DataStream dataStream = new DataStream(
randomAlphaOfLength(10),
allIndices,
randomNonNegativeLong(),
Map.of(),
randomBoolean(),
false,
false,
randomBoolean(),
randomFrom(IndexMode.values()),
null,
randomFrom(DataStreamOptions.EMPTY, DataStreamOptions.FAILURE_STORE_DISABLED, DataStreamOptions.FAILURE_STORE_ENABLED, null),
List.of(),
randomBoolean(),
null
);
return dataStream;
}

private Index createOldIndex(
int suffix,
boolean isClosed,
Map<String, IndexMetadata> nameToIndexMetadata,
Set<String> expectedIndices
) {
return createIndex(true, suffix, isClosed, nameToIndexMetadata, expectedIndices);
}

private Index createNewIndex(int suffix, boolean isClosed, Map<String, IndexMetadata> nameToIndexMetadata) {
return createIndex(false, suffix, isClosed, nameToIndexMetadata, null);
}

private Index createIndex(
boolean isOld,
int suffix,
boolean isClosed,
Map<String, IndexMetadata> nameToIndexMetadata,
Set<String> expectedIndices
) {
Settings.Builder settingsBuilder = isOld ? settings(IndexVersion.fromId(7170099)) : settings(IndexVersion.current());
String indexName = (isOld ? "old-" : "new-") + (isClosed ? "closed-" : "") + "data-stream-index-" + suffix;
if (isOld && isClosed == false) { // we only expect warnings on open old indices
if (expectedIndices.isEmpty() == false && randomIntBetween(0, 2) == 0) {
settingsBuilder.put(INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE);
} else {
expectedIndices.add(indexName);
}
}
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settingsBuilder)
.numberOfShards(1)
.numberOfReplicas(0);
if (isClosed) {
indexMetadataBuilder.state(IndexMetadata.State.CLOSE);
}
IndexMetadata indexMetadata = indexMetadataBuilder.build();
nameToIndexMetadata.put(indexMetadata.getIndex().getName(), indexMetadata);
return indexMetadata.getIndex();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ public void testOldIndicesCheckSnapshotIgnored() {
assertThat(issues, empty());
}

public void testOldIndicesCheckClosedIgnored() {
IndexVersion createdWith = IndexVersion.fromId(7170099);
Settings.Builder settings = settings(createdWith);
IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(settings)
.numberOfShards(1)
.numberOfReplicas(0)
.state(IndexMetadata.State.CLOSE)
.build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put(indexMetadata, true))
.build();
List<DeprecationIssue> issues = DeprecationChecks.filterChecks(INDEX_SETTINGS_CHECKS, c -> c.apply(indexMetadata, clusterState));
assertThat(issues, empty());
}

public void testTranslogRetentionSettings() {
Settings.Builder settings = settings(IndexVersion.current());
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -247,16 +248,27 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
assertOK(client().performRequest(putIndexTemplateRequest));
bulkLoadData(dataStreamName);
for (int i = 0; i < numRollovers; i++) {
rollover(dataStreamName);
String oldIndexName = rollover(dataStreamName);
if (randomBoolean()) {
closeIndex(oldIndexName);
}
bulkLoadData(dataStreamName);
}
}

private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
Set<String> indicesNeedingUpgrade = getDataStreamIndices(dataStreamName);
Set<String> closedOldIndices = getClosedIndices(dataStreamName);
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
rollover(dataStreamName);
String oldIndexName = rollover(dataStreamName);
if (randomBoolean()) {
if (i == 0) {
// Since this is the first rollover on the new cluster, the old index came from the old cluster
closedOldIndices.add(oldIndexName);
}
closeIndex(oldIndexName);
}
}
Request reindexRequest = new Request("POST", "/_migration/reindex");
reindexRequest.setJsonEntity(Strings.format("""
Expand Down Expand Up @@ -299,12 +311,14 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
*/
assertThat(
statusResponseMap.get("total_indices_requiring_upgrade"),
equalTo(originalWriteIndex + numRolloversOnOldCluster)
equalTo(originalWriteIndex + numRolloversOnOldCluster - closedOldIndices.size())
);
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1 - closedOldIndices.size()));
// We expect all the original indices to have been deleted
for (String oldIndex : indicesNeedingUpgrade) {
assertThat(indexExists(oldIndex), equalTo(false));
if (closedOldIndices.contains(oldIndex) == false) {
assertThat(indexExists(oldIndex), equalTo(false));
}
}
assertThat(getDataStreamIndices(dataStreamName).size(), equalTo(expectedTotalIndicesInDataStream));
}
Expand All @@ -324,6 +338,29 @@ private Set<String> getDataStreamIndices(String dataStreamName) throws IOExcepti
return indices.stream().map(index -> index.get("index_name").toString()).collect(Collectors.toSet());
}

@SuppressWarnings("unchecked")
private Set<String> getClosedIndices(String dataStreamName) throws IOException {
Set<String> allIndices = getDataStreamIndices(dataStreamName);
Set<String> closedIndices = new HashSet<>();
Response response = client().performRequest(new Request("GET", "_cluster/state/blocks/indices"));
Map<String, Object> responseMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
Map<String, Object> blocks = (Map<String, Object>) responseMap.get("blocks");
Map<String, Object> indices = (Map<String, Object>) blocks.get("indices");
for (Map.Entry<String, Object> indexEntry : indices.entrySet()) {
String indexName = indexEntry.getKey();
if (allIndices.contains(indexName)) {
Map<String, Object> blocksForIndex = (Map<String, Object>) indexEntry.getValue();
for (Map.Entry<String, Object> blockEntry : blocksForIndex.entrySet()) {
Map<String, String> block = (Map<String, String>) blockEntry.getValue();
if ("index closed".equals(block.get("description"))) {
closedIndices.add(indexName);
}
}
}
}
return closedIndices;
}

/*
* Similar to isOriginalClusterCurrent, but returns true if the major versions of the clusters are the same. So true
* for 8.6 and 8.17, but false for 7.17 and 8.18.
Expand Down Expand Up @@ -365,9 +402,11 @@ static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

private static void rollover(String dataStreamName) throws IOException {
private static String rollover(String dataStreamName) throws IOException {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
Response rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
String oldIndexName = (String) entityAsMap(rolloverResponse).get("old_index");
return oldIndexName;
}
}

0 comments on commit 43df7d7

Please sign in to comment.