Multi tenancy (#72)
* adding decorators and updating the proto span extractor

* adding initial commit for haystack collector multi tenancy

* added tenant configuration

* adding default tenant-id tag as "shared" when no defaults are found in the additional tags configuration

* creating kafka config

* adding plugin configuration

* passing plugin configuration

* loading multiple external decorators

* plugin config is now a list within plugin

* making the single sink dispatch to multiple kafka clusters

* updating kafka record sink for multiple kafka

* fixed tests

* modifying according to new config contract and creating tests

* resolving minor bugs

* adding pluginsloader tests

* removing need to add -Djava.ext.dirs

* plausible fix for intermittently failing tests

* trying to fix kafka issues

* adding sample span decorator for user ref and integration testing

* fixing bug closing the kafka producer

* updating version for collector etc

* reverting version bump because of scalactic tripleequals issue

* refactoring and renaming

* updating tests

* deleting submodule haystack-idl
doctorXWrites authored and ashishagg committed May 30, 2019
1 parent 8ba7b56 commit 38d99df
Showing 43 changed files with 1,223 additions and 96 deletions.
52 changes: 45 additions & 7 deletions .gitignore
@@ -1,10 +1,48 @@
### Maven ###
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties

# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
!/.mvn/wrapper/maven-wrapper.jar

### Java ###

# Compiled class file
*.class
*.iml

# Log file
*.log
.classpath
.project
logs/
target/

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# IDE
.idea/
kinesis/build/integration-tests/scripts/node_modules/
package-lock.json
*.iml
*.ipr
*.iws
*.classpath
*.project
*.settings
*.DS_Store
4 changes: 0 additions & 4 deletions .gitmodules
@@ -1,4 +0,0 @@
[submodule "haystack-idl"]
path = haystack-idl
url = https://github.com/ExpediaDotCom/haystack-idl.git
branch = master
Binary file added .mvn/wrapper/maven-wrapper.jar
1 change: 1 addition & 0 deletions .mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip
30 changes: 7 additions & 23 deletions commons/pom.xml
@@ -13,6 +13,13 @@
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>com.expedia.www</groupId>
<artifactId>haystack-span-decorators</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -43,29 +50,6 @@

<build>
<plugins>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.3.0.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<includeDirectories>
<include>${project.basedir}/../haystack-idl/proto</include>
</includeDirectories>
<inputDirectories>
<include>${project.basedir}/../haystack-idl/proto</include>
</inputDirectories>
<outputDirectory>target/generated-sources</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
ProtoSpanExtractor.scala
@@ -23,23 +23,14 @@ import java.time.temporal.ChronoUnit
import java.util.concurrent.ConcurrentHashMap

import com.expedia.open.tracing.Span
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.DurationIsInvalid
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.OperationNameIsRequired
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.ServiceNameIsRequired
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.SmallestAllowedStartTimeMicros
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.SpanIdIsRequired
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.StartTimeIsInvalid
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor.TraceIdIsRequired
import com.expedia.www.haystack.collector.commons.config.ExtractorConfiguration
import com.expedia.www.haystack.collector.commons.config.Format
import com.expedia.www.haystack.collector.commons.record.KeyValueExtractor
import com.expedia.www.haystack.collector.commons.record.KeyValuePair
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor._
import com.expedia.www.haystack.collector.commons.config.{ExtractorConfiguration, Format}
import com.expedia.www.haystack.collector.commons.record.{KeyValueExtractor, KeyValuePair}
import com.expedia.www.haystack.span.decorators.SpanDecorator
import com.google.protobuf.util.JsonFormat
import org.slf4j.Logger

import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.{Failure, Success, Try}

object ProtoSpanExtractor {
private val DaysInYear1970 = 365
@@ -60,7 +51,7 @@ object ProtoSpanExtractor {
}

class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
val LOGGER: Logger)
val LOGGER: Logger, spanDecorators: List[SpanDecorator])
extends KeyValueExtractor with MetricsSupport {

private val printer = JsonFormat.printer().omittingInsignificantWhitespace()
@@ -141,9 +132,11 @@ class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
match {
case Success(span) =>
validSpanMeter.mark()

val updatedSpan = decorateSpan(span)
val kvPair = extractorConfiguration.outputFormat match {
case Format.JSON => KeyValuePair(span.getTraceId.getBytes, printer.print(span).getBytes(Charset.forName("UTF-8")))
case Format.PROTO => KeyValuePair(span.getTraceId.getBytes, recordBytes)
case Format.JSON => KeyValuePair(updatedSpan.getTraceId.getBytes, printer.print(span).getBytes(Charset.forName("UTF-8")))
case Format.PROTO => KeyValuePair(updatedSpan.getTraceId.getBytes, updatedSpan.toByteArray)
}
List(kvPair)

@@ -156,4 +149,16 @@ class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
Nil
}
}

private def decorateSpan(span: Span): Span = {
if (spanDecorators.isEmpty) {
return span
}

var spanBuilder = span.toBuilder
spanDecorators.foreach(decorator => {
spanBuilder = decorator.decorate(spanBuilder)
})
spanBuilder.build()
}
}
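
The decorateSpan helper above folds every configured SpanDecorator over the span's builder before the key/value pair is emitted. As a minimal sketch, assuming only the interface members this diff actually calls (init(Config) and decorate(Span.Builder) returning Span.Builder; the real trait may declare more), a hypothetical decorator that stamps a tenant-id tag could look like this:

    import com.expedia.open.tracing.{Span, Tag}
    import com.expedia.www.haystack.span.decorators.SpanDecorator
    import com.typesafe.config.Config

    // Hypothetical example, not part of this PR: stamps a configurable tenant-id
    // tag on every span; the default mirrors the "shared" tenant from the commit log.
    class TenantIdSpanDecorator extends SpanDecorator {
      private var tenantId: String = "shared"

      override def init(config: Config): Unit = {
        // "tenant.id" is an assumed config key for this sketch
        if (config.hasPath("tenant.id")) tenantId = config.getString("tenant.id")
      }

      override def decorate(spanBuilder: Span.Builder): Span.Builder =
        spanBuilder.addTags(
          Tag.newBuilder()
            .setKey("tenant-id")
            .setType(Tag.TagType.STRING)
            .setVStr(tenantId))
    }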
SpanDecoratorFactory.scala
@@ -0,0 +1,25 @@
package com.expedia.www.haystack.collector.commons

import com.expedia.www.haystack.span.decorators.plugin.config.Plugin
import com.expedia.www.haystack.span.decorators.plugin.loader.SpanDecoratorPluginLoader
import com.expedia.www.haystack.span.decorators.{AdditionalTagsSpanDecorator, SpanDecorator}
import com.typesafe.config.ConfigFactory
import org.slf4j.Logger

import scala.collection.JavaConverters._

object SpanDecoratorFactory {
def get(pluginConfig: Plugin, additionalTagsConfig: Map[String, String], LOGGER: Logger): List[SpanDecorator] = {
var tempList = List[SpanDecorator]()
if (pluginConfig != null) {
val externalSpanDecorators: List[SpanDecorator] = SpanDecoratorPluginLoader.getInstance(LOGGER, pluginConfig).getSpanDecorators().asScala.toList
if (externalSpanDecorators != null) {
tempList = tempList ++: externalSpanDecorators
}
}

val additionalTagsSpanDecorator = new AdditionalTagsSpanDecorator()
additionalTagsSpanDecorator.init(ConfigFactory.parseMap(additionalTagsConfig.asJava))
tempList.::(additionalTagsSpanDecorator)
}
}
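
A usage sketch, wiring the factory into the extractor using only names this PR defines (the logger name and loaded config are placeholders):

    import com.expedia.www.haystack.collector.commons.config.{ConfigurationLoader, ExtractorConfiguration, Format}
    import com.expedia.www.haystack.collector.commons.{ProtoSpanExtractor, SpanDecoratorFactory}
    import com.expedia.www.haystack.span.decorators.SpanDecorator
    import com.typesafe.config.ConfigFactory
    import org.slf4j.LoggerFactory

    // Sketch only: assemble the decorator list once at startup, then hand it to the extractor.
    val config = ConfigFactory.load()
    val logger = LoggerFactory.getLogger("collector")

    val decorators: List[SpanDecorator] = SpanDecoratorFactory.get(
      ConfigurationLoader.pluginConfigurations(config),
      ConfigurationLoader.additionalTagsConfiguration(config),
      logger)

    val extractor = new ProtoSpanExtractor(
      ExtractorConfiguration(Format.PROTO), logger, decorators)

Note that the AdditionalTagsSpanDecorator is always prepended, so default tags (for example tenant-id = "shared") are applied even when no external plugins are configured.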
ConfigurationLoader.scala
@@ -18,9 +18,11 @@
package com.expedia.www.haystack.collector.commons.config

import java.io.File
import java.util
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueType}
import com.expedia.www.haystack.span.decorators.plugin.config.{Plugin, PluginConfiguration}
import com.typesafe.config._
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerConfig.{KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG}
import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -147,4 +149,56 @@ object ConfigurationLoader {
val extractor = config.getConfig("extractor")
ExtractorConfiguration(outputFormat = if (extractor.hasPath("output.format")) Format.withName(extractor.getString("output.format")) else Format.PROTO)
}

def externalKafkaConfiguration(config: Config): List[ExternalKafkaConfiguration] = {
if (!config.hasPath("external.kafka")) {
return List[ExternalKafkaConfiguration]()
}

val kafkaProducerConfig: ConfigObject = config.getObject("external.kafka")
kafkaProducerConfig.unwrapped().map(c => {
val props = new Properties()
val cfg = ConfigFactory.parseMap(c._2.asInstanceOf[util.HashMap[String, Object]])
val topic = cfg.getString("config.topic")
val tags = cfg.getConfig("tags").entrySet().foldRight(Map[String, String]())((t, tMap) => {
tMap + (t.getKey -> t.getValue.unwrapped().toString)
})
val temp = cfg.getConfig("config.props").entrySet() foreach {
kv => {
props.setProperty(kv.getKey, kv.getValue.unwrapped().toString)
}
}

props.put(KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)
props.put(VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getCanonicalName)

ExternalKafkaConfiguration(tags, KafkaProduceConfiguration(topic, props))
}).toList
}

def additionalTagsConfiguration(config: Config): Map[String, String] = {
if (!config.hasPath("additionaltags")) {
return Map[String, String]()
}
val additionalTagsConfig = config.getConfig("additionaltags")
val additionalTags = additionalTagsConfig.entrySet().foldRight(Map[String, String]())((t, tMap) => {
tMap + (t.getKey -> t.getValue.unwrapped().toString)
})
additionalTags
}

def pluginConfigurations(config: Config): Plugin = {
if (!config.hasPath("plugins")) {
return null
}
val directory = config.getString("plugins.directory")
val pluginConfigurationsList = config.getObject("plugins").unwrapped().filter(c => !"directory".equals(c._1)).map(c => {
val pluginConfig = ConfigFactory.parseMap(c._2.asInstanceOf[util.HashMap[String, Object]])
new PluginConfiguration(
pluginConfig.getString("name"),
pluginConfig.getConfig("config")
)
}).toList
new Plugin(directory, pluginConfigurationsList)
}
}
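
Taken together, the three loaders above imply a configuration shape like the following. A minimal sketch: the keys (external.kafka, tags, config.topic, config.props, additionaltags, plugins, directory, name) come from the code above, while every concrete name and value is hypothetical.

    import com.typesafe.config.ConfigFactory

    // Hypothetical configuration exercising the three new loaders; values are made up.
    val sample = ConfigFactory.parseString("""
      external.kafka {
        tenant-a {
          tags { tenant-id = "tenant-a" }                 // span tags that select this cluster
          config {
            topic = "proto-spans-tenant-a"                // read as "config.topic"
            props { bootstrap.servers = "kafka-a:9092" }  // copied verbatim into Properties
          }
        }
      }
      additionaltags { tenant-id = "shared" }             // default tags for every span
      plugins {
        directory = "/plugins"                            // where decorator jars are loaded from
        user-ref { name = "user-ref", config {} }         // one entry per external decorator
      }
    """)

    val externalKafka  = ConfigurationLoader.externalKafkaConfiguration(sample)  // one cluster entry
    val additionalTags = ConfigurationLoader.additionalTagsConfiguration(sample) // Map(tenant-id -> shared)
    val plugin         = ConfigurationLoader.pluginConfigurations(sample)        // Plugin("/plugins", ...)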
KafkaProduceConfiguration.scala
@@ -20,3 +20,5 @@ package com.expedia.www.haystack.collector.commons.config
import java.util.Properties

case class KafkaProduceConfiguration(topic: String, props: Properties)

case class ExternalKafkaConfiguration(tags: Map[String, String], kafkaProduceConfiguration: KafkaProduceConfiguration)
KafkaRecordSink.scala
@@ -17,35 +17,74 @@

package com.expedia.www.haystack.collector.commons.sink.kafka

import com.expedia.open.tracing.{Span, Tag}
import com.expedia.www.haystack.collector.commons.MetricsSupport
import com.expedia.www.haystack.collector.commons.config.KafkaProduceConfiguration
import com.expedia.www.haystack.collector.commons.config.{ExternalKafkaConfiguration, KafkaProduceConfiguration}
import com.expedia.www.haystack.collector.commons.record.KeyValuePair
import com.expedia.www.haystack.collector.commons.sink.RecordSink
import org.apache.kafka.clients.producer.{ProducerRecord, _}
import org.slf4j.LoggerFactory

class KafkaRecordSink(config: KafkaProduceConfiguration) extends RecordSink with MetricsSupport {
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

class KafkaRecordSink(config: KafkaProduceConfiguration,
additionalKafkaProducerConfigs: List[ExternalKafkaConfiguration]) extends RecordSink with MetricsSupport {

private val LOGGER = LoggerFactory.getLogger(classOf[KafkaRecordSink])

private val producer: KafkaProducer[Array[Byte], Array[Byte]] = new KafkaProducer[Array[Byte], Array[Byte]](config.props)
private val defaultProducer: KafkaProducer[Array[Byte], Array[Byte]] = new KafkaProducer[Array[Byte], Array[Byte]](config.props)
private val additionalProducers: List[KafkaProducers] = additionalKafkaProducerConfigs
.map(cfg => {
KafkaProducers(cfg.tags, cfg.kafkaProduceConfiguration.topic, new KafkaProducer[Array[Byte], Array[Byte]](cfg.kafkaProduceConfiguration.props))
})

override def toAsync(kvPair: KeyValuePair[Array[Byte], Array[Byte]],
callback: (KeyValuePair[Array[Byte], Array[Byte]], Exception) => Unit = null): Unit = {
val kafkaMessage = new ProducerRecord(config.topic, kvPair.key, kvPair.value)

producer.send(kafkaMessage, new Callback {
defaultProducer.send(kafkaMessage, new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
LOGGER.error(s"Fail to produce the message to kafka for topic=${config.topic} with reason", e)
}
if(callback != null) callback(kvPair, e)
}
})

getMatchingProducers(additionalProducers, Span.parseFrom(kvPair.value)).foreach(p => {
val tempKafkaMessage = new ProducerRecord(p.topic, kvPair.key, kvPair.value)
p.producer.send(tempKafkaMessage, new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
LOGGER.error(s"Fail to produce the message to kafka for topic=${p.topic} with reason", e)
}
if(callback != null) callback(kvPair, e)
}
})
})
}

private def getMatchingProducers(producers: List[KafkaProducers], span: Span): List[KafkaProducers] = {
val tagList: List[Tag] = span.getTagsList.asScala.toList
producers.filter(producer => producer.isMatched(tagList))
}

override def close(): Unit = {
if(producer != null) {
if(defaultProducer != null) {
defaultProducer.flush()
defaultProducer.close()
}
additionalProducers.foreach(p => p.close())
}

case class KafkaProducers(tags: Map[String, String], topic: String, producer: KafkaProducer[Array[Byte], Array[Byte]]) {
def isMatched(spanTags: List[Tag]): Boolean = {
val filteredTags = spanTags.filter(t => t.getVStr.equals(tags.getOrElse(t.getKey, null)))
filteredTags.size.equals(tags.size)
}

def close(): Unit = {
producer.flush()
producer.close()
}
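
The routing rule in KafkaProducers.isMatched requires every configured tag to appear on the span with an equal string value; extra span tags are ignored. Restated standalone for illustration (this helper is not part of the PR, only a re-derivation of the logic above):

    import com.expedia.open.tracing.{Span, Tag}
    import scala.collection.JavaConverters._

    // A span is routed to an additional producer only when each configured
    // key/value pair matches one of the span's tags by exact string value.
    def isMatched(configured: Map[String, String], spanTags: List[Tag]): Boolean =
      spanTags.count(t => configured.get(t.getKey).contains(t.getVStr)) == configured.size

    val span = Span.newBuilder()
      .addTags(Tag.newBuilder().setKey("tenant-id").setVStr("tenant-a"))
      .build()

    isMatched(Map("tenant-id" -> "tenant-a"), span.getTagsList.asScala.toList) // true
    isMatched(Map("tenant-id" -> "tenant-b"), span.getTagsList.asScala.toList) // false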
KeyExtractorSpec.scala
@@ -20,8 +20,8 @@ package com.expedia.www.haystack.collector.commons.unit
import java.nio.charset.Charset

import com.expedia.open.tracing.Span
import com.expedia.www.haystack.collector.commons.{MetricsSupport, ProtoSpanExtractor}
import com.expedia.www.haystack.collector.commons.config.{ExtractorConfiguration, Format}
import com.expedia.www.haystack.collector.commons.{MetricsSupport, ProtoSpanExtractor}
import com.google.protobuf.util.JsonFormat
import org.scalatest.{FunSpec, Matchers}
import org.slf4j.LoggerFactory
@@ -37,7 +37,7 @@ class KeyExtractorSpec extends FunSpec with Matchers with MetricsSupport {
"trace-id-2" -> createSpan("trace-id-2", "spanId_2", "service_2", "operation", StartTimeMicros, DurationMicros))

spanMap.foreach(sp => {
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.PROTO), LoggerFactory.getLogger(classOf[ProtoSpanExtractor])).extractKeyValuePairs(sp._2.toByteArray)
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.PROTO), LoggerFactory.getLogger(classOf[ProtoSpanExtractor]), List()).extractKeyValuePairs(sp._2.toByteArray)
kvPairs.size shouldBe 1

kvPairs.head.key shouldBe sp._1.getBytes
@@ -53,7 +53,7 @@ class KeyExtractorSpec extends FunSpec with Matchers with MetricsSupport {
"trace-id-2" -> createSpan("trace-id-2", "spanId_2", "service_2", "operation", StartTimeMicros, 1))

spanMap.foreach(sp => {
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.JSON), LoggerFactory.getLogger(classOf[ProtoSpanExtractor])).extractKeyValuePairs(sp._2.toByteArray)
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.JSON), LoggerFactory.getLogger(classOf[ProtoSpanExtractor]), List()).extractKeyValuePairs(sp._2.toByteArray)
kvPairs.size shouldBe 1

kvPairs.head.key shouldBe sp._1.getBytes
(diff for the remaining changed files not shown)
