Skip to content

Commit

Permalink
fix: increase the receive_timeout to 24 hours for delete job (#28231)
Browse files: browse the repository at this point in the history
  • Loading branch information
fuziontech authored Feb 3, 2025
1 parent ee89c57 commit fd8baf5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 18 deletions.
25 changes: 14 additions & 11 deletions dags/deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
)
from django.conf import settings
from functools import partial
import uuid

from posthog.clickhouse.cluster import (
ClickhouseCluster,
Mutation,
Expand All @@ -34,7 +36,9 @@ class ClickhouseClusterResource(ConfigurableResource):
client_settings: dict[str, str] = {
"max_execution_time": "0",
"max_memory_usage": "0",
"receive_timeout": f"{10 * 60}",
"receive_timeout": f"{24 * 60 * 60}", # wait 24 hours for a response from CH
"mutations_sync": "0",
"lightweight_deletes_sync": "0",
}

def create_resource(self, context: InitResourceContext) -> ClickhouseCluster:
Expand All @@ -60,17 +64,13 @@ class DeleteConfig(Config):
)
max_execution_time: int = pydantic.Field(
default=0,
description="The maximum amount of time to wait for the dictionary to be loaded before considering the operation "
description="The maximum amount of time to wait for the dictionary load to complete before considering the operation "
"a failure, or 0 to wait an unlimited amount of time.",
)
max_memory_usage: int = pydantic.Field(
default=0,
description="The maximum amount of memory to use for the dictionary, or 0 to use an unlimited amount.",
)
lightweight_deletes_sync: int = pydantic.Field(
default=0,
description="0 is async. 1 is local sync. 2 is cluster sync.",
)

@property
def parsed_timestamp(self) -> datetime:
Expand All @@ -83,7 +83,7 @@ def parsed_timestamp(self) -> datetime:
@dataclass
class PendingPersonEventDeletesTable:
"""
Represents a temporary table storing pending person event deletions.
Represents a table storing pending person event deletions.
"""

timestamp: datetime
Expand All @@ -110,6 +110,12 @@ def table_name(self) -> str:
def qualified_name(self):
return f"{settings.CLICKHOUSE_DATABASE}.{self.table_name}"

@property
def zk_path(self) -> str:
    """Return the ZooKeeper path passed to the replicated table engine.

    Outside of tests the path is ``/clickhouse/tables/noshard/<table_name>``.
    When ``settings.TEST`` is truthy, a ``testing/<uuid4>/`` segment is
    inserted ahead of ``noshard`` (presumably so separate test runs do not
    collide on the same path -- confirm with the test setup).

    NOTE: under ``settings.TEST`` a new UUID is drawn on *every* access, so
    two reads of this property return different paths. Callers that need a
    stable value within a test should read it once and reuse the result.
    """
    if settings.TEST:
        # Only pay for UUID generation when the namespace is actually used.
        namespace = f"testing/{uuid.uuid4()}/"
    else:
        namespace = ""
    return f"/clickhouse/tables/{namespace}noshard/{self.table_name}"

@property
def create_table_query(self) -> str:
return f"""
Expand All @@ -119,7 +125,7 @@ def create_table_query(self) -> str:
key String,
created_at DateTime,
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/noshard/{self.table_name}', '{{shard}}-{{replica}}')
ENGINE = ReplicatedReplacingMergeTree('{self.zk_path}', '{{shard}}-{{replica}}')
ORDER BY (team_id, key)
"""

Expand Down Expand Up @@ -168,7 +174,6 @@ def checksum(self, client: Client):
@dataclass
class PendingDeletesDictionary:
source: PendingPersonEventDeletesTable
lightweight_deletes_sync: int = 0

@property
def name(self) -> str:
Expand Down Expand Up @@ -258,7 +263,6 @@ def delete_mutation_runner(self) -> MutationRunner:
timestamp <= dictGet('{self.qualified_name}', 'created_at', (team_id, person_id))
""",
{},
settings={"mutations_sync": self.lightweight_deletes_sync},
)


Expand Down Expand Up @@ -381,7 +385,6 @@ def sync_replica(client: Client):

del_dict = PendingDeletesDictionary(
source=load_pending_person_deletions,
lightweight_deletes_sync=config.lightweight_deletes_sync,
)

cluster.any_host_by_role(
Expand Down
8 changes: 1 addition & 7 deletions dags/tests/test_deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,7 @@ def get_pending_deletes() -> list[AsyncDeletion]:

# Run the deletion job
deletes_job.execute_in_process(
run_config={
"ops": {
"create_pending_person_deletions_table": {
"config": {"timestamp": timestamp.isoformat(), "lightweight_deletes_sync": 1}
}
}
},
run_config={"ops": {"create_pending_person_deletions_table": {"config": {"timestamp": timestamp.isoformat()}}}},
resources={"cluster": cluster},
)

Expand Down

0 comments on commit fd8baf5

Please sign in to comment.