Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2478: JobTracker tracks all operations not just jobs #3330

Closed
22 changes: 6 additions & 16 deletions core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,17 +384,9 @@
public <O> O execute(final Output<O> operation, final Context context) throws OperationException {
return execute(OperationChain.wrap(operation), context);
}

protected <O> O execute(final OperationChain<O> operation, final Context context) throws OperationException {
try {
addOrUpdateJobDetail(operation, context, null, JobStatus.RUNNING);
final O result = (O) handleOperation(operation, context);
addOrUpdateJobDetail(operation, context, null, JobStatus.FINISHED);
return result;
} catch (final Throwable t) {
addOrUpdateJobDetail(operation, context, t.getMessage(), JobStatus.FAILED);
throw t;
}
final O result = (O) handleOperation(operation, context);
return result;
}

/**
Expand Down Expand Up @@ -978,21 +970,19 @@
return operationHandlers.get(opClass);
}

private JobDetail addOrUpdateJobDetail(final OperationChain<?> operationChain, final Context context,
final String msg, final JobStatus jobStatus) {
private JobDetail addOrUpdateJobDetail(final OperationChain<?> operationChain, final Context context, final String msg, final JobStatus jobStatus) {
final JobDetail newJobDetail = new JobDetail(context.getJobId(), context.getUser(), operationChain, jobStatus, msg);
if (nonNull(jobTracker)) {
if (null != jobTracker) {
ms9698 marked this conversation as resolved.
Show resolved Hide resolved
final JobDetail oldJobDetail = jobTracker.getJob(newJobDetail.getJobId(), context.getUser());
if (newJobDetail.getStatus().equals(JobStatus.SCHEDULED_PARENT)) {
newJobDetail.setRepeat(null);
newJobDetail.setSerialisedOperationChain(operationChain);
}

if (isNull(oldJobDetail)) {
if (null == oldJobDetail) {
jobTracker.addOrUpdateJob(newJobDetail, context.getUser());
} else {
jobTracker.addOrUpdateJob(new JobDetail(oldJobDetail, newJobDetail), context
.getUser());
.getUser());

Check warning on line 985 in core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java

View check run for this annotation

Codecov / codecov/patch

core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java#L985

Added line #L985 was not covered by tests
}
}
return newJobDetail;
Expand Down
33 changes: 0 additions & 33 deletions core/store/src/test/java/uk/gov/gchq/gaffer/store/StoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import org.mockito.junit.jupiter.MockitoExtension;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.ICache;
import uk.gov.gchq.gaffer.cache.ICacheService;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.cache.impl.HashMapCacheService;
import uk.gov.gchq.gaffer.commonutil.TestGroups;
import uk.gov.gchq.gaffer.commonutil.TestPropertyNames;
Expand Down Expand Up @@ -139,12 +136,10 @@
import uk.gov.gchq.koryphe.ValidationResult;
import uk.gov.gchq.koryphe.impl.binaryoperator.StringConcat;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -164,13 +159,11 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static uk.gov.gchq.gaffer.jobtracker.JobTracker.JOB_TRACKER_CACHE_SERVICE_NAME;
import static uk.gov.gchq.gaffer.store.StoreTrait.INGEST_AGGREGATION;
import static uk.gov.gchq.gaffer.store.StoreTrait.ORDERED;
import static uk.gov.gchq.gaffer.store.StoreTrait.PRE_AGGREGATION_FILTERING;
Expand Down Expand Up @@ -256,32 +249,6 @@ public void after() {
JSONSerialiser.update();
}

@Test
public void shouldExecuteOperationWhenJobTrackerCacheIsBroken(@Mock final StoreProperties storeProperties) throws Exception {
// Given
ICache<Object, Object> mockICache = Mockito.mock(ICache.class);
doThrow(new CacheOperationException("Stubbed class")).when(mockICache).put(any(), any());
ICacheService mockICacheService = Mockito.spy(ICacheService.class);
given(mockICacheService.getCache(any())).willReturn(mockICache);

Field field = CacheServiceLoader.class.getDeclaredField("SERVICES");
field.setAccessible(true);
java.util.Map<String, ICacheService> mockCacheServices = (java.util.Map<String, ICacheService>) field.get(new HashMap<>());
mockCacheServices.put(JOB_TRACKER_CACHE_SERVICE_NAME, mockICacheService);

final AddElements addElements = new AddElements();
final StoreImpl3 store = new StoreImpl3();
store.initialise("graphId", createSchemaMock(), storeProperties);

// When
store.execute(addElements, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is worth keeping as it tests job functionality
We no longer handle jobs in the execute method, but we do in the executeJob method.
Swapping this line should pass the test


// Then
verify(addElementsHandler).doOperation(addElements, context, store);
verify(mockICacheService, Mockito.atLeast(1)).getCache(any());
verify(mockICache, Mockito.atLeast(1)).put(any(), any());
}

@Test
public void shouldCreateStoreWithSpecificCaches() throws SchemaException, StoreException {
// Given
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 Crown Copyright
* Copyright 2015-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -106,24 +106,6 @@ public void shouldPropagateStatusInformationContainedInOperationExceptionsThrown
assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
}

@Test
public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that this test can be deleted now as it fails due to no jobs running which is correct.
However, I'm not a fan of just deleting tests with no explanation. It's better to change the test to document the new behavior as a bit of self documenting code. I think we need a new test here to show that we shouldn't get a job-id header returned anymore for the executeOperation endpoint as it doesn't make sense.

However, we are still getting the header returned so you will need to implement this

// Given
final Graph graph = new Graph.Builder()
.config(StreamUtil.graphConfig(this.getClass()))
.storeProperties(StreamUtil.STORE_PROPERTIES)
.addSchema(new Schema())
.build();

client.reinitialiseGraph(graph);

// When
final Response response = client.executeOperation(new GetAllJobDetails());

// Then
assertTrue(response.readEntity(String.class).contains(response.getHeaderString("job-id")));
}

@Test
public void shouldReturnAllOperationsAsOperationDetails() throws IOException, ClassNotFoundException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.springframework.http.ResponseEntity;
import org.springframework.util.LinkedMultiValueMap;

import uk.gov.gchq.gaffer.cache.impl.HashMapCacheService;
import uk.gov.gchq.gaffer.commonutil.StreamUtil;
import uk.gov.gchq.gaffer.core.exception.Error;
import uk.gov.gchq.gaffer.data.element.Entity;
Expand Down Expand Up @@ -161,37 +160,6 @@ public void shouldPropagateStatusInformationContainedInOperationExceptionsThrown
assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), response.getStatusCode().value());
}

@Test
public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IOException {
// Given
StoreProperties properties = new MapStoreProperties();
properties.setJobTrackerEnabled(true);
properties.setDefaultCacheServiceClass(HashMapCacheService.class.getName());

Graph graph = new Graph.Builder()
.config(StreamUtil.graphConfig(this.getClass()))
.storeProperties(properties)
.addSchema(new Schema())
.build();

when(getGraphFactory().getGraph()).thenReturn(graph);

// When
final ResponseEntity<Set> response = post("/graph/operations/execute",
new GetAllJobDetails(),
Set.class);

// Then
try {
assertTrue(response.getBody().toString().contains(response.getHeaders().get("job-id").get(0)));
} catch (final AssertionError e) {
System.out.println("Job ID was not found in the Header");
System.out.println("Header was: " + response.getHeaders().get("job-id"));
System.out.println("Body was: " + response.getBody());
throw e;
}
}

@Test
public void shouldCorrectlyStreamExecuteChunked() throws Exception {
// Given
Expand Down