Skip to content

Commit

Permalink
feat(restoreIndices): update restore indices args and docs (#12529)
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored Feb 3, 2025
1 parent 64aaaf1 commit 6f0d475
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class RestoreIndices implements Upgrade {
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";

public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";
public static final String LAST_URN_ARG_NAME = "lastUrn";
public static final String LAST_ASPECT_ARG_NAME = "lastAspect";
public static final String GE_PIT_EPOCH_MS_ARG_NAME = "gePitEpochMs";
public static final String LE_PIT_EPOCH_MS_ARG_NAME = "lePitEpochMs";
public static final String ASPECT_NAMES_ARG_NAME = "aspectNames";

private final List<UpgradeStep> _steps;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.ebean.Database;
import io.ebean.ExpressionList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -123,6 +124,30 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
} else {
context.report().addLine("No urnLike arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.LE_PIT_EPOCH_MS_ARG_NAME)) {
result.lePitEpochMs =
Long.parseLong(context.parsedArgs().get(RestoreIndices.LE_PIT_EPOCH_MS_ARG_NAME).get());
context.report().addLine(String.format("lePitEpochMs is %s", result.lePitEpochMs));
}
if (containsKey(context.parsedArgs(), RestoreIndices.GE_PIT_EPOCH_MS_ARG_NAME)) {
result.gePitEpochMs =
Long.parseLong(context.parsedArgs().get(RestoreIndices.GE_PIT_EPOCH_MS_ARG_NAME).get());
context.report().addLine(String.format("gePitEpochMs is %s", result.gePitEpochMs));
}
if (containsKey(context.parsedArgs(), RestoreIndices.LAST_URN_ARG_NAME)) {
result.lastUrn = context.parsedArgs().get(RestoreIndices.LAST_URN_ARG_NAME).get();
context.report().addLine(String.format("lastUrn is %s", result.lastUrn));
}
if (containsKey(context.parsedArgs(), RestoreIndices.LAST_ASPECT_ARG_NAME)) {
result.lastAspect = context.parsedArgs().get(RestoreIndices.LAST_ASPECT_ARG_NAME).get();
context.report().addLine(String.format("lastAspect is %s", result.lastAspect));
}
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAMES_ARG_NAME)) {
result.aspectNames =
Arrays.asList(
context.parsedArgs().get(RestoreIndices.ASPECT_NAMES_ARG_NAME).get().split(","));
context.report().addLine(String.format("aspectNames is %s", result.aspectNames));
}
return result;
}

Expand Down
32 changes: 32 additions & 0 deletions docs/how/restore-indices.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@ By default, restoring the indices from the local database will not remove any ex
the search and graph indices that no longer exist in the local database, potentially leading to inconsistencies
between the search and graph indices and the local database.

## Configuration

The upgrade jobs take arguments as command line args to the job itself rather than environment variables for job specific configuration. The RestoreIndices job is specified through the `-u RestoreIndices` upgrade ID parameter and then additional parameters are specified like `-a batchSize=1000`.
The following configurations are available:

### Time-Based Filtering

* `lePitEpochMs`: Restore records created before this timestamp (in milliseconds)
* `gePitEpochMs`: Restore records created after this timestamp (in milliseconds)

### Pagination and Performance Options

* `urnBasedPagination`: Enable key-based pagination instead of offset-based pagination. Recommended for large datasets as it's typically more efficient.
* `startingOffset`: When using default pagination, start from this offset
* `lastUrn`: Resume from a specific URN when using URN-based pagination
* `lastAspect`: Used with lastUrn to resume from a specific aspect, preventing reprocessing
* `numThreads`: Number of concurrent threads for processing restoration, only used with default offset based paging
* `batchSize`: Configures the size of each batch as the job pages through rows
* `batchDelayMs`: Adds a delay in between each batch to avoid overloading backend systems

### Content Filtering

* `aspectNames`: Comma-separated list of aspects to restore (e.g., "ownership,status")
* `urnLike`: SQL LIKE pattern to filter URNs (e.g., "urn:li:dataset%")

### Nuclear option
* `clean`: This option wipes out the current indices by running deletes of all the documents to guarantee a consistent state with SQL. This is generally not recommended unless there is significant data corruption on the instance.

### Helm

These are available in the helm charts as configurations for Kubernetes deployments under the `datahubUpgrade.restoreIndices.args` path which will set them up as args to the pod command.

## Quickstart

If you're using the quickstart images, you can use the `datahub` cli to restore the indices.
Expand Down

0 comments on commit 6f0d475

Please sign in to comment.