Skip to content

Commit

Permalink
Merge pull request #163 from katholen/timezone-fix
Browse files Browse the repository at this point in the history
Fix timezone issue
  • Loading branch information
Jeff Baker authored Jul 5, 2018
2 parents a1692f0 + 2ae5a30 commit 0543906
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.expedia.www.haystack.trace.indexer.writers.es

import java.util.concurrent.Semaphore
import java.util.{Calendar, Locale}
import java.util.{Calendar, Locale, TimeZone}

import com.expedia.open.tracing.buffer.SpanBuffer
import com.expedia.www.haystack.commons.metrics.MetricsSupport
Expand All @@ -38,6 +38,20 @@ import org.slf4j.LoggerFactory

import scala.util.Try

object ElasticSearchWriterUtils {

private val timezone = TimeZone.getTimeZone("UTC")
private val format = FastDateFormat.getInstance("yyyy-MM-dd", timezone, Locale.US)

// creates an index name based on current date. following example illustrates the naming convention of
// elastic search indices:
// haystack-span-2017-08-30
def indexName(prefix: String, indexHourBucket: Int): String = {
val currentTime = Calendar.getInstance(timezone)
val bucket: Int = currentTime.get(Calendar.HOUR_OF_DAY) / indexHourBucket
s"$prefix-${format.format(currentTime.getTime)}-$bucket"
}
}
class ElasticSearchWriter(esConfig: ElasticSearchConfiguration, indexConf: WhitelistIndexFieldConfiguration)
extends TraceWriter with MetricsSupport {

Expand All @@ -55,8 +69,6 @@ class ElasticSearchWriter(esConfig: ElasticSearchConfiguration, indexConf: White
// this semaphore controls the parallel writes to cassandra
private val inflightRequestsSemaphore = new Semaphore(esConfig.maxInFlightBulkRequests, true)

private val format = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)

// initialize the elastic search client
private val esClient: JestClient = {
LOGGER.info("Initializing the http elastic search client with endpoint={}", esConfig.endpoint)
Expand Down Expand Up @@ -96,7 +108,7 @@ class ElasticSearchWriter(esConfig: ElasticSearchConfiguration, indexConf: White
var isSemaphoreAcquired = false

try {
addIndexOperation(traceId, packedSpanBuffer.protoObj, indexName(), isLastSpanBuffer) match {
addIndexOperation(traceId, packedSpanBuffer.protoObj, ElasticSearchWriterUtils.indexName(esConfig.indexNamePrefix, esConfig.indexHourBucket), isLastSpanBuffer) match {
case Some(bulkToDispatch) =>
inflightRequestsSemaphore.acquire()
isSemaphoreAcquired = true
Expand Down Expand Up @@ -146,13 +158,4 @@ class ElasticSearchWriter(esConfig: ElasticSearchConfiguration, indexConf: White
throw new RuntimeException(s"Fail to apply the following template to elastic search with reason=${result.getErrorMessage}")
}
}

// creates an index name based on current date. following example illustrates the naming convention of
// elastic search indices:
// haystack-span-2017-08-30
private def indexName(): String = {
val currentTime = Calendar.getInstance
val bucket: Int = currentTime.get(Calendar.HOUR_OF_DAY) / esConfig.indexHourBucket
s"${esConfig.indexNamePrefix}-${format.format(currentTime.getTime)}-$bucket"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2017 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expedia.www.haystack.trace.indexer.unit

import com.expedia.www.haystack.trace.indexer.writers.es.ElasticSearchWriterUtils
import org.scalatest.{BeforeAndAfterEach, FunSpec, GivenWhenThen, Matchers}

class ElasticSearchWriterUtilsSpec extends FunSpec with Matchers with GivenWhenThen with BeforeAndAfterEach {
var timezone: String = _

override def beforeEach() {
timezone = System.getProperty("user.timezone")
System.setProperty("user.timezone", "CST")
}

override def afterEach(): Unit = {
System.setProperty("user.timezone", timezone)
}

describe("elastic search writer") {
it("should use UTC when generating ES indexes") {
Given("the system timezone is not UTC")
System.setProperty("user.timezone", "CST")

When("the writer generates the ES indexes")
val cstName = ElasticSearchWriterUtils.indexName("haystack-traces", 6)
System.setProperty("user.timezone", "UTC")
val utcName = ElasticSearchWriterUtils.indexName("haystack-traces", 6)

Then("it should use UTC to get those indexes")
cstName shouldBe utcName
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.expedia.www.haystack.trace.reader.stores.readers.es.query

import java.text.SimpleDateFormat
import java.util.Date
import java.util.{Date, TimeZone}

import com.expedia.open.tracing.api.Operand.OperandCase
import com.expedia.open.tracing.api.{ExpressionTree, Field}
Expand All @@ -35,6 +35,8 @@ import org.elasticsearch.search.aggregations.support.ValueType
import scala.collection.JavaConverters._

abstract class QueryGenerator(nestedDocName: String, indexConfiguration: WhitelistIndexFieldConfiguration) {
private final val TIME_ZONE = TimeZone.getTimeZone("UTC")

// create search query by using filters list
protected def createFilterFieldBasedQuery(filterFields: java.util.List[Field]): BoolQueryBuilder = {
val traceContextWhitelistFields = indexConfiguration.globalTraceContextIndexFieldNames
Expand Down Expand Up @@ -139,15 +141,20 @@ abstract class QueryGenerator(nestedDocName: String, indexConfiguration: Whiteli
for (datetimeInMicros <- flooredStarttime to flooredEndtime by INDEX_BUCKET_TIME_IN_MICROS)
yield {
val date = new Date(datetimeInMicros / 1000)

val dateBucket = new SimpleDateFormat("yyyy-MM-dd").format(date)
val hourBucket = new SimpleDateFormat("HH").format(date).toInt / indexHourBucket
val dateBucket = createSimpleDateFormat("yyyy-MM-dd").format(date)
val hourBucket = createSimpleDateFormat("HH").format(date).toInt / indexHourBucket

s"$indexNamePrefix-$dateBucket-$hourBucket"
}
}
}

private def createSimpleDateFormat(pattern: String): SimpleDateFormat = {
val sdf = new SimpleDateFormat(pattern)
sdf.setTimeZone(TIME_ZONE)
sdf
}

private def isValidTimeRange(startTimeInMicros: Long,
endTimeInMicros: Long,
indexHourTtl: Int): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@
*/
package com.expedia.www.haystack.trace.reader.unit.stores.readers.es.query

import com.expedia.open.tracing.api.ExpressionTree.Operator
import com.expedia.open.tracing.api.{ExpressionTree, Field, Operand, TracesSearchRequest}
import com.expedia.open.tracing.api.{Field, TracesSearchRequest}
import com.expedia.www.haystack.trace.commons.clients.es.document.TraceIndexDoc
import com.expedia.www.haystack.trace.commons.config.entities.WhitelistIndexFieldConfiguration
import com.expedia.www.haystack.trace.reader.config.entities.ElasticSearchConfiguration
import com.expedia.www.haystack.trace.reader.stores.readers.es.ESUtils._
import com.expedia.www.haystack.trace.reader.stores.readers.es.query.TraceSearchQueryGenerator
import com.expedia.www.haystack.trace.reader.unit.BaseUnitTestSpec
import com.expedia.www.haystack.trace.reader.unit.stores.readers.es.query.helper.ExpressionTreeBuilder
import com.expedia.www.haystack.trace.reader.unit.stores.readers.es.query.helper.ExpressionTreeBuilder._
import io.searchbox.core.Search
import org.scalatest.BeforeAndAfterEach

class TraceSearchQueryGeneratorSpec extends BaseUnitTestSpec {
class TraceSearchQueryGeneratorSpec extends BaseUnitTestSpec with BeforeAndAfterEach {
private val esConfig = ElasticSearchConfiguration("endpoint", None, None, "haystack-traces", "spans", 5000, 5000, 6, 72, false)

var timezone: String = _

override def beforeEach() {
timezone = System.getProperty("user.timezone")
System.setProperty("user.timezone", "CST")
}

override def afterEach(): Unit = {
System.setProperty("user.timezone", timezone)
}

describe("TraceSearchQueryGenerator") {
it("should generate valid search queries") {
Given("a trace search request")
Expand Down Expand Up @@ -111,5 +121,16 @@ class TraceSearchQueryGeneratorSpec extends BaseUnitTestSpec {
Then("generate a valid query with fields in lowercase")
query.toJson.contains(fieldKey.toLowerCase()) should be(true)
}

it("should use UTC when determining which indexes to read") {
Given("the system timezone is NOT UTC")
System.setProperty("user.timezone", "CST")

When("getting the indexes")
val esIndexes = new TraceSearchQueryGenerator(esConfig, "spans", new WhitelistIndexFieldConfiguration).getESIndexes(1530806291394000L, 1530820646394000L, "haystack-traces", 4, 24)

Then("they are correct based off of UTC")
esIndexes shouldBe Vector("haystack-traces-2018-07-05-3", "haystack-traces-2018-07-05-4")
}
}
}

0 comments on commit 0543906

Please sign in to comment.