diff --git a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java index 856b3813985..545f0034f13 100644 --- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java @@ -20,6 +20,7 @@ import java.net.http.HttpClient; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.jena.graph.Node; @@ -150,6 +151,8 @@ public String buildString() { private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext()); // Uses query rewrite to replace variables by values. protected Map substitutionMap = new HashMap<>(); + protected long timeout = -1; + protected TimeUnit timeoutUnit = null; protected ExecUpdateHTTPBuilder() {} @@ -214,6 +217,12 @@ public Y substitution(Var var, Node value) { return thisBuilder(); } + public Y timeout(long timeout, TimeUnit timeoutUnit) { + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + return thisBuilder(); + } + public Y httpClient(HttpClient httpClient) { this.httpClient = Objects.requireNonNull(httpClient); return thisBuilder(); diff --git a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java index 941f90f6c89..196af200421 100644 --- a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java +++ b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java @@ -198,6 +198,17 @@ public static void enableBlankNodeResultLabels(boolean val) { */ public static final Symbol queryTimeout = SystemARQ.allocSymbol("queryTimeout"); + /** + * Set timeout. The value of this symbol gives the value of the timeout in milliseconds + * + * @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long, TimeUnit) + */ + public static final Symbol updateTimeout = SystemARQ.allocSymbol("updateTimeout"); + // This can't be a context constant because NodeValues don't look in the context. // /** // * Context symbol controlling Roman Numerals in Filters. diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java index 8d67401a3c1..a79eea860b6 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java @@ -24,10 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jena.atlas.iterator.Iter; -import org.apache.jena.atlas.logging.Log; import org.apache.jena.graph.Graph; import org.apache.jena.query.ARQ; -import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.engine.main.OpExecutor; import org.apache.jena.sparql.engine.main.OpExecutorFactory; @@ -78,18 +76,7 @@ public ExecutionContext(DatasetGraph dataset, OpExecutorFactory factory) { } public ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory) { - this(params, activeGraph, dataset, factory, cancellationSignal(params)); - } - - private static AtomicBoolean cancellationSignal(Context cxt) { - if ( cxt == null ) - return null; - try { - return cxt.get(ARQConstants.symCancelQuery); - } catch (ClassCastException ex) { - Log.error(ExecutionContext.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage()); - return null; - } + this(params, activeGraph, dataset, factory, Context.getCancelSignal(params)); } private ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) { diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java index 72b1e045e11..b5f27860a44 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java @@ -49,7 +49,8 @@ public QueryIterProcessBinding(QueryIterator qIter, ExecutionContext context) { nextBinding = null ; AtomicBoolean signal; try { - signal = context.getContext().get(ARQConstants.symCancelQuery); + signal = context.getCancelSignal(); + // FIXME Is above the same as this: context.getContext().get(ARQConstants.symCancelQuery); } catch(Exception ex) { signal = null; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java index df797c8e774..b4b26c922f6 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java @@ -19,7 +19,6 @@ package org.apache.jena.sparql.exec; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -53,6 +52,7 @@ import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingFactory; import org.apache.jena.sparql.engine.iterator.QueryIteratorWrapper; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; import org.apache.jena.sparql.graph.GraphOps; import org.apache.jena.sparql.modify.TemplateLib; import org.apache.jena.sparql.syntax.ElementGroup; @@ -89,11 +89,10 @@ public class QueryExecDataset implements QueryExec private long timeout2 = TIMEOUT_UNSET; private final AlarmClock alarmClock = AlarmClock.get(); private long queryStartTime = -1; // Unset - private AtomicBoolean cancelSignal = new AtomicBoolean(false); + private AtomicBoolean cancelSignal; protected QueryExecDataset(Query query, String queryString, DatasetGraph datasetGraph, Context cxt, - QueryEngineFactory qeFactory, - long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2, + QueryEngineFactory qeFactory, Timeout timeout, Binding initialToEngine) { // Context cxt is already a safe copy. this.query = query; @@ -101,10 +100,14 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset this.dataset = datasetGraph; this.qeFactory = qeFactory; this.context = (cxt == null) ? Context.setupContextForDataset(cxt, datasetGraph) : cxt; - this.timeout1 = asMillis(timeout1, timeUnit1); - this.timeout2 = asMillis(timeout2, timeUnit2); + this.timeout1 = timeout.initialTimeoutMillis(); + this.timeout2 = timeout.overallTimeoutMillis(); // See also query substitution handled in QueryExecBuilder this.initialBinding = initialToEngine; + + // Cancel signal may originate from an e.c. an update execution. + this.cancelSignal = Context.getOrSetCancelSignal(context); + init(); } @@ -114,10 +117,6 @@ private void init() { context.put(ARQConstants.sysCurrentQuery, query); } - private static long asMillis(long duration, TimeUnit timeUnit) { - return (duration < 0) ? duration : timeUnit.toMillis(duration); - } - @Override public void close() { closed = true; @@ -457,23 +456,6 @@ private void startQueryIteratorActual() { execInit(); - // JENA-2821 - Unconditionally provide a cancel signal because manual abort via QueryExec.abort() - // may be triggered any time, even if no timeouts were configured. - // Prior to this issue, the cancel signal was only provided when timeouts were configured. - - // The following note is older: - // JENA-2141 - the timeout can go off while building the query iterator structure. - // In this case, use a signal passed through the context. - // We don't know if getPlan().iterator() does a lot of work or not - // (ideally it shouldn't start executing the query but in some sub-systems - // it might be necessary) - // - // This applies to the time to first result because to get the first result, the - // queryIterator must have been built. So it does not apply for the second - // stage of N,-1 or N,M. - context.set(ARQConstants.symCancelQuery, cancelSignal); - - /* Timeouts: * -1,-1 No timeouts * N, same as -1,N Overall timeout only. No wrapper needed. diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java index eb0f705366e..8d98f59df16 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import org.apache.jena.atlas.lib.Pair; import org.apache.jena.atlas.logging.Log; import org.apache.jena.graph.Graph; import org.apache.jena.graph.Node; @@ -34,8 +33,9 @@ import org.apache.jena.sparql.core.Var; import org.apache.jena.sparql.engine.QueryEngineFactory; import org.apache.jena.sparql.engine.QueryEngineRegistry; -import org.apache.jena.sparql.engine.Timeouts; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; +import org.apache.jena.sparql.exec.TimeoutLib.TimeoutBuilderImpl; import org.apache.jena.sparql.syntax.syntaxtransform.QueryTransformOps; import org.apache.jena.sparql.util.Context; import org.apache.jena.sparql.util.ContextAccumulator; @@ -69,10 +69,7 @@ public static QueryExecDatasetBuilder create() { // Uses initial binding to execution (old, original) feature private Binding initialBinding = null; - private long initialTimeout = UNSET; - private TimeUnit initialTimeoutUnit = null; - private long overallTimeout = UNSET; - private TimeUnit overallTimeoutUnit = null; + private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl(); private QueryExecDatasetBuilder() { } @@ -166,60 +163,22 @@ public QueryExecDatasetBuilder initialBinding(Binding binding) { @Override public QueryExecDatasetBuilder timeout(long value, TimeUnit timeUnit) { - this.initialTimeout = UNSET; - this.initialTimeoutUnit = null; - this.overallTimeout = value; - this.overallTimeoutUnit = timeUnit; + timeoutBuilder.timeout(value, timeUnit); return this; } @Override public QueryExecDatasetBuilder initialTimeout(long value, TimeUnit timeUnit) { - this.initialTimeout = value < 0 ? -1L : value ; - this.initialTimeoutUnit = timeUnit; + timeoutBuilder.initialTimeout(value, timeUnit); return this; } @Override public QueryExecDatasetBuilder overallTimeout(long value, TimeUnit timeUnit) { - this.overallTimeout = value; - this.overallTimeoutUnit = timeUnit; + timeoutBuilder.overallTimeout(value, timeUnit); return this; } - // Set times from context if not set directly. e..g Context provides default values. - // Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. - private static void defaultTimeoutsFromContext(QueryExecDatasetBuilder builder, Context cxt) { - applyTimeouts(builder, cxt.get(ARQ.queryTimeout)); - } - - /** Take obj, find the timeout(s) and apply to the builder */ - private static void applyTimeouts(QueryExecDatasetBuilder builder, Object obj) { - if ( obj == null ) - return ; - try { - if ( obj instanceof Number ) { - long x = ((Number)obj).longValue(); - if ( builder.overallTimeout < 0 ) - builder.overallTimeout(x, TimeUnit.MILLISECONDS); - } else if ( obj instanceof String ) { - String str = obj.toString(); - Pair pair = Timeouts.parseTimeoutStr(str, TimeUnit.MILLISECONDS); - if ( pair == null ) { - Log.warn(builder, "Bad timeout string: "+str); - return ; - } - if ( builder.initialTimeout < 0 ) - builder.initialTimeout(pair.getLeft(), TimeUnit.MILLISECONDS); - if ( builder.overallTimeout < 0 ) - builder.overallTimeout(pair.getRight(), TimeUnit.MILLISECONDS); - } else - Log.warn(builder, "Can't interpret timeout: " + obj); - } catch (Exception ex) { - Log.warn(builder, "Exception setting timeouts (context) from: "+obj); - } - } - @Override public QueryExec build() { Objects.requireNonNull(query, "No query for QueryExec"); @@ -243,17 +202,17 @@ public QueryExec build() { queryStringActual = null; } - defaultTimeoutsFromContext(this, cxt); + TimeoutLib.defaultTimeoutsFromContext(this.timeoutBuilder, cxt); if ( dataset != null ) cxt.set(ARQConstants.sysCurrentDataset, DatasetFactory.wrap(dataset)); if ( queryActual != null ) cxt.set(ARQConstants.sysCurrentQuery, queryActual); + Timeout timeout = timeoutBuilder.build(); + QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual, dataset, cxt, qeFactory, - initialTimeout, initialTimeoutUnit, - overallTimeout, overallTimeoutUnit, - initialBinding); + timeout, initialBinding); return qExec; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/TimeoutLib.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/TimeoutLib.java new file mode 100644 index 00000000000..3895e031415 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/TimeoutLib.java @@ -0,0 +1,202 @@ +package org.apache.jena.sparql.exec; + +import java.util.concurrent.TimeUnit; + +import org.apache.jena.atlas.lib.Pair; +import org.apache.jena.atlas.logging.Log; +import org.apache.jena.query.ARQ; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; + +/** Common timeout handling for query and update executions. */ +public class TimeoutLib { + + /** Internal duration record */ + static record Duration(long amount, TimeUnit unit) { + public Duration() { + this(-1, null); + } + + public boolean isSet() { + return amount >= 0; + } + + public long asMillis() { + return (amount < 0) ? amount : unit.toMillis(amount); + } + } + + // FIXME Convert to class because it seems we don't have to expose the duration record. + public static record Timeout(Duration initialTimeout, Duration overallTimeout) { + public static Timeout UNSET = new Timeout(-1, -1); + + public Timeout(long initialTimeout, TimeUnit initialTimeoutUnit, long overallTimeout, TimeUnit overallTimeoutUnit) { + this(new Duration(initialTimeout, initialTimeoutUnit), new Duration(overallTimeout, overallTimeoutUnit)); + } + + public Timeout(long initialTimeout, long overallTimeout) { + this(initialTimeout, TimeUnit.MILLISECONDS, overallTimeout, TimeUnit.MILLISECONDS); + } + + public boolean hasInitialTimeout() { + return initialTimeout().isSet(); + } + + public long initialTimeoutMillis() { + return initialTimeout().asMillis(); + } + + public boolean hasOverallTimeout() { + return overallTimeout().isSet(); + } + + public long overallTimeoutMillis() { + return overallTimeout().asMillis(); + } + + public boolean hasTimeout() { + return hasInitialTimeout() || hasOverallTimeout(); + } + } + +// public interface TimeoutBuilder> { +// public X timeout(long value, TimeUnit timeUnit); +// public X initialTimeout(long value, TimeUnit timeUnit); +// public X overallTimeout(long value, TimeUnit timeUnit); +// +// public boolean hasInitialTimeout(); +// public boolean hasOverallTimeout(); +// } + + // TimeoutBuilder reserved as a possible super-interface for {Query, Update}Exec(ution)Builder. + public static class TimeoutBuilderImpl { + private static final long UNSET = -1; + + protected long initialTimeout = UNSET; + protected TimeUnit initialTimeoutUnit = null; + protected long overallTimeout = UNSET; + protected TimeUnit overallTimeoutUnit = null; + + public TimeoutBuilderImpl timeout(long value, TimeUnit timeUnit) { + this.initialTimeout = UNSET; + this.initialTimeoutUnit = null; + this.overallTimeout = value; + this.overallTimeoutUnit = timeUnit; + return this; + } + + public TimeoutBuilderImpl initialTimeout(long value, TimeUnit timeUnit) { + this.initialTimeout = value < 0 ? -1L : value ; + this.initialTimeoutUnit = timeUnit; + return this; + } + + public boolean hasInitialTimeout() { + return initialTimeout >= 0; + } + + public TimeoutBuilderImpl overallTimeout(long value, TimeUnit timeUnit) { + this.overallTimeout = value; + this.overallTimeoutUnit = timeUnit; + return this; + } + + public boolean hasOverallTimeout() { + return overallTimeout >= 0; + } + + public Timeout build() { + return new Timeout(initialTimeout, initialTimeoutUnit, overallTimeout, overallTimeoutUnit); + } + } + + /** Take obj, find the timeout(s) and apply to the builder */ + public static void applyTimeouts(TimeoutBuilderImpl builder, Object obj) { + Timeout timeout = TimeoutLib.parseTimeouts(obj); + if (timeout != null) { + if ( !builder.hasInitialTimeout() ) + builder.initialTimeout(timeout.initialTimeout().amount(), timeout.initialTimeout().unit()); + if ( !builder.hasOverallTimeout() ) + builder.overallTimeout(timeout.overallTimeout().amount(), timeout.overallTimeout().unit()); + } + } + + public static Timeout parseQueryTimeout(Context cxt) { + return parseTimeouts(cxt, ARQ.queryTimeout); + } + + public static Timeout parseUpdateTimeouts(Context cxt) { + return parseTimeouts(cxt, ARQ.updateTimeout); + } + + public static Timeout parseTimeouts(Context cxt, Symbol symbol) { + Object obj = cxt.get(symbol); + return parseTimeouts(obj); + } + + public static Timeout parseTimeouts(Object obj) { + Timeout result = Timeout.UNSET; + if ( obj != null ) { + try { + if ( obj instanceof Number ) { + long x = ((Number)obj).longValue(); + result = new Timeout(new Duration(), new Duration(x, TimeUnit.MILLISECONDS)); + } else if ( obj instanceof String ) { + String str = obj.toString(); + Pair pair = Timeouts.parseTimeoutStr(str, TimeUnit.MILLISECONDS); + if ( pair == null ) { + Log.warn(TimeoutLib.class, "Bad timeout string: "+str); + return result; + } + result = new Timeout(pair.getLeft(), pair.getRight()); + } else + Log.warn(TimeoutLib.class, "Can't interpret timeout: " + obj); + } catch (Exception ex) { + Log.warn(TimeoutLib.class, "Exception setting timeouts (context) from: "+obj, ex); + } + } + return result; + } + + public static void setQueryTimeout(Context cxt, Timeout timeout) { + setTimeout(cxt, ARQ.queryTimeout, timeout); + } + + public static void setUpdateTimeout(Context cxt, Timeout timeout) { + setTimeout(cxt, ARQ.updateTimeout, timeout); + } + + public static void setTimeout(Context cxt, Symbol symbol, Timeout timeout) { + Object obj = toContextValue(timeout); + cxt.set(symbol, obj); + } + + /** Inverse function of {@link #parseTimeouts(Object)}. */ + public static Object toContextValue(Timeout timeout) { + Object result = timeout == null + ? null + : timeout.hasInitialTimeout() + ? toString(timeout) + : timeout.hasOverallTimeout() + ? timeout.overallTimeoutMillis() + : null; + return result; + } + + /** Inverse function of {@link #parseTimeouts(Object)}. */ + public static String toString(Timeout timeout) { + String result = timeout.hasInitialTimeout() + ? timeout.initialTimeoutMillis() + "," + timeout.overallTimeoutMillis() + : timeout.hasOverallTimeout() + ? Long.toString(timeout.overallTimeoutMillis()) + : null; + return result; + } + + // Set times from context if not set directly. e..g Context provides default values. + // Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. + static void defaultTimeoutsFromContext(TimeoutBuilderImpl builder, Context cxt) { + applyTimeouts(builder, cxt.get(ARQ.queryTimeout)); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java index 3e51b36507b..b97c2af0420 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java @@ -45,4 +45,9 @@ protected UpdateExecAdapter(UpdateExecution updateProc) { public Context getContext() { return updateProc.getContext(); } + + @Override + public void abort() { + updateProc.abort(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java index 46434ed84f4..02f56413c90 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.sparql.exec; +import java.util.concurrent.TimeUnit; + import org.apache.jena.graph.Node; import org.apache.jena.query.ARQ; import org.apache.jena.sparql.core.Var; @@ -65,6 +67,8 @@ public default UpdateExecBuilder substitution(String var, Node value) { return substitution(Var.alloc(var), value); } + public UpdateExecBuilder timeout(long value, TimeUnit timeUnit); + public UpdateExec build(); /** Build and execute. */ diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java index 1355939aa6a..461373bb0e7 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java @@ -19,6 +19,7 @@ package org.apache.jena.sparql.exec; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.jena.graph.Node; import org.apache.jena.sparql.core.ResultBinding; @@ -118,6 +119,12 @@ public UpdateExecBuilder substitution(Var var, Node value) { return this; } + @Override + public UpdateExecBuilder timeout(long timeout, TimeUnit timeoutUnit) { + builder = builder.timeout(timeout, timeoutUnit); + return this; + } + @Override public UpdateExec build() { UpdateExecution updateExec = builder.build(); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java index b6f0eed8276..78360a8d7d1 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java @@ -20,6 +20,7 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; import org.apache.jena.sparql.modify.UpdateEngineFactory; import org.apache.jena.sparql.modify.UpdateProcessorBase; import org.apache.jena.sparql.util.Context; @@ -28,8 +29,8 @@ public class UpdateExecDataset extends UpdateProcessorBase implements UpdateExec { protected UpdateExecDataset(UpdateRequest request, DatasetGraph datasetGraph, - Binding inputBinding, Context context, UpdateEngineFactory factory) { - super(request, datasetGraph, inputBinding, context, factory); + Binding inputBinding, Context context, UpdateEngineFactory factory, Timeout timeout) { + super(request, datasetGraph, inputBinding, context, factory, timeout); } @Override diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java index 15c61bd1aff..5a912313391 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java @@ -21,13 +21,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.jena.graph.Node; import org.apache.jena.query.ARQ; -import org.apache.jena.query.Query; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.Var; import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; +import org.apache.jena.sparql.exec.TimeoutLib.TimeoutBuilderImpl; import org.apache.jena.sparql.modify.UpdateEngineFactory; import org.apache.jena.sparql.modify.UpdateEngineRegistry; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; @@ -43,19 +45,19 @@ public class UpdateExecDatasetBuilder implements UpdateExecBuilder { public static UpdateExecDatasetBuilder create() { return new UpdateExecDatasetBuilder(); } - private DatasetGraph dataset = null; - private Query query = null; - // private Context context = null; + private DatasetGraph dataset = null; private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext(), ()->Context.fromDataset(dataset)); - // Uses query rewrite to replace variables by values. - private Map substitutionMap = null; + private Map substitutionMap = null; + + private Binding initialBinding = null; + + private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl(); - private Binding initialBinding = null; - private UpdateRequest update = null; - private UpdateRequest updateRequest = new UpdateRequest(); + private UpdateRequest update = null; + private UpdateRequest updateRequest = new UpdateRequest(); private UpdateExecDatasetBuilder() {} @@ -141,6 +143,12 @@ private void ensureSubstitutionMap() { substitutionMap = new HashMap<>(); } + @Override + public UpdateExecDatasetBuilder timeout(long timeout, TimeUnit timeoutUnit) { + this.timeoutBuilder.timeout(timeout, timeoutUnit); + return this; + } + /** Use {@link #substitution(Binding)} */ @Deprecated public UpdateExecDatasetBuilder initialBinding(Binding initialBinding) { @@ -162,7 +170,10 @@ public UpdateExec build() { UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt); if ( f == null ) throw new UpdateException("Failed to find an UpdateEngine"); - UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f); + + Timeout timeout = timeoutBuilder.build(); + + UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f, timeout); return uExec; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java index 8a7d7af62b1..7123b9c32fc 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java @@ -45,4 +45,9 @@ protected UpdateExecutionAdapter(UpdateExec updateExec) { public Context getContext() { return updateExec.getContext(); } + + @Override + public void abort() { + updateExec.abort(); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java index 48240e68229..88e91e98c46 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java @@ -18,6 +18,8 @@ package org.apache.jena.sparql.exec; +import java.util.concurrent.TimeUnit; + import org.apache.jena.query.QuerySolution; import org.apache.jena.rdf.model.RDFNode; import org.apache.jena.sparql.engine.binding.Binding; @@ -103,6 +105,12 @@ public UpdateExecutionBuilder substitution(String varName, RDFNode value) { return this; } + @Override + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) { + builder.timeout(value, timeUnit); + return this; + } + @Override public UpdateExecution build() { return UpdateExecutionAdapter.adapt(builder.build()); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java index 80644d0667f..02d51af5a97 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java @@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.jena.http.HttpEnv; import org.apache.jena.http.HttpLib; @@ -59,13 +60,16 @@ public static UpdateExecHTTPBuilder service(String endpointURL) { private final Params params; private final List usingGraphURIs; private final List usingNamedGraphURIs; + private final long timeout; + private final TimeUnit timeoutUnit; /*package*/ UpdateExecHTTP(String serviceURL, UpdateRequest update, String updateString, HttpClient httpClient, Params params, List usingGraphURIs, List usingNamedGraphURIs, Map httpHeaders, UpdateSendMode sendMode, - Context context) { + Context context, + long timeout, TimeUnit timeoutUnit) { this.context = context; this.service = serviceURL; //this.update = update; @@ -77,6 +81,8 @@ public static UpdateExecHTTPBuilder service(String endpointURL) { this.usingNamedGraphURIs = usingNamedGraphURIs; this.httpHeaders = httpHeaders; this.sendMode = sendMode; + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; } @Override @@ -130,7 +136,7 @@ private void executePostForm(Params thisParams) { } private String executeUpdate(String requestURL, BodyPublisher body, String contentType) { - HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL, httpHeaders, -1L, null); + HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL, httpHeaders, timeout, timeoutUnit); builder = contentTypeHeader(builder, contentType); HttpRequest request = builder.POST(body).build(); logUpdate(updateString, request); @@ -139,4 +145,9 @@ private String executeUpdate(String requestURL, BodyPublisher body, String conte } private static void logUpdate(String updateString, HttpRequest request) {} + + @Override + public void abort() { + // FIXME Currently a noop. Needs revision according to QueryExecHTTP. + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java index fb11b209854..c38ff2c72c4 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java @@ -45,6 +45,6 @@ protected UpdateExecHTTP buildX(HttpClient hClient, UpdateRequest updateActual, copyArray(usingGraphURIs), copyArray(usingNamedGraphURIs), new HashMap<>(httpHeaders), - sendMode, cxt); + sendMode, cxt, timeout, timeoutUnit); } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java index 5d6350c0407..f4aecfb0b7c 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java @@ -52,7 +52,8 @@ protected UpdateExecutionHTTP buildX(HttpClient hClient, UpdateRequest updateAct copyArray(usingGraphURIs), copyArray(usingNamedGraphURIs), new HashMap<>(httpHeaders), - sendMode, cxt); + sendMode, cxt, + timeout, timeoutUnit); return new UpdateExecutionHTTP(uExec); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java index d8ef8284dd2..74171307c5e 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java @@ -30,12 +30,12 @@ public interface UpdateEngine * Signal start of a request being executed */ public void startRequest(); - + /** - * Signal end of a request being executed + * Signal end of a request being executed */ public void finishRequest(); - + /** * Returns an {@link UpdateSink} that accepts Update operations */ diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java index 597e1c0b9cf..45b23e387fc 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java @@ -38,7 +38,7 @@ * See {@link UpdateEngineNonStreaming} for a subclass that accumulates updates, including during * parsing then executes the operation. */ -public class UpdateEngineMain extends UpdateEngineBase +public class UpdateEngineMain extends UpdateEngineBase { /** * Creates a new Update Engine @@ -53,12 +53,12 @@ public UpdateEngineMain(DatasetGraph datasetGraph, Binding inputBinding, Context @Override public void startRequest() {} - + @Override public void finishRequest() {} - + private UpdateSink updateSink = null ; - + /* * Returns the {@link UpdateSink}. In this implementation, this is done by with * an {@link UpdateVisitor} which will visit each update operation and send the @@ -71,11 +71,11 @@ public UpdateSink getUpdateSink() { if ( updateSink == null ) updateSink = new UpdateVisitorSink(this.prepareWorker(), - sink(q->datasetGraph.add(q)), + sink(q->datasetGraph.add(q)), sink(q->datasetGraph.delete(q))); return updateSink ; } - + /** * Creates the {@link UpdateVisitor} which will do the work of applying the updates * @return The update visitor to be used to apply the updates @@ -84,18 +84,18 @@ protected UpdateVisitor prepareWorker() { return new UpdateEngineWorker(datasetGraph, inputBinding, context) ; } - /** Direct a sink to a Consumer. */ + /** Direct a sink to a Consumer. */ private Sink sink(Consumer action) { return new Sink() { @Override public void send(X item) { action.accept(item); } - @Override public void close() {} + @Override public void close() {} @Override public void flush() {} - }; + }; } - + private static UpdateEngineFactory factory = new UpdateEngineFactory() { @Override diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java index 53cc9685716..26395cc9916 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jena.atlas.data.BagFactory; import org.apache.jena.atlas.data.DataBag; @@ -45,6 +46,7 @@ import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.binding.BindingRoot; import org.apache.jena.sparql.exec.*; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; import org.apache.jena.sparql.graph.GraphFactory; import org.apache.jena.sparql.graph.GraphOps; import org.apache.jena.sparql.modify.request.*; @@ -64,10 +66,42 @@ public class UpdateEngineWorker implements UpdateVisitor protected final Binding inputBinding; // Used for UpdateModify only: substitution is better. protected final Context context; + protected final Timeout timeout; + + /** Used to compute the remaining overall time that may be spent in query execution. */ + protected long startTimeMillis = -1; + + /** The currently executing query exec. */ + protected final AtomicBoolean cancelSignal; + protected volatile QueryExec activeQExec = null; + public UpdateEngineWorker(DatasetGraph datasetGraph, Binding inputBinding, Context context) { this.datasetGraph = datasetGraph; this.inputBinding = inputBinding; this.context = context; + this.timeout = TimeoutLib.parseUpdateTimeouts(context); + this.cancelSignal = Context.getOrSetCancelSignal(context); + } + + public void abort() { + if (cancelSignal.compareAndSet(false, true)) { + synchronized (this) { + // If the change of the cancel signal happened here then abort the activeQExec. + if (activeQExec != null) { + activeQExec.abort(); + } + } + } + } + + private synchronized void setQExec(QueryExec qExec) { + synchronized (this) { + this.activeQExec = qExec; + // Cancel the qExec immediately if the cancel signal is true. + if (cancelSignal.get()) { + activeQExec.abort(); + } + } } @Override @@ -528,7 +562,7 @@ protected Iterator evalBindings(Element pattern) { } @SuppressWarnings("all") - protected static Iterator evalBindings(Query query, DatasetGraph dsg, Binding inputBinding, Context context) { + protected Iterator evalBindings(Query query, DatasetGraph dsg, Binding inputBinding, Context context) { // The UpdateProcessorBase already copied the context and made it safe // ... but that's going to happen again :-( if ( query == null ) { @@ -536,6 +570,8 @@ protected static Iterator evalBindings(Query query, DatasetGraph dsg, B return Iter.singletonIterator(binding); } + updateRemainingQueryTimeout(context); + // Not QueryExecDataset.dataset(...) because of initialBinding. QueryExecDatasetBuilder builder = QueryExecDatasetBuilder.create().dataset(dsg).query(query).context(context); if ( inputBinding != null ) { @@ -545,9 +581,34 @@ protected static Iterator evalBindings(Query query, DatasetGraph dsg, B // builder.substitution(inputBinding); } QueryExec qExec = builder.build(); + setQExec(qExec); return qExec.select(); } + private void updateRemainingQueryTimeout(Context context) { + Timeout finalTimeout = null; + if (timeout.hasOverallTimeout()) { + long remainingOverallTimeoutMillis = -1; + if (startTimeMillis < 0) { + startTimeMillis = System.currentTimeMillis(); + remainingOverallTimeoutMillis = timeout.overallTimeoutMillis(); + } else { + long currentTimeMillis = System.currentTimeMillis(); + long elapsedMillis = currentTimeMillis - startTimeMillis; + remainingOverallTimeoutMillis -= elapsedMillis; + if (remainingOverallTimeoutMillis < 0) { + remainingOverallTimeoutMillis = 0; + } + } + finalTimeout = new Timeout(timeout.initialTimeoutMillis(), remainingOverallTimeoutMillis); + } else if(timeout.hasInitialTimeout()) { + finalTimeout = new Timeout(timeout.initialTimeoutMillis(), -1); + } + + // Override any prior queryTimeout symbol with a fresh value computed from the configured updateTimeout. + TimeoutLib.setQueryTimeout(context, finalTimeout); + } + /** * Execute. *
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java index 9abb33dccfd..2cb27dd3266 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java @@ -18,9 +18,13 @@ package org.apache.jena.sparql.modify; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.sparql.core.DatasetGraph ; import org.apache.jena.sparql.engine.binding.Binding ; +import org.apache.jena.sparql.exec.TimeoutLib; +import org.apache.jena.sparql.exec.TimeoutLib.Timeout; import org.apache.jena.sparql.util.Context ; import org.apache.jena.update.UpdateProcessor ; import org.apache.jena.update.UpdateRequest ; @@ -35,12 +39,14 @@ public class UpdateProcessorBase implements UpdateProcessor protected final Binding inputBinding; protected final UpdateEngineFactory factory ; protected final Context context ; + protected final Timeout timeout ; public UpdateProcessorBase(UpdateRequest request, DatasetGraph datasetGraph, Binding inputBinding, Context context, - UpdateEngineFactory factory) + UpdateEngineFactory factory, + Timeout timeout) { this.request = request ; this.datasetGraph = datasetGraph ; @@ -48,6 +54,9 @@ public UpdateProcessorBase(UpdateRequest request, this.context = context; Context.setCurrentDateTime(this.context) ; this.factory = factory ; + this.timeout = timeout; + Context.getOrSetCancelSignal(this.context) ; + TimeoutLib.setUpdateTimeout(context, timeout); } @Override @@ -55,6 +64,7 @@ public void execute() { UpdateEngine uProc = factory.create(datasetGraph, inputBinding, context); uProc.startRequest(); + // context.get(ARQ.updateTimeout); try { UpdateSink sink = uProc.getUpdateSink(); Iter.sendToSink(request.iterator(), sink); // Will call close on sink if there are no exceptions @@ -67,4 +77,14 @@ public void execute() { public Context getContext() { return context; } + + @Override + public void abort() { + // Right now abort is only signaled via the context's cancel signal. + // An improvement might be introducing UpdateEngine.abort(). + AtomicBoolean cancelSignal = Context.getCancelSignal(context); + if (cancelSignal != null) { + cancelSignal.set(true); + } + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java index 9521ea3f7d2..08d6c41583b 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java @@ -20,13 +20,16 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.apache.jena.atlas.lib.Lib; +import org.apache.jena.atlas.logging.Log; import org.apache.jena.query.ARQ; import org.apache.jena.sparql.ARQConstants; import org.apache.jena.sparql.ARQException; import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.ExecutionContext; /** * A class for setting and keeping named values. Used to pass @@ -418,6 +421,26 @@ public static void setCurrentDateTime(Context context) { context.set(ARQConstants.sysCurrentTime, NodeFactoryExtra.nowAsDateTime()); } + public static AtomicBoolean getCancelSignal(Context context) { + if ( context == null ) + return null; + try { + return context.get(ARQConstants.symCancelQuery); + } catch (ClassCastException ex) { + Log.error(Context.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage()); + return null; + } + } + + public static AtomicBoolean getOrSetCancelSignal(Context context) { + AtomicBoolean cancelSignal = getCancelSignal(context); + if (cancelSignal == null) { + cancelSignal = new AtomicBoolean(false); + context.set(ARQConstants.symCancelQuery, cancelSignal); + } + return cancelSignal; + } + /** Merge an outer (defaults to the system global context) * and local context to produce a new context * The new context is always a separate copy. diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java index 0a294e3c225..3672c15b8b4 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import java.util.concurrent.TimeUnit; + import org.apache.jena.query.QuerySolution; import org.apache.jena.rdf.model.RDFNode; import org.apache.jena.sparql.util.Context; @@ -47,6 +49,10 @@ public interface UpdateExecutionBuilder { public UpdateExecutionBuilder substitution(String varName, RDFNode value); + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit); + + public default UpdateExecutionBuilder timeout(long value) { return timeout(value, TimeUnit.MILLISECONDS); } + public UpdateExecution build(); /** Build and execute */ diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java index 63efc743f4e..b0ce8acac21 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java @@ -18,6 +18,8 @@ package org.apache.jena.update; +import java.util.concurrent.TimeUnit; + import org.apache.jena.graph.Node; import org.apache.jena.query.Dataset; import org.apache.jena.query.QueryExecution; @@ -131,6 +133,12 @@ public UpdateExecutionDatasetBuilder substitution(String varName, RDFNode value) return this; } + @Override + public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) { + builder.timeout(value, timeUnit); + return this; + } + @Override public UpdateExecution build() { UpdateExec exec = builder.build(); diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java index b9e6f42909c..df2964ea11a 100644 --- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java +++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java @@ -29,5 +29,8 @@ public interface UpdateProcessor /** Execute */ public void execute() ; + /** Attempt to asynchronously abort an update execution. */ + public void abort() ; + public Context getContext(); } diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java index d7bf6f318da..f7d4e99b338 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -352,6 +353,28 @@ public void test_cancel_signal_update_3() { Assert.assertEquals(1, Iter.count(dsg.find())); } + private static Graph createTestGraph() { + // Create a model with 1000 triples + Graph graph = GraphFactory.createDefaultGraph(); + IntStream.range(0, 1000) + .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i)) + .forEach(node -> graph.add(node, node, node)); + return graph; + } + + @Test(expected = QueryCancelledException.class, timeout = 5000) + public void test_update_cancel_1() { + Graph graph = createTestGraph(); + // Create a query that creates 3 cross joins - resulting in one billion result rows + UpdateExec + .dataset(graph) + // No-op delete followed by insert - indirectly tests remaining time calculation + .update("DELETE {

} WHERE { ?a ?b ?c } ; INSERT {

} WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }") + .timeout(50, TimeUnit.MILLISECONDS) + .build() + .execute(); + } + static void cancellationTest(String queryString, Function> itFactory, Consumer> itConsumer) { cancellationTest(queryString, itFactory::apply); cancellationTestForIterator(queryString, itFactory, itConsumer); @@ -392,11 +415,7 @@ public void test_cancel_concurrent_1() { int taskCount = cpuCount * 10; // Create a model with 1000 triples - Graph graph = GraphFactory.createDefaultGraph(); - IntStream.range(0, 1000) - .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i)) - .forEach(node -> graph.add(node, node, node)); - Model model = ModelFactory.createModelForGraph(graph); + Model model = ModelFactory.createModelForGraph(createTestGraph()); // Create a query that creates 3 cross joins - resulting in one billion result rows Query query = QueryFactory.create("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }");