The double write log (DWL) is no longer truncated on every file flush, but only when it reaches its file size limit.
andrii0lomakin committed Feb 17, 2024
1 parent 522e470 commit abec92f
Showing 2 changed files with 100 additions and 49 deletions.
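To make the change concrete: before this commit the double write log was truncated as part of every data-file flush; afterwards it is truncated only once it reaches its configured size limit. The hypothetical sketch below illustrates that policy; it is not code from this commit, and all names in it (DoubleWriteLogPolicySketch, logSize, sizeLimit, addChunk) are invented.

// Hypothetical illustration of the new truncation policy (not code from this commit).
final class DoubleWriteLogPolicySketch {
  private long logSize;            // bytes currently held by the double write log
  private final long sizeLimit;    // configured maximum size of the log

  DoubleWriteLogPolicySketch(long sizeLimit) {
    this.sizeLimit = sizeLimit;
  }

  void addChunk(int chunkLength) {
    logSize += chunkLength;
    if (logSize >= sizeLimit) {    // truncate only once the limit is reached ...
      logSize = 0;                 // ... instead of on every data-file flush
    }
  }
}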
First changed file (OWOWCache.java):
@@ -146,10 +146,14 @@ public final class OWOWCache extends OAbstractWriteCache
private static final ThreadLocal<Cipher> CIPHER =
ThreadLocal.withInitial(OWOWCache::getCipherInstance);

/** Extension for the file which contains mapping between file name and file id */
/**
* Extension for the file which contains mapping between file name and file id
*/
private static final String NAME_ID_MAP_EXTENSION = ".cm";

/** Name for file which contains first version of binary format */
/**
* Name for file which contains first version of binary format
*/
private static final String NAME_ID_MAP_V1 = "name_id_map" + NAME_ID_MAP_EXTENSION;

/**
@@ -193,16 +197,24 @@ public final class OWOWCache extends OAbstractWriteCache
*/
private static final int MAX_FILE_RECORD_LEN = 16 * 1024;

/** Marks pages which have a checksum stored. */
/**
* Marks pages which have a checksum stored.
*/
public static final long MAGIC_NUMBER_WITH_CHECKSUM = 0xFACB03FEL;

/** Marks pages which have a checksum stored and data encrypted */
/**
* Marks pages which have a checksum stored and data encrypted
*/
public static final long MAGIC_NUMBER_WITH_CHECKSUM_ENCRYPTED = 0x1L;

/** Marks pages which have no checksum stored. */
/**
* Marks pages which have no checksum stored.
*/
private static final long MAGIC_NUMBER_WITHOUT_CHECKSUM = 0xEF30BCAFL;

/** Marks pages which have no checksum stored but have data encrypted */
/**
* Marks pages which have no checksum stored but have data encrypted
*/
private static final long MAGIC_NUMBER_WITHOUT_CHECKSUM_ENCRYPTED = 0x2L;

private static final int MAGIC_NUMBER_OFFSET = 0;
@@ -214,7 +226,9 @@ public final class OWOWCache extends OAbstractWriteCache

private static final int CHUNK_SIZE = 64 * 1024 * 1024;

/** Executor which runs in single thread all tasks are related to flush of write cache data. */
/**
* Executor which runs in single thread all tasks are related to flush of write cache data.
*/
private static final ScheduledExecutorService commitExecutor;

static {
@@ -223,15 +237,21 @@ public final class OWOWCache extends OAbstractWriteCache
"OrientDB Write Cache Flush Task", OAbstractPaginatedStorage.storageThreadGroup);
}

/** Limit of free space on disk after which database will be switched to "read only" mode */
/**
* Limit of free space on disk after which database will be switched to "read only" mode
*/
private final long freeSpaceLimit =
OGlobalConfiguration.DISK_CACHE_FREE_SPACE_LIMIT.getValueAsLong() * 1024L * 1024L;

/** Listeners which are called once we detect that some of the pages of files are broken. */
/**
* Listeners which are called once we detect that some of the pages of files are broken.
*/
private final List<WeakReference<OPageIsBrokenListener>> pageIsBrokenListeners =
new CopyOnWriteArrayList<>();

/** Path to the storage root directory where all files served by write cache will be stored */
/**
* Path to the storage root directory where all files served by write cache will be stored
*/
private final Path storagePath;

private final FileStore fileStore;
@@ -280,14 +300,14 @@ public final class OWOWCache extends OAbstractWriteCache
new ConcurrentHashMap<>();

/**
* Copy of content of {@link #dirtyPages} table at the moment when {@link
* #convertSharedDirtyPagesToLocal()} was called. This field is not thread safe because it is used
* inside of tasks which are running inside of {@link #commitExecutor} thread. It is used to keep
* results of postprocessing of {@link #dirtyPages} table. Every time we invoke {@link
* #convertSharedDirtyPagesToLocal()} all content of dirty pages is removed and copied to current
* field and {@link #localDirtyPagesBySegment} filed. Such approach is possible because {@link
* #dirtyPages} table is filled by many threads but is read only from inside of {@link
* #commitExecutor} thread.
* Copy of content of {@link #dirtyPages} table at the moment when
* {@link #convertSharedDirtyPagesToLocal()} was called. This field is not thread safe because it
* is used inside of tasks which are running inside of {@link #commitExecutor} thread. It is used
* to keep results of postprocessing of {@link #dirtyPages} table. Every time we invoke
* {@link #convertSharedDirtyPagesToLocal()} all content of dirty pages is removed and copied to
* current field and {@link #localDirtyPagesBySegment} filed. Such approach is possible because
* {@link #dirtyPages} table is filled by many threads but is read only from inside of
* {@link #commitExecutor} thread.
*/
private final HashMap<PageKey, OLogSequenceNumber> localDirtyPages = new HashMap<>();

@@ -299,19 +319,29 @@ public final class OWOWCache extends OAbstractWriteCache
*/
private final TreeMap<Long, TreeSet<PageKey>> localDirtyPagesBySegment = new TreeMap<>();

/** Approximate amount of all pages contained by write cache at the moment */
/**
* Approximate amount of all pages contained by write cache at the moment
*/
private final AtomicLong writeCacheSize = new AtomicLong();

/** Amount of exclusive pages are hold by write cache. */
/**
* Amount of exclusive pages are hold by write cache.
*/
private final AtomicLong exclusiveWriteCacheSize = new AtomicLong();

/** Serialized is used to encode/decode names of files are managed by write cache. */
/**
* Serialized is used to encode/decode names of files are managed by write cache.
*/
private final OBinarySerializer<String> stringSerializer;

/** Size of single page in cache in bytes. */
/**
* Size of single page in cache in bytes.
*/
private final int pageSize;

/** WAL instance */
/**
* WAL instance
*/
private final OWriteAheadLog writeAheadLog;

/**
@@ -342,10 +372,14 @@ public final class OWOWCache extends OAbstractWriteCache

private final Random fileIdGen = new Random();

/** Path to the file which contains metadata for the files registered in storage. */
/**
* Path to the file which contains metadata for the files registered in storage.
*/
private Path nameIdMapHolderPath;

/** Write cache id , which should be unique across all storages. */
/**
* Write cache id , which should be unique across all storages.
*/
private final int id;

/**
@@ -358,13 +392,19 @@ public final class OWOWCache extends OAbstractWriteCache

private volatile OChecksumMode checksumMode;

/** Error thrown during data flush. Once error registered no more write operations are allowed. */
/**
* Error thrown during data flush. Once error registered no more write operations are allowed.
*/
private Throwable flushError;

/** IV is used for AES encryption */
/**
* IV is used for AES encryption
*/
private final byte[] iv;

/** Key is used for AES encryption */
/**
* Key is used for AES encryption
*/
private final byte[] aesKey;

private final int exclusiveWriteCacheMaxSize;
@@ -382,7 +422,9 @@

private final int shutdownTimeout;

/** Listeners which are called when exception in background data flush thread is happened. */
/**
* Listeners which are called when exception in background data flush thread is happened.
*/
private final List<WeakReference<OBackgroundExceptionListener>> backgroundExceptionListeners =
new CopyOnWriteArrayList<>();

@@ -467,7 +509,9 @@ public OWOWCache(
}
}

/** Loads files already registered in storage. Has to be called before usage of this cache */
/**
* Loads files already registered in storage. Has to be called before usage of this cache
*/
public void loadRegisteredFiles() throws IOException, InterruptedException {
filesLock.acquireWriteLock();
try {
@@ -510,7 +554,9 @@ public void removeBackgroundExceptionListener(final OBackgroundExceptionListener
backgroundExceptionListeners.removeAll(itemsToRemove);
}

/** Fires event about exception is thrown in data flush thread */
/**
* Fires event about exception is thrown in data flush thread
*/
private void fireBackgroundDataFlushExceptionEvent(final Throwable e) {
for (final WeakReference<OBackgroundExceptionListener> ref : backgroundExceptionListeners) {
final OBackgroundExceptionListener listener = ref.get();
@@ -533,20 +579,24 @@ private static int normalizeMemory(final long maxSize, final int pageSize) {
* Directory which contains all files managed by write cache.
*
* @return Directory which contains all files managed by write cache or <code>null</code> in case
* of in memory database.
* of in memory database.
*/
@Override
public Path getRootDirectory() {
return storagePath;
}

/** @inheritDoc */
/**
* @inheritDoc
*/
@Override
public void addPageIsBrokenListener(final OPageIsBrokenListener listener) {
pageIsBrokenListeners.add(new WeakReference<>(listener));
}

/** @inheritDoc */
/**
* @inheritDoc
*/
@Override
public void removePageIsBrokenListener(final OPageIsBrokenListener listener) {
final List<WeakReference<OPageIsBrokenListener>> itemsToRemove = new ArrayList<>(1);
@@ -608,7 +658,9 @@ public long bookFileId(final String fileName) {
}
}

/** @inheritDoc */
/**
* @inheritDoc
*/
@Override
public int pageSize() {
return pageSize;
@@ -783,10 +835,12 @@ public void updateDirtyPagesTable(
}

@Override
public void create() {}
public void create() {
}

@Override
public void open() {}
public void open() {
}

@Override
public long addFile(final String fileName, long fileId) throws IOException {
@@ -890,10 +944,7 @@ public void syncDataFiles(final long segmentId, final byte[] lastMetadata) throw
}

writeAheadLog.flush();

writeAheadLog.cutAllSegmentsSmallerThan(segmentId);

doubleWriteLog.truncate();
} finally {
doubleWriteLog.endCheckpoint();
}
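In the hunk above, the unconditional doubleWriteLog.truncate() call is removed from the checkpoint path, while the WAL flush, the segment cut and endCheckpoint() remain. A condensed, illustrative reconstruction of syncDataFiles() after this commit, based only on the lines shown above (locking, page flushing and error handling are omitted):

// Condensed reconstruction of the checkpoint path shown in the hunk above (illustrative).
try {
  // ... data files are flushed and fsynced here (omitted) ...
  writeAheadLog.flush();
  writeAheadLog.cutAllSegmentsSmallerThan(segmentId);
  // doubleWriteLog.truncate() is no longer called at this point; the double
  // write log is now truncated only once it reaches its file size limit.
} finally {
  doubleWriteLog.endCheckpoint();
}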
@@ -2346,8 +2397,8 @@ private OCachePointer loadFileContent(

if (verifyChecksums
&& (checksumMode == OChecksumMode.StoreAndVerify
|| checksumMode == OChecksumMode.StoreAndThrow
|| checksumMode == OChecksumMode.StoreAndSwitchReadOnlyMode)) {
|| checksumMode == OChecksumMode.StoreAndThrow
|| checksumMode == OChecksumMode.StoreAndSwitchReadOnlyMode)) {
// if page is broken inside of data file we check double write log
if (!verifyMagicChecksumAndDecryptPage(buffer, internalFileId, pageIndex)) {
final OPointer doubleWritePointer =
@@ -2380,8 +2431,8 @@ private OCachePointer loadFileContent(

if (verifyChecksums
&& (checksumMode == OChecksumMode.StoreAndVerify
|| checksumMode == OChecksumMode.StoreAndThrow
|| checksumMode == OChecksumMode.StoreAndSwitchReadOnlyMode)) {
|| checksumMode == OChecksumMode.StoreAndThrow
|| checksumMode == OChecksumMode.StoreAndSwitchReadOnlyMode)) {
if (!verifyMagicChecksumAndDecryptPage(buffer, internalFileId, pageIndex)) {
assertPageIsBroken(pageIndex, fileId, pointer);
}
@@ -2512,10 +2563,10 @@ private boolean verifyMagicChecksumAndDecryptPage(

if ((aesKey == null && magicNumber != MAGIC_NUMBER_WITH_CHECKSUM)
|| (magicNumber != MAGIC_NUMBER_WITH_CHECKSUM
&& (magicNumber & 0xFF) != MAGIC_NUMBER_WITH_CHECKSUM_ENCRYPTED)) {
&& (magicNumber & 0xFF) != MAGIC_NUMBER_WITH_CHECKSUM_ENCRYPTED)) {
if ((aesKey == null && magicNumber != MAGIC_NUMBER_WITHOUT_CHECKSUM)
|| (magicNumber != MAGIC_NUMBER_WITHOUT_CHECKSUM
&& (magicNumber & 0xFF) != MAGIC_NUMBER_WITHOUT_CHECKSUM_ENCRYPTED)) {
&& (magicNumber & 0xFF) != MAGIC_NUMBER_WITHOUT_CHECKSUM_ENCRYPTED)) {
return false;
}

Second changed file:
@@ -193,8 +193,6 @@ public void write(long offset, ByteBuffer buffer) {
new OStorageException("Error during write operation to the file " + osFile), e);
}
} while (written < buffer.limit());

dirtyCounter.incrementAndGet();
assert written == buffer.limit();
} finally {
lock.sharedUnlock();
@@ -203,6 +201,8 @@ public void write(long offset, ByteBuffer buffer) {

@Override
public IOResult write(List<ORawPair<Long, ByteBuffer>> buffers) {
dirtyCounter.incrementAndGet();

final CountDownLatch latch = new CountDownLatch(buffers.size());
final AsyncIOResult asyncIOResult = new AsyncIOResult(latch);
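The two hunks above move the dirtyCounter.incrementAndGet() call out of the blocking single-buffer write(long, ByteBuffer) and into the IOResult-returning batch write(List<ORawPair<Long, ByteBuffer>>), so the dirty counter is now bumped when a batch of pages is submitted. A minimal, self-contained sketch of this dirty-tracking pattern follows; only the names dirtyCounter, fileChannel and doSynch are taken from the diff, and everything else (the class name, the simplified write loop, the counter reset) is an assumption.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative dirty-tracking sketch, not the actual OrientDB file implementation.
final class DirtyTrackingFileSketch {
  private final FileChannel fileChannel;
  private final AtomicLong dirtyCounter = new AtomicLong();

  DirtyTrackingFileSketch(FileChannel fileChannel) {
    this.fileChannel = fileChannel;
  }

  // Mark the file dirty once per submitted batch, as in the hunk above.
  void write(List<ByteBuffer> buffers) throws IOException {
    dirtyCounter.incrementAndGet();
    for (ByteBuffer buffer : buffers) {
      while (buffer.hasRemaining()) {
        fileChannel.write(buffer);
      }
    }
  }

  // Sync only when something has actually been written since the last sync.
  void doSynch() throws IOException {
    if (dirtyCounter.get() > 0) {
      fileChannel.force(true); // see the note on force(true) after the next hunk
      dirtyCounter.set(0);     // assumption: the real code resets the counter as well
    }
  }
}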

@@ -296,7 +296,7 @@ private void doSynch() {
long dirtyCounterValue = dirtyCounter.get();
if (dirtyCounterValue > 0) {
try {
fileChannel.force(false);
fileChannel.force(true);
} catch (final IOException e) {
OLogManager.instance()
.warn(
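The final hunk in doSynch() switches fileChannel.force(false) to fileChannel.force(true). FileChannel.force(boolean metaData) with false only requires the file's content to be written to the storage device, while true additionally forces metadata updates such as the file size (roughly the fdatasync versus fsync distinction). A standalone illustration of the two calls, not OrientDB code; the file name is a placeholder.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Standalone demonstration of FileChannel.force(boolean); "demo.bin" is a placeholder path.
final class ForceDemo {
  public static void main(String[] args) throws IOException {
    try (FileChannel channel = FileChannel.open(
        Path.of("demo.bin"), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
      channel.force(false); // flushes file content; metadata updates may be deferred
      channel.force(true);  // flushes file content and metadata
    }
  }
}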
