Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3322: Cache updates for federated POC #3323

Merged
merged 10 commits into from
Oct 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ private Collection<Operation> resolveNamedOperations(final Operation operation,
.getOperationChain(namedOperation.getParameters());
// Update the operation inputs and add operation chain to the updated list
OperationHandlerUtil.updateOperationInput(namedOperationChain, namedOperation.getInput());
namedOperationChain.setOptions(namedOperation.getOptions());

// Run again to resolve any nested operations in the chain before adding
namedOperationChain.updateOperations(resolveNamedOperations(namedOperationChain, user, depth + 1));
Expand Down
p29876 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphIds;
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphInfo;
import uk.gov.gchq.gaffer.federated.simple.operation.RemoveGraph;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.EitherOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOutputHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.add.AddGraphHandler;
Expand All @@ -45,6 +46,13 @@
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.ChangeGraphIdHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.RemoveGraphHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.named.operation.AddNamedOperation;
import uk.gov.gchq.gaffer.named.operation.DeleteNamedOperation;
import uk.gov.gchq.gaffer.named.operation.GetAllNamedOperations;
import uk.gov.gchq.gaffer.named.operation.NamedOperation;
import uk.gov.gchq.gaffer.named.view.AddNamedView;
import uk.gov.gchq.gaffer.named.view.DeleteNamedView;
import uk.gov.gchq.gaffer.named.view.GetAllNamedViews;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
Expand All @@ -53,6 +61,7 @@
import uk.gov.gchq.gaffer.operation.impl.get.GetAdjacentIds;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetGraphCreatedTime;
import uk.gov.gchq.gaffer.serialisation.Serialiser;
import uk.gov.gchq.gaffer.serialisation.ToBytesSerialiser;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -63,8 +72,17 @@
import uk.gov.gchq.gaffer.store.operation.DeleteAllData;
import uk.gov.gchq.gaffer.store.operation.GetSchema;
import uk.gov.gchq.gaffer.store.operation.GetTraits;
import uk.gov.gchq.gaffer.store.operation.handler.GetGraphCreatedTimeHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OutputOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedOperationsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedViewsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.NamedOperationHandler;
import uk.gov.gchq.gaffer.store.schema.Schema;

import java.util.AbstractMap.SimpleEntry;
Expand All @@ -81,6 +99,7 @@
import static uk.gov.gchq.gaffer.accumulostore.utils.TableUtils.renameTable;
import static uk.gov.gchq.gaffer.cache.CacheServiceLoader.DEFAULT_SERVICE_NAME;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_DEFAULT_GRAPH_IDS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_GRAPH_CACHE_NAME;

/**
* The federated store implementation. Provides the set up and required
Expand Down Expand Up @@ -290,16 +309,6 @@ public Schema getSchema(final List<GraphSerialisable> graphs) {
}
}

/**
* Access to getting the operations that have handlers specific to this
* store.
*
* @return The Operation classes handled by this store.
*/
public Set<Class<? extends Operation>> getStoreSpecificOperations() {
return storeHandlers.keySet();
}

@Override
public void initialise(final String graphId, final Schema unused, final StoreProperties properties) throws StoreException {
if (unused != null) {
Expand All @@ -308,7 +317,7 @@ public void initialise(final String graphId, final Schema unused, final StorePro
super.initialise(graphId, new Schema(), properties);

// Init the cache for graphs
graphCache = new Cache<>("federatedGraphCache-" + graphId);
graphCache = new Cache<>(properties.get(PROP_GRAPH_CACHE_NAME, "federatedGraphCache_" + graphId));

// Get and set default graph IDs from properties
if (properties.containsKey(PROP_DEFAULT_GRAPH_IDS)) {
Expand Down Expand Up @@ -344,11 +353,34 @@ protected Object doUnhandledOperation(final Operation operation, final Context c
@Override
protected void addAdditionalOperationHandlers() {
storeHandlers.forEach(this::addOperationHandler);

final String namedOpCacheSuffix = getProperties().getCacheServiceNamedOperationSuffix(getGraphId());
final String namedViewCacheSuffix = getProperties().getCacheServiceNamedViewSuffix(getGraphId());
final Boolean nestedNamedOpsAllowed = getProperties().isNestedNamedOperationAllow();

// Add overrides as cache operations could be run locally or on sub graphs
if (getProperties().getNamedOperationEnabled()) {
addOperationHandler(NamedOperation.class, new EitherOperationHandler<>(new NamedOperationHandler()));
addOperationHandler(AddNamedOperation.class, new EitherOperationHandler<>(
new AddNamedOperationHandler(namedOpCacheSuffix, nestedNamedOpsAllowed)));
addOperationHandler(GetAllNamedOperations.class, new EitherOperationHandler<>(new GetAllNamedOperationsHandler(namedOpCacheSuffix)));
addOperationHandler(DeleteNamedOperation.class, new EitherOperationHandler<>(new DeleteNamedOperationHandler(namedOpCacheSuffix)));
}

// Named Views could be either
if (getProperties().getNamedViewEnabled()) {
addOperationHandler(AddNamedView.class, new EitherOperationHandler<>(new AddNamedViewHandler(namedViewCacheSuffix)));
addOperationHandler(GetAllNamedViews.class, new EitherOperationHandler<>(new GetAllNamedViewsHandler(namedViewCacheSuffix)));
addOperationHandler(DeleteNamedView.class, new EitherOperationHandler<>(new DeleteNamedViewHandler(namedViewCacheSuffix)));
}

// Misc operations that could be for sub graphs or not
addOperationHandler(GetGraphCreatedTime.class, new EitherOperationHandler<>(new GetGraphCreatedTimeHandler()));
}

@Override
protected OperationHandler<? extends OperationChain<?>> getOperationChainHandler() {
return new FederatedOperationHandler<>();
return new EitherOperationHandler<>(new OperationChainHandler<>(getOperationChainValidator(), getOperationChainOptimisers()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class FederatedStoreProperties extends StoreProperties {
* Property key for setting if public graphs can be added to the store or not
*/
public static final String PROP_ALLOW_PUBLIC_GRAPHS = "gaffer.store.federated.allowPublicGraphs";
/**
* Property key for setting a custom name for the graph cache, by default
* this will be "federatedGraphCache_" followed by the federated graph ID.
*/
public static final String PROP_GRAPH_CACHE_NAME = "gaffer.store.federated.graphCache.name";
/**
* Property key for the class to use when merging number results
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/**
* Custom handler for operations that could in theory target sub graphs or the
* federated store directly.
*/
public class EitherOperationHandler<O extends Operation> implements OperationHandler<O> {
private static final Logger LOGGER = LoggerFactory.getLogger(EitherOperationHandler.class);

private final OperationHandler<O> standardHandler;

public EitherOperationHandler(final OperationHandler<O> standardHandler) {
this.standardHandler = standardHandler;
}

/**
* If graph IDs are in the options the operation will be handled by a
* {@link FederatedOperationHandler}, otherwise the original handler will be
* used e.g. executed on the federated store directly.
*/
tb06904 marked this conversation as resolved.
Show resolved Hide resolved
@Override
public Object doOperation(final O operation, final Context context, final Store store) throws OperationException {
tb06904 marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.debug("Checking if Operation should be handled locally or on sub graphs: {}", operation);

// If we have graph IDs then run as a federated operation
if (operation.containsOption(FederatedOperationHandler.OPT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_SHORT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_EXCLUDE_GRAPH_IDS)) {
LOGGER.debug("Operation has specified graph IDs, it will be handled by sub graphs");
return new FederatedOperationHandler<>().doOperation(operation, context, store);
}

// No sub graphs involved just run the handler for this operations on the federated store
return standardHandler.doOperation(operation, context, store);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,16 @@
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -72,47 +69,29 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";

/**
* A boolean option to specify if to forward the whole operation chain to the sub graph or not.
*/
public static final String OPT_FORWARD_CHAIN = "federated.forwardChain";

/**
* A boolean option to specify if a graph should be skipped if execution
* fails on it e.g. continue executing on the rest of the graphs
*/
public static final String OPT_SKIP_FAILED_EXECUTE = "federated.skipGraphOnFail";

/**
* A boolean option to specify if the results from each graph should be kept
* separate. If set this will return a map where each key value is the graph
* ID and its respective result.
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
// Check inside operation chains in case there are operations that don't require running on sub graphs
if (operation instanceof OperationChain) {
Set<Class<? extends Operation>> storeSpecificOps = ((FederatedStore) store).getStoreSpecificOperations();
List<Class<? extends Operation>> chainOps = ((OperationChain<?>) operation).flatten().stream()
.map(Operation::getClass)
.collect(Collectors.toList());

// If all the operations in the chain can be handled by the store then execute them.
// Or if told not to forward the whole chain process each operation individually.
if (storeSpecificOps.containsAll(chainOps) ||
(!Boolean.parseBoolean(operation.getOption(OPT_FORWARD_CHAIN, "true")))) {
// Use default handler
return new OperationChainHandler<>(store.getOperationChainValidator(), store.getOperationChainOptimisers())
.doOperation((OperationChain<Object>) operation, context, store);
}

// Check if we have a mix as that is an issue
// It's better to keep federated and non federated separate so error and report back
if (!Collections.disjoint(storeSpecificOps, chainOps)) {
throw new OperationException(
"Chain contains standard Operations alongside federated store specific Operations."
+ " Please submit each type separately or set: '" + OPT_FORWARD_CHAIN + "' to: 'false'.");
}
}

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
// Should we keep the results separate
if (Boolean.parseBoolean(operation.getOption(OPT_SEPARATE_RESULTS, "false"))) {
return new SeparateOutputHandler<>().doOperation((Output) operation, context, store);
}
return new FederatedOutputHandler<>().doOperation((Output) operation, context, store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public O doOperation(final P operation, final Context context, final Store store
}

// Not expecting any output so exit since we've executed
if (operation.getOutputClass().isAssignableFrom(Void.class) || graphResults.isEmpty()) {
if (operation.getOutputClass() == Void.class || graphResults.isEmpty()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Handler for running federated operations but keeping the results separate
* under a key of the graph ID the results come from.
*/
public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOperationHandler<P> {
private static final Logger LOGGER = LoggerFactory.getLogger(SeparateOutputHandler.class);

@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
return new HashMap<>();

Check warning on line 45 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java#L45

Added line #L45 was not covered by tests
}

// Execute the operation chain on each graph
LOGGER.debug("Returning separated graph results");
Map<String, O> results = new HashMap<>();
for (final GraphSerialisable gs : graphsToExecute) {
p29876 marked this conversation as resolved.
Show resolved Hide resolved
try {
results.put(gs.getGraphId(), gs.getGraph().execute(operation, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {

Check warning on line 54 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java#L54

Added line #L54 was not covered by tests
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());

Check warning on line 56 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java#L56

Added line #L56 was not covered by tests
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;

Check warning on line 58 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/SeparateOutputHandler.java#L58

Added line #L58 was not covered by tests
}
}
}

return results;
}
}
Loading
Loading