Skip to content

Commit

Permalink
Gh-3149: Add depth limit to NamedOperationResolver (#3174)
Browse files Browse the repository at this point in the history
* add depth limit to name op resolver

* Tweak functionality for resolver

* update and add limit test

* update instance check to operation chains

* throw exception if any unresolved named ops

* add test for configurable limit

---------

Co-authored-by: rj77259 <[email protected]>
Co-authored-by: GCHQDeveloper314 <[email protected]>
  • Loading branch information
3 people authored Mar 21, 2024
1 parent 6213159 commit 55cf1f7
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import uk.gov.gchq.gaffer.named.operation.NamedOperation;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.Operations;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.operation.handler.named.cache.NamedOperationCache;
import uk.gov.gchq.gaffer.store.operation.handler.util.OperationHandlerUtil;
Expand All @@ -37,79 +36,56 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A {@link GraphHook} to resolve named operations.
*/
@JsonPropertyOrder(alphabetic = true)
public class NamedOperationResolver implements GetFromCacheHook {
private static final Logger LOGGER = LoggerFactory.getLogger(NamedOperationResolver.class);
// TODO: Possibly implement a depth limit rather than a hard timeout
public static final int TIMEOUT_DEFAULT = 1;
public static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.MINUTES;
private final NamedOperationCache cache;
private final int timeout;
private final TimeUnit timeUnit;
/**
* Default depth the resolver will go when checking for nested named operations
*/
public static final int DEPTH_LIMIT_DEFAULT = 3;

public NamedOperationResolver(final String suffixNamedOperationCacheName) {
this(suffixNamedOperationCacheName, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
}
private final int depthLimit;
private final NamedOperationCache cache;

@JsonCreator
public NamedOperationResolver(
@JsonProperty("suffixNamedOperationCacheName") final String suffixNamedOperationCacheName,
@JsonProperty("timeout") final int timeout,
@JsonProperty("timeUnit") final TimeUnit timeUnit) {
this(new NamedOperationCache(suffixNamedOperationCacheName), timeout, timeUnit);
@JsonProperty("depthLimit") final int depthLimit) {
this(new NamedOperationCache(suffixNamedOperationCacheName), depthLimit);
}

public NamedOperationResolver(final String suffixNamedOperationCacheName) {
this(suffixNamedOperationCacheName, DEPTH_LIMIT_DEFAULT);
}

public NamedOperationResolver(final NamedOperationCache cache) {
this(cache, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
this(cache, DEPTH_LIMIT_DEFAULT);
}

public NamedOperationResolver(
final NamedOperationCache cache,
final int timeout,
final TimeUnit timeUnit) {
public NamedOperationResolver(final NamedOperationCache cache, final int depthLimit) {
this.cache = cache;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.depthLimit = depthLimit;
}

@JsonGetter("suffixNamedOperationCacheName")
public String getSuffixCacheName() {
return cache.getSuffixCacheName();
}

@JsonGetter("timeout")
public int getTimeout() {
return timeout;
}

@JsonGetter("timeUnit")
public TimeUnit getTimeUnit() {
return timeUnit;
@JsonGetter("depthLimit")
public int getDepthLimit() {
return depthLimit;
}

@Override
public void preExecute(final OperationChain<?> opChain, final Context context) {
try {
LOGGER.info("Resolving Named Operations with timeout: " + timeout);
CompletableFuture.runAsync(() -> resolveNamedOperations(opChain, context.getUser())).get(timeout, timeUnit);
LOGGER.info("Finished Named Operation resolver");
} catch (final ExecutionException | TimeoutException e) {
throw new GafferRuntimeException("ResolverTask did not complete: " + e.getMessage(), e);
} catch (final InterruptedException e) {
LOGGER.warn("Thread interrupted", e);
Thread.currentThread().interrupt();
}

// Resolve the named operations in the chain
// (depth is set to zero as this is first level of recursion when checking for nested named ops)
opChain.updateOperations(resolveNamedOperations(opChain, context.getUser(), 0));
}

@Override
Expand All @@ -122,42 +98,67 @@ public <T> T onFailure(final T result, final OperationChain<?> opChain, final Co
return result;
}

private void resolveNamedOperations(final Operations<?> operations, final User user) {
final List<Operation> updatedOperations = new ArrayList<>();

operations.getOperations().forEach(operation -> {
if (operation instanceof NamedOperation) {
updatedOperations.addAll(resolveNamedOperation((NamedOperation<?, ?>) operation, user));
} else {
if (operation instanceof Operations) {
resolveNamedOperations(((Operations<?>) operation), user);
/**
* Resolves any named operations from the cache. What is meant
* by 'resolved' is turning the named operations into their respective
* {@link OperationChain}s. Ensures any supplied {@link NamedOperation}s
* actually exist in the cache and contain their correct {@link Operation}s.
* Will also run recursively to a given depth limit to ensure any nested
* {@link NamedOperation}s are also resolved from the cache.
*
* @param operation {@link NamedOperation} or {@link OperationChain} to act on.
* @param user User for the cache access.
* @param depth Current recursive depth, will use limit set in class to
* continue or not.
* @return A list of resolved operations essentially flattened into plain
* Operations and OperationChains.
*/
private Collection<Operation> resolveNamedOperations(final Operation operation, final User user, final int depth) {
final Collection<Operation> updatedOperations = new ArrayList<>();
LOGGER.debug("Current resolver depth is: {}", depth);

// If a named operation resolve the operations within it
if (operation instanceof NamedOperation) {
NamedOperation<?, ?> namedOperation = (NamedOperation<?, ?>) operation;
LOGGER.debug("Resolving named operation called: {}", namedOperation.getOperationName());
try {
// Get the chain for the named operation from the cache
final OperationChain<?> namedOperationChain = cache
.getNamedOperation(namedOperation.getOperationName(), user)
.getOperationChain(namedOperation.getParameters());
// Update the operation inputs and add operation chain to the updated list
OperationHandlerUtil.updateOperationInput(namedOperationChain, namedOperation.getInput());

// Run again to resolve any nested operations in the chain before adding
namedOperationChain.updateOperations(resolveNamedOperations(namedOperationChain, user, depth + 1));
updatedOperations.add(namedOperationChain);

} catch (final CacheOperationException e) {
LOGGER.error("Exception resolving NamedOperation within the cache: {}", e.getMessage());
return Collections.singletonList(namedOperation);
}
// If given an operationChain resolve Operations inside it
} else if (operation instanceof OperationChain) {
LOGGER.debug("Resolving Operations in Operation Chain: {}", ((OperationChain<?>) operation).getOperations());
for (final Operation op : ((OperationChain<?>) operation).getOperations()) {
// Resolve if haven't hit the depth limit for named operations yet
// Note only need to check the depth here as when checking for a nested named operation it will always be
// recursively passed as an operation chain
if (depth < depthLimit) {
updatedOperations.addAll(resolveNamedOperations(op, user, depth));
} else {
// If a NamedOperation couldn't be resolved then error
if (op instanceof NamedOperation) {
LOGGER.error("Nested depth limit hit resolving NamedOperation: {}", ((NamedOperation<?, ?>) op).getOperationName());
throw new GafferRuntimeException("NamedOperation Resolver hit nested depth limit of " + depthLimit);
}
updatedOperations.add(op);
}
updatedOperations.add(operation);
}
});

operations.updateOperations((Collection) updatedOperations);
}

private List<? extends Operation> resolveNamedOperation(final NamedOperation<?, ?> namedOp, final User user) {
try {
// Get the named operation chain from the cache
final OperationChain<?> namedOperationChain = cache
.getNamedOperation(namedOp.getOperationName(), user)
.getOperationChain(namedOp.getParameters());

OperationHandlerUtil.updateOperationInput(namedOperationChain, namedOp.getInput());

// Call resolveNamedOperations again to check there are no nested named operations
resolveNamedOperations(namedOperationChain, user);
return namedOperationChain.getOperations();
} catch (final CacheOperationException e) {
// An Exception with the cache has occurred e.g. it was unable to find named operation
// and then simply returned the original operation chain with the unresolved NamedOperation.

// The exception from cache would otherwise be lost, so capture it here and print to LOGS.
LOGGER.error("Exception resolving NamedOperation within the cache:{}", e.getMessage());
return Collections.singletonList(namedOp);
// If just a plain operation then nothing to resolve
} else {
updatedOperations.add(operation);
}
return updatedOperations;
}
}
Loading

0 comments on commit 55cf1f7

Please sign in to comment.