Skip to content

Commit

Permalink
Approach 1- track rev stats at the same time as fwd stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Jan 24, 2025
1 parent cd24881 commit fd03291
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ public void writeReverseDelta(OutputStream os, ProducerOptionalBlobPartConfig.Op
if(partStreams != null)
partStreamsByType = partStreams.getStreamsByType();

stateEngine.prepareForWrite(true);

stateEngine.prepareForWrite();
// stateEngine.prepareForWrite(true); // SNAP: TODO: remove

if(stateEngine.isRestored())
stateEngine.ensureAllNecessaryStatesRestored();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void prepareForWrite() {
maxOrdinal = ordinalMap.maxOrdinal();
gatherShardingStats(maxOrdinal);

gatherStatistics(numShards);
gatherStatistics(numShards != revNumShards);
}

private void gatherStatistics(boolean numShardsChanged) {
Expand Down Expand Up @@ -351,12 +351,11 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet

@Override
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) {
boolean numShardsChanged = this.revNumShards != this.numShards;
int numShards = this.numShards;
long[] totalOfListSizes = this.totalOfListSizes;
if (numShardsChanged && isReverse) {
int bitsPerListPointer = this.bitsPerListPointer;
if (isReverse && this.numShards != this.revNumShards) {
numShards = this.revNumShards;
totalOfListSizes = this.revTotalOfListSizes;
bitsPerListPointer = this.revBitsPerListPointer;
}

maxOrdinal = ordinalMap.maxOrdinal();
Expand Down Expand Up @@ -431,13 +430,13 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr

/// for unsharded blobs, support pre v2.1.0 clients
if(numShards == 1) {
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerListPointer, totalOfListSizes);
} else {
/// overall max ordinal
VarInt.writeVInt(os, maxOrdinal);

for(int i=0;i<numShards;i++) {
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
writeCalculatedDeltaShard(os, i, maxShardOrdinal, bitsPerListPointer, totalOfListSizes);
}
}

Expand All @@ -449,23 +448,25 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr

@Override
public void writeCalculatedDelta(DataOutputStream os, boolean isReverse, int[] maxShardOrdinal) throws IOException {
boolean numShardsChanged = this.revNumShards != this.numShards;
int numShards = this.numShards;
int bitsPerListPointer = this.bitsPerListPointer;
long[] totalOfListSizes = this.totalOfListSizes;
if (numShardsChanged && isReverse) {
if (isReverse && this.numShards != this.revNumShards) {
numShards = this.revNumShards;
bitsPerListPointer = this.revBitsPerListPointer;
totalOfListSizes = this.revTotalOfListSizes;
}


/// for unsharded blobs, support pre v2.1.0 clients
if(numShards == 1) {
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerListPointer, totalOfListSizes);
} else {
/// overall max ordinal
VarInt.writeVInt(os, maxOrdinal);

for(int i=0;i<numShards;i++) {
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
writeCalculatedDeltaShard(os, i, maxShardOrdinal, bitsPerListPointer, totalOfListSizes);
}
}

Expand All @@ -476,7 +477,7 @@ public void writeCalculatedDelta(DataOutputStream os, boolean isReverse, int[] m
}


private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal, int bitsPerListPointer, long[] totalOfListSizes) throws IOException {
/// 1) max shard ordinal
VarInt.writeVInt(os, maxShardOrdinal[shardNumber]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public class HollowMapTypeWriteState extends HollowTypeWriteState {

/// statistics required for writing fixed length set data
private int bitsPerMapPointer;
private int revBitsPerMapPointer;
private int bitsPerMapSizeValue;
private int bitsPerKeyElement;
private int bitsPerValueElement;
private long totalOfMapBuckets[];
private long revTotalOfMapBuckets[];

/// data required for writing snapshot or delta
private FixedLengthElementArray mapPointersAndSizesArray[];
Expand Down Expand Up @@ -79,7 +81,7 @@ public void prepareForWrite() {
maxOrdinal = ordinalMap.maxOrdinal();
gatherShardingStats(maxOrdinal);

gatherStatistics(numShards);
gatherStatistics(numShards != revNumShards);
}

private void gatherStatistics(boolean numShardsChanged) {
Expand All @@ -91,7 +93,9 @@ private void gatherStatistics(boolean numShardsChanged) {
ByteData data = ordinalMap.getByteData().getUnderlyingArray();

totalOfMapBuckets = new long[numShards];
// SNAP: TODO: missing revNumShards for approach 1
if (numShardsChanged) {
revTotalOfMapBuckets = new long[revNumShards];
}

for(int i=0;i<=maxOrdinal;i++) {
if(currentCyclePopulated.get(i) || previousCyclePopulated.get(i)) {
Expand Down Expand Up @@ -123,6 +127,9 @@ private void gatherStatistics(boolean numShardsChanged) {
}

totalOfMapBuckets[i & (numShards-1)] += numBuckets;
if (numShardsChanged) {
revTotalOfMapBuckets[i & (revNumShards-1)] += numBuckets;
}
}
}

Expand All @@ -138,6 +145,15 @@ private void gatherStatistics(boolean numShardsChanged) {
bitsPerMapSizeValue = 64 - Long.numberOfLeadingZeros(maxMapSize);

bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);

if (numShardsChanged) {
long revMaxShardTotalOfMapBuckets = 0;
for(int i=0;i<revNumShards;i++) {
if(revTotalOfMapBuckets[i] > revMaxShardTotalOfMapBuckets)
revMaxShardTotalOfMapBuckets = revTotalOfMapBuckets[i];
}
revBitsPerMapPointer = 64 - Long.numberOfLeadingZeros(revMaxShardTotalOfMapBuckets);
}
}

@Override
Expand Down Expand Up @@ -509,10 +525,11 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet

@Override
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) {
boolean numShardsChanged = this.revNumShards != this.numShards;
int numShards = this.numShards;
if (numShardsChanged && isReverse) {
int bitsPerMapPointer = this.bitsPerMapPointer;
if (isReverse && this.numShards != this.revNumShards) {
numShards = this.revNumShards;
bitsPerMapPointer = this.revBitsPerMapPointer;
}

maxOrdinal = ordinalMap.maxOrdinal();
Expand Down Expand Up @@ -634,13 +651,13 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr

/// for unsharded blobs, support pre v2.1.0 clients
if(numShards == 1) {
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerMapPointer, totalOfMapBuckets);
} else {
/// overall max ordinal
VarInt.writeVInt(os, maxOrdinal);

for(int i=0;i<numShards;i++) {
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
writeCalculatedDeltaShard(os, i, maxShardOrdinal, bitsPerMapPointer, totalOfMapBuckets);
}
}

Expand All @@ -652,21 +669,24 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr

@Override
public void writeCalculatedDelta(DataOutputStream os, boolean isReverse, int[] maxShardOrdinal) throws IOException {
boolean numShardsChanged = this.revNumShards != this.numShards;
int numShards = this.numShards;
if (numShardsChanged && isReverse) {
int bitsPerMapPointer = this.bitsPerMapPointer;
long[] totalOfMapBuckets = this.totalOfMapBuckets;
if (isReverse && this.numShards != this.revNumShards) {
numShards = this.revNumShards;
bitsPerMapPointer = this.revBitsPerMapPointer;
totalOfMapBuckets = this.revTotalOfMapBuckets;
}

/// for unsharded blobs, support pre v2.1.0 clients
if(numShards == 1) {
writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
writeCalculatedDeltaShard(os, 0, maxShardOrdinal, bitsPerMapPointer, totalOfMapBuckets);
} else {
/// overall max ordinal
VarInt.writeVInt(os, maxOrdinal);

for(int i=0;i<numShards;i++) {
writeCalculatedDeltaShard(os, i, maxShardOrdinal);
writeCalculatedDeltaShard(os, i, maxShardOrdinal, bitsPerMapPointer, totalOfMapBuckets);
}
}

Expand All @@ -676,7 +696,7 @@ public void writeCalculatedDelta(DataOutputStream os, boolean isReverse, int[] m
deltaRemovedOrdinals = null;
}

private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal, int bitsPerMapPointer, long[] totalOfMapBuckets) throws IOException {

int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer;
int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void calculateSnapshot() {
maxOrdinal = ordinalMap.maxOrdinal();

int numBitsPerRecord = fieldStats.getNumBitsPerRecord();

fixedLengthLongArray = new FixedLengthElementArray[numShards];
varLengthByteArrays = new ByteDataArray[numShards][];
recordBitOffset = new long[numShards];
Expand Down Expand Up @@ -299,52 +299,55 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet

/**
 * Builds the per-shard delta data for a transition from {@code fromCyclePopulated} to
 * {@code toCyclePopulated}.
 *
 * <p>Ordinals set in {@code toCyclePopulated} but not in {@code fromCyclePopulated} are encoded as
 * added records; ordinals set in {@code fromCyclePopulated} but not in {@code toCyclePopulated} are
 * recorded as removals. The results are stored in {@code fixedLengthLongArray},
 * {@code varLengthByteArrays}, {@code deltaAddedOrdinals}, {@code deltaRemovedOrdinals} and
 * {@code recordBitOffset}, each indexed by shard.
 *
 * @param fromCyclePopulated ordinals populated in the state the delta is applied to
 * @param toCyclePopulated ordinals populated in the state the delta produces
 * @param isReverse when {@code true} and {@code revNumShards} differs from {@code numShards}, the
 *     data is partitioned into {@code revNumShards} shards instead of {@code numShards}
 */
@Override
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, boolean isReverse) {
    // Local copy deliberately shadows the field so everything below uses the shard count
    // that applies to the requested direction.
    int numShards = this.numShards;
    if (isReverse && this.numShards != this.revNumShards) {
        numShards = this.revNumShards;
    }

    maxOrdinal = ordinalMap.maxOrdinal();
    int numBitsPerRecord = fieldStats.getNumBitsPerRecord();

    ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated);

    fixedLengthLongArray = new FixedLengthElementArray[numShards];
    deltaAddedOrdinals = new ByteDataArray[numShards];
    deltaRemovedOrdinals = new ByteDataArray[numShards];
    varLengthByteArrays = new ByteDataArray[numShards][];
    recordBitOffset = new long[numShards];
    int numAddedRecordsInShard[] = new int[numShards];

    // Shard selection is a bitwise AND with (numShards - 1);
    // NOTE(review): this presumes numShards is a power of two - confirm against gatherShardingStats.
    int shardMask = numShards - 1;

    // First pass: count additions per shard so each fixed-length array can be sized up front.
    int addedOrdinal = deltaAdditions.nextSetBit(0);
    while(addedOrdinal != -1) {
        numAddedRecordsInShard[addedOrdinal & shardMask]++;
        addedOrdinal = deltaAdditions.nextSetBit(addedOrdinal + 1);
    }

    for(int i=0;i<numShards;i++) {
        // Widen to long before multiplying so the bit count cannot overflow an int.
        fixedLengthLongArray[i] = new FixedLengthElementArray(WastefulRecycler.DEFAULT_INSTANCE, (long)numAddedRecordsInShard[i] * numBitsPerRecord);
        deltaAddedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        deltaRemovedOrdinals[i] = new ByteDataArray(WastefulRecycler.DEFAULT_INSTANCE);
        varLengthByteArrays[i] = new ByteDataArray[getSchema().numFields()];
    }

    int previousRemovedOrdinal[] = new int[numShards];
    int previousAddedOrdinal[] = new int[numShards];

    // Second pass: encode added records and write added/removed shard ordinals as gaps
    // from the previously written ordinal in the same shard.
    for(int i=0;i<=maxOrdinal;i++) {
        int shardNumber = i & shardMask;
        if(deltaAdditions.get(i)) {
            addRecord(i, recordBitOffset[shardNumber], fixedLengthLongArray[shardNumber], varLengthByteArrays[shardNumber]);
            recordBitOffset[shardNumber] += numBitsPerRecord;
            int shardOrdinal = i / numShards;
            VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
            previousAddedOrdinal[shardNumber] = shardOrdinal;
        } else if(fromCyclePopulated.get(i) && !toCyclePopulated.get(i)) {
            int shardOrdinal = i / numShards;
            VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
            previousRemovedOrdinal[shardNumber] = shardOrdinal;
        }
    }
}

@Override
Expand Down Expand Up @@ -372,30 +375,28 @@ public void writeCalculatedDelta(DataOutputStream os, int[] maxShardOrdinal) thr

/**
 * Writes the delta data held in this write state's per-shard buffers to {@code os}, then clears
 * those buffers.
 *
 * <p>When the effective shard count is 1, only the single shard is written; otherwise the overall
 * {@code maxOrdinal} is written first as a variable-length int, followed by every shard in order.
 *
 * @param os stream the delta is written to
 * @param isReverse when {@code true} and {@code revNumShards} differs from {@code numShards},
 *     {@code revNumShards} shards are written instead of {@code numShards}
 * @param maxShardOrdinal per-shard values passed through to {@code writeCalculatedDeltaShard}
 * @throws IOException if writing to {@code os} fails
 */
@Override
public void writeCalculatedDelta(DataOutputStream os, boolean isReverse, int[] maxShardOrdinal) throws IOException {
    // Local copy deliberately shadows the field so the shard count matches the requested direction.
    int numShards = this.numShards;
    if (isReverse && this.numShards != this.revNumShards) {
        numShards = this.revNumShards;
    }

    /// for unsharded blobs, support pre v2.1.0 clients
    if(numShards == 1) {
        writeCalculatedDeltaShard(os, 0, maxShardOrdinal);
    } else {
        /// overall max ordinal
        VarInt.writeVInt(os, maxOrdinal);

        for(int i=0;i<numShards;i++) {
            writeCalculatedDeltaShard(os, i, maxShardOrdinal);
        }
    }

    // Drop references to the intermediate buffers once they have been written.
    fixedLengthLongArray = null;
    varLengthByteArrays = null;
    deltaAddedOrdinals = null;
    deltaRemovedOrdinals = null;
    recordBitOffset = null;
}

private void writeCalculatedDeltaShard(DataOutputStream os, int shardNumber, int[] maxShardOrdinal) throws IOException {
Expand Down
Loading

0 comments on commit fd03291

Please sign in to comment.