diff --git a/Cargo.lock b/Cargo.lock index 0f184a3..b41ee2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,7 @@ dependencies = [ "chashmap 2.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "darwin-types 0.2.0", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "flate2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/tspl-nrod/Cargo.toml b/tspl-nrod/Cargo.toml index 52638d6..78005d6 100644 --- a/tspl-nrod/Cargo.toml +++ b/tspl-nrod/Cargo.toml @@ -23,6 +23,9 @@ tokio-core = "0.1" [dependencies.ntrod-types] path = "../ntrod-types" +[dependencies.darwin-types] +path = "../darwin-types" + [dependencies.tspl-sqlite] path = "../tspl-sqlite" diff --git a/tspl-nrod/src/config.rs b/tspl-nrod/src/config.rs index 4d40c74..792a6fe 100644 --- a/tspl-nrod/src/config.rs +++ b/tspl-nrod/src/config.rs @@ -8,18 +8,24 @@ use tspl_util::{ConfigExt, crate_name}; pub struct Config { /// URL of a running tspl-zugfuhrer instance. pub service_zugfuhrer: String, - /// NROD username. + /// NROD/Darwin username. pub username: String, - /// NROD password. + /// NROD/Darwin password. pub password: String, /// Number of worker threads to use. pub n_threads: u32, - /// NROD STOMP hostname. + /// NROD/Darwin STOMP hostname. #[serde(default)] pub stomp_host: Option, - /// NROD STOMP port. + /// NROD/Darwin STOMP port. #[serde(default)] - pub stomp_port: Option + pub stomp_port: Option, + /// Connect to Darwin instead of NROD. + #[serde(default)] + pub use_darwin: bool, + /// Darwin queue for updates. + #[serde(default)] + pub darwin_queue_updates: Option } impl ConfigExt for Config { diff --git a/tspl-nrod/src/conn.rs b/tspl-nrod/src/conn.rs index 9de9a75..0e9c23a 100644 --- a/tspl-nrod/src/conn.rs +++ b/tspl-nrod/src/conn.rs @@ -1,7 +1,8 @@ -//! Actually connecting to NROD, and reconnecting on failure. +//! Actually connecting to a STOMP message queue, and reconnecting on failure. use stomp::session::{SessionEvent, Session}; use stomp::header::Header; +use stomp::frame::Frame; use stomp::session_builder::SessionBuilder; use stomp::subscription::AckOrNack; use stomp::connection::*; @@ -9,38 +10,132 @@ use tokio_core::reactor::{Timeout, Handle}; use crossbeam_channel::Sender; use failure::Error; use futures::{Poll, Async, Future, Stream}; +use flate2::bufread::GzDecoder; +use std::io::prelude::*; use log::*; use std::time::Duration; -use crate::worker::WorkerMessage; -use crate::config::Config; +use crate::nrod::NrodMessage; +use crate::darwin::DarwinMessage; use crate::errors::Result; -pub struct NrodProcessor { +pub trait StompType { + fn on_connect(&mut self, sess: &mut Session) -> Result<()>; + fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()>; +} + +pub struct Nrod { + tx: Sender +} +impl StompType for Nrod { + fn on_connect(&mut self, sess: &mut Session) -> Result<()> { + info!("Connected to NTROD; subscribing..."); + sess.subscription("/topic/TRAIN_MVT_ALL_TOC") + .with(Header::new("activemq.subscriptionName", "tspl-nrod")) + .start(); + Ok(()) + } + fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()> { + debug!("Got a NTROD message addressed to {}", dest); + if dest == "/topic/TRAIN_MVT_ALL_TOC" { + let st = String::from_utf8_lossy(&frame.body); + self.tx.send(NrodMessage::Movement(st.into())); + } + Ok(()) + } +} +pub struct Darwin { + tx: Sender, + queue_updates: Option +} +impl Darwin { + fn updates_name(&self) -> &str { + self.queue_updates + .as_ref().map(|x| x as &str) + .unwrap_or("darwin.pushport-v16") + } +} +impl StompType for Darwin { + fn on_connect(&mut self, sess: &mut Session) -> Result<()> { + info!("Connected to Darwin; subscribing.."); + let name = self.updates_name(); + sess.subscription(name) + .with(Header::new("activemq.subscriptionName", "tspl-darwin")) + .start(); + Ok(()) + } + fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()> { + debug!("Got a Darwin message addressed to {}", dest); + if dest == self.updates_name() { + let mut gz = GzDecoder::new(&frame.body as &[u8]); + let mut st = String::new(); + if let Err(e) = gz.read_to_string(&mut st) { + error!("Failed to deflate frame: {}", e); + return Ok(()); + } + self.tx.send(DarwinMessage::Pport(st)); + } + Ok(()) + } +} +pub struct NrodConfig<'a> { + pub username: &'a str, + pub password: &'a str, + pub stomp_host: Option<&'a str>, + pub stomp_port: Option +} +pub struct DarwinConfig<'a> { + pub username: &'a str, + pub password: &'a str, + pub stomp_host: &'a str, + pub stomp_port: Option, + pub queue_updates: Option<&'a str> +} +pub struct StompProcessor { sess: Session, hdl: Handle, timeout: Option, timeout_ms: u64, - tx: Sender, + inner: T } -impl NrodProcessor { - pub fn new(conf: &Config, tx: Sender, hdl: Handle) -> Result { +impl StompProcessor { + pub fn new_nrod(conf: &NrodConfig, tx: Sender, hdl: Handle) -> Result> { let stomp_host = conf.stomp_host - .as_ref().map(|x| x as &str) .unwrap_or("datafeeds.networkrail.co.uk"); let sess = SessionBuilder::new(stomp_host, conf.stomp_port.unwrap_or(61618)) - .with(Credentials(&conf.username, &conf.password)) + .with(Credentials(conf.username, conf.password)) + .with(Header::new("client-id", "eta@theta.eu.org")) + .with(HeartBeat(5_000, 2_000)) + .start(hdl.clone())?; + let inner = Nrod { tx }; + Ok(StompProcessor { + sess, hdl, inner, + timeout: None, + timeout_ms: 1000 + }) + } +} +impl StompProcessor { + pub fn new_darwin(conf: &DarwinConfig, tx: Sender, hdl: Handle) -> Result> { + let stomp_host = conf.stomp_host; + let sess = SessionBuilder::new(stomp_host, conf.stomp_port.unwrap_or(61613)) + .with(Credentials(conf.username, conf.password)) .with(Header::new("client-id", "eta@theta.eu.org")) .with(HeartBeat(5_000, 2_000)) .start(hdl.clone())?; - Ok(Self { - sess, hdl, tx, + let inner = Darwin { + tx, + queue_updates: conf.queue_updates + .map(|x| x.into()) + }; + Ok(StompProcessor { + sess, hdl, inner, timeout: None, timeout_ms: 1000 }) } } -impl Future for NrodProcessor { +impl Future for StompProcessor where T: StompType { type Item = (); type Error = Error; @@ -62,10 +157,7 @@ impl Future for NrodProcessor { let ev = ev.unwrap(); match ev { Connected => { - info!("Connected to NTROD; subscribing..."); - self.sess.subscription("/topic/TRAIN_MVT_ALL_TOC") - .with(Header::new("activemq.subscriptionName", "tspl-nrod")) - .start(); + self.inner.on_connect(&mut self.sess)?; }, ErrorFrame(fr) => { error!("Error frame, reconnecting: {:?}", fr); @@ -74,11 +166,7 @@ impl Future for NrodProcessor { Message { destination, frame, .. } => { self.timeout = None; self.timeout_ms = 1000; - debug!("Got a NTROD message addressed to {}", destination); - if destination == "/topic/TRAIN_MVT_ALL_TOC" { - let st = String::from_utf8_lossy(&frame.body); - self.tx.send(WorkerMessage::Movement(st.into())); - } + self.inner.on_message(&destination, &frame)?; self.sess.acknowledge_frame(&frame, AckOrNack::Ack); }, Disconnected(reason) => { diff --git a/tspl-nrod/src/darwin.rs b/tspl-nrod/src/darwin.rs new file mode 100644 index 0000000..39b7676 --- /dev/null +++ b/tspl-nrod/src/darwin.rs @@ -0,0 +1,237 @@ +//! Handling Darwin messages. + +use chashmap::CHashMap; +use crossbeam_channel::Receiver; +use std::sync::Arc; +use tspl_sqlite::uuid::Uuid; +use tspl_util::rpc::{RpcError, MicroserviceRpc}; +use tspl_util::user_agent; +use tspl_zugfuhrer::types::{Train, TrainMvt}; +use darwin_types::pport::{Pport, PportElement}; +use darwin_types::forecasts::{Ts, PlatformData, TsTimeData}; +use reqwest::header::HeaderMap; +use reqwest::Method; +use failure::format_err; +use chrono::prelude::*; +use log::*; + +use crate::errors::*; + +pub type DarwinRidStore = Arc>; + +pub enum DarwinMessage { + Pport(String) +} + +// Scheduled day offset. +pub struct DarwinWorker { + rx: Receiver, + /// A map of Darwin RIDs to `tspl-zugfuhrer` UUIDs. + rid_to_tspl: DarwinRidStore, + /// RPC for `tspl-zugfuhrer`. + zrpc: MicroserviceRpc, +} +impl DarwinWorker { + pub fn new(rx: Receiver, ts: DarwinRidStore, base_url: String) -> Self { + let zrpc = MicroserviceRpc::new(user_agent!(), "zugfuhrer", base_url); + Self { rx, rid_to_tspl: ts, zrpc } + } + fn lookup_or_activate_train(&mut self, rid: &str, uid: &str, ssd: NaiveDate) -> Result { + if let Some(ret) = self.rid_to_tspl.get(rid) { + return Ok(*ret); + } + debug!("Querying zugfuhrer for RID {}", rid); + let train: Option = match self.zrpc.req(Method::GET, format!("/trains/by-darwin-rid/{}", rid)) { + Ok(t) => Some(t), + Err(RpcError::RemoteNotFound) => None, + Err(e) => Err(e)? + }; + if let Some(train) = train { + self.rid_to_tspl.insert(rid.to_owned(), train.tspl_id); + Ok(train.tspl_id) + } + else { + debug!("Activating Darwin train; (rid, uid, ssd) = ({}, {}, {})", rid, uid, ssd); + let mut hdrs = HeaderMap::new(); + hdrs.insert("X-tspl-darwin-rid", rid.parse()?); + hdrs.insert("X-tspl-schedule-uid", uid.parse()?); + hdrs.insert("X-tspl-activation-date", ssd.to_string().parse()?); + let train: Train = self.zrpc.req_with_headers(Method::POST, "/trains/activate-fuzzy", hdrs)?; + info!("Activated Darwin RID {} as {}.", rid, train.tspl_id); + self.rid_to_tspl.insert(rid.to_owned(), train.tspl_id); + Ok(train.tspl_id) + } + } + fn process_ts(&mut self, ts: Ts) -> Result<()> { + debug!("Processing TS for RID {}", ts.rid); + let tspl_id = self.lookup_or_activate_train(&ts.rid, &ts.uid, ts.start_date)?; + // Step 1: flatten out ts.locations into something that looks + // more like our TrainMvt schema. + struct TsUpdate { + // The scheduled time of this movement - i.e. this will + // refer to the movement we're looking to update. + time_sched: NaiveTime, + // Scheduled day offset. + day_offset: u8, + tiploc: String, + action: u8, + // Data about what actual / estimated times exist. + tstd: TsTimeData, + platform: Option, + } + let mut day_offset = 0; + let mut last_time = None; + let mut update_day_offset = |time: NaiveTime| { + // Time difference Interpret as + // --------------- ------------ + // Less than -6 hours Crossed midnight + // Between -6 and 0 hours Back in time + // Between 0 and +18 hours Normal increasing time + // Greater than +18 hours Back in time and crossed midnight + match last_time { + Some(last) => { + if time < last { + let dur = last.signed_duration_since(time); + if dur.num_hours() >= 6 { + day_offset += 1; + } + } + else { + let dur = time.signed_duration_since(last); + if dur.num_hours() >= 18 { + day_offset -= 1; + } + } + }, + None => { + last_time = Some(time); + } + } + day_offset + }; + let mut updates = vec![]; + for loc in ts.locations { + if let Some(arr) = loc.arr { + let st = loc.timings.wta + .or(loc.timings.pta) + // Timings can sometimes be missing, for some strange + // reason? + // + // FIXME: less copypasta-y error message? + .ok_or(format_err!("some darwin timings missing"))?; + let day_offset = update_day_offset(st); + updates.push(TsUpdate { + time_sched: st, + tiploc: loc.tiploc.clone(), + tstd: arr, + action: 0, + day_offset, + platform: loc.plat.clone() + }); + } + if let Some(dep) = loc.dep { + let st = loc.timings.wtd + .or(loc.timings.ptd) + .ok_or(format_err!("some darwin timings missing"))?; + let day_offset = update_day_offset(st); + updates.push(TsUpdate { + time_sched: st, + tiploc: loc.tiploc.clone(), + tstd: dep, + action: 1, + day_offset, + platform: loc.plat.clone() + }); + } + if let Some(pass) = loc.pass { + let st = loc.timings.wtp + .ok_or(format_err!("some darwin timings missing"))?; + let day_offset = update_day_offset(st); + updates.push(TsUpdate { + time_sched: st, + tiploc: loc.tiploc.clone(), + tstd: pass, + action: 2, + day_offset, + platform: None + }); + } + } + // Step 2: use the generated updates to fire off some RPC calls. + for upd in updates { + // The time associated with this movement. + // If none of these fields are populated, + // we don't actually have one! + let time = upd.tstd.at + .or(upd.tstd.wet) + .or(upd.tstd.et); + let time = match time { + Some(t) => t, + None => continue + }; + let mut hdrs = HeaderMap::new(); + hdrs.insert("X-tspl-mvt-tiploc", upd.tiploc.parse()?); + hdrs.insert("X-tspl-mvt-planned-time", upd.time_sched.to_string().parse()?); + hdrs.insert("X-tspl-mvt-planned-day-offset", upd.day_offset.to_string().parse()?); + hdrs.insert("X-tspl-mvt-planned-action", upd.action.to_string().parse()?); + if upd.tstd.at_removed { + // The previous actual time has just been removed! + let hdrs = hdrs.clone(); + if let Err(e) = self.zrpc.req_with_headers::<_, ()>(Method::POST, format!("/trains/{}/darwin/at-removed", tspl_id), hdrs) { + warn!("Failed to remove actual time at {} for {} ({}): {}", upd.tiploc, ts.rid, tspl_id, e); + } + } + let actual = upd.tstd.at.is_some(); + let unknown_delay = upd.tstd.delayed; + if let Some(pd) = upd.platform { + hdrs.insert("X-tspl-mvt-platform", pd.platform.parse()?); + hdrs.insert("X-tspl-mvt-platsup", pd.platsup.to_string().parse()?); + } + hdrs.insert("X-tspl-mvt-updated-time", time.to_string().parse()?); + hdrs.insert("X-tspl-mvt-time-actual", actual.to_string().parse()?); + hdrs.insert("X-tspl-mvt-delay-unknown", unknown_delay.to_string().parse()?); + match self.zrpc.req_with_headers::<_, TrainMvt>(Method::POST, format!("/trains/{}/darwin/update", tspl_id), hdrs) { + Ok(_) => { + info!("Processed updated time for {} ({}) of {} at {}.", ts.rid, tspl_id, time, upd.tiploc); + }, + Err(e) => { + warn!("Failed to process update for {} ({}) at {}: {}", ts.rid, tspl_id, upd.tiploc, e); + continue; + } + } + } + Ok(()) + } + pub fn process_pport(&mut self, pp: Pport) { + info!("Processing Darwin push port element, version {}, timestamp {}", pp.version, pp.ts); + match pp.inner { + PportElement::DataResponse(dr) => { + info!("Processing Darwin data response message, origin {:?}, source {:?}, rid {:?}", dr.update_origin, dr.request_source, dr.request_id); + for ts in dr.train_status { + if let Err(e) = self.process_ts(ts) { + warn!("Failed to process TS: {}", e); + } + } + } + _ => {} + } + } + pub fn on_pport(&mut self, st: &str) { + match darwin_types::parse_pport_document(st.as_bytes()) { + Ok(pp) => self.process_pport(pp), + Err(e) => { + warn!("Failed to parse push port document: {}", e); + warn!("Document was: {}", st); + } + } + } + pub fn run(&mut self) { + loop { + let data = self.rx.recv().unwrap(); + match data { + DarwinMessage::Pport(d) => self.on_pport(&d) + } + } + } +} + diff --git a/tspl-nrod/src/main.rs b/tspl-nrod/src/main.rs index d449c6f..204b48e 100644 --- a/tspl-nrod/src/main.rs +++ b/tspl-nrod/src/main.rs @@ -1,15 +1,18 @@ -//! Connects to the NROD TRUST Train Movements system. +//! Connects to various sources of live data, and relays information to tspl-zugfuhrer. pub mod errors; pub mod config; -pub mod worker; +pub mod nrod; +pub mod darwin; pub mod conn; use log::*; +use failure::format_err; use tspl_util::ConfigExt; use self::config::Config; -use crate::conn::NrodProcessor; -use crate::worker::NrodWorker; +use crate::conn::{StompProcessor, DarwinConfig, NrodConfig}; +use crate::nrod::NrodWorker; +use crate::darwin::DarwinWorker; use tokio_core::reactor::Core; use chashmap::CHashMap; use std::sync::Arc; @@ -21,20 +24,55 @@ fn main() -> Result<()> { info!("tspl-nrod, but not yet"); info!("loading config"); let cfg = Config::load()?; - info!("initializing NROD session"); + if cfg.use_darwin && cfg.stomp_host.is_none() { + Err(format_err!("stomp_host must be specified if darwin is used"))? + } + let variant = if cfg.use_darwin { "Darwin" } else { "NROD" }; + info!("initializing {} session", variant); let mut core = Core::new().unwrap(); let hdl = core.handle(); - let (tx, rx) = crossbeam_channel::unbounded(); - let proc = NrodProcessor::new(&cfg, tx, hdl)?; - info!("spawning {} worker thread(s)", cfg.n_threads); - let cmap = Arc::new(CHashMap::new()); - for _ in 0..cfg.n_threads { - let mut worker = NrodWorker::new(rx.clone(), cmap.clone(), cfg.service_zugfuhrer.clone()); - thread::spawn(move || { - worker.run(); - }); + if cfg.use_darwin { + let (tx, rx) = crossbeam_channel::unbounded(); + let dcfg = DarwinConfig { + username: &cfg.username, + password: &cfg.password, + stomp_host: &cfg.stomp_host + .as_ref().unwrap() as &str, + stomp_port: cfg.stomp_port, + queue_updates: cfg.darwin_queue_updates + .as_ref().map(|x| x as &str) + }; + let proc = StompProcessor::new_darwin(&dcfg, tx, hdl)?; + info!("spawning {} worker thread(s)", cfg.n_threads); + let cmap = Arc::new(CHashMap::new()); + for _ in 0..cfg.n_threads { + let mut worker = DarwinWorker::new(rx.clone(), cmap.clone(), cfg.service_zugfuhrer.clone()); + thread::spawn(move || { + worker.run(); + }); + } + info!("tspl-nrod running in Darwin mode!"); + core.run(proc)?; + } + else { + let (tx, rx) = crossbeam_channel::unbounded(); + let ncfg = NrodConfig { + username: &cfg.username, + password: &cfg.password, + stomp_host: cfg.stomp_host.as_ref().map(|x| x as &str), + stomp_port: cfg.stomp_port + }; + let proc = StompProcessor::new_nrod(&ncfg, tx, hdl)?; + info!("spawning {} worker thread(s)", cfg.n_threads); + let cmap = Arc::new(CHashMap::new()); + for _ in 0..cfg.n_threads { + let mut worker = NrodWorker::new(rx.clone(), cmap.clone(), cfg.service_zugfuhrer.clone()); + thread::spawn(move || { + worker.run(); + }); + } + info!("tspl-nrod running!"); + core.run(proc)?; } - info!("tspl-nrod running!"); - core.run(proc)?; Ok(()) } diff --git a/tspl-nrod/src/worker.rs b/tspl-nrod/src/nrod.rs similarity index 95% rename from tspl-nrod/src/worker.rs rename to tspl-nrod/src/nrod.rs index 75cad0f..3aa591c 100644 --- a/tspl-nrod/src/worker.rs +++ b/tspl-nrod/src/nrod.rs @@ -1,4 +1,4 @@ -//! Worker threads, responsible for dealing with a stream of messages. +//! Worker threads for connecting to the NROD TRUST Train Movements system. use chashmap::CHashMap; use crossbeam_channel::Receiver; @@ -19,11 +19,11 @@ use crate::errors::*; pub type TrustTsplStore = Arc>; -pub enum WorkerMessage { +pub enum NrodMessage { Movement(String) } pub struct NrodWorker { - rx: Receiver, + rx: Receiver, /// A map of TRUST IDs to `tspl-zugfuhrer` UUIDs. trust_to_tspl: TrustTsplStore, /// RPC for `tspl-zugfuhrer`. @@ -38,7 +38,7 @@ fn trust_id_extract_day_of_month(tid: &str) -> Result { Ok(as_dom) } impl NrodWorker { - pub fn new(rx: Receiver, ts: TrustTsplStore, base_url: String) -> Self { + pub fn new(rx: Receiver, ts: TrustTsplStore, base_url: String) -> Self { let zrpc = MicroserviceRpc::new(user_agent!(), "zugfuhrer", base_url); Self { rx, trust_to_tspl: ts, zrpc } } @@ -150,7 +150,7 @@ impl NrodWorker { loop { let data = self.rx.recv().unwrap(); match data { - WorkerMessage::Movement(d) => self.on_mvt_message(&d) + NrodMessage::Movement(d) => self.on_mvt_message(&d) } } }