diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 711d9d621d42d..dcfdd0e7427ae 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -287,6 +287,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): run_id=job_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id, + reset_pipeline=inputs.reset_pipeline, ) timeout_params = ( diff --git a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py index 80c2c5b829692..0d566087520a5 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data_sync.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data_sync.py @@ -2,7 +2,7 @@ import uuid from datetime import datetime from dateutil import parser -from typing import Any +from typing import Any, Optional from django.conf import settings from django.db import close_old_connections @@ -35,6 +35,7 @@ class ImportDataActivityInputs: schema_id: uuid.UUID source_id: uuid.UUID run_id: str + reset_pipeline: Optional[bool] = None def process_incremental_last_value(value: Any | None, field_type: IncrementalFieldType | None) -> Any | None: @@ -92,7 +93,11 @@ def import_data_activity_sync(inputs: ImportDataActivityInputs): schema: ExternalDataSchema | None = model.schema assert schema is not None - reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True + + if inputs.reset_pipeline is not None: + reset_pipeline = inputs.reset_pipeline + else: + reset_pipeline = schema.sync_type_config.get("reset_pipeline", False) is True logger.debug(f"schema.sync_type_config = {schema.sync_type_config}") logger.debug(f"reset_pipeline = {reset_pipeline}") diff --git a/posthog/temporal/utils.py b/posthog/temporal/utils.py index a1c22b0e0827f..23d8e406ea8c0 100644 --- a/posthog/temporal/utils.py +++ b/posthog/temporal/utils.py @@ -1,4 +1,5 @@ import dataclasses +from typing import Optional import uuid @@ -9,3 +10,4 @@ class ExternalDataWorkflowInputs: external_data_source_id: uuid.UUID external_data_schema_id: uuid.UUID | None = None billable: bool = True + reset_pipeline: Optional[bool] = None