Merge pull request #5 from mbta/jz-glides-report-collapse-by-trip-id
feat: Add a second table to the extraneous/missed predictions report that analyzes by `{stop_id, trip_id}`
jzimbel-mbta authored Aug 29, 2024
2 parents 866a217 + 994b852 commit 0b503b3
Showing 24 changed files with 1,673 additions and 375 deletions.
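The headline change collapses predictions and actual departures to {stop_id, trip_id} pairs, so a trip counts as predicted at a stop if any prediction for it ever appeared there. The new report table itself lives in files not shown below; as a rough illustration only, here is a minimal sketch of that matching (hypothetical module name, assuming the Departure struct added in this PR):

# Sketch only — not the report code from this PR. Illustrates collapsing
# predictions and actual departures to {stop_id, trip_id} pairs.
defmodule TripIdMatchingSketch do
  alias TransitData.GlidesReport.Departure

  @spec rates([Departure.t()], [Departure.t()]) :: %{atom => non_neg_integer}
  def rates(predictions, actuals) do
    predicted = MapSet.new(predictions, &{&1.stop, &1.trip})
    actual = MapSet.new(actuals, &{&1.stop, &1.trip})

    %{
      # Actual departures that had at least one prediction.
      matched: MapSet.size(MapSet.intersection(predicted, actual)),
      # Actual departures with no prediction ("missed").
      missed: MapSet.size(MapSet.difference(actual, predicted)),
      # Predictions with no matching actual departure ("extraneous").
      extraneous: MapSet.size(MapSet.difference(predicted, actual))
    }
  end
end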
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml → .github/workflows/ci.yml
@@ -1,4 +1,4 @@
-name: Elixir CI
+name: CI

on: [push, pull_request]

56 changes: 56 additions & 0 deletions .github/workflows/report-coverage.yml
@@ -0,0 +1,56 @@
name: Report Elixir Coverage

on:
  workflow_run:
    workflows:
      - CI
    types:
      - completed

permissions:
  actions: read
  contents: read
  pull-requests: write

jobs:
  report-coverage:
    if: ${{ github.event.workflow_run.event == 'pull_request' && github.event.workflow_run.conclusion == 'success' }}
    runs-on: ubuntu-latest

    steps:
      - run: mkdir -p ${{ runner.temp }}/cover
      - run: echo Fetching artifacts for ${{ github.event.workflow_run.id }}, event name ${{ github.event_name }}, triggered by ${{ github.event.workflow_run.event }}
      - name: Download artifact
        uses: actions/github-script@v3
        with:
          script: |
            var artifacts = await github.actions.listWorkflowRunArtifacts({
              owner: context.repo.owner,
              repo: context.repo.repo,
              run_id: ${{ github.event.workflow_run.id }},
            });
            var matchArtifact = artifacts.data.artifacts.filter((artifact) => {
              return artifact.name == "elixir-lcov"
            })[0];
            var download = await github.actions.downloadArtifact({
              owner: context.repo.owner,
              repo: context.repo.repo,
              artifact_id: matchArtifact.id,
              archive_format: 'zip',
            });
            var fs = require('fs');
            fs.writeFileSync('${{ runner.temp }}/cover/elixir-lcov.zip', Buffer.from(download.data));
      - working-directory: ${{ runner.temp }}/cover
        run: |
          unzip elixir-lcov.zip
          echo "PR_SHA=$(cat PR_SHA)" >> $GITHUB_ENV
          echo "PR_NUMBER=$(cat PR_NUMBER)" >> $GITHUB_ENV
      - uses: actions/checkout@v2 # UNTRUSTED CODE - do not run scripts from it
        with:
          ref: ${{ env.PR_SHA }}
      - name: Upload coverage artifact and post comment
        uses: mbta/github-actions-report-lcov@v1
        with:
          github-token: ${{ secrets.GITHUB_TOKEN }}
          coverage-files: ${{ runner.temp }}/cover/lcov*.info
          artifact-name: elixir-code-coverage
3 changes: 3 additions & 0 deletions config/config.exs
@@ -0,0 +1,3 @@
import Config

import_config "#{config_env()}.exs"
1 change: 1 addition & 0 deletions config/dev.exs
@@ -0,0 +1 @@
import Config
1 change: 1 addition & 0 deletions config/prod.exs
@@ -0,0 +1 @@
import Config
5 changes: 5 additions & 0 deletions config/test.exs
@@ -0,0 +1,5 @@
import Config

config :elixir, :time_zone_database, Tz.TimeZoneDatabase

config :transit_data, :data_lake_api, TransitData.MockDataLake
67 changes: 67 additions & 0 deletions lib/transit_data/data_lake.ex
@@ -0,0 +1,67 @@
defmodule TransitData.DataLake do
  @moduledoc """
  Functions to work with our mbta-gtfs-s3* data lake buckets.
  """

  #############
  # Callbacks #
  #############

  @doc """
  Returns a stream of keys of objects in the given bucket that match the given prefix.
  """
  @callback stream_object_keys(bucket :: String.t(), prefix :: String.t()) ::
              Enumerable.t(String.t())

  @doc """
  Streams the contents of a data lake JSON file.

  Returns a tuple containing:
  - a stream of maps, parsed from the JSON's `entity` field
  - the timestamp, parsed from the JSON's `header` field
  - the basename of the object's key
  """
  @callback stream_json(bucket :: String.t(), key :: String.t()) ::
              {data :: Enumerable.t(map()), timestamp :: integer, basename :: String.t()}

  #################################
  # Dispatching to implementation #
  #################################

  # The module adopting this behaviour that we use for the current environment.
  @callback_module Application.compile_env(:transit_data, :data_lake_api, TransitData.S3DataLake)

  defdelegate stream_object_keys(bucket, prefix), to: @callback_module
  defdelegate stream_json(bucket, key), to: @callback_module
end

defmodule TransitData.S3DataLake do
  @moduledoc false

  @behaviour TransitData.DataLake

  @impl true
  def stream_json(bucket, key) do
    stream =
      ExAws.S3.download_file(bucket, key, :memory)
      |> ExAws.stream!()
      |> StreamGzip.gunzip()
      |> Jaxon.Stream.from_enumerable()

    timestamp =
      stream
      |> Jaxon.Stream.query([:root, "header", "timestamp"])
      |> Enum.at(0)

    objects = Jaxon.Stream.query(stream, [:root, "entity", :all])

    {objects, timestamp, Path.basename(key)}
  end

  @impl true
  def stream_object_keys(bucket, prefix) do
    ExAws.S3.list_objects(bucket, prefix: prefix)
    |> ExAws.stream!()
    |> Stream.map(& &1.key)
  end
end
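config/test.exs points :data_lake_api at TransitData.MockDataLake, which is not shown in this diff. A minimal sketch of an in-memory double satisfying the behaviour (entirely hypothetical; the repo's actual mock may differ):

# Hypothetical test double for TransitData.DataLake — a sketch, not the
# repo's actual TransitData.MockDataLake.
defmodule TransitData.MockDataLake do
  @behaviour TransitData.DataLake

  @impl true
  def stream_object_keys(_bucket, _prefix) do
    # Serve a fixed list of keys instead of listing S3 objects.
    Stream.map(["2024-08-29T12:00:00Z_realtime_TripUpdates_enhanced.json.gz"], & &1)
  end

  @impl true
  def stream_json(_bucket, key) do
    # Canned payload mirroring the {entities, header timestamp, basename} contract.
    {Stream.map([%{"id" => "trip-1"}], & &1), 1_724_932_800, Path.basename(key)}
  end
end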
13 changes: 10 additions & 3 deletions lib/transit_data/glides_report/countdown_clocks_simulation.ex
@@ -3,10 +3,17 @@ defmodule TransitData.GlidesReport.CountdownClocksSimulation do

  alias TransitData.GlidesReport

-  @type t :: %{(stop_id :: String.t()) => GlidesReport.Sign.t()}
+  @type t :: %{stop_id => GlidesReport.Sign.t()}

-  # Returns a set of {stop_id, timestamp} tuples, each representing an instance where
-  # a predicted time (timestamp) appeared on the countdown clock for a stop (stop_id).
+  @type stop_id :: String.t()
+
+  @type timestamp :: integer
+
+  @doc """
+  Returns a set of {stop_id, timestamp} tuples, each representing an instance where
+  a predicted time (timestamp) appeared on the countdown clock for a stop (stop_id).
+  """
+  @spec get_all_top_two_times(Enumerable.t(stop_id)) :: MapSet.t({stop_id, timestamp})
  def get_all_top_two_times(stop_ids) do
    trip_updates_for_simulation(stop_ids)
    |> Enum.reduce(%{}, fn tr_upd, signs -> apply_trip_update(signs, tr_upd) end)
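For context on the simulation above: get_all_top_two_times models a countdown clock that shows the two soonest predicted departures for a stop. A toy sketch of that selection rule (an assumption about the Sign semantics, not code from this PR):

# Sketch only — assumes a countdown clock displays the two soonest
# predicted departure times it knows about.
defmodule TopTwoSketch do
  @spec top_two_times(Enumerable.t(integer)) :: [integer]
  def top_two_times(predicted_timestamps) do
    predicted_timestamps
    |> Enum.sort()
    |> Enum.take(2)
  end
end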
31 changes: 31 additions & 0 deletions lib/transit_data/glides_report/departure.ex
@@ -0,0 +1,31 @@
defmodule TransitData.GlidesReport.Departure do
  @moduledoc """
  Data structure representing a single departure from a stop
  (either predicted or actual).
  """

  alias TransitData.GlidesReport.Spec.Common
  alias TransitData.GlidesReport.Util

  @type t :: %__MODULE__{
          trip: Common.trip_id(),
          stop: Common.stop_id(),
          timestamp: Common.timestamp(),
          # Hour part of the timestamp (in Eastern TZ)
          hour: 0..23,
          # Minute part of the timestamp
          minute: 0..59
        }

  @type minute :: 0..59

  @enforce_keys [:trip, :stop, :timestamp, :hour, :minute]
  defstruct @enforce_keys

  @spec new(Common.trip_id(), Common.stop_id(), Common.timestamp()) :: t()
  def new(trip, stop, timestamp) do
    hour = Util.unix_timestamp_to_local_hour(timestamp)
    minute = Util.unix_timestamp_to_local_minute(timestamp)
    %__MODULE__{trip: trip, stop: stop, timestamp: timestamp, hour: hour, minute: minute}
  end
end
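A quick usage sketch of the new struct (hypothetical IDs; hour/minute come from the Util helpers, which convert to Eastern time):

# Hypothetical trip/stop IDs and unix timestamp.
alias TransitData.GlidesReport.Departure

departure = Departure.new("66715082", "70106", 1_724_932_800)
# 1_724_932_800 is 2024-08-29 12:00:00 UTC, i.e. 08:00 EDT, so:
departure.hour    # => 8
departure.minute  # => 0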
30 changes: 6 additions & 24 deletions lib/transit_data/glides_report/loader.ex
Expand Up @@ -6,7 +6,7 @@ defmodule TransitData.GlidesReport.Loader do

# If a breaking change is made to how files are saved or how their data is structured,
# this value lets us make a clean break to a new directory for downloads.
defp loader_version, do: "1.1"
defp loader_version, do: "1.2"

@doc """
Loads data into ETS tables, and returns counts of files found locally vs downloaded.
@@ -300,12 +300,11 @@
  # Downloads VehiclePosition or TripUpdate files and returns the local filenames they were downloaded to.
  defp download_files(remote_prefix, table_name, s3_bucket, count, existing_filenames) do
    stream =
-      ExAws.S3.list_objects(s3_bucket, prefix: remote_prefix)
-      |> ExAws.stream!()
+      TransitData.DataLake.stream_object_keys(s3_bucket, remote_prefix)
      # Find a file matching the prefix and table name.
      |> Stream.filter(&s3_object_match?(&1, table_name, existing_filenames))
      # Download the file to memory and stream the JSON objects under its "entity" key.
-      |> Stream.map(&stream_s3_json(&1, s3_bucket))
+      |> Stream.map(&TransitData.DataLake.stream_json(s3_bucket, &1))
      # Clean up and filter data.
      |> Stream.map(fn {objects, timestamp, filename} ->
        objects = clean_up(objects, timestamp, table_name)
@@ -317,7 +316,7 @@
      |> Stream.map(fn {objects, timestamp, filename} ->
        objects =
          Stream.map(objects, fn obj ->
-            AtomicMap.convert(obj, underscore: false)
+            AtomicMap.convert(obj, safe: false, underscore: false)
          end)

        local_filename = s3_filename_to_local_filename(filename)
@@ -334,8 +333,8 @@
    end
  end

-  defp s3_object_match?(obj, table_name, existing_filenames) do
-    s3_filename = Path.basename(obj.key)
+  defp s3_object_match?(obj_key, table_name, existing_filenames) do
+    s3_filename = Path.basename(obj_key)

    cond do
      not Regex.match?(~r"(realtime|rtr)_#{table_name}_enhanced.json.gz$", s3_filename) -> false
@@ -344,23 +343,6 @@
    end
  end

-  defp stream_s3_json(obj, s3_bucket) do
-    stream =
-      ExAws.S3.download_file(s3_bucket, obj.key, :memory)
-      |> ExAws.stream!()
-      |> StreamGzip.gunzip()
-      |> Jaxon.Stream.from_enumerable()
-
-    timestamp =
-      stream
-      |> Jaxon.Stream.query([:root, "header", "timestamp"])
-      |> Enum.at(0)
-
-    objects = Jaxon.Stream.query(stream, [:root, "entity", :all])
-
-    {objects, timestamp, Path.basename(obj.key)}
-  end
-
  # Loads a locally-stored Erlang External Term Format file into an ETS table.
  defp load_file_into_table(table_name, local_path) do
    local_path
8 changes: 6 additions & 2 deletions lib/transit_data/glides_report/spec/trip_update.ex
@@ -17,7 +17,10 @@ defmodule TransitData.GlidesReport.Spec.TripUpdate do
  @type key :: String.t()

  @type value :: %{
-          id: Common.trip_id(),
+          # Normally the same trip ID as .trip_update.trip.trip_id, but in dev-blue it
+          # seems to get prefixed with the timestamp, so it's inadvisable to use this
+          # field for anything but a unique ID.
+          id: String.t(),
          trip_update: %{
            timestamp: Common.timestamp(),
            stop_time_update:
@@ -27,7 +30,8 @@ defmodule TransitData.GlidesReport.Spec.TripUpdate do
                # parent stop ID of the relevant terminal by calling
                # GlidesReport.TripUpdate.normalize_stop_id/1 on it.
                stop_id: Common.stop_id()
-              })
+              }),
+            trip: %{trip_id: Common.trip_id()}
          }
        }
end
51 changes: 40 additions & 11 deletions lib/transit_data/glides_report/trip_update.ex
@@ -5,6 +5,22 @@ defmodule TransitData.GlidesReport.TripUpdate do

  alias TransitData.GlidesReport

+  @doc """
+  Cleans up a TripUpdate parsed from raw JSON (with keys not yet converted to atoms).
+
+  Returns nil if there is no data relevant to Glides terminals in the TripUpdate.
+
+  - Canceled TripUpdates are discarded.
+  - Nonrevenue TripUpdates are discarded.
+  - `.trip_update.timestamp` is replaced with the given `header_timestamp` if
+    missing or nil.
+  - The `.trip_update.stop_time_update` list is filtered to non-skipped entries with
+    defined departure times, at Glides terminal stops. If the filtered list is
+    empty, the entire TripUpdate is discarded.
+  - All unused fields are removed.
+  """
+  @spec clean_up(map, integer) :: map | nil
+  def clean_up(tr_upd, header_timestamp)
+
  def clean_up(%{"trip_update" => %{"trip" => %{"schedule_relationship" => "CANCELED"}}}, _) do
@@ -18,10 +34,11 @@
        header_timestamp
      ) do
    tr_upd
-    |> update_in(["trip_update", "stop_time_update"], &clean_up_stop_times/1)
    # If the trip update is missing a timestamp, substitute the timestamp from the header.
    |> update_in(["trip_update", "timestamp"], &(&1 || header_timestamp))
-    |> update_in(["trip_update"], &Map.take(&1, ["timestamp", "stop_time_update"]))
+    |> update_in(["trip_update", "stop_time_update"], &clean_up_stop_times/1)
+    |> update_in(["trip_update", "trip"], &Map.take(&1, ["trip_id"]))
+    |> update_in(["trip_update"], &Map.take(&1, ["timestamp", "stop_time_update", "trip"]))
    |> then(fn cleaned_tr_upd ->
      # If all stop times have been removed, discard the entire trip update.
      if Enum.empty?(cleaned_tr_upd["trip_update"]["stop_time_update"]) do
@@ -45,14 +62,16 @@
  end

  defp clean_up_stop_times(stop_times) do
+    glides_terminals = GlidesReport.Terminals.all_first_stops()
+
    stop_times
-    # Ignore stop times that aren't relevant to Glides terminals.
-    |> Stream.reject(&(&1["stop_id"] not in GlidesReport.Terminals.all_stops()))
-    # Select stop times that have departure times and aren't skipped.
    |> Stream.filter(fn stop_time ->
-      has_departure_time = not is_nil(stop_time["departure"]["time"])
-      is_skipped = stop_time["schedule_relationship"] == "SKIPPED"
-      has_departure_time and not is_skipped
+      cond do
+        is_nil(stop_time["departure"]["time"]) -> false
+        stop_time["schedule_relationship"] == "SKIPPED" -> false
+        stop_time["stop_id"] not in glides_terminals -> false
+        :else -> true
+      end
    end)
    # Prune all but the relevant fields.
    |> Enum.map(fn
@@ -80,8 +99,18 @@
  def filter_by_advance_notice(tr_upd, min_advance_notice_sec) do
    time_of_creation = tr_upd.trip_update.timestamp

-    update_in(tr_upd.trip_update.stop_time_update, fn stop_times ->
-      Enum.filter(stop_times, &(&1.departure.time - time_of_creation >= min_advance_notice_sec))
-    end)
+    filtered_stop_times =
+      Enum.filter(
+        tr_upd.trip_update.stop_time_update,
+        &(&1.departure.time - time_of_creation >= min_advance_notice_sec)
+      )
+
+    case filtered_stop_times do
+      [] ->
+        nil
+
+      filtered_stop_times ->
+        put_in(tr_upd.trip_update.stop_time_update, filtered_stop_times)
+    end
  end
end
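Note the new contract: filter_by_advance_notice/2, like clean_up/2, now returns nil rather than a trip update with an empty stop_time_update list. A hypothetical caller (not from this diff) would discard the nils:

# Hypothetical pipeline — raw_trip_updates and header_timestamp are assumed.
alias TransitData.GlidesReport.TripUpdate

raw_trip_updates
|> Stream.map(&TripUpdate.clean_up(&1, header_timestamp))
|> Stream.reject(&is_nil/1)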