add revenue to pipeline
rymarczy committed Oct 1, 2024
1 parent 9d37551 commit fe59247
Showing 13 changed files with 129 additions and 119 deletions.
35 changes: 27 additions & 8 deletions src/lamp_py/aws/s3.py
@@ -19,11 +19,12 @@
import boto3
import botocore
import botocore.exceptions
from botocore.exceptions import ClientError
import pandas
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.dataset as pd
from botocore.exceptions import ClientError
from pyarrow import Table, fs
from pyarrow.util import guid

@@ -254,7 +255,10 @@ def get_zip_buffer(filename: str) -> IO[bytes]:


def file_list_from_s3(
bucket_name: str, file_prefix: str, max_list_size: int = 250_000
bucket_name: str,
file_prefix: str,
max_list_size: int = 250_000,
in_filter: Optional[str] = None,
) -> List[str]:
"""
get a list of s3 objects
@@ -283,7 +287,10 @@ def file_list_from_s3(
for obj in page["Contents"]:
if obj["Size"] == 0:
continue
filepaths.append(os.path.join("s3://", bucket_name, obj["Key"]))
if in_filter is None or in_filter in obj["Key"]:
filepaths.append(
os.path.join("s3://", bucket_name, obj["Key"])
)

if len(filepaths) > max_list_size:
break
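
A usage sketch of the new in_filter argument, which keeps only keys containing the given substring (bucket and prefix values below are hypothetical):

vp_files = file_list_from_s3(
    bucket_name="example-springboard-bucket",  # hypothetical bucket
    file_prefix="lamp/RT_VEHICLE_POSITIONS",   # hypothetical prefix
    in_filter="2024-10-01",
)
# zero-byte objects are still skipped, and listing stops at max_list_size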
@@ -673,15 +680,27 @@ def read_parquet(
) -> pandas.core.frame.DataFrame:
"""
read parquet file or files from s3 and return it as a pandas dataframe
if a requested column from "columns" does not exist in the parquet file,
the column will be added as all nulls; this was added to capture the
vehicle.trip.revenue field from VehiclePosition files starting December 2023
"""
retry_attempts = 2
for retry_attempt in range(retry_attempts + 1):
try:
df = (
_get_pyarrow_dataset(filename, filters)
.to_table(columns=columns)
.to_pandas(self_destruct=True)
)
ds = _get_pyarrow_dataset(filename, filters)
if columns is None:
table = ds.to_table(columns=columns)

else:
read_columns = list(set(ds.schema.names) & set(columns))
table = ds.to_table(columns=read_columns)
for null_column in set(columns).difference(ds.schema.names):
table = table.append_column(
null_column, pa.nulls(table.num_rows)
)

df = table.to_pandas(self_destruct=True)
break
except Exception as exception:
if retry_attempt == retry_attempts:
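The null-column backfill in read_parquet can be sketched standalone with pyarrow; a minimal example, assuming a parquet file written before the revenue field existed (file name hypothetical):

import pyarrow as pa
import pyarrow.dataset

requested = ["vehicle.trip.trip_id", "vehicle.trip.revenue"]
ds = pyarrow.dataset.dataset("vp_2023-11-30.parquet")  # hypothetical pre-December-2023 file
# read only the requested columns that exist in the on-disk schema
table = ds.to_table(columns=list(set(ds.schema.names) & set(requested)))
# append each missing column as all nulls, mirroring read_parquet above
for null_column in set(requested).difference(ds.schema.names):
    table = table.append_column(null_column, pa.nulls(table.num_rows))
# table now carries vehicle.trip.revenue as an all-null column
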
16 changes: 9 additions & 7 deletions src/lamp_py/ingestion/light_rail_gps.py
@@ -209,16 +209,18 @@ def ingest_light_rail_gps() -> None:

s3_files = [file for file in s3_files if "LightRailRawGPS" in file]

dataframe, archive_files, error_files = dataframe_from_gz(s3_files)
if len(s3_files) > 0:

write_parquet(dataframe)
dataframe, archive_files, error_files = dataframe_from_gz(s3_files)

write_parquet(dataframe)

if len(archive_files) > 0:
move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"])
if len(error_files) > 0:
move_s3_objects(error_files, os.environ["ERROR_BUCKET"])

logger.log_complete()

except Exception as exception:
logger.log_failure(exception)

if len(archive_files) > 0:
move_s3_objects(archive_files, os.environ["ARCHIVE_BUCKET"])
if len(error_files) > 0:
move_s3_objects(error_files, os.environ["ERROR_BUCKET"])
@@ -47,51 +47,6 @@ def upgrade() -> None:
postgresql_where=sa.text("rail_pm_processed = false"),
)

# pull metadata from the rail performance manager database into the
# metadata database. the table may or may not exist, so wrap this in a try
# except
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)

insert_data = []
# pull metadata from the rail performance manager database via direct
# sql query. the metadata_log table may or may not exist.
with rpm_db_manager.session.begin() as session:
result = session.execute(
text("SELECT path, processed, process_fail FROM metadata_log")
)
for row in result:
(path, processed, process_fail) = row
insert_data.append(
{
"path": path,
"rail_pm_processed": processed,
"rail_pm_process_fail": process_fail,
}
)

except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
insert_data = []
original_error = error.orig
if (
original_error is not None
and hasattr(original_error, "pgcode")
and original_error.pgcode == "42P01"
):
logging.info("No Metadata Table in Rail Performance Manager")
else:
raise

# insert data into the metadata database
if insert_data:
op.bulk_insert(MetadataLog.__table__, insert_data)

# ### end Alembic commands ###


@@ -29,59 +29,6 @@

def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
while True:
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)

with rpm_db_manager.session.begin() as session:
legacy_result = session.execute(
text("SELECT path FROM metadata_log")
)
legacy_paths = set(
[record[0] for record in legacy_result.fetchall()]
)

modern_result = md_db_manager.select_as_list(
sa.select(MetadataLog.path)
)
modern_paths = set([record["path"] for record in modern_result])

missing_paths = legacy_paths - modern_paths
if len(missing_paths) == 0:
break
else:
logging.error(
"Detected %s paths in Legacy Metadata Table not found in Metadata Database",
len(missing_paths),
)
except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
original_error = error.orig
if (
original_error is not None
and hasattr(original_error, "pgcode")
and original_error.pgcode == "42P01"
):
logging.info("No Metadata Table in Rail Performance Manager")
legacy_paths = set()
else:
logging.exception(
"Programming Error when checking Metadata Log"
)
time.sleep(15)
continue

except Exception as error:
logging.exception("Programming Error when checking Metadata Log")
time.sleep(15)
continue

op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
op.drop_table("metadata_log")
# ### end Alembic commands ###
@@ -0,0 +1,71 @@
"""add revenue columns
Revision ID: 32ba735d080c
Revises: 896dedd8a4db
Create Date: 2024-09-20 08:47:52.784591
"""

from alembic import op
import sqlalchemy as sa

from lamp_py.postgres.rail_performance_manager_schema import (
TempEventCompare,
VehicleTrips,
)

# revision identifiers, used by Alembic.
revision = "32ba735d080c"
down_revision = "896dedd8a4db"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.execute(
f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER rt_trips_update_branch_trunk;"
)
op.execute(
f"ALTER TABLE public.vehicle_trips DISABLE TRIGGER update_vehicle_trips_modified;"
)
op.drop_index("ix_vehicle_trips_composite_1", table_name="vehicle_trips")
op.drop_constraint("vehicle_trips_unique_trip", table_name="vehicle_trips")

op.add_column(
"temp_event_compare", sa.Column("revenue", sa.Boolean(), nullable=True)
)
op.add_column(
"vehicle_trips", sa.Column("revenue", sa.Boolean(), nullable=True)
)
op.execute(sa.update(TempEventCompare).values(revenue=True))
op.execute(sa.update(VehicleTrips).values(revenue=True))
op.alter_column("temp_event_compare", "revenue", nullable=False)
op.alter_column("vehicle_trips", "revenue", nullable=False)

op.create_unique_constraint(
"vehicle_trips_unique_trip",
"vehicle_trips",
["service_date", "route_id", "trip_id"],
)
op.create_index(
"ix_vehicle_trips_composite_1",
"vehicle_trips",
["route_id", "direction_id", "vehicle_id"],
unique=False,
)
op.execute(
f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER rt_trips_update_branch_trunk;"
)
op.execute(
f"ALTER TABLE public.vehicle_trips ENABLE TRIGGER update_vehicle_trips_modified;"
)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("vehicle_trips", "revenue")
op.drop_column("temp_event_compare", "revenue")
# ### end Alembic commands ###
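
To exercise this migration, the standard Alembic CLI should apply, assuming the project's usual Alembic configuration for the rail performance manager database:

alembic upgrade 32ba735d080c    # add and backfill the revenue columns
alembic downgrade 896dedd8a4db  # revert, dropping revenue from both tables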
1 change: 1 addition & 0 deletions src/lamp_py/performance_manager/l0_gtfs_rt_events.py
@@ -152,6 +152,7 @@ def combine_events(
"direction_id",
"parent_station",
"vehicle_id",
"revenue",
"stop_id",
"stop_sequence",
"trip_id",
1 change: 1 addition & 0 deletions src/lamp_py/performance_manager/l0_rt_trip_updates.py
@@ -179,6 +179,7 @@ def reduce_trip_updates(trip_updates: pandas.DataFrame) -> pandas.DataFrame:
trip_updates["stop_sequence"] = None
trip_updates["vehicle_label"] = None
trip_updates["vehicle_consist"] = None
trip_updates["revenue"] = True

process_logger.add_metadata(after_row_count=trip_updates.shape[0])
process_logger.log_complete()
7 changes: 7 additions & 0 deletions src/lamp_py/performance_manager/l0_rt_vehicle_positions.py
@@ -36,6 +36,7 @@ def get_vp_dataframe(
"vehicle.trip.route_id",
"vehicle.trip.start_date",
"vehicle.trip.start_time",
"vehicle.trip.revenue",
"vehicle.vehicle.id",
"vehicle.trip.trip_id",
"vehicle.vehicle.label",
@@ -64,6 +65,7 @@ def get_vp_dataframe(
"vehicle.trip.route_id": "route_id",
"vehicle.trip.start_date": "start_date",
"vehicle.trip.start_time": "start_time",
"vehicle.trip.revenue": "revenue",
"vehicle.vehicle.id": "vehicle_id",
"vehicle.trip.trip_id": "trip_id",
"vehicle.vehicle.label": "vehicle_label",
@@ -125,6 +127,11 @@ def transform_vp_datatypes(
vehicle_positions["direction_id"]
).astype(numpy.bool_)

# fix revenue field, NULL is True
vehicle_positions["revenue"] = numpy.where(
vehicle_positions["revenue"].eq(False), False, True
).astype(numpy.bool_)

# store start_time as seconds from start of day as int64
vehicle_positions["start_time"] = (
vehicle_positions["start_time"]
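The NULL-is-True rule above can be verified on toy data: only an explicit False survives as False, since .eq(False) evaluates to False for missing values.

import numpy
import pandas

revenue = pandas.Series([True, False, None])  # None: field absent from the feed
fixed = numpy.where(revenue.eq(False), False, True).astype(numpy.bool_)
# fixed -> array([ True, False,  True])
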
5 changes: 3 additions & 2 deletions src/lamp_py/performance_manager/l1_cte_statements.py
@@ -137,6 +137,7 @@ def rt_trips_subquery(service_date: int) -> sa.sql.selectable.Subquery:
VehicleTrips.vehicle_id,
VehicleTrips.stop_count,
VehicleTrips.static_trip_id_guess,
VehicleTrips.revenue,
VehicleEvents.pm_trip_id,
VehicleEvents.stop_sequence,
VehicleEvents.parent_station,
@@ -301,8 +302,8 @@ def trips_for_headways_subquery(
.select_from(rt_trips_sub)
.where(
# drop trips with one stop count, probably not valid
rt_trips_sub.c.stop_count
> 1,
rt_trips_sub.c.stop_count > 1,
rt_trips_sub.c.revenue == sa.true(),
)
.order_by(
rt_trips_sub.c.pm_trip_id,
4 changes: 4 additions & 0 deletions src/lamp_py/performance_manager/l1_rt_trips.py
@@ -103,6 +103,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
TempEventCompare.vehicle_label,
TempEventCompare.vehicle_consist,
TempEventCompare.static_version_key,
TempEventCompare.revenue,
)
.distinct(
TempEventCompare.service_date,
@@ -127,6 +128,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
"vehicle_label",
"vehicle_consist",
"static_version_key",
"revenue",
]

trip_insert_query = (
@@ -152,6 +154,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
TempEventCompare.trip_id,
TempEventCompare.vehicle_label,
TempEventCompare.vehicle_consist,
TempEventCompare.revenue,
)
.distinct(
TempEventCompare.service_date,
@@ -179,6 +182,7 @@ def load_new_trip_data(db_manager: DatabaseManager) -> None:
trip_id=distinct_update_query.c.trip_id,
vehicle_label=distinct_update_query.c.vehicle_label,
vehicle_consist=distinct_update_query.c.vehicle_consist,
revenue=distinct_update_query.c.revenue,
)
.where(
VehicleTrips.service_date == distinct_update_query.c.service_date,
2 changes: 2 additions & 0 deletions src/lamp_py/postgres/rail_performance_manager_schema.py
@@ -102,6 +102,7 @@ class VehicleTrips(RpmSqlBase): # pylint: disable=too-few-public-methods
vehicle_consist = sa.Column(sa.String(), nullable=True)
direction = sa.Column(sa.String(30), nullable=True)
direction_destination = sa.Column(sa.String(60), nullable=True)
revenue = sa.Column(sa.Boolean, nullable=False)

# static trip matching
static_trip_id_guess = sa.Column(sa.String(512), nullable=True)
@@ -171,6 +172,7 @@ class TempEventCompare(RpmSqlBase): # pylint: disable=too-few-public-methods
vehicle_id = sa.Column(sa.String(60), nullable=False)
vehicle_label = sa.Column(sa.String(128), nullable=True)
vehicle_consist = sa.Column(sa.String(), nullable=True)
revenue = sa.Column(sa.Boolean, nullable=False)

# foreign key to static schedule expected values
static_version_key = sa.Column(
Expand Down