From d796f420e974785dfd2d8cb24b4f1d51cdf7652b Mon Sep 17 00:00:00 2001 From: Ryan Rymarczyk Date: Sat, 28 Sep 2024 06:31:08 -0400 Subject: [PATCH] FEAT: Join BUS GTFS-RT Vehicle Events with Transit Master Vehicle Events (#440) This change allows for the joining of Bus events from GTFS-RT and Transit Master (TM) data sources. GTFS-RT events are joined to TM events with an "asof" join. This type of join first performs a regular LEFT JOIN on the columns of "route_id", "trip_id", "vehicle_label" and "stop_id" and then performs a nearest match on "stop_sequence". The resulting dataframe retains a "stop_sequence" (from gtfs-rt) and "tm_stop_sequence" column to verify accuracy of stop_sequence nearest join. Asana Task: https://app.asana.com/0/1205827492903547/1207771349226047 --- .../{vehicle_events.py => event_files.py} | 2 +- ...gtfs_rt_ingestion.py => events_gtfs_rt.py} | 30 +++++----- .../{gtfs.py => events_gtfs_schedule.py} | 0 .../bus_performance_manager/events_joined.py | 60 +++++++++++++++++++ .../{tm_ingestion.py => events_tm.py} | 42 ++++++++++--- tests/bus_performance_manager/test_gtfs.py | 11 +++- .../test_gtfs_rt_ingestion.py | 44 +++++++------- .../test_tm_ingestion.py | 29 +++++---- 8 files changed, 154 insertions(+), 64 deletions(-) rename src/lamp_py/bus_performance_manager/{vehicle_events.py => event_files.py} (98%) rename src/lamp_py/bus_performance_manager/{gtfs_rt_ingestion.py => events_gtfs_rt.py} (94%) rename src/lamp_py/bus_performance_manager/{gtfs.py => events_gtfs_schedule.py} (100%) create mode 100644 src/lamp_py/bus_performance_manager/events_joined.py rename src/lamp_py/bus_performance_manager/{tm_ingestion.py => events_tm.py} (90%) diff --git a/src/lamp_py/bus_performance_manager/vehicle_events.py b/src/lamp_py/bus_performance_manager/event_files.py similarity index 98% rename from src/lamp_py/bus_performance_manager/vehicle_events.py rename to src/lamp_py/bus_performance_manager/event_files.py index dbe6a99e..d0d3a70d 100644 --- a/src/lamp_py/bus_performance_manager/vehicle_events.py +++ b/src/lamp_py/bus_performance_manager/event_files.py @@ -19,7 +19,7 @@ def get_new_event_files() -> List[Dict[str, date | List[str]]]: """ - Generate a dataframe that contains a record for every service date to be + Generate a list of dictionaries that contains a record for every service date to be processed. * Collect all of the potential input filepaths, their last modified timestamp, and potential service dates. 
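For illustration, here is a minimal sketch of how the "asof" join described above behaves in polars. The frames and values below are hypothetical and are not taken from the GTFS-RT or TM datasets; only the join keys mirror the ones used in this change.

    import polars as pl
    from datetime import datetime

    # two GTFS-RT events and two TM crossings for the same trip/vehicle;
    # the stop sequence values do not line up exactly between the two feeds
    gtfs = pl.DataFrame(
        {
            "route_id": ["39", "39"],
            "trip_id": ["t1", "t1"],
            "vehicle_label": ["1234", "1234"],
            "stop_id": ["173", "655"],
            "stop_sequence": [5, 9],
        }
    )
    tm = pl.DataFrame(
        {
            "route_id": ["39", "39"],
            "trip_id": ["t1", "t1"],
            "vehicle_label": ["1234", "1234"],
            "stop_id": ["173", "655"],
            "tm_stop_sequence": [6, 10],
            "tm_arrival_dt": [
                datetime(2024, 6, 1, 12, 53, 29),
                datetime(2024, 6, 1, 13, 11, 3),
            ],
        }
    )

    # exact match on the "by" columns, then nearest match on stop sequence
    joined = gtfs.sort("stop_sequence").join_asof(
        tm.sort("tm_stop_sequence"),
        left_on="stop_sequence",
        right_on="tm_stop_sequence",
        by=["trip_id", "route_id", "vehicle_label", "stop_id"],
        strategy="nearest",
    )
    print(joined)

With strategy="nearest", the GTFS-RT event at stop_sequence 5 still picks up the TM crossing recorded at sequence 6 rather than being dropped, which is presumably why a nearest match is used here instead of requiring exact equality on stop_sequence.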
diff --git a/src/lamp_py/bus_performance_manager/gtfs_rt_ingestion.py b/src/lamp_py/bus_performance_manager/events_gtfs_rt.py similarity index 94% rename from src/lamp_py/bus_performance_manager/gtfs_rt_ingestion.py rename to src/lamp_py/bus_performance_manager/events_gtfs_rt.py index 0e6089ea..06e8ae52 100644 --- a/src/lamp_py/bus_performance_manager/gtfs_rt_ingestion.py +++ b/src/lamp_py/bus_performance_manager/events_gtfs_rt.py @@ -55,13 +55,14 @@ def read_vehicle_positions( & pl.col("vehicle.trip.trip_id").is_not_null() & pl.col("vehicle.vehicle.id").is_not_null() & pl.col("vehicle.timestamp").is_not_null() + & pl.col("vehicle.trip.start_time").is_not_null() ) .select( pl.col("vehicle.trip.route_id").cast(pl.String).alias("route_id"), pl.col("vehicle.trip.trip_id").cast(pl.String).alias("trip_id"), pl.col("vehicle.stop_id").cast(pl.String).alias("stop_id"), pl.col("vehicle.current_stop_sequence") - .cast(pl.String) + .cast(pl.Int64) .alias("stop_sequence"), pl.col("vehicle.trip.direction_id") .cast(pl.Int8) @@ -103,18 +104,17 @@ def positions_to_events(vehicle_positions: pl.DataFrame) -> pl.DataFrame: :param vehicle_positions: Dataframe of vehiclie positions :return dataframe: + service_date -> String route_id -> String trip_id -> String - stop_id -> String - stop_sequence -> String - direction_id -> Int8 start_time -> String - service_date -> String + direction_id -> Int8 + stop_id -> String + stop_sequence -> Int64 vehicle_id -> String vehicle_label -> String - current_status -> String - arrival_gtfs -> Datetime - travel_towards_gtfs -> Datetime + gtfs_travel_to_dt -> Datetime + gtfs_arrival_dt -> Datetime """ vehicle_events = vehicle_positions.pivot( values="vehicle_timestamp", @@ -141,22 +141,22 @@ def positions_to_events(vehicle_positions: pl.DataFrame) -> pl.DataFrame: vehicle_events = vehicle_events.rename( { - "STOPPED_AT": "arrival_gtfs", - "IN_TRANSIT_TO": "travel_towards_gtfs", + "STOPPED_AT": "gtfs_arrival_dt", + "IN_TRANSIT_TO": "gtfs_travel_to_dt", } ).select( [ + "service_date", "route_id", "trip_id", + "start_time", + "direction_id", "stop_id", "stop_sequence", - "direction_id", - "start_time", - "service_date", "vehicle_id", "vehicle_label", - "arrival_gtfs", - "travel_towards_gtfs", + "gtfs_travel_to_dt", + "gtfs_arrival_dt", ] ) diff --git a/src/lamp_py/bus_performance_manager/gtfs.py b/src/lamp_py/bus_performance_manager/events_gtfs_schedule.py similarity index 100% rename from src/lamp_py/bus_performance_manager/gtfs.py rename to src/lamp_py/bus_performance_manager/events_gtfs_schedule.py diff --git a/src/lamp_py/bus_performance_manager/events_joined.py b/src/lamp_py/bus_performance_manager/events_joined.py new file mode 100644 index 00000000..9d03263c --- /dev/null +++ b/src/lamp_py/bus_performance_manager/events_joined.py @@ -0,0 +1,60 @@ +import polars as pl + + +def join_gtfs_tm_events(gtfs: pl.DataFrame, tm: pl.DataFrame) -> pl.DataFrame: + """ + Join gtfs-rt and transit master (tm) event dataframes + + :return dataframe: + service_date -> String + route_id -> String + trip_id -> String + start_time -> String + direction_id -> Int8 + stop_id -> String + stop_sequence -> String + vehicle_id -> String + vehicle_label -> String + gtfs_travel_to_dt -> Datetime + gtfs_arrival_dt -> Datetime + tm_stop_sequence -> Int64 + tm_is_layover -> Bool + tm_arrival_dt -> Datetime + tm_departure_dt -> Datetime + gtfs_sort_dt -> Datetime + gtfs_depart_dt -> Datetime + """ + + # join gtfs and tm datasets using "asof" strategy for stop_sequence columns + # asof strategy finds 
nearest value match between "asof" columns if exact match is not found + # will perform regular left join on "by" columns + + return ( + gtfs.sort(by="stop_sequence") + .join_asof( + tm.sort("tm_stop_sequence"), + left_on="stop_sequence", + right_on="tm_stop_sequence", + by=["trip_id", "route_id", "vehicle_label", "stop_id"], + strategy="nearest", + coalesce=True, + ) + .with_columns( + ( + pl.coalesce( + ["gtfs_travel_to_dt", "gtfs_arrival_dt"], + ).alias("gtfs_sort_dt") + ) + ) + .with_columns( + ( + pl.col("gtfs_travel_to_dt") + .shift(-1) + .over( + ["vehicle_label", "trip_id"], + order_by="gtfs_sort_dt", + ) + .alias("gtfs_depart_dt") + ) + ) + ) diff --git a/src/lamp_py/bus_performance_manager/tm_ingestion.py b/src/lamp_py/bus_performance_manager/events_tm.py similarity index 90% rename from src/lamp_py/bus_performance_manager/tm_ingestion.py rename to src/lamp_py/bus_performance_manager/events_tm.py index 5ccc6eca..74e6f6b3 100644 --- a/src/lamp_py/bus_performance_manager/tm_ingestion.py +++ b/src/lamp_py/bus_performance_manager/events_tm.py @@ -40,6 +40,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: "GEO_NODE_ID", "GEO_NODE_ABBR", ) + .filter(pl.col("GEO_NODE_ABBR").is_not_null()) .unique() ) @@ -52,6 +53,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: "ROUTE_ID", "ROUTE_ABBR", ) + .filter(pl.col("ROUTE_ABBR").is_not_null()) .unique() ) @@ -63,6 +65,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: "TRIP_ID", "TRIP_SERIAL_NUMBER", ) + .filter(pl.col("TRIP_SERIAL_NUMBER").is_not_null()) .unique() ) @@ -74,6 +77,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: "VEHICLE_ID", "PROPERTY_TAG", ) + .filter(pl.col("PROPERTY_TAG").is_not_null()) .unique() ) @@ -87,8 +91,15 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: tm_stop_crossings = ( pl.scan_parquet(tm_files) .filter( - (pl.col("ACT_ARRIVAL_TIME").is_not_null()) - | (pl.col("ACT_DEPARTURE_TIME").is_not_null()) + (pl.col("IsRevenue") == "R") + & pl.col("ROUTE_ID").is_not_null() + & pl.col("GEO_NODE_ID").is_not_null() + & pl.col("TRIP_ID").is_not_null() + & pl.col("VEHICLE_ID").is_not_null() + & ( + (pl.col("ACT_ARRIVAL_TIME").is_not_null()) + | (pl.col("ACT_DEPARTURE_TIME").is_not_null()) + ) ) .join( tm_geo_nodes, @@ -124,17 +135,19 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: ), ) .select( - pl.col("service_date").cast(pl.Date), - pl.col("PROPERTY_TAG").cast(pl.String).alias("tm_vehicle_label"), ( pl.col("ROUTE_ABBR") .cast(pl.String) .str.strip_chars_start("0") - .alias("tm_route_id") + .alias("route_id") ), - pl.col("GEO_NODE_ID").cast(pl.String).alias("tm_geo_node_id"), - pl.col("GEO_NODE_ABBR").cast(pl.String).alias("tm_stop_id"), - pl.col("TRIP_SERIAL_NUMBER").cast(pl.String).alias("tm_trip_id"), + pl.col("TRIP_SERIAL_NUMBER").cast(pl.String).alias("trip_id"), + pl.col("GEO_NODE_ABBR").cast(pl.String).alias("stop_id"), + pl.col("PATTERN_GEO_NODE_SEQ") + .cast(pl.Int64) + .alias("tm_stop_sequence"), + pl.col("IS_LAYOVER").cast(pl.String).alias("tm_is_layover"), + pl.col("PROPERTY_TAG").cast(pl.String).alias("vehicle_label"), ( ( pl.col("service_date") @@ -159,6 +172,19 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame: .collect() ) + if tm_stop_crossings.shape[0] == 0: + schema = { + "route_id": pl.String, + "trip_id": pl.String, + "stop_id": pl.String, + "tm_stop_sequence": pl.Int64, + "tm_is_layover": pl.Boolean, + "vehicle_label": pl.String, + "tm_arrival_dt": pl.Datetime, + "tm_departure_dt": pl.Datetime, + 
} + tm_stop_crossings = pl.DataFrame(schema=schema) + return tm_stop_crossings diff --git a/tests/bus_performance_manager/test_gtfs.py b/tests/bus_performance_manager/test_gtfs.py index 6cb86b12..46e27661 100644 --- a/tests/bus_performance_manager/test_gtfs.py +++ b/tests/bus_performance_manager/test_gtfs.py @@ -7,7 +7,9 @@ import polars as pl import polars.testing as pl_test -from lamp_py.bus_performance_manager.gtfs import gtfs_events_for_date +from lamp_py.bus_performance_manager.events_gtfs_schedule import ( + gtfs_events_for_date, +) current_dir = os.path.join(os.path.dirname(__file__)) @@ -32,9 +34,12 @@ def mock_file_download(object_path: str, file_name: str) -> bool: return True -@mock.patch("lamp_py.bus_performance_manager.gtfs.file_list_from_s3") @mock.patch( - "lamp_py.bus_performance_manager.gtfs.download_file", new=mock_file_download + "lamp_py.bus_performance_manager.events_gtfs_schedule.file_list_from_s3" +) +@mock.patch( + "lamp_py.bus_performance_manager.events_gtfs_schedule.download_file", + new=mock_file_download, ) def test_gtfs_events_for_date(s3_patch: mock.MagicMock) -> None: """ diff --git a/tests/bus_performance_manager/test_gtfs_rt_ingestion.py b/tests/bus_performance_manager/test_gtfs_rt_ingestion.py index 55e0422d..b6c24160 100644 --- a/tests/bus_performance_manager/test_gtfs_rt_ingestion.py +++ b/tests/bus_performance_manager/test_gtfs_rt_ingestion.py @@ -5,7 +5,7 @@ import polars as pl from lamp_py.aws.s3 import get_datetime_from_partition_path -from lamp_py.bus_performance_manager.gtfs_rt_ingestion import ( +from lamp_py.bus_performance_manager.events_gtfs_rt import ( read_vehicle_positions, positions_to_events, generate_gtfs_rt_events, @@ -54,7 +54,7 @@ def get_service_date_and_files() -> Tuple[date, List[str]]: "direction_id": pl.Int8, "trip_id": pl.String, "stop_id": pl.String, - "stop_sequence": pl.String, + "stop_sequence": pl.Int64, "start_time": pl.String, "service_date": pl.String, "vehicle_id": pl.String, @@ -65,18 +65,18 @@ def get_service_date_and_files() -> Tuple[date, List[str]]: # schema for vehicle events dataframes VE_SCHEMA = { + "service_date": pl.String, "route_id": pl.String, - "direction_id": pl.Int8, "trip_id": pl.String, - "stop_id": pl.String, - "stop_sequence": pl.String, "start_time": pl.String, - "service_date": pl.String, + "direction_id": pl.Int8, + "stop_id": pl.String, + "stop_sequence": pl.Int64, "vehicle_id": pl.String, "vehicle_label": pl.String, "current_status": pl.String, - "arrival_gtfs": pl.Datetime, - "travel_towards_gtfs": pl.Datetime, + "gtfs_travel_to_dt": pl.Datetime, + "gtfs_arrival_dt": pl.Datetime, } @@ -113,26 +113,26 @@ def test_gtfs_rt_to_bus_events() -> None: assert event["direction_id"] == 0 if event["stop_id"] == "173": - assert event["travel_towards_gtfs"] == datetime( + assert event["gtfs_travel_to_dt"] == datetime( year=2024, month=6, day=1, hour=13, minute=1, second=19 ) - assert event["arrival_gtfs"] == datetime( + assert event["gtfs_arrival_dt"] == datetime( year=2024, month=6, day=1, hour=13, minute=2, second=34 ) if event["stop_id"] == "655": - assert event["travel_towards_gtfs"] == datetime( + assert event["gtfs_travel_to_dt"] == datetime( year=2024, month=6, day=1, hour=12, minute=50, second=31 ) - assert event["arrival_gtfs"] == datetime( + assert event["gtfs_arrival_dt"] == datetime( year=2024, month=6, day=1, hour=12, minute=53, second=29 ) if event["stop_id"] == "903": - assert event["travel_towards_gtfs"] == datetime( + assert event["gtfs_travel_to_dt"] == datetime( year=2024, month=6, day=1, 
hour=13, minute=3, second=39 ) - assert event["arrival_gtfs"] == datetime( + assert event["gtfs_arrival_dt"] == datetime( year=2024, month=6, day=1, hour=13, minute=11, second=3 ) @@ -153,16 +153,16 @@ def test_gtfs_rt_to_bus_events() -> None: # no arrival time at this stop if event["stop_id"] == "12005": - assert event["travel_towards_gtfs"] == datetime( + assert event["gtfs_travel_to_dt"] == datetime( year=2024, month=6, day=1, hour=12, minute=47, second=23 ) - assert event["arrival_gtfs"] is None + assert event["gtfs_arrival_dt"] is None if event["stop_id"] == "17091": - assert event["travel_towards_gtfs"] == datetime( + assert event["gtfs_travel_to_dt"] == datetime( year=2024, month=6, day=1, hour=12, minute=52, second=41 ) - assert event["arrival_gtfs"] is None + assert event["gtfs_arrival_dt"] is None # get an empty dataframe by reading the same files but for events the day prior. previous_service_date = service_date - timedelta(days=1) @@ -202,7 +202,7 @@ def route_one() -> pl.DataFrame: "direction_id": [0, 0, 0, 0], "trip_id": ["101", "101", "101", "101"], "stop_id": ["1", "1", "2", "2"], - "stop_sequence": ["1", "1", "2", "2"], + "stop_sequence": [1, 1, 2, 2], "start_time": ["08:45:00", "08:45:00", "08:45:00", "08:45:00"], "service_date": ["20240601", "20240601", "20240601", "20240601"], "vehicle_id": ["y1001", "y1001", "y1001", "y1001"], @@ -233,7 +233,7 @@ def route_two() -> pl.DataFrame: "direction_id": [0, 0, 0, 0, 0, 0], "trip_id": ["202", "202", "202", "202", "202", "202"], "stop_id": ["1", "1", "1", "2", "2", "2"], - "stop_sequence": ["1", "1", "1", "2", "2", "2"], + "stop_sequence": [1, 1, 1, 2, 2, 2], "start_time": [ "09:45:00", "09:45:00", @@ -282,7 +282,7 @@ def route_three() -> pl.DataFrame: "direction_id": [0, 0, 0, 0, 0, 0], "trip_id": ["303", "303", "303", "303", "303", "303"], "stop_id": ["1", "1", "1", "2", "2", "2"], - "stop_sequence": ["1", "1", "1", "2", "2", "2"], + "stop_sequence": [1, 1, 1, 2, 2, 2], "start_time": [ "10:45:00", "10:45:00", @@ -331,7 +331,7 @@ def route_four() -> pl.DataFrame: "direction_id": [0, 0, 0, 0, 0, 0], "trip_id": ["404", "404", "404", "404", "404", "404"], "stop_id": ["1", "1", "1", "2", "2", "2"], - "stop_sequence": ["1", "1", "1", "2", "2", "2"], + "stop_sequence": [1, 1, 1, 2, 2, 2], "start_time": [ "11:45:00", "11:45:00", diff --git a/tests/bus_performance_manager/test_tm_ingestion.py b/tests/bus_performance_manager/test_tm_ingestion.py index f6e98407..d43b838b 100644 --- a/tests/bus_performance_manager/test_tm_ingestion.py +++ b/tests/bus_performance_manager/test_tm_ingestion.py @@ -4,7 +4,7 @@ from _pytest.monkeypatch import MonkeyPatch import polars as pl -from lamp_py.bus_performance_manager.tm_ingestion import generate_tm_events +from lamp_py.bus_performance_manager.events_tm import generate_tm_events from ..test_resources import ( tm_geo_node_file, @@ -20,19 +20,19 @@ def test_tm_to_bus_events(monkeypatch: MonkeyPatch) -> None: run tests on each file in the test files tm stop crossings directory """ monkeypatch.setattr( - "lamp_py.bus_performance_manager.tm_ingestion.tm_geo_node_file", + "lamp_py.bus_performance_manager.events_tm.tm_geo_node_file", tm_geo_node_file, ) monkeypatch.setattr( - "lamp_py.bus_performance_manager.tm_ingestion.tm_route_file", + "lamp_py.bus_performance_manager.events_tm.tm_route_file", tm_route_file, ) monkeypatch.setattr( - "lamp_py.bus_performance_manager.tm_ingestion.tm_trip_file", + "lamp_py.bus_performance_manager.events_tm.tm_trip_file", tm_trip_file, ) monkeypatch.setattr( - 
"lamp_py.bus_performance_manager.tm_ingestion.tm_vehicle_file", + "lamp_py.bus_performance_manager.events_tm.tm_vehicle_file", tm_vehicle_file, ) @@ -70,11 +70,10 @@ def check_stop_crossings(stop_crossings_filepath: str) -> None: # ensure data has been extracted from the filepath assert not bus_events.is_empty() - # make sure we only have a single service date and it matches the filename service date - assert set(bus_events["service_date"]) == {service_date} - - # ensure we didn't lose any data from the raw dataset when joining - assert len(bus_events) == len(raw_stop_crossings) + # ensure we didn't lose any Revenue data from the raw dataset when joining + assert len(bus_events) == len( + raw_stop_crossings.filter((pl.col("IsRevenue") == "R")) + ) # check that crossings without trips are garage pullouts bus_garages = { @@ -87,8 +86,8 @@ def check_stop_crossings(stop_crossings_filepath: str) -> None: "qubus", "somvl", } - non_trip_events = bus_events.filter(pl.col("tm_trip_id").is_null()) - assert set(non_trip_events["tm_stop_id"]).issubset(bus_garages) + non_trip_events = bus_events.filter(pl.col("trip_id").is_null()) + assert set(non_trip_events["stop_id"]).issubset(bus_garages) # check that all arrival and departure timestamps happen after the start of the service date assert bus_events.filter( @@ -103,7 +102,7 @@ def check_stop_crossings(stop_crossings_filepath: str) -> None: # check that there are no leading zeros on route ids assert bus_events.filter( - pl.col("tm_route_id").str.starts_with("0") - | pl.col("tm_trip_id").str.starts_with("0") - | pl.col("tm_stop_id").str.starts_with("0") + pl.col("route_id").str.starts_with("0") + | pl.col("trip_id").str.starts_with("0") + | pl.col("stop_id").str.starts_with("0") ).is_empty()