Skip to content

Commit

Permalink
Merge pull request #201 from Ladicek/bugfixes
Browse files Browse the repository at this point in the history
bugfixes
  • Loading branch information
Ladicek authored Mar 4, 2020
2 parents 27de391 + 9c0d7e2 commit d0e706b
Show file tree
Hide file tree
Showing 31 changed files with 873 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ public V call() throws Exception {
return delegate.call();
}

// arbitrary contextual data

private final ConcurrentMap<Class<?>, Object> data = new ConcurrentHashMap<>();

public <T> void set(Class<T> clazz, T object) {
data.put(clazz, object);
}

public <T> T get(Class<T> clazz) {
return clazz.cast(data.get(clazz));
}

// out-of-band communication between fault tolerance strategies in a single chain
// (only makes sense if different strategies in the chain run on different threads)

private final ConcurrentMap<Class<? extends InvocationContextEvent>, Collection<Consumer<? extends InvocationContextEvent>>> eventHandlers = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.faulttolerance.core.bulkhead;

import static io.smallrye.faulttolerance.core.util.CompletionStages.failedStage;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -36,20 +38,21 @@ public CompletionStageBulkhead(

@Override
public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) {
// TODO we shouldn't put tasks into the executor if they immediately block on workSemaphore,
// they should be put into some queue
if (capacitySemaphore.tryAcquire()) {
CompletionStageBulkheadTask task = new CompletionStageBulkheadTask(System.nanoTime(), ctx);
executor.execute(task);
recorder.bulkheadQueueEntered();
return task.result;
} else {
recorder.bulkheadRejected();
CompletableFuture<V> result = new CompletableFuture<>();
result.completeExceptionally(bulkheadRejected());
return result;
return failedStage(bulkheadRejected());
}
}

