Skip to content

Commit

Permalink
use custom dedup table
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrehm committed Jan 27, 2025
1 parent edabc69 commit c64b3fe
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 9 deletions.
90 changes: 90 additions & 0 deletions core/src/main/scala/flatgraph/misc/DedupTable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package flatgraph.misc

import java.util

/** An insertion-ordered string interning table built on open addressing.
  *
  * Each entry in `pos` packs two things: the low bits (under the capacity mask) hold
  * `valueIndex + 1`, and the high bits cache part of the hash. The cached hash bits let
  * most probe collisions be rejected without dereferencing the string for an equals()
  * call, which is the expensive part (it chases an object reference).
  *
  * The +1 offset guarantees a stored slot is never 0, so a raw 0 in `pos` always means
  * "empty".
  */
private[flatgraph] class DedupTable {
  var capacity = 1024
  var size     = 0
  var strs     = new Array[String](capacity)
  var pos      = new Array[Int](capacity)

  /** Returns the stable, insertion-ordered index of `str`, interning it if unseen.
    * A null input maps to -1 and is never stored.
    */
  def insert(str: String): Int = {
    if (str == null) return -1
    val mask = capacity - 1
    val hash = strengthenHash(str.hashCode)
    val tag  = hash & ~mask // hash bits cached in the slot to skip most equals() calls
    var slot = hash & mask
    var result = -2 // sentinel: still probing
    while (result == -2) {
      val entry  = pos(slot)
      val stored = (entry & mask) - 1
      if (stored == -1) {
        // empty slot: claim it for a fresh entry
        result = size
        strs(result) = str
        pos(slot) = (result + 1) | tag
        size += 1
        // keep load factor below ~2/3 so probing stays short
        if (size + (size >> 1) > capacity) grow()
      } else if ((entry & ~mask) == tag && str.equals(strs(stored))) {
        result = stored
      } else {
        slot += 1
        // wrap-around is rare, so a predictable branch beats a branchless mask here
        if (slot == capacity) slot = 0
      }
    }
    result
  }

  /** Re-inserts an already-interned string after a capacity change.
    * Assumes `strs` holds no duplicates, so no equality check is needed: the first
    * empty slot on the probe path is always the right one.
    */
  def insertRehash(str: String): Unit = {
    val mask = capacity - 1
    val hash = strengthenHash(str.hashCode)
    val tag  = hash & ~mask
    var slot = hash & mask
    var done = false
    while (!done) {
      if (((pos(slot) & mask) - 1) == -1) {
        pos(slot) = (size + 1) | tag
        size += 1
        done = true
      } else {
        slot += 1
        // rare wrap-around: branch on purpose (see insert)
        if (slot == capacity) slot = 0
      }
    }
  }

  /** Doubles capacity and rebuilds the slot table. `strs` keeps its prefix as-is, so
    * previously returned indices remain valid; only `pos` is rehashed from scratch.
    */
  def grow(): Unit = {
    val count = size
    capacity = capacity * 2
    size = 0
    pos = new Array[Int](capacity)
    strs = util.Arrays.copyOf(strs, capacity)
    var i = 0
    while (i < count) {
      insertRehash(strs(i))
      i += 1
    }
  }

  /** Scrambles a raw hashCode using the first half of murmur3's 32-bit finalizer.
    * The second multiply/shift round of murmur is omitted — this much avalanche is
    * enough for our table sizes.
    */
  def strengthenHash(hash0: Int): Int = {
    val spread = hash0 ^ (hash0 >>> 16)
    val mixed  = spread * 0x85ebca6b
    mixed ^ (mixed >>> 13)
  }

}
12 changes: 3 additions & 9 deletions core/src/main/scala/flatgraph/storage/Serialization.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
val writeQueue = mutable.ArrayDeque[concurrent.Future[Any]]()
val jobQueue = mutable.ArrayBuffer[() => (OutlineStorage, Array[Byte])]()
val stringQueue = mutable.ArrayDeque[(OutlineStorage, Array[String])]()
val stringpool = mutable.LinkedHashMap[String, Int]()
val stringpool = new flatgraph.misc.DedupTable

def submitCompress(block: => (OutlineStorage, ByteBuffer)): Unit = {
compressQueue.addOne(executor.submit((() => block)))
Expand All @@ -46,12 +46,6 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
}
}

// NOT threadsafe!
private def insertString(stringPool: mutable.LinkedHashMap[String, Int])(s: String): Int = {
if (s == null) -1
else stringPool.getOrElseUpdate(s, stringPool.size)
}

private[flatgraph] def encodeAny(item: Any, outlineStorage: OutlineStorage = new OutlineStorage, delta: Int = -1): OutlineStorage = {
item match {
case _: DefaultValue => null
Expand Down Expand Up @@ -161,7 +155,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
writeItem(item, bytes)
}
val (item, strings) = stringQueue.removeHead()
val indices = strings.map(insertString(stringpool))
val indices = strings.map(stringpool.insert)
submitCompress {
val bytes = new Array[Byte](4 * strings.length)
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices)
Expand All @@ -172,7 +166,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
val poolLenBytes = new Array[Byte](4 * stringpool.size)
val poolLenBuffer = ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
val poolBytes = new ByteArrayOutputStream()
for (s <- stringpool.keysIterator) {
for (s <- stringpool.strs.iterator.take(stringpool.size)) {
val bytes = s.getBytes(StandardCharsets.UTF_8)
poolBytes.write(bytes)
poolLenBuffer.put(bytes.length)
Expand Down

0 comments on commit c64b3fe

Please sign in to comment.