From d81664588934bc73b7157d3343bb36bd56d78e7b Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Fri, 24 Jan 2025 17:20:18 +0100
Subject: [PATCH] chore(data-warehouse): Moved reset_pipeline from source
 inputs to schema sync config (#27862)

---
 .../data_imports/pipelines/helpers.py         |  9 -------
 .../pipelines/pipeline/pipeline.py            |  7 +++---
 .../workflow_activities/import_data_sync.py   | 11 +++++----
 .../tests/data_imports/test_end_to_end.py     | 24 ++++++++++---------
 posthog/warehouse/api/external_data_schema.py | 10 ++++----
 .../api/test/test_external_data_schema.py     |  9 ++++---
 .../warehouse/models/external_data_schema.py  |  2 +-
 7 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/helpers.py b/posthog/temporal/data_imports/pipelines/helpers.py
index d0cc153f4e11d..9be1f1111e236 100644
--- a/posthog/temporal/data_imports/pipelines/helpers.py
+++ b/posthog/temporal/data_imports/pipelines/helpers.py
@@ -1,7 +1,5 @@
-import uuid
 from posthog.warehouse.models import ExternalDataJob
 from django.db.models import F
-from posthog.warehouse.models.external_data_source import ExternalDataSource
 from posthog.warehouse.util import database_sync_to_async
 
@@ -13,10 +11,3 @@ def aget_external_data_job(team_id, job_id):
 @database_sync_to_async
 def aupdate_job_count(job_id: str, team_id: int, count: int):
     ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
-
-
-@database_sync_to_async
-def aremove_reset_pipeline(source_id: uuid.UUID):
-    source = ExternalDataSource.objects.get(id=source_id)
-    source.job_inputs.pop("reset_pipeline", None)
-    source.save()

diff --git a/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
index c1a8b95bb0abe..10054464a43d9 100644
--- a/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -19,7 +19,7 @@
 from posthog.temporal.data_imports.pipelines.pipeline.hogql_schema import HogQLSchema
 from posthog.temporal.data_imports.pipelines.pipeline_sync import validate_schema_and_update_table_sync
 from posthog.temporal.data_imports.util import prepare_s3_files_for_querying
-from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema, ExternalDataSource
+from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema
 
 
 class PipelineNonDLT:
@@ -68,9 +68,8 @@ def run(self):
                 self._logger.debug("Deleting existing table due to reset_pipeline being set")
                 self._delta_table_helper.reset_table()
 
-                source: ExternalDataSource = self._job.pipeline
-                source.job_inputs.pop("reset_pipeline", None)
-                source.save()
+                self._schema.sync_type_config.pop("reset_pipeline", None)
+                self._schema.save()
 
             for item in self._resource:
                 py_table = None

diff --git a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py
index a9a058bb52261..c4d509a72b4dc 100644
--- a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py
+++ b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py
@@ -90,7 +90,9 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs):
 
     _trim_source_job_inputs(model.pipeline)
 
-    reset_pipeline = model.pipeline.job_inputs.get("reset_pipeline", "False") == "True"
+    schema: ExternalDataSchema | None = model.schema
+    assert schema is not None
+    reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True
 
     schema = (
         ExternalDataSchema.objects.prefetch_related("source")
@@ -541,7 +543,6 @@ def _run(
         rows_synced=F("rows_synced") + total_rows_synced
     )
 
-    source = ExternalDataSource.objects.get(id=inputs.source_id)
-    source.job_inputs.pop("reset_pipeline", None)
-
-    source.save()
+    schema = ExternalDataSchema.objects.get(id=inputs.schema_id)
+    schema.sync_type_config.pop("reset_pipeline", None)
+    schema.save()

diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 5bebcd72a5f57..b5b2ab42b6632 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -172,8 +172,8 @@ async def _run(
             continue
         assert name in (res.columns or [])
 
-    await sync_to_async(source.refresh_from_db)()
-    assert source.job_inputs.get("reset_pipeline", None) is None
+    await sync_to_async(schema.refresh_from_db)()
+    assert schema.sync_type_config.get("reset_pipeline", None) is None
 
     return workflow_id, inputs
 
@@ -520,8 +520,9 @@ async def test_reset_pipeline(team, stripe_balance_transaction):
         schema_name="BalanceTransaction",
         table_name="stripe_balancetransaction",
         source_type="Stripe",
-        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
+        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         mock_data_response=stripe_balance_transaction["data"],
+        sync_type_config={"reset_pipeline": True},
     )
 
 
@@ -1249,23 +1250,24 @@ async def test_delete_table_on_reset(team, stripe_balance_transaction):
         schema_name="BalanceTransaction",
         table_name="stripe_balancetransaction",
         source_type="Stripe",
-        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
+        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         mock_data_response=stripe_balance_transaction["data"],
+        sync_type_config={"reset_pipeline": True},
     )
 
-    source = await sync_to_async(ExternalDataSource.objects.get)(id=inputs.external_data_source_id)
+    schema = await sync_to_async(ExternalDataSchema.objects.get)(id=inputs.external_data_schema_id)
 
-    assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
-    source.job_inputs["reset_pipeline"] = "True"
+    assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
+    schema.sync_type_config["reset_pipeline"] = True
 
-    await sync_to_async(source.save)()
+    await sync_to_async(schema.save)()
 
     await _execute_run(str(uuid.uuid4()), inputs, stripe_balance_transaction["data"])
 
     mock_delta_table_delete.assert_called()
     mock_s3_delete.assert_called()
 
-    await sync_to_async(source.refresh_from_db)()
+    await sync_to_async(schema.refresh_from_db)()
 
-    assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
-    assert "reset_pipeline" not in source.job_inputs.keys()
+    assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
+    assert "reset_pipeline" not in schema.sync_type_config.keys()

diff --git a/posthog/warehouse/api/external_data_schema.py b/posthog/warehouse/api/external_data_schema.py
index 56208208d5eaf..b3bcafcb45d4e 100644
--- a/posthog/warehouse/api/external_data_schema.py
+++ b/posthog/warehouse/api/external_data_schema.py
@@ -184,9 +184,9 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -
             sync_external_data_job_workflow(instance, create=False)
 
         if trigger_refresh:
-            source: ExternalDataSource = instance.source
-            source.job_inputs.update({"reset_pipeline": True})
-            source.save()
+            instance.sync_type_config.update({"reset_pipeline": True})
+            validated_data["sync_type_config"].update({"reset_pipeline": True})
+
             trigger_external_data_workflow(instance)
 
         return super().update(instance, validated_data)
@@ -266,9 +266,7 @@ def resync(self, request: Request, *args: Any, **kwargs: Any):
         if latest_running_job and latest_running_job.workflow_id and latest_running_job.status == "Running":
             cancel_external_data_workflow(latest_running_job.workflow_id)
 
-        source: ExternalDataSource = instance.source
-        source.job_inputs.update({"reset_pipeline": True})
-        source.save()
+        instance.sync_type_config.update({"reset_pipeline": True})
 
         try:
             trigger_external_data_workflow(instance)

diff --git a/posthog/warehouse/api/test/test_external_data_schema.py b/posthog/warehouse/api/test/test_external_data_schema.py
index b63f3f1dfab4f..ebdb357085936 100644
--- a/posthog/warehouse/api/test/test_external_data_schema.py
+++ b/posthog/warehouse/api/test/test_external_data_schema.py
@@ -183,8 +183,8 @@ def test_update_schema_change_sync_type(self):
         assert response.status_code == 200
         mock_trigger_external_data_workflow.assert_called_once()
 
-        source.refresh_from_db()
-        assert source.job_inputs.get("reset_pipeline") == "True"
+        schema.refresh_from_db()
+        assert schema.sync_type_config.get("reset_pipeline") is True
 
     def test_update_schema_change_sync_type_incremental_field(self):
         source = ExternalDataSource.objects.create(
@@ -211,10 +211,9 @@ def test_update_schema_change_sync_type_incremental_field(self):
         assert response.status_code == 200
         mock_trigger_external_data_workflow.assert_called_once()
 
-        source.refresh_from_db()
-        assert source.job_inputs.get("reset_pipeline") == "True"
-
         schema.refresh_from_db()
+
+        assert schema.sync_type_config.get("reset_pipeline") is True
         assert schema.sync_type_config.get("incremental_field") == "field"
         assert schema.sync_type_config.get("incremental_field_type") == "integer"

diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py
index 423468a9ce945..e48ec83fbcbb0 100644
--- a/posthog/warehouse/models/external_data_schema.py
+++ b/posthog/warehouse/models/external_data_schema.py
@@ -54,7 +54,7 @@ class SyncFrequency(models.TextChoices):
     last_synced_at = models.DateTimeField(null=True, blank=True)
     sync_type = models.CharField(max_length=128, choices=SyncType.choices, null=True, blank=True)
 
-    # { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any }
+    # { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any, "reset_pipeline": bool }
     sync_type_config = models.JSONField(
         default=dict,
         blank=True,
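
Reviewer note: the contract after this patch is "set reset_pipeline on the schema's sync_type_config, consume and clear it on the next sync", with the flag stored as a real boolean rather than the old "True" string on source.job_inputs. Below is a minimal, runnable Python sketch of that consume-and-clear flow; FakeSchema and run_sync are illustrative stand-ins for ExternalDataSchema and the sync activity, not PostHog code.

class FakeSchema:
    """Illustrative stand-in for ExternalDataSchema; only sync_type_config matters here."""

    def __init__(self) -> None:
        # Shape documented in models/external_data_schema.py after this patch:
        # { ..., "reset_pipeline": bool }
        self.sync_type_config: dict = {"reset_pipeline": True}

    def save(self) -> None:
        # Stand-in for Django's Model.save(); a no-op in this sketch.
        pass


def run_sync(schema: FakeSchema) -> None:
    # `is True` mirrors import_data_sync.py: only a real boolean True triggers
    # a reset, so truthy leftovers (e.g. the old "True" string) are ignored.
    reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True

    if reset_pipeline:
        print("resetting delta table before a full resync")

    # Consume the flag so the next scheduled sync runs normally.
    schema.sync_type_config.pop("reset_pipeline", None)
    schema.save()


schema = FakeSchema()
run_sync(schema)  # prints the reset message and clears the flag
assert "reset_pipeline" not in schema.sync_type_config
run_sync(schema)  # no reset this time

Keeping the flag on the schema rather than the source also means two schemas under one source can be reset independently, which the end-to-end tests above exercise via sync_type_config={"reset_pipeline": True}.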