
Commit

#000: Git conflict resolve - Refactor V2 Changes
manjudr committed Sep 3, 2024
2 parents 67fd310 + c8b7d48 commit 8d9afa4
Showing 95 changed files with 2,656 additions and 398 deletions.
11 changes: 5 additions & 6 deletions .github/workflows/build_and_deploy.yaml
@@ -27,17 +27,16 @@ jobs:
            target: "transformer-image"
          - image: "druid-router"
            target: "router-image"
-         - image: "merged-pipeline"
-           target: "merged-image"
+         - image: "unified-pipeline"
+           target: "unified-image"
          - image: "master-data-processor"
            target: "master-data-processor-image"
          - image: "lakehouse-connector"
            target: "lakehouse-connector-image"
+         - image: "cache-indexer"
+           target: "cache-indexer-image"
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

@@ -97,7 +96,7 @@ jobs:
        run: |
          cd deploy/terraform/aws
          terragrunt init
-         terragrunt apply -auto-approve -var merged_pipeline_enabled={{ vars.MERGED_PIPELINE || 'true' }} --replace='module.flink.helm_release.flink' \
+         terragrunt apply -auto-approve -var unified_pipeline_enabled={{ vars.MERGED_PIPELINE || 'true' }} --replace='module.flink.helm_release.flink' \
          -var flink_image_tag=${{ github.ref_name }}
  azure-deploy:
2 changes: 1 addition & 1 deletion .github/workflows/upload_artifact.yaml
@@ -56,7 +56,7 @@ jobs:
          - image: "denormalizer"
          - image: "transformer"
          - image: "druid-router"
-         - image: "pipeline-merged"
+         - image: "unified-pipeline"
          - image: "master-data-processor"
    steps:
      - name: Get Tag Name
9 changes: 7 additions & 2 deletions Dockerfile
@@ -2,6 +2,7 @@ FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-core
COPY . /app
RUN mvn clean install -DskipTests -f /app/framework/pom.xml
RUN mvn clean install -DskipTests -f /app/dataset-registry/pom.xml
+RUN mvn clean install -DskipTests -f /app/transformation-sdk/pom.xml

FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-pipeline
COPY --from=build-core /root/.m2 /root/.m2
@@ -28,9 +29,9 @@ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as router-image
USER flink
COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/lib/

-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as merged-image
+FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as unified-image
USER flink
-COPY --from=build-pipeline /app/pipeline/pipeline-merged/target/pipeline-merged-1.0.0.jar $FLINK_HOME/lib/
+COPY --from=build-pipeline /app/pipeline/unified-pipeline/target/unified-pipeline-1.0.0.jar $FLINK_HOME/lib/

FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as master-data-processor-image
USER flink
@@ -40,3 +41,7 @@ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image
USER flink
RUN mkdir $FLINK_HOME/custom-lib
COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib
+
+FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
+USER flink
+COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
@@ -69,7 +69,8 @@ object MasterDataProcessorIndexer {
    logger.info(s"createDataFile() | START | dataset=${dataset.id} ")
    import spark.implicits._
    val readWriteConf = ReadWriteConfig(scanCount = config.getInt("redis.scan.count"), maxPipelineSize = config.getInt("redis.max.pipeline.size"))
-   val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = dataset.datasetConfig.redisDBHost.get, port = dataset.datasetConfig.redisDBPort.get, dbNum = dataset.datasetConfig.redisDB.get))
+   val cacheConfig = dataset.datasetConfig.cacheConfig.get
+   val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = cacheConfig.redisDBHost.get, port = cacheConfig.redisDBPort.get, dbNum = cacheConfig.redisDB.get))
    val ts: Long = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay().getMillis
    val rdd = spark.sparkContext.fromRedisKV("*")(redisConfig = redisConfig, readWriteConfig = readWriteConf).map(
      f => CommonUtil.processEvent(f._2, ts)
@@ -83,9 +84,9 @@ object MasterDataProcessorIndexer {
  }

  private def getDatasets(): List[Dataset] = {
-   val datasets: List[Dataset] = DatasetRegistry.getAllDatasets("master-dataset")
+   val datasets: List[Dataset] = DatasetRegistry.getAllDatasets(Some("master"))
    datasets.filter(dataset => {
-     dataset.datasetConfig.indexData.nonEmpty && dataset.datasetConfig.indexData.get && dataset.status == DatasetStatus.Live
+     dataset.datasetConfig.indexingConfig.olapStoreEnabled && dataset.status == DatasetStatus.Live
    })
  }

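Aside (not part of this commit): the updated createDataFile() above reads the Redis connection through the new optional cache_config and calls .get on each field, which would throw if any of them is absent. A minimal defensive sketch, assuming the CacheConfig model introduced later in this commit; the fallback values are made up:

    // Illustrative only: default instead of .get on absent options (fallback values are hypothetical).
    val cacheConfig: CacheConfig = dataset.datasetConfig.cacheConfig.getOrElse(CacheConfig(None, None, None))
    val redisHost: String = cacheConfig.redisDBHost.getOrElse("localhost")
    val redisPort: Int = cacheConfig.redisDBPort.getOrElse(6379)
    val redisDb: Int = cacheConfig.redisDB.getOrElse(0)
    val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = redisHost, port = redisPort, dbNum = redisDb))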

Large diffs are not rendered by default.

@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import org.sunbird.obsrv.core.model.SystemConfig
-import org.sunbird.obsrv.model.DatasetStatus.DatasetStatus
+import org.sunbird.obsrv.model.DatasetStatus.{DatasetStatus, Value}
import org.sunbird.obsrv.model.TransformMode.TransformMode
import org.sunbird.obsrv.model.ValidationMode.ValidationMode

@@ -25,33 +25,50 @@ object DatasetModels {
  case class ValidationConfig(@JsonProperty("validate") validate: Option[Boolean] = Some(true),
                              @JsonProperty("mode") @JsonScalaEnumeration(classOf[ValidationModeType]) mode: Option[ValidationMode])

-  case class DenormFieldConfig(@JsonProperty("denorm_key") denormKey: String, @JsonProperty("redis_db") redisDB: Int,
-                               @JsonProperty("denorm_out_field") denormOutField: String)
+  case class DenormFieldConfig(@JsonProperty("denorm_key") denormKey: Option[String], @JsonProperty("redis_db") redisDB: Int,
+                               @JsonProperty("denorm_out_field") denormOutField: String, @JsonProperty("jsonata_expr") jsonAtaExpr: Option[String])

  case class DenormConfig(@JsonProperty("redis_db_host") redisDBHost: String, @JsonProperty("redis_db_port") redisDBPort: Int,
                          @JsonProperty("denorm_fields") denormFields: List[DenormFieldConfig])

  case class RouterConfig(@JsonProperty("topic") topic: String)

-  case class DatasetConfig(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String, @JsonProperty("entry_topic") entryTopic: String,
-                           @JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None, @JsonProperty("redis_db_host") redisDBHost: Option[String] = None,
-                           @JsonProperty("redis_db_port") redisDBPort: Option[Int] = None, @JsonProperty("redis_db") redisDB: Option[Int] = None,
-                           @JsonProperty("index_data") indexData: Option[Boolean] = None, @JsonProperty("timestamp_format") tsFormat: Option[String] = None,
-                           @JsonProperty("dataset_tz") datasetTimezone: Option[String] = None)
+  case class IndexingConfig(@JsonProperty("olap_store_enabled") olapStoreEnabled: Boolean, @JsonProperty("lakehouse_enabled") lakehouseEnabled: Boolean,
+                            @JsonProperty("cache_enabled") cacheEnabled: Boolean)
+
+  case class KeysConfig(@JsonProperty("data_key") dataKey: Option[String], @JsonProperty("partition_key") partitionKey: Option[String],
+                        @JsonProperty("timestamp_key") tsKey: Option[String], @JsonProperty("timestamp_format") tsFormat: Option[String])
+
+  case class CacheConfig(@JsonProperty("redis_db_host") redisDBHost: Option[String], @JsonProperty("redis_db_port") redisDBPort: Option[Int],
+                         @JsonProperty("redis_db") redisDB: Option[Int])
+
+  case class DatasetConfigV1(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String, @JsonProperty("entry_topic") entryTopic: String,
+                             @JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None, @JsonProperty("redis_db_host") redisDBHost: Option[String] = None,
+                             @JsonProperty("redis_db_port") redisDBPort: Option[Int] = None, @JsonProperty("redis_db") redisDB: Option[Int] = None,
+                             @JsonProperty("index_data") indexData: Option[Boolean] = None, @JsonProperty("timestamp_format") tsFormat: Option[String] = None,
+                             @JsonProperty("dataset_tz") datasetTimezone: Option[String] = None)
+
+  case class DatasetConfig(@JsonProperty("indexing_config") indexingConfig: IndexingConfig,
+                           @JsonProperty("keys_config") keysConfig: KeysConfig,
+                           @JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None,
+                           @JsonProperty("dataset_tz") datasetTimezone: Option[String] = None,
+                           @JsonProperty("cache_config") cacheConfig: Option[CacheConfig] = None)

  case class Dataset(@JsonProperty("id") id: String, @JsonProperty("type") datasetType: String, @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
                     @JsonProperty("dedup_config") dedupConfig: Option[DedupConfig], @JsonProperty("validation_config") validationConfig: Option[ValidationConfig],
                     @JsonProperty("data_schema") jsonSchema: Option[String], @JsonProperty("denorm_config") denormConfig: Option[DenormConfig],
-                    @JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig, @JsonProperty("status") @JsonScalaEnumeration(classOf[DatasetStatusType]) status: DatasetStatus,
-                    @JsonProperty("tags") tags: Option[Array[String]] = None, @JsonProperty("data_version") dataVersion: Option[Int] = None)
+                    @JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig,
+                    @JsonProperty("status") @JsonScalaEnumeration(classOf[DatasetStatusType]) status: DatasetStatus,
+                    @JsonProperty("entry_topic") entryTopic: String, @JsonProperty("tags") tags: Option[Array[String]] = None,
+                    @JsonProperty("data_version") dataVersion: Option[Int] = None, @JsonProperty("api_version") apiVersion: Option[String] = None)

  case class Condition(@JsonProperty("type") `type`: String, @JsonProperty("expr") expr: String)

  case class TransformationFunction(@JsonProperty("type") `type`: String, @JsonProperty("condition") condition: Option[Condition], @JsonProperty("expr") expr: String)

  case class DatasetTransformation(@JsonProperty("id") id: String, @JsonProperty("dataset_id") datasetId: String,
                                   @JsonProperty("field_key") fieldKey: String, @JsonProperty("transformation_function") transformationFunction: TransformationFunction,
-                                  @JsonProperty("status") status: String, @JsonProperty("mode") @JsonScalaEnumeration(classOf[TransformModeType]) mode: Option[TransformMode] = Some(TransformMode.Strict))
+                                  @JsonProperty("mode") @JsonScalaEnumeration(classOf[TransformModeType]) mode: Option[TransformMode] = Some(TransformMode.Strict))

  case class ConnectorConfig(@JsonProperty("kafkaBrokers") kafkaBrokers: String, @JsonProperty("topic") topic: String, @JsonProperty("type") databaseType: String,
                             @JsonProperty("connection") connection: Connection, @JsonProperty("tableName") tableName: String, @JsonProperty("databaseName") databaseName: String,
@@ -94,4 +111,10 @@ class DatasetStatusType extends TypeReference[DatasetStatus.type]
object DatasetStatus extends Enumeration {
  type DatasetStatus = Value
  val Draft, Publish, Live, Retired, Purged = Value
}
+
+class DatasetTypeType extends TypeReference[DatasetType.type]
+object DatasetType extends Enumeration {
+  type DatasetType = Value
+  val event, transaction, master = Value
+}
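Aside (not part of this commit): the V2 refactor splits the old flat dataset configuration (kept above as DatasetConfigV1) into nested IndexingConfig, KeysConfig and CacheConfig blocks. A minimal construction sketch using the case classes from this diff; every value below is a made-up example:

    // Illustrative only: field names come from the V2 model above, values are hypothetical.
    val v2Config = DatasetConfig(
      indexingConfig = IndexingConfig(olapStoreEnabled = true, lakehouseEnabled = false, cacheEnabled = false),
      keysConfig = KeysConfig(dataKey = Some("id"), partitionKey = None, tsKey = Some("ets"), tsFormat = Some("epoch")),
      excludeFields = None,
      datasetTimezone = Some("UTC"),
      cacheConfig = Some(CacheConfig(redisDBHost = Some("localhost"), redisDBPort = Some(6379), redisDB = Some(3)))
    )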
@@ -12,9 +12,14 @@ object DatasetRegistry {
  datasets ++= DatasetRegistryService.readAllDatasets()
  lazy private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations()

-  def getAllDatasets(datasetType: String): List[Dataset] = {
+  def getAllDatasets(datasetType: Option[String]): List[Dataset] = {
     val datasetList = DatasetRegistryService.readAllDatasets()
-    datasetList.filter(f => f._2.datasetType.equals(datasetType)).values.toList
+    if(datasetType.isDefined) {
+      datasetList.filter(f => f._2.datasetType.equals(datasetType.get)).values.toList
+    } else {
+      datasetList.values.toList
+    }

  }

  def getDataset(id: String): Option[Dataset] = {
@@ -47,8 +52,8 @@ object DatasetRegistry {
    datasourceList.getOrElse(List())
  }

-  def getDataSetIds(datasetType: String): List[String] = {
-    datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList
+  def getDataSetIds(): List[String] = {
+    datasets.keySet.toList
  }

  def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = {
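Aside (not part of this commit): callers of the registry change shape with this refactor — getAllDatasets now takes an Option[String] type filter and getDataSetIds no longer filters by type. A short usage sketch against the signatures shown above; the val names are hypothetical:

    // Illustrative only.
    val masterDatasets: List[Dataset] = DatasetRegistry.getAllDatasets(Some("master")) // only datasets of type "master"
    val allDatasets: List[Dataset] = DatasetRegistry.getAllDatasets(None)              // no type filter
    val allDatasetIds: List[String] = DatasetRegistry.getDataSetIds()                  // ids of every registered dataset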
