Commit
FIX: Reset RT_ALERTS Ingestion dataset on schema change (#457)
The RT_ALERTS dataset that we ingest produces parquet files containing columns of struct types. If the struct type of a column changes, a newly ingested dataset cannot be merged with the existing dataset from S3.

When this situation occurs, the S3 dataset is overwritten by the newly ingested dataset, avoiding merge errors for the day. This is applied only to Alerts datasets: the contents of Alerts RT data change little throughout the day, since Alerts events are persisted with started/complete/modified timestamp fields, so overwriting has minimal impact on archived data.
rymarczy authored Oct 25, 2024
1 parent b67232c commit e08367a
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/lamp_py/ingestion/convert_gtfs_rt.py
@@ -372,12 +372,23 @@ def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pyarrow.da

         if self.sync_with_s3(local_path):
             hash_gtfs_rt_parquet(local_path)
-            out_ds = pd.dataset(
-                [
-                    pd.dataset(table),
-                    pd.dataset(local_path),
-                ]
-            )
+            # RT_ALERTS parquet files contain columns with nested structure types.
+            # If a new nested field is ingested, combining the new and existing nested columns is not possible.
+            # This try/except is meant to catch that error and reset the schema for the service day to the new nested structure.
+            # RT_ALERTS updates are essentially the same throughout a service day, so resetting the
+            # dataset will have minimal impact on archived data.
+            try:
+                out_ds = pd.dataset(
+                    [
+                        pd.dataset(table),
+                        pd.dataset(local_path),
+                    ]
+                )
+            except pyarrow.ArrowTypeError as exception:
+                if self.config_type == ConfigType.RT_ALERTS:
+                    out_ds = pd.dataset(table)
+                else:
+                    raise exception

         return out_ds
