From 67fd31027e392808f1b5b1879239e2b557333ca5 Mon Sep 17 00:00:00 2001
From: Manjunath Davanam
Date: Tue, 20 Aug 2024 12:49:40 +0530
Subject: [PATCH] 1.0.6.1-GA (#18)

* feat: add connector config and connector stats update functions
* Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
* feat: added descriptions for default configurations
* feat: modified kafka connector input topic
* feat: obsrv setup instructions
* feat: revisiting open source features
* feat: masterdata processor job config
* Build deploy v2 (#19): #0 - Refactor Dockerfile and Github actions workflow
* Update DatasetModels.scala
* Release 1.3.0 into Main branch (#34): testing new images; build new image with bug fixes; update dockerfile; #0 fix: upgrade packages; #0 feat: add flink dockerfiles; #0 fix: add individual extraction; Issue #0 fix: upgrade ubuntu packages for vulnerabilities; #0 fix: update github actions release condition
* Issue #2 feat: Remove kafka connector code
* feat: add function to get all datasets
* Release 1.3.1 into Main (#43):
  * feat: update all failed, invalid and duplicate topic names; update kafka topic names in test cases
  * feat: update failed event and batch failed event generation; update invalid json exception handling
  * feat: add exception handling for json deserialization
  * Update ErrorConstants.scala, BaseProcessFunction.scala and ExtractionFunction.scala
  * Issue #46 feat: update batch failed event; add error reasons and exception stack trace
  * Issue #46 fix: remove cloning object
* Release 1.3.1 Changes (#42):
  * Dataset enhancements (#38): add connector config and connector stats update functions; Issue #33 feat: add documentation for Dataset, Datasources, Data In and Query APIs
  * #0000 [SV] - Fallback to local redis instance if embedded redis is not starting
  * #0000 - refactor the denormalization logic: 1. Do not fail the denormalization if the denorm key is missing 2. Add a clear message on whether the denorm is successful, failed or partially successful 3. Handle denorm for both text and number fields
  * #0000 - refactor: 1. Created an enum for dataset status and ignore events if the dataset is not in Live status 2. Created an output tag for denorm failed stats 3. Parse event validation failed messages into a case class
  * #0000 - refactor: 1. Updated the DruidRouter job to publish data to router topics dynamically 2. Updated the framework to create a dynamicKafkaSink object
  * #0000 - mega refactoring: 1. getAllDatasets and getAllDatasetSources now always query postgres 2. Created BaseDatasetProcessFunction for all flink functions to extend, which dynamically resolves dataset config, initializes metrics and handles common failures 3. Refactored serde - merged map and string serialization into one parameterized function 4. Moved failed-event sinking into a common base class 5. Master dataset processor can now denormalize against another master dataset as well
  * #0000 - mega refactoring: 1. Added validation to check that the event has a timestamp key which is neither blank nor invalid 2. Added timezone handling to store the data in druid in the TZ specified by the dataset
  * #0000 - minor refactoring: Renamed DatasetRegistry.getDatasetSourceConfig to getAllDatasetSourceConfig
  * #0000 - mega refactoring: Refactored logs, error messages and metrics; fixed unit tests
  * #0000 - refactoring: 1. Introduced transformation mode to enable lenient transformations 2. Proper exception handling for the transformer job
  * #0000 - refactoring: upgrade embedded redis to work with macOS Sonoma M2
  * #0000 - refactoring: test cases and bug fixes for the denormalizer, router, validator, framework, kafka connector and transformer - code coverage is now 100%
  * #0000 - refactoring: organize imports
* #000 feat: Removed the provided scope of the kafka-client in the framework (#40)
* #0000 - feat: Add dataset-type to system events (#41); modify tests for dataset-type in system events; remove unused getDatasetType function and unused pom test dependencies
* Main conflicts fixes (#44): #000 feat: Resolve conflicts
* update workflow file to skip tests (#45)
* Release 1.3.1 into Main (#49): re-lists the 1.3.1 changes above and adds #0000 - fix: Fix null dataset_type in DruidRouterFunction (#48)
* Develop to Release-1.0.0-GA (#52) (#53):
  * #67 feat: query system configurations from meta store
  * #67 fix: Refactor system configuration retrieval and update dynamic router function
  * #67 fix: update system config according to review; update test cases, default values and data types; add a get-all-system-settings method; add a test case covering the exception case
  * #67 fix: Refactor event indexing in DynamicRouterFunction
  * Issue #67 refactor: SystemConfig read from DB implementation
  * #226 fix: update test cases according to the refactor
* Develop to 1.0.1-GA (#59) (#60):
  * Dataset Registry Update (#57): Issue #0000 feat: updateConnectorStats method includes last run timestamp; Issue #0000 fix: updateConnectorStats sql query updated
* Develop to 1.0.2-GA (#65) (#66):
  * #0000 - fix: Fix Postgres connection issue with defaultDatasetID (#64)
  * Metrics implementation for MasterDataIndexerJob (#55): Issue #50 fix: Kafka metrics implementation for MasterDataIndexerJob; changed 'ets' to UTC; added log statements; fixed the update query; updated the implementation of the 'createDataFile' method; fixed the jackson-databind issue; modified class declaration; code refactoring and formatting; test cases for MasterDataIndexer and data-products; README changes reverted; dataset-registry changes reverted
  * Remove kafka connector as it is moved to an independent repository
* Release 1.0.3-GA (#72)
* Pipeline Bug fixes (#74):
  * Sanketika-Obsrv/issue-tracker#106 fix: Fix postgres connection issue with dataset read and handle errors while parsing the message
  * Sanketika-Obsrv/issue-tracker#107 fix: Denorm job fix to handle the error when the denorm field node contains an empty value
  * Sanketika-Obsrv/issue-tracker#106 fix: Review comments fix - changed the generic exception to the actual exception (NullPointer)
* Pipeline Bug fixes (#74) (#77)
* fix: #0000: update datasourceRef only if dataset has records
* Sanketika-Obsrv/issue-tracker#180 fix: Datasource DB schema changes to include type (#79)
* Hudi connector flink job implementation (#80):
  * feat: Hudi Flink implementation; local working with metastore and localstack
  * #0000 - feat: Hudi Sink implementation; initialize dataset RowType during job startup
  * refactor: Integrate hudi connector with dataset registry
  * Sanketika-Obsrv/issue-tracker#141 refactor: Enable timestamp based partition; fix Hudi connector job to handle an empty datasets list for lakehouse; set timestamp based partition configurations only if the partition key is of timestamp type
  * Sanketika-Obsrv/issue-tracker#170 fix: Resolve timestamp based partition without using TimestampBasedAvroKeyGenerator
  * Sanketika-Obsrv/issue-tracker#177 fix: Lakehouse connector flink job fixes; Dockerfile changes for hudi-connector; remove unused and commented code
* Release 1.0.6-GA (#81): re-lists the pipeline bug fixes (#74), the datasource schema change (#79) and the hudi connector implementation (#80) above
* Sanketika-obsrv/issue-tracker#228 feat: updated github actions for lakehouse job
* # Issue:52655194 Feat - Add processingStartTime if missing during extractor job (#76)
* Sanketika-obsrv/issue-tracker#240 feat: lakehouse job changes to support retire workflow
* Sanketika-obsrv/issue-tracker#240 feat: master data enhancements for lakehouse
* Migrating Raw SQL Statements to Prepared Statements (#87) (#88): #OBS-I148: migration of SQL queries to prepared statements to avoid SQL injection; removed unwanted imports; System Config changes - converted raw queries to prepared statements

---------

Signed-off-by: SurabhiAngadi
Co-authored-by: shiva-rakshith
Co-authored-by: Aniket Sakinala
Co-authored-by: GayathriSrividya
Co-authored-by: Manoj Krishna <92361832+ManojKrishnaChintauri@users.noreply.github.com>
Co-authored-by: Santhosh Vasabhaktula
Co-authored-by: ManojCKrishna
Co-authored-by: ManojKrishnaChintaluri
Co-authored-by: Praveen <66662436+pveleneni@users.noreply.github.com>
Co-authored-by: Sowmya N Dixit
Co-authored-by: Anand Parthasarathy
Co-authored-by: Ravi Mula
Co-authored-by: Manoj Krishna <92361832+ManojKrishnaChintaluri@users.noreply.github.com>
Co-authored-by: Shreyas Bhaktharam <121869503+shreyasb22@users.noreply.github.com>
Co-authored-by: SurabhiAngadi <138881390+SurabhiAngadi@users.noreply.github.com>
Co-authored-by: SurabhiAngadi
Co-authored-by: sowmya-dixit
Co-authored-by: GayathriSrividya
---
 .github/workflows/build_and_deploy.yaml | 2 +
 Dockerfile | 19 +-
 .../MasterDataProcessorIndexer.scala | 2 +-
 .../src/main/resources/dataset-registry.sql | 1 +
 .../sunbird/obsrv/model/DatasetModels.scala | 2 +-
 .../obsrv/registry/DatasetRegistry.scala | 9 +-
 .../service/DatasetRegistryService.scala | 122 +++++++--
 .../spec/BaseSpecWithDatasetRegistry.scala | 2 +-
 .../obsrv/spec/TestDatasetRegistrySpec.scala | 4 +-
 .../sunbird/obsrv/core/model/Constants.scala | 4 +-
 .../obsrv/core/model/ErrorConstants.scala | 2 +-
 .../obsrv/core/model/SystemConfig.scala | 13 +-
 .../sunbird/obsrv/core/serde/SerdeUtil.scala | 9 +-
 .../obsrv/core/util/PostgresConnect.scala | 37 ++-
 .../obsrv/denormalizer/util/DenormCache.scala | 2 +-
 .../functions/DynamicRouterFunction.scala | 2 +-
 pipeline/extractor/pom.xml | 12 +
 .../functions/ExtractionFunction.scala | 8 +
 pipeline/hudi-connector/pom.xml | 253 ++++++++++++++++++
 .../src/main/resources/core-site.xml | 33 +++
 .../src/main/resources/hudi-writer.conf | 41 +++
 .../src/main/resources/schemas/schema.json | 108 ++++++++
 .../functions/RowDataConverterFunction.scala | 43 +++
 .../obsrv/streaming/HudiConnectorConfig.scala | 53 ++++
 .../streaming/HudiConnectorStreamTask.scala | 153 +++++++++++
 .../obsrv/streaming/TestTimestamp.scala | 19 ++
 .../sunbird/obsrv/util/HudiSchemaParser.scala | 140 ++++++++++
 pipeline/master-data-processor/pom.xml | 5 +
 .../task/MasterDataProcessorStreamTask.scala | 3 +
 pipeline/pom.xml | 1 +
 stubs/docker/apache-flink-plugins/Dockerfile | 2 +-
 31 files
changed, 1051 insertions(+), 55 deletions(-) create mode 100644 pipeline/hudi-connector/pom.xml create mode 100644 pipeline/hudi-connector/src/main/resources/core-site.xml create mode 100644 pipeline/hudi-connector/src/main/resources/hudi-writer.conf create mode 100644 pipeline/hudi-connector/src/main/resources/schemas/schema.json create mode 100644 pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/functions/RowDataConverterFunction.scala create mode 100644 pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala create mode 100644 pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala create mode 100644 pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/TestTimestamp.scala create mode 100644 pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HudiSchemaParser.scala diff --git a/.github/workflows/build_and_deploy.yaml b/.github/workflows/build_and_deploy.yaml index 35ba8cf8..601c8f5e 100644 --- a/.github/workflows/build_and_deploy.yaml +++ b/.github/workflows/build_and_deploy.yaml @@ -31,6 +31,8 @@ jobs: target: "merged-image" - image: "master-data-processor" target: "master-data-processor-image" + - image: "lakehouse-connector" + target: "lakehouse-connector-image" steps: - uses: actions/checkout@v4 with: diff --git a/Dockerfile b/Dockerfile index b9f41aa8..fd4002be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,34 +8,35 @@ COPY --from=build-core /root/.m2 /root/.m2 COPY . /app RUN mvn clean package -DskipTests -f /app/pipeline/pom.xml -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as extractor-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as extractor-image USER flink COPY --from=build-pipeline /app/pipeline/extractor/target/extractor-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as preprocessor-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as preprocessor-image USER flink COPY --from=build-pipeline /app/pipeline/preprocessor/target/preprocessor-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as denormalizer-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as denormalizer-image USER flink COPY --from=build-pipeline /app/pipeline/denormalizer/target/denormalizer-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as transformer-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as transformer-image USER flink COPY --from=build-pipeline /app/pipeline/transformer/target/transformer-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as router-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as router-image USER flink COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as merged-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as merged-image USER flink COPY --from=build-pipeline /app/pipeline/pipeline-merged/target/pipeline-merged-1.0.0.jar $FLINK_HOME/lib/ -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as master-data-processor-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as master-data-processor-image USER flink COPY 
--from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/lib -FROM --platform=linux/x86_64 sunbird/flink:1.15.2-scala_2.12-jdk-11 as kafka-connector-image +FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image USER flink -COPY --from=build-pipeline /app/pipeline/kafka-connector/target/kafka-connector-1.0.0.jar $FLINK_HOME/lib \ No newline at end of file +RUN mkdir $FLINK_HOME/custom-lib +COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala index 2c41181c..22729aa0 100644 --- a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala @@ -27,8 +27,8 @@ object MasterDataProcessorIndexer { val ingestionSpec: String = updateIngestionSpec(datasource, paths.datasourceRef, paths.ingestionPath, config) if (eventsCount > 0L) { submitIngestionTask(dataset.id, ingestionSpec, config) + DatasetRegistry.updateDatasourceRef(datasource, paths.datasourceRef) } - DatasetRegistry.updateDatasourceRef(datasource, paths.datasourceRef) if (!datasource.datasourceRef.equals(paths.datasourceRef)) { deleteDataSource(dataset.id, datasource.datasourceRef, config) } diff --git a/dataset-registry/src/main/resources/dataset-registry.sql b/dataset-registry/src/main/resources/dataset-registry.sql index ff28ae98..54373eec 100644 --- a/dataset-registry/src/main/resources/dataset-registry.sql +++ b/dataset-registry/src/main/resources/dataset-registry.sql @@ -22,6 +22,7 @@ CREATE INDEX IF NOT EXISTS datasets_status ON datasets(status); CREATE TABLE IF NOT EXISTS datasources ( datasource text PRIMARY KEY, dataset_id text REFERENCES datasets (id), + type text NOT NULL, ingestion_spec json NOT NULL, datasource_ref text NOT NULL, retention_period json, diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala index 49cc51bc..3aebe8bd 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/model/DatasetModels.scala @@ -71,7 +71,7 @@ object DatasetModels { @JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None) case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String, - @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String) + @JsonProperty("type") `type`: String, @JsonProperty("status") status: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String) } diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala index ad239312..08921adc 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/registry/DatasetRegistry.scala @@ -8,9 +8,9 @@ import scala.collection.mutable object DatasetRegistry { - private 
val datasets: mutable.Map[String, Dataset] = mutable.Map[String, Dataset]() + lazy private val datasets: mutable.Map[String, Dataset] = mutable.Map[String, Dataset]() datasets ++= DatasetRegistryService.readAllDatasets() - private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations() + lazy private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations() def getAllDatasets(datasetType: String): List[Dataset] = { val datasetList = DatasetRegistryService.readAllDatasets() @@ -42,6 +42,11 @@ object DatasetRegistry { DatasetRegistryService.readDatasources(datasetId) } + def getAllDatasources(): List[DataSource] = { + val datasourceList = DatasetRegistryService.readAllDatasources() + datasourceList.getOrElse(List()) + } + def getDataSetIds(datasetType: String): List[String] = { datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList } diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala index 88efb7a6..0b0abe23 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala @@ -6,7 +6,7 @@ import org.sunbird.obsrv.model.DatasetModels._ import org.sunbird.obsrv.model.{DatasetStatus, TransformMode} import java.io.File -import java.sql.{ResultSet, Timestamp} +import java.sql.{PreparedStatement, ResultSet, Timestamp} object DatasetRegistryService { private val configFile = new File("/data/flink/conf/baseconfig.conf") @@ -42,22 +42,28 @@ object DatasetRegistryService { } def readDataset(id: String): Option[Dataset] = { - val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + var resultSet: ResultSet = null try { - val rs = postgresConnect.executeQuery(s"SELECT * FROM datasets where id='$id'") - if (rs.next()) { - Some(parseDataset(rs)) + val query = "SELECT * FROM datasets WHERE id = ?" + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, id) + resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement) + if (resultSet.next()) { + Some(parseDataset(resultSet)) } else { None } } finally { + if (resultSet != null) resultSet.close() + if (preparedStatement != null) preparedStatement.close() postgresConnect.closeConnection() } } - def readAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = { + def readAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = { val postgresConnect = new PostgresConnect(postgresConfig) try { val rs = postgresConnect.executeQuery("SELECT * FROM dataset_source_config") @@ -70,16 +76,23 @@ object DatasetRegistryService { } } - def readDatasetSourceConfig(datasetId: String): Option[List[DatasetSourceConfig]] = { + def readDatasetSourceConfig(datasetId: String): Option[List[DatasetSourceConfig]] = { val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + var resultSet: ResultSet = null try { - val rs = postgresConnect.executeQuery(s"SELECT * FROM dataset_source_config where dataset_id='$datasetId'") - Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { + val query = "SELECT * FROM dataset_source_config WHERE dataset_id = ?" 
+ preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, datasetId) + resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement) + Option(Iterator.continually((resultSet, resultSet.next)).takeWhile(f => f._2).map(f => f._1).map(result => { val datasetSourceConfig = parseDatasetSourceConfig(result) datasetSourceConfig }).toList) } finally { + if (resultSet != null) resultSet.close() + if (preparedStatement != null) preparedStatement.close() postgresConnect.closeConnection() } } @@ -99,46 +112,95 @@ object DatasetRegistryService { } def readDatasources(datasetId: String): Option[List[DataSource]] = { - val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + var resultSet: ResultSet = null try { - val rs = postgresConnect.executeQuery(s"SELECT * FROM datasources where dataset_id='$datasetId'") - Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { + val query = "SELECT * FROM datasources WHERE dataset_id = ?" + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, datasetId) + resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement) + Option(Iterator.continually((resultSet, resultSet.next)).takeWhile(f => f._2).map(f => f._1).map(result => { parseDatasource(result) }).toList) } finally { + if (resultSet != null) resultSet.close() + if (preparedStatement != null) preparedStatement.close() postgresConnect.closeConnection() } } - def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = { - val query = s"UPDATE datasources set datasource_ref = '$datasourceRef' where datasource='${datasource.datasource}' and dataset_id='${datasource.datasetId}'" - updateRegistry(query) + def readAllDatasources(): Option[List[DataSource]] = { + + val postgresConnect = new PostgresConnect(postgresConfig) + try { + val rs = postgresConnect.executeQuery(s"SELECT * FROM datasources") + Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { + parseDatasource(result) + }).toList) + } } + def updateDatasourceRef(datasource: DataSource, datasourceRef: String): Int = { + val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + val query = "UPDATE datasources SET datasource_ref = ? WHERE datasource = ? AND dataset_id = ?" 
+ try { + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, datasourceRef) + preparedStatement.setString(2, datasource.datasource) + preparedStatement.setString(3, datasource.datasetId) + postgresConnect.executeUpdate(preparedStatement) + } finally { + if (preparedStatement != null) preparedStatement.close() + postgresConnect.closeConnection() + } + } + def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = { - val query = s"UPDATE dataset_source_config SET connector_stats = coalesce(connector_stats, '{}')::jsonb || " + - s"jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + '$records'::int) || " + - s"jsonb_build_object('last_fetch_timestamp', '${lastFetchTimestamp}'::timestamp) || " + - s"jsonb_build_object('last_run_timestamp', '${new Timestamp(System.currentTimeMillis())}'::timestamp) WHERE id = '$id';" - updateRegistry(query) + val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + val query = "UPDATE dataset_source_config SET connector_stats = COALESCE(connector_stats, '{}')::jsonb || jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + ? ::int) || jsonb_build_object('last_fetch_timestamp', ? ::timestamp) || jsonb_build_object('last_run_timestamp', ? ::timestamp) WHERE id = ?;" + try { + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, records.toString) + preparedStatement.setTimestamp(2, lastFetchTimestamp) + preparedStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis())) + preparedStatement.setString(4, id) + postgresConnect.executeUpdate(preparedStatement) + } finally { + if (preparedStatement != null) preparedStatement.close() + postgresConnect.closeConnection() + } } + def updateConnectorDisconnections(id: String, disconnections: Int): Int = { - val query = s"UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{disconnections}','$disconnections') WHERE id = '$id'" - updateRegistry(query) + val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{disconnections}', to_jsonb(?)) WHERE id = ?" + try { + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setInt(1, disconnections) + preparedStatement.setString(2, id) + postgresConnect.executeUpdate(preparedStatement) + } finally { + if (preparedStatement != null) preparedStatement.close() + postgresConnect.closeConnection() + } } def updateConnectorAvgBatchReadTime(id: String, avgReadTime: Long): Int = { - val query = s"UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{avg_batch_read_time}','$avgReadTime') WHERE id = '$id'" - updateRegistry(query) - } - - private def updateRegistry(query: String): Int = { val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{avg_batch_read_time}', to_jsonb(?)) WHERE id = ?" 
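      // Note (illustrative, not part of the committed change): to_jsonb(?) lets the driver bind a
      // typed value inside jsonb_set instead of quoting it into the SQL string. For a hypothetical
      // avgReadTime of 1200 the expression evaluates as
      //   jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{avg_batch_read_time}', to_jsonb(1200::bigint))
      // which yields a connector_stats document containing "avg_batch_read_time": 1200.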
try { - postgresConnect.executeUpdate(query) + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setLong(1, avgReadTime) + preparedStatement.setString(2, id) + postgresConnect.executeUpdate(preparedStatement) } finally { + if (preparedStatement != null) preparedStatement.close() postgresConnect.closeConnection() } } @@ -190,10 +252,12 @@ object DatasetRegistryService { val id = rs.getString("id") val datasource = rs.getString("datasource") val datasetId = rs.getString("dataset_id") + val datasourceType = rs.getString("type") + val datasourceStatus = rs.getString("status") val ingestionSpec = rs.getString("ingestion_spec") val datasourceRef = rs.getString("datasource_ref") - DataSource(id, datasource, datasetId, ingestionSpec, datasourceRef) + DataSource(id, datasource, datasetId, datasourceType, datasourceStatus, ingestionSpec, datasourceRef) } private def parseDatasetTransformation(rs: ResultSet): DatasetTransformation = { diff --git a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala index 09321143..1b3edea0 100644 --- a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala +++ b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala @@ -36,7 +36,7 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres { private def createSchema(postgresConnect: PostgresConnect): Unit = { postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, type text NOT NULL, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, dataset_config json NOT NULL, status text NOT NULL, tags text[], data_version INT, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );") - postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") + postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_transformations ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), field_key text NOT NULL, transformation_function json NOT NULL, status text NOT NULL, mode text, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );") postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_source_config ( id text PRIMARY KEY, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, status text NOT NULL, connector_stats json, created_by text NOT NULL, 
updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(connector_type, dataset_id) );") } diff --git a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala index b37e801a..3d83552d 100644 --- a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala +++ b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/TestDatasetRegistrySpec.scala @@ -92,8 +92,8 @@ class TestDatasetRegistrySpec extends BaseSpecWithDatasetRegistry with Matchers postgresConnect.execute("insert into dataset_source_config values('sc1', 'd1', 'kafka', '{\"kafkaBrokers\":\"localhost:9090\",\"topic\":\"test-topic\"}', 'Live', null, 'System', 'System', now(), now());") postgresConnect.execute("insert into dataset_source_config values('sc2', 'd1', 'rdbms', '{\"type\":\"postgres\",\"tableName\":\"test-table\"}', 'Live', null, 'System', 'System', now(), now());") - //postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") - postgresConnect.execute("insert into datasources values('ds1', 'd1', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());") + //postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), type text NOT NULL, ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );") + postgresConnect.execute("insert into datasources values('ds1', 'd1', 'druid', '{}', 'd1-datasource', 'd1-datasource-1', null, null, null, '{}', 'Live', 'System', 'System', now(), now());") postgresConnect.closeConnection() } } \ No newline at end of file diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/model/Constants.scala b/framework/src/main/scala/org/sunbird/obsrv/core/model/Constants.scala index 2cfbd307..466552dd 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/model/Constants.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/model/Constants.scala @@ -5,6 +5,7 @@ object Constants { val EVENT = "event" val INVALID_JSON = "invalid_json" val OBSRV_META = "obsrv_meta" + val PROCESSING_START_TIME = "processingStartTime" val SRC = "src" val ERROR_CODE = "error_code" val ERROR_MSG = "error_msg" @@ -14,5 +15,6 @@ object Constants { val LEVEL = "level" val TOPIC = "topic" val MESSAGE = "message" - + val DATALAKE_TYPE = "datalake" + val MASTER_DATASET_TYPE = "master-dataset" } diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala b/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala index 9c69aff2..b5e57d87 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala @@ -43,4 +43,4 @@ object ErrorConstants extends 
Enumeration { val UNSUPPORTED_PROVIDER = ErrorInternalValue("ERR_UNSUPPORTED_PROVIDER_1030", "Unsupported provider.") val ERR_SUBMIT_INGESTION_FAILED = ErrorInternalValue("ERR_MDP_1031", "Unable to submit ingestion task to druid.") val ERR_DELETE_DATASOURCE_FAILED = ErrorInternalValue("ERR_MDP_1032", "Failed to delete datasource.") -} \ No newline at end of file +} diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala b/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala index decb916f..e4c05e4c 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala @@ -6,7 +6,7 @@ import org.sunbird.obsrv.core.model.Models.SystemSetting import org.sunbird.obsrv.core.util.{PostgresConnect, PostgresConnectionConfig} import java.io.File -import java.sql.ResultSet +import java.sql.{PreparedStatement, ResultSet} object SystemConfig { @@ -102,10 +102,17 @@ object SystemConfigService { @throws[Exception] def getSystemSetting(key: String): Option[SystemSetting] = { val postgresConnect = new PostgresConnect(postgresConfig) + var preparedStatement: PreparedStatement = null + var rs: ResultSet = null + val query = "SELECT * FROM system_settings WHERE key = ?" + preparedStatement = postgresConnect.prepareStatement(query) + preparedStatement.setString(1, key) try { - val rs = postgresConnect.executeQuery(s"SELECT * FROM system_settings WHERE key = '$key'") + rs = postgresConnect.executeQuery(preparedStatement = preparedStatement) if (rs.next) Option(parseSystemSetting(rs)) else None } finally { + if (rs != null) rs.close() + if (preparedStatement != null) preparedStatement.close() postgresConnect.closeConnection() } } @@ -119,4 +126,4 @@ object SystemConfigService { SystemSetting(key, value, category, valueType, Option(label)) } -} \ No newline at end of file +} diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala index 56525db4..370353c7 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala @@ -7,6 +7,7 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe import org.apache.flink.util.Collector import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord +import org.slf4j.LoggerFactory import org.sunbird.obsrv.core.model.Constants import org.sunbird.obsrv.core.util.JSONUtil @@ -48,11 +49,17 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable. 
class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] { private val serialVersionUID = -3224825136576915426L + private[this] val logger = LoggerFactory.getLogger(classOf[StringDeserializationSchema]) override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[String]): Unit = { - out.collect(new String(record.value(), StandardCharsets.UTF_8)) + try { + out.collect(new String(record.value(), StandardCharsets.UTF_8)) + } catch { + case ex: NullPointerException => + logger.error(s"Exception while parsing the message: ${ex.getMessage}") + } } } diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/util/PostgresConnect.scala b/framework/src/main/scala/org/sunbird/obsrv/core/util/PostgresConnect.scala index 8322351c..a1a23df9 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/util/PostgresConnect.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/util/PostgresConnect.scala @@ -3,7 +3,7 @@ package org.sunbird.obsrv.core.util import org.postgresql.ds.PGSimpleDataSource import org.slf4j.LoggerFactory -import java.sql.{Connection, ResultSet, SQLException, Statement} +import java.sql.{Connection, PreparedStatement, ResultSet, SQLException, Statement} final case class PostgresConnectionConfig(user: String, password: String, database: String, host: String, port: Int, maxConnections: Int) @@ -71,6 +71,41 @@ class PostgresConnect(config: PostgresConnectionConfig) { // $COVERAGE-ON$ } + def prepareStatement(query: String): PreparedStatement = { + try { + connection.prepareStatement(query) + } catch { + case ex: SQLException => + ex.printStackTrace() + logger.error("PostgresConnect:prepareStatement() - Exception", ex) + reset() + connection.prepareStatement(query) + } + } + + def executeUpdate(preparedStatement: PreparedStatement): Int = { + try { + preparedStatement.executeUpdate() + } catch { + case ex: SQLException => + ex.printStackTrace() + logger.error("PostgresConnect:executeUpdate():PreparedStatement - Exception", ex) + reset() + preparedStatement.executeUpdate() + } + } + + def executeQuery(preparedStatement: PreparedStatement): ResultSet = { + try { + preparedStatement.executeQuery() + } catch { + case ex: SQLException => + logger.error("PostgresConnect:execute():PreparedStatement - Exception", ex) + reset() + preparedStatement.executeQuery() + } + } + def executeQuery(query:String):ResultSet = statement.executeQuery(query) } diff --git a/pipeline/denormalizer/src/main/scala/org/sunbird/obsrv/denormalizer/util/DenormCache.scala b/pipeline/denormalizer/src/main/scala/org/sunbird/obsrv/denormalizer/util/DenormCache.scala index dd94a251..db0da7d5 100644 --- a/pipeline/denormalizer/src/main/scala/org/sunbird/obsrv/denormalizer/util/DenormCache.scala +++ b/pipeline/denormalizer/src/main/scala/org/sunbird/obsrv/denormalizer/util/DenormCache.scala @@ -79,7 +79,7 @@ class DenormCache(val config: DenormalizerConfig) { if (denormFieldNode.isMissingNode) { DenormFieldStatus("", success = false, Some(ErrorConstants.DENORM_KEY_MISSING)) } else { - if (denormFieldNode.isTextual || denormFieldNode.isNumber) { + if ((denormFieldNode.isTextual && denormFieldNode.asText().nonEmpty) || denormFieldNode.isNumber) { DenormFieldStatus(denormFieldNode.asText(), success = false, None) } else { DenormFieldStatus("", success = false, Some(ErrorConstants.DENORM_KEY_NOT_A_STRING_OR_NUMBER)) diff --git 
a/pipeline/druid-router/src/main/scala/org/sunbird/obsrv/router/functions/DynamicRouterFunction.scala b/pipeline/druid-router/src/main/scala/org/sunbird/obsrv/router/functions/DynamicRouterFunction.scala index ed50c8eb..9d40db5c 100644 --- a/pipeline/druid-router/src/main/scala/org/sunbird/obsrv/router/functions/DynamicRouterFunction.scala +++ b/pipeline/druid-router/src/main/scala/org/sunbird/obsrv/router/functions/DynamicRouterFunction.scala @@ -44,7 +44,7 @@ class DynamicRouterFunction(config: DruidRouterConfig) extends BaseDatasetProces event.put(config.CONST_OBSRV_META, msg(config.CONST_OBSRV_META).asInstanceOf[Map[String, AnyRef]]) val tsKeyData = TimestampKeyParser.parseTimestampKey(dataset.datasetConfig, event) event.put("indexTS", tsKeyData.value) - if (tsKeyData.isValid) { + if (tsKeyData.isValid || dataset.datasetType.equalsIgnoreCase(Constants.MASTER_DATASET_TYPE)) { val routerConfig = dataset.routerConfig val topicEventMap = mutable.Map(Constants.TOPIC -> routerConfig.topic, Constants.MESSAGE -> event) ctx.output(config.routerOutputTag, topicEventMap) diff --git a/pipeline/extractor/pom.xml b/pipeline/extractor/pom.xml index 95ac031f..4cc11c58 100644 --- a/pipeline/extractor/pom.xml +++ b/pipeline/extractor/pom.xml @@ -98,6 +98,18 @@ 3.4.0 test + + io.zonky.test + embedded-postgres + 2.0.3 + test + + + io.github.embeddedkafka + embedded-kafka_2.12 + 3.4.0 + test + io.zonky.test embedded-postgres diff --git a/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala b/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala index f1fea9fb..0e79b08c 100644 --- a/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala +++ b/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala @@ -46,6 +46,7 @@ class ExtractionFunction(config: ExtractorConfig) context.output(config.systemEventsOutputTag, failedSystemEvent(Some(config.defaultDatasetID), ErrorConstants.ERR_INVALID_EVENT, FunctionalError.InvalidJsonData)) return } + addStartProcessingTimeIfMissing(batchEvent) val eventAsText = JSONUtil.serialize(batchEvent) val datasetIdOpt = batchEvent.get(config.CONST_DATASET) if (datasetIdOpt.isEmpty) { @@ -79,6 +80,13 @@ class ExtractionFunction(config: ExtractorConfig) } } + private def addStartProcessingTimeIfMissing(batchEvent: mutable.Map[String, AnyRef]): Unit = { + val obsrvMeta = batchEvent(Constants.OBSRV_META).asInstanceOf[Map[String, AnyRef]] + if(!obsrvMeta.contains(Constants.PROCESSING_START_TIME)) { + batchEvent.put(Constants.OBSRV_META, obsrvMeta ++ Map("processingStartTime" -> System.currentTimeMillis())) + } + } + private def isDuplicate(dataset: Dataset, dedupKey: Option[String], event: String, context: ProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]]#Context): Boolean = { try { diff --git a/pipeline/hudi-connector/pom.xml b/pipeline/hudi-connector/pom.xml new file mode 100644 index 00000000..5230d8eb --- /dev/null +++ b/pipeline/hudi-connector/pom.xml @@ -0,0 +1,253 @@ + + + 4.0.0 + + pipeline + org.sunbird.obsrv + 1.0 + + hudi-connector + 1.0.0 + Hudi Connector + + UTF-8 + 1.4.0 + + + + + org.apache.flink + flink-streaming-scala_${scala.maj.version} + ${flink.version} + provided + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.sunbird.obsrv + framework + 1.0.0 + + + org.sunbird.obsrv + dataset-registry + 1.0.0 + + + org.apache.kafka + kafka-clients + + + + + org.apache.hudi + 
hudi-flink1.15-bundle + 0.14.1 + + + org.apache.hadoop + hadoop-common + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.flink + flink-table-api-scala-bridge_${scala.maj.version} + ${flink.version} + provided + + + io.github.classgraph + classgraph + 4.8.168 + + + org.apache.flink + flink-connector-hive_${scala.maj.version} + ${flink.version} + + + org.apache.hive + hive-metastore + 3.1.3 + + + org.apache.hadoop + hadoop-common + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.apache.hive + hive-exec + 3.1.3 + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.hadoop + hadoop-common + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + src/main/scala + src/test/scala + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + false + + + com.google.code.findbugs:jsr305 + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + core-site.xml + + + + + + org.sunbird.obsrv.streaming.HudiConnectorStreamTask + + + + reference.conf + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 4.4.0 + + ${java.target.runtime} + ${java.target.runtime} + ${scala.version} + false + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + maven-surefire-plugin + 2.22.2 + + true + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + hudi-connector-testsuite.txt + + + + test + + test + + + + + + org.scoverage + scoverage-maven-plugin + ${scoverage.plugin.version} + + ${scala.version} + true + true + + + + + diff --git a/pipeline/hudi-connector/src/main/resources/core-site.xml b/pipeline/hudi-connector/src/main/resources/core-site.xml new file mode 100644 index 00000000..c15df562 --- /dev/null +++ b/pipeline/hudi-connector/src/main/resources/core-site.xml @@ -0,0 +1,33 @@ + + + + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.endpoint + http://localhost:4566 + + + fs.s3a.access.key + test + + + fs.s3a.secret.key + testSecret + + + fs.s3a.path.style.access + true + + + fs.s3a.connection.ssl.enabled + false + + + + + + \ No newline at end of file diff --git a/pipeline/hudi-connector/src/main/resources/hudi-writer.conf b/pipeline/hudi-connector/src/main/resources/hudi-writer.conf new file mode 100644 index 00000000..d9c031b5 --- /dev/null +++ b/pipeline/hudi-connector/src/main/resources/hudi-writer.conf @@ -0,0 +1,41 @@ +include "baseconfig.conf" + +kafka { + input.topic = ${job.env}".hudi.connector.in" + output.topic = ${job.env}".hudi.connector.out" + output.invalid.topic = ${job.env}".failed" + event.max.size = "1048576" # Max is only 1MB + groupId = ${job.env}"-hudi-writer-group" + producer { + max-request-size = 5242880 + } +} + +task { + checkpointing.compressed = true + checkpointing.interval = 30000 + checkpointing.pause.between.seconds = 30000 + restart-strategy.attempts = 3 + restart-strategy.delay = 30000 # in milli-seconds + parallelism = 1 + consumer.parallelism = 1 + downstream.operators.parallelism = 1 +} + +hudi { + hms { + enabled = true + uri = "thrift://localhost:9083" + database { + name = "obsrv" + username = "postgres" + password = "postgres" + } + } + table { + type 
= "MERGE_ON_READ" + base.path = "s3a://obsrv" + } + compaction.enabled = true + write.tasks = 1 +} \ No newline at end of file diff --git a/pipeline/hudi-connector/src/main/resources/schemas/schema.json b/pipeline/hudi-connector/src/main/resources/schemas/schema.json new file mode 100644 index 00000000..177c957a --- /dev/null +++ b/pipeline/hudi-connector/src/main/resources/schemas/schema.json @@ -0,0 +1,108 @@ +{ + "dataset": "financial_transactions", + "schema": { + "table": "financial_transactions", + "partitionColumn": "receiver_ifsc_code", + "timestampColumn": "txn_date", + "primaryKey": "txn_id", + "columnSpec": [ + { + "name": "receiver_account_number", + "type": "string" + }, + { + "name": "receiver_ifsc_code", + "type": "string" + }, + { + "name": "sender_account_number", + "type": "string" + }, + { + "name": "sender_contact_email", + "type": "string" + }, + { + "name": "sender_ifsc_code", + "type": "string" + }, + { + "name": "currency", + "type": "string" + }, + { + "name": "txn_amount", + "type": "int" + }, + { + "name": "txn_date", + "type": "string" + }, + { + "name": "txn_id", + "type": "string" + }, + { + "name": "txn_status", + "type": "string" + }, + { + "name": "txn_type", + "type": "string" + } + ] + }, + "inputFormat": { + "type": "json", + "flattenSpec": { + "fields": [ + { + "type": "root", + "name": "receiver_account_number" + }, + { + "type": "path", + "name": "sender_account_number", + "expr": "$.sender.account_number" + }, + { + "type": "path", + "name": "sender_ifsc_code", + "expr": "$.sender.ifsc_code" + }, + { + "type": "root", + "name": "receiver_ifsc_code" + }, + { + "type": "root", + "name": "sender_contact_email" + }, + { + "type": "root", + "name": "currency" + }, + { + "type": "root", + "name": "txn_amount" + }, + { + "type": "root", + "name": "txn_date" + }, + { + "type": "root", + "name": "txn_id" + }, + { + "type": "root", + "name": "txn_status" + }, + { + "type": "root", + "name": "txn_type" + } + ] + } + } +} \ No newline at end of file diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/functions/RowDataConverterFunction.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/functions/RowDataConverterFunction.scala new file mode 100644 index 00000000..aec00117 --- /dev/null +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/functions/RowDataConverterFunction.scala @@ -0,0 +1,43 @@ +package org.sunbird.obsrv.functions + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.formats.common.TimestampFormat +import org.apache.flink.formats.json.JsonToRowDataConverters +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper +import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec} +import org.apache.flink.table.data.RowData +import org.slf4j.LoggerFactory +import org.sunbird.obsrv.core.util.{JSONUtil, Util} +import org.sunbird.obsrv.streaming.HudiConnectorConfig +import scala.collection.mutable.{Map => MMap} + +class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] { + + var jsonToRowDataConverters: JsonToRowDataConverters = _ + var objectMapper: ObjectMapper = _ + var hudiSchemaParser: HudiSchemaParser = _ + + private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction]) + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + jsonToRowDataConverters = new 
JsonToRowDataConverters(false, true, TimestampFormat.SQL) + objectMapper = new ObjectMapper() + hudiSchemaParser = new HudiSchemaParser() + } + + override def map(event: MMap[String, AnyRef]): RowData = { + convertToRowData(event) + } + + def convertToRowData(data: MMap[String, AnyRef]): RowData = { + val eventJson = JSONUtil.serialize(data) + val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson) + val rowType = hudiSchemaParser.rowTypeMap(datasetId) + val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType) + val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData] + rowData + } + +} diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala new file mode 100644 index 00000000..4f4f46cf --- /dev/null +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorConfig.scala @@ -0,0 +1,53 @@ +package org.sunbird.obsrv.streaming + +import com.typesafe.config.Config +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.streaming.api.scala.OutputTag +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.configuration.FlinkOptions +import org.sunbird.obsrv.core.streaming.BaseJobConfig + +import scala.collection.mutable + +class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mutable.Map[String, AnyRef]](config, "Flink-Hudi-Connector") { + + implicit val mapTypeInfo: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]]) + implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) + + override def inputTopic(): String = config.getString("kafka.input.topic") + + val kafkaDefaultOutputTopic: String = config.getString("kafka.output.topic") + + override def inputConsumer(): String = config.getString("kafka.groupId") + + override def successTag(): OutputTag[mutable.Map[String, AnyRef]] = OutputTag[mutable.Map[String, AnyRef]]("dummy-events") + + override def failedEventsOutputTag(): OutputTag[mutable.Map[String, AnyRef]] = OutputTag[mutable.Map[String, AnyRef]]("failed-events") + + val kafkaInvalidTopic: String = config.getString("kafka.output.invalid.topic") + + val invalidEventsOutputTag: OutputTag[mutable.Map[String, AnyRef]] = OutputTag[mutable.Map[String, AnyRef]]("invalid-events") + val validEventsOutputTag: OutputTag[mutable.Map[String, AnyRef]] = OutputTag[mutable.Map[String, AnyRef]]("valid-events") + + val invalidEventProducer = "invalid-events-sink" + + + val hudiTableType: String = + if (config.getString("hudi.table.type").equalsIgnoreCase("MERGE_ON_READ")) + HoodieTableType.MERGE_ON_READ.name() + else if (config.getString("hudi.table.type").equalsIgnoreCase("COPY_ON_WRITE")) + HoodieTableType.COPY_ON_WRITE.name() + else HoodieTableType.MERGE_ON_READ.name() + + val hudiBasePath: String = config.getString("hudi.table.base.path") + val hudiCompactionEnabled: Boolean = config.getBoolean("hudi.compaction.enabled") + val hudiWriteTasks: Int = config.getInt("hudi.write.tasks") + + val hmsEnabled: Boolean = if (config.hasPath("hudi.hms.enabled")) config.getBoolean("hudi.hms.enabled") else false + val hmsUsername: String = config.getString("hudi.hms.database.username") + val 
hmsPassword: String = config.getString("hudi.hms.database.password") + val hmsDatabaseName: String = config.getString("hudi.hms.database.name") + val hmsURI: String = config.getString("hudi.hms.uri") + +} diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala new file mode 100644 index 00000000..fd160820 --- /dev/null +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/HudiConnectorStreamTask.scala @@ -0,0 +1,153 @@ +package org.sunbird.obsrv.streaming + +import com.typesafe.config.ConfigFactory +import org.apache.commons.lang3.StringUtils +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.hudi.common.config.TimestampKeyGeneratorConfig +import org.apache.hudi.configuration.{FlinkOptions, OptionsResolver} +import org.apache.hudi.sink.utils.Pipelines +import org.apache.hudi.util.AvroSchemaConverter +import org.slf4j.LoggerFactory +import org.sunbird.obsrv.core.model.Constants +import org.sunbird.obsrv.core.streaming.{BaseStreamTask, FlinkKafkaConnector} +import org.sunbird.obsrv.core.util.FlinkUtil +import org.sunbird.obsrv.functions.RowDataConverterFunction +import org.sunbird.obsrv.registry.DatasetRegistry +import org.sunbird.obsrv.util.HudiSchemaParser +import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP +import org.apache.hudi.common.config.HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE +import org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS +import java.io.File +import java.sql.Timestamp +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import scala.collection.mutable +import scala.collection.mutable.{Map => MMap} + +class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: FlinkKafkaConnector) extends BaseStreamTask[mutable.Map[String, AnyRef]] { + + implicit val mutableMapTypeInfo: TypeInformation[MMap[String, AnyRef]] = TypeExtractor.getForClass(classOf[MMap[String, AnyRef]]) + private val logger = LoggerFactory.getLogger(classOf[HudiConnectorStreamTask]) + def process(): Unit = { + implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) + process(env) + } + + override def processStream(dataStream: DataStream[mutable.Map[String, AnyRef]]): DataStream[mutable.Map[String, AnyRef]] = { + null + } + + def process(env: StreamExecutionEnvironment): Unit = { + val schemaParser = new HudiSchemaParser() + val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live")) + dataSourceConfig.map{ dataSource => + val datasetId = dataSource.datasetId + val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector) + .map(new RowDataConverterFunction(config, datasetId)) + + val conf: Configuration = new Configuration() + setHudiBaseConfigurations(conf) + setDatasetConf(conf, datasetId, schemaParser) + logger.info("conf: " + conf.toMap.toString) + val 
rowType = schemaParser.rowTypeMap(datasetId) + + val hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream) + val pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream) + if (OptionsResolver.needsAsyncCompaction(conf)) { + Pipelines.compact(conf, pipeline) + } else { + Pipelines.clean(conf, pipeline) + } + + }.orElse(List(addDefaultOperator(env, config, kafkaConnector))) + env.execute("Flink-Hudi-Connector") + } + + def addDefaultOperator(env: StreamExecutionEnvironment, config: HudiConnectorConfig, kafkaConnector: FlinkKafkaConnector): DataStreamSink[mutable.Map[String, AnyRef]] = { + val dataStreamSink: DataStreamSink[mutable.Map[String, AnyRef]] = getMapDataStream(env, config, kafkaConnector) + .sinkTo(kafkaConnector.kafkaSink[mutable.Map[String, AnyRef]](config.kafkaDefaultOutputTopic)) + .name(s"hudi-connector-default-sink").uid(s"hudi-connector-default-sink") + .setParallelism(config.downstreamOperatorsParallelism) + dataStreamSink + } + + def setDatasetConf(conf: Configuration, dataset: String, schemaParser: HudiSchemaParser): Unit = { + val datasetSchema = schemaParser.hudiSchemaMap(dataset) + val rowType = schemaParser.rowTypeMap(dataset) + val avroSchema = AvroSchemaConverter.convertToSchema(rowType, dataset.replace("-", "_")) + conf.setString(FlinkOptions.PATH.key, s"${config.hudiBasePath}/${datasetSchema.schema.table}") + conf.setString(FlinkOptions.TABLE_NAME, datasetSchema.schema.table) + conf.setString(FlinkOptions.RECORD_KEY_FIELD.key, datasetSchema.schema.primaryKey) + conf.setString(FlinkOptions.PRECOMBINE_FIELD.key, datasetSchema.schema.timestampColumn) + conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn) + conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key, avroSchema.toString) + + val partitionField = datasetSchema.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(datasetSchema.schema.partitionColumn)).head + if(partitionField.`type`.equalsIgnoreCase("timestamp") || partitionField.`type`.equalsIgnoreCase("epoch")) { + conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn + "_partition") + } + + if (config.hmsEnabled) { + conf.setString("hive_sync.table", datasetSchema.schema.table) + } + } + + private def setHudiBaseConfigurations(conf: Configuration): Unit = { + conf.setString(FlinkOptions.TABLE_TYPE.key, config.hudiTableType) + conf.setBoolean(FlinkOptions.METADATA_ENABLED.key, true) + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE.key, 0.1) + conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key, config.hudiCompactionEnabled) + conf.setInteger("write.tasks", config.hudiWriteTasks) + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 2) + conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, "num_or_time") + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true) + conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, 1) + conf.setInteger(FlinkOptions.COMPACTION_TASKS, 1) + conf.setString("hoodie.fs.atomic_creation.support", "s3a") + conf.setString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES, "hoodie.datasource.write.drop.partition.columns=true") + conf.setBoolean(DROP_PARTITION_COLUMNS.key, true) + conf.setBoolean(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true); // Enable dropping columns + conf.setBoolean(SCHEMA_EVOLUTION_ENABLE.key(), true); // Enable schema evolution + conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, "org.apache.hudi.common.model.PartialUpdateAvroPayload") + + if (config.hmsEnabled) { + conf.setBoolean("hive_sync.enabled", 
config.hmsEnabled) + conf.setString(FlinkOptions.HIVE_SYNC_DB.key(), config.hmsDatabaseName) + conf.setString("hive_sync.username", config.hmsUsername) + conf.setString("hive_sync.password", config.hmsPassword) + conf.setString("hive_sync.mode", "hms") + conf.setBoolean("hive_sync.use_jdbc", false) + conf.setString(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), config.hmsURI) + conf.setString("hoodie.fs.atomic_creation.support", "s3a") + conf.setBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, true) + } + + } + +} + +object HudiConnectorStreamTask { + def main(args: Array[String]): Unit = { + val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path")) + val config = configFilePath.map { + path => ConfigFactory.parseFile(new File(path)).resolve() + }.getOrElse(ConfigFactory.load("hudi-writer.conf").withFallback(ConfigFactory.systemEnvironment())) + val hudiWriterConfig = new HudiConnectorConfig(config) + val kafkaUtil = new FlinkKafkaConnector(hudiWriterConfig) + val task = new HudiConnectorStreamTask(hudiWriterConfig, kafkaUtil) + task.process() + } + + def getTimestamp(ts: String): Timestamp = { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") + val localDateTime = if (StringUtils.isNotBlank(ts)) + LocalDateTime.from(formatter.parse(ts)) + else LocalDateTime.now + Timestamp.valueOf(localDateTime) + } +} diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/TestTimestamp.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/TestTimestamp.scala new file mode 100644 index 00000000..1c9876c0 --- /dev/null +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/streaming/TestTimestamp.scala @@ -0,0 +1,19 @@ +package org.sunbird.obsrv.streaming + +import java.sql.Timestamp +import java.time.{LocalDateTime, ZoneOffset} +import java.time.format.DateTimeFormatter + +object TestTimestamp { + + def main(args: Array[String]): Unit = { + val timestampAsString = "2023-10-15T03:56:27.522+05:30" + val pattern = "yyyy-MM-dd'T'hh:mm:ss.SSSZ" + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") + val localDateTime = LocalDateTime.from(formatter.parse(timestampAsString)) + val timestamp = Timestamp.valueOf(localDateTime) + println("Timestamp: " + timestamp.toString) + + } + +} diff --git a/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HudiSchemaParser.scala b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HudiSchemaParser.scala new file mode 100644 index 00000000..aa203474 --- /dev/null +++ b/pipeline/hudi-connector/src/main/scala/org/sunbird/obsrv/util/HudiSchemaParser.scala @@ -0,0 +1,140 @@ +package org.sunbird.obsrv.util + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.core.JsonGenerator.Feature +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper, SerializationFeature} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.flink.table.types.logical.{BigIntType, BooleanType, DoubleType, IntType, LogicalType, MapType, RowType, VarCharType, TimestampType, DateType} +import org.slf4j.LoggerFactory +import org.sunbird.obsrv.core.model.Constants +import org.sunbird.obsrv.core.util.JSONUtil +import org.sunbird.obsrv.registry.DatasetRegistry +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Date +import scala.collection.mutable + + +case class HudiSchemaSpec(dataset: 
String, schema: Schema, inputFormat: InputFormat) +case class Schema(table: String, partitionColumn: String, timestampColumn: String, primaryKey: String, columnSpec: List[ColumnSpec]) +case class ColumnSpec(name: String, `type`: String) +case class InputFormat(`type`: String, flattenSpec: Option[JsonFlattenSpec] = None, columns: Option[List[String]] = None) +case class JsonFlattenSpec(fields: List[JsonFieldParserSpec]) +case class JsonFieldParserSpec(`type`: String, name: String, expr: Option[String] = None) + +class HudiSchemaParser { + + private val logger = LoggerFactory.getLogger(classOf[HudiSchemaParser]) + + @transient private val objectMapper = JsonMapper.builder() + .addModule(DefaultScalaModule) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) + .enable(Feature.WRITE_BIGDECIMAL_AS_PLAIN) + .build() + + val df = new SimpleDateFormat("yyyy-MM-dd") + objectMapper.setSerializationInclusion(Include.NON_ABSENT) + + val hudiSchemaMap = new mutable.HashMap[String, HudiSchemaSpec]() + val rowTypeMap = new mutable.HashMap[String, RowType]() + + readSchema() + + def readSchema(): Unit = { + val datasourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live")) + datasourceConfig.map{f => + val hudiSchemaSpec = JSONUtil.deserialize[HudiSchemaSpec](f.ingestionSpec) + val dataset = hudiSchemaSpec.dataset + hudiSchemaMap.put(dataset, hudiSchemaSpec) + rowTypeMap.put(dataset, createRowType(hudiSchemaSpec)) + } + } + + private def createRowType(schema: HudiSchemaSpec): RowType = { + val columnSpec = schema.schema.columnSpec + val primaryKey = schema.schema.primaryKey + val partitionColumn = schema.schema.partitionColumn + val timeStampColumn = schema.schema.timestampColumn + val partitionField = schema.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(schema.schema.partitionColumn)).head + val rowTypeMap = mutable.SortedMap[String, LogicalType]() + columnSpec.sortBy(_.name).map { + spec => + val isNullable = if (spec.name.matches(s"$primaryKey|$partitionColumn|$timeStampColumn")) false else true + val columnType = spec.`type` match { + case "string" => new VarCharType(isNullable, 20) + case "double" => new DoubleType(isNullable) + case "long" => new BigIntType(isNullable) + case "int" => new IntType(isNullable) + case "boolean" => new BooleanType(true) + case "map[string, string]" => new MapType(new VarCharType(), new VarCharType()) + case "epoch" => new BigIntType(isNullable) + case _ => new VarCharType(isNullable, 20) + } + rowTypeMap.put(spec.name, columnType) + } + if(partitionField.`type`.equalsIgnoreCase("timestamp") || partitionField.`type`.equalsIgnoreCase("epoch")) { + rowTypeMap.put(partitionField.name + "_partition", new VarCharType(false, 20)) + } + val rowType: RowType = RowType.of(false, rowTypeMap.values.toArray, rowTypeMap.keySet.toArray) + logger.info("rowType: " + rowType) + rowType + } + + def parseJson(dataset: String, event: String): mutable.Map[String, Any] = { + val parserSpec = hudiSchemaMap.get(dataset) + val jsonNode = objectMapper.readTree(event) + val flattenedEventData = mutable.Map[String, Any]() + parserSpec.map { spec => + val columnSpec = spec.schema.columnSpec + val partitionField = spec.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(spec.schema.partitionColumn)).head + spec.inputFormat.flattenSpec.map { + flattenSpec => + flattenSpec.fields.map { + field => + val node = 
retrieveFieldFromJson(jsonNode, field) + node.map { + nodeValue => + try { + val fieldDataType = columnSpec.filter(_.name.equalsIgnoreCase(field.name)).head.`type` + val fieldValue = fieldDataType match { + case "string" => objectMapper.treeToValue(nodeValue, classOf[String]) + case "int" => objectMapper.treeToValue(nodeValue, classOf[Int]) + case "long" => objectMapper.treeToValue(nodeValue, classOf[Long]) + case "double" => objectMapper.treeToValue(nodeValue, classOf[Double]) + case "epoch" => objectMapper.treeToValue(nodeValue, classOf[Long]) + case _ => objectMapper.treeToValue(nodeValue, classOf[String]) + } + if(field.name.equalsIgnoreCase(partitionField.name)){ + if(fieldDataType.equalsIgnoreCase("timestamp")) { + flattenedEventData.put(field.name + "_partition", df.format(objectMapper.treeToValue(nodeValue, classOf[Timestamp]))) + } + else if(fieldDataType.equalsIgnoreCase("epoch")) { + flattenedEventData.put(field.name + "_partition", df.format(objectMapper.treeToValue(nodeValue, classOf[Long]))) + } + } + flattenedEventData.put(field.name, fieldValue) + } + catch { + case ex: Exception => + logger.info("Hudi Schema Parser - Exception: ", ex.getMessage) + flattenedEventData.put(field.name, null) + } + + }.orElse(flattenedEventData.put(field.name, null)) + } + } + } + logger.info("flattenedEventData: " + flattenedEventData) + flattenedEventData + } + + def retrieveFieldFromJson(jsonNode: JsonNode, field: JsonFieldParserSpec): Option[JsonNode] = { + if (field.`type`.equalsIgnoreCase("path")) { + field.expr.map{ f => jsonNode.at(s"/${f.split("\\.").tail.mkString("/")}") } + } else { + Option(jsonNode.get(field.name)) + } + } +} diff --git a/pipeline/master-data-processor/pom.xml b/pipeline/master-data-processor/pom.xml index 370ec621..0dc1cc60 100644 --- a/pipeline/master-data-processor/pom.xml +++ b/pipeline/master-data-processor/pom.xml @@ -62,6 +62,11 @@ transformer 1.0.0 + + org.sunbird.obsrv.pipeline + druid-router + 1.0.0 + com.github.java-json-tools json-schema-validator diff --git a/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/task/MasterDataProcessorStreamTask.scala b/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/task/MasterDataProcessorStreamTask.scala index 7527a6c9..65847bbd 100644 --- a/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/task/MasterDataProcessorStreamTask.scala +++ b/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/task/MasterDataProcessorStreamTask.scala @@ -12,6 +12,7 @@ import org.sunbird.obsrv.extractor.task.{ExtractorConfig, ExtractorStreamTask} import org.sunbird.obsrv.pipeline.function.MasterDataProcessorFunction import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask} import org.sunbird.obsrv.transformer.task.{TransformerConfig, TransformerStreamTask} +import org.sunbird.obsrv.router.task.{DruidRouterConfig, DynamicRouterStreamTask} import java.io.File import scala.collection.mutable @@ -50,6 +51,7 @@ class MasterDataProcessorStreamTask(config: Config, masterDataConfig: MasterData val preprocessorTask = new PipelinePreprocessorStreamTask(new PipelinePreprocessorConfig(config), kafkaConnector) val denormalizerTask = new DenormalizerStreamTask(new DenormalizerConfig(config), kafkaConnector) val transformerTask = new TransformerStreamTask(new TransformerConfig(config), kafkaConnector) + val routerTask = new DynamicRouterStreamTask(new DruidRouterConfig(config), kafkaConnector) val transformedStream = 
transformerTask.processStream( denormalizerTask.processStream( @@ -67,6 +69,7 @@ class MasterDataProcessorStreamTask(config: Config, masterDataConfig: MasterData addDefaultSinks(processedStream, masterDataConfig, kafkaConnector) processedStream.getSideOutput(masterDataConfig.successTag()) + routerTask.processStream(transformedStream) } } diff --git a/pipeline/pom.xml b/pipeline/pom.xml index 9c37e956..220ebff4 100644 --- a/pipeline/pom.xml +++ b/pipeline/pom.xml @@ -23,6 +23,7 @@ druid-router pipeline-merged master-data-processor + hudi-connector diff --git a/stubs/docker/apache-flink-plugins/Dockerfile b/stubs/docker/apache-flink-plugins/Dockerfile index 946e90dd..1351c9c2 100644 --- a/stubs/docker/apache-flink-plugins/Dockerfile +++ b/stubs/docker/apache-flink-plugins/Dockerfile @@ -1,4 +1,4 @@ -FROM sunbird/flink:1.15.2-scala_2.12-jdk-11-source +FROM sanketikahub/flink:1.15.2-scala_2.12-jdk-11-source USER flink RUN mkdir $FLINK_HOME/plugins/s3-fs-presto RUN mkdir $FLINK_HOME/plugins/gs-fs-hadoop
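
The recurring change across DatasetRegistryService and SystemConfigService in this patch is the move from string-interpolated SQL to the new PreparedStatement helpers on PostgresConnect (prepareStatement, executeQuery(preparedStatement = ...) and executeUpdate(preparedStatement)). A minimal, self-contained sketch of that usage pattern follows; the object name, connection values and dataset id are placeholders for illustration and are not part of the patch.

    import java.sql.{PreparedStatement, ResultSet}

    import org.sunbird.obsrv.core.util.{PostgresConnect, PostgresConnectionConfig}

    // Hypothetical usage sketch of the PreparedStatement helpers introduced in this patch.
    object PreparedStatementUsageSketch {

      def main(args: Array[String]): Unit = {
        // Placeholder connection details; the real services resolve these from baseconfig.conf.
        val postgresConfig = PostgresConnectionConfig("postgres", "postgres", "obsrv", "localhost", 5432, 2)
        val postgresConnect = new PostgresConnect(postgresConfig)
        var preparedStatement: PreparedStatement = null
        var resultSet: ResultSet = null
        try {
          // Bind the id as a parameter instead of interpolating it into the SQL text.
          preparedStatement = postgresConnect.prepareStatement("SELECT * FROM datasets WHERE id = ?")
          preparedStatement.setString(1, "d1")
          resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement)
          while (resultSet.next()) println(resultSet.getString("id"))
        } finally {
          // Mirror the close-in-finally pattern the registry methods follow.
          if (resultSet != null) resultSet.close()
          if (preparedStatement != null) preparedStatement.close()
          postgresConnect.closeConnection()
        }
      }
    }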