Skip to content

Commit

Permalink
Obsrv V2 Release (#19)
Browse files Browse the repository at this point in the history
* #OBS-I182: Fix the issue with cache indexer

* #OBS-I182: Cache Indexer fix |Removed the kafka-client and casting the number to long value

* #OBS-I182: Cache Indexer fix | Removing the obsrv_meta information before indexing into cache

---------

Co-authored-by: Santhosh Vasabhaktula <[email protected]>
  • Loading branch information
manjudr and SanthoshVasabhaktula authored Sep 3, 2024
1 parent 49043d7 commit c8b7d48
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ trait SystemEventHandler {
}

private def getTime(timespans: Map[String, AnyRef], producer: Producer): Option[Long] = {
timespans.get(producer.toString).map(f => f.asInstanceOf[Long])
timespans.get(producer.toString).map(f => f.asInstanceOf[Number].longValue())
}

private def getStat(obsrvMeta: Map[String, AnyRef], stat: Stats): Option[Long] = {
obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Long])
obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Number].longValue())
}

def getError(error: ErrorConstants.Error, producer: Producer, functionalError: FunctionalError): Option[ErrorLog] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,40 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.

}

class TopicDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.Map[String, AnyRef]] {

private val serialVersionUID = -3224825136576915426L

override def getProducedType: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]])

override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[mutable.Map[String, AnyRef]]): Unit = {
val msg = try {
val event = JSONUtil.deserialize[Map[String, AnyRef]](record.value())
mutable.Map[String, AnyRef](
"dataset" -> record.topic(),
"event" -> event
)
} catch {
case _: Exception =>
mutable.Map[String, AnyRef](Constants.INVALID_JSON -> new String(record.value, "UTF-8"))
}
initObsrvMeta(msg, record)
out.collect(msg)
}

private def initObsrvMeta(msg: mutable.Map[String, AnyRef], record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
if (!msg.contains("obsrv_meta")) {
msg.put("obsrv_meta", Map(
"syncts" -> record.timestamp(),
"processingStartTime" -> System.currentTimeMillis(),
"flags" -> Map(),
"timespans" -> Map(),
"error" -> Map()
))
}
}
}

class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] {

private val serialVersionUID = -3224825136576915426L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ abstract class BaseStreamTask[T] extends BaseStreamTaskSink[T] {
.rebalance()
}

def getTopicMapDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaTopics: List[String],
consumerSourceName: String, kafkaConnector: FlinkKafkaConnector): DataStream[mutable.Map[String, AnyRef]] = {
env.fromSource(kafkaConnector.kafkaTopicMapSource(kafkaTopics), WatermarkStrategy.noWatermarks[mutable.Map[String, AnyRef]](), consumerSourceName)
.uid(consumerSourceName).setParallelism(config.kafkaConsumerParallelism)
.rebalance()
}

def getStringDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaConnector: FlinkKafkaConnector): DataStream[String] = {
env.fromSource(kafkaConnector.kafkaStringSource(config.inputTopic()), WatermarkStrategy.noWatermarks[String](), config.inputConsumer())
.uid(config.inputConsumer()).setParallelism(config.kafkaConsumerParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ class FlinkKafkaConnector(config: BaseJobConfig[_]) extends Serializable {
.build()
}

def kafkaTopicMapSource(kafkaTopics: List[String]): KafkaSource[mutable.Map[String, AnyRef]] = {
KafkaSource.builder[mutable.Map[String, AnyRef]]()
.setTopics(kafkaTopics.asJava)
.setDeserializer(new TopicDeserializationSchema)
.setProperties(config.kafkaConsumerProperties())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build()
}

def kafkaMapDynamicSink(): KafkaSink[mutable.Map[String, AnyRef]] = {
KafkaSink.builder[mutable.Map[String, AnyRef]]()
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
Expand Down
12 changes: 0 additions & 12 deletions pipeline/cache-indexer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,12 @@
<groupId>org.sunbird.obsrv</groupId>
<artifactId>dataset-registry</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_${scala.maj.version}</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.maj.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CacheIndexerStreamTask(config: CacheIndexerConfig, kafkaConnector: FlinkKa

val datasets = DatasetRegistry.getAllDatasets(Some(DatasetType.master.toString))
val datasetIds = datasets.map(f => f.id)
val dataStream = getMapDataStream(env, config, datasetIds, config.kafkaConsumerProperties(), consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
val dataStream = getTopicMapDataStream(env, config, datasetIds, consumerSourceName = s"cache-indexer-consumer", kafkaConnector)
processStream(dataStream)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.sunbird.obsrv.util

import org.json4s.native.JsonMethods._
import org.json4s.{JNothing, JValue}
import org.json4s.{JField, JNothing, JValue}
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.cache.RedisConnect
import org.sunbird.obsrv.core.model.Constants.OBSRV_META
import org.sunbird.obsrv.model.DatasetModels.Dataset
import org.sunbird.obsrv.pipeline.task.CacheIndexerConfig
import redis.clients.jedis.Jedis
Expand Down Expand Up @@ -37,7 +38,11 @@ class MasterDataCache(val config: CacheIndexerConfig) {
def process(dataset: Dataset, key: String, event: JValue): (Int, Int) = {
val jedis = this.datasetPipelineMap(dataset.id)
val dataFromCache = getDataFromCache(dataset, key, jedis)
updateCache(dataset, dataFromCache, key, event, jedis)
val updatedEvent = event.removeField {
case JField(OBSRV_META, _) => true
case _ => false
}
updateCache(dataset, dataFromCache, key, updatedEvent, jedis)
(if (dataFromCache == null) 1 else 0, if (dataFromCache == null) 0 else 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package org.sunbird.obsrv.fixture

object EventFixture {

val VALID_BATCH_EVENT_D3_INSERT = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"[email protected]","locationId":"KUN12345"}}}"""
val VALID_BATCH_EVENT_D3_INSERT_2 = """{"dataset":"dataset3","event":{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"[email protected]","locationId":"KUN134567"}}}"""
val VALID_BATCH_EVENT_D3_UPDATE = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","dealer":{"email":"[email protected]","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}}"""
val VALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
val INVALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}"""
val VALID_BATCH_EVENT_D3_INSERT = """{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"[email protected]","locationId":"KUN12345"}}"""
val VALID_BATCH_EVENT_D3_INSERT_2 = """{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"[email protected]","locationId":"KUN134567"}}"""
val VALID_BATCH_EVENT_D3_UPDATE = """{"code":"HYUN-CRE-D6","dealer":{"email":"[email protected]","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}"""
val VALID_BATCH_EVENT_D4 = """{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
val INVALID_BATCH_EVENT_D4 = """{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}"""
}

0 comments on commit c8b7d48

Please sign in to comment.