Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(data-warehouse): Moved reset_pipeline from source inputs to schema sync config #27862

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions posthog/temporal/data_imports/pipelines/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import uuid
from posthog.warehouse.models import ExternalDataJob
from django.db.models import F
from posthog.warehouse.models.external_data_source import ExternalDataSource
from posthog.warehouse.util import database_sync_to_async


Expand All @@ -13,10 +11,3 @@ def aget_external_data_job(team_id, job_id):
@database_sync_to_async
def aupdate_job_count(job_id: str, team_id: int, count: int):
ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)


@database_sync_to_async
def aremove_reset_pipeline(source_id: uuid.UUID):
source = ExternalDataSource.objects.get(id=source_id)
source.job_inputs.pop("reset_pipeline", None)
source.save()
7 changes: 3 additions & 4 deletions posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from posthog.temporal.data_imports.pipelines.pipeline.hogql_schema import HogQLSchema
from posthog.temporal.data_imports.pipelines.pipeline_sync import validate_schema_and_update_table_sync
from posthog.temporal.data_imports.util import prepare_s3_files_for_querying
from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema, ExternalDataSource
from posthog.warehouse.models import DataWarehouseTable, ExternalDataJob, ExternalDataSchema


class PipelineNonDLT:
Expand Down Expand Up @@ -68,9 +68,8 @@ def run(self):
self._logger.debug("Deleting existing table due to reset_pipeline being set")
self._delta_table_helper.reset_table()

source: ExternalDataSource = self._job.pipeline
source.job_inputs.pop("reset_pipeline", None)
source.save()
self._schema.sync_type_config.pop("reset_pipeline", None)
self._schema.save()

for item in self._resource:
py_table = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs):

_trim_source_job_inputs(model.pipeline)

reset_pipeline = model.pipeline.job_inputs.get("reset_pipeline", "False") == "True"
schema: ExternalDataSchema | None = model.schema
assert schema is not None
reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True

schema = (
ExternalDataSchema.objects.prefetch_related("source")
Expand Down Expand Up @@ -541,7 +543,6 @@ def _run(
rows_synced=F("rows_synced") + total_rows_synced
)

source = ExternalDataSource.objects.get(id=inputs.source_id)
source.job_inputs.pop("reset_pipeline", None)

source.save()
schema = ExternalDataSchema.objects.get(id=inputs.schema_id)
schema.sync_type_config.pop("reset_pipeline", None)
schema.save()
24 changes: 13 additions & 11 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ async def _run(
continue
assert name in (res.columns or [])

await sync_to_async(source.refresh_from_db)()
assert source.job_inputs.get("reset_pipeline", None) is None
await sync_to_async(schema.refresh_from_db)()
assert schema.sync_type_config.get("reset_pipeline", None) is None

return workflow_id, inputs

Expand Down Expand Up @@ -520,8 +520,9 @@ async def test_reset_pipeline(team, stripe_balance_transaction):
schema_name="BalanceTransaction",
table_name="stripe_balancetransaction",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
mock_data_response=stripe_balance_transaction["data"],
sync_type_config={"reset_pipeline": True},
)


Expand Down Expand Up @@ -1249,23 +1250,24 @@ async def test_delete_table_on_reset(team, stripe_balance_transaction):
schema_name="BalanceTransaction",
table_name="stripe_balancetransaction",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id", "reset_pipeline": "True"},
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
mock_data_response=stripe_balance_transaction["data"],
sync_type_config={"reset_pipeline": True},
)

source = await sync_to_async(ExternalDataSource.objects.get)(id=inputs.external_data_source_id)
schema = await sync_to_async(ExternalDataSchema.objects.get)(id=inputs.external_data_schema_id)

assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
source.job_inputs["reset_pipeline"] = "True"
assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
schema.sync_type_config["reset_pipeline"] = True

await sync_to_async(source.save)()
await sync_to_async(schema.save)()

await _execute_run(str(uuid.uuid4()), inputs, stripe_balance_transaction["data"])

mock_delta_table_delete.assert_called()
mock_s3_delete.assert_called()

await sync_to_async(source.refresh_from_db)()
await sync_to_async(schema.refresh_from_db)()

assert source.job_inputs is not None and isinstance(source.job_inputs, dict)
assert "reset_pipeline" not in source.job_inputs.keys()
assert schema.sync_type_config is not None and isinstance(schema.sync_type_config, dict)
assert "reset_pipeline" not in schema.sync_type_config.keys()
10 changes: 4 additions & 6 deletions posthog/warehouse/api/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -
sync_external_data_job_workflow(instance, create=False)

if trigger_refresh:
source: ExternalDataSource = instance.source
source.job_inputs.update({"reset_pipeline": True})
source.save()
instance.sync_type_config.update({"reset_pipeline": True})
validated_data["sync_type_config"].update({"reset_pipeline": True})

trigger_external_data_workflow(instance)

return super().update(instance, validated_data)
Expand Down Expand Up @@ -266,9 +266,7 @@ def resync(self, request: Request, *args: Any, **kwargs: Any):
if latest_running_job and latest_running_job.workflow_id and latest_running_job.status == "Running":
cancel_external_data_workflow(latest_running_job.workflow_id)

source: ExternalDataSource = instance.source
source.job_inputs.update({"reset_pipeline": True})
source.save()
instance.sync_type_config.update({"reset_pipeline": True})

try:
trigger_external_data_workflow(instance)
Expand Down
9 changes: 4 additions & 5 deletions posthog/warehouse/api/test/test_external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ def test_update_schema_change_sync_type(self):

assert response.status_code == 200
mock_trigger_external_data_workflow.assert_called_once()
source.refresh_from_db()
assert source.job_inputs.get("reset_pipeline") == "True"
schema.refresh_from_db()
assert schema.sync_type_config.get("reset_pipeline") is True

def test_update_schema_change_sync_type_incremental_field(self):
source = ExternalDataSource.objects.create(
Expand All @@ -211,10 +211,9 @@ def test_update_schema_change_sync_type_incremental_field(self):
assert response.status_code == 200
mock_trigger_external_data_workflow.assert_called_once()

source.refresh_from_db()
assert source.job_inputs.get("reset_pipeline") == "True"

schema.refresh_from_db()

assert schema.sync_type_config.get("reset_pipeline") is True
assert schema.sync_type_config.get("incremental_field") == "field"
assert schema.sync_type_config.get("incremental_field_type") == "integer"

Expand Down
2 changes: 1 addition & 1 deletion posthog/warehouse/models/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SyncFrequency(models.TextChoices):
last_synced_at = models.DateTimeField(null=True, blank=True)
sync_type = models.CharField(max_length=128, choices=SyncType.choices, null=True, blank=True)

# { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any }
# { "incremental_field": string, "incremental_field_type": string, "incremental_field_last_value": any, "incremental_field_last_value_v2": any, "reset_pipeline": bool }
sync_type_config = models.JSONField(
default=dict,
blank=True,
Expand Down
Loading