From 5798feacbae82361e4cd41107f6a4b5eb54dcc48 Mon Sep 17 00:00:00 2001 From: Jayson Reis Date: Mon, 9 Mar 2020 15:45:20 +0100 Subject: [PATCH 1/3] Implement Kafka consumer and sink This makes the Kafka source send TSV files to S3 --- build.sbt | 3 +- examples/config.hocon.sample | 9 ++ project/Dependencies.scala | 2 + .../loader/IBufferedOutput.scala | 74 ++++++++++ .../loader/KafkaSourceExecutor.scala | 134 ++++++++++++++++++ .../loader/NsqSourceExecutor.scala | 70 +++------ .../loader/S3Loader.scala | 19 ++- .../loader/model.scala | 20 +++ .../loader/sinks/KafkaSink.scala | 69 +++++++++ 9 files changed, 344 insertions(+), 56 deletions(-) create mode 100644 src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala create mode 100644 src/main/scala/com.snowplowanalytics.s3/loader/KafkaSourceExecutor.scala create mode 100644 src/main/scala/com.snowplowanalytics.s3/loader/sinks/KafkaSink.scala diff --git a/build.sbt b/build.sbt index 41c643e..4f978df 100644 --- a/build.sbt +++ b/build.sbt @@ -16,7 +16,7 @@ lazy val root = project.in(file(".")) .settings( name := "snowplow-s3-loader", version := "0.7.0", - description := "Load the contents of a Kinesis stream or NSQ topic to S3" + description := "Load the contents of a Kinesis stream, NSQ or Kafka topic to S3" ) .settings(BuildSettings.buildSettings) .settings(BuildSettings.sbtAssemblySettings) @@ -41,6 +41,7 @@ lazy val root = project.in(file(".")) Dependencies.Libraries.snowplowTracker, Dependencies.Libraries.pureconfig, Dependencies.Libraries.igluCoreJson4s, + Dependencies.Libraries.kafka, // Scala (test only) Dependencies.Libraries.specs2, // Thrift (test only) diff --git a/examples/config.hocon.sample b/examples/config.hocon.sample index 8e4015e..c78b22c 100644 --- a/examples/config.hocon.sample +++ b/examples/config.hocon.sample @@ -3,12 +3,14 @@ # Sources currently supported are: # 'kinesis' for reading records from a Kinesis stream # 'nsq' for reading records from a NSQ topic +# 'kafka' for reading records from a Kafka topic source = "{{source}}" # Sink is used for sending events which processing failed. # Sinks currently supported are: # 'kinesis' for writing records to a Kinesis stream # 'nsq' for writing records to a NSQ topic +# 'kafka' for writing records to a Kafka topic sink = "{{sink}}" # The following are used to authenticate for the Amazon Kinesis sink. @@ -38,6 +40,13 @@ nsq { lookupPort = {{nsqlookupdPort}} } +# Config for Kafka +kafka { + brokers = "{{kafkaBrokers}}" + appName = "{{appName}}" + startFromBeginning = false +} + kinesis { # LATEST: most recent data. # TRIM_HORIZON: oldest available data.
diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2aa4403..0096fa1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -40,6 +40,7 @@ object Dependencies { val snowplowTracker = "0.3.0" val pureconfig = "0.8.0" val igluCore = "0.5.0" + val kafka = "2.4.0" // Scala (test only) val specs2 = "3.9.1" } @@ -76,6 +77,7 @@ object Dependencies { val snowplowTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.snowplowTracker val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig val igluCoreJson4s = "com.snowplowanalytics" %% "iglu-core-json4s" % V.igluCore + val kafka = "org.apache.kafka" %% "kafka" % V.kafka // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test" } diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala new file mode 100644 index 0000000..05d6d1f --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala @@ -0,0 +1,74 @@ +package com.snowplowanalytics.s3.loader + +import com.snowplowanalytics.s3.loader.model.S3Config +import com.snowplowanalytics.s3.loader.serializers.ISerializer +import org.joda.time.{DateTime, DateTimeZone} +import org.joda.time.format.DateTimeFormat +import org.slf4j.Logger + +import scala.collection.JavaConversions._ + +trait IBufferedOutput { + val log: Logger + val serializer: ISerializer + val s3Emitter: S3Emitter + val s3Config: S3Config + + def getBaseFilename(startTime: Long, endTime: Long): String = { + val currentTimeObject = new DateTime(System.currentTimeMillis()) + val startTimeObject = new DateTime(startTime) + val endTimeObject = new DateTime(endTime) + + val fileName = (s3Config.filenamePrefix :: Some( + DateFormat.print(currentTimeObject) + ) :: Some(TimeFormat.print(startTimeObject)) :: Some( + TimeFormat.print(endTimeObject) + ) :: Some(math.abs(util.Random.nextInt).toString) :: Nil).flatten + + val baseFolder = s3Config.outputDirectory :: formatFolderDatePrefix( + currentTimeObject + ) :: Some(fileName.mkString("-")) :: Nil + + val baseName = baseFolder.flatten.mkString("/") + baseName + } + + private val TimeFormat = + DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC) + private val DateFormat = + DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC) + + private val folderDateFormat = + s3Config.dateFormat.map(format => DateTimeFormat.forPattern(format)) + private def formatFolderDatePrefix(currentTime: DateTime): Option[String] = + folderDateFormat.map(formatter => formatter.print(currentTime)) + + def flushMessages(messages: List[EmitterInput], + bufferStartTime: Long, + bufferEndTime: Long): Unit = { + val baseFilename = getBaseFilename(bufferStartTime, bufferEndTime) + val serializationResults = + serializer.serialize(messages, baseFilename) + val (successes, failures) = + serializationResults.results.partition(_.isValid) + + log.info( + s"Successfully serialized ${successes.size} records out of ${successes.size + failures.size}" + ) + + if (successes.nonEmpty) { + serializationResults.namedStreams.foreach { stream => + val connectionAttemptStartTime = System.currentTimeMillis() + s3Emitter.attemptEmit(stream, false, connectionAttemptStartTime) match { + case false => log.error(s"Error while sending to S3") + case true => + log.info(s"Successfully sent ${successes.size} records") + } + } + } + + if (failures.nonEmpty) { + s3Emitter.sendFailures(failures) + } + } +} diff --git 
a/src/main/scala/com.snowplowanalytics.s3/loader/KafkaSourceExecutor.scala b/src/main/scala/com.snowplowanalytics.s3/loader/KafkaSourceExecutor.scala new file mode 100644 index 0000000..d2bb0bc --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.s3/loader/KafkaSourceExecutor.scala @@ -0,0 +1,134 @@ +/** + * Copyright (c) 2014-2020 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under. + */ +package com.snowplowanalytics.s3.loader + +import java.time.Duration +import java.util + +// Kafka +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition + +// Logger +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +// Scala +import scala.collection.JavaConversions._ +import scala.collection.mutable.ListBuffer + +// Tracker +import com.snowplowanalytics.snowplow.scalatracker.Tracker + +// cats +import cats.syntax.validated._ + +//AWS libs +import com.amazonaws.auth.AWSCredentialsProvider + +// This project +import com.snowplowanalytics.s3.loader.model._ +import com.snowplowanalytics.s3.loader.serializers._ +import com.snowplowanalytics.s3.loader.sinks._ + +/** + * Executor for KafkaSource + * + * @param config S3Loader configuration + * @param provider AWSCredentialsProvider + * @param badSink Configured BadSink + * @param serializer Serializer instance + * @param maxConnectionTime Max time for trying to connect to S3 + */ +class KafkaSourceExecutor(config: S3LoaderConfig, + provider: AWSCredentialsProvider, + badSink: ISink, + val serializer: ISerializer, + maxConnectionTime: Long, + tracker: Option[Tracker]) + extends Runnable + with IBufferedOutput { + lazy val log: Logger = LoggerFactory.getLogger(getClass) + lazy val s3Config: S3Config = config.s3 + private val kafkaConsumer = { + val consumer = + new KafkaConsumer[String, RawRecord](config.kafka.properties) + val topicName = config.streams.inStreamName + val topics = topicName :: Nil + var seeked = false + consumer.subscribe( + topics, + new ConsumerRebalanceListener() { + override def onPartitionsRevoked( + partitions: util.Collection[TopicPartition] + ): Unit = {} + + override def onPartitionsAssigned( + partitions: util.Collection[TopicPartition] + ): Unit = { + if (!config.kafka.startFromBeginning || seeked) { + return + } + consumer.seekToBeginning(partitions) + seeked = true + } + } + ) + consumer + } + private val pollTime = Duration.ofMillis(config.kafka.pollTime.getOrElse(1000)) + private val msgBuffer = new ListBuffer[EmitterInput]() + val s3Emitter = + new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker) + private var bufferStartTime = System.currentTimeMillis() + + override def run(): Unit = { + while (true) { + val records = kafkaConsumer.poll(pollTime) + log.debug("Received {} records", records.count()) + + records.foreach(record => { +
log.debug( + s"Processing record ${record.key()} partition id ${record.partition()}" + ) + val validMsg = record.value().valid + msgBuffer += validMsg + if (shouldFlush) flush() + }) + } + } + + private def shouldFlush: Boolean = { + msgBuffer.nonEmpty && (msgBuffer.length >= config.streams.buffer.recordLimit || timerDepleted()) + } + + private def timerDepleted(): Boolean = { + (System + .currentTimeMillis() - bufferStartTime) > config.streams.buffer.timeLimit + } + + private def flush(): Unit = { + val bufferEndTime = System.currentTimeMillis() + flushMessages(msgBuffer.toList, bufferStartTime, bufferEndTime) + msgBuffer.clear() + bufferStartTime = bufferEndTime + kafkaConsumer.commitSync() + } +} diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/NsqSourceExecutor.scala b/src/main/scala/com.snowplowanalytics.s3/loader/NsqSourceExecutor.scala index 4524942..2e9a553 100644 --- a/src/main/scala/com.snowplowanalytics.s3/loader/NsqSourceExecutor.scala +++ b/src/main/scala/com.snowplowanalytics.s3/loader/NsqSourceExecutor.scala @@ -26,10 +26,10 @@ import com.snowplowanalytics.client.nsq.NSQConfig import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback import com.snowplowanalytics.client.nsq.exceptions.NSQException +import org.slf4j.Logger // Scala import scala.collection.mutable.ListBuffer -import scala.collection.JavaConversions._ // Tracker import com.snowplowanalytics.snowplow.scalatracker.Tracker @@ -40,10 +40,6 @@ import cats.syntax.validated._ //AWS libs import com.amazonaws.auth.AWSCredentialsProvider -// Joda-Time -import org.joda.time.{DateTime, DateTimeZone} -import org.joda.time.format.DateTimeFormat - // Logging import org.slf4j.LoggerFactory @@ -65,34 +61,23 @@ class NsqSourceExecutor( config: S3LoaderConfig, provider: AWSCredentialsProvider, badSink: ISink, - serializer: ISerializer, + val serializer: ISerializer, maxConnectionTime: Long, tracker: Option[Tracker] -) extends Runnable { +) extends Runnable with IBufferedOutput { - lazy val log = LoggerFactory.getLogger(getClass()) + lazy val log: Logger = LoggerFactory.getLogger(getClass) + lazy val s3Config: S3Config = config.s3 //nsq messages will be buffered in msgBuffer until buffer size become equal to nsqBufferSize val msgBuffer = new ListBuffer[EmitterInput]() - val s3Emitter = new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker) - private val TimeFormat = DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC) - private val DateFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC) - - private def getBaseFilename(startTime: Long, endTime: Long): String = { - val currentTimeObject = new DateTime(System.currentTimeMillis()) - val startTimeObject = new DateTime(startTime) - val endTimeObject = new DateTime(endTime) - - DateFormat.print(currentTimeObject) + "-" + - TimeFormat.print(startTimeObject) + "-" + - TimeFormat.print(endTimeObject) + "-" + - math.abs(util.Random.nextInt) - } + val s3Emitter = + new S3Emitter(config.s3, provider, badSink, maxConnectionTime, tracker) override def run: Unit = { - val nsqCallback = new NSQMessageCallback { + val nsqCallback = new NSQMessageCallback { //start time of filling the buffer var bufferStartTime = System.currentTimeMillis() val nsqBufferSize = config.streams.buffer.recordLimit @@ -105,25 +90,7 @@ class NsqSourceExecutor( if (msgBuffer.size >= nsqBufferSize) { //finish time of filling the buffer val bufferEndTime = System.currentTimeMillis() - 
val baseFilename = getBaseFilename(bufferStartTime, bufferEndTime) - val serializationResults = serializer.serialize(msgBuffer.toList, baseFilename) - val (successes, failures) = serializationResults.results.partition(_.isValid) - - log.info(s"Successfully serialized ${successes.size} records out of ${successes.size + failures.size}") - - if (successes.nonEmpty) { - serializationResults.namedStreams.foreach { stream => - val connectionAttemptStartTime = System.currentTimeMillis() - s3Emitter.attemptEmit(stream, false, connectionAttemptStartTime) match { - case false => log.error(s"Error while sending to S3") - case true => log.info(s"Successfully sent ${successes.size} records") - } - } - } - - if (failures.nonEmpty) { - s3Emitter.sendFailures(failures) - } + flushMessages(msgBuffer.toList, bufferStartTime, bufferEndTime) msgBuffer.clear() //make buffer start time of the next buffer the buffer finish time of the current buffer @@ -135,18 +102,23 @@ class NsqSourceExecutor( val errorCallback = new NSQErrorCallback { override def error(e: NSQException) = - log.error(s"Exception while consuming topic $config.streams.inStreamName", e) + log.error( + s"Exception while consuming topic ${config.streams.inStreamName}", + e + ) } val lookup = new DefaultNSQLookup // use NSQLookupd lookup.addLookupAddress(config.nsq.host, config.nsq.lookupPort) - val consumer = new NSQConsumer(lookup, - config.streams.inStreamName, - config.nsq.channelName, - nsqCallback, - new NSQConfig(), - errorCallback) + val consumer = new NSQConsumer( + lookup, + config.streams.inStreamName, + config.nsq.channelName, + nsqCallback, + new NSQConfig(), + errorCallback + ) consumer.start() } } diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/S3Loader.scala b/src/main/scala/com.snowplowanalytics.s3/loader/S3Loader.scala index a766820..1905536 100644 --- a/src/main/scala/com.snowplowanalytics.s3/loader/S3Loader.scala +++ b/src/main/scala/com.snowplowanalytics.s3/loader/S3Loader.scala @@ -15,15 +15,12 @@ package com.snowplowanalytics.s3.loader import java.util.Properties import cats.syntax.validated._ - import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration - import org.slf4j.LoggerFactory - import com.snowplowanalytics.s3.loader.model.S3LoaderConfig import com.snowplowanalytics.s3.loader.serializers.{GZipSerializer, LzoSerializer} -import com.snowplowanalytics.s3.loader.sinks.{KinesisSink, NsqSink} +import com.snowplowanalytics.s3.loader.sinks.{KafkaSink, KinesisSink, NsqSink} object S3Loader { @@ -43,6 +40,7 @@ object S3Loader { val badSink = config.sink match { case "kinesis" => new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkRegion, kinesisSinkName, tracker) case "nsq" => new NsqSink(config) + case "kafka" => new KafkaSink(config) } val serializer = config.s3.format match { @@ -72,8 +70,17 @@ object S3Loader { maxConnectionTime, tracker ).valid + // Read records from Kafka + case "kafka" => + new KafkaSourceExecutor(config, + credentials, + badSink, + serializer, + maxConnectionTime, + tracker + ).valid - case _ => s"Source must be set to kinesis' or 'NSQ'. Provided: ${config.source}".invalid + case _ => s"Source must be set to one of 'kinesis', 'nsq' or 'kafka'.
Provided: ${config.source}".invalid } executor.fold( @@ -89,7 +96,7 @@ object S3Loader { // This does not apply to NSQ because NSQ consumer is non-blocking and fall here // right after consumer.start() config.source match { - case "kinesis" => System.exit(1) + case "kinesis" | "kafka" => System.exit(1) // do anything case "nsq" => } diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/model.scala b/src/main/scala/com.snowplowanalytics.s3/loader/model.scala index c5eb55e..ee36ff5 100644 --- a/src/main/scala/com.snowplowanalytics.s3/loader/model.scala +++ b/src/main/scala/com.snowplowanalytics.s3/loader/model.scala @@ -24,6 +24,7 @@ import java.text.SimpleDateFormat // Scala import scala.util.Try +import java.util.Properties package model { case class AWSConfig(accessKey: String, secretKey: String) @@ -33,6 +34,24 @@ package model { port: Int, lookupPort: Int ) + case class KafkaConfig( + brokers: String, + appName: String, + pollTime: Option[Long], + startFromBeginning: Boolean + ) { + val properties = new Properties() + properties.setProperty("bootstrap.servers", brokers) + properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") + properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer") + + properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") + + properties.setProperty("enable.auto.commit", "false") + + properties.setProperty("group.id", appName) + } case class KinesisConfig( initialPosition: String, initialTimestamp: Option[String], @@ -91,6 +110,7 @@ package model { sink: String, aws: AWSConfig, nsq: NSQConfig, + kafka: KafkaConfig, kinesis: KinesisConfig, streams: StreamsConfig, s3: S3Config, diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/sinks/KafkaSink.scala b/src/main/scala/com.snowplowanalytics.s3/loader/sinks/KafkaSink.scala new file mode 100644 index 0000000..f414d3d --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.s3/loader/sinks/KafkaSink.scala @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2014-2020 Snowplow Analytics Ltd. + * All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache + * License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. + * + * See the Apache License Version 2.0 for the specific language + * governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.s3.loader.sinks + +// Kafka +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} + +// Logging +import org.slf4j.LoggerFactory + +// Concurrent libraries +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Random, Success} +import scala.util.Failure + +// This project +import com.snowplowanalytics.s3.loader.model.S3LoaderConfig + +/** + * Kafka sink + * + * @param config Configuration for Kafka + */ +class KafkaSink(config: S3LoaderConfig) extends ISink { + private val log = LoggerFactory.getLogger(getClass) + private lazy val producer = new KafkaProducer[String, String](config.kafka.properties) + + private def put(record: ProducerRecord[String, String]): Future[RecordMetadata] = Future { + producer.send(record).get() + } + + /** + * Write a record to the Kafka topic + * + * @param output The string record to write + * @param key Unused parameter which exists to extend ISink + * @param good Unused parameter which exists to extend ISink + */ + override def store(output: String, key: Option[String], good: Boolean): Unit = { + val record = new ProducerRecord[String, String](config.streams.outStreamName, key.getOrElse(Random.nextString(6)), output) + put(record) onComplete { + case Success(result) => + log.info("Writing successful") + log.info(s" + PartitionId: ${result.partition()}") + log.info(s" + Offset: ${result.offset()}") + case Failure(f) => + log.error("Writing failed") + log.error(" + " + f.getMessage) + } + } +} + From b5f7cac6c0e6e30c42e9d2ee321fbe22ee5e20ed Mon Sep 17 00:00:00 2001 From: Jayson Reis Date: Mon, 16 Mar 2020 16:10:51 +0100 Subject: [PATCH 2/3] Change list creation to use cats.syntax.option --- .../loader/IBufferedOutput.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala index 05d6d1f..ef6fc36 100644 --- a/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala +++ b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala @@ -1,9 +1,10 @@ package com.snowplowanalytics.s3.loader +import cats.syntax.option._ import com.snowplowanalytics.s3.loader.model.S3Config import com.snowplowanalytics.s3.loader.serializers.ISerializer -import org.joda.time.{DateTime, DateTimeZone} import org.joda.time.format.DateTimeFormat +import org.joda.time.{DateTime, DateTimeZone} import org.slf4j.Logger import scala.collection.JavaConversions._ @@ -19,15 +20,17 @@ trait IBufferedOutput { val startTimeObject = new DateTime(startTime) val endTimeObject = new DateTime(endTime) - val fileName = (s3Config.filenamePrefix :: Some( - DateFormat.print(currentTimeObject) - ) :: Some(TimeFormat.print(startTimeObject)) :: Some( - TimeFormat.print(endTimeObject) - ) :: Some(math.abs(util.Random.nextInt).toString) :: Nil).flatten + val fileName = (s3Config.filenamePrefix :: + DateFormat.print(currentTimeObject).some :: + TimeFormat.print(startTimeObject).some :: + TimeFormat.print(endTimeObject).some :: + math.abs(util.Random.nextInt).toString.some :: + Nil).flatten - val baseFolder = s3Config.outputDirectory :: formatFolderDatePrefix( - currentTimeObject - ) :: Some(fileName.mkString("-")) :: Nil + val baseFolder = s3Config.outputDirectory :: + formatFolderDatePrefix(currentTimeObject) :: fileName + .mkString("-").some :: + Nil val baseName = 
baseFolder.flatten.mkString("/") baseName From 7c413daf6fc4884586a0d05494c1ba9bef472a7e Mon Sep 17 00:00:00 2001 From: Jayson Reis Date: Mon, 16 Mar 2020 18:01:32 +0100 Subject: [PATCH 3/3] Move private methods to bottom of IBufferedOutput --- .../loader/IBufferedOutput.scala | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala index ef6fc36..6e4eadb 100644 --- a/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala +++ b/src/main/scala/com.snowplowanalytics.s3/loader/IBufferedOutput.scala @@ -15,37 +15,6 @@ trait IBufferedOutput { val s3Emitter: S3Emitter val s3Config: S3Config - def getBaseFilename(startTime: Long, endTime: Long): String = { - val currentTimeObject = new DateTime(System.currentTimeMillis()) - val startTimeObject = new DateTime(startTime) - val endTimeObject = new DateTime(endTime) - - val fileName = (s3Config.filenamePrefix :: - DateFormat.print(currentTimeObject).some :: - TimeFormat.print(startTimeObject).some :: - TimeFormat.print(endTimeObject).some :: - math.abs(util.Random.nextInt).toString.some :: - Nil).flatten - - val baseFolder = s3Config.outputDirectory :: - formatFolderDatePrefix(currentTimeObject) :: fileName - .mkString("-").some :: - Nil - - val baseName = baseFolder.flatten.mkString("/") - baseName - } - - private val TimeFormat = - DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC) - private val DateFormat = - DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC) - - private val folderDateFormat = - s3Config.dateFormat.map(format => DateTimeFormat.forPattern(format)) - private def formatFolderDatePrefix(currentTime: DateTime): Option[String] = - folderDateFormat.map(formatter => formatter.print(currentTime)) - def flushMessages(messages: List[EmitterInput], bufferStartTime: Long, bufferEndTime: Long): Unit = { @@ -74,4 +43,35 @@ trait IBufferedOutput { s3Emitter.sendFailures(failures) } } + + private[this] def getBaseFilename(startTime: Long, endTime: Long): String = { + val currentTimeObject = new DateTime(System.currentTimeMillis()) + val startTimeObject = new DateTime(startTime) + val endTimeObject = new DateTime(endTime) + + val fileName = (s3Config.filenamePrefix :: + DateFormat.print(currentTimeObject).some :: + TimeFormat.print(startTimeObject).some :: + TimeFormat.print(endTimeObject).some :: + math.abs(util.Random.nextInt).toString.some :: + Nil).flatten + + val baseFolder = s3Config.outputDirectory :: + formatFolderDatePrefix(currentTimeObject) :: fileName + .mkString("-").some :: + Nil + + val baseName = baseFolder.flatten.mkString("/") + baseName + } + + private[this] val TimeFormat = + DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC) + private[this] val DateFormat = + DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC) + + private[this] val folderDateFormat = + s3Config.dateFormat.map(format => DateTimeFormat.forPattern(format)) + private[this] def formatFolderDatePrefix(currentTime: DateTime): Option[String] = + folderDateFormat.map(formatter => formatter.print(currentTime)) }
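
The new source relies on plain kafka-clients consumer settings built by KafkaConfig. Below is a minimal, self-contained smoke-test sketch (not part of the patch series) showing how the `kafka` config block maps onto those settings; it assumes kafka-clients 2.4 on the classpath, and the broker address, group id and topic name are placeholders.

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaSourceSmokeTest {
  def main(args: Array[String]): Unit = {
    // Mirrors the consumer-relevant properties that KafkaConfig builds
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // kafka.brokers
    props.setProperty("group.id", "snowplow-s3-loader")      // kafka.appName
    props.setProperty("enable.auto.commit", "false")         // the loader commits offsets only after flushing a buffer
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[String, Array[Byte]](props)
    consumer.subscribe(Collections.singletonList("enriched-good")) // streams.inStreamName
    val records = consumer.poll(Duration.ofMillis(1000))           // default kafka.pollTime
    println(s"Received ${records.count()} records")
    consumer.close()
  }
}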
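
The bad-row path in KafkaSink is an ordinary blocking producer send. A minimal sketch of that call shape, again with a placeholder broker and topic:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaSinkSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Same shape as KafkaSink.store: send a keyed record and block on the returned future
    val metadata = producer
      .send(new ProducerRecord[String, String]("s3-loader-bad", "some-key", "bad row payload"))
      .get()
    println(s"partition=${metadata.partition()} offset=${metadata.offset()}")
    producer.close()
  }
}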
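
For reference, a self-contained sketch of the S3 key layout that IBufferedOutput.getBaseFilename builds for both the NSQ and Kafka sources; the outputDirectory, dateFormat and filenamePrefix values below are made-up examples, not defaults.

import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.joda.time.format.DateTimeFormat

object S3KeyLayoutSketch {
  private val TimeFormat = DateTimeFormat.forPattern("HHmmssSSS").withZone(DateTimeZone.UTC)
  private val DateFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)

  def main(args: Array[String]): Unit = {
    val outputDirectory = Some("enriched/good") // s3.outputDirectory (example)
    val dateFormat      = Some("yyyy/MM/dd")    // s3.dateFormat (example)
    val filenamePrefix  = Some("snowplow")      // s3.filenamePrefix (example)

    val now   = new DateTime(System.currentTimeMillis())
    val start = new DateTime(System.currentTimeMillis() - 60000L) // buffer start
    val end   = new DateTime(System.currentTimeMillis())          // buffer end

    // prefix-date-startTime-endTime-random, joined with "-"
    val fileName = (filenamePrefix ::
      Some(DateFormat.print(now)) ::
      Some(TimeFormat.print(start)) ::
      Some(TimeFormat.print(end)) ::
      Some(math.abs(util.Random.nextInt).toString) ::
      Nil).flatten.mkString("-")

    // outputDirectory/datePrefix/fileName, joined with "/"
    val folderDatePrefix = dateFormat.map(f => DateTimeFormat.forPattern(f).print(now))
    val key = (outputDirectory :: folderDatePrefix :: Some(fileName) :: Nil).flatten.mkString("/")

    // e.g. enriched/good/2020/03/16/snowplow-2020-03-16-170000000-170100000-123456789
    println(key)
  }
}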