From fd032913e9b877ea7152f8dd05cbf0e82be4e7ea Mon Sep 17 00:00:00 2001 From: Sunjeet Singh Date: Thu, 23 Jan 2025 18:08:01 -0800 Subject: [PATCH] Approach 1- track rev stats at the same time as fwd stats --- .../hollow/core/write/HollowBlobWriter.java | 5 +- .../core/write/HollowListTypeWriteState.java | 25 +-- .../core/write/HollowMapTypeWriteState.java | 42 +++-- .../write/HollowObjectTypeWriteState.java | 143 +++++++++--------- .../core/write/HollowSetTypeWriteState.java | 12 +- .../core/write/HollowTypeWriteState.java | 12 +- .../core/write/HollowWriteStateEngine.java | 30 +++- 7 files changed, 159 insertions(+), 110 deletions(-) diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowBlobWriter.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowBlobWriter.java index fe520b287..34196e099 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowBlobWriter.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowBlobWriter.java @@ -200,8 +200,9 @@ public void writeReverseDelta(OutputStream os, ProducerOptionalBlobPartConfig.Op if(partStreams != null) partStreamsByType = partStreams.getStreamsByType(); - stateEngine.prepareForWrite(true); - + stateEngine.prepareForWrite(); + // stateEngine.prepareForWrite(true); // SNAP: TODO: remove + if(stateEngine.isRestored()) stateEngine.ensureAllNecessaryStatesRestored(); diff --git a/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java b/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java index 3271a4d4e..55ab2f297 100644 --- a/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java +++ b/hollow/src/main/java/com/netflix/hollow/core/write/HollowListTypeWriteState.java @@ -73,7 +73,7 @@ public void prepareForWrite() { maxOrdinal = ordinalMap.maxOrdinal(); gatherShardingStats(maxOrdinal); - gatherStatistics(numShards); + gatherStatistics(numShards != revNumShards); } private void gatherStatistics(boolean numShardsChanged) { @@ -351,12 +351,11 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet @Override public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) { - boolean numShardsChanged = this.revNumShards != this.numShards; int numShards = this.numShards; - long[] totalOfListSizes = this.totalOfListSizes; - if (numShardsChanged && isReverse) { + int bitsPerListPointer = this.bitsPerListPointer; + if (isReverse && this.numShards != this.revNumShards) { numShards = this.revNumShards; - totalOfListSizes = this.revTotalOfListSizes; + bitsPerListPointer = this.revBitsPerListPointer; } maxOrdinal = ordinalMap.maxOrdinal(); @@ -431,13 +430,13 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr /// for unsharded blobs, support pre v2.1.0 clients if(numShards == 1) { - writeCalculatedDeltaShard(os, 0, maxShardOrdinal); + writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerListPointer, totalOfListSizes); } else { /// overall max ordinal VarInt.writeVInt(os, maxOrdinal); for(int i=0;i revMaxShardTotalOfMapBuckets) + revMaxShardTotalOfMapBuckets = revTotalOfMapBuckets[i]; + } + revBitsPerMapPointer = 64 - Long.numberOfLeadingZeros(revMaxShardTotalOfMapBuckets); + } } @Override @@ -509,10 +525,11 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet @Override public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) { - boolean numShardsChanged = this.revNumShards != this.numShards; int numShards = this.numShards; - if (numShardsChanged && isReverse) { + int bitsPerMapPointer = this.bitsPerMapPointer; + if (isReverse && this.numShards != this.revNumShards) { numShards = this.revNumShards; + bitsPerMapPointer = this.revBitsPerMapPointer; } maxOrdinal = ordinalMap.maxOrdinal(); @@ -634,13 +651,13 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr /// for unsharded blobs, support pre v2.1.0 clients if(numShards == 1) { - writeCalculatedDeltaShard(os, 0, maxShardOrdinal); + writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerMapPointer, totalOfMapBuckets); } else { /// overall max ordinal VarInt.writeVInt(os, maxOrdinal); for(int i=0;i typeStateEntry : writeStates.entrySet()) { + executor.execute(new Runnable() { + @Override + public void run() { + typeStateEntry.getValue().prepareForWrite(); + } + }); + } + + executor.awaitSuccessfulCompletion(); + } catch(Exception ex) { + throw new HollowWriteStateException("Failed to prepare for write", ex); + } + + preparedForNextCycle = false; } - public void prepareForWrite(boolean recomputeStats) { + public void prepareForWrite(boolean recomputeStats) { // SNAP: TODO: remove if(!preparedForNextCycle) { if (recomputeStats) { // already prepared for write but stats needs to be recomputed (when reverse delta has different num shards)