Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2478: JobTracker tracks all operations not just jobs #3330

Closed
20 changes: 5 additions & 15 deletions core/store/src/main/java/uk/gov/gchq/gaffer/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,17 +384,9 @@ public void execute(final Operation operation, final Context context) throws Ope
public <O> O execute(final Output<O> operation, final Context context) throws OperationException {
return execute(OperationChain.wrap(operation), context);
}

protected <O> O execute(final OperationChain<O> operation, final Context context) throws OperationException {
try {
addOrUpdateJobDetail(operation, context, null, JobStatus.RUNNING);
final O result = (O) handleOperation(operation, context);
addOrUpdateJobDetail(operation, context, null, JobStatus.FINISHED);
return result;
} catch (final Throwable t) {
addOrUpdateJobDetail(operation, context, t.getMessage(), JobStatus.FAILED);
throw t;
}
final O result = (O) handleOperation(operation, context);
return result;
}

/**
Expand Down Expand Up @@ -978,21 +970,19 @@ public OperationHandler<Operation> getOperationHandler(final Class<? extends Ope
return operationHandlers.get(opClass);
}

private JobDetail addOrUpdateJobDetail(final OperationChain<?> operationChain, final Context context,
final String msg, final JobStatus jobStatus) {
private JobDetail addOrUpdateJobDetail(final OperationChain<?> operationChain, final Context context, final String msg, final JobStatus jobStatus) {
final JobDetail newJobDetail = new JobDetail(context.getJobId(), context.getUser(), operationChain, jobStatus, msg);
if (nonNull(jobTracker)) {
final JobDetail oldJobDetail = jobTracker.getJob(newJobDetail.getJobId(), context.getUser());
if (newJobDetail.getStatus().equals(JobStatus.SCHEDULED_PARENT)) {
newJobDetail.setRepeat(null);
newJobDetail.setSerialisedOperationChain(operationChain);
}

if (isNull(oldJobDetail)) {
if (null == oldJobDetail) {
jobTracker.addOrUpdateJob(newJobDetail, context.getUser());
} else {
jobTracker.addOrUpdateJob(new JobDetail(oldJobDetail, newJobDetail), context
.getUser());
.getUser());
}
}
return newJobDetail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,14 @@ public void shouldExecuteOperationWhenJobTrackerCacheIsBroken(@Mock final StoreP
store.initialise("graphId", createSchemaMock(), storeProperties);

// When
store.execute(addElements, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is worth keeping as it tests job functionality
We no longer handle jobs in the execute method, but we do in the executeJob method.
Swapping this line should pass the test


store.executeJob(addElements, context);
// Then
verify(addElementsHandler).doOperation(addElements, context, store);
verify(mockICacheService, Mockito.atLeast(1)).getCache(any());
verify(mockICache, Mockito.atLeast(1)).put(any(), any());
}

};
@Test
public void shouldCreateStoreWithSpecificCaches() throws SchemaException, StoreException {
// Given
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 Crown Copyright
* Copyright 2015-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -106,24 +106,6 @@ public void shouldPropagateStatusInformationContainedInOperationExceptionsThrown
assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), response.getStatus());
}

@Test
public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that this test can be deleted now as it fails due to no jobs running which is correct.
However, I'm not a fan of just deleting tests with no explanation. It's better to change the test to document the new behavior as a bit of self documenting code. I think we need a new test here to show that we shouldn't get a job-id header returned anymore for the executeOperation endpoint as it doesn't make sense.

However, we are still getting the header returned so you will need to implement this

// Given
final Graph graph = new Graph.Builder()
.config(StreamUtil.graphConfig(this.getClass()))
.storeProperties(StreamUtil.STORE_PROPERTIES)
.addSchema(new Schema())
.build();

client.reinitialiseGraph(graph);

// When
final Response response = client.executeOperation(new GetAllJobDetails());

// Then
assertTrue(response.readEntity(String.class).contains(response.getHeaderString("job-id")));
}

@Test
public void shouldReturnAllOperationsAsOperationDetails() throws IOException, ClassNotFoundException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.springframework.http.ResponseEntity;
import org.springframework.util.LinkedMultiValueMap;

import uk.gov.gchq.gaffer.cache.impl.HashMapCacheService;
import uk.gov.gchq.gaffer.commonutil.StreamUtil;
import uk.gov.gchq.gaffer.core.exception.Error;
import uk.gov.gchq.gaffer.data.element.Entity;
Expand Down Expand Up @@ -161,37 +160,6 @@ public void shouldPropagateStatusInformationContainedInOperationExceptionsThrown
assertEquals(SERVICE_UNAVAILABLE.getStatusCode(), response.getStatusCode().value());
}

@Test
public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IOException {
// Given
StoreProperties properties = new MapStoreProperties();
properties.setJobTrackerEnabled(true);
properties.setDefaultCacheServiceClass(HashMapCacheService.class.getName());

Graph graph = new Graph.Builder()
.config(StreamUtil.graphConfig(this.getClass()))
.storeProperties(properties)
.addSchema(new Schema())
.build();

when(getGraphFactory().getGraph()).thenReturn(graph);

// When
final ResponseEntity<Set> response = post("/graph/operations/execute",
new GetAllJobDetails(),
Set.class);

// Then
try {
assertTrue(response.getBody().toString().contains(response.getHeaders().get("job-id").get(0)));
} catch (final AssertionError e) {
System.out.println("Job ID was not found in the Header");
System.out.println("Header was: " + response.getHeaders().get("job-id"));
System.out.println("Body was: " + response.getBody());
throw e;
}
}

@Test
public void shouldCorrectlyStreamExecuteChunked() throws Exception {
// Given
Expand Down
Loading