Commit b64216c
Merge branch 'master' into zach/improve-quote-logging
zlwaterfield authored Feb 3, 2025
2 parents c563239 + b3176f6 commit b64216c
Showing 125 changed files with 2,408 additions and 676 deletions.
12 changes: 7 additions & 5 deletions .flox/env/direnv-setup.sh
@@ -56,12 +56,14 @@ else
     echo "⏩ direnv hook already present in $config_file"
 fi
 
-# Add hook to shell config if not already present
-if ! grep -q "warn_timeout" "$HOME/.config/direnv/direnv.toml" 2>/dev/null; then
-    echo "[global]\nwarn_timeout = 0 # Ignore timeout from this issue: https://github.com/direnv/direnv/issues/1065 - added by PostHog's Flox activation hook (../posthog/.flox/env/manifest.toml)" >> "$HOME/.config/direnv/direnv.toml"
-    echo "✅ Configured ~/.config/direnv/direnv.toml"
+# Ignore direnv timeout warning
+direnv_config_file="$HOME/.config/direnv/direnv.toml"
+mkdir -p "$(dirname "$direnv_config_file")"
+if ! grep -q "warn_timeout" "$direnv_config_file" 2>/dev/null; then
+    echo -e "[global]\nwarn_timeout = 0 # Ignore timeout from this issue: https://github.com/direnv/direnv/issues/1065 - added by PostHog's Flox activation hook (../posthog/.flox/env/manifest.toml)" >> "$direnv_config_file"
+    echo "✅ Configured $direnv_config_file"
 else
-    echo "~/.config/direnv/direnv.toml already configured"
+    echo "$direnv_config_file already configured"
 fi
 
 echo "💫 direnv is now active"
3 changes: 2 additions & 1 deletion cypress/support/e2e.ts
@@ -18,7 +18,8 @@ const E2E_TESTING = Cypress.env('E2E_TESTING')
 // From: https://github.com/cypress-io/cypress/issues/300#issuecomment-688915086
 Cypress.on('window:before:load', (win) => {
     cy.spy(win.console, 'error')
-    cy.spy(win.console, 'warn')(win as any)._cypress_posthog_captures = []
+    cy.spy(win.console, 'warn')
+    ;(win as any)._cypress_posthog_captures = []
 })
 
 Cypress.on('window:load', (win) => {
39 changes: 30 additions & 9 deletions dags/definitions.py
@@ -1,4 +1,11 @@
-from dagster import Definitions, load_assets_from_modules, ScheduleDefinition
+from dagster import (
+    Definitions,
+    load_assets_from_modules,
+    run_status_sensor,
+    ScheduleDefinition,
+    RunRequest,
+    DagsterRunStatus,
+)
 from dagster_aws.s3.io_manager import s3_pickle_io_manager
 from dagster_aws.s3.resources import s3_resource
 from dagster import fs_io_manager
@@ -9,15 +16,10 @@
 
 all_assets = load_assets_from_modules([ch_examples, orm_examples])
 
-# Schedule to run deletes at 10 PM on Saturdays
-deletes_schedule = ScheduleDefinition(
-    job=deletes.deletes_job,
-    cron_schedule="0 22 * * 6",  # At 22:00 (10 PM) on Saturday
-    execution_timezone="UTC",
-    name="deletes_schedule",
-)
+
 env = "local" if settings.DEBUG else "prod"
 
+
 # Define resources for different environments
 resources_by_env = {
     "prod": {
@@ -33,12 +35,31 @@
     },
 }
 
+
 # Get resources for current environment, fallback to local if env not found
 resources = resources_by_env.get(env, resources_by_env["local"])
 
+
+# Schedule to run squash at 10 PM on Saturdays
+squash_schedule = ScheduleDefinition(
+    job=squash_person_overrides,
+    cron_schedule="0 22 * * 6",  # At 22:00 (10 PM) on Saturday
+    execution_timezone="UTC",
+    name="squash_person_overrides_schedule",
+)
+
+
+@run_status_sensor(
+    run_status=DagsterRunStatus.SUCCESS, monitored_jobs=[squash_person_overrides], request_job=deletes.deletes_job
+)
+def run_deletes_after_squash(context):
+    return RunRequest(run_key=None)
+
+
 defs = Definitions(
     assets=all_assets,
     jobs=[squash_person_overrides, deletes.deletes_job],
-    schedules=[deletes_schedule],
+    schedules=[squash_schedule],
+    sensors=[run_deletes_after_squash],
     resources=resources,
 )
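The net effect of this file's changes: the deletes job no longer has its own cron schedule; only the squash job is scheduled, and a run-status sensor requests a deletes run whenever a squash run succeeds. A minimal standalone sketch of that chaining pattern follows (the op and job names here are placeholders, not PostHog's real ones):

from dagster import (
    DagsterRunStatus,
    Definitions,
    RunRequest,
    ScheduleDefinition,
    job,
    op,
    run_status_sensor,
)


@op
def squash_op():
    """Stand-in for the squash work."""


@op
def deletes_op():
    """Stand-in for the deletes work."""


@job
def squash_sketch_job():
    squash_op()


@job
def deletes_sketch_job():
    deletes_op()


# Only the first job keeps a cron schedule (10 PM on Saturdays, UTC)...
squash_sketch_schedule = ScheduleDefinition(
    job=squash_sketch_job,
    cron_schedule="0 22 * * 6",
    execution_timezone="UTC",
    name="squash_sketch_schedule",
)


# ...and the second job is requested whenever the first one finishes successfully.
@run_status_sensor(run_status=DagsterRunStatus.SUCCESS, monitored_jobs=[squash_sketch_job], request_job=deletes_sketch_job)
def deletes_after_squash(context):
    return RunRequest(run_key=None)


defs = Definitions(
    jobs=[squash_sketch_job, deletes_sketch_job],
    schedules=[squash_sketch_schedule],
    sensors=[deletes_after_squash],
)

Because the sensor only fires on DagsterRunStatus.SUCCESS, a failed squash run leaves the deletes job untouched.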
62 changes: 47 additions & 15 deletions dags/deletes.py
@@ -89,6 +89,7 @@ class PendingPersonEventDeletesTable:
     timestamp: datetime
     team_id: int | None = None
     cluster: str = settings.CLICKHOUSE_CLUSTER
+    is_reporting: bool = False
 
     @property
     def timestamp_isoformat(self) -> str:
@@ -100,7 +101,10 @@ def clickhouse_timestamp(self) -> str:
 
     @property
     def table_name(self) -> str:
-        return f"pending_person_deletes_{self.clickhouse_timestamp}"
+        if self.is_reporting:
+            return "pending_person_deletes_reporting"
+        else:
+            return f"pending_person_deletes_{self.clickhouse_timestamp}"
 
     @property
     def qualified_name(self):
@@ -119,13 +123,20 @@ def create_table_query(self) -> str:
             ORDER BY (team_id, key)
         """
 
+    @property
+    def truncate_table_query(self) -> str:
+        return f"TRUNCATE TABLE {self.qualified_name} ON CLUSTER '{self.cluster}'"
+
     @property
     def drop_table_query(self) -> str:
         return f"DROP TABLE IF EXISTS {self.qualified_name} ON CLUSTER '{self.cluster}'"
 
     def create(self, client: Client) -> None:
         client.execute(self.create_table_query)
 
+    def truncate(self, client: Client) -> None:
+        client.execute(self.truncate_table_query)
+
     def drop(self, client: Client) -> None:
        client.execute(self.drop_table_query)
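Taken together, the PendingPersonEventDeletesTable changes give the reporting variant a fixed table name plus a TRUNCATE statement, so it can be reused between runs. A rough, self-contained sketch of that behaviour (simplified dataclass; the cluster name and timestamp formatting are placeholders, not the real ClickHouse settings):

from dataclasses import dataclass
from datetime import datetime


@dataclass
class PendingDeletesTableSketch:
    """Simplified stand-in for PendingPersonEventDeletesTable (illustration only)."""

    timestamp: datetime
    cluster: str = "example_cluster"  # placeholder, not the real CLICKHOUSE_CLUSTER setting
    is_reporting: bool = False

    @property
    def table_name(self) -> str:
        if self.is_reporting:
            # Fixed name so the reporting table can be truncated and reused between runs.
            return "pending_person_deletes_reporting"
        return f"pending_person_deletes_{self.timestamp:%Y%m%d%H%M%S}"

    @property
    def truncate_table_query(self) -> str:
        return f"TRUNCATE TABLE {self.table_name} ON CLUSTER '{self.cluster}'"


regular = PendingDeletesTableSketch(timestamp=datetime(2025, 2, 3, 22, 0))
reporting = PendingDeletesTableSketch(timestamp=datetime(2025, 2, 3, 22, 0), is_reporting=True)
print(regular.table_name)              # pending_person_deletes_20250203220000
print(reporting.table_name)            # pending_person_deletes_reporting
print(reporting.truncate_table_query)  # TRUNCATE TABLE pending_person_deletes_reporting ON CLUSTER 'example_cluster'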

@@ -266,27 +277,46 @@ def create_pending_person_deletions_table(
     return table
 
 
+@op
+def create_reporting_pending_person_deletions_table(
+    config: DeleteConfig,
+    cluster: ResourceParam[ClickhouseCluster],
+) -> PendingPersonEventDeletesTable:
+    """Create a merge tree table in ClickHouse to store pending deletes."""
+    table = PendingPersonEventDeletesTable(
+        timestamp=config.parsed_timestamp,
+        cluster=settings.CLICKHOUSE_CLUSTER,
+        is_reporting=True,
+    )
+    cluster.any_host_by_role(table.create, NodeRole.WORKER).result()
+    cluster.any_host_by_role(table.truncate, NodeRole.WORKER).result()
+    return table
+
+
 @op
 def load_pending_person_deletions(
     context: OpExecutionContext,
     create_pending_person_deletions_table: PendingPersonEventDeletesTable,
+    cleanup_delete_assets: bool | None = None,
 ) -> PendingPersonEventDeletesTable:
     """Query postgres using django ORM to get pending person deletions and insert directly into ClickHouse."""
 
-    if not create_pending_person_deletions_table.team_id:
-        # Use Django's queryset iterator for memory efficiency
-        pending_deletions = AsyncDeletion.objects.filter(
-            deletion_type=DeletionType.Person,
-            delete_verified_at__isnull=True,
-            created_at__lte=create_pending_person_deletions_table.timestamp,
-        ).iterator()
+    if create_pending_person_deletions_table.is_reporting:
+        pending_deletions = AsyncDeletion.objects.all().iterator()
     else:
-        pending_deletions = AsyncDeletion.objects.filter(
-            deletion_type=DeletionType.Person,
-            team_id=create_pending_person_deletions_table.team_id,
-            delete_verified_at__isnull=True,
-            created_at__lte=create_pending_person_deletions_table.timestamp,
-        ).iterator()
+        if not create_pending_person_deletions_table.team_id:
+            pending_deletions = AsyncDeletion.objects.filter(
+                deletion_type=DeletionType.Person,
+                delete_verified_at__isnull=True,
+                created_at__lte=create_pending_person_deletions_table.timestamp,
+            ).iterator()
+        else:
+            pending_deletions = AsyncDeletion.objects.filter(
+                deletion_type=DeletionType.Person,
+                team_id=create_pending_person_deletions_table.team_id,
+                delete_verified_at__isnull=True,
+                created_at__lte=create_pending_person_deletions_table.timestamp,
+            ).iterator()
 
     # Process and insert in chunks
     chunk_size = 10000
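The collapsed remainder of this op streams the queryset into ClickHouse in chunks of 10,000 rows. A generic sketch of that chunking pattern, assuming a hypothetical insert_rows callback in place of the real ClickHouse insert:

from itertools import islice
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_chunks(items: Iterable[T], chunk_size: int) -> Iterator[list[T]]:
    """Yield lists of at most chunk_size items without materializing the whole iterable."""
    iterator = iter(items)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def load_in_chunks(rows: Iterable[T], insert_rows: Callable[[list[T]], None], chunk_size: int = 10000) -> int:
    """Feed rows to insert_rows in fixed-size batches and return the total row count."""
    total = 0
    for chunk in iter_chunks(rows, chunk_size):
        insert_rows(chunk)  # in the real op this would be a ClickHouse INSERT
        total += len(chunk)
    return total


# Toy usage with a fake source and a no-op sink:
print(load_in_chunks(range(25_000), insert_rows=lambda chunk: None))  # 25000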
@@ -471,10 +501,12 @@ def cleanup_delete_assets(
 @job
 def deletes_job():
     """Job that handles deletion of person events."""
+    report_person_table = create_reporting_pending_person_deletions_table()
     person_table = create_pending_person_deletions_table()
     loaded_person_table = load_pending_person_deletions(person_table)
     create_deletes_dict_op = create_deletes_dict(loaded_person_table)
     load_dict = load_and_verify_deletes_dictionary(create_deletes_dict_op)
     delete_events = delete_person_events(load_dict)
     waited_mutation = wait_for_delete_mutations(delete_events)
-    cleanup_delete_assets(person_table, create_deletes_dict_op, waited_mutation)
+    cleaned = cleanup_delete_assets(person_table, create_deletes_dict_op, waited_mutation)
+    load_pending_person_deletions(report_person_table, cleaned)
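In the reworked job, cleanup_delete_assets now returns a value that is fed into a second load_pending_person_deletions call, which is how Dagster learns that the reporting-table load must run after cleanup. A toy sketch of that output-as-dependency pattern (op names are made up):

from dagster import job, op


@op
def make_reporting_table() -> str:
    return "pending_person_deletes_reporting"


@op
def cleanup() -> bool:
    # Pretend the temporary delete assets are dropped here.
    return True


@op
def load_reporting_table(table: str, cleanup_done: bool | None = None) -> str:
    # cleanup_done carries no data; wiring it in just forces this op to run after cleanup().
    return table


@job
def reporting_sketch_job():
    table = make_reporting_table()
    done = cleanup()
    load_reporting_table(table, done)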
9 changes: 6 additions & 3 deletions dags/person_overrides.py
@@ -1,3 +1,4 @@
+import datetime
 import time
 import uuid
 from dataclasses import dataclass
@@ -198,7 +199,7 @@ def overrides_delete_mutation_runner(self) -> MutationRunner:
         return MutationRunner(
             PERSON_DISTINCT_ID_OVERRIDES_TABLE,
             f"""
-            DELETE WHERE
+            DELETE FROM {PERSON_DISTINCT_ID_OVERRIDES_TABLE} WHERE
             isNotNull(dictGetOrNull(%(name)s, 'version', (team_id, distinct_id)) as snapshot_version)
             AND snapshot_version >= version
             """,
@@ -229,11 +230,13 @@ class PopulateSnapshotTableConfig(dagster.Config):
         description="The upper bound (non-inclusive) timestamp used when selecting person overrides to be squashed. The "
         "value can be provided in any format that is can be parsed by ClickHouse. This value should be far enough in "
         "the past that there is no reasonable likelihood that events or overrides prior to this time have not yet been "
-        "written to the database and replicated to all hosts in the cluster."
+        "written to the database and replicated to all hosts in the cluster.",
+        default=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime("%Y-%m-%d %H:%M:%S"),
     )
     limit: int | None = pydantic.Field(
         description="The number of rows to include in the snapshot. If provided, this can be used to limit the total "
-        "amount of memory consumed by the squash process during execution."
+        "amount of memory consumed by the squash process during execution.",
+        default=None,
     )
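The new default gives the snapshot timestamp a ClickHouse-parseable string two days in the past; because the expression sits in the class body, it is evaluated once at import time. A small illustration with a plain pydantic model (dagster.Config is pydantic-based, but this sketch deliberately avoids Dagster, and the field names only loosely mirror the diff):

import datetime

import pydantic


class SnapshotConfigSketch(pydantic.BaseModel):
    """Illustration only; the real class subclasses dagster.Config."""

    # Evaluated once when the class body runs (import time), producing a
    # ClickHouse-parseable "YYYY-MM-DD HH:MM:SS" string two days in the past.
    timestamp: str = pydantic.Field(
        description="Upper bound for overrides to squash.",
        default=(datetime.datetime.now() - datetime.timedelta(days=2)).strftime("%Y-%m-%d %H:%M:%S"),
    )
    limit: int | None = pydantic.Field(
        description="Optional cap on the number of snapshot rows.",
        default=None,
    )


print(SnapshotConfigSketch().timestamp)  # e.g. "2025-02-01 22:00:00"
print(SnapshotConfigSketch().limit)      # None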
File renamed without changes.
5 changes: 5 additions & 0 deletions dags/tests/test_deletes.py
@@ -112,3 +112,8 @@ def get_pending_deletes() -> list[AsyncDeletion]:
     assert not any(cluster.map_all_hosts(table.exists).result().values())
     deletes_dict = PendingDeletesDictionary(source=table)
     assert not any(cluster.map_all_hosts(deletes_dict.exists).result().values())
+    report_table = PendingPersonEventDeletesTable(timestamp=timestamp, is_reporting=True)
+    assert all(cluster.map_all_hosts(report_table.exists).result().values())
+
+    # clean up the reporting table
+    cluster.map_all_hosts(report_table.drop).result()
File renamed without changes.