Skip to content

Commit

Permalink
FEAT: Ingest Glides Vehicle Trip Assignment Feed (#469)
Browse files Browse the repository at this point in the history
Glides began publishing a new data feed type on the evening of Dec 12, 2024. https://mbta.github.io/schemas/events/glides/com.mbta.ctd.glides.vehicle_trip_assignment.v1

This change adds a GlidesConverter to begin ingesting the "Vehicle Trip Assignment" feed.

Asana Task: https://app.asana.com/0/1205827492903547/1208962016232571
  • Loading branch information
rymarczy authored Dec 13, 2024
1 parent 6d056e2 commit fc1906a
Showing 1 changed file with 57 additions and 0 deletions.
57 changes: 57 additions & 0 deletions src/lamp_py/ingestion/glides.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,62 @@ def flatten_multitypes(record: Dict) -> Dict:
return tu_dataset


class VehicleTripAssignment(GlidesConverter):
"""
Converter for Vehcile Trip Assignment Events
https://mbta.github.io/schemas/events/glides/com.mbta.ctd.glides.vehicle_trip_assignment.v1
"""

def __init__(self) -> None:
GlidesConverter.__init__(self, base_filename="vehicle_trip_assignment.parquet")

@property
def event_schema(self) -> pyarrow.schema:
# NOTE: These tables will eventually be uniqued via polars, which will
# not work if any of the types in the schema are objects.
glides_trip_key = pyarrow.struct(
[
("serviceDate", pyarrow.string()),
("tripId", pyarrow.string()),
("scheduled", pyarrow.string()),
]
)

return pyarrow.schema(
[
("type", pyarrow.string()),
("specversion", pyarrow.string()),
("source", pyarrow.string()),
("id", pyarrow.string()),
("time", pyarrow.timestamp("ms")),
(
"data",
pyarrow.struct(
[
("vehicleId", pyarrow.string()),
("tripKey", glides_trip_key),
]
),
),
]
)

@property
def unique_key(self) -> str:
return "tripKey"

def convert_records(self) -> pd.Dataset:
process_logger = ProcessLogger(process_name="convert_records", type=self.type)
process_logger.log_start()

tu_table = pyarrow.Table.from_pylist(self.records, schema=self.event_schema)
tu_table = flatten_schema(tu_table)
tu_dataset = pd.dataset(tu_table)

process_logger.log_complete()
return tu_dataset


def ingest_glides_events(kinesis_reader: KinesisReader, metadata_queue: Queue[Optional[str]]) -> None:
"""
ingest glides records from the kinesis stream and add them to parquet files
Expand All @@ -374,6 +430,7 @@ def ingest_glides_events(kinesis_reader: KinesisReader, metadata_queue: Queue[Op
EditorChanges(),
OperatorSignIns(),
TripUpdates(),
VehicleTripAssignment(),
]

for record in kinesis_reader.get_records():
Expand Down

0 comments on commit fc1906a

Please sign in to comment.