Implement Kafka consumer and sink #162

Open: wants to merge 3 commits into master
3 changes: 2 additions & 1 deletion build.sbt
@@ -16,7 +16,7 @@ lazy val root = project.in(file("."))
.settings(
name := "snowplow-s3-loader",
version := "0.7.0",
description := "Load the contents of a Kinesis stream or NSQ topic to S3"
description := "Load the contents of a Kinesis stream, NSQ or Kafka topic to S3"
)
.settings(BuildSettings.buildSettings)
.settings(BuildSettings.sbtAssemblySettings)
@@ -41,6 +41,7 @@ lazy val root = project.in(file("."))
Dependencies.Libraries.snowplowTracker,
Dependencies.Libraries.pureconfig,
Dependencies.Libraries.igluCoreJson4s,
Dependencies.Libraries.kafka,
// Scala (test only)
Dependencies.Libraries.specs2,
// Thrift (test only)
9 changes: 9 additions & 0 deletions examples/config.hocon.sample
@@ -3,12 +3,14 @@
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
# 'kafka' for reading records from a Kafka topic
source = "{{source}}"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
# 'kafka' for writing records to a Kafka topic
sink = "{{sink}}"

# The following are used to authenticate for the Amazon Kinesis sink.
@@ -38,6 +40,13 @@ nsq {
lookupPort = {{nsqlookupdPort}}
}

# Config for Kafka
kafka {
brokers = "{{kafkaBrokers}}"
appName = "{{appName}}"
startFromBeginning = false
}

kinesis {
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
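The executor below reads this block through the loader's config model (it calls config.kafka.properties, config.kafka.startFromBeginning and config.kafka.pollTime). A minimal sketch of what that section could look like, assuming a pureconfig-style case class; the member name properties matches the call site, everything else here is illustrative:

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

// Sketch only: field names mirror the sample config above; defaults and types are assumptions.
final case class KafkaConfig(
  brokers: String,              // comma-separated bootstrap servers
  appName: String,              // used as the Kafka consumer group id
  startFromBeginning: Boolean,  // rewind to earliest offsets on first assignment
  pollTime: Option[Long]        // poll timeout in ms; the executor defaults to 1000
) {
  def properties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, appName)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // Assumes RawRecord is a byte array, as consumed by KafkaConsumer[String, RawRecord].
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    // The executor commits offsets manually after each S3 flush, so disable auto-commit.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props
  }
}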
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -40,6 +40,7 @@ object Dependencies {
val snowplowTracker = "0.3.0"
val pureconfig = "0.8.0"
val igluCore = "0.5.0"
val kafka = "2.4.0"
// Scala (test only)
val specs2 = "3.9.1"
}
@@ -76,6 +77,7 @@ object Dependencies {
val snowplowTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.snowplowTracker
val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig
val igluCoreJson4s = "com.snowplowanalytics" %% "iglu-core-json4s" % V.igluCore
val kafka = "org.apache.kafka" %% "kafka" % V.kafka
// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"
}
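Note that "org.apache.kafka" %% "kafka" pulls in the full broker artifact. If only the consumer (and, for a sink, producer) API is needed, the lighter Java-only client library would also work; a possible alternative, not what this PR declares:

val kafka = "org.apache.kafka" % "kafka-clients" % V.kafka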
New file: IBufferedOutput.scala
@@ -0,0 +1,77 @@
package com.snowplowanalytics.s3.loader

import cats.syntax.option._
import com.snowplowanalytics.s3.loader.model.S3Config
import com.snowplowanalytics.s3.loader.serializers.ISerializer
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, DateTimeZone}
import org.slf4j.Logger

import scala.collection.JavaConversions._

trait IBufferedOutput {
val log: Logger
val serializer: ISerializer
val s3Emitter: S3Emitter
val s3Config: S3Config

def flushMessages(messages: List[EmitterInput],
bufferStartTime: Long,
bufferEndTime: Long): Unit = {
val baseFilename = getBaseFilename(bufferStartTime, bufferEndTime)
val serializationResults =
serializer.serialize(messages, baseFilename)
val (successes, failures) =
serializationResults.results.partition(_.isValid)

log.info(
s"Successfully serialized ${successes.size} records out of ${successes.size + failures.size}"
)

if (successes.nonEmpty) {
serializationResults.namedStreams.foreach { stream =>
val connectionAttemptStartTime = System.currentTimeMillis()
if (s3Emitter.attemptEmit(stream, false, connectionAttemptStartTime))
log.info(s"Successfully sent ${successes.size} records")
else
log.error("Error while sending to S3")
}
}

if (failures.nonEmpty) {
s3Emitter.sendFailures(failures)
}
}

private[this] def getBaseFilename(startTime: Long, endTime: Long): String = {
val currentTimeObject = new DateTime(System.currentTimeMillis())
val startTimeObject = new DateTime(startTime)
val endTimeObject = new DateTime(endTime)

val fileName = (s3Config.filenamePrefix ::
DateFormat.print(currentTimeObject).some ::
TimeFormat.print(startTimeObject).some ::
TimeFormat.print(endTimeObject).some ::
math.abs(util.Random.nextInt).toString.some ::
Nil).flatten

val baseFolder = s3Config.outputDirectory ::
formatFolderDatePrefix(currentTimeObject) ::
fileName.mkString("-").some ::
Nil

val baseName = baseFolder.flatten.mkString("/")
baseName
}

private[this] val TimeFormat =
DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC)
private[this] val DateFormat =
DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

private[this] val folderDateFormat =
s3Config.dateFormat.map(format => DateTimeFormat.forPattern(format))
private[this] def formatFolderDatePrefix(currentTime: DateTime): Option[String] =
folderDateFormat.map(formatter => formatter.print(currentTime))
}
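Put together, getBaseFilename nests the per-buffer file under the output directory and an optional date folder. With illustrative settings outputDirectory = "enriched", dateFormat = "yyyy-MM-dd" and filenamePrefix = "snowplow" (values not taken from this PR), a buffer flushed on 2020-01-15 would produce a key shaped like:

enriched/2020-01-15/snowplow-2020-01-15-103000123-103000456-1234567890

i.e. outputDirectory / folder date prefix / prefix-currentDate-bufferStartTime-bufferEndTime-randomInt, with the prefix and folder parts dropped when the corresponding config options are unset.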
New file: KafkaSourceExecutor.scala
@@ -0,0 +1,134 @@
/**
* Copyright (c) 2014-2020 Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache
* License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied.
*
* See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.s3.loader

import java.time.Duration
import java.util

// Kafka
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Logger
import org.slf4j.Logger
import org.slf4j.LoggerFactory

// Scala
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

// Tracker
import com.snowplowanalytics.snowplow.scalatracker.Tracker

// cats
import cats.syntax.validated._

//AWS libs
import com.amazonaws.auth.AWSCredentialsProvider

// This project
import com.snowplowanalytics.s3.loader.model._
import com.snowplowanalytics.s3.loader.serializers._
import com.snowplowanalytics.s3.loader.sinks._

/**
* Executor for KafkaSource
*
* @param config S3Loader configuration
* @param provider AWSCredentialsProvider
* @param badSink Configured BadSink
* @param serializer Serializer instance
* @param maxConnectionTime Max time to spend attempting to connect to the S3 instance
* @param tracker Optional Snowplow tracker for monitoring events
*/
class KafkaSourceExecutor(config: S3LoaderConfig,
provider: AWSCredentialsProvider,
badSink: ISink,
val serializer: ISerializer,
maxConnectionTime: Long,
tracker: Option[Tracker])
extends Runnable
with IBufferedOutput {
lazy val log: Logger = LoggerFactory.getLogger(getClass)
lazy val s3Config: S3Config = config.s3
private val kafkaConsumer = {
val consumer =
new KafkaConsumer[String, RawRecord](config.kafka.properties)
val topicName = config.streams.inStreamName
val topics = topicName :: Nil
var seeked = false
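// The rebalance listener below rewinds to the earliest offsets exactly once, on the first assignment, when startFromBeginning is set.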
consumer.subscribe(
topics,
new ConsumerRebalanceListener() {
override def onPartitionsRevoked(
partitions: util.Collection[TopicPartition]
): Unit = {}

override def onPartitionsAssigned(
partitions: util.Collection[TopicPartition]
): Unit = {
if (!config.kafka.startFromBeginning || seeked) {
return
}
consumer.seekToBeginning(partitions)
seeked = true
}
}
)
consumer
}
private val pollTime = Duration.ofMillis(config.kafka.pollTime.getOrElse(1000))
private val msgBuffer = new ListBuffer[EmitterInput]()
val s3Emitter =
new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker)
private var bufferStartTime = System.currentTimeMillis()

override def run(): Unit = {
while (true) {
val records = kafkaConsumer.poll(pollTime)
log.debug("Received %d records", records.count())

records.foreach(record => {
log.debug(
s"Processing record ${record.key()} partition id ${record.partition()}"
)
val validMsg = record.value().valid
msgBuffer += validMsg
if (shouldFlush) flush()
})
}
}

private def shouldFlush: Boolean = {
msgBuffer.nonEmpty && (msgBuffer.length >= config.streams.buffer.recordLimit || timerDepleted())
}

private def timerDepleted(): Boolean = {
(System
.currentTimeMillis() - bufferStartTime) > config.streams.buffer.timeLimit
}

private def flush(): Unit = {
val bufferEndTime = System.currentTimeMillis()
flushMessages(msgBuffer.toList, bufferStartTime, bufferEndTime)
msgBuffer.clear()
bufferStartTime = bufferEndTime
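// Offsets are committed only after the buffer has been flushed to S3, so records are delivered at least once.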
kafkaConsumer.commitSync()
}
}
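The change to the loader entry point that actually selects and starts this executor is not among the hunks shown here. A rough, hypothetical sketch of how it could be wired, following the thread-per-source pattern the loader already uses (the object and method names are illustrative):

import com.amazonaws.auth.AWSCredentialsProvider
import com.snowplowanalytics.snowplow.scalatracker.Tracker

import com.snowplowanalytics.s3.loader.KafkaSourceExecutor
import com.snowplowanalytics.s3.loader.model._
import com.snowplowanalytics.s3.loader.serializers._
import com.snowplowanalytics.s3.loader.sinks._

// Hypothetical dispatch; the real entry-point change is not part of the hunks shown.
object SourceRunner {
  def runSource(config: S3LoaderConfig,
                provider: AWSCredentialsProvider,
                badSink: ISink,
                serializer: ISerializer,
                maxConnectionTime: Long,
                tracker: Option[Tracker]): Unit =
    config.source match {
      case "kafka" =>
        val executor = new KafkaSourceExecutor(config, provider, badSink, serializer, maxConnectionTime, tracker)
        // run() polls the topic indefinitely, so give it its own thread
        new Thread(executor).start()
      case other =>
        throw new IllegalArgumentException(s"Unsupported source: $other")
    }
}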
Changed file: NsqSourceExecutor.scala
Expand Up @@ -26,10 +26,10 @@ import com.snowplowanalytics.client.nsq.NSQConfig
import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback
import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback
import com.snowplowanalytics.client.nsq.exceptions.NSQException
import org.slf4j.Logger

// Scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._

// Tracker
import com.snowplowanalytics.snowplow.scalatracker.Tracker
Expand All @@ -40,10 +40,6 @@ import cats.syntax.validated._
//AWS libs
import com.amazonaws.auth.AWSCredentialsProvider

// Joda-Time
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

// Logging
import org.slf4j.LoggerFactory

Expand All @@ -65,34 +61,23 @@ class NsqSourceExecutor(
config: S3LoaderConfig,
provider: AWSCredentialsProvider,
badSink: ISink,
serializer: ISerializer,
val serializer: ISerializer,
maxConnectionTime: Long,
tracker: Option[Tracker]
) extends Runnable {
) extends Runnable with IBufferedOutput {

lazy val log = LoggerFactory.getLogger(getClass())
lazy val log: Logger = LoggerFactory.getLogger(getClass)
lazy val s3Config: S3Config = config.s3

// NSQ messages are buffered in msgBuffer until the buffer size reaches nsqBufferSize
val msgBuffer = new ListBuffer[EmitterInput]()

val s3Emitter = new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker)
private val TimeFormat = DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC)
private val DateFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

private def getBaseFilename(startTime: Long, endTime: Long): String = {
val currentTimeObject = new DateTime(System.currentTimeMillis())
val startTimeObject = new DateTime(startTime)
val endTimeObject = new DateTime(endTime)

DateFormat.print(currentTimeObject) + "-" +
TimeFormat.print(startTimeObject) + "-" +
TimeFormat.print(endTimeObject) + "-" +
math.abs(util.Random.nextInt)
}
val s3Emitter =
new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker)

override def run: Unit = {

val nsqCallback = new NSQMessageCallback {
val nsqCallback = new NSQMessageCallback {
//start time of filling the buffer
var bufferStartTime = System.currentTimeMillis()
val nsqBufferSize = config.streams.buffer.recordLimit
Expand All @@ -105,25 +90,7 @@ class NsqSourceExecutor(
if (msgBuffer.size >= nsqBufferSize) {
//finish time of filling the buffer
val bufferEndTime = System.currentTimeMillis()
val baseFilename = getBaseFilename(bufferStartTime, bufferEndTime)
val serializationResults = serializer.serialize(msgBuffer.toList, baseFilename)
val (successes, failures) = serializationResults.results.partition(_.isValid)

log.info(s"Successfully serialized ${successes.size} records out of ${successes.size + failures.size}")

if (successes.nonEmpty) {
serializationResults.namedStreams.foreach { stream =>
val connectionAttemptStartTime = System.currentTimeMillis()
s3Emitter.attemptEmit(stream, false, connectionAttemptStartTime) match {
case false => log.error(s"Error while sending to S3")
case true => log.info(s"Successfully sent ${successes.size} records")
}
}
}

if (failures.nonEmpty) {
s3Emitter.sendFailures(failures)
}
flushMessages(msgBuffer.toList, bufferStartTime, bufferEndTime)

msgBuffer.clear()
//make buffer start time of the next buffer the buffer finish time of the current buffer
Expand All @@ -135,18 +102,23 @@ class NsqSourceExecutor(

val errorCallback = new NSQErrorCallback {
override def error(e: NSQException) =
log.error(s"Exception while consuming topic $config.streams.inStreamName", e)
log.error(
s"Exception while consuming topic $config.streams.inStreamName",
e
)
}

val lookup = new DefaultNSQLookup
// use NSQLookupd
lookup.addLookupAddress(config.nsq.host, config.nsq.lookupPort)
val consumer = new NSQConsumer(lookup,
config.streams.inStreamName,
config.nsq.channelName,
nsqCallback,
new NSQConfig(),
errorCallback)
val consumer = new NSQConsumer(
lookup,
config.streams.inStreamName,
config.nsq.channelName,
nsqCallback,
new NSQConfig(),
errorCallback
)
consumer.start()
}
}
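The 'kafka' failure sink mentioned in the config sample does not appear in the hunks shown above. A minimal sketch of what such a sink could look like, assuming the ISink trait exposes the same store(output, key, good) method the existing Kinesis and NSQ sinks implement; the signature, class name and constructor shape are all assumptions here:

package com.snowplowanalytics.s3.loader.sinks

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory

// Hypothetical sketch only; the sink actually shipped with this PR may differ.
class KafkaSink(brokers: String, topic: String) extends ISink {
  private lazy val log = LoggerFactory.getLogger(getClass)

  private val producer = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }

  // Assumed signature, mirroring the existing Kinesis and NSQ sinks.
  override def store(output: String, key: Option[String], good: Boolean): Unit = {
    producer.send(new ProducerRecord[String, String](topic, key.orNull, output))
    log.debug("Sent 1 record to Kafka topic {}", topic)
  }
}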