diff --git a/README.md b/README.md index 888510a7..25fc98a0 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,21 @@ Code formatting is maintained via sbt scalafmt Test/scalafmt ``` +## Miscellaneous notes +By default, diffgraph application, deserialization from storage, and serialization to storage are all multi-threaded. + +This can be globally disabled via `flatgraph.misc.Misc.force_singlethreaded()` for easier debugging. + +To quickly inspect the contents of a flatgraph file, you can extract the manifest JSON with `tail`, e.g. `tail someGraph.fg | jless`: +Our output writer always places the manifest at the end of the file, preceded by a run of newlines, so that the `tail` output contains no binary garbage. + +This is suitable for quick command-line debugging. However, that approach fails if e.g. somebody appends a second flatgraph file -- deserialization will +read the header at the beginning of the file, locate the true manifest via the offset stored there, and ignore trailing garbage such as an appended fake manifest. +So never rely on this trick for security checks! + + + + ## Core Features - [x] Access nodes and neighbors - [x] Add nodes and edges diff --git a/core/src/main/scala/flatgraph/DiffGraphApplier.scala b/core/src/main/scala/flatgraph/DiffGraphApplier.scala index 7c883657..80f5f33a 100644 --- a/core/src/main/scala/flatgraph/DiffGraphApplier.scala +++ b/core/src/main/scala/flatgraph/DiffGraphApplier.scala @@ -3,8 +3,9 @@ package flatgraph import DiffGraphBuilder.* import flatgraph.Edge.Direction import flatgraph.Edge.Direction.{Incoming, Outgoing} -import flatgraph.misc.SchemaViolationReporter +import flatgraph.misc.{Misc, SchemaViolationReporter} +import java.util.concurrent import scala.collection.{Iterator, mutable} object DiffGraphApplier { @@ -20,10 +21,11 @@ object DiffGraphApplier { def applyDiff( graph: Graph, diff: DiffGraphBuilder, - schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter + schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter, + requestedExecutor: Option[concurrent.ExecutorService] = None ): Int = { if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed") - new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate() + new DiffGraphApplier(graph, diff, schemaViolationReporter, requestedExecutor).applyUpdate() } } @@ -33,7 +35,13 @@ abstract class NewNodePropertyInsertionHelper { /** The class that is responsible for applying diffgraphs. 
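Each bulk step (setting edge properties, deleting edges, adding nodes, adding edges, setting node properties) is fanned out as per-kind jobs onto the configured executor; node deletion is the one step that still runs inline.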
This is not supposed to be public API, users should stick to applyDiff */ -private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, schemaViolationReporter: SchemaViolationReporter) { +private[flatgraph] class DiffGraphApplier( + graph: Graph, + diff: DiffGraphBuilder, + schemaViolationReporter: SchemaViolationReporter, + requestedExecutor: Option[concurrent.ExecutorService] = None +) { + val executor = Misc.maybeOverrideExecutor(requestedExecutor) val newNodes = new Array[mutable.ArrayBuffer[DNode]](graph.schema.getNumberOfNodeKinds) // newEdges and delEdges are oversized, in order to permit usage of the same indexing function val newEdges = new Array[mutable.ArrayBuffer[AddEdgeProcessed]](graph.neighbors.size) @@ -43,6 +51,8 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, val delNodes = new mutable.ArrayBuffer[GNode]() val setNodeProperties = new Array[mutable.ArrayBuffer[Any]](graph.properties.size) val newNodeNewProperties = new Array[Int](graph.properties.size) + val newNodeUsers = new concurrent.atomic.AtomicIntegerArray(graph.schema.getNumberOfNodeKinds) + val jobQueue = mutable.ArrayBuffer[concurrent.Future[Unit]]() object NewNodeInterface extends BatchedUpdateInterface { override def visitContainedNode(contained: DNodeOrNode): Unit = { if (contained != null) getGNode(contained) } @@ -53,6 +63,10 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, } } + def submitJob[T](block: => Unit): Unit = { + jobQueue.addOne(executor.submit(() => block)) + } + private def insertProperty0(node: GNode, propertyKind: Int, propertyValues: Iterator[Any]): Unit = { val pos = graph.schema.propertyOffsetArrayIndex(node.nodeKind, propertyKind) if (0 > pos || pos >= setNodeProperties.length) { @@ -223,43 +237,62 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, val ndiff = splitUpdate() diff.buffer = null - // set edge properties - for { - nodeKind <- graph.schema.nodeKinds - edgeKind <- graph.schema.edgeKinds - direction <- Edge.Direction.values - } setEdgeProperty(nodeKind, direction, edgeKind) - - // remove edges - for { - nodeKind <- graph.schema.nodeKinds - edgeKind <- graph.schema.edgeKinds - direction <- Edge.Direction.values - } deleteEdges(nodeKind, direction, edgeKind) - - // add nodes - for (nodeKind <- graph.schema.nodeKinds) - addNodes(nodeKind) - - // delete nodes - if (delNodes.nonEmpty) { - deleteNodes() - } - - // add edges - for { - nodeKind <- graph.schema.nodeKinds - edgeKind <- graph.schema.edgeKinds - direction <- Direction.values - } addEdges(nodeKind, direction, edgeKind) + try { + // set edge properties + for { + nodeKind <- graph.schema.nodeKinds + edgeKind <- graph.schema.edgeKinds + direction <- Edge.Direction.values + } setEdgeProperty(nodeKind, direction, edgeKind) + + jobQueue.foreach(_.get()) + jobQueue.clear() + + // remove edges + for { + nodeKind <- graph.schema.nodeKinds + edgeKind <- graph.schema.edgeKinds + direction <- Edge.Direction.values + } deleteEdges(nodeKind, direction, edgeKind) + + jobQueue.foreach(_.get()) + jobQueue.clear() + + // add nodes. If necessary, we can start this job earlier in a separate queue; then we must add some extra structures for node sizes at various stages + for { nodeKind <- graph.schema.nodeKinds } addNodes(nodeKind) + + jobQueue.foreach(_.get()) + jobQueue.clear() + + // delete nodes + // this is harder to parallelize, only do if needed! 
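+ // (a node deletion also has to detach all of the node's edges, which may touch every (nodeKind, direction, edgeKind) slot at once, so this step runs inline on the calling thread instead of as per-kind jobs)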
+ if (delNodes.nonEmpty) { + deleteNodes() + } - // set node properties - for (nodeKind <- graph.schema.nodeKinds) { - for (propertyKind <- graph.schema.propertyKinds) { - setNodeProperties(nodeKind, propertyKind) + // add edges + for { + nodeKind <- graph.schema.nodeKinds + edgeKind <- graph.schema.edgeKinds + direction <- Direction.values + } addEdges(nodeKind, direction, edgeKind) + + // set node properties. This doesn't need to wait for add edges to be complete! + for (nodeKind <- graph.schema.nodeKinds) { + for { propertyKind <- graph.schema.propertyKinds } setNodeProperties(nodeKind, propertyKind) + if (newNodeUsers.decrementAndGet(nodeKind) == -1) { + // whoever reaches newNodeUsers(nodeKind) == -1 first is permitted to free it. + newNodes(nodeKind) = null + } } - // we can now clear the newnodes - newNodes(nodeKind) = null + + jobQueue.foreach(_.get()) + jobQueue.clear() + } catch { + case ex: java.util.concurrent.ExecutionException => + // the whole parallelization / executor stuff wraps exceptions + // we unwrap them again, in order to not break tests for e.g. SchemaViolationException + throw ex.getCause() } ndiff } @@ -441,191 +474,214 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, if (newNodes(nodeKind) == null || newNodes(nodeKind).isEmpty) { return } - graph.nodesArray(nodeKind) = graph.nodesArray(nodeKind).appendedAll(newNodes(nodeKind).map(_.storedRef.get)) - graph.livingNodeCountByKind(nodeKind) += newNodes(nodeKind).size + submitJob { + graph.nodesArray(nodeKind) = graph.nodesArray(nodeKind).appendedAll(newNodes(nodeKind).iterator.map(_.storedRef.get)) + graph.livingNodeCountByKind(nodeKind) += newNodes(nodeKind).size + } } private def setEdgeProperty(nodeKind: Int, direction: Direction, edgeKind: Int): Unit = { val pos = graph.schema.neighborOffsetArrayIndex(nodeKind, direction, edgeKind) if (setEdgeProperties(pos) == null) return - val size = graph.neighbors(pos + 1).asInstanceOf[Array[GNode]].size - val oldQty = graph.neighbors(pos).asInstanceOf[Array[Int]] - val edgeProp = graph.neighbors(pos + 2) match { - case _: DefaultValue => - graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, size) - case other => - other - // ^ change this once we switch away from full copy-on-write, see e.g. - // https://github.com/joernio/flatgraph/pull/163#discussion_r1537246314 - } - val propview = mutable.ArraySeq.make(edgeProp.asInstanceOf[Array[?]]).asInstanceOf[mutable.ArraySeq[Any]] - // this will fail if the edge doesn't support properties. todo: better error message - val default = graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind = edgeKind, size = 1)(0) - for (edgeRepr <- setEdgeProperties(pos)) { - val index = oldQty(edgeRepr.src.seq()) + edgeRepr.subSeq - 1 - propview(index) = if (edgeRepr.property == DefaultValue) default else edgeRepr.property + submitJob { + val size = graph.neighbors(pos + 1).asInstanceOf[Array[GNode]].size + val oldQty = graph.neighbors(pos).asInstanceOf[Array[Int]] + val edgeProp = graph.neighbors(pos + 2) match { + case _: DefaultValue => + graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, size) + case other => + other + // ^ change this once we switch away from full copy-on-write, see e.g. + // https://github.com/joernio/flatgraph/pull/163#discussion_r1537246314 + } + val propview = mutable.ArraySeq.make(edgeProp.asInstanceOf[Array[?]]).asInstanceOf[mutable.ArraySeq[Any]] + // this will fail if the edge doesn't support properties. 
todo: better error message + val default = graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind = edgeKind, size = 1)(0) + for (edgeRepr <- setEdgeProperties(pos)) { + val index = oldQty(edgeRepr.src.seq()) + edgeRepr.subSeq - 1 + propview(index) = if (edgeRepr.property == DefaultValue) default else edgeRepr.property + } + graph.neighbors(pos + 2) = edgeProp + setEdgeProperties(pos) = null } - graph.neighbors(pos + 2) = edgeProp - setEdgeProperties(pos) == null } private def deleteEdges(nodeKind: Int, direction: Direction, edgeKind: Int): Unit = { - val pos = graph.schema.neighborOffsetArrayIndex(nodeKind, direction, edgeKind) - val deletions = delEdges(pos) - if (deletions == null) return - assert( - deletions.forall { edge => - edge.edgeKind == edgeKind && edge.src.nodeKind == nodeKind && edge.subSeq > 0 - }, - s"something went wrong when deleting edges - values for debugging: edgeKind=$edgeKind; nodeKind=$nodeKind" - ) - - deletions.sortInPlaceBy(numberForEdgeComparison) - dedupBy(deletions, numberForEdgeComparison) - val nodeCount = graph.nodeCountByKind(nodeKind) - val oldQty = graph.neighbors(pos).asInstanceOf[Array[Int]] - val oldNeighbors = graph.neighbors(pos + 1).asInstanceOf[Array[GNode]] - - val newQty = new Array[Int](nodeCount + 1) - val newNeighbors = new Array[GNode](get(oldQty, nodeCount) - deletions.length) - - val oldProperty = graph.neighbors(pos + 2) match { - case _: DefaultValue => null - case other => other - } - val newProperty = - if (oldProperty != null) graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, newNeighbors.size) - else null - - var deletionCounter = 0 - var copyStartSeq = 0 - while (copyStartSeq < nodeCount) { - val deletionSeq = if (deletionCounter < deletions.size) deletions(deletionCounter).src.seq else nodeCount - // we first copy unaffected neighbors - val copyStartIndex = get(oldQty, copyStartSeq) - val deletionSeqIndexStart = get(oldQty, deletionSeq) - val deletionSeqIndexEnd = get(oldQty, deletionSeq + 1) - System.arraycopy(oldNeighbors, copyStartIndex, newNeighbors, copyStartIndex - deletionCounter, deletionSeqIndexStart - copyStartIndex) - if (oldProperty != null) - System.arraycopy(oldProperty, copyStartIndex, newProperty, copyStartIndex - deletionCounter, deletionSeqIndexStart - copyStartIndex) - - for (idx <- Range(copyStartSeq, deletionSeq + 1)) - newQty(idx) = get(oldQty, idx) - deletionCounter - - copyStartSeq = deletionSeq + 1 - // we now copy over the non-deleted edges of the critical deletionSeq - if (deletionCounter < deletions.size) { - var deletion = deletions(deletionCounter) - var idx = 0 - while (idx < deletionSeqIndexEnd - deletionSeqIndexStart) { - if (deletion != null && idx == deletion.subSeq - 1) { - assert( - deletion.dst == oldNeighbors(deletionSeqIndexStart + idx), - s"deletion.dst was supposed to be `${oldNeighbors(deletionSeqIndexStart + idx)}`, but instead is ${deletion.dst}" - ) - deletionCounter += 1 - deletion = if (deletionCounter < deletions.size) deletions(deletionCounter) else null - if (deletion != null && deletion.src.seq() != deletionSeq) deletion = null - } else { - newNeighbors(deletionSeqIndexStart + idx - deletionCounter) = oldNeighbors(deletionSeqIndexStart + idx) - if (oldProperty != null) - System.arraycopy(oldProperty, deletionSeqIndexStart + idx, newProperty, deletionSeqIndexStart + idx - deletionCounter, 1) + val pos = graph.schema.neighborOffsetArrayIndex(nodeKind, direction, edgeKind) + if (delEdges(pos) == null) return + submitJob { + val deletions = delEdges(pos) + 
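// consistency check: splitUpdate should only have routed edges of exactly this kind and direction into delEdges(pos)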
assert( + deletions.forall { edge => + edge.edgeKind == edgeKind && edge.src.nodeKind == nodeKind && edge.subSeq > 0 + }, + s"something went wrong when deleting edges - values for debugging: edgeKind=$edgeKind; nodeKind=$nodeKind" + ) + + deletions.sortInPlaceBy(numberForEdgeComparison) + dedupBy(deletions, numberForEdgeComparison) + val nodeCount = graph.nodeCountByKind(nodeKind) + val oldQty = graph.neighbors(pos).asInstanceOf[Array[Int]] + val oldNeighbors = graph.neighbors(pos + 1).asInstanceOf[Array[GNode]] + + val newQty = new Array[Int](nodeCount + 1) + val newNeighbors = new Array[GNode](get(oldQty, nodeCount) - deletions.length) + + val oldProperty = graph.neighbors(pos + 2) match { + case _: DefaultValue => null + case other => other + } + val newProperty = + if (oldProperty != null) graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, newNeighbors.size) + else null + + var deletionCounter = 0 + var copyStartSeq = 0 + while (copyStartSeq < nodeCount) { + val deletionSeq = if (deletionCounter < deletions.size) deletions(deletionCounter).src.seq else nodeCount + // we first copy unaffected neighbors + val copyStartIndex = get(oldQty, copyStartSeq) + val deletionSeqIndexStart = get(oldQty, deletionSeq) + val deletionSeqIndexEnd = get(oldQty, deletionSeq + 1) + System.arraycopy( + oldNeighbors, + copyStartIndex, + newNeighbors, + copyStartIndex - deletionCounter, + deletionSeqIndexStart - copyStartIndex + ) + if (oldProperty != null) + System.arraycopy( + oldProperty, + copyStartIndex, + newProperty, + copyStartIndex - deletionCounter, + deletionSeqIndexStart - copyStartIndex + ) + + for (idx <- Range(copyStartSeq, deletionSeq + 1)) + newQty(idx) = get(oldQty, idx) - deletionCounter + + copyStartSeq = deletionSeq + 1 + // we now copy over the non-deleted edges of the critical deletionSeq + if (deletionCounter < deletions.size) { + var deletion = deletions(deletionCounter) + var idx = 0 + while (idx < deletionSeqIndexEnd - deletionSeqIndexStart) { + if (deletion != null && idx == deletion.subSeq - 1) { + assert( + deletion.dst == oldNeighbors(deletionSeqIndexStart + idx), + s"deletion.dst was supposed to be `${oldNeighbors(deletionSeqIndexStart + idx)}`, but instead is ${deletion.dst}" + ) + deletionCounter += 1 + deletion = if (deletionCounter < deletions.size) deletions(deletionCounter) else null + if (deletion != null && deletion.src.seq() != deletionSeq) deletion = null + } else { + newNeighbors(deletionSeqIndexStart + idx - deletionCounter) = oldNeighbors(deletionSeqIndexStart + idx) + if (oldProperty != null) + System.arraycopy(oldProperty, deletionSeqIndexStart + idx, newProperty, deletionSeqIndexStart + idx - deletionCounter, 1) + } + idx += 1 } - idx += 1 + newQty(deletionSeq + 1) = deletionSeqIndexStart + idx - deletionCounter } - newQty(deletionSeq + 1) = deletionSeqIndexStart + idx - deletionCounter } + graph.neighbors(pos) = newQty + graph.neighbors(pos + 1) = newNeighbors + graph.neighbors(pos + 2) = newProperty match { + case null => graph.neighbors(pos + 2) + case other => other + } + delEdges(pos) = null } - graph.neighbors(pos) = newQty - graph.neighbors(pos + 1) = newNeighbors - graph.neighbors(pos + 2) = newProperty match { - case null => graph.neighbors(pos + 2) - case other => other - } - delEdges(pos) = null } private def addEdges(nodeKind: Int, direction: Direction, edgeKind: Int): Unit = { - val pos = graph.schema.neighborOffsetArrayIndex(nodeKind, direction, edgeKind) - val insertions = newEdges(pos) - if (insertions == null) { + val pos = 
graph.schema.neighborOffsetArrayIndex(nodeKind, direction, edgeKind) + if (newEdges(pos) == null) { return } + submitJob { + val insertions = newEdges(pos) - insertions.sortInPlaceBy(_.src.seq) + insertions.sortInPlaceBy(_.src.seq) - assert(insertions.nonEmpty, "insertions must be nonEmpty") - assert( - insertions.forall(edge => edge.src.nodeKind == nodeKind && edge.edgeKind == edgeKind), - s"something went wrong while adding edges - values for debugging: nodeKind=$nodeKind; edgeKind=$edgeKind" - ) + assert(insertions.nonEmpty, "insertions must be nonEmpty") + assert( + insertions.forall(edge => edge.src.nodeKind == nodeKind && edge.edgeKind == edgeKind), + s"something went wrong while adding edges - values for debugging: nodeKind=$nodeKind; edgeKind=$edgeKind" + ) - val nodeCount = graph.nodesArray(nodeKind).length - val oldQty = Option(graph.neighbors(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1)) - val oldNeighbors = Option(graph.neighbors(pos + 1).asInstanceOf[Array[GNode]]).getOrElse(new Array[GNode](0)) - val newQty = new Array[Int](nodeCount + 1) - val newNeighbors = new Array[GNode](get(oldQty, nodeCount) + insertions.size) + val nodeCount = graph.nodesArray(nodeKind).length + val oldQty = Option(graph.neighbors(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1)) + val oldNeighbors = Option(graph.neighbors(pos + 1).asInstanceOf[Array[GNode]]).getOrElse(new Array[GNode](0)) + val newQty = new Array[Int](nodeCount + 1) + val newNeighbors = new Array[GNode](get(oldQty, nodeCount) + insertions.size) - val hasNewProp = insertions.exists(_.property != DefaultValue) - val oldProperty = graph.neighbors(pos + 2) match { - case _: DefaultValue => null - case other => other - } - val newProperty = - if (hasNewProp || oldProperty != null) graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, newNeighbors.size) else null - val newPropertyView = mutable.ArraySeq.make(newProperty).asInstanceOf[mutable.ArraySeq[Any]] - - var insertionCounter = 0 - var copyStartSeq = 0 - while (copyStartSeq < nodeCount) { - val insertionSeq = if (insertionCounter < insertions.size) insertions(insertionCounter).src.seq else nodeCount - 1 - - val copyStartIdx = get(oldQty, copyStartSeq) - val insertionIdx = get(oldQty, insertionSeq + 1) - - // copy from copyStartSeq -> insertionSeq inclusive - System.arraycopy(oldNeighbors, copyStartIdx, newNeighbors, copyStartIdx + insertionCounter, insertionIdx - copyStartIdx) - if (oldProperty != null) - System.arraycopy(oldProperty, copyStartIdx, newProperty, copyStartIdx + insertionCounter, insertionIdx - copyStartIdx) - for (idx <- Range(copyStartSeq, insertionSeq + 2)) - newQty(idx) = get(oldQty, idx) + insertionCounter - - // insert - val insertionBaseIndex = newQty(insertionSeq + 1) - insertionCounter - while (insertionCounter < insertions.size && insertions(insertionCounter).src.seq == insertionSeq) { - val insertion = insertions(insertionCounter) - newNeighbors(insertionBaseIndex + insertionCounter) = insertion.dst - if (newPropertyView != null && insertion.property != DefaultValue) { - try { - newPropertyView(insertionBaseIndex + insertionCounter) = insertion.property - } catch { - case _: ArrayStoreException | _: ClassCastException => - val edgeType = graph.schema.getEdgeLabel(nodeKind, edgeKind) - throw new UnsupportedOperationException( - s"unsupported property type `${insertion.property.getClass}` for edge type `$edgeType`" - ) + val hasNewProp = insertions.exists(_.property != DefaultValue) + val oldProperty = graph.neighbors(pos + 2) 
match { + case _: DefaultValue => null + case other => other + } + val newProperty = + if (hasNewProp || oldProperty != null) graph.schema.allocateEdgeProperty(nodeKind, direction, edgeKind, newNeighbors.size) else null + val newPropertyView = mutable.ArraySeq.make(newProperty).asInstanceOf[mutable.ArraySeq[Any]] + + var insertionCounter = 0 + var copyStartSeq = 0 + while (copyStartSeq < nodeCount) { + val insertionSeq = if (insertionCounter < insertions.size) insertions(insertionCounter).src.seq else nodeCount - 1 + + val copyStartIdx = get(oldQty, copyStartSeq) + val insertionIdx = get(oldQty, insertionSeq + 1) + + // copy from copyStartSeq -> insertionSeq inclusive + System.arraycopy(oldNeighbors, copyStartIdx, newNeighbors, copyStartIdx + insertionCounter, insertionIdx - copyStartIdx) + if (oldProperty != null) + System.arraycopy(oldProperty, copyStartIdx, newProperty, copyStartIdx + insertionCounter, insertionIdx - copyStartIdx) + for (idx <- Range(copyStartSeq, insertionSeq + 2)) + newQty(idx) = get(oldQty, idx) + insertionCounter + + // insert + val insertionBaseIndex = newQty(insertionSeq + 1) - insertionCounter + while (insertionCounter < insertions.size && insertions(insertionCounter).src.seq == insertionSeq) { + val insertion = insertions(insertionCounter) + newNeighbors(insertionBaseIndex + insertionCounter) = insertion.dst + if (newPropertyView != null && insertion.property != DefaultValue) { + try { + newPropertyView(insertionBaseIndex + insertionCounter) = insertion.property + } catch { + case _: ArrayStoreException | _: ClassCastException => + val edgeType = graph.schema.getEdgeLabel(nodeKind, edgeKind) + throw new UnsupportedOperationException( + s"unsupported property type `${insertion.property.getClass}` for edge type `$edgeType`" + ) + } } + insertionCounter += 1 } - insertionCounter += 1 + newQty(insertionSeq + 1) = insertionBaseIndex + insertionCounter + copyStartSeq = insertionSeq + 1 } - newQty(insertionSeq + 1) = insertionBaseIndex + insertionCounter - copyStartSeq = insertionSeq + 1 - } - graph.neighbors(pos) = newQty - graph.neighbors(pos + 1) = newNeighbors - graph.neighbors(pos + 2) = newProperty match { - case null => graph.neighbors(pos + 2) - case other => other + graph.neighbors(pos) = newQty + graph.neighbors(pos + 1) = newNeighbors + graph.neighbors(pos + 2) = newProperty match { + case null => graph.neighbors(pos + 2) + case other => other + } + newEdges(pos) = null } - newEdges(pos) = null } private def setNodeProperties(nodeKind: Int, propertyKind: Int): Unit = { - val schema = graph.schema - val pos = schema.propertyOffsetArrayIndex(nodeKind, propertyKind) - val viaNewNode = newNodeNewProperties(pos) - val propertyBuf = Option(setNodeProperties(pos)).getOrElse(mutable.ArrayBuffer.empty) - if (setNodeProperties(pos) != null || viaNewNode > 0) { + val schema = graph.schema + val pos = schema.propertyOffsetArrayIndex(nodeKind, propertyKind) + val viaNewNode = newNodeNewProperties(pos) + if (setNodeProperties(pos) == null && viaNewNode == 0) return + newNodeUsers.incrementAndGet(nodeKind) + + submitJob { + val propertyBuf = Option(setNodeProperties(pos)).getOrElse(mutable.ArrayBuffer.empty) val setPropertyPositions = Option(setNodeProperties(pos + 1)).getOrElse(mutable.ArrayBuffer.empty).asInstanceOf[mutable.ArrayBuffer[SetPropertyDesc]] graph.inverseIndices.set(pos, null) @@ -689,6 +745,11 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, graph.properties(pos + 1) = newProperty setNodeProperties(pos) = null 
setNodeProperties(pos + 1) = null + if (newNodeUsers.decrementAndGet(nodeKind) == -1) { + // whoever reaches newNodeUsers(nodeKind) == -1 first is permitted to free it. + newNodes(nodeKind) = null + } + } } } @@ -698,9 +759,12 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, // this is a dirty hack in order to make scala type system shut up buf.asInstanceOf[mutable.ArrayBuffer[T]].copyToArray(dst) } catch { - case _: ArrayStoreException => + case store: ArrayStoreException => val typeMaybe = buf.headOption.map(property => s": ${property.getClass}").getOrElse("") - throw new UnsupportedOperationException(s"unsupported property type$typeMaybe") + throw new UnsupportedOperationException( + s"unsupported property type$typeMaybe on ${Option(dst).map { _.getClass.toString }.orNull}", + store + ) } } diff --git a/core/src/main/scala/flatgraph/misc/Misc.scala b/core/src/main/scala/flatgraph/misc/Misc.scala new file mode 100644 index 00000000..e1eb2842 --- /dev/null +++ b/core/src/main/scala/flatgraph/misc/Misc.scala @@ -0,0 +1,27 @@ +package flatgraph.misc + +import java.util.concurrent + +object Misc { + + @volatile var _overrideExecutor: Option[concurrent.ExecutorService] => concurrent.ExecutorService = defaultExecutorProvider + + def force_singlethreaded(): Unit = { + // This one is magic -- it can get garbage collected, no manual shutdown required! + // Unfortunately this behavior is apparently not documented officially :( + // But that is the behavior on java8 and java23 and, presumably, everywhere we care about: + // https://github.com/openjdk/jdk8u/blob/1a6e3a5ea32d5c671cb46a590046f16426089921/jdk/src/share/classes/java/util/concurrent/Executors.java#L170 + // https://github.com/openjdk/jdk23u/blob/9101cc14972ce6bdeb966e67bcacc8b693c37d0a/src/java.base/share/classes/java/util/concurrent/Executors.java#L192 + this._overrideExecutor = (something: Option[concurrent.ExecutorService]) => concurrent.Executors.newSingleThreadExecutor() + } + + def defaultExecutorProvider(requested: Option[concurrent.ExecutorService]): concurrent.ExecutorService = requested.getOrElse { + java.lang.Thread.currentThread() match { + case fjt: concurrent.ForkJoinWorkerThread => fjt.getPool + case _ => concurrent.ForkJoinPool.commonPool() + } + } + + def maybeOverrideExecutor(requested: Option[concurrent.ExecutorService]): concurrent.ExecutorService = + this._overrideExecutor.apply(requested) +} diff --git a/core/src/main/scala/flatgraph/storage/Deserialization.scala b/core/src/main/scala/flatgraph/storage/Deserialization.scala index 0244e48c..355beef8 100644 --- a/core/src/main/scala/flatgraph/storage/Deserialization.scala +++ b/core/src/main/scala/flatgraph/storage/Deserialization.scala @@ -1,8 +1,8 @@ package flatgraph.storage -import com.github.luben.zstd.Zstd -import flatgraph.* +import flatgraph.{AccessHelpers, FreeSchema, GNode, Graph, Schema} import flatgraph.Edge.Direction +import flatgraph.misc.Misc import flatgraph.storage.Manifest.{GraphItem, OutlineStorage} import java.nio.channels.FileChannel @@ -11,15 +11,30 @@ import java.nio.file.Path import java.nio.{ByteBuffer, ByteOrder} import java.util.Arrays import scala.collection.mutable +import java.util.concurrent object Deserialization { - def readGraph(storagePath: Path, schemaMaybe: Option[Schema], persistOnClose: Boolean = true): Graph = { + def readGraph( + storagePath: Path, + schemaMaybe: Option[Schema], + persistOnClose: Boolean = true, + requestedExecutor: Option[concurrent.ExecutorService] = None + ): Graph = { + val 
executor = Misc.maybeOverrideExecutor(requestedExecutor) val fileChannel = new java.io.RandomAccessFile(storagePath.toAbsolutePath.toFile, "r").getChannel + val queue = mutable.ArrayBuffer[concurrent.Future[Any]]() + val zstdCtx = new ZstdWrapper.ZstdCtx + def submitJob[T](block: => T): concurrent.Future[T] = { + val res = executor.submit((() => block)) + queue.addOne(res.asInstanceOf[concurrent.Future[Any]]) + res + } + try { // fixme: Use convenience methods from schema to translate string->id. Fix after we get strict schema checking. val manifest = GraphItem.read(readManifest(fileChannel)) - val pool = readPool(manifest, fileChannel) + val pool = submitJob { readPool(manifest, fileChannel, zstdCtx) } val schema = schemaMaybe.getOrElse(freeSchemaFromManifest(manifest)) val storagePathMaybe = if (persistOnClose) Option(storagePath) @@ -27,13 +42,11 @@ object Deserialization { val g = new Graph(schema, storagePathMaybe) val nodekinds = mutable.HashMap[String, Short]() for (nodeKind <- g.schema.nodeKinds) nodekinds(g.schema.getNodeLabel(nodeKind)) = nodeKind.toShort - val kindRemapper = Array.fill(manifest.nodes.size)(-1.toShort) val nodeRemapper = new Array[Array[GNode]](manifest.nodes.length) for { (nodeItem, idx) <- manifest.nodes.zipWithIndex nodeKind <- nodekinds.get(nodeItem.nodeLabel) } { - kindRemapper(idx) = nodeKind val nodes = new Array[GNode](nodeItem.nnodes) for (seq <- Range(0, nodes.length)) nodes(seq) = g.schema.makeNode(g, nodeKind, seq) g.nodesArray(nodeKind) = nodes @@ -66,11 +79,17 @@ object Deserialization { val direction = Direction.fromOrdinal(edgeItem.inout) if (nodeKind.isDefined && edgeKind.isDefined) { val pos = g.schema.neighborOffsetArrayIndex(nodeKind.get, direction, edgeKind.get) - g.neighbors(pos) = deltaDecode(readArray(fileChannel, edgeItem.qty, nodeRemapper, pool).asInstanceOf[Array[Int]]) - g.neighbors(pos + 1) = readArray(fileChannel, edgeItem.neighbors, nodeRemapper, pool) - val property = readArray(fileChannel, edgeItem.property, nodeRemapper, pool) - if (property != null) - g.neighbors(pos + 2) = property + submitJob { + g.neighbors(pos) = deltaDecode(readArray(fileChannel, edgeItem.qty, nodeRemapper, pool, zstdCtx).asInstanceOf[Array[Int]]) + } + submitJob { + g.neighbors(pos + 1) = readArray(fileChannel, edgeItem.neighbors, nodeRemapper, pool, zstdCtx) + } + submitJob { + val property = readArray(fileChannel, edgeItem.property, nodeRemapper, pool, zstdCtx) + if (property != null) + g.neighbors(pos + 2) = property + } } } @@ -91,12 +110,18 @@ object Deserialization { val propertyKind = propertykinds.get((property.nodeLabel, property.propertyLabel)) if (nodeKind.isDefined && propertyKind.isDefined) { val pos = g.schema.propertyOffsetArrayIndex(nodeKind.get, propertyKind.get) - g.properties(pos) = deltaDecode(readArray(fileChannel, property.qty, nodeRemapper, pool).asInstanceOf[Array[Int]]) - g.properties(pos + 1) = readArray(fileChannel, property.property, nodeRemapper, pool) + submitJob { + g.properties(pos) = deltaDecode(readArray(fileChannel, property.qty, nodeRemapper, pool, zstdCtx).asInstanceOf[Array[Int]]) + } + submitJob { g.properties(pos + 1) = readArray(fileChannel, property.property, nodeRemapper, pool, zstdCtx) } } } + queue.foreach { _.get() } g - } finally fileChannel.close() + } catch { + case ex: java.util.concurrent.ExecutionException => + throw ex.getCause() + } finally { fileChannel.close(); zstdCtx.close(); } } private def freeSchemaFromManifest(manifest: Manifest.GraphItem): FreeSchema = { @@ -171,23 +196,17 @@ object 
Deserialization { } - private def readPool(manifest: GraphItem, fileChannel: FileChannel): Array[String] = { - val stringPoolLength = ZstdWrapper( - Zstd - .decompress( - fileChannel.map(FileChannel.MapMode.READ_ONLY, manifest.stringPoolLength.startOffset, manifest.stringPoolLength.compressedLength), - manifest.stringPoolLength.decompressedLength - ) - .order(ByteOrder.LITTLE_ENDIAN) - ) - val stringPoolBytes = ZstdWrapper( - Zstd - .decompress( - fileChannel.map(FileChannel.MapMode.READ_ONLY, manifest.stringPoolBytes.startOffset, manifest.stringPoolBytes.compressedLength), - manifest.stringPoolBytes.decompressedLength - ) - .order(ByteOrder.LITTLE_ENDIAN) - ) + private def readPool(manifest: GraphItem, fileChannel: FileChannel, zstdCtx: ZstdWrapper.ZstdCtx): Array[String] = { + val stringPoolLength = zstdCtx + .decompress( + fileChannel.map(FileChannel.MapMode.READ_ONLY, manifest.stringPoolLength.startOffset, manifest.stringPoolLength.compressedLength), + manifest.stringPoolLength.decompressedLength + ) + val stringPoolBytes = zstdCtx + .decompress( + fileChannel.map(FileChannel.MapMode.READ_ONLY, manifest.stringPoolBytes.startOffset, manifest.stringPoolBytes.compressedLength), + manifest.stringPoolBytes.decompressedLength + ) val poolBytes = new Array[Byte](manifest.stringPoolBytes.decompressedLength) stringPoolBytes.get(poolBytes) val pool = new Array[String](manifest.stringPoolLength.decompressedLength >> 2) @@ -215,11 +234,18 @@ object Deserialization { a } - private def readArray(channel: FileChannel, ptr: OutlineStorage, nodes: Array[Array[GNode]], stringPool: Array[String]): Array[?] = { + private def readArray( + channel: FileChannel, + ptr: OutlineStorage, + nodes: Array[Array[GNode]], + stringPoolFuture: concurrent.Future[Array[String]], + zstdCtx: ZstdWrapper.ZstdCtx + ): Array[?] 
= { if (ptr == null) return null - val dec = ZstdWrapper( - Zstd.decompress(channel.map(FileChannel.MapMode.READ_ONLY, ptr.startOffset, ptr.compressedLength), ptr.decompressedLength) - ).order(ByteOrder.LITTLE_ENDIAN) + if (ptr.typ == StorageType.String) stringPoolFuture.get() + + val dec = + zstdCtx.decompress(channel.map(FileChannel.MapMode.READ_ONLY, ptr.startOffset, ptr.compressedLength), ptr.decompressedLength) ptr.typ match { case StorageType.Bool => val bytes = new Array[Byte](dec.limit()) @@ -253,9 +279,10 @@ object Deserialization { dec.asDoubleBuffer().get(res) res case StorageType.String => - val res = new Array[String](dec.limit() >> 2) - val intbuf = dec.asIntBuffer() - var idx = 0 + val stringPool = stringPoolFuture.get() + val res = new Array[String](dec.limit() >> 2) + val intbuf = dec.asIntBuffer() + var idx = 0 while (idx < res.length) { val offset = intbuf.get(idx) if (offset >= 0) res(idx) = stringPool(offset) diff --git a/core/src/main/scala/flatgraph/storage/Manifest.scala b/core/src/main/scala/flatgraph/storage/Manifest.scala index f6fb853d..b5058f81 100644 --- a/core/src/main/scala/flatgraph/storage/Manifest.scala +++ b/core/src/main/scala/flatgraph/storage/Manifest.scala @@ -34,8 +34,8 @@ object Manifest { var nodes: Array[NodeItem], var edges: Array[EdgeItem], var properties: Array[PropertyItem], - val stringPoolLength: OutlineStorage, - val stringPoolBytes: OutlineStorage + val stringPoolLength: OutlineStorage = new OutlineStorage(StorageType.Int), + val stringPoolBytes: OutlineStorage = new OutlineStorage(StorageType.Byte) ) { var version = 0 } @@ -96,9 +96,9 @@ object Manifest { val nodeLabel: String, val edgeLabel: String, val inout: Byte, // 0: Incoming, 1: Outgoing; see Edge.Direction enum - var qty: OutlineStorage, - var neighbors: OutlineStorage, - var property: OutlineStorage + var qty: OutlineStorage = new OutlineStorage, + var neighbors: OutlineStorage = new OutlineStorage, + var property: OutlineStorage = new OutlineStorage ) { Edge.Direction.verifyEncodingRange(inout) } @@ -122,11 +122,20 @@ object Manifest { } } - class PropertyItem(val nodeLabel: String, val propertyLabel: String, var qty: OutlineStorage, var property: OutlineStorage) + class PropertyItem( + val nodeLabel: String, + val propertyLabel: String, + var qty: OutlineStorage = new OutlineStorage, + var property: OutlineStorage = new OutlineStorage + ) object OutlineStorage { def write(item: OutlineStorage): ujson.Value = { if (item == null) return ujson.Null + if (item.typ == null) { + assert(item.startOffset == -1L && item.compressedLength == -1 && item.decompressedLength == -1, s"bad OutlineStorage ${item}") + return ujson.Null + } val res = ujson.Obj() res(Keys.Type) = item.typ res(Keys.StartOffset) = ujson.Num(item.startOffset.toDouble) @@ -143,7 +152,8 @@ object Manifest { def read(item: ujson.Value): OutlineStorage = { if (item.isNull) return null - val res = new OutlineStorage(item.obj(Keys.Type).str) + val res = new OutlineStorage + res.typ = item.obj(Keys.Type).str res.startOffset = item.obj(Keys.StartOffset).num.toLong res.compressedLength = item.obj(Keys.CompressedLength).num.toInt res.decompressedLength = item.obj(Keys.DecompressedLength).num.toInt @@ -151,9 +161,16 @@ object Manifest { } } - class OutlineStorage(var typ: String) { + class OutlineStorage { + var typ: String = null var startOffset: Long = -1L var compressedLength: Int = -1 var decompressedLength: Int = -1 + def this(_typ: String) = { + this() + this.typ = _typ + } + + override def toString: String = 
super.toString + s"($typ, $startOffset, $compressedLength, $decompressedLength)" } } diff --git a/core/src/main/scala/flatgraph/storage/Serialization.scala b/core/src/main/scala/flatgraph/storage/Serialization.scala index f0b3307d..a1f28ca1 100644 --- a/core/src/main/scala/flatgraph/storage/Serialization.scala +++ b/core/src/main/scala/flatgraph/storage/Serialization.scala @@ -3,6 +3,7 @@ package flatgraph.storage import com.github.luben.zstd.Zstd import flatgraph.* import flatgraph.Edge.Direction +import flatgraph.misc.Misc import flatgraph.storage.Manifest.* import java.io.ByteArrayOutputStream @@ -12,34 +13,235 @@ import java.nio.file.{Files, Path} import java.nio.{ByteBuffer, ByteOrder} import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable +import java.util.concurrent +import java.util.concurrent.TimeUnit + +class WriterContext(val fileChannel: FileChannel, val executor: concurrent.ExecutorService) { + + val compressCtx = new ZstdWrapper.ZstdCtx + var fileOffset: Long = 16L + val compressQueue = mutable.ArrayDeque[concurrent.Future[(OutlineStorage, ByteBuffer)]]() + val writeQueue = mutable.ArrayDeque[concurrent.Future[Any]]() + val jobQueue = mutable.ArrayBuffer[() => (OutlineStorage, Array[Byte])]() + val stringQueue = mutable.ArrayDeque[(OutlineStorage, Array[String])]() + val stringpool = mutable.LinkedHashMap[String, Int]() + + def submitCompress(block: => (OutlineStorage, ByteBuffer)): Unit = { + compressQueue.addOne(executor.submit((() => block))) + } + + private def deltaEncode(padTo: Int, offsets: Array[Int]): Array[Int] = { + if (offsets == null) null + else { + // the array has one more element than needed, in order to permit deltaDecode to be in-place + val res = new Array[Int](padTo + 1) + assert(offsets.length == 0 || offsets(0) == 0) + var idx = 0 + val until = math.min(offsets.length - 1, padTo) + while (idx < until) { + res(idx) = offsets(idx + 1) - offsets(idx) + idx += 1 + } + res + } + } + + // NOT threadsafe! 
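+ // (insertString mutates the shared stringpool map; in this design it is only invoked from finish(), which runs on the single coordinating thread)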
+ private def insertString(stringPool: mutable.LinkedHashMap[String, Int])(s: String): Int = { + if (s == null) -1 + else stringPool.getOrElseUpdate(s, stringPool.size) + } + + private[flatgraph] def encodeAny(item: Any, outlineStorage: OutlineStorage = new OutlineStorage, delta: Int = -1): OutlineStorage = { + item match { + case _: DefaultValue => null + case null => null + case bools: Array[Boolean] => + outlineStorage.typ = StorageType.Bool + this.submitCompress { + val bytes = bools.map { b => if (b) 1.toByte else 0.toByte } + compressItem(outlineStorage, bytes) + } + case bytes: Array[Byte] => + outlineStorage.typ = StorageType.Byte + this.submitCompress { + compressItem(outlineStorage, bytes) + } + case shorts: Array[Short] => + outlineStorage.typ = StorageType.Short + this.submitCompress { + val bytes = new Array[Byte](2 * shorts.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(shorts) + compressItem(outlineStorage, bytes) + } + case ints: Array[Int] => + outlineStorage.typ = StorageType.Int + this.submitCompress { + val tmpInts = if (delta == -1) ints else deltaEncode(delta, ints) + val bytes = new Array[Byte](4 * tmpInts.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(tmpInts) + compressItem(outlineStorage, bytes) + } + case longs: Array[Long] => + outlineStorage.typ = StorageType.Long + this.submitCompress { + val bytes = new Array[Byte](8 * longs.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(longs) + compressItem(outlineStorage, bytes) + } + case floats: Array[Float] => + outlineStorage.typ = StorageType.Float + this.submitCompress { + val bytes = new Array[Byte](4 * floats.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(floats) + compressItem(outlineStorage, bytes) + } + case doubles: Array[Double] => + outlineStorage.typ = StorageType.Double + this.submitCompress { + val bytes = new Array[Byte](8 * doubles.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(doubles) + compressItem(outlineStorage, bytes) + } + case refs: Array[GNode] => + outlineStorage.typ = StorageType.Ref + this.submitCompress { + val bytes = new Array[Byte](8 * refs.length) + val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN) + for (ref <- refs) { + if (ref == null) buf.putLong(0x0000ffffffffffffL) else buf.putLong(ref.seq().toLong + (ref.nodeKind.toLong << 32)) + } + compressItem(outlineStorage, bytes) + } + case strings: Array[String] => + outlineStorage.typ = StorageType.String + this.stringQueue.addOne((outlineStorage, strings)) + /* + val indices = strings.map(insertString(stringPool)) + val bytes = new Array[Byte](4 * strings.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices) + write(bytes, outlineStorage, filePtr, fileChannel) + */ + } + outlineStorage + } + + def compressItem(res: OutlineStorage, bytes: Array[Byte]): (OutlineStorage, ByteBuffer) = { + res.decompressedLength = bytes.length + val compressed = this.compressCtx.compress(bytes) + (res, compressed) + } + + // NOT threadsafe! 
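+ // (writeItem advances fileOffset and appends to writeQueue without locking, so only the coordinating thread may call it; just the actual fileChannel.write is offloaded to the executor)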
+ def writeItem(res: OutlineStorage, buf: ByteBuffer): Unit = { + res.compressedLength = buf.limit() + val outPos0 = this.fileOffset + this.fileOffset += buf.limit() + res.startOffset = outPos0 + + this.writeQueue.addOne(this.executor.submit(() => { + var outPos = outPos0 + while (buf.hasRemaining()) { + outPos += this.fileChannel.write(buf, outPos) + } + null + })) + } + + def finish(manifest: Manifest.GraphItem): Unit = { + // writes header etc + + // OK, all jobs have been submitted. Let's clear the stringqueue. + while (stringQueue.nonEmpty) { + while (writeQueue.nonEmpty && writeQueue.head.isDone) { + writeQueue.removeHead() + } + while (compressQueue.nonEmpty && compressQueue.head.isDone) { + val (item, bytes) = compressQueue.removeHead().get() + writeItem(item, bytes) + } + val (item, strings) = stringQueue.removeHead() + val indices = strings.map(insertString(stringpool)) + submitCompress { + val bytes = new Array[Byte](4 * strings.length) + ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices) + compressItem(item, bytes) + } + } + // OK, stringpool is ready. + val poolLenBytes = new Array[Byte](4 * stringpool.size) + val poolLenBuffer = ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer() + val poolBytes = new ByteArrayOutputStream() + for (s <- stringpool.keysIterator) { + val bytes = s.getBytes(StandardCharsets.UTF_8) + poolBytes.write(bytes) + poolLenBuffer.put(bytes.length) + } + submitCompress { + compressItem(manifest.stringPoolLength, poolLenBytes) + } + submitCompress { + compressItem(manifest.stringPoolBytes, poolBytes.toByteArray) + } + + // OK, all jobs submitted. Let's clear the queues. + while (compressQueue.nonEmpty) { + while (writeQueue.nonEmpty && writeQueue.head.isDone) { + writeQueue.removeHead() + } + val (item, bytes) = compressQueue.removeHead().get() + writeItem(item, bytes) + } + while (writeQueue.nonEmpty) { + writeQueue.removeHead().get() + } + + var pos = fileOffset + val header = new Array[Byte](16) + val headerBuf = ByteBuffer.wrap(header) + headerBuf.order(ByteOrder.LITTLE_ENDIAN).put(Keys.Header).asLongBuffer().put(pos) + headerBuf.position(0) + var headPos = 0L + while (headerBuf.hasRemaining()) { + headPos += fileChannel.write(headerBuf, headPos) + } + val manifestObj = GraphItem.write(manifest) + // we prefix the manifest with 20 newlines. 
That allows us to conveniently extract the valid json manifest via `tail` + val prefixForTail = "\n" * 20 + val buf = ByteBuffer.wrap((prefixForTail + manifestObj.render()).getBytes(StandardCharsets.UTF_8)) + + while (buf.hasRemaining()) { + pos += fileChannel.write(buf, pos) + } + fileChannel.truncate(pos) + } + +} object Serialization { - def writeGraph(g: Graph, storagePath: Path): Unit = { - val fileOffset = new AtomicLong(16) - val stringPool = mutable.LinkedHashMap[String, Int]() + def writeGraph(g: Graph, storagePath: Path, requestedExecutor: Option[concurrent.ExecutorService] = None): Unit = { // ensure parent directory exists val parentDir = storagePath.getParent if (Files.notExists(parentDir)) Files.createDirectories(parentDir) - val fileChannel = - new java.io.RandomAccessFile(storagePath.toAbsolutePath.toFile, "rw").getChannel + val fileChannel = new java.io.RandomAccessFile(storagePath.toAbsolutePath.toFile, "rw").getChannel + + val writer = new WriterContext(fileChannel, Misc.maybeOverrideExecutor(requestedExecutor)) try { - innerWriteGraph(g, stringPool, fileOffset, fileChannel) + innerWriteGraph(g, writer) + } catch { + case ex: java.util.concurrent.ExecutionException => + throw ex.getCause() } finally { - stringPool.clear() + writer.compressCtx.close() fileChannel.close() } } - private def innerWriteGraph( - g: Graph, - stringPool: mutable.LinkedHashMap[String, Int], - filePtr: AtomicLong, - fileChannel: FileChannel - ): Unit = { + private def innerWriteGraph(g: Graph, writer: WriterContext): Unit = { val nodes = mutable.ArrayBuffer.empty[NodeItem] val edges = mutable.ArrayBuffer.empty[EdgeItem] val properties = mutable.ArrayBuffer.empty[PropertyItem] @@ -62,12 +264,11 @@ object Serialization { if (g.neighbors(pos) != null) { val nodeLabel = g.schema.getNodeLabel(nodeKind) val edgeLabel = g.schema.getEdgeLabel(nodeKind, edgeKind) - val edgeItem = new Manifest.EdgeItem(nodeLabel, edgeLabel, direction.encoding, null, null, null) + val edgeItem = new Manifest.EdgeItem(nodeLabel, edgeLabel, direction.encoding) edges.addOne(edgeItem) - edgeItem.qty = - encodeAny(deltaEncode(g.nodeCountByKind(nodeKind), g.neighbors(pos).asInstanceOf[Array[Int]]), filePtr, stringPool, fileChannel) - edgeItem.neighbors = encodeAny(g.neighbors(pos + 1), filePtr, stringPool, fileChannel) - edgeItem.property = encodeAny(g.neighbors(pos + 2), filePtr, stringPool, fileChannel) + writer.encodeAny(g.neighbors(pos), edgeItem.qty, delta = g.nodeCountByKind(nodeKind)) + writer.encodeAny(g.neighbors(pos + 1), edgeItem.neighbors) + writer.encodeAny(g.neighbors(pos + 2), edgeItem.property) } } for { @@ -78,108 +279,16 @@ object Serialization { if (g.properties(pos) != null) { val nodeLabel = g.schema.getNodeLabel(nodeKind) val propertyLabel = g.schema.getPropertyLabel(nodeKind, propertyKind) - val propertyItem = new Manifest.PropertyItem(nodeLabel, propertyLabel, null, null) + val propertyItem = new Manifest.PropertyItem(nodeLabel, propertyLabel) properties.addOne(propertyItem) - propertyItem.qty = - encodeAny(deltaEncode(g.nodeCountByKind(nodeKind), g.properties(pos).asInstanceOf[Array[Int]]), filePtr, stringPool, fileChannel) - propertyItem.property = encodeAny(g.properties(pos + 1), filePtr, stringPool, fileChannel) + writer.encodeAny(g.properties(pos).asInstanceOf[Array[Int]], propertyItem.qty, delta = g.nodeCountByKind(nodeKind)) + writer.encodeAny(g.properties(pos + 1), propertyItem.property) } } - val poolLenBytes = new Array[Byte](4 * stringPool.size) - val poolLenBuffer = 
ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer() - val poolBytes = new ByteArrayOutputStream() - for (s <- stringPool.keysIterator) { - val bytes = s.getBytes(StandardCharsets.UTF_8) - poolBytes.write(bytes) - poolLenBuffer.put(bytes.length) - } - val poolLensStored = new OutlineStorage(StorageType.Int) - val poolBytesStored = new OutlineStorage(StorageType.Byte) - write(poolLenBytes, poolLensStored, filePtr, fileChannel) - write(poolBytes.toByteArray, poolBytesStored, filePtr, fileChannel) - var pos = filePtr.get() - val header = new Array[Byte](16) - val headerBuf = ByteBuffer.wrap(header) - headerBuf.order(ByteOrder.LITTLE_ENDIAN).put(Keys.Header).asLongBuffer().put(pos) - headerBuf.position(0) - var headPos = 0L - while (headerBuf.hasRemaining()) { - headPos += fileChannel.write(headerBuf, headPos) - } + val manifest = new GraphItem(nodes.toArray, edges.toArray, properties.toArray) + writer.finish(manifest) - val manifest = new GraphItem(nodes.toArray, edges.toArray, properties.toArray, poolLensStored, poolBytesStored) - val manifestObj = GraphItem.write(manifest) - val buf = ByteBuffer.wrap(manifestObj.render().getBytes(StandardCharsets.UTF_8)) - - while (buf.hasRemaining()) { - pos += fileChannel.write(buf, pos) - } - fileChannel.truncate(pos) - } - - private def deltaEncode(padTo: Int, offsets: Array[Int]): Array[Int] = { - if (offsets == null) null - else { - // the array has one more element than needed, in order to permit deltaDecode to be in-place - val res = new Array[Int](padTo + 1) - assert(offsets.length == 0 || offsets(0) == 0) - var idx = 0 - val until = math.min(offsets.length - 1, padTo) - while (idx < until) { - res(idx) = offsets(idx + 1) - offsets(idx) - idx += 1 - } - res - } - } - - private[flatgraph] def encodeAny( - item: Any, - filePtr: AtomicLong, - stringPool: mutable.LinkedHashMap[String, Int], - fileChannel: FileChannel - ): OutlineStorage = { - item match { - case _: DefaultValue => null - case null => null - case bools: Array[Boolean] => - write(bools.map { b => if (b) 1.toByte else 0.toByte }, new OutlineStorage(StorageType.Bool), filePtr, fileChannel) - case bytes: Array[Byte] => - write(bytes, new OutlineStorage(StorageType.Byte), filePtr, fileChannel) - case shorts: Array[Short] => - val bytes = new Array[Byte](2 * shorts.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(shorts) - write(bytes, new OutlineStorage(StorageType.Short), filePtr, fileChannel) - case ints: Array[Int] => - val bytes = new Array[Byte](4 * ints.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(ints) - write(bytes, new OutlineStorage(StorageType.Int), filePtr, fileChannel) - case longs: Array[Long] => - val bytes = new Array[Byte](8 * longs.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().put(longs) - write(bytes, new OutlineStorage(StorageType.Long), filePtr, fileChannel) - case floats: Array[Float] => - val bytes = new Array[Byte](4 * floats.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().put(floats) - write(bytes, new OutlineStorage(StorageType.Float), filePtr, fileChannel) - case doubles: Array[Double] => - val bytes = new Array[Byte](8 * doubles.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().put(doubles) - write(bytes, new OutlineStorage(StorageType.Double), filePtr, fileChannel) - case refs: Array[GNode] => - val bytes = new Array[Byte](8 * refs.length) - val buf = 
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN) - for (ref <- refs) { - if (ref == null) buf.putLong(0x0000ffffffffffffL) else buf.putLong(ref.seq().toLong + (ref.nodeKind.toLong << 32)) - } - write(bytes, new OutlineStorage(StorageType.Ref), filePtr, fileChannel) - case strings: Array[String] => - val indices = strings.map(insertString(stringPool)) - val bytes = new Array[Byte](4 * strings.length) - ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices) - write(bytes, new OutlineStorage(StorageType.String), filePtr, fileChannel) - } } private[flatgraph] def write(bytes: Array[Byte], res: OutlineStorage, filePtr: AtomicLong, fileChannel: FileChannel): OutlineStorage = { @@ -196,8 +305,4 @@ object Serialization { res } - private def insertString(stringPool: mutable.LinkedHashMap[String, Int])(s: String): Int = { - if (s == null) -1 - else stringPool.getOrElseUpdate(s, stringPool.size) - } } diff --git a/core/src/main/scala/flatgraph/storage/ZstdWrapper.scala b/core/src/main/scala/flatgraph/storage/ZstdWrapper.scala index fc4535f7..e32c802a 100644 --- a/core/src/main/scala/flatgraph/storage/ZstdWrapper.scala +++ b/core/src/main/scala/flatgraph/storage/ZstdWrapper.scala @@ -1,14 +1,86 @@ package flatgraph.storage +import com.github.luben.zstd +import com.github.luben.zstd.{ZstdCompressCtx, ZstdDecompressCtx} import org.slf4j.LoggerFactory +import java.nio.{ByteBuffer, ByteOrder} import java.nio.file.{Files, Paths} import scala.jdk.CollectionConverters.* import scala.util.{Properties, Try} +import scala.collection.mutable object ZstdWrapper { private val logger = LoggerFactory.getLogger(getClass) + class ZstdCtx extends AutoCloseable { + var closed = false + val compressCtxs = mutable.ArrayDeque[zstd.ZstdCompressCtx]() + val decompressCtxs = mutable.ArrayDeque[zstd.ZstdDecompressCtx]() + + // cf library documentation: The resulting buffer is ready-to-go, i.e. comes pre-flipped / rewound. + def compress(bytes: Array[Byte]): ByteBuffer = ZstdWrapper.apply { + val ctx0 = this.synchronized { + compressCtxs.removeLastOption() + } + val ctx = ctx0.getOrElse { + val res = new ZstdCompressCtx + res.setLevel(zstd.Zstd.defaultCompressionLevel()) + res + } + try { + + /** So, this allocateDirect thing may look somewhat awkward and is not the fastest. Sorry :( + * + * It is currently necessary, though, because the array-based zstdni methods have a very long JNI critical section, and this can + * lead to OOM-crashes when using G1GC before JVM-22 (until 22, GC is effectively disabled during crit sections; afterwards, only + * specific regions are pinned). + * + * The upstream code is + * https://github.com/luben/zstd-jni/blob/9b39b598313f639c817469cb4de500f767cec453/src/main/native/jni_fast_zstd.c#L488 + * + * We may consider submitting a patch upstream that allocates a temp array and only holds the critical section during a memcopy + * + * Same issue holds for decompress. + * + * The result comes ready-to-read, i.e. 
is flipped/rewound by the zstd compress function + */ + val buf = ByteBuffer.allocateDirect(bytes.length) + buf.put(bytes) + buf.flip() + ctx.compress(buf) + } finally { + this.synchronized { + if (this.closed) ctx.close() else compressCtxs.append(ctx) + } + } + } + + def decompress(bytes: ByteBuffer, len: Int): ByteBuffer = + ZstdWrapper.apply { + val ctx0 = this.synchronized { + decompressCtxs.removeLastOption() + } + val ctx = ctx0.getOrElse { new ZstdDecompressCtx } + try { + ctx.decompress(bytes, len).order(ByteOrder.LITTLE_ENDIAN) + } finally { + this.synchronized { + if (this.closed) ctx.close() + else decompressCtxs.append(ctx) + } + } + } + + override def close(): Unit = this.synchronized { + compressCtxs.foreach(_.close()) + compressCtxs.clear + decompressCtxs.foreach(_.close()) + decompressCtxs.clear + this.closed = true + } + } + /** zstd-jni ships system libraries that are being unpacked, loaded and executed from the system tmp directory. If that fails we get a * rather obscure error message - this wrapper adds a check if the tmp dir is executable, and enhances the error message if the zstd * invocation fails. diff --git a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala index e1472990..6d484268 100644 --- a/odb-convert/src/main/scala/flatgraph/convert/Convert.scala +++ b/odb-convert/src/main/scala/flatgraph/convert/Convert.scala @@ -1,6 +1,6 @@ package flatgraph.convert -import flatgraph.misc.ISeq +import flatgraph.misc.{ISeq, Misc} import flatgraph.{AccessHelpers, Accessors, Edge, GNode, storage} import flatgraph.storage.{Keys, Manifest, Serialization, StorageType} import org.msgpack.core.{MessageBufferPacker, MessagePack} @@ -132,6 +132,7 @@ object Convert { fileAbsolute.createNewFile() } val fileChannel = new java.io.RandomAccessFile(fileAbsolute, "rw").getChannel + val writer = storage.WriterContext(fileChannel, Misc.maybeOverrideExecutor(None)) try { val nodes = nodeStuff.map { ns => new Manifest.NodeItem(ns.label, ns.nextId, null) } val edges = mutable.ArrayBuffer[Manifest.EdgeItem]() @@ -140,12 +141,12 @@ object Convert { node <- nodeStuff ((prefix, key), quantity) <- node.quantities.iterator } { - val deltaEncoded = quantity.addOne(0).toArray - val qty = Serialization.encodeAny(deltaEncoded, filePtr, null, fileChannel) val (valtyp, vals) = homogenize(node.values((prefix, key))) if (valtyp != null) { - val values = storage.Serialization.encodeAny(vals, filePtr, null, fileChannel) - values.typ = valtyp + val deltaEncoded = quantity.addOne(0).toArray + val qty = writer.encodeAny(deltaEncoded) + val values = writer.encodeAny(vals) + assert(values.typ == valtyp) prefix match { case NodeStuff.NODEPROPERTY => properties.addOne(new Manifest.PropertyItem(node.label, key, qty, values)) @@ -160,47 +161,16 @@ object Convert { case Some(propvalues) => val (ptype, pval) = homogenize(propvalues) if (ptype != null) { - val stored = storage.Serialization.encodeAny(pval, filePtr, null, fileChannel) - stored.typ = ptype + val stored = writer.encodeAny(pval) edgeItem.property = stored } } } } } - val poolLenBytes = new Array[Byte](4 * strings.length) - val poolLenBuffer = ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer() - val poolBytes = new ByteArrayOutputStream() - for (s <- strings) { - val bytes = s.getBytes(StandardCharsets.UTF_8) - poolBytes.write(bytes) - poolLenBuffer.put(bytes.length) - } - val poolLensStored = new Manifest.OutlineStorage(StorageType.Int) - val poolBytesStored = new 
Manifest.OutlineStorage(storage.StorageType.Byte) - storage.Serialization.write(poolLenBytes, poolLensStored, filePtr, fileChannel) - storage.Serialization.write(poolBytes.toByteArray, poolBytesStored, filePtr, fileChannel) - - var pos = filePtr.get() - val header = new Array[Byte](16) - val headerBuf = ByteBuffer.wrap(header) - headerBuf.order(ByteOrder.LITTLE_ENDIAN).put(Keys.Header).asLongBuffer().put(pos) - headerBuf.position(0) - var headPos = 0L - while (headerBuf.hasRemaining()) { - headPos += fileChannel.write(headerBuf, headPos) - } - val manifest = new Manifest.GraphItem(nodes.toArray, edges.toArray, properties.toArray, poolLensStored, poolBytesStored) - val manifestObj = Manifest.GraphItem.write(manifest) - if (verbose) { - println(manifestObj.render(indent = 2)) - } - val buf = ByteBuffer.wrap(manifestObj.render().getBytes(StandardCharsets.UTF_8)) - while (buf.hasRemaining()) { - pos += fileChannel.write(buf, pos) - } - fileChannel.truncate(pos) - } finally { fileChannel.close() } + val manifest = new Manifest.GraphItem(nodes.toArray, edges.toArray, properties.toArray) + writer.finish(manifest) + } finally { fileChannel.close(); writer.compressCtx.close(); } } private def homogenize(items: mutable.ArrayBuffer[Any]): (String, Array[?]) = {