Commit d816645

chore(data-warehouse): Moved reset_pipeline from source inputs to schema sync config (#27862)

Gilbert09 authored Jan 24, 2025
1 parent feee9c7 · commit d816645

Showing 7 changed files with 31 additions and 41 deletions.
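In short: the one-shot "reset this pipeline" flag moves from ExternalDataSource.job_inputs, where it was stored as the string "True", to ExternalDataSchema.sync_type_config, where it is a plain boolean. A condensed before/after of the read and write sites, using plain dicts with illustrative values rather than the real Django models:

# Before this commit: flag on the *source*, stringly-typed
job_inputs = {"stripe_secret_key": "...", "reset_pipeline": "True"}
reset_pipeline = job_inputs.get("reset_pipeline", "False") == "True"

# After this commit: flag on the *schema*, a real boolean
sync_type_config = {"incremental_field": "id", "reset_pipeline": True}
reset_pipeline = sync_type_config.get("reset_pipeline", False) is True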
9 changes: 0 additions & 9 deletions posthog/temporal/data_imports/pipelines/helpers.py
@@ -1,7 +1,5 @@
-import uuid
 from posthog.warehouse.models import ExternalDataJob
 from django.db.models import F
-from posthog.warehouse.models.external_data_source import ExternalDataSource
 from posthog.warehouse.util import database_sync_to_async


@@ -13,10 +11,3 @@ def aget_external_data_job(team_id, job_id):
 @database_sync_to_async
 def aupdate_job_count(job_id: str, team_id: int, count: int):
     ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
-
-
-@database_sync_to_async
-def aremove_reset_pipeline(source_id: uuid.UUID):
-    source = ExternalDataSource.objects.get(id=source_id)
-    source.job_inputs.pop("reset_pipeline", None)
-    source.save()
7 changes: 3 additions & 4 deletions posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -19,7 +19,7 @@
 from posthog.temporal.data_imports.pipelines.pipeline.hogql_schema import HogQLSchema
 from posthog.temporal.data_imports.pipelines.pipeline_sync import validate_schema_and_update_table_sync
 from posthog.temporal.data_imports.util import prepare_s3_files_for_querying
-from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema, ExternalDataSource
+from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema


 class PipelineNonDLT:
@@ -68,9 +68,8 @@ def run(self):
             self._logger.debug("Deleting existing table due to reset_pipeline being set")
             self._delta_table_helper.reset_table()

-            source: ExternalDataSource = self._job.pipeline
-            source.job_inputs.pop("reset_pipeline", None)
-            source.save()
+            self._schema.sync_type_config.pop("reset_pipeline", None)
+            self._schema.save()

         for item in self._resource:
             py_table = None
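The hunk above also keeps the reset one-shot at the point of use: the flag is cleared only after the delta table has been dropped, so a failure before the reset leaves the flag in place for the retry. A minimal, self-contained sketch of that pattern (the schema and reset_table parameters are stand-ins, not the real PipelineNonDLT internals):

def run_with_optional_reset(schema, reset_table) -> None:
    # Consume the one-shot flag as part of the work it triggers.
    if schema.sync_type_config.get("reset_pipeline", False) is True:
        reset_table()  # drop the existing delta table before a full re-sync
        # Clear the flag only once the reset has happened, so a crash
        # before this point retries the reset on the next run.
        schema.sync_type_config.pop("reset_pipeline", None)
        schema.save()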
@@ -90,7 +90,9 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs):

     _trim_source_job_inputs(model.pipeline)

-    reset_pipeline = model.pipeline.job_inputs.get("reset_pipeline", "False") == "True"
+    schema: ExternalDataSchema | None = model.schema
+    assert schema is not None
+    reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True

     schema = (
         ExternalDataSchema.objects.prefetch_related("source")
@@ -541,7 +543,6 @@ def _run(
         rows_synced=F("rows_synced") + total_rows_synced
     )

-    source = ExternalDataSource.objects.get(id=inputs.source_id)
-    source.job_inputs.pop("reset_pipeline", None)
-
-    source.save()
+    schema = ExternalDataSchema.objects.get(id=inputs.schema_id)
+    schema.sync_type_config.pop("reset_pipeline", None)
+    schema.save()
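One detail worth noting in the new read: sync_type_config is a free-form JSONField, so .get("reset_pipeline", False) is True only triggers a reset for a genuine boolean, whereas the old job_inputs check required the exact string "True". A few runnable cases showing the difference (illustrative, not from the repo):

assert ({}.get("reset_pipeline", False) is True) is False                           # missing key: no reset
assert ({"reset_pipeline": "True"}.get("reset_pipeline", False) is True) is False   # stray string: no reset
assert ({"reset_pipeline": True}.get("reset_pipeline", False) is True) is True      # real boolean: reset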
24 changes: 13 additions & 11 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -172,8 +172,8 @@ async def _run(
             continue
         assert name in (res.columns or [])

-    await sync_to_async(source.refresh_from_db)()
-    assert source.job_inputs.get("reset_pipeline", None) is None
+    await sync_to_async(schema.refresh_from_db)()
+    assert schema.sync_type_config.get("reset_pipeline", None) is None

     return workflow_id, inputs
@@ -520,8 +520,9 @@ async def test_reset_pipeline(team, stripe_balance_transaction):
         schema_name="BalanceTransaction",
         table_name="stripe_balancetransaction",
         source_type="Stripe",
-        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
+        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         mock_data_response=stripe_balance_transaction["data"],
+        sync_type_config={"reset_pipeline": True},
     )
@@ -1249,23 +1250,24 @@ async def test_delete_table_on_reset(team, stripe_balance_transaction):
         schema_name="BalanceTransaction",
         table_name="stripe_balancetransaction",
         source_type="Stripe",
-        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
+        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         mock_data_response=stripe_balance_transaction["data"],
+        sync_type_config={"reset_pipeline": True},
     )

-    source = await sync_to_async(ExternalDataSource.objects.get)(id=inputs.external_data_source_id)
+    schema = await sync_to_async(ExternalDataSchema.objects.get)(id=inputs.external_data_schema_id)

-    assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
-    source.job_inputs["reset_pipeline"] = "True"
+    assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
+    schema.sync_type_config["reset_pipeline"] = True

-    await sync_to_async(source.save)()
+    await sync_to_async(schema.save)()

     await _execute_run(str(uuid.uuid4()), inputs, stripe_balance_transaction["data"])

     mock_delta_table_delete.assert_called()
     mock_s3_delete.assert_called()

-    await sync_to_async(source.refresh_from_db)()
+    await sync_to_async(schema.refresh_from_db)()

-    assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
-    assert "reset_pipeline" not in source.job_inputs.keys()
+    assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
+    assert "reset_pipeline" not in schema.sync_type_config.keys()
10 changes: 4 additions & 6 deletions posthog/warehouse/api/external_data_schema.py
@@ -184,9 +184,9 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -
             sync_external_data_job_workflow(instance, create=False)

         if trigger_refresh:
-            source: ExternalDataSource = instance.source
-            source.job_inputs.update({"reset_pipeline": True})
-            source.save()
+            instance.sync_type_config.update({"reset_pipeline": True})
+            validated_data["sync_type_config"].update({"reset_pipeline": True})

             trigger_external_data_workflow(instance)

         return super().update(instance, validated_data)
@@ -266,9 +266,7 @@ def resync(self, request: Request, *args: Any, **kwargs: Any):
         if latest_running_job and latest_running_job.workflow_id and latest_running_job.status == "Running":
             cancel_external_data_workflow(latest_running_job.workflow_id)

-        source: ExternalDataSource = instance.source
-        source.job_inputs.update({"reset_pipeline": True})
-        source.save()
+        instance.sync_type_config.update({"reset_pipeline": True})

         try:
             trigger_external_data_workflow(instance)
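In the serializer's update(), the flag is written to both the live instance and validated_data. A plausible reason (a sketch of DRF semantics, not code from the repo): super().update() re-assigns model fields from validated_data, so a flag set only on the instance's sync_type_config would be overwritten by the stale validated copy. In plain dicts:

# instance_config mimics instance.sync_type_config; validated_config mimics
# validated_data["sync_type_config"] as DRF parsed it from the request.
instance_config = {"incremental_field": "id"}
validated_config = {"incremental_field": "id"}

instance_config.update({"reset_pipeline": True})
# Without the second update, assigning validated_config back onto the model
# (what super().update() effectively does) would drop the flag:
validated_config.update({"reset_pipeline": True})
assert validated_config.get("reset_pipeline") is True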
9 changes: 4 additions & 5 deletions posthog/warehouse/api/test/test_external_data_schema.py
@@ -183,8 +183,8 @@ def test_update_schema_change_sync_type(self):

         assert response.status_code == 200
         mock_trigger_external_data_workflow.assert_called_once()
-        source.refresh_from_db()
-        assert source.job_inputs.get("reset_pipeline") == "True"
+        schema.refresh_from_db()
+        assert schema.sync_type_config.get("reset_pipeline") is True

     def test_update_schema_change_sync_type_incremental_field(self):
         source = ExternalDataSource.objects.create(
@@ -211,10 +211,9 @@ def test_update_schema_change_sync_type_incremental_field(self):
         assert response.status_code == 200
         mock_trigger_external_data_workflow.assert_called_once()

-        source.refresh_from_db()
-        assert source.job_inputs.get("reset_pipeline") == "True"
-
         schema.refresh_from_db()
-
+        assert schema.sync_type_config.get("reset_pipeline") is True
         assert schema.sync_type_config.get("incremental_field") == "field"
         assert schema.sync_type_config.get("incremental_field_type") == "integer"
2 changes: 1 addition & 1 deletion posthog/warehouse/models/external_data_schema.py
@@ -54,7 +54,7 @@ class SyncFrequency(models.TextChoices):
     last_synced_at = models.DateTimeField(null=True, blank=True)
     sync_type = models.CharField(max_length=128, choices=SyncType.choices, null=True, blank=True)

-    # { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any }
+    # { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any, "reset_pipeline": bool }
     sync_type_config = models.JSONField(
         default=dict,
         blank=True,
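Taken together with the incremental-sync keys already documented in that comment, a schema with a queued reset might carry a sync_type_config like this (field values are illustrative, not from the repo):

{
    "incremental_field": "created_at",
    "incremental_field_type": "datetime",
    "incremental_field_last_value": "2025-01-23T00:00:00Z",
    "reset_pipeline": True,
}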