public int getQueueSize() {
// only for tests
int getQueueSize() {
return Math.max(0, queueSize - capacitySemaphore.availablePermits());
}

Expand All @@ -68,7 +71,7 @@ public void run() {
try {
workSemaphore.acquire();
} catch (InterruptedException e) {
logger.error("Bulkhead worker interrupted, exiting", e);
// among other occasions, this also happens during shutdown
result.completeExceptionally(e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.smallrye.faulttolerance.core.circuit.breaker;

import static io.smallrye.faulttolerance.core.util.CompletionStages.failedStage;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
Expand Down Expand Up @@ -48,34 +49,31 @@ public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) throw

private CompletionStage<V> inClosed(InvocationContext<CompletionStage<V>> target, CircuitBreaker.State state) {
try {
CompletionStage<V> result = delegate.apply(target);
CompletableFuture<V> result = new CompletableFuture<>();

return result.handle((val, error) -> {
delegate.apply(target).whenComplete((value, error) -> {
if (error != null) {
throw onFailure(state, error);
onFailure(state, error);
result.completeExceptionally(error);
} else {
metricsRecorder.circuitBreakerSucceeded();
boolean failureThresholdReached = state.rollingWindow.recordSuccess();
if (failureThresholdReached) {
toOpen(state);
}
listeners.forEach(CircuitBreakerListener::succeeded);
return val;
result.complete(value);
}
});

return result;
} catch (Throwable e) {
CompletionException failure = onFailure(state, e);
return failedCompletionStage(failure);
onFailure(state, e);
return failedStage(e);
}
}

private CompletionStage<V> failedCompletionStage(Throwable failure) {
CompletableFuture<V> result = new CompletableFuture<>();
result.completeExceptionally(failure);
return result;
}

private CompletionException onFailure(State state, Throwable e) {
private void onFailure(State state, Throwable e) {
boolean isFailure = !isConsideredSuccess(e);
if (isFailure) {
listeners.forEach(CircuitBreakerListener::failed);
Expand All @@ -96,20 +94,14 @@ private CompletionException onFailure(State state, Throwable e) {

toOpen(state);
}

if (e instanceof CompletionException) {
return (CompletionException) e;
} else {
return new CompletionException(e);
}
}

private CompletionStage<V> inOpen(InvocationContext<CompletionStage<V>> target,
CircuitBreaker.State state) throws Exception {
if (state.runningStopwatch.elapsedTimeInMillis() < delayInMillis) {
metricsRecorder.circuitBreakerRejected();
listeners.forEach(CircuitBreakerListener::rejected);
return failedCompletionStage(new CircuitBreakerOpenException(description + " circuit breaker is open"));
return failedStage(new CircuitBreakerOpenException(description + " circuit breaker is open"));
} else {
long now = System.nanoTime();

Expand All @@ -124,24 +116,36 @@ private CompletionStage<V> inOpen(InvocationContext<CompletionStage<V>> target,

private CompletionStage<V> inHalfOpen(InvocationContext<CompletionStage<V>> target, CircuitBreaker.State state) {
try {
CompletionStage<V> result = delegate.apply(target);
metricsRecorder.circuitBreakerSucceeded();
CompletableFuture<V> result = new CompletableFuture<>();

int successes = state.consecutiveSuccesses.incrementAndGet();
if (successes >= successThreshold) {
long now = System.nanoTime();
closedStart = now;
previousHalfOpenTime.addAndGet(now - halfOpenStart);
delegate.apply(target).whenComplete((value, error) -> {
if (error != null) {
metricsRecorder.circuitBreakerFailed();
listeners.forEach(CircuitBreakerListener::failed);
toOpen(state);
result.completeExceptionally(error);
} else {
metricsRecorder.circuitBreakerSucceeded();

int successes = state.consecutiveSuccesses.incrementAndGet();
if (successes >= successThreshold) {
long now = System.nanoTime();
closedStart = now;
previousHalfOpenTime.addAndGet(now - halfOpenStart);

toClosed(state);
}
listeners.forEach(CircuitBreakerListener::succeeded);
result.complete(value);
}
});

toClosed(state);
}
listeners.forEach(CircuitBreakerListener::succeeded);
return result;
} catch (Throwable e) {
metricsRecorder.circuitBreakerFailed();
listeners.forEach(CircuitBreakerListener::failed);
toOpen(state);
return failedCompletionStage(e);
return failedStage(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,56 @@
package io.smallrye.faulttolerance.core.fallback;

import static io.smallrye.faulttolerance.core.util.CompletionStages.failedStage;
import static io.smallrye.faulttolerance.core.util.CompletionStages.propagateCompletion;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.util.SetOfThrowables;

public class CompletionStageFallback<V> extends Fallback<CompletionStage<V>> {
private final Executor executor;

public CompletionStageFallback(FaultToleranceStrategy<CompletionStage<V>> delegate, String description,
FallbackFunction<CompletionStage<V>> fallback, SetOfThrowables applyOn, SetOfThrowables skipOn,
Executor executor, MetricsRecorder metricsRecorder) {
MetricsRecorder metricsRecorder) {
super(delegate, description, fallback, applyOn, skipOn, metricsRecorder);
this.executor = executor;
}

@Override
public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) {
CompletableFuture<V> result = new CompletableFuture<>();

executor.execute(() -> {
CompletionStage<V> originalResult;
try {
originalResult = delegate.apply(ctx);
} catch (Exception e) {
CompletableFuture<V> failure = new CompletableFuture<>();
failure.completeExceptionally(e);
originalResult = failure;
CompletionStage<V> originalResult;
try {
originalResult = delegate.apply(ctx);
} catch (Exception e) {
originalResult = failedStage(e);
}

originalResult.whenComplete((value, exception) -> {
if (value != null) {
result.complete(value);
return;
}

originalResult.whenComplete((value, exception) -> {
if (value != null) {
result.complete(value);
return;
}

if (exception instanceof InterruptedException || Thread.interrupted()) {
result.completeExceptionally(new InterruptedException());
return;
}
if (exception instanceof InterruptedException || Thread.interrupted()) {
result.completeExceptionally(new InterruptedException());
return;
}

if (shouldSkipFallback(exception)) {
result.completeExceptionally(exception);
}
if (shouldSkipFallback(exception)) {
result.completeExceptionally(exception);
return;
}

try {
metricsRecorder.fallbackCalled();
fallback.call(exception).whenComplete((fallbackValue, fallbackException) -> {
if (fallbackValue != null) {
result.complete(fallbackValue);
} else {
result.completeExceptionally(fallbackException);
}
});
} catch (Exception e) {
result.completeExceptionally(e);
}
});
try {
metricsRecorder.fallbackCalled();
FallbackContext<CompletionStage<V>> fallbackContext = new FallbackContext<>(exception, ctx);
propagateCompletion(fallback.call(fallbackContext), result);
} catch (Exception e) {
result.completeExceptionally(e);
}
});

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ public V apply(InvocationContext<V> ctx) throws Exception {
}

if (shouldSkipFallback(failure)) {
sneakyThrow(failure);
throw sneakyThrow(failure);
}

metricsRecorder.fallbackCalled();
return fallback.call(failure);
FallbackContext<V> fallbackContext = new FallbackContext<>(failure, ctx);
return fallback.call(fallbackContext);
}

boolean shouldSkipFallback(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.smallrye.faulttolerance.core.fallback;

import io.smallrye.faulttolerance.core.InvocationContext;

public final class FallbackContext<V> {
public final Throwable failure;
public final InvocationContext<V> invocationContext;

public FallbackContext(Throwable failure, InvocationContext<V> invocationContext) {
this.failure = failure;
this.invocationContext = invocationContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

@FunctionalInterface
public interface FallbackFunction<T> {
T call(Throwable failure) throws Exception;
T call(FallbackContext<T> ctx) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
* The strategies here are not meant to be used directly.
* Their API is not optimized for end user friendliness, but for integration correctness.
* The core abstraction is {@link io.smallrye.faulttolerance.core.FaultToleranceStrategy}; each strategy implements that.
* API stability is <b>not</b> guaranteed!
* API stability in this package and all its subpackages is <b>not</b> guaranteed!
*/
package io.smallrye.faulttolerance.core;
Loading

0 comments on commit d0e706b

Please sign in to comment.