Skip to content

Commit

Permalink
Improve DefaultEntryLogger read performance. apache#4038
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Sep 8, 2023
1 parent 0b6a83f commit b3ea933
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,32 @@ public class BufferedReadChannel extends BufferedChannelBase {
long invocationCount = 0;
long cacheHitCount = 0;
private boolean closed = false;
private long fileSize = -1;
final boolean sealed;

public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
this(fileChannel, readCapacity, false);
}

public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) {
super(fileChannel);
this.readCapacity = readCapacity;
this.sealed = sealed;
this.readBuffer = Unpooled.buffer(readCapacity);
}

@Override
public long size() throws IOException {
if (sealed) {
if (fileSize == -1) {
fileSize = validateAndGetFileChannel().size();
}
return fileSize;
} else {
return validateAndGetFileChannel().size();
}
}

/**
* Read as many bytes into dest as dest.capacity() starting at position pos in the
* FileChannel. This function can read from the buffer or the file channel
Expand All @@ -70,7 +89,7 @@ public int read(ByteBuf dest, long pos) throws IOException {
public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
invocationCount++;
long currentPosition = pos;
long eof = validateAndGetFileChannel().size();
long eof = size();
// return -1 if the given position is greater than or equal to the file's current size.
if (pos >= eof) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ public EntryLoggerAllocator getEntryLoggerAllocator() {
return entryLoggerAllocator;
}

void clearCompactingLogId() {
entryLoggerAllocator.clearCompactingLogId();
}

public DefaultEntryLogger.RecentEntryLogsStatus getRecentlyCreatedEntryLogsStatus() {
return recentlyCreatedEntryLogsStatus;
}
Expand Down Expand Up @@ -960,7 +964,8 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
}
}

private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
@VisibleForTesting
BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
BufferedReadChannel fc = getFromChannels(entryLogId);
if (fc != null) {
return fc;
Expand All @@ -976,7 +981,11 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti
}
// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId));
} else {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), false);
}
putInReadChannels(entryLogId, fc);
return fc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ void createNewLog(long ledgerId, String reason) throws IOException {
logChannel.appendLedgersMap();

BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(selectDirForNextEntryLog().getAbsolutePath(), newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
logChannel.getLogId(), rotatedLogChannels);
Expand All @@ -169,6 +170,7 @@ void createNewLog(long ledgerId, String reason) throws IOException {
}
} else {
BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(selectDirForNextEntryLog().getAbsolutePath(), newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
for (EntryLogListener listener : listeners) {
listener.onRotateEntryLog(newLogChannel.getLogId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO

@Override
public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
BufferedLogChannel newLogForCompaction = entryLoggerAllocator.createNewLogForCompaction(
selectDirForNextEntryLog());
entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId());
return newLogForCompaction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
private Map<String, Long> writingLogIds = new ConcurrentHashMap<>();
private long writingCompactingLogId = -1;

EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
Expand All @@ -91,6 +93,11 @@ public class EntryLoggerAllocator {
logfileHeader.writerIndex(DefaultEntryLogger.LOGFILE_HEADER_SIZE);

}

public boolean isSealed(long logId) {
return logId != writingCompactingLogId && !writingLogIds.containsValue(logId);
}

public void addLedgerDirsManager(LedgerDirsManager ledgerDirsManager) {
this.ledgersDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
}
Expand Down Expand Up @@ -133,6 +140,18 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
}
}

void setWritingLogId(String dirPath, long lodId) {
this.writingLogIds.put(dirPath, lodId);
}

void setWritingCompactingLogId(long logId) {
this.writingCompactingLogId = logId;
}

void clearCompactingLogId() {
writingCompactingLogId = -1;
}

BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException {
synchronized (createCompactionLogLock) {
return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
long entryId = entry.getLong(entry.readerIndex() + 8);
if (lid != ledgerId || entryId < -1) {
LOG.warn("Scanning expected ledgerId {}, but found invalid entry "
+ "with ledgerId {} entryId {} at offset {}",
+ "with ledgerId {} entryId {} at offset {}",
ledgerId, lid, entryId, offset);
throw new IOException("Invalid entry found @ offset " + offset);
}
Expand All @@ -199,6 +199,7 @@ boolean complete() {
LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
compactingLogWriteDone();
return false;
}
return true;
Expand All @@ -209,6 +210,13 @@ void abort() {
offsets.clear();
// since we haven't flushed yet, we only need to delete the unflushed compaction file.
compactionLog.abort();
compactingLogWriteDone();
}
}

private void compactingLogWriteDone() {
if (entryLogger instanceof DefaultEntryLogger) {
((DefaultEntryLogger) entryLogger).clearCompactingLogId();
}
}

Expand Down Expand Up @@ -241,6 +249,8 @@ boolean complete() throws IOException {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
} finally {
compactingLogWriteDone();
}
}

Expand All @@ -249,6 +259,7 @@ void abort() {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
compactingLogWriteDone();
}
}

Expand Down

0 comments on commit b3ea933

Please sign in to comment.