diff --git a/.github/workflows/unix.yml b/.github/workflows/unix.yml index d58343cb9..39d29f8c6 100644 --- a/.github/workflows/unix.yml +++ b/.github/workflows/unix.yml @@ -51,7 +51,7 @@ jobs: - name: Install RSL Web Interface Dependencies uses: pnpm/action-setup@v2 with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: ui/rsl @@ -83,7 +83,7 @@ jobs: - name: Install Protocol Tool Dependencies uses: pnpm/action-setup@v2 with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: tools/protocol @@ -193,7 +193,7 @@ jobs: - name: Install RSL Web Interface Dependencies uses: pnpm/action-setup@v2 with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: ./ui/rsl @@ -344,7 +344,7 @@ jobs: uses: pnpm/action-setup@v2 if: ${{ !matrix.config.skipui }} with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: ./ui/rsl diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 95d41e84a..aec055226 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -106,7 +106,7 @@ jobs: uses: pnpm/action-setup@v2 if: matrix.config.webui == 'On' with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: ./ui/rsl @@ -120,7 +120,7 @@ jobs: if: matrix.config.mode == 'Release' uses: pnpm/action-setup@v2 with: - version: ^8.6.12 + version: ^8.10.5 run_install: | - cwd: tools/protocol diff --git a/base/core/include/motis/core/debug/trip.h b/base/core/include/motis/core/debug/trip.h index 5706077ec..5ec1a21e6 100644 --- a/base/core/include/motis/core/debug/trip.h +++ b/base/core/include/motis/core/debug/trip.h @@ -135,7 +135,8 @@ struct trip_with_sections { out << cista::ptr_cast(mt) << "/" << mt->trip_idx_ << " "; } out << "] arriving from " - << station{sched, re->from_->get_station()->id_} << "\n"; + << station{sched, re->from_->get_station()->id_} << " at " + << format_time(through_lc.a_time_) << "\n"; } } } @@ -152,7 +153,8 @@ struct trip_with_sections { out << cista::ptr_cast(mt) << "/" << mt->trip_idx_ << " "; } out << "] departing to " - << station{sched, re.to_->get_station()->id_} << "\n"; + << station{sched, re.to_->get_station()->id_} << " at " + << format_time(through_lc.d_time_) << "\n"; } } } diff --git a/docs/api/schemas/motis/paxmon.yaml b/docs/api/schemas/motis/paxmon.yaml index 96b54e1e3..a7342e673 100644 --- a/docs/api/schemas/motis/paxmon.yaml +++ b/docs/api/schemas/motis/paxmon.yaml @@ -109,6 +109,8 @@ PaxMonRerouteLogRoute: description: Probability before the reroute new_probability: description: Probability after the reroute + localization: + description: TODO PaxMonRerouteLogEntry: description: Information about a reroute event fields: @@ -128,11 +130,12 @@ PaxMonRerouteLogEntry: description: Index and probabilities of the old route new_routes: description: Indices and probabilities of the new routes - localization: - description: | - Location of the passenger group at the system time of the reroute. - - Alternative routes are searched from this location. + update_number: + description: > + The update number at the time of the reroute. Updates are processed in + batches. Updates with the same update number are processed in the same + batch. Note that there may be multiple update numbers with the same + system time. PaxMonDataSource: description: | Passenger group ID based on the input data. diff --git a/modules/paxforecast/include/motis/paxforecast/affected_route_info.h b/modules/paxforecast/include/motis/paxforecast/affected_route_info.h new file mode 100644 index 000000000..03c686286 --- /dev/null +++ b/modules/paxforecast/include/motis/paxforecast/affected_route_info.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +#include "motis/paxmon/broken_transfer_info.h" +#include "motis/paxmon/localization.h" + +namespace motis::paxforecast { + +// probability is stored because it may be changed before the simulation begins +// passenger count is stored for convenience +struct passenger_group_with_route_and_probability { + motis::paxmon::passenger_group_with_route pgwr_{}; + float probability_{}; + std::uint16_t passengers_{}; +}; + +struct affected_route_info { + inline bool broken() const { return broken_transfer_info_.has_value(); } + + passenger_group_with_route_and_probability pgwrap_{}; + unsigned destination_station_id_{}; + motis::paxmon::passenger_localization loc_now_{}; + motis::paxmon::passenger_localization loc_broken_{}; + std::optional broken_transfer_info_; + std::uint32_t alts_now_{}; // index -> alternatives set + std::uint32_t alts_broken_{}; // index -> alternatives set +}; + +} // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/alternatives.h b/modules/paxforecast/include/motis/paxforecast/alternatives.h index 6e83347a2..95f854d1c 100644 --- a/modules/paxforecast/include/motis/paxforecast/alternatives.h +++ b/modules/paxforecast/include/motis/paxforecast/alternatives.h @@ -2,6 +2,8 @@ #include +#include "motis/hash_map.h" +#include "motis/pair.h" #include "motis/vector.h" #include "motis/core/schedule/schedule.h" @@ -29,16 +31,45 @@ struct alternative { bool is_original_{}; bool is_recommended_{}; measures::load_level load_info_{measures::load_level::UNKNOWN}; + + // set by simulation: + float pick_probability_{}; +}; + +struct alternative_routing_options { + bool use_cache_{}; + duration pretrip_interval_length_{}; + bool allow_start_metas_{}; + bool allow_dest_metas_{}; +}; + +struct alternatives_request { + motis::paxmon::passenger_localization localization_{}; + unsigned destination_station_id_{}; + + // result, set by alternative routing: + std::vector alternatives_; +}; + +struct alternatives_set { + std::vector requests_; + mcd::hash_map, + std::uint32_t> + request_key_to_idx_; + + std::uint32_t add_request( + motis::paxmon::passenger_localization const& localization, + unsigned destination_station_id); + + void find(motis::paxmon::universe const& uv, schedule const& sched, + routing_cache& cache, alternative_routing_options const& options); }; std::vector find_alternatives( motis::paxmon::universe const& uv, schedule const& sched, - routing_cache& cache, - mcd::vector const& group_measures, - unsigned const destination_station_id, + routing_cache& cache, unsigned const destination_station_id, motis::paxmon::passenger_localization const& localization, - motis::paxmon::compact_journey const* remaining_journey, bool use_cache, - duration pretrip_interval_length, bool allow_start_metas, - bool allow_dest_metas); + alternative_routing_options options); } // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/behavior/util.h b/modules/paxforecast/include/motis/paxforecast/behavior/util.h index e52db2268..b96d81a05 100644 --- a/modules/paxforecast/include/motis/paxforecast/behavior/util.h +++ b/modules/paxforecast/include/motis/paxforecast/behavior/util.h @@ -11,6 +11,8 @@ #include "motis/paxmon/passenger_group.h" +#include "motis/paxforecast/alternatives.h" + namespace motis::paxforecast::behavior { template @@ -66,13 +68,13 @@ inline void only_keep_best_alternative(std::vector& probabilities, } inline std::vector calc_new_probabilites( - float const base_prob, std::vector const& pick_probs, + float const base_prob, std::vector const& alts, float const threshold) { - if (pick_probs.empty()) { + if (alts.empty()) { return {}; } auto probs = utl::to_vec( - pick_probs, [&](auto const& pick_prob) { return base_prob * pick_prob; }); + alts, [&](auto const& alt) { return base_prob * alt.pick_probability_; }); auto total_sum = 0.0F; auto kept_sum = 0.0F; auto rescale = false; @@ -91,7 +93,8 @@ inline std::vector calc_new_probabilites( p /= scale; } } else if (kept_sum == 0.0F) { - probs = pick_probs; + probs = utl::to_vec(alts, + [&](auto const& alt) { return alt.pick_probability_; }); only_keep_best_alternative(probs, base_prob); } return probs; diff --git a/modules/paxforecast/include/motis/paxforecast/combined_passenger_group.h b/modules/paxforecast/include/motis/paxforecast/combined_passenger_group.h deleted file mode 100644 index 883602c0a..000000000 --- a/modules/paxforecast/include/motis/paxforecast/combined_passenger_group.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include - -#include "motis/core/journey/journey.h" - -#include "motis/paxmon/index_types.h" -#include "motis/paxmon/localization.h" - -#include "motis/paxforecast/alternatives.h" - -namespace motis::paxforecast { - -// probability is stored because it may be changed before the simulation begins -// passenger count is stored for convenience -struct passenger_group_with_route_and_probability { - motis::paxmon::passenger_group_with_route pgwr_{}; - float probability_{}; - std::uint16_t passengers_{}; -}; - -struct combined_passenger_group { - unsigned destination_station_id_{}; - std::uint16_t passengers_{}; - bool has_major_delay_groups_{false}; - motis::paxmon::passenger_localization localization_; - std::vector group_routes_; - std::vector alternatives_; -}; - -} // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/load_forecast.h b/modules/paxforecast/include/motis/paxforecast/load_forecast.h deleted file mode 100644 index a31b91bc2..000000000 --- a/modules/paxforecast/include/motis/paxforecast/load_forecast.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include -#include - -#include "motis/core/schedule/schedule.h" - -#include "motis/paxforecast/simulation_result.h" -#include "motis/paxmon/get_load.h" -#include "motis/paxmon/load_info.h" -#include "motis/paxmon/universe.h" - -namespace motis::paxforecast { - -struct load_forecast { - std::vector trips_; -}; - -load_forecast calc_load_forecast(schedule const& sched, - motis::paxmon::universe const& uv, - simulation_result const& sim_result); - -} // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/messages.h b/modules/paxforecast/include/motis/paxforecast/messages.h index 23368bb38..438b9e45e 100644 --- a/modules/paxforecast/include/motis/paxforecast/messages.h +++ b/modules/paxforecast/include/motis/paxforecast/messages.h @@ -5,16 +5,10 @@ #include "motis/paxmon/universe.h" -#include "motis/paxforecast/load_forecast.h" #include "motis/paxforecast/measures/measures.h" -#include "motis/paxforecast/simulation_result.h" namespace motis::paxforecast { -motis::module::msg_ptr make_forecast_update_msg( - schedule const& sched, motis::paxmon::universe const& uv, - simulation_result const& sim_result, load_forecast const& lfc); - measures::measure_collection from_fbs( schedule const& sched, flatbuffers::Vector> const* ms); diff --git a/modules/paxforecast/include/motis/paxforecast/paxforecast.h b/modules/paxforecast/include/motis/paxforecast/paxforecast.h index b3111c8bc..c97f8cee2 100644 --- a/modules/paxforecast/include/motis/paxforecast/paxforecast.h +++ b/modules/paxforecast/include/motis/paxforecast/paxforecast.h @@ -39,13 +39,12 @@ struct paxforecast : public motis::module::module { std::string routing_cache_filename_; routing_cache routing_cache_; - bool calc_load_forecast_{true}; - bool publish_load_forecast_{false}; - bool deterministic_mode_{false}; duration min_delay_improvement_{5}; bool revert_forecasts_{false}; float probability_threshold_{0.01F}; + float uninformed_pax_{0.F}; + float major_delay_switch_{0.F}; bool allow_start_metas_{false}; bool allow_dest_metas_{false}; diff --git a/modules/paxforecast/include/motis/paxforecast/revert_forecast.h b/modules/paxforecast/include/motis/paxforecast/revert_forecast.h index 82c1b5008..83e3fb977 100644 --- a/modules/paxforecast/include/motis/paxforecast/revert_forecast.h +++ b/modules/paxforecast/include/motis/paxforecast/revert_forecast.h @@ -2,24 +2,15 @@ #include -#include "motis/hash_map.h" - #include "motis/core/schedule/schedule.h" #include "motis/paxmon/index_types.h" -#include "motis/paxmon/localization.h" #include "motis/paxmon/universe.h" -#include "motis/paxforecast/simulation_result.h" - namespace motis::paxforecast { void revert_forecasts( motis::paxmon::universe& uv, schedule const& sched, - simulation_result const& sim_result, - std::vector const& pgwrs, - mcd::hash_map const& - pgwr_localizations); + std::vector const& pgwrs); } // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/simulate_behavior.h b/modules/paxforecast/include/motis/paxforecast/simulate_behavior.h index bf27a9de5..b925c4f55 100644 --- a/modules/paxforecast/include/motis/paxforecast/simulate_behavior.h +++ b/modules/paxforecast/include/motis/paxforecast/simulate_behavior.h @@ -2,156 +2,59 @@ #include #include -#include -#include +#include #include "utl/zip.h" #include "motis/core/schedule/schedule.h" #include "motis/module/context/motis_parallel_for.h" +#include "motis/module/message.h" #include "motis/paxmon/compact_journey.h" #include "motis/paxmon/graph_access.h" #include "motis/paxmon/localization.h" +#include "motis/paxmon/reroute_log_entry.h" #include "motis/paxmon/universe.h" +#include "motis/paxforecast/affected_route_info.h" +#include "motis/paxforecast/alternatives.h" #include "motis/paxforecast/behavior/util.h" -#include "motis/paxforecast/combined_passenger_group.h" -#include "motis/paxforecast/simulation_result.h" namespace motis::paxforecast { -template -double avg(std::vector const& data) { - double mean = 0.; - if (data.empty()) { - return mean; - } - for (auto const& v : data) { - mean += static_cast(v); - } - mean /= static_cast(data.size()); - return mean; -} - -inline void add_group_to_alternative(schedule const& sched, - motis::paxmon::universe& uv, - simulation_result& result, - motis::paxmon::additional_group const& ag, - alternative const& alt) { - for_each_edge( - sched, uv, alt.compact_journey_, - [&](motis::paxmon::journey_leg const&, motis::paxmon::edge const* e) { - if (e->is_trip()) { - result.additional_groups_[e].emplace_back(ag); - } - }); -} - -struct sim_data { - explicit sim_data(simulation_result& result) : result_{result} {} - - void finish_stats(std::uint64_t combined_group_count) { - result_.stats_.combined_group_count_ = combined_group_count; - result_.stats_.found_alt_count_avg_ = avg(found_alt_count_); - result_.stats_.picked_alt_count_avg_ = avg(picked_alt_count_); - result_.stats_.best_alt_prob_avg_ = avg(best_alt_prob_); - result_.stats_.second_alt_prob_avg_ = avg(second_alt_prob_); - } +struct simulation_options { + float probability_threshold_{}; + float uninformed_pax_{}; +}; - simulation_result& result_; - std::mutex result_mutex_; - std::vector found_alt_count_; - std::vector picked_alt_count_; - std::vector best_alt_prob_; - std::vector second_alt_prob_; +struct update_groups_context { + motis::module::message_creator mc_; + std::vector> reroutes_; }; template -inline void simulate_behavior_for_cpg(schedule const& sched, - motis::paxmon::universe& uv, - PassengerBehavior& pb, - combined_passenger_group const& cpg, - sim_data& sd, - float const probability_threshold) { - if (cpg.group_routes_.empty()) { - return; - } - auto const allocation = pb.pick_routes(cpg.alternatives_); - auto guard = std::lock_guard{sd.result_mutex_}; - sd.result_.stats_.group_route_count_ += cpg.group_routes_.size(); - for (auto const& pgwrap : cpg.group_routes_) { - auto& group_route_result = sd.result_.group_route_results_[pgwrap.pgwr_]; - group_route_result.localization_ = &cpg.localization_; - auto const new_probs = behavior::calc_new_probabilites( - pgwrap.probability_, allocation, probability_threshold); - std::uint8_t picked = 0; - for (auto const& [alt, probability] : - utl::zip(cpg.alternatives_, new_probs)) { - if (probability == 0.0F) { - continue; - } - group_route_result.alternative_probabilities_.emplace_back(&alt, - probability); - add_group_to_alternative( - sched, uv, sd.result_, - paxmon::additional_group{pgwrap.passengers_, probability}, alt); - ++picked; - } - sd.found_alt_count_.emplace_back( - static_cast(cpg.alternatives_.size())); - sd.picked_alt_count_.emplace_back(picked); - if (picked == 1) { - sd.best_alt_prob_.emplace_back( - group_route_result.alternative_probabilities_.front().second); - } else if (picked > 1) { - auto top = std::vector>(2); - std::partial_sort_copy( - begin(group_route_result.alternative_probabilities_), - end(group_route_result.alternative_probabilities_), begin(top), - end(top), [](auto const& lhs, auto const& rhs) { - return lhs.second > rhs.second; - }); - sd.best_alt_prob_.emplace_back(top[0].second); - sd.second_alt_prob_.emplace_back(top[1].second); - } +inline void simulate_behavior_for_alternatives(PassengerBehavior& pb, + std::vector& alts) { + auto const allocation = pb.pick_routes(alts); + for (auto const& [alt, probability] : utl::zip(alts, allocation)) { + alt.pick_probability_ = probability; } } template -inline simulation_result simulate_behavior( - schedule const& sched, motis::paxmon::universe& uv, - std::map> const& - combined_groups, - PassengerBehavior& pb, float const probability_threshold) { - simulation_result result; - sim_data sd{result}; - motis_parallel_for(combined_groups, ([&](auto const& cpgs) { - for (auto const& cpg : cpgs.second) { - simulate_behavior_for_cpg(sched, uv, pb, cpg, sd, - probability_threshold); - } - })); - sd.finish_stats(combined_groups.size()); - return result; +inline void simulate_behavior_for_alternatives(PassengerBehavior& pb, + alternatives_set& alts_set) { + motis_parallel_for(alts_set.requests_, [&](auto& req) { + simulate_behavior_for_alternatives(pb, req.alternatives_); + }); } -template -inline simulation_result simulate_behavior( +void simulate_behavior_for_route( schedule const& sched, motis::paxmon::universe& uv, - mcd::hash_map, - combined_passenger_group> const& combined_groups, - PassengerBehavior& pb, float const probability_threshold) { - simulation_result result; - sim_data sd{result}; - motis_parallel_for(combined_groups, ([&](auto const& cpgs) { - simulate_behavior_for_cpg(sched, uv, pb, cpgs.second, sd, - probability_threshold); - })); - sd.finish_stats(combined_groups.size()); - return result; -} + update_groups_context& ug_ctx, simulation_options const& options, + affected_route_info const& ar, std::vector const& alts_now, + std::vector const& alts_broken, + motis::paxmon::reroute_reason_t const default_reroute_reason); } // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/simulation_result.h b/modules/paxforecast/include/motis/paxforecast/simulation_result.h deleted file mode 100644 index cedc2b843..000000000 --- a/modules/paxforecast/include/motis/paxforecast/simulation_result.h +++ /dev/null @@ -1,43 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "motis/hash_map.h" - -#include "motis/paxforecast/alternatives.h" - -#include "motis/paxmon/additional_group.h" -#include "motis/paxmon/index_types.h" -#include "motis/paxmon/universe.h" - -namespace motis::paxforecast { - -struct group_simulation_result { - motis::paxmon::passenger_localization const* localization_{}; - std::vector> alternative_probabilities_; -}; - -struct simulation_result_stats { - double found_alt_count_avg_{}; - double picked_alt_count_avg_{}; - double best_alt_prob_avg_{}; - double second_alt_prob_avg_{}; - std::uint64_t group_route_count_{}; - std::uint64_t combined_group_count_{}; -}; - -struct simulation_result { - mcd::hash_map> - additional_groups_; - mcd::hash_map - group_route_results_; - simulation_result_stats stats_; -}; - -} // namespace motis::paxforecast diff --git a/modules/paxforecast/include/motis/paxforecast/statistics.h b/modules/paxforecast/include/motis/paxforecast/statistics.h index 64a4f79af..8d7940ab1 100644 --- a/modules/paxforecast/include/motis/paxforecast/statistics.h +++ b/modules/paxforecast/include/motis/paxforecast/statistics.h @@ -22,11 +22,6 @@ struct tick_statistics { t_find_alternatives_ += rhs.t_find_alternatives_; t_add_alternatives_ += rhs.t_add_alternatives_; t_passenger_behavior_ += rhs.t_passenger_behavior_; - t_calc_load_forecast_ += rhs.t_calc_load_forecast_; - t_load_forecast_fbs_ += rhs.t_load_forecast_fbs_; - t_write_load_forecast_ += rhs.t_write_load_forecast_; - t_publish_load_forecast_ += rhs.t_publish_load_forecast_; - t_total_load_forecast_ += rhs.t_total_load_forecast_; t_update_tracked_groups_ += rhs.t_update_tracked_groups_; t_total_ += rhs.t_total_; @@ -57,11 +52,6 @@ struct tick_statistics { std::uint64_t t_find_alternatives_{}; std::uint64_t t_add_alternatives_{}; std::uint64_t t_passenger_behavior_{}; - std::uint64_t t_calc_load_forecast_{}; - std::uint64_t t_load_forecast_fbs_{}; - std::uint64_t t_write_load_forecast_{}; - std::uint64_t t_publish_load_forecast_{}; - std::uint64_t t_total_load_forecast_{}; std::uint64_t t_update_tracked_groups_{}; std::uint64_t t_total_{}; }; diff --git a/modules/paxforecast/include/motis/paxforecast/update_tracked_groups.h b/modules/paxforecast/include/motis/paxforecast/update_tracked_groups.h deleted file mode 100644 index 89344aa2f..000000000 --- a/modules/paxforecast/include/motis/paxforecast/update_tracked_groups.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include - -#include "motis/hash_map.h" - -#include "motis/core/schedule/schedule.h" - -#include "motis/paxmon/broken_transfer_info.h" -#include "motis/paxmon/localization.h" -#include "motis/paxmon/monitoring_event.h" -#include "motis/paxmon/universe.h" - -#include "motis/paxforecast/simulation_result.h" -#include "motis/paxforecast/statistics.h" - -namespace motis::paxforecast { - -void update_tracked_groups( - schedule const& sched, motis::paxmon::universe& uv, - simulation_result const& sim_result, - std::map const& pgwr_event_types, - std::map> const& - broken_transfer_infos, - mcd::hash_map const& - pgwr_localizations, - tick_statistics& tick_stats, - motis::paxmon::reroute_reason_t const default_reroute_reason); - -} // namespace motis::paxforecast diff --git a/modules/paxforecast/src/alternatives.cc b/modules/paxforecast/src/alternatives.cc index 8c7b55738..e290a3cd8 100644 --- a/modules/paxforecast/src/alternatives.cc +++ b/modules/paxforecast/src/alternatives.cc @@ -19,7 +19,9 @@ #include "motis/core/conv/trip_conv.h" #include "motis/core/journey/message_to_journeys.h" #include "motis/core/journey/print_journey.h" + #include "motis/module/context/motis_call.h" +#include "motis/module/context/motis_spawn.h" #include "motis/module/message.h" #include "motis/paxmon/debug.h" @@ -158,9 +160,7 @@ msg_ptr pretrip_station_query(universe const& uv, schedule const& sched, std::string get_cache_key(schedule const& sched, unsigned const destination_station_id, passenger_localization const& localization, - duration const pretrip_interval_length, - bool const allow_start_metas, - bool const allow_dest_metas) { + alternative_routing_options const& options) { if (localization.in_trip()) { auto const et = to_extern_trip(sched, localization.in_trip_); return fmt::format( @@ -170,7 +170,8 @@ std::string get_cache_key(schedule const& sched, localization.at_station_->eva_nr_.view(), sched.stations_.at(destination_station_id)->eva_nr_.view(), et.station_id_, et.train_nr_, et.time_, et.target_station_id_, - et.target_time_, et.line_id_, allow_start_metas, allow_dest_metas); + et.target_time_, et.line_id_, options.allow_start_metas_, + options.allow_dest_metas_); } else { auto const interchange_time = localization.first_station_ @@ -183,25 +184,23 @@ std::string get_cache_key(schedule const& sched, localization.current_arrival_time_ + interchange_time, localization.at_station_->eva_nr_.view(), sched.stations_.at(destination_station_id)->eva_nr_.view(), - pretrip_interval_length != 0 - ? fmt::format(":{}", pretrip_interval_length) + options.pretrip_interval_length_ != 0 + ? fmt::format(":{}", options.pretrip_interval_length_) : "", - allow_start_metas, allow_dest_metas); + options.allow_start_metas_, options.allow_dest_metas_); } } msg_ptr send_routing_request(universe const& uv, schedule const& sched, unsigned const destination_station_id, passenger_localization const& localization, - duration const pretrip_interval_length, - bool const allow_start_metas, - bool const allow_dest_metas) { + alternative_routing_options const& options) { msg_ptr query_msg; if (localization.in_trip()) { query_msg = ontrip_train_query( uv, sched, localization.in_trip_, localization.at_station_->index_, localization.current_arrival_time_, destination_station_id, - allow_start_metas, allow_dest_metas); + options.allow_start_metas_, options.allow_dest_metas_); } else { auto const interchange_time = localization.first_station_ @@ -210,16 +209,17 @@ msg_ptr send_routing_request(universe const& uv, schedule const& sched, ->transfer_time_; auto const earliest_possible_departure = localization.current_arrival_time_ + interchange_time; - if (pretrip_interval_length == 0) { + if (options.pretrip_interval_length_ == 0) { query_msg = ontrip_station_query( uv, sched, localization.at_station_->index_, earliest_possible_departure, destination_station_id, - allow_start_metas, allow_dest_metas); + options.allow_start_metas_, options.allow_dest_metas_); } else { query_msg = pretrip_station_query( uv, sched, localization.at_station_->index_, - earliest_possible_departure, pretrip_interval_length, - destination_station_id, allow_start_metas, allow_dest_metas); + earliest_possible_departure, options.pretrip_interval_length_, + destination_station_id, options.allow_start_metas_, + options.allow_dest_metas_); } } @@ -230,29 +230,23 @@ msg_ptr get_routing_response(universe const& uv, schedule const& sched, routing_cache& cache, unsigned const destination_station_id, passenger_localization const& localization, - bool const use_cache, - duration const pretrip_interval_length, - bool const allow_start_metas, - bool const allow_dest_metas) { - if (use_cache && cache.is_open()) { + alternative_routing_options const& options) { + if (options.use_cache_ && cache.is_open()) { assert(uv.uses_default_schedule()); - auto const cache_key = get_cache_key(sched, destination_station_id, - localization, pretrip_interval_length, - allow_start_metas, allow_dest_metas); + auto const cache_key = + get_cache_key(sched, destination_station_id, localization, options); auto const cache_key_view = std::string_view{ reinterpret_cast(cache_key.data()), cache_key.size()}; auto msg = cache.get(cache_key_view); if (!msg) { msg = send_routing_request(uv, sched, destination_station_id, - localization, pretrip_interval_length, - allow_start_metas, allow_dest_metas); + localization, options); cache.put(cache_key_view, msg); } return msg; } else { return send_routing_request(uv, sched, destination_station_id, localization, - pretrip_interval_length, allow_start_metas, - allow_dest_metas); + options); } } @@ -261,12 +255,10 @@ msg_ptr get_routing_response(universe const& uv, schedule const& sched, std::vector find_alternative_journeys( universe const& uv, schedule const& sched, routing_cache& cache, unsigned const destination_station_id, - passenger_localization const& localization, bool const use_cache, - duration const pretrip_interval_length, bool const allow_start_metas, - bool const allow_dest_metas, bool const debug) { + passenger_localization const& localization, + alternative_routing_options const& options, bool debug) { auto const response_msg = get_routing_response( - uv, sched, cache, destination_station_id, localization, use_cache, - pretrip_interval_length, allow_start_metas, allow_dest_metas); + uv, sched, cache, destination_station_id, localization, options); auto const response = motis_content(RoutingResponse, response_msg); auto alternatives = message_to_journeys(response); @@ -296,88 +288,39 @@ std::vector find_alternative_journeys( return alternatives; } -bool contains_trip(alternative const& alt, extern_trip const& searched_trip) { - return std::any_of(begin(alt.journey_.trips_), end(alt.journey_.trips_), - [&](journey::trip const& jt) { - return jt.extern_trip_ == searched_trip; - }); -} - -bool is_recommended(alternative const& alt, - measures::trip_recommendation const& m) { - // TODO(pablo): check interchange stop - return contains_trip(alt, m.recommended_trip_); -} - -void check_measures( - alternative& alt, - mcd::vector const& group_measures) { - for (auto const* mv : group_measures) { - std::visit( - utl::overloaded{// - [&](measures::trip_recommendation const& m) { - if (is_recommended(alt, m)) { - alt.is_recommended_ = true; - } - }, - [&](measures::trip_load_information const& m) { - // TODO(pablo): handle case where load - // information for multiple trips in the - // journey is available - if (contains_trip(alt, m.trip_)) { - alt.load_info_ = m.level_; - } - }, - [&](measures::trip_load_recommendation const& m) { - for (auto const& tll : m.full_trips_) { - if (contains_trip(alt, tll.trip_)) { - alt.load_info_ = tll.level_; - } - } - for (auto const& tll : m.recommended_trips_) { - if (contains_trip(alt, tll.trip_)) { - alt.load_info_ = tll.level_; - alt.is_recommended_ = true; - } - } - }}, - *mv); - } -} - std::vector find_alternatives( universe const& uv, schedule const& sched, routing_cache& cache, - mcd::vector const& group_measures, unsigned const destination_station_id, passenger_localization const& localization, - compact_journey const* remaining_journey, bool use_cache, - duration const pretrip_interval_length, bool allow_start_metas, - bool const allow_dest_metas) { + alternative_routing_options options) { // never use cache for schedule forks if (!uv.uses_default_schedule()) { - use_cache = false; + options.use_cache_ = false; } if (!localization.first_station_) { - allow_start_metas = false; + options.allow_start_metas_ = false; } auto const debug = false; // default alternative routing auto const journeys = find_alternative_journeys( - uv, sched, cache, destination_station_id, localization, use_cache, - pretrip_interval_length, allow_start_metas, allow_dest_metas, debug); + uv, sched, cache, destination_station_id, localization, options, debug); auto alternatives = utl::to_vec(journeys, [&](journey const& j) { auto const arrival_time = unix_to_motistime( sched.schedule_begin_, j.stops_.back().arrival_.timestamp_); auto const dur = static_cast(arrival_time - localization.current_arrival_time_); - return alternative{ - j, to_compact_journey(j, sched), arrival_time, dur, j.transfers_, true}; + return alternative{.journey_ = j, + .compact_journey_ = to_compact_journey(j, sched), + .arrival_time_ = arrival_time, + .duration_ = dur, + .transfers_ = j.transfers_}; }); - if (alternatives.empty() && (allow_start_metas || allow_dest_metas)) { + if (alternatives.empty() && + (options.allow_start_metas_ || options.allow_dest_metas_)) { if (std::any_of(begin(localization.at_station_->equivalent_), end(localization.at_station_->equivalent_), [&](auto const& eq) { @@ -390,34 +333,38 @@ std::vector find_alternatives( } } - // TODO(pablo): add additional alternatives for recommended trips (if not - // already found) - if (remaining_journey != nullptr) { - auto recommended_trips_not_found = 0ULL; - for (auto const* mv : group_measures) { - std::visit(utl::overloaded{[&](measures::trip_recommendation const& m) { - if (!std::any_of(begin(alternatives), end(alternatives), - [&](alternative const& alt) { - return is_recommended(alt, m); - })) { - ++recommended_trips_not_found; - } - }}, - *mv); - } - if (recommended_trips_not_found > 0) { - LOG(info) - << recommended_trips_not_found - << " recommended trips not included in any alternative journeys"; - } - } + return alternatives; +} - // TODO(pablo): mark original journey - for (auto& alt : alternatives) { - check_measures(alt, group_measures); +std::uint32_t alternatives_set::add_request( + passenger_localization const& localization, + unsigned const destination_station_id) { + auto const key = mcd::pair{localization, destination_station_id}; + if (auto const it = request_key_to_idx_.find(key); + it != end(request_key_to_idx_)) { + return it->second; + } else { + auto const idx = static_cast(requests_.size()); + requests_.emplace_back(alternatives_request{ + .localization_ = localization, + .destination_station_id_ = destination_station_id}); + request_key_to_idx_[key] = idx; + return idx; } +} - return alternatives; +void alternatives_set::find(universe const& uv, schedule const& sched, + routing_cache& cache, + alternative_routing_options const& options) { + auto futures = utl::to_vec(requests_, [&](alternatives_request& req) { + return spawn_job_void([&uv, &sched, &cache, &req, &options] { + req.alternatives_ = + find_alternatives(uv, sched, cache, req.destination_station_id_, + req.localization_, options); + }); + }); + ctx::await_all(futures); + cache.sync(); } } // namespace motis::paxforecast diff --git a/modules/paxforecast/src/api/apply_measures.cc b/modules/paxforecast/src/api/apply_measures.cc index 5a75abb00..0e676116b 100644 --- a/modules/paxforecast/src/api/apply_measures.cc +++ b/modules/paxforecast/src/api/apply_measures.cc @@ -3,6 +3,7 @@ #include "motis/core/common/date_time_util.h" #include "motis/core/common/logging.h" #include "motis/core/common/raii.h" +#include "motis/core/common/timing.h" #include "motis/core/access/trip_access.h" @@ -16,13 +17,12 @@ #include "motis/paxmon/loader/capacities/load_capacities.h" -#include "motis/paxforecast/combined_passenger_group.h" +#include "motis/paxforecast/affected_route_info.h" #include "motis/paxforecast/error.h" #include "motis/paxforecast/messages.h" #include "motis/paxforecast/paxforecast.h" #include "motis/paxforecast/simulate_behavior.h" #include "motis/paxforecast/universe_data.h" -#include "motis/paxforecast/update_tracked_groups.h" #include "motis/paxforecast/measures/affected_groups.h" #include "motis/paxforecast/measures/measures.h" @@ -36,6 +36,10 @@ using namespace motis::paxforecast; namespace motis::paxforecast::api { +using sim_combined_routes_t = mcd::hash_map< + mcd::pair, + std::vector>; + void apply_update_capacities_measure(universe& uv, schedule const& sched, measures::update_capacities const& m) { auto& caps = uv.capacity_maps_; @@ -93,6 +97,129 @@ void apply_override_capacity_measure(universe& uv, schedule const& sched, } } +bool contains_trip(alternative const& alt, extern_trip const& searched_trip) { + return std::any_of(begin(alt.journey_.trips_), end(alt.journey_.trips_), + [&](journey::trip const& jt) { + return jt.extern_trip_ == searched_trip; + }); +} + +bool is_recommended(alternative const& alt, + measures::trip_recommendation const& m) { + // TODO(pablo): check interchange stop + return contains_trip(alt, m.recommended_trip_); +} + +void check_measures( + alternative& alt, + mcd::vector const& group_measures) { + for (auto const* mv : group_measures) { + std::visit( + utl::overloaded{// + [&](measures::trip_recommendation const& m) { + if (is_recommended(alt, m)) { + alt.is_recommended_ = true; + } + }, + [&](measures::trip_load_information const& m) { + // TODO(pablo): handle case where load + // information for multiple trips in the + // journey is available + if (contains_trip(alt, m.trip_)) { + alt.load_info_ = m.level_; + } + }, + [&](measures::trip_load_recommendation const& m) { + for (auto const& tll : m.full_trips_) { + if (contains_trip(alt, tll.trip_)) { + alt.load_info_ = tll.level_; + } + } + for (auto const& tll : m.recommended_trips_) { + if (contains_trip(alt, tll.trip_)) { + alt.load_info_ = tll.level_; + alt.is_recommended_ = true; + } + } + }}, + *mv); + } +} + +template +void sim_and_update_groups( + paxforecast& mod, universe& uv, schedule const& sched, + PassengerBehavior& pb, + measures::affected_groups_info const& affected_groups, + std::vector const& affected_routes, + alternatives_set const& alts_set, sim_combined_routes_t const& combined, + tick_statistics& tick_stats) { + auto const constexpr REROUTE_BATCH_SIZE = 5'000; + + MOTIS_START_TIMING(update_tracked_groups); + + auto const options = + simulation_options{.probability_threshold_ = mod.probability_threshold_, + .uninformed_pax_ = mod.uninformed_pax_}; + auto ug_ctx = update_groups_context{.mc_ = message_creator()}; + auto const empty_alts = std::vector{}; + + auto const send_reroutes = [&]() { + if (ug_ctx.reroutes_.empty()) { + return; + } + tick_stats.rerouted_group_routes_ += ug_ctx.reroutes_.size(); + LOG(motis::logging::info) + << "update_groups: sending " << ug_ctx.reroutes_.size() << " reroutes"; + ug_ctx.mc_.create_and_finish( + MsgContent_PaxMonRerouteGroupsRequest, + CreatePaxMonRerouteGroupsRequest( + ug_ctx.mc_, uv.id_, ug_ctx.mc_.CreateVector(ug_ctx.reroutes_)) + .Union(), + "/paxmon/reroute_groups"); + auto const msg = make_msg(ug_ctx.mc_); + motis_call(msg)->val(); + ug_ctx.reroutes_.clear(); + ug_ctx.mc_.Clear(); + }; + + for (auto const& ce : combined) { + auto const& remaining_planned_journey = ce.first.second; + auto const& affected_route_indices = ce.second; + if (affected_route_indices.empty()) { + continue; + } + + auto const& first_affected_route = + affected_routes.at(affected_route_indices.front()); + auto const& group_measures = + affected_groups.measures_.at(first_affected_route.pgwrap_.pgwr_); + auto alts = + alts_set.requests_.at(first_affected_route.alts_now_).alternatives_; + + for (auto& alt : alts) { + check_measures(alt, group_measures); + alt.is_original_ = (alt.compact_journey_ == remaining_planned_journey); + } + + simulate_behavior_for_alternatives(pb.pb_, alts); + + for (auto const ar_idx : affected_route_indices) { + auto const& ar = affected_routes.at(ar_idx); + simulate_behavior_for_route(sched, uv, ug_ctx, options, ar, alts, + empty_alts, reroute_reason_t::SIMULATION); + if (ug_ctx.reroutes_.size() >= REROUTE_BATCH_SIZE) { + send_reroutes(); + } + } + } + + send_reroutes(); + + MOTIS_STOP_TIMING(update_tracked_groups); + tick_stats.t_update_tracked_groups_ += MOTIS_TIMING_MS(update_tracked_groups); +} + msg_ptr apply_measures(paxforecast& mod, paxmon_data& data, msg_ptr const& msg) { scoped_timer const all_timer{"apply_measures"}; @@ -212,63 +339,53 @@ msg_ptr apply_measures(paxforecast& mod, paxmon_data& data, total_affected_groups += affected_groups.measures_.size(); LOG(info) << "affected groups: " << affected_groups.measures_.size(); - // combine groups by (localization, remaining planned journey) - auto combined = - mcd::hash_map, - combined_passenger_group>{}; - mcd::hash_map - pgwr_localizations; + auto affected_routes = std::vector{}; + auto alts_set = alternatives_set{}; + auto combined = sim_combined_routes_t{}; + for (auto const& [pgwr, loc] : affected_groups.localization_) { auto const& pg = uv.passenger_groups_.group(pgwr.pg_); auto const& gr = uv.passenger_groups_.route(pgwr); auto const cj = uv.passenger_groups_.journey(gr.compact_journey_index_); auto const remaining_planned_journey = get_suffix(sched, cj, loc); - pgwr_localizations[pgwr] = &loc; + if (remaining_planned_journey.legs_.empty()) { continue; } - auto& cpg = combined[{loc, remaining_planned_journey}]; - cpg.group_routes_.emplace_back(passenger_group_with_route_and_probability{ - pgwr, gr.probability_, pg.passengers_}); - cpg.passengers_ += pg.passengers_; - cpg.localization_ = loc; - } - LOG(info) << "combined: " << combined.size(); + auto& ar = affected_routes.emplace_back(affected_route_info{ + .pgwrap_ = + passenger_group_with_route_and_probability{ + .pgwr_ = pgwr, + .probability_ = gr.probability_, + .passengers_ = pg.passengers_}, + .destination_station_id_ = cj.destination_station_id(), + .loc_now_ = loc, + }); + + ar.alts_now_ = + alts_set.add_request(ar.loc_now_, ar.destination_station_id_); + + combined[{loc, remaining_planned_journey}].emplace_back( + static_cast(affected_routes.size() - 1)); + } manual_timer alternatives_timer{"apply_measures: find alternatives"}; - std::vector> futures; - for (auto& ce : combined) { - auto const& loc = ce.first.first; - auto const& remaining_planned_journey = ce.first.second; - auto& cpg = ce.second; - - std::set measures_set; - for (auto const& pgwrap : cpg.group_routes_) { - for (auto const* mv : affected_groups.measures_.at(pgwrap.pgwr_)) { - measures_set.insert(mv); - } - } - auto group_measures = mcd::to_vec(measures_set); - futures.emplace_back(spawn_job_void([&, group_measures] { - cpg.alternatives_ = find_alternatives( - uv, sched, mod.routing_cache_, group_measures, - remaining_planned_journey.destination_station_id(), loc, - &remaining_planned_journey, false, 61, mod.allow_start_metas_, - mod.allow_dest_metas_); - })); - } - ctx::await_all(futures); - mod.routing_cache_.sync(); + alts_set.find(uv, sched, mod.routing_cache_, + alternative_routing_options{ + .use_cache_ = false, + .pretrip_interval_length_ = 61, + .allow_start_metas_ = mod.allow_start_metas_, + .allow_dest_metas_ = mod.allow_dest_metas_}); alternatives_timer.stop_and_print(); t_find_alternatives += alternatives_timer.duration_ms(); - total_alternative_routings += combined.size(); + total_alternative_routings += alts_set.requests_.size(); { manual_timer alt_trips_timer{"add alternatives to graph"}; - for (auto& [grp_key, cpg] : combined) { - total_alternatives_found += cpg.alternatives_.size(); - for (auto const& alt : cpg.alternatives_) { + for (auto const& req : alts_set.requests_) { + total_alternatives_found += req.alternatives_.size(); + for (auto const& alt : req.alternatives_) { for (auto const& leg : alt.compact_journey_.legs_) { get_or_add_trip(sched, uv, leg.trip_idx_); } @@ -278,17 +395,12 @@ msg_ptr apply_measures(paxforecast& mod, paxmon_data& data, t_add_alternatives_to_graph += alt_trips_timer.duration_ms(); } - manual_timer sim_timer{"passenger behavior simulation"}; + manual_timer update_groups_timer{"sim + update groups"}; + auto tick_stats = tick_statistics{}; auto pb = behavior::default_behavior{mod.deterministic_mode_}; - auto const sim_result = simulate_behavior(sched, uv, combined, pb.pb_, - mod.probability_threshold_); - sim_timer.stop_and_print(); - t_behavior_simulation += sim_timer.duration_ms(); - - manual_timer update_groups_timer{"update groups"}; - tick_statistics tick_stats; - update_tracked_groups(sched, uv, sim_result, {}, {}, pgwr_localizations, - tick_stats, reroute_reason_t::SIMULATION); + sim_and_update_groups(mod, uv, sched, pb, affected_groups, affected_routes, + alts_set, combined, tick_stats); + update_groups_timer.stop_and_print(); t_update_groups += update_groups_timer.duration_ms(); uv_storage.metrics_.add(sched.system_time_, now(), tick_stats); diff --git a/modules/paxforecast/src/load_forecast.cc b/modules/paxforecast/src/load_forecast.cc deleted file mode 100644 index c54e56bb7..000000000 --- a/modules/paxforecast/src/load_forecast.cc +++ /dev/null @@ -1,79 +0,0 @@ -#include "motis/paxforecast/load_forecast.h" - -#include - -#include "utl/pipes.h" -#include "utl/to_vec.h" - -#include "motis/hash_map.h" -#include "motis/hash_set.h" - -#include "motis/core/common/logging.h" -#include "motis/module/context/motis_parallel_for.h" - -#include "motis/paxmon/load_info.h" - -using namespace motis::paxmon; -using namespace motis::logging; - -namespace motis::paxforecast { - -load_forecast calc_load_forecast(schedule const& sched, universe const& uv, - simulation_result const& sim_result) { - mcd::hash_map edges; - mcd::hash_set trips; - std::mutex mutex; - - LOG(info) << "calc_load_forecast: " << sim_result.additional_groups_.size() - << " edges with additional groups"; - - motis_parallel_for(sim_result.additional_groups_, [&](auto const& entry) { - auto const e = entry.first; - if (!e->is_trip()) { - return; - } - if (e->clasz_ != service_class::ICE && e->clasz_ != service_class::IC && - e->clasz_ != service_class::OTHER) { - return; - } - auto const& additional_groups = entry.second; - auto pdf = get_load_pdf(uv.passenger_groups_, - uv.pax_connection_info_.group_routes(e->pci_)); - add_additional_groups(pdf, additional_groups); - auto cdf = get_cdf(pdf); - - std::lock_guard const guard{mutex}; - edges.emplace( - e, make_edge_load_info(uv, e, std::move(pdf), std::move(cdf), true)); - for (auto const& trp : e->get_trips(sched)) { - trips.emplace(trp); - } - }); - - load_forecast lfc; - lfc.trips_ = utl::to_vec(trips, [&](auto const trp) { - return trip_load_info{ - trp, - utl::all(uv.trip_data_.edges(trp)) // - | utl::transform([&](auto const& e) { return e.get(uv); }) // - | utl::remove_if([](auto const e) { return !e->is_trip(); }) // - | utl::transform([&](auto const e) { - auto const it = edges.find(e); - if (it != end(edges)) { - return it->second; - } else { - auto pdf = get_load_pdf( - uv.passenger_groups_, - uv.pax_connection_info_.group_routes(e->pci_)); - auto cdf = get_cdf(pdf); - return make_edge_load_info(uv, e, std::move(pdf), - std::move(cdf), false); - } - }) // - | utl::vec()}; - }); - - return lfc; -} - -} // namespace motis::paxforecast diff --git a/modules/paxforecast/src/messages.cc b/modules/paxforecast/src/messages.cc index fa6d5b7fe..3af408f47 100644 --- a/modules/paxforecast/src/messages.cc +++ b/modules/paxforecast/src/messages.cc @@ -18,51 +18,6 @@ using namespace motis::paxmon; namespace motis::paxforecast { -Offset get_passenger_group_route_forecast( - FlatBufferBuilder& fbb, schedule const& sched, universe const& uv, - passenger_group_with_route const& pgwr, - group_simulation_result const& group_result) { - return CreatePaxForecastGroupRoute( - fbb, to_fbs(sched, uv.passenger_groups_, fbb, pgwr), - fbs_localization_type(*group_result.localization_), - to_fbs(sched, fbb, *group_result.localization_), - fbb.CreateVector(utl::to_vec( - group_result.alternative_probabilities_, [&](auto const& alt) { - return CreatePaxForecastAlternative( - fbb, to_fbs(sched, fbb, alt.first->compact_journey_), - alt.second); - }))); -} - -Offset>> to_fbs(FlatBufferBuilder& fbb, - schedule const& sched, - universe const& uv, - load_forecast const& lfc) { - return fbb.CreateVector(utl::to_vec(lfc.trips_, [&](auto const& tfc) { - return to_fbs(fbb, sched, uv, tfc); - })); -} - -msg_ptr make_forecast_update_msg(schedule const& sched, universe const& uv, - simulation_result const& sim_result, - load_forecast const& lfc) { - message_creator fbb; - fbb.create_and_finish( - MsgContent_PaxForecastUpdate, - CreatePaxForecastUpdate(fbb, uv.id_, sched.system_time_, - fbb.CreateVector(utl::to_vec( - sim_result.group_route_results_, - [&](auto const& entry) { - return get_passenger_group_route_forecast( - fbb, sched, uv, entry.first, - entry.second); - })), - to_fbs(fbb, sched, uv, lfc)) - .Union(), - "/paxforecast/passenger_forecast"); - return make_msg(fbb); -} - std::uint32_t get_station_index(schedule const& sched, String const* eva) { return get_station(sched, {eva->c_str(), eva->Length()})->index_; } diff --git a/modules/paxforecast/src/monitoring_update.cc b/modules/paxforecast/src/monitoring_update.cc index 136bb43a9..b78936e55 100644 --- a/modules/paxforecast/src/monitoring_update.cc +++ b/modules/paxforecast/src/monitoring_update.cc @@ -1,13 +1,12 @@ #include "motis/paxforecast/monitoring_update.h" -#include +#include +#include #include #include #include "fmt/format.h" -#include "utl/erase_if.h" -#include "utl/to_vec.h" #include "utl/verify.h" #include "motis/core/common/date_time_util.h" @@ -15,9 +14,10 @@ #include "motis/core/common/timing.h" #include "motis/module/context/motis_call.h" -#include "motis/module/context/motis_publish.h" #include "motis/module/context/motis_spawn.h" +#include "motis/core/access/trip_access.h" + #include "motis/paxmon/fbs_compact_journey_util.h" #include "motis/paxmon/get_universe.h" #include "motis/paxmon/graph_access.h" @@ -26,13 +26,11 @@ #include "motis/paxmon/messages.h" #include "motis/paxmon/monitoring_event.h" -#include "motis/paxforecast/combined_passenger_group.h" -#include "motis/paxforecast/messages.h" +#include "motis/paxforecast/affected_route_info.h" #include "motis/paxforecast/paxforecast.h" #include "motis/paxforecast/revert_forecast.h" #include "motis/paxforecast/simulate_behavior.h" #include "motis/paxforecast/universe_data.h" -#include "motis/paxforecast/update_tracked_groups.h" #include "motis/paxforecast/behavior/default_behavior.h" @@ -42,13 +40,6 @@ using namespace motis::logging; namespace motis::paxforecast { -auto const constexpr REMOVE_GROUPS_BATCH_SIZE = 10'000; - -struct passenger_group_with_route_and_localization { - passenger_group_with_route pgwr_{}; - passenger_localization const* loc_{}; -}; - void log_destination_reachable( universe& uv, schedule const& sched, passenger_group_with_route_and_probability const& pgwrap, @@ -73,390 +64,304 @@ void log_destination_reachable( log_entries.emplace_back(reroute_log_entry{ static_cast(log_new_routes.index()), reroute_log_route_info{pgwrap.pgwr_.route_, pgwrap.probability_, - pgwrap.probability_}, + pgwrap.probability_, to_log_localization(loc)}, sched.system_time_, now(), + uv.update_number_, reroute_reason_t::DESTINATION_REACHABLE, - {}, - to_log_localization(loc)}); + {}}); } } -bool has_better_alternative(std::vector const& alts, - time expected_arrival_time, - duration min_improvement) { - auto const latest_accepted_arrival = expected_arrival_time - min_improvement; - return std::any_of(begin(alts), end(alts), - [latest_accepted_arrival](alternative const& alt) { - return alt.arrival_time_ <= latest_accepted_arrival; - }); -} +void find_alternatives_set(paxforecast& mod, universe& uv, + schedule const& sched, tick_statistics& tick_stats, + alternatives_set& alts_set) { + { + MOTIS_START_TIMING(find_alternatives); + scoped_timer const alt_timer{"on_monitoring_event: find alternatives"}; + LOG(info) << "find alternatives: " << alts_set.requests_.size() + << " routing requests (using cache=" + << mod.routing_cache_.is_open() << ")..."; + tick_stats.routing_requests_ += alts_set.requests_.size(); + alts_set.find(uv, sched, mod.routing_cache_, + alternative_routing_options{ + .use_cache_ = true, + .pretrip_interval_length_ = 0, + .allow_start_metas_ = mod.allow_start_metas_, + .allow_dest_metas_ = mod.allow_dest_metas_}); + MOTIS_STOP_TIMING(find_alternatives); + tick_stats.t_find_alternatives_ += MOTIS_TIMING_MS(find_alternatives); + } -// TODO(pablo): major delay groups -> "broken_group_routes" -void send_remove_group_routes( - schedule const& sched, universe const& uv, - std::vector& - group_routes_to_remove, - std::map> const& broken_transfer_infos, - tick_statistics& tick_stats, reroute_reason_t const reason) { - if (group_routes_to_remove.empty()) { - return; + auto alternatives_found = 0ULL; + { + MOTIS_START_TIMING(add_alternatives); + scoped_timer const alt_trips_timer{"add alternatives to graph"}; + for (auto const& req : alts_set.requests_) { + alternatives_found += req.alternatives_.size(); + for (auto const& alt : req.alternatives_) { + for (auto const& leg : alt.compact_journey_.legs_) { + get_or_add_trip(sched, uv, leg.trip_idx_); + } + } + } + tick_stats.alternatives_found_ += alternatives_found; + MOTIS_STOP_TIMING(add_alternatives); + tick_stats.t_add_alternatives_ += MOTIS_TIMING_MS(add_alternatives); } - tick_stats.removed_group_routes_ += group_routes_to_remove.size(); - message_creator mc; - mc.create_and_finish( - MsgContent_PaxMonRerouteGroupsRequest, - CreatePaxMonRerouteGroupsRequest( - mc, uv.id_, - mc.CreateVector(utl::to_vec( - group_routes_to_remove, - [&](auto const& pgwrl) { - auto const& pgwr = pgwrl.pgwr_; - return CreatePaxMonRerouteGroup( - mc, pgwr.pg_, pgwr.route_, - mc.CreateVector( - std::vector>{}), - static_cast(reason), - broken_transfer_info_to_fbs(mc, sched, - broken_transfer_infos.at(pgwr)), - false, - mc.CreateVector(std::vector{ - to_fbs_localization_wrapper(sched, mc, *pgwrl.loc_)})); - }))) - .Union(), - "/paxmon/reroute_groups"); - auto const remove_msg = make_msg(mc); - motis_call(remove_msg)->val(); - group_routes_to_remove.clear(); -} -void on_monitoring_update(paxforecast& mod, paxmon_data& data, - msg_ptr const& msg) { - auto const mon_update = motis_content(PaxMonUpdate, msg); - MOTIS_START_TIMING(total); - auto const uv_access = get_universe_and_schedule(data, mon_update->universe(), - ctx::access_t::WRITE); - auto const& sched = uv_access.sched_; - auto& uv = uv_access.uv_; + LOG(info) << "alternatives: " << alts_set.requests_.size() + << " routing requests => " << alternatives_found << " alternatives"; +} - tick_statistics tick_stats; - tick_stats.system_time_ = sched.system_time_; +passenger_group_with_route_and_probability event_to_pgwrap( + PaxMonEvent const* event) { + return passenger_group_with_route_and_probability{ + passenger_group_with_route{ + static_cast(event->group_route()->group_id()), + static_cast( + event->group_route()->route()->index())}, + event->group_route()->route()->probability(), + event->group_route()->passenger_count()}; +} - auto const current_time = - unix_to_motistime(sched.schedule_begin_, sched.system_time_); - utl::verify(current_time != INVALID_TIME, - "paxforecast::on_monitoring_event: invalid current system time: " - "system_time={}, schedule_begin={}", - sched.system_time_, sched.schedule_begin_); +void run_simulation(paxforecast& mod, tick_statistics& tick_stats, + alternatives_set& alts_set) { + MOTIS_START_TIMING(passenger_behavior); + auto pb = behavior::default_behavior{mod.deterministic_mode_}; + simulate_behavior_for_alternatives(pb.pb_, alts_set); + MOTIS_STOP_TIMING(passenger_behavior); + tick_stats.t_passenger_behavior_ += MOTIS_TIMING_MS(passenger_behavior); +} - std::map> - combined_groups; - std::map pgwr_event_types; - std::map expected_arrival_times; - std::map> - broken_transfer_infos; - std::vector unbroken_transfers; - auto delayed_group_routes = 0ULL; +void update_groups(universe& uv, schedule const& sched, + std::vector const& affected_routes, + alternatives_set const& alts_set, + tick_statistics& tick_stats, + simulation_options const& options, + reroute_reason_t const reason) { + auto const constexpr REROUTE_BATCH_SIZE = 5'000; - for (auto const& event : *mon_update->events()) { - auto const pgwr = passenger_group_with_route{ - static_cast(event->group_route()->group_id()), - static_cast( - event->group_route()->route()->index())}; - auto pgwrap = passenger_group_with_route_and_probability{ - pgwr, event->group_route()->route()->probability(), - event->group_route()->passenger_count()}; - auto const localization = - from_fbs(sched, event->localization_type(), event->localization()); - auto const destination_station_id = get_destination_station_id( - sched, event->group_route()->route()->journey()); + MOTIS_START_TIMING(update_tracked_groups); - auto const next_stop_is_destination = - localization.at_station_->index_ == destination_station_id; + auto ug_ctx = update_groups_context{.mc_ = message_creator()}; + auto const empty_alts = std::vector{}; - if (event->type() == PaxMonEventType_NO_PROBLEM) { - unbroken_transfers.push_back(pgwr); - log_destination_reachable(uv, sched, pgwrap, localization); - // TODO(pablo): if current p=0, behavior simulation won't work - // check if we need it anyway - continue; - // if (event->group_route()->route()->planned()) { - // continue; - // } - } else if ((next_stop_is_destination && - event->type() != PaxMonEventType_BROKEN_TRANSFER) || - pgwrap.probability_ == 0.0F) { - continue; + auto const send_reroutes = [&]() { + if (ug_ctx.reroutes_.empty()) { + return; + } + tick_stats.rerouted_group_routes_ += ug_ctx.reroutes_.size(); + LOG(motis::logging::info) + << "update_groups: sending " << ug_ctx.reroutes_.size() << " reroutes"; + ug_ctx.mc_.create_and_finish( + MsgContent_PaxMonRerouteGroupsRequest, + CreatePaxMonRerouteGroupsRequest( + ug_ctx.mc_, uv.id_, ug_ctx.mc_.CreateVector(ug_ctx.reroutes_)) + .Union(), + "/paxmon/reroute_groups"); + auto const msg = make_msg(ug_ctx.mc_); + motis_call(msg)->val(); + ug_ctx.reroutes_.clear(); + ug_ctx.mc_.Clear(); + }; + + for (auto const& ar : affected_routes) { + simulate_behavior_for_route( + sched, uv, ug_ctx, options, ar, + alts_set.requests_.at(ar.alts_now_).alternatives_, + ar.loc_broken_.valid() + ? alts_set.requests_.at(ar.alts_broken_).alternatives_ + : empty_alts, + reason); + if (ug_ctx.reroutes_.size() >= REROUTE_BATCH_SIZE) { + send_reroutes(); } + } - auto const major_delay = - event->type() == PaxMonEventType_MAJOR_DELAY_EXPECTED; + send_reroutes(); - if (major_delay) { - ++delayed_group_routes; - expected_arrival_times.insert( - {pgwr, unix_to_motistime(sched.schedule_begin_, - event->expected_arrival_time())}); + MOTIS_STOP_TIMING(update_tracked_groups); + tick_stats.t_update_tracked_groups_ += MOTIS_TIMING_MS(update_tracked_groups); +} + +void handle_broken_transfers(paxforecast& mod, universe& uv, + schedule const& sched, tick_statistics& tick_stats, + PaxMonUpdate const* mon_update) { + auto const use_uninformed_pax = mod.uninformed_pax_ > 0.F; + auto affected_routes = std::vector{}; + auto alts_set = alternatives_set{}; + + for (auto const& event : *mon_update->events()) { + if (event->type() != PaxMonEventType_BROKEN_TRANSFER) { + continue; } - auto const inserted = pgwr_event_types.insert( - {pgwr, static_cast(event->type())}); - utl::verify(inserted.second, - "multiple monitoring updates for passenger group"); - broken_transfer_infos[pgwr] = - from_fbs(sched, event->reachability()->broken_transfer()); - - auto& destination_groups = combined_groups[destination_station_id]; - // TODO(pablo): localization includes the scheduled arrival time, which - // is needed later (journey prefix calculation). to make sure this works, - // the scheduled time is currently included in the comparison. - // it might be better to only check the current arrival time - // and store the scheduled arrival time / localization per group - // instead of per combined group. - auto cpg = std::find_if( - begin(destination_groups), end(destination_groups), - [&](auto const& g) { return g.localization_ == localization; }); - if (cpg == end(destination_groups)) { - destination_groups.emplace_back( - combined_passenger_group{destination_station_id, - event->group_route()->passenger_count(), - major_delay, - localization, - {pgwrap}, - {}}); - } else { - cpg->passengers_ += event->group_route()->passenger_count(); - cpg->group_routes_.push_back(pgwrap); - if (major_delay) { - cpg->has_major_delay_groups_ = true; - } + auto const pgwrap = event_to_pgwrap(event); + auto loc_now = + from_fbs(sched, event->localization_type(), event->localization()); + auto const destination_station_id = get_destination_station_id( + sched, event->group_route()->route()->journey()); + + if (pgwrap.probability_ == 0.F) { + continue; } - } - mcd::hash_map - pgwr_localizations; - for (auto& cgs : combined_groups) { - for (auto& cpg : cgs.second) { - for (auto const& pgwrap : cpg.group_routes_) { - pgwr_localizations[pgwrap.pgwr_] = &cpg.localization_; - } + auto& ar = affected_routes.emplace_back(affected_route_info{ + .pgwrap_ = pgwrap, + .destination_station_id_ = destination_station_id, + .loc_now_ = std::move(loc_now), + .broken_transfer_info_ = + from_fbs(sched, event->reachability()->broken_transfer()), + }); + + ar.alts_now_ = + alts_set.add_request(ar.loc_now_, ar.destination_station_id_); + + if (use_uninformed_pax && ar.broken_transfer_info_) { + ar.loc_broken_ = localize_broken_transfer( + sched, + uv.passenger_groups_.journey( + uv.passenger_groups_.route(ar.pgwrap_.pgwr_) + .compact_journey_index_), + *ar.broken_transfer_info_); + ar.alts_broken_ = + alts_set.add_request(ar.loc_broken_, ar.destination_station_id_); } } - if (combined_groups.empty()) { - if (!unbroken_transfers.empty() && mod.revert_forecasts_) { - revert_forecasts(uv, sched, simulation_result{}, unbroken_transfers, - pgwr_localizations); - } + if (alts_set.requests_.empty()) { return; } - LOG(info) << mon_update->events()->size() << " monitoring updates, " - << pgwr_event_types.size() << " groups, " << combined_groups.size() - << " combined groups"; - - tick_stats.monitoring_events_ = mon_update->events()->size(); - tick_stats.group_routes_ = pgwr_event_types.size(); - tick_stats.combined_groups_ = combined_groups.size(); - tick_stats.major_delay_group_routes_ = delayed_group_routes; + tick_stats.group_routes_ += affected_routes.size(); + tick_stats.combined_groups_ += alts_set.requests_.size(); - auto routing_requests = 0ULL; - auto alternatives_found = 0ULL; + find_alternatives_set(mod, uv, sched, tick_stats, alts_set); + run_simulation(mod, tick_stats, alts_set); + update_groups( + uv, sched, affected_routes, alts_set, tick_stats, + simulation_options{.probability_threshold_ = mod.probability_threshold_, + .uninformed_pax_ = mod.uninformed_pax_}, + reroute_reason_t::BROKEN_TRANSFER); +} - { - MOTIS_START_TIMING(find_alternatives); - scoped_timer const alt_timer{"on_monitoring_event: find alternatives"}; - std::vector> futures; - for (auto& cgs : combined_groups) { - auto const destination_station_id = cgs.first; - for (auto& cpg : cgs.second) { - ++routing_requests; - futures.emplace_back( - spawn_job_void([&mod, &uv, &sched, destination_station_id, &cpg] { - cpg.alternatives_ = find_alternatives( - uv, sched, mod.routing_cache_, {}, destination_station_id, - cpg.localization_, nullptr, true, 0, mod.allow_start_metas_, - mod.allow_dest_metas_); - })); - } - } - LOG(info) << "find alternatives: " << routing_requests - << " routing requests (using cache=" - << mod.routing_cache_.is_open() << ")..."; - ctx::await_all(futures); - mod.routing_cache_.sync(); - MOTIS_STOP_TIMING(find_alternatives); - tick_stats.t_find_alternatives_ = MOTIS_TIMING_MS(find_alternatives); +void handle_major_delays(paxforecast& mod, universe& uv, schedule const& sched, + tick_statistics& tick_stats, + PaxMonUpdate const* mon_update) { + auto const switch_prob = mod.major_delay_switch_; + auto const stay_prob = 1.F - switch_prob; + if (switch_prob == 0.F) { + return; } - { - MOTIS_START_TIMING(add_alternatives); - scoped_timer const alt_trips_timer{"add alternatives to graph"}; - for (auto& cgs : combined_groups) { - for (auto& cpg : cgs.second) { - alternatives_found += cpg.alternatives_.size(); - for (auto const& alt : cpg.alternatives_) { - for (auto const& leg : alt.compact_journey_.legs_) { - get_or_add_trip(sched, uv, leg.trip_idx_); - } - } - } + auto affected_routes = std::vector{}; + auto alts_set = alternatives_set{}; + + auto last_pgi = std::numeric_limits::max(); + for (auto const& event : *mon_update->events()) { + if (event->type() != PaxMonEventType_MAJOR_DELAY_EXPECTED) { + continue; } - MOTIS_STOP_TIMING(add_alternatives); - tick_stats.t_add_alternatives_ = MOTIS_TIMING_MS(add_alternatives); - } - LOG(info) << "alternatives: " << routing_requests << " routing requests => " - << alternatives_found << " alternatives"; + auto pgwrap = event_to_pgwrap(event); + // probability may have changed because of broken transfers in other + // routes of the same group + pgwrap.probability_ = uv.passenger_groups_.route(pgwrap.pgwr_).probability_; - tick_stats.routing_requests_ = routing_requests; - tick_stats.alternatives_found_ = alternatives_found; + if (pgwrap.probability_ == 0.F) { + continue; + } - auto removed_group_route_count = 0ULL; - if (delayed_group_routes > 0) { - std::vector - group_routes_to_remove; - for (auto& cgs : combined_groups) { - for (auto& cpg : cgs.second) { - if (!cpg.has_major_delay_groups_) { - continue; - } + if (last_pgi == pgwrap.pgwr_.pg_) { + // TODO(pablo): major delays for more than one route per group + // are currently not supported because processing them separately + // breaks things + LOG(info) << "skipping major delay for group " << pgwrap.pgwr_.pg_ + << " because of multiple delayed routes"; + continue; + } + last_pgi = pgwrap.pgwr_.pg_; - // remove groups without better alternatives from cpg - // so that they are not included in the simulation - // (they remain unchanged) - utl::erase_if( - cpg.group_routes_, - [&](passenger_group_with_route_and_probability const& pgwrap) { - if (pgwr_event_types.at(pgwrap.pgwr_) != - monitoring_event_type::MAJOR_DELAY_EXPECTED) { - return false; - } - auto const expected_current_arrival_time = - expected_arrival_times.at(pgwrap.pgwr_); - utl::verify(expected_current_arrival_time != INVALID_TIME, - "invalid expected arrival time for delayed group"); - return !has_better_alternative(cpg.alternatives_, - expected_current_arrival_time, - mod.min_delay_improvement_); - }); - - // group routes with better alternatives are removed from the paxmon - // graph and included in the simulation - for (auto const& pgwrap : cpg.group_routes_) { - if (pgwr_event_types.at(pgwrap.pgwr_) == - monitoring_event_type::MAJOR_DELAY_EXPECTED) { - group_routes_to_remove.emplace_back( - passenger_group_with_route_and_localization{ - pgwrap.pgwr_, &cpg.localization_}); - ++tick_stats.major_delay_group_routes_with_alternatives_; - ++removed_group_route_count; - } - } + auto& ar = affected_routes.emplace_back(affected_route_info{ + .pgwrap_ = pgwrap, + .destination_station_id_ = get_destination_station_id( + sched, event->group_route()->route()->journey()), + .loc_now_ = from_fbs(sched, event->localization_type(), + event->localization())}); - if (group_routes_to_remove.size() >= REMOVE_GROUPS_BATCH_SIZE) { - send_remove_group_routes(sched, uv, group_routes_to_remove, - broken_transfer_infos, tick_stats, - reroute_reason_t::MAJOR_DELAY_EXPECTED); - } - } - } - send_remove_group_routes(sched, uv, group_routes_to_remove, - broken_transfer_infos, tick_stats, - reroute_reason_t::MAJOR_DELAY_EXPECTED); - LOG(info) << "delayed group routes: " << delayed_group_routes - << ", removed group routes: " << removed_group_route_count - << " (tick total: " << tick_stats.removed_group_routes_ << ")"; + ar.alts_now_ = + alts_set.add_request(ar.loc_now_, ar.destination_station_id_); } - MOTIS_START_TIMING(passenger_behavior); - manual_timer sim_timer{"passenger behavior simulation"}; - auto pb = behavior::default_behavior{mod.deterministic_mode_}; - auto const sim_result = simulate_behavior(sched, uv, combined_groups, pb.pb_, - mod.probability_threshold_); - sim_timer.stop_and_print(); - MOTIS_STOP_TIMING(passenger_behavior); - tick_stats.t_passenger_behavior_ = MOTIS_TIMING_MS(passenger_behavior); - - LOG(info) << "forecast: " << sim_result.additional_groups_.size() - << " edges affected"; - LOG(info) << fmt::format( - "simulation average statistics: alternatives found: {:.2f}, alternatives " - "picked: {:.2f}, P(best): {:.2f}%, P(2nd best): {:.2f}% ({} group " - "routes, {} " - "combined)", - sim_result.stats_.found_alt_count_avg_, - sim_result.stats_.picked_alt_count_avg_, - sim_result.stats_.best_alt_prob_avg_ * 100, - sim_result.stats_.second_alt_prob_avg_ * 100, - sim_result.stats_.group_route_count_, - sim_result.stats_.combined_group_count_); - - if (mod.behavior_stats_file_.is_open() && uv.id_ == 0) { - fmt::print(mod.behavior_stats_file_, - "{},{},{},{:.4f},{:.4f},{:.2f},{:.2f}\n", - static_cast(sched.system_time_), - sim_result.stats_.group_route_count_, - sim_result.stats_.combined_group_count_, - sim_result.stats_.found_alt_count_avg_, - sim_result.stats_.picked_alt_count_avg_, - sim_result.stats_.best_alt_prob_avg_ * 100, - sim_result.stats_.second_alt_prob_avg_ * 100); + if (alts_set.requests_.empty()) { + return; } - if (mod.calc_load_forecast_) { - MOTIS_START_TIMING(total_load_forecast); - - MOTIS_START_TIMING(calc_load_forecast); - manual_timer load_forecast_timer{"load forecast"}; - auto const lfc = calc_load_forecast(sched, uv, sim_result); - load_forecast_timer.stop_and_print(); - MOTIS_STOP_TIMING(calc_load_forecast); - tick_stats.t_calc_load_forecast_ = MOTIS_TIMING_MS(calc_load_forecast); - - MOTIS_START_TIMING(load_forecast_fbs); - manual_timer load_forecast_msg_timer{"load forecast make msg"}; - auto const forecast_msg = - make_forecast_update_msg(sched, uv, sim_result, lfc); - load_forecast_msg_timer.stop_and_print(); - MOTIS_STOP_TIMING(load_forecast_fbs); - tick_stats.t_load_forecast_fbs_ = MOTIS_TIMING_MS(load_forecast_fbs); - - MOTIS_START_TIMING(write_load_forecast); - if (mod.forecast_file_.is_open() && uv.id_ == 0) { - scoped_timer const load_forecast_msg_timer{"load forecast to json"}; - mod.forecast_file_ << forecast_msg->to_json(json_format::SINGLE_LINE) - << std::endl; - } - MOTIS_STOP_TIMING(write_load_forecast); - tick_stats.t_write_load_forecast_ = MOTIS_TIMING_MS(write_load_forecast); + find_alternatives_set(mod, uv, sched, tick_stats, alts_set); + run_simulation(mod, tick_stats, alts_set); + update_groups( + uv, sched, affected_routes, alts_set, tick_stats, + simulation_options{.probability_threshold_ = mod.probability_threshold_, + .uninformed_pax_ = stay_prob}, + reroute_reason_t::MAJOR_DELAY_EXPECTED); +} + +void handle_unbroken_transfers(paxforecast& mod, universe& uv, + schedule const& sched, + PaxMonUpdate const* mon_update) { + auto unbroken_transfers = std::vector{}; - MOTIS_START_TIMING(publish_load_forecast); - if (mod.publish_load_forecast_) { - ctx::await_all(motis_publish(forecast_msg)); + for (auto const& event : *mon_update->events()) { + if (event->type() != PaxMonEventType_NO_PROBLEM) { + continue; } - MOTIS_STOP_TIMING(publish_load_forecast); - tick_stats.t_publish_load_forecast_ = - MOTIS_TIMING_MS(publish_load_forecast); - MOTIS_STOP_TIMING(total_load_forecast); - tick_stats.t_total_load_forecast_ = MOTIS_TIMING_MS(total_load_forecast); - } + auto const pgwr = passenger_group_with_route{ + static_cast(event->group_route()->group_id()), + static_cast( + event->group_route()->route()->index())}; + auto pgwrap = passenger_group_with_route_and_probability{ + pgwr, event->group_route()->route()->probability(), + event->group_route()->passenger_count()}; + // probability may have changed because of broken transfers in other + // routes of the same group + pgwrap.probability_ = uv.passenger_groups_.route(pgwr).probability_; + auto const localization = + from_fbs(sched, event->localization_type(), event->localization()); - MOTIS_START_TIMING(update_tracked_groups); - scoped_timer const update_tracked_groups_timer{"update tracked groups"}; - update_tracked_groups(sched, uv, sim_result, pgwr_event_types, - broken_transfer_infos, pgwr_localizations, tick_stats, - reroute_reason_t::REVERT_FORECAST); - MOTIS_STOP_TIMING(update_tracked_groups); - tick_stats.t_update_tracked_groups_ = MOTIS_TIMING_MS(update_tracked_groups); + log_destination_reachable(uv, sched, pgwrap, localization); + unbroken_transfers.push_back(pgwr); + } if (!unbroken_transfers.empty() && mod.revert_forecasts_) { - revert_forecasts(uv, sched, sim_result, unbroken_transfers, - pgwr_localizations); + revert_forecasts(uv, sched, unbroken_transfers); } +} + +void on_monitoring_update(paxforecast& mod, paxmon_data& data, + msg_ptr const& msg) { + auto const mon_update = motis_content(PaxMonUpdate, msg); + MOTIS_START_TIMING(total); + auto const uv_access = get_universe_and_schedule(data, mon_update->universe(), + ctx::access_t::WRITE); + auto const& sched = uv_access.sched_; + auto& uv = uv_access.uv_; + + auto tick_stats = tick_statistics{ + .system_time_ = static_cast(sched.system_time_), + .monitoring_events_ = mon_update->events()->size()}; + + auto const current_time = + unix_to_motistime(sched.schedule_begin_, sched.system_time_); + utl::verify(current_time != INVALID_TIME, + "paxforecast::on_monitoring_event: invalid current system time: " + "system_time={}, schedule_begin={}", + sched.system_time_, sched.schedule_begin_); + + handle_broken_transfers(mod, uv, sched, tick_stats, mon_update); + handle_major_delays(mod, uv, sched, tick_stats, mon_update); + handle_unbroken_transfers(mod, uv, sched, mon_update); MOTIS_STOP_TIMING(total); tick_stats.t_total_ = MOTIS_TIMING_MS(total); diff --git a/modules/paxforecast/src/paxforecast.cc b/modules/paxforecast/src/paxforecast.cc index 90f01eb24..677bd6c78 100644 --- a/modules/paxforecast/src/paxforecast.cc +++ b/modules/paxforecast/src/paxforecast.cc @@ -26,10 +26,6 @@ paxforecast::paxforecast() : module("Passenger Forecast", "paxforecast") { "output file for behavior statistics"); param(routing_cache_filename_, "routing_cache", "optional cache file for routing queries"); - param(calc_load_forecast_, "calc_load_forecast", - "calculate load forecast (required for output/publish)"); - param(publish_load_forecast_, "publish_load_forecast", - "publish load forecast"); param(stats_file_, "stats", "statistics file"); param(deterministic_mode_, "deterministic_mode", "all passengers always pick the best alternative"); @@ -41,6 +37,11 @@ paxforecast::paxforecast() : module("Passenger Forecast", "paxforecast") { param(probability_threshold_, "probability_threshold", "minimum allowed route probability (routes with lower probability are " "dropped)"); + param(uninformed_pax_, "uninformed_pax", + "percentage of passengers that ignore forecasts and announcements"); + param(major_delay_switch_, "major_delay_switch", + "percentage of passengers that may switch to alternatives in case of " + "an expected major delay"); param( allow_start_metas_, "allow_start_metas", "allow using equivalent stations as start station in alternative routes"); @@ -63,9 +64,10 @@ void paxforecast::init(motis::module::registry& reg) { behavior_stats_file_.exceptions(std::ios_base::failbit | std::ios_base::badbit); behavior_stats_file_.open(behavior_stats_filename_); - behavior_stats_file_ << "system_time,group_route_count,cpg_count," - << "found_alt_count_avg,picked_alt_count_avg," - << "best_alt_prob_avg,second_alt_prob_avg\n"; + behavior_stats_file_ + << "system_time,event_type,group_route_count,cpg_count," + << "found_alt_count_avg,picked_alt_count_avg," + << "best_alt_prob_avg,second_alt_prob_avg\n"; } if (!routing_cache_filename_.empty()) { diff --git a/modules/paxforecast/src/revert_forecast.cc b/modules/paxforecast/src/revert_forecast.cc index 4789992ad..9f0d288b6 100644 --- a/modules/paxforecast/src/revert_forecast.cc +++ b/modules/paxforecast/src/revert_forecast.cc @@ -1,16 +1,27 @@ #include "motis/paxforecast/revert_forecast.h" #include -#include #include +#include +#include +#include +#include -#include "utl/enumerate.h" +#include "cista/reflection/comparable.h" + +#include "utl/to_set.h" #include "utl/to_vec.h" +#include "utl/verify.h" + +#include "motis/core/common/dynamic_fws_multimap.h" +#include "motis/core/common/fws_graph.h" #include "motis/module/context/motis_call.h" #include "motis/module/message.h" +#include "motis/paxmon/localization.h" #include "motis/paxmon/messages.h" +#include "motis/paxmon/reachability.h" #include "motis/paxmon/temp_passenger_group.h" using namespace motis::paxmon; @@ -21,131 +32,297 @@ namespace motis::paxforecast { namespace { -struct print_log_route_info { - friend std::ostream& operator<<(std::ostream& out, - print_log_route_info const& p) { - auto const& ri = p.ri_; - out << "{r=" << ri.route_ << ", p=" << ri.previous_probability_ << "->" - << ri.new_probability_ << "}"; - return out; - } - reroute_log_route_info const& ri_; +struct reroute_node { + CISTA_COMPARABLE() + local_group_route_index route_{}; + float probability_{}; }; -struct print_log_entry { - friend std::ostream& operator<<(std::ostream& out, print_log_entry const& p) { - auto const& e = p.entry_; - out << "{old_route=" << print_log_route_info{e.old_route_} - << ", reason=" << e.reason_; - auto const new_routes = p.pgc_.log_entry_new_routes_.at(e.index_); - out << ", new_routes=["; - for (auto const& nr : new_routes) { - out << " " << print_log_route_info{nr}; - } - out << " ]"; - if (e.broken_transfer_) { - auto const& t = e.broken_transfer_.value(); - out << ", broken_transfer={" - << "leg=" << t.leg_index_ << "}"; - } - out << "}"; - return out; - } - - reroute_log_entry const& entry_; - passenger_group_container const& pgc_; +struct reroute_edge { + CISTA_COMPARABLE() + std::uint32_t from_{}; + std::uint32_t to_{}; }; } // namespace +// TODO(pablo): simple check for now, can maybe be improved later +bool can_switch(passenger_localization const& from, + passenger_localization const& to) { + return from.at_station_ == to.at_station_ && from.in_trip_ == to.in_trip_; +} + +bool can_switch(reroute_log_localization const& from, + reroute_log_localization const& to) { + return from.station_id_ == to.station_id_ && from.in_trip_ == to.in_trip_ && + from.trip_idx_ == to.trip_idx_; +} + +fws_graph build_reroute_graph( + passenger_group_container const& pgc, passenger_group_index const pgi) { + auto graph = fws_graph{}; + auto leaves = dynamic_fws_multimap{}; + auto reverts = std::vector{}; + auto const routes = pgc.routes(pgi); + + graph.emplace_back_node(static_cast(0), 1.F); + leaves[0].emplace_back(0U); + + auto const get_parent = + [&](std::uint32_t const node_idx) -> std::optional { + for (auto const& e : graph.incoming_edges(node_idx)) { + return e.from_; + } + return {}; + }; + + auto const process_reverts = [&]() { + if (reverts.empty()) { + return; + } + + auto reactivated_routes = utl::to_set( + reverts, [](auto const& le) { return le->old_route_.route_; }); + auto reverted_routes = std::set{}; + auto localizations = + std::vector(routes.size()); + + for (auto const* le : reverts) { + localizations[le->old_route_.route_] = &le->old_route_.localization_; + for (auto const& nr : pgc.log_entry_new_routes_.at(le->index_)) { + localizations[nr.route_] = &nr.localization_; + if (nr.previous_probability_ > nr.new_probability_) { + reverted_routes.insert(nr.route_); + } + } + } + + for (auto const reverted_route : reverted_routes) { + auto const leaf_localization = localizations.at(reverted_route); + utl::verify(leaf_localization != nullptr, + "revert_forecast: leaf localization not found"); + + auto const old_leaves = utl::to_vec(leaves[reverted_route], + [&](auto const idx) { return idx; }); + leaves[reverted_route].clear(); + + for (auto const old_leaf : old_leaves) { + auto candidate = std::optional{}; // node index + + for (auto node_idx = get_parent(old_leaf); node_idx; + node_idx = get_parent(*node_idx)) { + auto const& node = graph.nodes_.at(*node_idx); + if (!reactivated_routes.contains(node.route_)) { + continue; + } + auto const candidate_localization = localizations.at(node.route_); + utl::verify(candidate_localization != nullptr, + "revert_forecast: candidate localization not found"); + // leaf_localization != nullptr is checked in verify above + // NOLINTNEXTLINE(clang-analyzer-core.NonNullParamChecker) + if (can_switch(*leaf_localization, *candidate_localization)) { + candidate = *node_idx; + } + } + + if (candidate) { + auto const candidate_node = graph.nodes_.at(*candidate); + auto const new_node_idx = graph.nodes_.size(); + auto const old_leaf_prob = graph.nodes_.at(old_leaf).probability_; + graph.emplace_back_node(candidate_node.route_, old_leaf_prob); + graph.push_back_edge(reroute_edge{old_leaf, new_node_idx}); + leaves[candidate_node.route_].emplace_back(new_node_idx); + } else { + leaves[reverted_route].emplace_back(old_leaf); + } + } + } + + reverts.clear(); + }; + + for (auto const& le : pgc.reroute_log_entries(pgi)) { + if (le.reason_ == reroute_reason_t::REVERT_FORECAST) { + if (!reverts.empty() && + reverts.front()->update_number_ != le.update_number_) { + process_reverts(); + } + reverts.emplace_back(&le); + } else { + process_reverts(); + auto const prev_nodes = utl::to_vec(leaves[le.old_route_.route_], + [&](auto const idx) { return idx; }); + leaves[le.old_route_.route_].clear(); + auto const new_routes = pgc.log_entry_new_routes_.at(le.index_); + + auto const total_outgoing_prob = + std::accumulate(begin(new_routes), end(new_routes), 0.F, + [&](auto const sum, auto const& new_route) { + if (new_route.route_ == le.old_route_.route_) { + return sum + new_route.new_probability_; + } else { + return sum + (new_route.new_probability_ - + new_route.previous_probability_); + } + }); + + for (auto const prev_node_idx : prev_nodes) { + auto const parent_prob = graph.nodes_[prev_node_idx].probability_; + for (auto const& new_route : new_routes) { + auto const prob_change = new_route.route_ == le.old_route_.route_ + ? new_route.new_probability_ + : new_route.new_probability_ - + new_route.previous_probability_; + auto const pick_prob = prob_change / total_outgoing_prob; + auto const abs_prob = pick_prob * parent_prob; + auto const new_node_idx = graph.nodes_.size(); + graph.emplace_back_node(new_route.route_, abs_prob); + graph.push_back_edge(reroute_edge{prev_node_idx, new_node_idx}); + leaves[new_route.route_].emplace_back(new_node_idx); + } + } + } + } + + process_reverts(); + + return graph; +} + void revert_forecast(universe& uv, schedule const& sched, FlatBufferBuilder& fbb, std::vector>& reroutes, - passenger_group_with_route const& pgwr, - passenger_localization const* loc) { + passenger_group_index const pgi, + std::vector& unbroken_routes) { auto const& pgc = uv.passenger_groups_; - auto const log_entries = pgc.reroute_log_entries(pgwr.pg_); - if (log_entries.empty()) { + + auto const graph = build_reroute_graph(pgc, pgi); + if (graph.nodes_.size() <= 1) { return; } - std::cout << "revert_forecast: pg=" << pgwr.pg_ << ", route=" << pgwr.route_ - << ", log_entries=" << log_entries.size() << "\n"; + // for localization + auto const current_time = + unix_to_motistime(sched.schedule_begin_, sched.system_time_); + + auto const routes = pgc.routes(pgi); + utl::verify(routes.size() == unbroken_routes.size(), + "revert_forecast: invalid unbroken_routes size"); - for (auto const& entry : log_entries) { - std::cout << " " << print_log_entry{entry, pgc} << "\n"; + // remove routes that already have probability > 0 + // TODO(pablo): support reverting major delay reroutes + auto unbroken_count = 0U; + for (auto i = 0U; i < routes.size(); ++i) { + if (unbroken_routes[i]) { + if (routes[i].probability_ != 0.F) { + unbroken_routes[i] = false; + } else { + ++unbroken_count; + } + } + } + if (unbroken_count == 0) { + return; } - auto const routes = pgc.routes(pgwr.pg_); - auto orig_probs = utl::to_vec( - routes, [](group_route const& gr) { return gr.probability_; }); + auto const localizations = utl::to_vec(routes, [&](auto const& route) { + if (route.broken_) { + return passenger_localization{}; + } else { + auto const reachability = + get_reachability(uv, pgc.journey(route.compact_journey_index_)); + return localize(sched, reachability, current_time); + } + }); - auto const print_probs = [&]() { - std::cout << " --> route probs=["; - for (auto const& p : orig_probs) { - std::cout << " " << p; + constexpr auto const kNoParent = std::numeric_limits::max(); + auto parents = std::vector(graph.nodes_.size(), kNoParent); + auto stack = std::vector{0U}; + auto leaves = std::vector{}; + + while (!stack.empty()) { + auto const parent_idx = stack.back(); + stack.pop_back(); + auto const outgoing_edges = graph.outgoing_edges(parent_idx); + for (auto const& e : outgoing_edges) { + parents[e.to_] = parent_idx; + stack.emplace_back(e.to_); } - std::cout << " ]\n"; - }; + if (outgoing_edges.empty()) { + leaves.emplace_back(parent_idx); + } + } - print_probs(); - for (auto idx_to_revert = log_entries.size() - 1;; --idx_to_revert) { - auto const& entry = log_entries.at(idx_to_revert); - std::cout << " log entry " << idx_to_revert << ": " - << print_log_entry{entry, pgc} << "\n"; - orig_probs[entry.old_route_.route_] = - entry.old_route_.previous_probability_; - for (auto const& nr : pgc.log_entry_new_routes_.at(entry.index_)) { - orig_probs[nr.route_] = nr.previous_probability_; - } - print_probs(); - if (entry.old_route_.route_ == pgwr.route_) { - if (entry.reason_ == reroute_reason_t::REVERT_FORECAST || - entry.reason_ == reroute_reason_t::MANUAL) { - std::cout << " abort: because of " << print_log_entry{entry, pgc} - << std::endl; - return; - } else { - break; + auto prob_changes = std::vector(routes.size() * routes.size()); + + for (auto const leaf_idx : leaves) { + auto const& leaf = graph.nodes_[leaf_idx]; + auto const& leaf_loc = localizations.at(leaf.route_); + auto candidate = std::optional{}; + + for (auto node_idx = parents[leaf_idx]; node_idx != kNoParent; + node_idx = parents[node_idx]) { + auto const& node = graph.nodes_[node_idx]; + if (!unbroken_routes[node.route_]) { + continue; } + + auto const& node_loc = localizations.at(node.route_); + if (!can_switch(leaf_loc, node_loc)) { + continue; + } + + candidate = node.route_; } - if (idx_to_revert == 0) { - std::cout << " abort: no matching reroute entry found" << std::endl; - return; + + if (candidate) { + // candidate found -> move probability from leaf to candidate + auto const c = *candidate; + auto const offset = c * routes.size(); + prob_changes[offset + leaf.route_] -= leaf.probability_; + prob_changes[offset + c] += leaf.probability_; } } + auto new_routes = std::vector>{}; - for (auto const [i, p] : utl::enumerate(orig_probs)) { - new_routes.emplace_back( - to_fbs(sched, fbb, - temp_group_route{ - static_cast(i), p, - compact_journey{}, INVALID_TIME, - 0 /* estimated delay - updated by reroute groups api */, - route_source_flags::NONE, false /* planned */ - })); - } - auto fbs_localization = - std::vector>{}; - if (loc != nullptr) { - fbs_localization.emplace_back( - to_fbs_localization_wrapper(sched, fbb, *loc)); + for (auto reactivated_route_idx = static_cast(0); + reactivated_route_idx < + static_cast(routes.size()); + ++reactivated_route_idx) { + auto const offset = reactivated_route_idx * routes.size(); + + for (auto route_idx = static_cast(0); + route_idx < static_cast(routes.size()); + ++route_idx) { + auto const p_change = prob_changes[offset + route_idx]; + if (p_change == 0.F) { + continue; + } + new_routes.emplace_back( + to_fbs(sched, fbb, + temp_group_route{ + route_idx, p_change, compact_journey{}, INVALID_TIME, + 0 /* estimated delay - updated by reroute groups api */, + route_source_flags::NONE, false /* planned */ + })); + } + + if (!new_routes.empty()) { + reroutes.emplace_back(CreatePaxMonRerouteGroup( + fbb, pgi, reactivated_route_idx, fbb.CreateVector(new_routes), + paxmon::PaxMonRerouteReason_RevertForecast, + broken_transfer_info_to_fbs(fbb, sched, std::nullopt), false, + fbb.CreateVector(std::vector>{ + to_fbs_localization_wrapper( + sched, fbb, localizations.at(reactivated_route_idx))}))); + } + new_routes.clear(); } - reroutes.emplace_back(CreatePaxMonRerouteGroup( - fbb, pgwr.pg_, pgwr.route_, fbb.CreateVector(new_routes), - paxmon::PaxMonRerouteReason_RevertForecast, - broken_transfer_info_to_fbs(fbb, sched, std::nullopt), true, - fbb.CreateVector(fbs_localization))); - std::cout << std::endl; } -void revert_forecasts( - universe& uv, schedule const& sched, simulation_result const& sim_result, - std::vector const& pgwrs, - mcd::hash_map const& pgwr_localizations) { +void revert_forecasts(universe& uv, schedule const& sched, + std::vector const& pgwrs) { auto const constexpr BATCH_SIZE = 5'000; - // TODO(pablo): refactoring (update_tracked_groups) message_creator mc; auto reroutes = std::vector>{}; @@ -164,41 +341,32 @@ void revert_forecasts( mc.Clear(); }; - auto last_group = std::numeric_limits::max(); - for (auto const& pgwr : pgwrs) { - - (void)sim_result; - // TODO(pablo): won't work if current p=0 - /* - if (auto it = sim_result.group_route_results_.find(pgwr); - it != end(sim_result.group_route_results_)) { - std::cout << "revert_forecast: group route " << pgwr.pg_ << "#" - << pgwr.route_ << " has " - << it->second.alternative_probabilities_.size() - << " alternatives" << std::endl; - } - */ + auto const pgc = uv.passenger_groups_; + auto current_pgi = std::numeric_limits::max(); + auto unbroken_routes = std::vector{}; - if (pgwr.pg_ == last_group) { - // TODO(pablo): for now, always revert the earliest route - std::cout << "revert_forecast: skipping multiple reverts per group: " - << pgwr.pg_ << std::endl; - continue; - } - last_group = pgwr.pg_; - - passenger_localization const* loc = nullptr; - if (auto const it = pgwr_localizations.find(pgwr); - it != end(pgwr_localizations)) { - loc = it->second; + auto const handle_group = [&]() { + if (current_pgi == std::numeric_limits::max()) { + return; } - - revert_forecast(uv, sched, mc, reroutes, pgwr, loc); + revert_forecast(uv, sched, mc, reroutes, current_pgi, unbroken_routes); if (reroutes.size() >= BATCH_SIZE) { send_reroutes(); } + }; + + for (auto const& pgwr : pgwrs) { + if (pgwr.pg_ != current_pgi) { + handle_group(); + current_pgi = pgwr.pg_; + unbroken_routes.clear(); + unbroken_routes.resize(pgc.routes(pgwr.pg_).size()); + } + unbroken_routes[pgwr.route_] = true; } + handle_group(); + send_reroutes(); } diff --git a/modules/paxforecast/src/simulate_behavior.cc b/modules/paxforecast/src/simulate_behavior.cc new file mode 100644 index 000000000..d5a750410 --- /dev/null +++ b/modules/paxforecast/src/simulate_behavior.cc @@ -0,0 +1,219 @@ +#include "motis/paxforecast/simulate_behavior.h" + +#include "utl/zip.h" + +#include "motis/core/debug/trip.h" + +#include "motis/paxmon/compact_journey_util.h" +#include "motis/paxmon/messages.h" +#include "motis/paxmon/reachability.h" +#include "motis/paxmon/temp_passenger_group.h" + +#include "motis/paxforecast/behavior/util.h" + +using namespace motis::paxmon; +using namespace flatbuffers; + +namespace motis::paxforecast { + +void merge_journeys_failed(schedule const& sched, universe& uv, + affected_route_info const& ar, + passenger_localization const& loc, + alternative const& alt, + fws_compact_journey const& old_journey, + compact_journey const& journey_prefix) { + std::cout << "simulate_behavior_for_route: merge_journeys failed for group " + << ar.pgwrap_.pgwr_.pg_ << "." << ar.pgwrap_.pgwr_.route_ << "\n"; + std::cout << "\noriginal planned journey:\n"; + print_compact_journey(sched, old_journey); + + auto const print_localization = [&](passenger_localization const& loc) { + std::cout << "localization: in_trip=" << loc.in_trip() + << ", first_station=" << loc.first_station_ + << ", station=" << loc.at_station_->name_.str() + << ", schedule_arrival_time=" + << format_time(loc.schedule_arrival_time_) + << ", current_arrival_time=" + << format_time(loc.current_arrival_time_) << "\n"; + if (loc.in_trip()) { + std::cout << "in trip:\n"; + print_trip(sched, loc.in_trip_); + } + }; + + print_localization(loc); + + std::cout << "\ntrying to merge journeys:\nprefix:\n"; + print_compact_journey(sched, journey_prefix); + std::cout << "\nsuffix:\n"; + print_compact_journey(sched, alt.compact_journey_); + + std::cout << "\ntrips:" << std::endl; + for (auto const& leg : old_journey.legs()) { + std::cout << motis::debug::trip_with_sections{sched, + get_trip(sched, + leg.trip_idx_)} + << std::endl; + } + + auto const current_time = + unix_to_motistime(sched.schedule_begin_, sched.system_time_); + auto const search_time = + static_cast