diff --git a/.scalafmt.conf b/.scalafmt.conf
index 458cf423..212bff49 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -10,4 +10,15 @@
 runner.dialect = scala213source3
 spaces.beforeContextBoundColon = Always
 style = defaultWithAlign
-rewrite.scala3.convertToNewSyntax = true # TODO remove when written in Scala3.
\ No newline at end of file
+rewrite.scala3.convertToNewSyntax = true # TODO remove when written in Scala3.
+fileOverride {
+  "glob:**/scheduler-3/src/main/scala/**" {
+    runner.dialect = scala3
+  }
+  "glob:**/scheduler-3/target/scala-3.6.2/src_managed/main/**" {
+    runner.dialect = scala3
+  }
+  "glob:**/scheduler-3/src/test/scala/**" {
+    runner.dialect = scala3
+  }
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 70945677..4152bc51 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -34,6 +34,13 @@ object Dependencies {
     lazy val test = Seq(testKit, scalatest)
   }
 
+  object Circe {
+    private lazy val version = "0.14.10"
+
+    lazy val generic = "io.circe" %% "circe-generic" % version
+    lazy val parser  = "io.circe" %% "circe-parser"  % version
+  }
+
   object Fs2 {
     private lazy val version      = "3.11.0"
     private lazy val kafkaVersion = "3.6.0"
@@ -100,6 +107,10 @@ object Dependencies {
     lazy val testkit = "org.typelevel" %% "otel4s-oteljava-testkit" % version % Test
   }
 
+  object TopicLoader {
+    lazy val topicLoader = "uk.sky" %% "fs2-kafka-topic-loader" % "0.1.0"
+  }
+
   object Vulcan {
     private lazy val version = "1.11.1"
 
@@ -147,6 +158,7 @@ object Dependencies {
     scalaTest,
     scalaTestPlusMockito
   )
+
   val scheduler: Seq[ModuleID] = core ++ runtime ++ test
 
   val scheduler3: Seq[ModuleID] = Seq(
@@ -158,6 +170,8 @@ object Dependencies {
       Cats.effectTestkitScalatest,
       Cats.log4cats,
       Cats.log4catsSlf4j,
+      Circe.generic,
+      Circe.parser,
       Fs2.core,
       Fs2.kafka,
       Monocle.core,
@@ -171,6 +185,7 @@ object Dependencies {
      OpenTelemetry.sdkAutoconfigure,
      Otel4s.java,
      Otel4s.testkit,
-     mouse
+     mouse,
+     TopicLoader.topicLoader
    )
 }
diff --git a/scheduler-3/src/main/resources/application.conf b/scheduler-3/src/main/resources/application.conf
index a2eea4e6..40750153 100644
--- a/scheduler-3/src/main/resources/application.conf
+++ b/scheduler-3/src/main/resources/application.conf
@@ -1,5 +1,9 @@
 scheduler.reader {
   schedule-topics = [${?SCHEDULE_TOPICS}]
+  topics {
+    avro = [${?AVRO_SCHEDULE_TOPICS}]
+    json = [${?JSON_SCHEDULE_TOPICS}]
+  }
   kafka-brokers = "localhost:9092"
   kafka-brokers = ${?KAFKA_BROKERS}
 }
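Note on the config above: each `[${?VAR}]` substitution resolves to an empty list when the variable is unset, and to a single-element list holding the raw string when it is set. A minimal sketch of that behaviour with the underlying Typesafe Config API (env var name as assumed above; not part of this diff):

```scala
import com.typesafe.config.ConfigFactory

// resolve() falls back to system environment variables for
// unresolved `${?...}` substitutions by default.
val cfg = ConfigFactory
  .parseString("topics { avro = [${?AVRO_SCHEDULE_TOPICS}] }")
  .resolve()

// Empty list when AVRO_SCHEDULE_TOPICS is unset; a one-element list
// containing the raw string when it is set.
cfg.getStringList("topics.avro")
```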
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
index de1a9f7f..4210b4be 100644
--- a/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/EventSubscriber.scala
@@ -1,10 +1,141 @@
 package uk.sky.scheduler
 
-import fs2.Stream
+import cats.effect.Resource.ExitCase
+import cats.effect.kernel.Deferred
+import cats.effect.kernel.Resource.ExitCase.Succeeded
+import cats.effect.{Async, Ref, Resource}
+import cats.syntax.all.*
+import cats.{Monad, Parallel, Show}
+import fs2.{Stream, *}
+import fs2.kafka.*
+import mouse.all.*
+import org.typelevel.log4cats.LoggerFactory
+import org.typelevel.otel4s.metrics.Meter
+import org.typelevel.otel4s.{Attribute, Attributes}
+import uk.sky.fs2.kafka.topicloader.TopicLoader
+import uk.sky.scheduler.circe.jsonScheduleDecoder
+import uk.sky.scheduler.config.{Config, KafkaConfig}
 import uk.sky.scheduler.domain.ScheduleEvent
 import uk.sky.scheduler.error.ScheduleError
+import uk.sky.scheduler.kafka.avro.{avroBinaryDeserializer, avroScheduleCodec, AvroSchedule}
+import uk.sky.scheduler.kafka.json.{jsonDeserializer, JsonSchedule}
 import uk.sky.scheduler.message.Message
+import uk.sky.scheduler.converters.all.*
 
 trait EventSubscriber[F[_]] {
   def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]]
 }
+
+object EventSubscriber {
+  private type Output = Either[ScheduleError, Option[ScheduleEvent]]
+
+  def kafka[F[_] : Async : Parallel : LoggerFactory](
+      config: Config,
+      loaded: Deferred[F, Unit]
+  ): F[EventSubscriber[F]] = {
+
+    val avroConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]] = {
+      given Resource[F, Deserializer[F, Either[ScheduleError, Option[AvroSchedule]]]] =
+        avroBinaryDeserializer[F, AvroSchedule].map(_.option.map(_.sequence))
+
+      config.kafka.consumerSettings[F, String, Either[ScheduleError, Option[AvroSchedule]]]
+    }
+
+    val jsonConsumerSettings: ConsumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]] = {
+      given Deserializer[F, Either[ScheduleError, Option[JsonSchedule]]] =
+        jsonDeserializer[F, JsonSchedule].option.map(_.sequence)
+
+      config.kafka.consumerSettings[F, String, Either[ScheduleError, Option[JsonSchedule]]]
+    }
+
+    for {
+      avroLoadedRef <- Ref.of[F, Boolean](false)
+      jsonLoadedRef <- Ref.of[F, Boolean](false)
+    } yield new EventSubscriber[F] {
+
+      /** If both topics have finished loading, complete the Deferred to allow queueing schedules.
+        */
+      private def onLoadCompare(exitCase: ExitCase): F[Unit] = exitCase match {
+        case Succeeded                               =>
+          for {
+            avroLoaded <- avroLoadedRef.get
+            jsonLoaded <- jsonLoadedRef.get
+            _          <- Async[F].whenA(avroLoaded && jsonLoaded)(loaded.complete(()))
+          } yield ()
+        case ExitCase.Errored(_) | ExitCase.Canceled => Async[F].unit
+      }
+
+      private val avroStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[AvroSchedule]]]] =
+        config.scheduler.reader.topics.avro.toNel
+          .fold(Stream.exec(avroLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, avroConsumerSettings) { exitCase =>
+              avroLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      private val jsonStream: Stream[F, ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule]]]] =
+        config.scheduler.reader.topics.json.toNel
+          .fold(Stream.exec(jsonLoadedRef.set(true) *> onLoadCompare(ExitCase.Succeeded)))(
+            TopicLoader.loadAndRun(_, jsonConsumerSettings) { exitCase =>
+              jsonLoadedRef.set(true) *> onLoadCompare(exitCase)
+            }
+          )
+
+      override def messages: Stream[F, Message[Output]] = avroStream.merge(jsonStream).map(p => p.toMessage)
+
+    }
+  }
+
+  def observed[F[_] : Monad : Parallel : LoggerFactory : Meter](delegate: EventSubscriber[F]): F[EventSubscriber[F]] = {
+    given Show[ScheduleError] = {
+      case _: ScheduleError.InvalidAvroError => "invalid-avro"
+      case _: ScheduleError.NotJsonError     => "not-json"
+      case _: ScheduleError.InvalidJsonError => "invalid-json"
+//      case _: ScheduleError.DecodeError => "decode"
+//      case _: ScheduleError.TransformationError => "transformation"
+    }
+
+    def updateAttributes(source: String) = Attributes(
+      Attribute("message.type", "update"),
+      Attribute("message.source", source)
+    )
+
+    def deleteAttributes(source: String, deleteType: String) = Attributes(
+      Attribute("message.type", "delete"),
+      Attribute("message.source", source),
+      Attribute("message.delete.type", deleteType)
+    )
+
+    def errorAttributes(source: String, error: ScheduleError) = Attributes(
+      Attribute("message.type", "error"),
+      Attribute("message.source", source),
+      Attribute("message.error.type", error.show)
+    )
+
+    for {
+      counter <- Meter[F].counter[Long]("event-subscriber").create
+      logger  <- LoggerFactory[F].create
+    } yield new EventSubscriber[F] {
+      override def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]] =
+        delegate.messages.evalTapChunk { case Message(key, source, value, metadata) =>
+          val logCtx = Map("key" -> key, "source" -> source)
+
+          value match {
+            case Right(Some(_)) =>
+              logger.info(logCtx)(show"Decoded UPDATE for [$key] from $source") &>
+                counter.inc(updateAttributes(source))
+            case Right(None)    =>
+              lazy val deleteType = metadata.isExpired.fold("expired", "canceled")
+              logger.info(logCtx)(show"Decoded DELETE type=[$deleteType] for [$key] from $source") &>
+                counter.inc(deleteAttributes(source, deleteType))
+            case Left(error)    =>
+              logger.error(logCtx, error)(show"Error decoding [$key] from $source") &>
+                counter.inc(errorAttributes(source, error))
+          }
+        }
+    }
+  }
+
+  def live[F[_] : Async : Parallel : LoggerFactory : Meter](config: Config, loaded: Deferred[F, Unit]): F[EventSubscriber[F]] =
+    kafka[F](config, loaded).flatMap(observed)
+}
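For orientation, a hypothetical wiring sketch (not part of this diff) showing how `EventSubscriber.live` and the `loaded` Deferred are intended to interact: the stream must be consumed in the background, since `loaded` only completes from the topic loaders' exit callbacks. `LoggerFactory[IO]` and `Meter[IO]` instances are assumed to be provided elsewhere (e.g. from log4cats-slf4j and otel4s):

```scala
import cats.effect.{Deferred, IO}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.otel4s.metrics.Meter
import uk.sky.scheduler.EventSubscriber
import uk.sky.scheduler.config.Config

def run(config: Config)(using LoggerFactory[IO], Meter[IO]): IO[Unit] =
  for {
    loaded     <- Deferred[IO, Unit]
    subscriber <- EventSubscriber.live[IO](config, loaded)
    consumer   <- subscriber.messages.compile.drain.start // consume in the background
    _          <- loaded.get // completes only once both topic loads have succeeded
    // ... safe to start firing schedules from here ...
    _          <- consumer.join.void
  } yield ()
```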
"error"), + Attribute("message.source", source), + Attribute("message.error.type", error.show) + ) + + for { + counter <- Meter[F].counter[Long]("event-subscriber").create + logger <- LoggerFactory[F].create + } yield new EventSubscriber[F] { + override def messages: Stream[F, Message[Either[ScheduleError, Option[ScheduleEvent]]]] = + delegate.messages.evalTapChunk { case Message(key, source, value, metadata) => + val logCtx = Map("key" -> key, "source" -> source) + + value match { + case Right(Some(_)) => + logger.info(logCtx)(show"Decoded UPDATE for [$key] from $source") &> + counter.inc(updateAttributes(source)) + case Right(None) => + lazy val deleteType = metadata.isExpired.fold("expired", "canceled") + logger.info(logCtx)(show"Decoded DELETE type=[$deleteType] for [$key] from $source") &> + counter.inc(deleteAttributes(source, deleteType)) + case Left(error) => + logger.error(logCtx, error)(show"Error decoding [$key] from $source") &> + counter.inc(errorAttributes(source, error)) + } + } + } + } + + def live[F[_] : Async : Parallel : LoggerFactory : Meter](config: Config, loaded: Deferred[F, Unit]) = + kafka[F](config, loaded).flatMap(observed) +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/circe/serdes.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/circe/serdes.scala new file mode 100644 index 00000000..ef6196c8 --- /dev/null +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/circe/serdes.scala @@ -0,0 +1,19 @@ +package uk.sky.scheduler.circe + +import uk.sky.scheduler.kafka.json.JsonSchedule +import io.circe.* +import io.circe.generic.semiauto +import cats.syntax.all.* + +given jsonScheduleDecoder: Decoder[JsonSchedule] = + Decoder.forProduct5[JsonSchedule, Long, String, String, Option[String], Option[Map[String, String]]]( + "time", + "topic", + "key", + "value", + "headers" + ) { (time, topic, key, value, headers) => + JsonSchedule(time, topic, key, value, headers.combineAll) + } + +given scheduleEncoder: Encoder[JsonSchedule] = semiauto.deriveEncoder[JsonSchedule] diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala index 20fd6b30..4d2bca7b 100644 --- a/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala +++ b/scheduler-3/src/main/scala/uk/sky/scheduler/config/Config.scala @@ -59,6 +59,32 @@ final case class SchedulerConfig( ) derives ConfigReader final case class ReaderConfig( + @deprecated("Remove in favour of 'topics'") scheduleTopics: List[String], + topics: TopicConfig, kafkaBrokers: String ) derives ConfigReader + +//object ReaderConfig { +// given readerConfigReader: ConfigReader[ReaderConfig] = +// ConfigReader +// .forProduct3[ReaderConfig, List[String], TopicConfig, String]("scheduleTopics", "topics", "kafkaBrokers")( +// ReaderConfig.apply +// ) +// .ensure( +// readerConfig => readerConfig.scheduleTopics.nonEmpty, +// message = _ => "Schedule topics are empty" +// ) +//} + +final case class TopicConfig(avro: List[String], json: List[String]) + +object TopicConfig { + given topicConfigReader: ConfigReader[TopicConfig] = + ConfigReader + .forProduct2[TopicConfig, List[String], List[String]]("avro", "json")(TopicConfig.apply) + .ensure( + config => config.avro.nonEmpty || config.json.nonEmpty, + _ => "Both Avro and JSON topics were empty" + ) +} diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/converters/all.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/converters/all.scala new file mode 100644 index 00000000..5f4001fe --- /dev/null +++ 
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/converters/all.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/converters/all.scala
new file mode 100644
index 00000000..5f4001fe
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/converters/all.scala
@@ -0,0 +1,3 @@
+package uk.sky.scheduler.converters
+
+object all extends ConsumerRecordConverter
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/converters/consumerRecord.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/converters/consumerRecord.scala
new file mode 100644
index 00000000..6f54d212
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/converters/consumerRecord.scala
@@ -0,0 +1,16 @@
+package uk.sky.scheduler.converters
+
+import fs2.kafka.ConsumerRecord
+import uk.sky.scheduler.domain.ScheduleEvent
+import uk.sky.scheduler.error.ScheduleError
+import uk.sky.scheduler.kafka.avro.AvroSchedule
+import uk.sky.scheduler.kafka.json.JsonSchedule
+import uk.sky.scheduler.message.Message
+
+private trait ConsumerRecordConverter {
+  extension (cr: ConsumerRecord[String, Either[ScheduleError, Option[JsonSchedule | AvroSchedule]]]) {
+    def toMessage: Message[Either[ScheduleError, Option[ScheduleEvent]]] = ???
+  }
+}
+
+object consumerRecord extends ConsumerRecordConverter
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala
index dc499c17..01c60f43 100644
--- a/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/error/ScheduleError.scala
@@ -7,6 +7,12 @@ import org.apache.avro.Schema
 enum ScheduleError(val message: String, val cause: Throwable) extends Throwable(message, cause) {
   case InvalidAvroError(schema: Schema, error: Throwable)
       extends ScheduleError(show"Avro message did not conform to Schema: ${schema.getFullName}: $schema", error)
+
+  case NotJsonError(payload: String, error: Throwable) extends ScheduleError(s"'$payload' was not valid JSON", error)
+
+  case InvalidJsonError(payload: String, error: Throwable)
+      extends ScheduleError(s"JSON '$payload' did not conform to Schema", error)
+
 }
 
 object ScheduleError {
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/AvroSchedule.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/AvroSchedule.scala
new file mode 100644
index 00000000..2153ae84
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/AvroSchedule.scala
@@ -0,0 +1,11 @@
+package uk.sky.scheduler.kafka.avro
+
+final case class AvroSchedule(
+    time: Long,
+    topic: String,
+    key: Array[Byte],
+    value: Option[Array[Byte]],
+    optionalHeaders: Option[Map[String, Array[Byte]]]
+) {
+  val headers: Map[String, Array[Byte]] = optionalHeaders.getOrElse(Map.empty[String, Array[Byte]])
+}
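The `headers` member on `AvroSchedule` normalises the optional Avro field, so downstream code never handles `None`. A small illustrative check (all values made up):

```scala
import uk.sky.scheduler.kafka.avro.AvroSchedule

val tombstone = AvroSchedule(
  time = 1735689600000L,           // example epoch millis
  topic = "some-topic",
  key = "key".getBytes("UTF-8"),
  value = None,                    // null payload => tombstone
  optionalHeaders = None
)

assert(tombstone.headers.isEmpty)  // None is collapsed to Map.empty
```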
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/serdes.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/serdes.scala
new file mode 100644
index 00000000..dd684974
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/avro/serdes.scala
@@ -0,0 +1,45 @@
+package uk.sky.scheduler.kafka.avro
+
+import cats.effect.Sync
+import cats.effect.kernel.Resource
+import cats.syntax.all.*
+import fs2.kafka.{Deserializer, Serializer, ValueDeserializer, ValueSerializer}
+import org.apache.avro.Schema
+import uk.sky.scheduler.error.ScheduleError
+import vulcan.Codec
+
+given avroScheduleCodec: Codec[AvroSchedule] = Codec.record[AvroSchedule](
+  name = "ScheduleWithHeaders",
+  namespace = "com.sky.kms.domain.Schedule"
+)(field =>
+  (
+    field("time", _.time, doc = "The time to execute the Schedule, in epoch milliseconds.".some),
+    field("topic", _.topic, doc = "The topic to send the Schedule to.".some),
+    field("key", _.key, doc = "The key identifying the payload.".some),
+    field("value", _.value, doc = "The payload to be sent. null indicates a tombstone.".some),
+    field("headers", _.optionalHeaders, doc = "Optional extra metadata to send with the payload.".some)
+  ).mapN(AvroSchedule.apply)
+)
+
+def avroBinaryDeserializer[F[_] : Sync, V : Codec]: Resource[F, ValueDeserializer[F, Either[ScheduleError, V]]] =
+  Codec[V].schema match {
+    case Left(error)   => Resource.raiseError(error.throwable)
+    case Right(schema) => Resource.pure(avroBinaryDeserializer(schema))
+  }
+
+def avroBinaryDeserializer[F[_] : Sync, V : Codec](schema: Schema): ValueDeserializer[F, Either[ScheduleError, V]] =
+  Deserializer.lift[F, Either[ScheduleError, V]](bytes =>
+    Sync[F].delay(
+      Codec
+        .fromBinary[V](bytes, schema)
+        .leftMap(e => ScheduleError.InvalidAvroError(schema, e.throwable))
+    )
+  )
+
+def avroBinarySerializer[F[_] : Sync, V : Codec]: ValueSerializer[F, V] =
+  Serializer.lift[F, V](
+    Codec
+      .toBinary[V](_)
+      .leftMap(_.throwable)
+      .liftTo[F]
+  )
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/JsonSchedule.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/JsonSchedule.scala
new file mode 100644
index 00000000..ea73b012
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/JsonSchedule.scala
@@ -0,0 +1,9 @@
+package uk.sky.scheduler.kafka.json
+
+final case class JsonSchedule(
+    time: Long,
+    topic: String,
+    key: String,
+    value: Option[String],
+    headers: Map[String, String]
+)
diff --git a/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/serdes.scala b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/serdes.scala
new file mode 100644
index 00000000..cd7a220d
--- /dev/null
+++ b/scheduler-3/src/main/scala/uk/sky/scheduler/kafka/json/serdes.scala
@@ -0,0 +1,19 @@
+package uk.sky.scheduler.kafka.json
+
+import cats.effect.Sync
+import cats.syntax.all.*
+import io.circe.syntax.*
+import fs2.kafka.{Deserializer, Serializer, ValueDeserializer, ValueSerializer}
+import io.circe.{parser, Decoder, Encoder}
+import uk.sky.scheduler.error.ScheduleError
+
+def jsonDeserializer[F[_] : Sync, T : Decoder]: ValueDeserializer[F, Either[ScheduleError, T]] =
+  for {
+    payload <- Deserializer.string[F]
+  } yield for {
+    json    <- parser.parse(payload).leftMap(ScheduleError.NotJsonError(payload, _))
+    decoded <- json.as[T].leftMap(ScheduleError.InvalidJsonError(json.noSpaces, _))
+  } yield decoded
+
+def jsonSerializer[F[_] : Sync, V : Encoder]: ValueSerializer[F, V] =
+  Serializer.string[F].contramap[V](_.asJson.noSpaces)
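Finally, a hypothetical round-trip check for the JSON serdes; the topic and header arguments are placeholders, and the circe givens from circe/serdes.scala are assumed to be in scope:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.kafka.Headers
import uk.sky.scheduler.circe.given
import uk.sky.scheduler.error.ScheduleError
import uk.sky.scheduler.kafka.json.{jsonDeserializer, jsonSerializer, JsonSchedule}

val schedule = JsonSchedule(1735689600000L, "some-topic", "key", Some("value"), Map.empty)

val roundTrip: IO[Either[ScheduleError, JsonSchedule]] =
  for {
    bytes   <- jsonSerializer[IO, JsonSchedule].serialize("ignored", Headers.empty, schedule)
    decoded <- jsonDeserializer[IO, JsonSchedule].deserialize("ignored", Headers.empty, bytes)
  } yield decoded

assert(roundTrip.unsafeRunSync() == Right(schedule))
```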