Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 3059 other temp merge graphs #3063

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static com.google.common.collect.Iterables.isEmpty;
import static java.util.Objects.nonNull;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.GIVEN_MERGE_STORE;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.getStoreConfiguredMergeFunction;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.processIfFunctionIsContextSpecific;

Expand Down Expand Up @@ -111,6 +112,12 @@ private Object mergeResults(final Iterable resultsFromAllGraphs, final Federated

private static BiFunction getMergeFunction(final FederatedOperation operation, final FederatedStore store, final Context context, final boolean isResultsFromAllGraphsEmpty) throws GafferCheckedException {
final BiFunction mergeFunction;

// pass the given information from options to the operation context to be available to the merge function
if (operation.containsOption(GIVEN_MERGE_STORE)) {
context.setVariable(GIVEN_MERGE_STORE, operation.getOption(GIVEN_MERGE_STORE));
}

if (isResultsFromAllGraphsEmpty) {
//No Merge function required.
mergeFunction = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
* Such as the re-application of View filter or Schema Validation after the local aggregation of results from multiple graphs.
* By default, a local in memory MapStore is used for local aggregation,
* but a Graph or {@link GraphSerialisable} of any kind could be supplied via the {@link #context} with the key {@link #TEMP_RESULTS_GRAPH}.
* <p>
* An issue not covered:
* GraphA has elementA with a property value 101.
* GraphB has the same elementA with property value 1.
* GraphC has the same elementA with property value 2.
* Asking for a simple GetAllElements with a view filter of property less than 100. Will incorrectly return elementA with a value 3.
* Because outside the functions scope, GraphA filtered out 101.

*/
public class FederatedElementFunction implements ContextSpecificMergeFunction<Object, Iterable<Object>, Iterable<Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedElementFunction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022-2023 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federatedstore.util;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;


public class FederatedElementFunctionWithGivenStore extends FederatedElementFunction {


public static final String TEAR_DOWN_TEMP_GRAPH = "tearDownTempGraph";

@Override
@JsonIgnore
public Set<String> getRequiredContextValues() {
final HashSet<String> set = new HashSet<>(super.getRequiredContextValues());
set.add(TEMP_RESULTS_GRAPH);
set.add(TEAR_DOWN_TEMP_GRAPH);
return Collections.unmodifiableSet(set);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.SCHEMA;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.TEMP_RESULTS_GRAPH;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.USER;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.VIEW;

Expand All @@ -76,6 +77,7 @@ public final class FederatedStoreUtil {

@Deprecated
public static final String DEPRECATED_GRAPHIDS_OPTION = "gaffer.federatedstore.operation.graphIds";
public static final String GIVEN_MERGE_STORE = "gaffer.federatedstore.merge.function.given.merge.store";

private FederatedStoreUtil() {
}
Expand Down Expand Up @@ -304,6 +306,7 @@ public static BiFunction processIfFunctionIsContextSpecific(final BiFunction mer
final ContextSpecificMergeFunction specificMergeFunction = (ContextSpecificMergeFunction) mergeFunction;
HashMap<String, Object> functionContext = new HashMap<>();

functionContext = processGivenResultStoreForSpecificMergeFunction(specificMergeFunction, functionContext, operationContext, federatedStore);
functionContext = processSchemaForSpecificMergeFunction(specificMergeFunction, functionContext, payload, graphIds, operationContext, federatedStore);
functionContext = processViewForSpecificMergeFunction(specificMergeFunction, functionContext, payload);
functionContext = processUserForSpecificMergeFunction(specificMergeFunction, functionContext, operationContext.getUser());
Expand All @@ -317,6 +320,18 @@ public static BiFunction processIfFunctionIsContextSpecific(final BiFunction mer
return rtn;
}

public static HashMap<String, Object> processGivenResultStoreForSpecificMergeFunction(final ContextSpecificMergeFunction specificMergeFunction, final HashMap<String, Object> functionContext, final Context operationContext, final FederatedStore federatedStore) {
if (specificMergeFunction.isRequired(TEMP_RESULTS_GRAPH)) {
final String variable = (String) operationContext.getVariable(GIVEN_MERGE_STORE);
if (variable != null) {
throw new UnsupportedOperationException("Implementation of adding a different type of temporary merge graph " +
"is not yet supported. Behaviour on how to delete the graph is not yet defined. Behaviour of what info " +
"to take from users or admins, is not yet defined.");
}
}
return functionContext;
}

private static HashMap<String, Object> processViewForSpecificMergeFunction(final ContextSpecificMergeFunction specificMergeFunction, final HashMap<String, Object> functionContext, final Operation payload) throws GafferCheckedException {
if (specificMergeFunction.isRequired(VIEW)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@
package uk.gov.gchq.gaffer.federatedstore;


import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.federatedstore.operation.FederatedOperation;
import uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunctionWithGivenStore;
import uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.serialisation.implementation.JavaSerialiser;
import uk.gov.gchq.gaffer.store.schema.Schema;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -132,6 +138,34 @@ public void shouldNotReturnAnyElementsAfterInValidationInTemporaryMap() throws E
.hasSize(1);
}

@Test
public void shouldNotCurrentlySupportOtherTempResultsGraph() throws Exception {

final byte[] givenResultsGraph = new JavaSerialiser().serialise(new GraphSerialisable.Builder()
.config(new GraphConfig("TheGivenResultsGraph"))
.schema(loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"))
.properties(FederatedStoreTestUtil.loadAccumuloStoreProperties(FederatedStoreTestUtil.ACCUMULO_STORE_SINGLE_USE_PROPERTIES))
.build());

//given
addGraphToAccumuloStore(federatedStore, GRAPH_ID_A, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_B, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_C, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));

addEntity(GRAPH_ID_A, entity99); // 99 is valid
addEntity(GRAPH_ID_B, entity1); // 100 is not valid.
addEntity(GRAPH_ID_C, entity1); // correct behavior 100 & 1 is invalid. returning 1 would be incorrect if 100 had been deleted.
addEntity(GRAPH_ID_B, entityOther);

//when
Assertions.assertThatException()
.isThrownBy(() -> federatedStore.execute(new FederatedOperation.Builder()
.op(new GetAllElements())
.option(FederatedStoreUtil.GIVEN_MERGE_STORE, givenResultsGraph.toString())
.mergeFunction(new FederatedElementFunctionWithGivenStore())
.build(), contextTestUser()))
.withMessageContaining("Implementation of adding a different type of temporary merge graph is not yet implemented");
}

private void addEntity(final String graphIdA, final Entity entity) throws OperationException {
federatedStore.execute(new FederatedOperation.Builder()
Expand Down
Loading