Skip to content

Commit

Permalink
Make sure the result handler runs on the correct Vert.x context
Browse files Browse the repository at this point in the history
  • Loading branch information
garricko committed Mar 14, 2017
1 parent 422b310 commit b23ea3d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 14 deletions.
44 changes: 39 additions & 5 deletions src/main/java/com/github/susom/database/VertxUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.MDC;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -22,10 +23,11 @@ public class VertxUtil {
* Wrap a Handler in a way that will preserve the SLF4J MDC context.
* The context from the current thread at the time of this method call
* will be cached and restored within the wrapper at the time the
* handler is invoked.
* handler is invoked. This version delegates the handler call directly
* on the thread that calls it.
*/
public static <T> Handler<T> mdc(Handler<T> handler) {
Map mdc = MDC.getCopyOfContextMap();
public static <T> Handler<T> mdc(final Handler<T> handler) {
final Map mdc = MDC.getCopyOfContextMap();

return t -> {
Map restore = MDC.getCopyOfContextMap();
Expand All @@ -46,6 +48,38 @@ public static <T> Handler<T> mdc(Handler<T> handler) {
};
}

/**
* Wrap a Handler in a way that will preserve the SLF4J MDC context.
* The context from the current thread at the time of this method call
* will be cached and restored within the wrapper at the time the
* handler is invoked. This version delegates the handler call using
* {@link Context#runOnContext(Handler)} from the current context that
* calls this method, ensuring the handler call will run on the correct
* event loop.
*/
public static <T> Handler<T> mdcEventLoop(final Handler<T> handler) {
final Map mdc = MDC.getCopyOfContextMap();
final Context context = Vertx.currentContext();

return t -> context.runOnContext((v) -> {
Map restore = MDC.getCopyOfContextMap();
try {
if (mdc == null) {
MDC.clear();
} else {
MDC.setContextMap(mdc);
}
handler.handle(t);
} finally {
if (restore == null) {
MDC.clear();
} else {
MDC.setContextMap(restore);
}
}
});
}

/**
* Equivalent to {@link Vertx#executeBlocking(Handler, Handler)},
* but preserves the {@link MDC} correctly.
Expand All @@ -60,7 +94,7 @@ public static <T> void executeBlocking(Vertx vertx, Handler<Future<T>> future, H
*/
public static <T> void executeBlocking(Vertx vertx, Handler<Future<T>> future, boolean ordered,
Handler<AsyncResult<T>> handler) {
vertx.executeBlocking(mdc(future), ordered, mdc(handler));
vertx.executeBlocking(mdc(future), ordered, mdcEventLoop(handler));
}

/**
Expand All @@ -77,6 +111,6 @@ public static <T> void executeBlocking(WorkerExecutor executor, Handler<Future<T
*/
public static <T> void executeBlocking(WorkerExecutor executor, Handler<Future<T>> future, boolean ordered,
Handler<AsyncResult<T>> handler) {
executor.executeBlocking(mdc(future), ordered, mdc(handler));
executor.executeBlocking(mdc(future), ordered, mdcEventLoop(handler));
}
}
75 changes: 66 additions & 9 deletions src/test/java/com/github/susom/database/test/VertxLoggingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.github.susom.database.DatabaseProviderVertx;
import com.github.susom.database.DatabaseProviderVertx.Builder;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand All @@ -24,7 +25,7 @@
@RunWith(VertxUnitRunner.class)
public class VertxLoggingTest {
private Logger log = LoggerFactory.getLogger(VertxLoggingTest.class);
int times = 0;
private int times = 0;

static {
// We will put all Derby related files inside ./build to keep our working copy clean
Expand All @@ -34,6 +35,63 @@ public class VertxLoggingTest {
}
}

@Test
public void testContextPropagation(TestContext context) {
Async async = context.async();

Vertx vertx = Vertx.vertx();

Config config = ConfigFrom.firstOf().value("database.url", "jdbc:derby:target/testdb;create=true").get();
Builder db = DatabaseProviderVertx.pooledBuilder(vertx, config).withSqlParameterLogging();

vertx.createHttpServer()
.requestHandler(r -> {
Context context1 = Vertx.currentContext();
log.debug("Request before blocking1: " + context1);
db.transactAsync(dbs -> {
Context context2 = Vertx.currentContext();
log.debug("Request inside blocking2: " + context2);
context.assertNotEquals(context1, context2);
return null;
}, result -> {
Context context3 = Vertx.currentContext();
log.debug("Request after blocking3: " + context3);
context.assertEquals(context1, context3);

db.transactAsync(dbs -> {
Context context4 = Vertx.currentContext();
log.debug("Request inside blocking4: " + context4);
context.assertNotEquals(context1, context4);
return null;
}, result2 -> {
Context context5 = Vertx.currentContext();
log.debug("Request after blocking5: " + context5);
context.assertEquals(context1, context5);
db.transactAsync(dbs -> {
Context context6 = Vertx.currentContext();
log.debug("Request inside blocking6: " + context6);
context.assertNotEquals(context1, context6);
return null;
}, result3 -> {
Context context7 = Vertx.currentContext();
log.debug("Request after blocking7: " + context7);
context.assertEquals(context1, context7);
r.response().end();
MDC.clear();
async.complete();
});
MDC.put("userId", "bob");
});
MDC.put("userId", "bob");
});
}).listen(8111, server ->
vertx.createHttpClient().get(8111, "localhost", "/foo").handler(response -> {
context.assertEquals(200, response.statusCode());
vertx.close();
}).end()
);
}

@Test
public void testMdcTransferToWorkerDatabase(TestContext context) {
Async async = context.async();
Expand Down Expand Up @@ -86,14 +144,13 @@ public void testMdcTransferToWorkerDatabase(TestContext context) {
MDC.put("userId", "bob");
});

// Simulate the event loop doing something else while we
// are processing, which may mess up the MDC
MDC.put("userId", "bob");
}).listen(8101, server -> {
vertx.createHttpClient().get(8101, "localhost", "/foo").handler(response ->
context.assertEquals(200, response.statusCode())
).end();
});
// Clear the MDC when we are done (the async stuff should track/restore as needed)
MDC.clear();
}).listen(8101, server ->
vertx.createHttpClient().get(8101, "localhost", "/foo").handler(response ->
context.assertEquals(200, response.statusCode())
).end()
);

vertx.setPeriodic(100, id -> {
// Repeat enough times to cycle through all workers
Expand Down

0 comments on commit b23ea3d

Please sign in to comment.