Skip to content

Commit

Permalink
[Transform] Retry destination index creation (elastic#105759)
Browse files Browse the repository at this point in the history
For Unattended Transforms, if we fail to create the destination index on
the first run, we will retry the transformation iteration, but we will
not retry the destination index creation on that next iteration.

This change stops the Unattended Transform from progressing beyond the
0th checkpoint, so all retries will include the destination index
creation.

Fix elastic#105683
Relate elastic#104146
  • Loading branch information
prwhelan authored Feb 23, 2024
1 parent d37d93a commit f86532b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,16 +418,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception {
);
assertRed(transformId, authIssue);

startTransform(config.getId(), RequestOptions.DEFAULT);

// Give the transform indexer enough time to try creating destination index
Thread.sleep(5_000);
startTransform(transformId, RequestOptions.DEFAULT);

String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId);
// transform's auth state status is still RED due to:
// - lacking permissions
// - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions)
assertRed(transformId, authIssue, destIndexIssue);
// wait for 10 seconds to give the transform indexer enough time to try creating destination index
assertBusy(() -> { assertRed(transformId, authIssue, destIndexIssue); });

// update transform's credentials so that the transform has permission to access source/dest indices
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
Expand Down Expand Up @@ -593,5 +591,7 @@ private void assertRed(String transformId, String... expectedHealthIssueDetails)
.map(issue -> (String) extractValue((Map<String, Object>) issue, "details"))
.collect(toSet());
assertThat("Stats were: " + stats, actualHealthIssueDetailsSet, containsInAnyOrder(expectedHealthIssueDetails));
// We should not progress beyond the 0th checkpoint until we correctly configure the Transform.
assertThat("Stats were: " + stats, getCheckpoint(stats), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,15 @@ protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception
}

protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception {
assertBusy(
() -> assertEquals(
checkpoint,
((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", getBasicTransformStats(id))).longValue()
),
waitTime.getMillis(),
TimeUnit.MILLISECONDS
);
assertBusy(() -> assertEquals(checkpoint, getCheckpoint(id)), waitTime.getMillis(), TimeUnit.MILLISECONDS);
}

protected long getCheckpoint(String id) throws IOException {
return getCheckpoint(getBasicTransformStats(id));
}

protected long getCheckpoint(Map<String, Object> stats) {
return ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", stats)).longValue();
}

protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
}
}, listener::onFailure);

var shouldMaybeCreateDestIndexForUnattended = context.getCheckpoint() == 0
&& Boolean.TRUE.equals(transformConfig.getSettings().getUnattended());

ActionListener<Map<String, String>> fieldMappingsListener = ActionListener.wrap(destIndexMappings -> {
if (destIndexMappings.isEmpty() == false) {
// If we managed to fetch destination index mappings, we use them from now on ...
Expand All @@ -344,9 +347,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
// Since the unattended transform could not have created the destination index yet, we do it here.
// This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination
// index aliases may be missing.
if (destIndexMappings.isEmpty()
&& context.getCheckpoint() == 0
&& Boolean.TRUE.equals(transformConfig.getSettings().getUnattended())) {
if (destIndexMappings.isEmpty() && shouldMaybeCreateDestIndexForUnattended) {
doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener);
} else {
configurationReadyListener.onResponse(null);
Expand All @@ -364,7 +365,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
deducedDestIndexMappings.set(validationResponse.getDestIndexMappings());
if (isContinuous()) {
transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap(config -> {
if (transformConfig.equals(config) && fieldMappings != null) {
if (transformConfig.equals(config) && fieldMappings != null && shouldMaybeCreateDestIndexForUnattended == false) {
logger.trace("[{}] transform config has not changed.", getJobId());
configurationReadyListener.onResponse(null);
} else {
Expand Down

0 comments on commit f86532b

Please sign in to comment.