From fd8baf5b9e0996114ca333b9d354568b58644d10 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Mon, 3 Feb 2025 14:18:47 -0800 Subject: [PATCH] fix: increase the receive_timeout to 24 hours for delete job (#28231) --- dags/deletes.py | 25 ++++++++++++++----------- dags/tests/test_deletes.py | 8 +------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/dags/deletes.py b/dags/deletes.py index 225a14f62f642..04ee38706aef1 100644 --- a/dags/deletes.py +++ b/dags/deletes.py @@ -15,6 +15,8 @@ ) from django.conf import settings from functools import partial +import uuid + from posthog.clickhouse.cluster import ( ClickhouseCluster, Mutation, @@ -34,7 +36,9 @@ class ClickhouseClusterResource(ConfigurableResource): client_settings: dict[str, str] = { "max_execution_time": "0", "max_memory_usage": "0", - "receive_timeout": f"{10 * 60}", + "receive_timeout": f"{24 * 60 * 60}", # wait 24 hours for a response from CH + "mutations_sync": "0", + "lightweight_deletes_sync": "0", } def create_resource(self, context: InitResourceContext) -> ClickhouseCluster: @@ -60,17 +64,13 @@ class DeleteConfig(Config): ) max_execution_time: int = pydantic.Field( default=0, - description="The maximum amount of time to wait for the dictionary to be loaded before considering the operation " + description="The maximum amount of time to wait for the dictionary load to complete before considering the operation " "a failure, or 0 to wait an unlimited amount of time.", ) max_memory_usage: int = pydantic.Field( default=0, description="The maximum amount of memory to use for the dictionary, or 0 to use an unlimited amount.", ) - lightweight_deletes_sync: int = pydantic.Field( - default=0, - description="0 is async. 1 is local sync. 2 is cluster sync.", - ) @property def parsed_timestamp(self) -> datetime: @@ -83,7 +83,7 @@ def parsed_timestamp(self) -> datetime: @dataclass class PendingPersonEventDeletesTable: """ - Represents a temporary table storing pending person event deletions. + Represents a table storing pending person event deletions. """ timestamp: datetime @@ -110,6 +110,12 @@ def table_name(self) -> str: def qualified_name(self): return f"{settings.CLICKHOUSE_DATABASE}.{self.table_name}" + @property + def zk_path(self) -> str: + ns_uuid = uuid.uuid4() + testing = f"testing/{ns_uuid}/" if settings.TEST else "" + return f"/clickhouse/tables/{testing}noshard/{self.table_name}" + @property def create_table_query(self) -> str: return f""" @@ -119,7 +125,7 @@ def create_table_query(self) -> str: key String, created_at DateTime, ) - ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/noshard/{self.table_name}', '{{shard}}-{{replica}}') + ENGINE = ReplicatedReplacingMergeTree('{self.zk_path}', '{{shard}}-{{replica}}') ORDER BY (team_id, key) """ @@ -168,7 +174,6 @@ def checksum(self, client: Client): @dataclass class PendingDeletesDictionary: source: PendingPersonEventDeletesTable - lightweight_deletes_sync: int = 0 @property def name(self) -> str: @@ -258,7 +263,6 @@ def delete_mutation_runner(self) -> MutationRunner: timestamp <= dictGet('{self.qualified_name}', 'created_at', (team_id, person_id)) """, {}, - settings={"mutations_sync": self.lightweight_deletes_sync}, ) @@ -381,7 +385,6 @@ def sync_replica(client: Client): del_dict = PendingDeletesDictionary( source=load_pending_person_deletions, - lightweight_deletes_sync=config.lightweight_deletes_sync, ) cluster.any_host_by_role( diff --git a/dags/tests/test_deletes.py b/dags/tests/test_deletes.py index 171264bdeb8f0..5f857c0783c28 100644 --- a/dags/tests/test_deletes.py +++ b/dags/tests/test_deletes.py @@ -78,13 +78,7 @@ def get_pending_deletes() -> list[AsyncDeletion]: # Run the deletion job deletes_job.execute_in_process( - run_config={ - "ops": { - "create_pending_person_deletions_table": { - "config": {"timestamp": timestamp.isoformat(), "lightweight_deletes_sync": 1} - } - } - }, + run_config={"ops": {"create_pending_person_deletions_table": {"config": {"timestamp": timestamp.isoformat()}}}}, resources={"cluster": cluster}, )