feat(data-warehouse): V2 pipeline release #27732

Merged (5 commits) on Jan 22, 2025

25 changes: 12 additions & 13 deletions mypy-baseline.txt
@@ -50,9 +50,11 @@ posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argume
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "base_url" to "RESTClient" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "convert_types" has incompatible type "dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] | None"; expected "dict[str, dict[str, Any]] | None" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "convert_types" has incompatible type "dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] | None"; expected "dict[str, dict[str, Any]] | None" [arg-type]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
@@ -597,6 +599,10 @@ posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: d
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def get(self, Type, Sequence[str], /) -> Sequence[str]
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: note: def [_T] get(self, Type, _T, /) -> Sequence[str] | _T
posthog/temporal/data_imports/workflow_activities/sync_new_schemas.py:0: error: Argument "source_id" to "sync_old_schemas_with_new_schemas" has incompatible type "str"; expected "UUID" [arg-type]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Item "None" of "dict[str, str] | None" has no attribute "get" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "organization" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "None" of "Organization | Any | None" has no attribute "is_feature_available" [union-attr]
posthog/taxonomy/property_definition_api.py:0: error: Item "ForeignObjectRel" of "Field[Any, Any] | ForeignObjectRel | GenericForeignKey" has no attribute "cached_col" [union-attr]
@@ -754,13 +760,6 @@ posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict k
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Right operand of "and" is never evaluated [unreachable]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Statement is unreachable [unreachable]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Name "raw_db_columns" already defined on line 0 [no-redef]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
@@ -790,8 +789,13 @@ posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not define
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
posthog/api/sharing.py:0: error: Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) [union-attr]
posthog/temporal/data_imports/external_data_job.py:0: error: Argument "status" to "update_external_job_status" has incompatible type "str"; expected "Status" [arg-type]
posthog/api/sharing.py:0: error: Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) [union-attr]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_calls" (hint: "_execute_calls: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_async_calls" (hint: "_execute_async_calls: list[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_cursors" (hint: "_cursors: list[<type>] = ...") [var-annotated]
@@ -808,11 +812,6 @@ posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "s
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "str": "float"; expected "str": "int" [dict-item]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
posthog/api/test/batch_exports/conftest.py:0: error: Signature of "run" incompatible with supertype "Worker" [override]
posthog/api/test/batch_exports/conftest.py:0: note: Superclass:
1 change: 0 additions & 1 deletion posthog/constants.py
@@ -303,7 +303,6 @@ class FlagRequestType(StrEnum):

ENRICHED_DASHBOARD_INSIGHT_IDENTIFIER = "Feature Viewed"
DATA_WAREHOUSE_TASK_QUEUE = "data-warehouse-task-queue"
DATA_WAREHOUSE_TASK_QUEUE_V2 = "v2-data-warehouse-task-queue"
BATCH_EXPORTS_TASK_QUEUE = "batch-exports-task-queue"
SYNC_BATCH_EXPORTS_TASK_QUEUE = "no-sandbox-python-django"
GENERAL_PURPOSE_TASK_QUEUE = "general-purpose-task-queue"
20 changes: 3 additions & 17 deletions posthog/hogql/database/s3_table.py
@@ -1,15 +1,12 @@
import re
from typing import TYPE_CHECKING, Optional
from typing import Optional

from posthog.clickhouse.client.escape import substitute_params
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import FunctionCallTable
from posthog.hogql.errors import ExposedHogQLError
from posthog.hogql.escape_sql import escape_hogql_identifier

if TYPE_CHECKING:
from posthog.warehouse.models import ExternalDataJob


def build_function_call(
url: str,
@@ -18,10 +15,7 @@ def build_function_call(
access_secret: Optional[str] = None,
structure: Optional[str] = None,
context: Optional[HogQLContext] = None,
pipeline_version: Optional["ExternalDataJob.PipelineVersion"] = None,
) -> str:
from posthog.warehouse.models import ExternalDataJob

raw_params: dict[str, str] = {}

def add_param(value: str, is_sensitive: bool = True) -> str:
@@ -42,18 +36,10 @@ def return_expr(expr: str) -> str:

# DeltaS3Wrapper format
if format == "DeltaS3Wrapper":
query_folder = "__query_v2" if pipeline_version == ExternalDataJob.PipelineVersion.V2 else "__query"

if url.endswith("/"):
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-5]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url[:-1]}{query_folder}/*.parquet")
escaped_url = add_param(f"{url[:-1]}__query/*.parquet")
else:
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-4]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url}{query_folder}/*.parquet")
escaped_url = add_param(f"{url}__query/*.parquet")

if structure:
escaped_structure = add_param(structure, False)
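
With the `pipeline_version` branching removed, the remaining `DeltaS3Wrapper` handling always targets the single `__query` folder. A minimal standalone sketch of that URL behaviour (the function name below is made up for illustration; the real `build_function_call` also escapes credentials and an optional structure parameter):

```python
# Simplified sketch of the post-change URL handling: one "__query" folder,
# no V2-specific "__query_v2" branch.
def query_glob_url(url: str) -> str:
    # Strip a trailing slash before appending the query folder glob.
    if url.endswith("/"):
        return f"{url[:-1]}__query/*.parquet"
    return f"{url}__query/*.parquet"


assert query_glob_url("https://bucket.s3.amazonaws.com/team/table/") == (
    "https://bucket.s3.amazonaws.com/team/table__query/*.parquet"
)
assert query_glob_url("https://bucket.s3.amazonaws.com/team/table") == (
    "https://bucket.s3.amazonaws.com/team/table__query/*.parquet"
)
```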
3 changes: 0 additions & 3 deletions posthog/management/commands/start_temporal_worker.py
@@ -11,7 +11,6 @@
from posthog.constants import (
BATCH_EXPORTS_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE_V2,
GENERAL_PURPOSE_TASK_QUEUE,
SYNC_BATCH_EXPORTS_TASK_QUEUE,
)
@@ -32,14 +31,12 @@
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_WORKFLOWS + DELETE_PERSONS_WORKFLOWS,
}
ACTIVITIES_DICT = {
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_ACTIVITIES + DELETE_PERSONS_ACTIVITIES,
}

2 changes: 0 additions & 2 deletions posthog/temporal/data_imports/__init__.py
@@ -6,7 +6,6 @@
update_external_data_job_model,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
)

WORKFLOWS = [ExternalDataJobWorkflow]
@@ -18,5 +17,4 @@
create_source_templates,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
]
60 changes: 0 additions & 60 deletions posthog/temporal/data_imports/external_data_job.py
@@ -1,26 +1,16 @@
import asyncio
import dataclasses
import datetime as dt
import json
import re
import threading
import time

from django.conf import settings
from django.db import close_old_connections
import posthoganalytics
import psutil
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import WorkflowAlreadyStartedError


from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2

# TODO: remove dependency
from posthog.settings.base_variables import TEST
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.client import sync_connect
from posthog.temporal.data_imports.workflow_activities.check_billing_limits import (
CheckBillingLimitsActivityInputs,
check_billing_limits_activity,
@@ -144,32 +134,6 @@ def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) ->
)


@activity.defn
def trigger_pipeline_v2(inputs: ExternalDataWorkflowInputs):
logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
logger.debug("Triggering V2 pipeline")

temporal = sync_connect()
try:
asyncio.run(
temporal.start_workflow(
workflow="external-data-job",
arg=dataclasses.asdict(inputs),
id=f"{inputs.external_data_schema_id}-V2",
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
retry_policy=RetryPolicy(
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=["NondeterminismError"],
),
)
)
except WorkflowAlreadyStartedError:
pass

logger.debug("V2 pipeline triggered")


@dataclasses.dataclass
class CreateSourceTemplateInputs:
team_id: int
@@ -181,22 +145,6 @@ def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)


def log_memory_usage():
process = psutil.Process()
logger = bind_temporal_worker_logger_sync(team_id=0)

while True:
memory_info = process.memory_info()
logger.info(f"Memory Usage: RSS = {memory_info.rss / (1024 * 1024):.2f} MB")

time.sleep(10) # Log every 10 seconds


if settings.TEMPORAL_TASK_QUEUE == DATA_WAREHOUSE_TASK_QUEUE_V2:
thread = threading.Thread(target=log_memory_usage, daemon=True)
thread.start()


# TODO: update retry policies
@workflow.defn(name="external-data-job")
class ExternalDataJobWorkflow(PostHogWorkflow):
@@ -209,14 +157,6 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs:
async def run(self, inputs: ExternalDataWorkflowInputs):
assert inputs.external_data_schema_id is not None

if settings.TEMPORAL_TASK_QUEUE != DATA_WAREHOUSE_TASK_QUEUE_V2 and not TEST:
await workflow.execute_activity(
trigger_pipeline_v2,
inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)

Review comment (Member): As mentioned on standup, I believe with this, any workflow that gets resumed that has triggered this will end up in a bad state. (Should be minimal though so we can just reboot)

update_inputs = UpdateExternalDataJobStatusInputs(
job_id=None,
status=ExternalDataJob.Status.COMPLETED,
@@ -46,8 +46,7 @@ def _get_credentials(self):

def _get_delta_table_uri(self) -> str:
normalized_resource_name = NamingConvention().normalize_identifier(self._resource_name)
# Appended __v2 on to the end of the url so that data of the V2 pipeline isn't the same as V1
return f"{settings.BUCKET_URL}/{self._job.folder_path()}/{normalized_resource_name}__v2"
return f"{settings.BUCKET_URL}/{self._job.folder_path()}/{normalized_resource_name}"

def _evolve_delta_schema(self, schema: pa.Schema) -> deltalake.DeltaTable:
delta_table = self.get_delta_table()
8 changes: 7 additions & 1 deletion posthog/temporal/data_imports/pipelines/pipeline/pipeline.py
@@ -8,11 +8,13 @@
import deltalake as deltalake
from posthog.temporal.common.logger import FilteringBoundLogger
from posthog.temporal.data_imports.pipelines.pipeline.utils import (
_handle_null_columns_with_definitions,
_update_incremental_state,
_get_primary_keys,
_evolve_pyarrow_schema,
_append_debug_column_to_pyarrows_table,
_update_job_row_count,
_update_last_synced_at_sync,
table_from_py_list,
)
from posthog.temporal.data_imports.pipelines.pipeline.delta_table_helper import DeltaTableHelper
@@ -133,6 +135,7 @@ def _process_pa_table(self, pa_table: pa.Table, index: int):

pa_table = _append_debug_column_to_pyarrows_table(pa_table, self._load_id)
pa_table = _evolve_pyarrow_schema(pa_table, delta_table.schema() if delta_table is not None else None)
pa_table = _handle_null_columns_with_definitions(pa_table, self._resource)

table_primary_keys = _get_primary_keys(self._resource)
delta_table = self._delta_table_helper.write_to_deltalake(
@@ -173,11 +176,14 @@ def _post_run_operations(self, row_count: int):
process.kill()

file_uris = delta_table.file_uris()
self._logger.info(f"Preparing S3 files - total parquet files: {len(file_uris)}")
self._logger.debug(f"Preparing S3 files - total parquet files: {len(file_uris)}")
prepare_s3_files_for_querying(
self._job.folder_path(), self._resource_name, file_uris, ExternalDataJob.PipelineVersion.V2
)

self._logger.debug("Updating last synced at timestamp on schema")
_update_last_synced_at_sync(self._schema, self._job)

self._logger.debug("Validating schema and updating table")

validate_schema_and_update_table_sync(
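
The `_handle_null_columns_with_definitions` helper added to `_process_pa_table` is not shown in this diff. As a rough, hypothetical sketch only (the name, signature, and behaviour below are assumptions for illustration, not PostHog's actual implementation), handling all-null columns against declared column types could look like this:

```python
import pyarrow as pa


def handle_null_columns(table: pa.Table, column_hints: dict[str, pa.DataType]) -> pa.Table:
    """Cast all-null columns to a hinted type so downstream Delta writes
    don't end up with a NullType column (illustrative sketch only)."""
    for index, field in enumerate(table.schema):
        hinted_type = column_hints.get(field.name)
        if pa.types.is_null(field.type) and hinted_type is not None:
            # Replace the all-null column with an equally sized null column
            # of the hinted type.
            table = table.set_column(
                index,
                pa.field(field.name, hinted_type, nullable=True),
                pa.nulls(table.num_rows, type=hinted_type),
            )
    return table


example = pa.table({"id": pa.array([1, 2]), "deleted_at": pa.nulls(2)})
fixed = handle_null_columns(example, {"deleted_at": pa.timestamp("us")})
assert fixed.schema.field("deleted_at").type == pa.timestamp("us")
```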