From b23ea3d735a80b79aa656f5ead128423ccfc2bb1 Mon Sep 17 00:00:00 2001 From: Garrick Olson Date: Tue, 14 Mar 2017 16:52:35 -0700 Subject: [PATCH] Make sure the result handler runs on the correct Vert.x context --- .../com/github/susom/database/VertxUtil.java | 44 +++++++++-- .../susom/database/test/VertxLoggingTest.java | 75 ++++++++++++++++--- 2 files changed, 105 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/github/susom/database/VertxUtil.java b/src/main/java/com/github/susom/database/VertxUtil.java index 30f418a..e7d2921 100644 --- a/src/main/java/com/github/susom/database/VertxUtil.java +++ b/src/main/java/com/github/susom/database/VertxUtil.java @@ -5,6 +5,7 @@ import org.slf4j.MDC; import io.vertx.core.AsyncResult; +import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -22,10 +23,11 @@ public class VertxUtil { * Wrap a Handler in a way that will preserve the SLF4J MDC context. * The context from the current thread at the time of this method call * will be cached and restored within the wrapper at the time the - * handler is invoked. + * handler is invoked. This version delegates the handler call directly + * on the thread that calls it. */ - public static Handler mdc(Handler handler) { - Map mdc = MDC.getCopyOfContextMap(); + public static Handler mdc(final Handler handler) { + final Map mdc = MDC.getCopyOfContextMap(); return t -> { Map restore = MDC.getCopyOfContextMap(); @@ -46,6 +48,38 @@ public static Handler mdc(Handler handler) { }; } + /** + * Wrap a Handler in a way that will preserve the SLF4J MDC context. + * The context from the current thread at the time of this method call + * will be cached and restored within the wrapper at the time the + * handler is invoked. This version delegates the handler call using + * {@link Context#runOnContext(Handler)} from the current context that + * calls this method, ensuring the handler call will run on the correct + * event loop. + */ + public static Handler mdcEventLoop(final Handler handler) { + final Map mdc = MDC.getCopyOfContextMap(); + final Context context = Vertx.currentContext(); + + return t -> context.runOnContext((v) -> { + Map restore = MDC.getCopyOfContextMap(); + try { + if (mdc == null) { + MDC.clear(); + } else { + MDC.setContextMap(mdc); + } + handler.handle(t); + } finally { + if (restore == null) { + MDC.clear(); + } else { + MDC.setContextMap(restore); + } + } + }); + } + /** * Equivalent to {@link Vertx#executeBlocking(Handler, Handler)}, * but preserves the {@link MDC} correctly. @@ -60,7 +94,7 @@ public static void executeBlocking(Vertx vertx, Handler> future, H */ public static void executeBlocking(Vertx vertx, Handler> future, boolean ordered, Handler> handler) { - vertx.executeBlocking(mdc(future), ordered, mdc(handler)); + vertx.executeBlocking(mdc(future), ordered, mdcEventLoop(handler)); } /** @@ -77,6 +111,6 @@ public static void executeBlocking(WorkerExecutor executor, Handler void executeBlocking(WorkerExecutor executor, Handler> future, boolean ordered, Handler> handler) { - executor.executeBlocking(mdc(future), ordered, mdc(handler)); + executor.executeBlocking(mdc(future), ordered, mdcEventLoop(handler)); } } diff --git a/src/test/java/com/github/susom/database/test/VertxLoggingTest.java b/src/test/java/com/github/susom/database/test/VertxLoggingTest.java index 7fd483c..9e97783 100644 --- a/src/test/java/com/github/susom/database/test/VertxLoggingTest.java +++ b/src/test/java/com/github/susom/database/test/VertxLoggingTest.java @@ -13,6 +13,7 @@ import com.github.susom.database.DatabaseProviderVertx; import com.github.susom.database.DatabaseProviderVertx.Builder; +import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; @@ -24,7 +25,7 @@ @RunWith(VertxUnitRunner.class) public class VertxLoggingTest { private Logger log = LoggerFactory.getLogger(VertxLoggingTest.class); - int times = 0; + private int times = 0; static { // We will put all Derby related files inside ./build to keep our working copy clean @@ -34,6 +35,63 @@ public class VertxLoggingTest { } } + @Test + public void testContextPropagation(TestContext context) { + Async async = context.async(); + + Vertx vertx = Vertx.vertx(); + + Config config = ConfigFrom.firstOf().value("database.url", "jdbc:derby:target/testdb;create=true").get(); + Builder db = DatabaseProviderVertx.pooledBuilder(vertx, config).withSqlParameterLogging(); + + vertx.createHttpServer() + .requestHandler(r -> { + Context context1 = Vertx.currentContext(); + log.debug("Request before blocking1: " + context1); + db.transactAsync(dbs -> { + Context context2 = Vertx.currentContext(); + log.debug("Request inside blocking2: " + context2); + context.assertNotEquals(context1, context2); + return null; + }, result -> { + Context context3 = Vertx.currentContext(); + log.debug("Request after blocking3: " + context3); + context.assertEquals(context1, context3); + + db.transactAsync(dbs -> { + Context context4 = Vertx.currentContext(); + log.debug("Request inside blocking4: " + context4); + context.assertNotEquals(context1, context4); + return null; + }, result2 -> { + Context context5 = Vertx.currentContext(); + log.debug("Request after blocking5: " + context5); + context.assertEquals(context1, context5); + db.transactAsync(dbs -> { + Context context6 = Vertx.currentContext(); + log.debug("Request inside blocking6: " + context6); + context.assertNotEquals(context1, context6); + return null; + }, result3 -> { + Context context7 = Vertx.currentContext(); + log.debug("Request after blocking7: " + context7); + context.assertEquals(context1, context7); + r.response().end(); + MDC.clear(); + async.complete(); + }); + MDC.put("userId", "bob"); + }); + MDC.put("userId", "bob"); + }); + }).listen(8111, server -> + vertx.createHttpClient().get(8111, "localhost", "/foo").handler(response -> { + context.assertEquals(200, response.statusCode()); + vertx.close(); + }).end() + ); + } + @Test public void testMdcTransferToWorkerDatabase(TestContext context) { Async async = context.async(); @@ -86,14 +144,13 @@ public void testMdcTransferToWorkerDatabase(TestContext context) { MDC.put("userId", "bob"); }); - // Simulate the event loop doing something else while we - // are processing, which may mess up the MDC - MDC.put("userId", "bob"); - }).listen(8101, server -> { - vertx.createHttpClient().get(8101, "localhost", "/foo").handler(response -> - context.assertEquals(200, response.statusCode()) - ).end(); - }); + // Clear the MDC when we are done (the async stuff should track/restore as needed) + MDC.clear(); + }).listen(8101, server -> + vertx.createHttpClient().get(8101, "localhost", "/foo").handler(response -> + context.assertEquals(200, response.statusCode()) + ).end() + ); vertx.setPeriodic(100, id -> { // Repeat enough times to cycle through all workers