diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 654ace45476..a660a13ce84 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -87,7 +87,7 @@ /** * Implements a bookie. */ -public class BookieImpl extends BookieCriticalThread implements Bookie { +public class BookieImpl implements Bookie { private static final Logger LOG = LoggerFactory.getLogger(Bookie.class); @@ -119,6 +119,8 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { protected StateManager stateManager; + private BookieCriticalThread bookieThread; + // Expose Stats final StatsLogger statsLogger; private final BookieStats bookieStats; @@ -390,7 +392,6 @@ public BookieImpl(ServerConfiguration conf, ByteBufAllocator allocator, Supplier bookieServiceInfoProvider) throws IOException, InterruptedException, BookieException { - super("Bookie-" + conf.getBookiePort()); this.bookieServiceInfoProvider = bookieServiceInfoProvider; this.statsLogger = statsLogger; this.conf = conf; @@ -656,7 +657,9 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException @Override public synchronized void start() { - setDaemon(true); + bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort()); + bookieThread.setDaemon(true); + ThreadRegistry.register("BookieThread", 0); if (LOG.isDebugEnabled()) { LOG.debug("I'm starting a bookie with journal directories {}", @@ -717,7 +720,7 @@ public synchronized void start() { syncThread.start(); // start bookie thread - super.start(); + bookieThread.start(); // After successful bookie startup, register listener for disk // error/full notifications. @@ -741,6 +744,20 @@ public synchronized void start() { } } + @Override + public void join() throws InterruptedException { + if (bookieThread != null) { + bookieThread.join(); + } + } + + public boolean isAlive() { + if (bookieThread == null) { + return false; + } + return bookieThread.isAlive(); + } + /* * Get the DiskFailure listener for the bookie */ @@ -824,7 +841,6 @@ public boolean isRunning() { return stateManager.isRunning(); } - @Override public void run() { // start journals for (Journal journal: journals) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3b730cb476e..5a2030032ee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; @@ -66,13 +67,15 @@ /** * Provide journal related management. */ -public class Journal extends BookieCriticalThread implements CheckpointSource { +public class Journal implements CheckpointSource { private static final Logger LOG = LoggerFactory.getLogger(Journal.class); private static final RecyclableArrayList.Recycler entryListRecycler = new RecyclableArrayList.Recycler(); + private BookieCriticalThread thread; + /** * Filter to pickup journals. */ @@ -461,13 +464,13 @@ private class ForceWriteThread extends BookieCriticalThread { volatile boolean running = true; // This holds the queue entries that should be notified after a // successful force write - Thread threadToNotifyOnEx; + Consumer threadToNotifyOnEx; // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; - public ForceWriteThread(Thread threadToNotifyOnEx, + public ForceWriteThread(Consumer threadToNotifyOnEx, boolean enableGroupForceWrites, StatsLogger statsLogger) { super("ForceWriteThread"); @@ -531,7 +534,7 @@ public void run() { // Regardless of what caused us to exit, we should notify the // the parent thread as it should either exit or be in the process // of exiting else we will have write requests hang - threadToNotifyOnEx.interrupt(); + threadToNotifyOnEx.accept(null); } private void syncJournal(ForceWriteRequest lastRequest) throws IOException { @@ -652,8 +655,6 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) { - super(journalThreadName + "-" + conf.getBookiePort()); - this.setPriority(Thread.MAX_PRIORITY); this.allocator = allocator; StatsLogger journalStatsLogger = statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex)); @@ -678,7 +679,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; this.syncData = conf.getJournalSyncData(); this.maxBackupJournals = conf.getMaxBackupJournals(); - this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(), + this.forceWriteThread = new ForceWriteThread((__) -> this.interrupt(), conf.getJournalAdaptiveGroupWrites(), journalStatsLogger); this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec()); this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold(); @@ -956,7 +957,6 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), *

* @see org.apache.bookkeeper.bookie.SyncThread */ - @Override public void run() { LOG.info("Starting journal on {}", journalDirectory); ThreadRegistry.register(journalThreadName, 0); @@ -1256,7 +1256,7 @@ public synchronized void shutdown() { running = false; this.interrupt(); - this.join(); + this.joinThread(); LOG.info("Finished Shutting down Journal thread"); } catch (IOException | InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1284,7 +1284,21 @@ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException */ @VisibleForTesting public void joinThread() throws InterruptedException { - join(); + if (thread != null) { + thread.join(); + } + } + + public void interrupt() { + if (thread != null) { + thread.interrupt(); + } + } + + public synchronized void start() { + thread = new BookieCriticalThread(() -> run(), journalThreadName + "-" + conf.getBookiePort()); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); } long getMemoryUsage() {