Create event subscriber - WIP #370

Draft · wants to merge 2 commits into master
13 changes: 12 additions & 1 deletion .scalafmt.conf
@@ -10,4 +10,15 @@ runner.dialect = scala213source3
spaces.beforeContextBoundColon = Always
style = defaultWithAlign

rewrite.scala3.convertToNewSyntax = true # TODO remove when written in Scala 3.
fileOverride {
"glob:**/scheduler-3/src/main/scala/**" {
runner.dialect = scala3
}
"glob:**/scheduler-3/target/scala-3.6.2/src_managed/main/**" {
runner.dialect = scala3
}
"glob:**/scheduler-3/src/test/scala/**" {
runner.dialect = scala3
}
}
17 changes: 16 additions & 1 deletion project/Dependencies.scala
@@ -34,6 +34,13 @@ object Dependencies {
lazy val test = Seq(testKit, scalatest)
}

object Circe {
private lazy val version = "0.14.10"

lazy val generic = "io.circe" %% "circe-generic" % version
lazy val parser = "io.circe" %% "circe-parser" % version
}

object Fs2 {
private lazy val version = "3.11.0"
private lazy val kafkaVersion = "3.6.0"
@@ -100,6 +107,10 @@
lazy val testkit = "org.typelevel" %% "otel4s-oteljava-testkit" % version % Test
}

object TopicLoader {
lazy val topicLoader = "uk.sky" %% "fs2-kafka-topic-loader" % "0.1.0"
}

object Vulcan {
private lazy val version = "1.11.1"

@@ -147,6 +158,7 @@
scalaTest,
scalaTestPlusMockito
)

val scheduler: Seq[ModuleID] = core ++ runtime ++ test

val scheduler3: Seq[ModuleID] = Seq(
@@ -158,6 +170,8 @@
Cats.effectTestkitScalatest,
Cats.log4cats,
Cats.log4catsSlf4j,
Circe.generic,
Circe.parser,
Fs2.core,
Fs2.kafka,
Monocle.core,
@@ -171,6 +185,7 @@
OpenTelemetry.sdkAutoconfigure,
Otel4s.java,
Otel4s.testkit,
mouse,
TopicLoader.topicLoader
)
}
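
For context, a minimal sketch of how these bundles are presumably consumed from the build definition (the project wiring below is an assumption, not part of this diff):

```scala
// build.sbt (sketch): the scheduler-3 module picks up the new bundle,
// including Circe and the fs2-kafka topic loader added above.
lazy val scheduler3 = (project in file("scheduler-3"))
  .settings(
    scalaVersion := "3.6.2",
    libraryDependencies ++= Dependencies.scheduler3
  )
```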
4 changes: 4 additions & 0 deletions scheduler-3/src/main/resources/application.conf
@@ -1,5 +1,9 @@
scheduler.reader {
schedule-topics = [${?SCHEDULE_TOPICS}]
topics {
avro = [${?AVRO_SCHEDULE_TOPICS}]
json = [${?JSON_SCHEDULE_TOPICS}]
}
kafka-brokers = "localhost:9092"
kafka-brokers = ${?KAFKA_BROKERS}
}
133 changes: 132 additions & 1 deletion scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
@@ -1,10 +1,141 @@
package uk.sky.scheduler

import cats.effect.Resource.ExitCase
import cats.effect.kernel.Deferred
import cats.effect.kernel.Resource.ExitCase.Succeeded
import cats.effect.{Async, Ref, Resource}
import cats.syntax.all.*
import cats.{Monad, Parallel, Show}
import fs2.*
import fs2.kafka.*
import mouse.all.*
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.otel4s.metrics.Meter
import org.typelevel.otel4s.{Attribute, Attributes}
import uk.sky.fs2.kafka.topicloader.TopicLoader
import uk.sky.scheduler.circe.jsonScheduleDecoder
import uk.sky.scheduler.config.{Config, KafkaConfig}
import uk.sky.scheduler.converters.all.*
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.kafka.avro.{avroBinaryDeserializer, avroScheduleCodec, AvroSchedule}
import uk.sky.scheduler.kafka.json.{jsonDeserializer, JsonSchedule}
import uk.sky.scheduler.message.Message

trait EventSubscriber[F[_]] {
def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]]
}

object EventSubscriber {
private type Output = Either[ScheduleError, Option[ScheduleEvent]]

def kafka[F[_] : Async : Parallel : LoggerFactory](
config: Config,
loaded: Deferred[F, Unit]
): F[EventSubscriber[F]] = {

val avroConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]] = {
given Resource[F, Deserializer[F, Either[ScheduleError, Option[AvroSchedule]]]] =
avroBinaryDeserializer[F, AvroSchedule].map(_.option.map(_.sequence))

config.kafka.consumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]]
}

val jsonConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]] = {
given Deserializer[F, Either[ScheduleError, Option[JsonSchedule]]] =
jsonDeserializer[F, JsonSchedule].option.map(_.sequence)

config.kafka.consumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]]
}

for {
avroLoadedRef <- Ref.of[F, Boolean](false)
jsonLoadedRef <- Ref.of[F, Boolean](false)
} yield new EventSubscriber[F] {

/** If both topics have finished loading, complete the Deferred to allow queueing schedules.
*/
private def onLoadCompare(exitCase: ExitCase): F[Unit] = exitCase match {
case Succeeded =>
for {
avroLoaded <- avroLoadedRef.get
jsonLoaded <- jsonLoadedRef.get
_ <- Async[F].whenA(avroLoaded && jsonLoaded)(loaded.complete(()))
} yield ()
case ExitCase.Errored(_) | ExitCase.Canceled => Async[F].unit
}

private val avroStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[AvroSchedule]]]] =
config.scheduler.reader.topics.avro.toNel
.fold(Stream.exec(avroLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
TopicLoader.loadAndRun(_, avroConsumerSettings) { exitCase =>
avroLoadedRef.set(true) *> onLoadCompare(exitCase)
}
)

private val jsonStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule]]]] =
config.scheduler.reader.topics.json.toNel
.fold(Stream.exec(jsonLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
TopicLoader.loadAndRun(_, jsonConsumerSettings) { exitCase =>
jsonLoadedRef.set(true) *> onLoadCompare(exitCase)
}
)

override def messages: Stream[F, Message[Output]] = avroStream.merge(jsonStream).map(_.toMessage)

}
}

def observed[F[_] : Monad : Parallel : LoggerFactory : Meter](delegate: EventSubscriber[F]): F[EventSubscriber[F]] = {
given Show[ScheduleError] = {
case _: ScheduleError.InvalidAvroError => "invalid-avro"
case _: ScheduleError.NotJsonError => "not-json"
case _: ScheduleError.InvalidJsonError => "invalid-json"
// case _: ScheduleError.DecodeError => "decode"
// case _: ScheduleError.TransformationError => "transformation"
}

def updateAttributes(source: String) = Attributes(
Attribute("message.type", "update"),
Attribute("message.source", source)
)

def deleteAttributes(source: String, deleteType: String) = Attributes(
Attribute("message.type", "delete"),
Attribute("message.source", source),
Attribute("message.delete.type", deleteType)
)

def errorAttributes(source: String, error: ScheduleError) = Attributes(
Attribute("message.type", "error"),
Attribute("message.source", source),
Attribute("message.error.type", error.show)
)

for {
counter <- Meter[F].counter[Long]("event-subscriber").create
logger <- LoggerFactory[F].create
} yield new EventSubscriber[F] {
override def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]] =
delegate.messages.evalTapChunk { case Message(key, source, value, metadata) =>
val logCtx = Map("key" -> key, "source" -> source)

value match {
case Right(Some(_)) =>
logger.info(logCtx)(show"Decoded UPDATE for [$key] from $source") &>
counter.inc(updateAttributes(source))
case Right(None) =>
lazy val deleteType = metadata.isExpired.fold("expired", "canceled")
logger.info(logCtx)(show"Decoded DELETE type=[$deleteType] for [$key] from $source") &>
counter.inc(deleteAttributes(source, deleteType))
case Left(error) =>
logger.error(logCtx, error)(show"Error decoding [$key] from $source") &>
counter.inc(errorAttributes(source, error))
}
}
}
}

def live[F[_] : Async : Parallel : LoggerFactory : Meter](config: Config, loaded: Deferred[F, Unit]): F[EventSubscriber[F]] =
kafka[F](config, loaded).flatMap(observed)
}
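
To help review the new API, here is a minimal sketch of how `live` might be wired up; the entry point, no-op meter, and config loading below are assumptions for illustration, not part of this change:

```scala
import cats.effect.{Deferred, IO, IOApp}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.otel4s.metrics.Meter
import uk.sky.scheduler.EventSubscriber
import uk.sky.scheduler.config.Config

object ExampleMain extends IOApp.Simple {
  given LoggerFactory[IO] = Slf4jFactory.create[IO]
  given Meter[IO]         = Meter.noop[IO] // stand-in for the real otel4s meter

  override def run: IO[Unit] =
    for {
      config     <- loadConfig                 // hypothetical config loading
      loaded     <- Deferred[IO, Unit]         // completed once both topics have loaded
      subscriber <- EventSubscriber.live[IO](config, loaded)
      _          <- subscriber.messages
                      .evalTap(message => IO.println(message.key))
                      .compile
                      .drain
    } yield ()

  private def loadConfig: IO[Config] = ??? // e.g. pureconfig, elided here
}
```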
19 changes: 19 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/circe/serdes.scala
@@ -0,0 +1,19 @@
package uk.sky.scheduler.circe

import cats.syntax.all.*
import io.circe.*
import io.circe.generic.semiauto
import uk.sky.scheduler.kafka.json.JsonSchedule

given jsonScheduleDecoder: Decoder[JsonSchedule] =
Decoder.forProduct5[JsonSchedule, Long, String, String, Option[String], Option[Map[String, String]]](
"time",
"topic",
"key",
"value",
"headers"
) { (time, topic, key, value, headers) =>
JsonSchedule(time, topic, key, value, headers.combineAll)
}

given scheduleEncoder: Encoder[JsonSchedule] = semiauto.deriveEncoder[JsonSchedule]
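
As a quick illustration of the decoder (the payload below is made up): an absent `headers` field decodes to `None`, which `combineAll` collapses to an empty map:

```scala
import io.circe.parser.decode
import uk.sky.scheduler.circe.jsonScheduleDecoder
import uk.sky.scheduler.kafka.json.JsonSchedule

val payload =
  """{ "time": 1700000000000, "topic": "events", "key": "user-123", "value": "{\"action\":\"update\"}" }"""

// headers is absent, so the decoded schedule carries Map.empty.
val decoded = decode[JsonSchedule](payload)
// Right(JsonSchedule(1700000000000L, "events", "user-123", Some("{\"action\":\"update\"}"), Map()))
```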
26 changes: 26 additions & 0 deletions scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala
@@ -59,6 +59,32 @@ final case class SchedulerConfig(
) derives ConfigReader

final case class ReaderConfig(
@deprecated("Remove in favour of 'topics'")
scheduleTopics: List[String],
topics: TopicConfig,
kafkaBrokers: String
) derives ConfigReader

//object ReaderConfig {
// given readerConfigReader: ConfigReader[ReaderConfig] =
// ConfigReader
// .forProduct3[ReaderConfig, List[String], TopicConfig, String]("scheduleTopics", "topics", "kafkaBrokers")(
// ReaderConfig.apply
// )
// .ensure(
// readerConfig => readerConfig.scheduleTopics.nonEmpty,
// message = _ => "Schedule topics are empty"
// )
//}

final case class TopicConfig(avro: List[String], json: List[String])

object TopicConfig {
given topicConfigReader: ConfigReader[TopicConfig] =
ConfigReader
.forProduct2[TopicConfig, List[String], List[String]]("avro", "json")(TopicConfig.apply)
.ensure(
config => config.avro.nonEmpty || config.json.nonEmpty,
_ => "Both Avro and JSON topics were empty"
)
}
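
A sketch of what the `.ensure` guard gives us at load time (assuming pureconfig's `ConfigSource`, which is not shown in this diff):

```scala
import pureconfig.ConfigSource
import uk.sky.scheduler.config.TopicConfig

// Both lists empty: load fails with "Both Avro and JSON topics were empty".
val invalid = ConfigSource.string("""{ avro = [], json = [] }""").load[TopicConfig]
// => Left(ConfigReaderFailures(...))

// One populated list is enough to pass validation.
val valid = ConfigSource.string("""{ avro = ["avro-schedules"], json = [] }""").load[TopicConfig]
// => Right(TopicConfig(List("avro-schedules"), List()))
```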
@@ -0,0 +1,3 @@
package uk.sky.scheduler.converters

object all extends ConsumerRecordConverter
@@ -0,0 +1,16 @@
package uk.sky.scheduler.converters

import fs2.kafka.ConsumerRecord
import uk.sky.scheduler.domain.ScheduleEvent
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.kafka.avro.AvroSchedule
import uk.sky.scheduler.kafka.json.JsonSchedule
import uk.sky.scheduler.message.Message

private trait ConsumerRecordConverter {
extension (cr: ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule | AvroSchedule]]]) {
def toMessage: Message[Either[ScheduleError, Option[ScheduleEvent]]] = ???
}
}

object consumerRecord extends ConsumerRecordConverter
@@ -7,6 +7,12 @@ import org.apache.avro.Schema
enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) {
case InvalidAvroError(schema: Schema, error: Throwable)
extends ScheduleError(show"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error)

case NotJsonError(payload: String, error: Throwable) extends ScheduleError(s"'$payload' was not valid JSON", error)

case InvalidJsonError(payload: String, error: Throwable)
extends ScheduleError(s"JSON '$payload' did not conform to Schema", error)

}

object ScheduleError {
@@ -0,0 +1,11 @@
package uk.sky.scheduler.kafka.avro

final case class AvroSchedule(
time: Long,
topic: String,
key: Array[Byte],
value: Option[Array[Byte]],
optionalHeaders: Option[Map[String, Array[Byte]]]
) {
val headers: Map[String, Array[Byte]] = optionalHeaders.getOrElse(Map.empty[String, Array[Byte]])
}
@@ -0,0 +1,45 @@
package uk.sky.scheduler.kafka.avro

import cats.effect.Sync
import cats.effect.kernel.Resource
import cats.syntax.all.*
import fs2.kafka.{Deserializer, Serializer, ValueDeserializer, ValueSerializer}
import org.apache.avro.Schema
import uk.sky.scheduler.error.ScheduleError
import vulcan.Codec

given avroScheduleCodec: Codec[AvroSchedule] = Codec.record[AvroSchedule](
name = "ScheduleWithHeaders",
namespace = "com.sky.kms.domain.Schedule"
)(field =>
(
field("time", _.time, doc = "The time to execute the Schedule, in epoch milliseconds.".some),
field("topic", _.topic, doc = "The topic to send the Schedule to.".some),
field("key", _.key, doc = "The key identifying the payload.".some),
field("value", _.value, doc = "The payload to be sent. null indicates a tombstone.".some),
field("headers", _.optionalHeaders, doc = "Optional extra metadata to send with the payload.".some)
).mapN(AvroSchedule.apply)
)

def avroBinaryDeserializer[F[_] : Sync, V : Codec]: Resource[F, ValueDeserializer[F, Either[ScheduleError, V]]] =
Codec[V].schema match {
case Left(error) => Resource.raiseError(error.throwable)
case Right(schema) => Resource.pure(avroBinaryDeserializer(schema))
}

def avroBinaryDeserializer[F[_] : Sync, V : Codec](schema: Schema): ValueDeserializer[F, Either[ScheduleError, V]] =
Deserializer.lift[F, Either[ScheduleError, V]](bytes =>
Sync[F].delay(
Codec
.fromBinary[V](bytes, schema)
.leftMap(e => ScheduleError.InvalidAvroError(schema, e.throwable))
)
)

def avroBinarySerializer[F[_] : Sync, V : Codec]: ValueSerializer[F, V] =
Serializer.lift[F, V](
Codec
.toBinary[V](_)
.leftMap(_.throwable)
.liftTo[F]
)
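
For reference, a possible round trip through the new serializer and deserializer (the topic name and values are placeholders):

```scala
import cats.effect.IO
import fs2.kafka.Headers
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.kafka.avro.{avroBinaryDeserializer, avroBinarySerializer, avroScheduleCodec, AvroSchedule}

val schedule = AvroSchedule(
  time = 1700000000000L,
  topic = "events",
  key = "user-123".getBytes,
  value = Some("payload".getBytes),
  optionalHeaders = None
)

val roundTrip: IO[Either[ScheduleError, AvroSchedule]] =
  for {
    bytes   <- avroBinarySerializer[IO, AvroSchedule].serialize("schedules", Headers.empty, schedule)
    decoded <- avroBinaryDeserializer[IO, AvroSchedule]
                 .use(_.deserialize("schedules", Headers.empty, bytes))
  } yield decoded
```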
@@ -0,0 +1,9 @@
package uk.sky.scheduler.kafka.json

final case class JsonSchedule(
time: Long,
topic: String,
key: String,
value: Option[String],
headers: Map[String, String]
)
@@ -0,0 +1,19 @@
package uk.sky.scheduler.kafka.json

import cats.effect.Sync
import cats.syntax.all.*
import io.circe.syntax.*
import fs2.kafka.{Deserializer, Serializer, ValueDeserializer, ValueSerializer}
import io.circe.{parser, Decoder, Encoder}
import uk.sky.scheduler.error.ScheduleError

def jsonDeserializer[F[_] : Sync, T : Decoder]: ValueDeserializer[F, Either[ScheduleError, T]] =
for {
payload <- Deserializer.string[F]
} yield for {
json <- parser.parse(payload).leftMap(ScheduleError.NotJsonError(payload, _))
decoded <- json.as[T].leftMap(ScheduleError.InvalidJsonError(json.noSpaces, _))
} yield decoded

def jsonSerializer[F[_] : Sync, V : Encoder]: ValueSerializer[F, V] =
Serializer.string[F].contramap[V](_.asJson.noSpaces)
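
And a matching sketch for the JSON serdes (again, topic and values are placeholders):

```scala
import cats.effect.IO
import fs2.kafka.Headers
import uk.sky.scheduler.circe.{jsonScheduleDecoder, scheduleEncoder}
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.kafka.json.{jsonDeserializer, jsonSerializer, JsonSchedule}

val schedule = JsonSchedule(
  time = 1700000000000L,
  topic = "events",
  key = "user-123",
  value = Some("""{"action":"update"}"""),
  headers = Map.empty
)

val roundTrip: IO[Either[ScheduleError, JsonSchedule]] =
  for {
    bytes   <- jsonSerializer[IO, JsonSchedule].serialize("schedules", Headers.empty, schedule)
    decoded <- jsonDeserializer[IO, JsonSchedule].deserialize("schedules", Headers.empty, bytes)
  } yield decoded
```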