
Commit

Release 0.9.9.8: Revert lock tracking; Add TSP.toString; Delay shutdown after lock issue
vishramachandran authored Nov 4, 2020
2 parents a6afc04 + 4ef0160 commit 97e4087
Showing 14 changed files with 35 additions and 38 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/filodb.core/query/RangeVector.scala
@@ -327,7 +327,7 @@ object SerializedRangeVector extends StrictLogging {
val oldContainerOpt = builder.currentContainer
val startRecordNo = oldContainerOpt.map(_.numRecords).getOrElse(0)
try {
-   ChunkMap.validateNoSharedLocks(execPlanName)
+   ChunkMap.validateNoSharedLocks()
val rows = rv.rows
while (rows.hasNext) {
numRows += 1
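
Capturing startRecordNo before appending is what lets several serialized range vectors share one RecordBuilder, as the test in the next hunk exercises. A minimal hedged sketch of that usage, assuming the four-argument apply signature shown in this diff; rvs, schema, and the name string are illustrative placeholders:

    // Hedged sketch: one builder shared across many range vectors.
    val builder = SerializedRangeVector.newBuilder()
    // Each apply() remembers where its records begin in the shared containers,
    // so every resulting SerializedRangeVector can be read back independently.
    val srvs = rvs.map(rv => SerializedRangeVector(rv, builder, schema, "Example-caller"))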
@@ -50,7 +50,7 @@ class RangeVectorSpec extends AnyFunSpec with Matchers {
val builder = SerializedRangeVector.newBuilder()

// Sharing one builder across multiple input RangeVectors
-   val srvs = rvs.map(rv => SerializedRangeVector(rv, builder, schema, "RangeVectorSpec"))
+   val srvs = rvs.map(rv => SerializedRangeVector(rv, builder, schema, "Unit-test"))

// Now verify each of them
val observedTs = srvs(0).rows.toSeq.map(_.getLong(0))
37 changes: 13 additions & 24 deletions memory/src/main/scala/filodb.memory/data/ChunkMap.scala
@@ -1,7 +1,5 @@
package filodb.memory.data

- import java.util.concurrent.ConcurrentHashMap
-
import scala.collection.mutable.{HashMap, Map}
import scala.concurrent.duration._

@@ -61,13 +59,6 @@ object ChunkMap extends StrictLogging {
override def initialValue() = new HashMap[ChunkMap, Int]
}

- /**
-  * FIXME: Remove this after debugging is done.
-  * This keeps track of which thread is running which execPlan.
-  * Entry is added on lock acquisition, removed when lock is released.
-  */
- private val execPlanTracker = new ConcurrentHashMap[Thread, String]
-
// Returns true if the current thread has acquired the shared lock at least once.
private def hasSharedLock(inst: ChunkMap): Boolean = sharedLockCounts.get.contains(inst)

@@ -93,7 +84,6 @@
*/
//scalastyle:off null
def releaseAllSharedLocks(): Int = {
-   execPlanTracker.remove(Thread.currentThread())
var total = 0
val countMap = sharedLockCounts.get
if (countMap != null) {
@@ -120,19 +110,23 @@
* consumption from a query iterator. If there are lingering locks,
* it is quite possible a lock acquire or release bug exists
*/
- def validateNoSharedLocks(execPlan: String, unitTest: Boolean = false): Unit = {
-   val t = Thread.currentThread()
-   if (execPlanTracker.containsKey(t)) {
-     logger.error(s"Current thread ${t.getName} did not release lock for execPlan: ${execPlanTracker.get(t)}")
+ def validateNoSharedLocks(unitTest: Boolean = false): Unit = {
+   // Count up the number of held locks.
+   var total = 0
+   val countMap = sharedLockCounts.get
+   if (countMap != null) {
+     for ((inst, amt) <- countMap) {
+       if (amt > 0) {
+         total += amt
+       }
+     }
+   }

-   val numLocksReleased = ChunkMap.releaseAllSharedLocks()
-   if (numLocksReleased > 0) {
-     val ex = new RuntimeException(s"Number of locks was non-zero: $numLocksReleased. " +
+   if (total > 10) { // lenient check for now
+     val ex = new RuntimeException(s"Number of locks lingering: $total. " +
s"This is indicative of a possible lock acquisition/release bug.")
Shutdown.haltAndCatchFire(ex)
}
-   execPlanTracker.put(t, execPlan)
}

}
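
The doc comment above spells out the contract this commit relaxes: callers validate before consuming a query iterator and release in a finally block. A condensed, hedged sketch of that discipline as the aggregator hunks below apply it; the loop body is illustrative:

    try {
      ChunkMap.validateNoSharedLocks()   // halt-and-catch-fire only if > 10 locks linger
      aggRangeVector.rows.take(limit).foreach { row =>
        // consuming the iterator acquires shared chunk locks under the hood
      }
    } finally {
      aggRangeVector.rows.close()
      ChunkMap.releaseAllSharedLocks()   // always drop this thread's shared locks
    }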
@@ -258,7 +252,6 @@ class ChunkMap(val memFactory: MemFactory, var capacity: Int) {
var warned = false

// scalastyle:off null
- var locks1: ConcurrentHashMap[Thread, String] = null
while (true) {
if (tryAcquireExclusive(timeoutNanos)) {
if (arrayPtr == 0) {
@@ -279,14 +272,10 @@
}
exclusiveLockWait.increment()
_logger.warn(s"Waiting for exclusive lock: $this")
- locks1 = new ConcurrentHashMap[Thread, String](execPlanTracker)
warned = true
} else if (warned && timeoutNanos >= MaxExclusiveRetryTimeoutNanos) {
- val locks2 = new ConcurrentHashMap[Thread, String](execPlanTracker)
- locks2.entrySet().retainAll(locks1.entrySet())
val lockState = UnsafeUtils.getIntVolatile(this, lockStateOffset)
Shutdown.haltAndCatchFire(new RuntimeException(s"Following execPlan locks have not been released for a" +
s"while: $locks2 $locks1 $execPlanTracker $lockState"))
Shutdown.haltAndCatchFire(new RuntimeException(s"Unable to acquire exclusive lock: $lockState"))
}
}
}
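
With the execPlanTracker snapshots gone, the exclusive-lock retry loop reduces to the shape below. This is a condensed, hedged sketch assembled from the hunk above; the early-return and the timeout-escalation details hidden by the diff are assumptions:

    var warned = false
    while (true) {
      if (tryAcquireExclusive(timeoutNanos)) {
        return                                  // lock acquired (assumed early exit)
      } else if (!warned) {
        exclusiveLockWait.increment()           // Kamon counter from the hunk above
        _logger.warn(s"Waiting for exclusive lock: $this")
        warned = true
      } else if (timeoutNanos >= MaxExclusiveRetryTimeoutNanos) {
        val lockState = UnsafeUtils.getIntVolatile(this, lockStateOffset)
        Shutdown.haltAndCatchFire(
          new RuntimeException(s"Unable to acquire exclusive lock: $lockState"))
      }
    }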
2 changes: 1 addition & 1 deletion query/src/main/scala/filodb/query/exec/ExecPlan.scala
@@ -150,7 +150,7 @@ trait ExecPlan extends QueryCommand {
srv
case rv: RangeVector =>
// materialize, and limit rows per RV
-   val srv = SerializedRangeVector(rv, builder, recSchema, printTree(false))
+   val srv = SerializedRangeVector(rv, builder, recSchema, getClass.getSimpleName, span)
numResultSamples += srv.numRowsInt
// fail the query instead of limiting range vectors and returning incomplete/inaccurate results
if (enforceLimit && numResultSamples > queryContext.sampleLimit)
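
The hunk cuts off before the branch body; a plausible hedged completion is sketched below. The exception type is illustrative, not taken from the diff:

    // Fail fast instead of silently truncating results (exception type illustrative).
    if (enforceLimit && numResultSamples > queryContext.sampleLimit)
      throw new IllegalStateException(
        s"Query returned $numResultSamples samples, over the limit ${queryContext.sampleLimit}")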
@@ -48,7 +48,7 @@ case class MetadataRemoteExec(queryEndpoint: String,
val rangeVector = IteratorBackedRangeVector(new CustomRangeVectorKey(Map.empty),
new UTF8MapIteratorRowReader(iteratorMap.toIterator))

-   val srvSeq = Seq(SerializedRangeVector(rangeVector, builder, recordSchema, printTree(false)))
+   val srvSeq = Seq(SerializedRangeVector(rangeVector, builder, recordSchema, this.getClass.getSimpleName, span))

span.finish()
QueryResult(id, resultSchema, srvSeq)
4 changes: 2 additions & 2 deletions query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala
@@ -162,7 +162,7 @@ case class PromQlRemoteExec(queryEndpoint: String,
override def numRows: Option[Int] = Option(samples.size)

}
-   SerializedRangeVector(rv, builder, defaultRecSchema, printTree(false))
+   SerializedRangeVector(rv, builder, defaultRecSchema, "PromQlRemoteExec-default")
// TODO: Handle stitching with verbose flag
}
QueryResult(id, defaultResultSchema, rangeVectors)
@@ -196,7 +196,7 @@ case class PromQlRemoteExec(queryEndpoint: String,
override def numRows: Option[Int] = Option(samples.size)

}
-   SerializedRangeVector(rv, builder, histRecSchema, printTree(false))
+   SerializedRangeVector(rv, builder, histRecSchema, "PromQlRemoteExec-hist")
// TODO: Handle stitching with verbose flag
}
QueryResult(id, histResultSchema, rangeVectors)
@@ -279,7 +279,7 @@ final case class SortFunctionMapper(function: SortFunctionId) extends RangeVecto

// Create SerializedRangeVector so that sorting does not consume rows iterator
val resultRv = source.toListL.map { rvs =>
-   rvs.map(SerializedRangeVector(_, builder, recSchema, s"SortRangeVectorTransformer: $args")).
+   rvs.map(SerializedRangeVector(_, builder, recSchema, getClass.getSimpleName)).
sortBy { rv => if (rv.rows.hasNext) rv.rows.next().getDouble(1) else Double.NaN
}(ordering)

@@ -2,6 +2,8 @@ package filodb.query.exec.aggregator

import scala.collection.mutable

+ import kamon.Kamon
+
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.memstore.FiloSchedulers
import filodb.core.memstore.FiloSchedulers.QuerySchedName
@@ -86,10 +88,11 @@ class CountValuesRowAggregator(label: String, limit: Int = 1000) extends RowAggr
ColumnInfo("value", ColumnType.DoubleColumn))
val recSchema = SerializedRangeVector.toSchema(colSchema)
val resRvs = mutable.Map[RangeVectorKey, RecordBuilder]()
+   val span = Kamon.spanBuilder(s"execplan-scan-latency-TopBottomK").start()
try {
FiloSchedulers.assertThreadName(QuerySchedName)
// aggRangeVector.rows.take below triggers the ChunkInfoIterator which requires lock/release
ChunkMap.validateNoSharedLocks(s"CountValues-$label")
ChunkMap.validateNoSharedLocks()
aggRangeVector.rows.take(limit).foreach { row =>
val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1),
row.getBlobNumBytes(1), row.getBlobOffset(1))
@@ -108,6 +111,7 @@
aggRangeVector.rows.close()
ChunkMap.releaseAllSharedLocks()
}
+   span.finish()
resRvs.map { case (key, builder) =>
val numRows = builder.allContainers.map(_.countRecords()).sum
new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0)
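
Both aggregator files now time the scan with a Kamon span. A minimal, self-contained hedged sketch of the pattern; the span name and the measured work are illustrative. Note one design wrinkle in the diff: span.finish() sits after the try/finally, so a scan that throws would leave the span unfinished, whereas finishing in a finally, as sketched, covers error paths too. Note also that the CountValues hunk reuses the literal span name execplan-scan-latency-TopBottomK from the TopBottomK aggregator.

    import kamon.Kamon

    // Hedged sketch of the Kamon span instrumentation added in this commit.
    val span = Kamon.spanBuilder("execplan-scan-latency-CountValues").start()
    try {
      // ... the chunk scan being measured (illustrative) ...
    } finally {
      span.finish()   // close the span on success and failure alike
    }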
@@ -2,6 +2,8 @@ package filodb.query.exec.aggregator

import scala.collection.mutable

+ import kamon.Kamon
+
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.memstore.FiloSchedulers
import filodb.core.memstore.FiloSchedulers.QuerySchedName
@@ -86,9 +88,10 @@ class TopBottomKRowAggregator(k: Int, bottomK: Boolean) extends RowAggregator {
ColumnInfo("value", ColumnType.DoubleColumn))
val recSchema = SerializedRangeVector.toSchema(colSchema)
val resRvs = mutable.Map[RangeVectorKey, RecordBuilder]()
+   val span = Kamon.spanBuilder(s"execplan-scan-latency-TopBottomK").start()
try {
FiloSchedulers.assertThreadName(QuerySchedName)
ChunkMap.validateNoSharedLocks(s"TopkQuery-$k-$bottomK")
ChunkMap.validateNoSharedLocks()
// We limit the results wherever it is materialized first. So it is done here.
aggRangeVector.rows.take(limit).foreach { row =>
var i = 1
@@ -108,6 +111,7 @@
aggRangeVector.rows().close()
ChunkMap.releaseAllSharedLocks()
}
+   span.finish()
resRvs.map { case (key, builder) =>
val numRows = builder.allContainers.map(_.countRecords).sum
new SerializedRangeVector(key, numRows, builder.allContainers, recSchema, 0)
@@ -287,7 +287,7 @@ class AggrOverRangeVectorsSpec extends RawDataWindowingSpec with ScalaFutures {
val recSchema = SerializedRangeVector.toSchema(Seq(ColumnInfo("timestamp", ColumnType.TimestampColumn),
ColumnInfo("tdig", ColumnType.StringColumn)))
val builder = SerializedRangeVector.newBuilder()
-   val srv = SerializedRangeVector(result7(0), builder, recSchema, "AggrOverRangeVectorsSpec")
+   val srv = SerializedRangeVector(result7(0), builder, recSchema, "Unit-Test")

val resultObs7b = RangeVectorAggregator.present(agg7, Observable.now(srv), 1000)
val finalResult = resultObs7b.toListL.runAsync.futureValue
@@ -50,7 +50,7 @@ class InProcessPlanDispatcherSpec extends AnyFunSpec
}

after {
ChunkMap.validateNoSharedLocks("InProcessPlanDispatcherSpec", true)
ChunkMap.validateNoSharedLocks(true)
}

override def afterAll(): Unit = {
@@ -18,7 +18,7 @@ import org.scalatest.matchers.should.Matchers

class AbsentFunctionSpec extends AnyFunSpec with Matchers with ScalaFutures with BeforeAndAfter {
after {
ChunkMap.validateNoSharedLocks("AbsentFunctionSpec", true)
ChunkMap.validateNoSharedLocks(true)
}

val config: Config = ConfigFactory.load("application_test.conf").getConfig("filodb")
@@ -38,7 +38,7 @@ trait RawDataWindowingSpec extends AnyFunSpec with Matchers with BeforeAndAfter
protected val tsBufferPool2 = new WriteBufferPool(TestData.nativeMem, downsampleSchema.data, storeConf)

after {
-   ChunkMap.validateNoSharedLocks(getClass().toString(), true)
+   ChunkMap.validateNoSharedLocks(true)
}

override def afterAll(): Unit = {
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.9.9.7"
version in ThisBuild := "0.9.9.8"
