Skip to content

Commit

Permalink
nrod: add untested Darwin capability, make STOMP code more generic
Browse files Browse the repository at this point in the history
- tspl-nrod can now connect to Darwin (the National Rail Enquiries
  system), if the `use_darwin` config flag is enabled.
- However, none of this is tested, and it uses tspl-zugfuhrer
  endpoints that don't actually exist yet...(!)
- The STOMP handling code was refactored to allow for this change,
  becoming more modular and service-agnostic.
  • Loading branch information
eeeeeta committed Aug 16, 2019
1 parent 8fc0756 commit 9d81bd5
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions tspl-nrod/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ tokio-core = "0.1"
[dependencies.ntrod-types]
path = "../ntrod-types"

[dependencies.darwin-types]
path = "../darwin-types"

[dependencies.tspl-sqlite]
path = "../tspl-sqlite"

Expand Down
16 changes: 11 additions & 5 deletions tspl-nrod/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,24 @@ use tspl_util::{ConfigExt, crate_name};
pub struct Config {
/// URL of a running tspl-zugfuhrer instance.
pub service_zugfuhrer: String,
/// NROD username.
/// NROD/Darwin username.
pub username: String,
/// NROD password.
/// NROD/Darwin password.
pub password: String,
/// Number of worker threads to use.
pub n_threads: u32,
/// NROD STOMP hostname.
/// NROD/Darwin STOMP hostname.
#[serde(default)]
pub stomp_host: Option<String>,
/// NROD STOMP port.
/// NROD/Darwin STOMP port.
#[serde(default)]
pub stomp_port: Option<u16>
pub stomp_port: Option<u16>,
/// Connect to Darwin instead of NROD.
#[serde(default)]
pub use_darwin: bool,
/// Darwin queue for updates.
#[serde(default)]
pub darwin_queue_updates: Option<String>
}

