From c1b5b3572fc248fc6677747fce4cc251b3b269bf Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Mon, 12 Feb 2024 09:20:14 +0800 Subject: [PATCH] Improve DefaultEntryLogger read performance. (#4038) * Avoid system call to improve read performance. * Fix ci. * Add comments for getCurrentWritingLogId * Fix ci. * Consider compacting log. * Fix checkstyle. * Address the comment. * Address comment. * Address the comments. * Add tests. * Fix checkstyle. * address the comments. * Fix concurrency problem. --- .../bookie/BufferedReadChannel.java | 27 ++++++++++- .../bookkeeper/bookie/DefaultEntryLogger.java | 13 ++++- .../bookie/EntryLogManagerBase.java | 6 ++- .../EntryLogManagerForSingleEntryLog.java | 5 +- .../bookie/EntryLoggerAllocator.java | 27 +++++++++-- .../TransactionalEntryLogCompactor.java | 11 +++++ .../bookie/DefaultEntryLogTest.java | 48 +++++++++++++++++++ 7 files changed, 125 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 22f5a81690d..4de3890e082 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -30,7 +30,7 @@ /** * A Buffered channel without a write buffer. Only reads are buffered. */ -public class BufferedReadChannel extends BufferedChannelBase { +public class BufferedReadChannel extends BufferedChannelBase { // The capacity of the read buffer. 
protected final int readCapacity; @@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase { long invocationCount = 0; long cacheHitCount = 0; + private volatile long fileSize = -1; + final boolean sealed; public BufferedReadChannel(FileChannel fileChannel, int readCapacity) { + this(fileChannel, readCapacity, false); + } + + public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) { super(fileChannel); + this.sealed = sealed; this.readCapacity = readCapacity; this.readBuffer = Unpooled.buffer(readCapacity); } @@ -64,10 +71,26 @@ public int read(ByteBuf dest, long pos) throws IOException { return read(dest, pos, dest.writableBytes()); } + @Override + public long size() throws IOException { + if (sealed) { + if (fileSize == -1) { + synchronized (this) { + if (fileSize == -1) { + fileSize = validateAndGetFileChannel().size(); + } + } + } + return fileSize; + } else { + return validateAndGetFileChannel().size(); + } + } + public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { invocationCount++; long currentPosition = pos; - long eof = validateAndGetFileChannel().size(); + long eof = size(); // return -1 if the given position is greater than or equal to the file's current size. if (pos >= eof) { return -1; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index d02ede52fbf..c47c0411c24 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -587,6 +587,10 @@ private long readLastLogId(File f) { } } + void clearCompactingLogId() { + entryLoggerAllocator.clearCompactingLogId(); + } + /** * Flushes all rotated log channels. After log channels are flushed, * move leastUnflushedLogId ptr to current logId. 
@@ -894,7 +898,8 @@ private Header getHeaderForLogId(long entryLogId) throws IOException { } } - private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { + @VisibleForTesting + BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException { BufferedReadChannel fc = getFromChannels(entryLogId); if (fc != null) { return fc; @@ -910,7 +915,11 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti } // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE // so that there are no overlaps with the write buffer while reading - fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes()); + if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) { + fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId)); + } else { + fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), false); + } putInReadChannels(entryLogId, fc); return fc; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java index 36ce928a089..e997906c23c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java @@ -161,6 +161,7 @@ void createNewLog(long ledgerId, String reason) throws IOException { logChannel.appendLedgersMap(); BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId()); setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.", logChannel.getLogId(), rotatedLogChannels); @@ -168,8 +169,9 @@ void createNewLog(long ledgerId, String reason) throws IOException { 
listener.onRotateEntryLog(); } } else { - setCurrentLogForLedgerAndAddToRotate(ledgerId, - entryLoggerAllocator.createNewLog(selectDirForNextEntryLog())); + BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId()); + setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java index 59bcc02a577..b7845118680 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java @@ -262,6 +262,9 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO @Override public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException { - return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog()); + BufferedLogChannel newLogForCompaction = entryLoggerAllocator.createNewLogForCompaction( + selectDirForNextEntryLog()); + entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId()); + return newLogForCompaction; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java index 68fc1eb3caf..aec2fb1cd0d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java @@ -64,6 +64,8 @@ class EntryLoggerAllocator { private final boolean entryLogPreAllocationEnabled; private final ByteBufAllocator byteBufAllocator; final ByteBuf logfileHeader = 
Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE); + private volatile long writingLogId = -1; + private volatile long writingCompactingLogId = -1; EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, @@ -91,16 +93,19 @@ synchronized long getPreallocatedLogId() { return preallocatedLogId; } + public boolean isSealed(long logId) { + return logId != writingLogId && logId != writingCompactingLogId; + } + BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { synchronized (createEntryLogLock) { BufferedLogChannel bc; - if (!entryLogPreAllocationEnabled){ + if (!entryLogPreAllocationEnabled) { // create a new log directly - bc = allocateNewLog(dirForNextEntryLog); - return bc; + return allocateNewLog(dirForNextEntryLog); } else { // allocate directly to response request - if (null == preallocation){ + if (null == preallocation) { bc = allocateNewLog(dirForNextEntryLog); } else { // has a preallocated entry log @@ -116,7 +121,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { throw new IOException("Task to allocate a new entry log is cancelled.", ce); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); + throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie); } } // preallocate a new log in background upon every call @@ -132,6 +137,18 @@ BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOE } } + void setWritingLogId(long lodId) { + this.writingLogId = lodId; + } + + void setWritingCompactingLogId(long logId) { + this.writingCompactingLogId = logId; + } + + void clearCompactingLogId() { + writingCompactingLogId = -1; + } + private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException { return 
allocateNewLog(dirForNextEntryLog, ".log"); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index 2b6fca30c10..9a27bcccd89 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -199,6 +199,7 @@ boolean complete() { LOG.info("No valid entry is found in entry log after scan, removing entry log now."); logRemovalListener.removeEntryLog(metadata.getEntryLogId()); compactionLog.abort(); + compactingLogWriteDone(); return false; } return true; @@ -209,6 +210,13 @@ void abort() { offsets.clear(); // since we haven't flushed yet, we only need to delete the unflushed compaction file. compactionLog.abort(); + compactingLogWriteDone(); + } + } + + private void compactingLogWriteDone() { + if (entryLogger instanceof DefaultEntryLogger) { + ((DefaultEntryLogger) entryLogger).clearCompactingLogId(); } } @@ -241,6 +249,8 @@ boolean complete() throws IOException { } catch (IOException ioe) { LOG.warn("Error marking compaction as done", ioe); return false; + } finally { + compactingLogWriteDone(); } } @@ -249,6 +259,7 @@ void abort() { offsets.clear(); // remove compaction log file and its hardlink compactionLog.abort(); + compactingLogWriteDone(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index 38a9ebaf213..3048ef33a8c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -67,6 +67,7 @@ import org.apache.bookkeeper.common.testing.annotations.FlakyTest; import 
org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.test.TestStatsProvider; import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; @@ -154,6 +155,53 @@ public void testDeferCreateNewLog() throws Exception { assertEquals(0L, entryLogManager.getCurrentLogId()); } + @Test + public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception { + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setEntryLogPerLedgerEnabled(false); + conf.setEntryLogFilePreAllocationEnabled(true); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsProvider.TestStatsLogger statsLogger = + statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger, + UnpooledByteBufAllocator.DEFAULT); + EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); + entrylogManager.createNewLog(0); + BufferedReadChannel channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(1); + channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(2); + channel = entryLogger.getChannelForLogId(1); + assertTrue(channel.sealed); + } + + @Test + public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception { + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + // If entryLogPerLedgerEnabled is true, the buffered channel's sealed flag is always false. 
+ conf.setEntryLogPerLedgerEnabled(true); + conf.setEntryLogFilePreAllocationEnabled(true); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsProvider.TestStatsLogger statsLogger = + statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE); + DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger, + UnpooledByteBufAllocator.DEFAULT); + EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager(); + entrylogManager.createNewLog(0); + BufferedReadChannel channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(1); + channel = entryLogger.getChannelForLogId(0); + assertFalse(channel.sealed); + entrylogManager.createNewLog(2); + channel = entryLogger.getChannelForLogId(1); + assertFalse(channel.sealed); + } + @Test public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception { entryLogger.close();