From 42bb38b8afb459bf019542d54ffe5df04e98860f Mon Sep 17 00:00:00 2001
From: Ryan Rymarczyk
Date: Wed, 2 Oct 2024 10:50:38 -0400
Subject: [PATCH] FIX: GTFS Compressed Schedule Upload (#447)

The S3 sync operation for GTFS compressed parquet files was calling the
"GTFSArchive.parquet_path" method to create object upload paths. This
method automatically appends ".parquet" to any file passed to it.
However, the GTFS_ARCHIVE.db.gz file is not supposed to have ".parquet"
appended.

This change stops using the "GTFSArchive.parquet_path" method and falls
back to a simple path join to create the S3 object upload path. This
matches the behavior of the current PROD environment, which has not
seen these errors.
---
 .../ingestion/compress_gtfs/gtfs_to_parquet.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py
index 2193b546..1bd829c4 100644
--- a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py
+++ b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py
@@ -305,12 +305,10 @@ def gtfs_to_parquet() -> None:
 
     # compress each schedule in feed
     for schedule in feed.rows(named=True):
-        schedule_url = schedule["archive_url"]
-        schedule_pub_dt = schedule["published_dt"]
         schedule_details = ScheduleDetails(
-            schedule_url,
-            schedule_pub_dt,
-            gtfs_tmp_folder,
+            file_location=schedule["archive_url"],
+            published_dt=schedule["published_dt"],
+            tmp_folder=gtfs_tmp_folder,
         )
         compress_gtfs_schedule(schedule_details)
 
@@ -319,8 +317,9 @@ def gtfs_to_parquet() -> None:
         year_path = os.path.join(gtfs_tmp_folder, year)
         pq_folder_to_sqlite(year_path)
         for file in os.listdir(year_path):
-            local_path = os.path.join(year_path, file)
-            upload_path = compressed_gtfs.parquet_path(year, file).s3_uri
-            upload_file(local_path, upload_path)
+            upload_file(
+                file_name=os.path.join(year_path, file),
+                object_path=os.path.join(compressed_gtfs.s3_uri, year, file),
+            )
 
     logger.log_complete()
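
Reviewer addendum (not part of the patch): a minimal sketch of the path
behavior this fix addresses. The GTFSArchiveSketch class, its s3_uri value,
and the suffix logic below are hypothetical reconstructions based only on
the commit message; the actual fix relies solely on os.path.join.

# Hypothetical sketch, not the real GTFSArchive implementation.
import os


class GTFSArchiveSketch:
    """Hypothetical stand-in for the real GTFSArchive bucket object."""

    s3_uri = "s3://example-bucket/lamp/gtfs_archive"  # assumed bucket prefix

    def parquet_path(self, year: str, file: str) -> str:
        # Per the commit message, the real method appends ".parquet" to any
        # file passed to it (and returns an object exposing an .s3_uri
        # attribute; modeled here as a plain string for brevity).
        return os.path.join(self.s3_uri, year, f"{file}.parquet")


archive = GTFSArchiveSketch()

# Old behavior: the sqlite archive gets a bogus ".parquet" suffix.
print(archive.parquet_path("2024", "GTFS_ARCHIVE.db.gz"))
# s3://example-bucket/lamp/gtfs_archive/2024/GTFS_ARCHIVE.db.gz.parquet

# New behavior: a plain path join preserves every file name exactly.
print(os.path.join(archive.s3_uri, "2024", "GTFS_ARCHIVE.db.gz"))
# s3://example-bucket/lamp/gtfs_archive/2024/GTFS_ARCHIVE.db.gz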