impl ConfigExt for Config {
Expand Down
130 changes: 109 additions & 21 deletions tspl-nrod/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,141 @@
//! Actually connecting to NROD, and reconnecting on failure.
//! Actually connecting to a STOMP message queue, and reconnecting on failure.
use stomp::session::{SessionEvent, Session};
use stomp::header::Header;
use stomp::frame::Frame;
use stomp::session_builder::SessionBuilder;
use stomp::subscription::AckOrNack;
use stomp::connection::*;
use tokio_core::reactor::{Timeout, Handle};
use crossbeam_channel::Sender;
use failure::Error;
use futures::{Poll, Async, Future, Stream};
use flate2::bufread::GzDecoder;
use std::io::prelude::*;
use log::*;
use std::time::Duration;

use crate::worker::WorkerMessage;
use crate::config::Config;
use crate::nrod::NrodMessage;
use crate::darwin::DarwinMessage;
use crate::errors::Result;

pub struct NrodProcessor {
pub trait StompType {
fn on_connect(&mut self, sess: &mut Session) -> Result<()>;
fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()>;
}

pub struct Nrod {
tx: Sender<NrodMessage>
}
impl StompType for Nrod {
fn on_connect(&mut self, sess: &mut Session) -> Result<()> {
info!("Connected to NTROD; subscribing...");
sess.subscription("/topic/TRAIN_MVT_ALL_TOC")
.with(Header::new("activemq.subscriptionName", "tspl-nrod"))
.start();
Ok(())
}
fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()> {
debug!("Got a NTROD message addressed to {}", dest);
if dest == "/topic/TRAIN_MVT_ALL_TOC" {
let st = String::from_utf8_lossy(&frame.body);
self.tx.send(NrodMessage::Movement(st.into()));
}
Ok(())
}
}
pub struct Darwin {
tx: Sender<DarwinMessage>,
queue_updates: Option<String>
}
impl Darwin {
fn updates_name(&self) -> &str {
self.queue_updates
.as_ref().map(|x| x as &str)
.unwrap_or("darwin.pushport-v16")
}
}
impl StompType for Darwin {
fn on_connect(&mut self, sess: &mut Session) -> Result<()> {
info!("Connected to Darwin; subscribing..");
let name = self.updates_name();
sess.subscription(name)
.with(Header::new("activemq.subscriptionName", "tspl-darwin"))
.start();
Ok(())
}
fn on_message(&mut self, dest: &str, frame: &Frame) -> Result<()> {
debug!("Got a Darwin message addressed to {}", dest);
if dest == self.updates_name() {
let mut gz = GzDecoder::new(&frame.body as &[u8]);
let mut st = String::new();
if let Err(e) = gz.read_to_string(&mut st) {
error!("Failed to deflate frame: {}", e);
return Ok(());
}
self.tx.send(DarwinMessage::Pport(st));
}
Ok(())
}
}
pub struct NrodConfig<'a> {
pub username: &'a str,
pub password: &'a str,
pub stomp_host: Option<&'a str>,
pub stomp_port: Option<u16>
}
pub struct DarwinConfig<'a> {
pub username: &'a str,
pub password: &'a str,
pub stomp_host: &'a str,
pub stomp_port: Option<u16>,
pub queue_updates: Option<&'a str>
}
pub struct StompProcessor<T> {
sess: Session,
hdl: Handle,
timeout: Option<Timeout>,
timeout_ms: u64,
tx: Sender<WorkerMessage>,
inner: T
}
impl NrodProcessor {
pub fn new(conf: &Config, tx: Sender<WorkerMessage>, hdl: Handle) -> Result<Self> {
impl StompProcessor<Nrod> {
pub fn new_nrod(conf: &NrodConfig, tx: Sender<NrodMessage>, hdl: Handle) -> Result<StompProcessor<Nrod>> {
let stomp_host = conf.stomp_host
.as_ref().map(|x| x as &str)
.unwrap_or("datafeeds.networkrail.co.uk");
let sess = SessionBuilder::new(stomp_host, conf.stomp_port.unwrap_or(61618))
.with(Credentials(&conf.username, &conf.password))
.with(Credentials(conf.username, conf.password))
.with(Header::new("client-id", "[email protected]"))
.with(HeartBeat(5_000, 2_000))
.start(hdl.clone())?;
let inner = Nrod { tx };
Ok(StompProcessor {
sess, hdl, inner,
timeout: None,
timeout_ms: 1000
})
}
}
impl StompProcessor<Darwin> {
pub fn new_darwin(conf: &DarwinConfig, tx: Sender<DarwinMessage>, hdl: Handle) -> Result<StompProcessor<Darwin>> {
let stomp_host = conf.stomp_host;
let sess = SessionBuilder::new(stomp_host, conf.stomp_port.unwrap_or(61613))
.with(Credentials(conf.username, conf.password))
.with(Header::new("client-id", "[email protected]"))
.with(HeartBeat(5_000, 2_000))
.start(hdl.clone())?;
Ok(Self {
sess, hdl, tx,
let inner = Darwin {
tx,
queue_updates: conf.queue_updates
.map(|x| x.into())
};
Ok(StompProcessor {
sess, hdl, inner,
timeout: None,
timeout_ms: 1000
})
}
}
impl Future for NrodProcessor {
impl<T> Future for StompProcessor<T> where T: StompType {
type Item = ();
type Error = Error;

Expand All @@ -62,10 +157,7 @@ impl Future for NrodProcessor {
let ev = ev.unwrap();
match ev {
Connected => {
info!("Connected to NTROD; subscribing...");
self.sess.subscription("/topic/TRAIN_MVT_ALL_TOC")
.with(Header::new("activemq.subscriptionName", "tspl-nrod"))
.start();
self.inner.on_connect(&mut self.sess)?;
},
ErrorFrame(fr) => {
error!("Error frame, reconnecting: {:?}", fr);
Expand All @@ -74,11 +166,7 @@ impl Future for NrodProcessor {
Message { destination, frame, .. } => {
self.timeout = None;
self.timeout_ms = 1000;
debug!("Got a NTROD message addressed to {}", destination);
if destination == "/topic/TRAIN_MVT_ALL_TOC" {
let st = String::from_utf8_lossy(&frame.body);
self.tx.send(WorkerMessage::Movement(st.into()));
}
self.inner.on_message(&destination, &frame)?;
self.sess.acknowledge_frame(&frame, AckOrNack::Ack);
},
Disconnected(reason) => {
Expand Down
Loading

0 comments on commit 9d81bd5

Please sign in to comment.