From 2dc93ec8f417fda86ba6f0921e3c806ae94b7a0d Mon Sep 17 00:00:00 2001
From: Evan Chan
Date: Fri, 17 Jan 2020 16:26:55 -0800
Subject: [PATCH 1/2] fix(gateway): Skip empty histogram records in gateway (#600)

Empty histograms cannot be ingested or queried properly, as they don't have a schema.

* feat(core): RecordBuilder resiliency: auto rewind on partial record; handle empty buckets during encoding
---
 .../binaryrecord2/RecordBuilder.scala         | 19 +++++++++++-
 .../gateway/conversion/InfluxRecord.scala     |  1 +
 .../gateway/conversion/InputRecord.scala      | 30 ++++++++++---------
 .../conversion/InputRecordBuilderSpec.scala   | 19 ++++++++----
 .../format/vectors/HistogramVector.scala      |  9 ++++--
 5 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordBuilder.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordBuilder.scala
index 2196445bc3..ad49b7972f 100644
--- a/core/src/main/scala/filodb.core/binaryrecord2/RecordBuilder.scala
+++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordBuilder.scala
@@ -64,6 +64,23 @@ class RecordBuilder(memFactory: MemFactory,
     recHash = -1
   }
 
+  /**
+   * If the state is somehow inconsistent and only a partial record was written,
+   * rewind curRecEndOffset back to curRecordOffset. In other words, rewind the write pointer
+   * back to the end of the previous record. Partially written data is lost, but the state is consistent again.
+   */
+  def rewind(): Unit = {
+    curRecEndOffset = curRecordOffset
+  }
+
+  // Check that we are at the end of a record. If a partial record was written, just rewind so the state stays consistent.
+  private def checkPointers(): Unit = {
+    if (curRecEndOffset != curRecordOffset) {
+      logger.warn(s"Partial record was written, perhaps an exception occurred. Rewinding to end of previous record.")
+      rewind()
+    }
+  }
+
   // Only reset the container offsets, but not the fieldNo, mapOffset, recHash
   private def resetContainerPointers(): Unit = {
     curRecordOffset = containers.last.offset + ContainerHeaderLen
@@ -87,7 +104,7 @@
    * for partition keys. However for ingestion records it would be the same.
   */
   private[core] final def startNewRecord(recSchema: RecordSchema, schemaID: Int): Unit = {
-    require(curRecEndOffset == curRecordOffset, s"Illegal state: $curRecEndOffset != $curRecordOffset")
+    checkPointers()
 
     // Set schema, hashoffset, and write schema ID if needed
     setSchema(recSchema)
diff --git a/gateway/src/main/scala/filodb/gateway/conversion/InfluxRecord.scala b/gateway/src/main/scala/filodb/gateway/conversion/InfluxRecord.scala
index 407d177a51..cef25c559f 100644
--- a/gateway/src/main/scala/filodb/gateway/conversion/InfluxRecord.scala
+++ b/gateway/src/main/scala/filodb/gateway/conversion/InfluxRecord.scala
@@ -238,6 +238,7 @@ final case class InfluxHistogramRecord(bytes: Array[Byte],
     InfluxProtocolParser.parseKeyValues(bytes, fieldDelims, fieldEnd, visitor)
 
     // Only create histogram record if we are able to parse above and it contains +Inf bucket
+    // This also ensures that it's not a blank histogram, which cannot be ingested
    if (visitor.gotInf) {
      val buckets = CustomBuckets(visitor.bucketTops)
      val hist = LongHistogram(buckets, visitor.bucketVals)
diff --git a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
index d03652329e..fb2b994c31 100644
--- a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
+++ b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
@@ -106,20 +106,22 @@ object InputRecord {
         case (k, v) => (k.toDouble, v.toLong)
       }.sorted
 
-      // Built up custom histogram objects and scheme, then encode
-      val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
-      val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)
-
-      // Now, write out histogram
-      builder.startNewRecord(promHistogram)
-      builder.addLong(timestamp)
-      builder.addDouble(sum)
-      builder.addDouble(count)
-      builder.addBlob(hist.serialize())
-
-      builder.addString(metric)
-      builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
-      builder.endRecord()
+      if (sortedBuckets.nonEmpty) {
+        // Build up custom histogram objects and scheme, then encode
+        val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
+        val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)
+
+        // Now, write out histogram
+        builder.startNewRecord(promHistogram)
+        builder.addLong(timestamp)
+        builder.addDouble(sum)
+        builder.addDouble(count)
+        builder.addBlob(hist.serialize())
+
+        builder.addString(metric)
+        builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
+        builder.endRecord()
+      }
     }
   }
diff --git a/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala b/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala
index 8ce1517141..1fb3212bfc 100644
--- a/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala
+++ b/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala
@@ -15,19 +15,19 @@ class InputRecordBuilderSpec extends FunSpec with Matchers {
                       "shard" -> "0")
   val metric = "my_hist"
 
+  val counts = Array(10L, 20L, 25, 38, 50, 66)
+  val sum = counts.sum.toDouble
+  val count = 50.0
+  val sumCountKVs = Seq("sum" -> sum, "count" -> count)
+
   it("should writePromHistRecord to BR and be able to deserialize it") {
     val buckets = Array(0.5, 1, 2.5, 5, 10, Double.PositiveInfinity)
-    val counts = Array(10L, 20L, 25, 38, 50, 66)
     val expected = LongHistogram(CustomBuckets(buckets), counts)
-    val sum = counts.sum.toDouble
-    val count = 50.0
     val bucketKVs = buckets.zip(counts).map {
       case (Double.PositiveInfinity, c) => "+Inf" -> c.toDouble
=> "+Inf" -> c.toDouble case (b, c) => b.toString -> c.toDouble }.toSeq - val sumCountKVs = Seq("sum" -> sum, "count" -> count) - // 1 - sum/count at end InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, bucketKVs ++ sumCountKVs) builder.allContainers.head.iterate(Schemas.promHistogram.ingestionSchema).foreach { row => @@ -36,4 +36,13 @@ class InputRecordBuilderSpec extends FunSpec with Matchers { row.getHistogram(3) shouldEqual expected } } + + it("should skip empty histograms via writePromHistRecord, and write subsequent records") { + builder.reset() + InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs) + InputRecord.writeGaugeRecord(builder, metric, baseTags, 100000L, 5.5) + + // The empty histogram should have been skipped, so we should have only one record + builder.allContainers.head.countRecords shouldEqual 1 + } } \ No newline at end of file diff --git a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala index 663e9d15a7..ada02ceb5a 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala @@ -139,13 +139,16 @@ object BinaryHistogram extends StrictLogging { val formatCode = buckets match { case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta case g: GeometricBuckets => HistFormat_Geometric_Delta + case c: CustomBuckets if c.numBuckets == 0 => HistFormat_Null case c: CustomBuckets => HistFormat_Custom_Delta } buf.putByte(2, formatCode) - val valuesIndex = buckets.serialize(buf, 3) - val finalPos = NibblePack.packDelta(values, buf, valuesIndex) - + val finalPos = if (formatCode == HistFormat_Null) { 3 } + else { + val valuesIndex = buckets.serialize(buf, 3) + NibblePack.packDelta(values, buf, valuesIndex) + } require(finalPos <= 65535, s"Histogram data is too large: $finalPos bytes needed") buf.putShort(0, (finalPos - 2).toShort) finalPos From 1f7787a63313b4d2261ddf8a1e0ea92caba83064 Mon Sep 17 00:00:00 2001 From: Jackson Jeyapaul Date: Mon, 27 Jan 2020 11:19:01 -0800 Subject: [PATCH 2/2] Update integration version to 0.9.3-integration (#610) --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index b2f1da052f..81bac7b745 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.9.2.integration-SNAPSHOT" +version in ThisBuild := "0.9.3.integration-SNAPSHOT"