Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate system-building from the transport layer #12

Merged
merged 10 commits into from
Dec 20, 2024
5 changes: 3 additions & 2 deletions lib/protoflow-blocks/src/blocks/sys/read_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl StdioSystem for ReadFile {
#[cfg(test)]
mod tests {
extern crate std;

use super::ReadFile;
use crate::{System, SystemBuilding, SystemExecution};

Expand Down Expand Up @@ -122,7 +123,7 @@ mod tests {
system.connect(&path, &read_file.path);
system.connect(&read_file.output, &output);

let thrd = std::thread::spawn(move || system.execute().and_then(|p| p.join()).unwrap());
let process = system.execute().unwrap();

path.send(&temp_file.path().to_string_lossy().into())
.unwrap();
Expand All @@ -143,6 +144,6 @@ mod tests {
"want EOS signal after path port is closed"
);

thrd.join().unwrap()
process.join().unwrap();
}
}
10 changes: 9 additions & 1 deletion lib/protoflow-blocks/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ impl fmt::Debug for System {
}

impl SystemExecution for System {
fn prepare(&self) -> BlockResult<()> {
SystemExecution::prepare(&self.0)
}

fn execute(self) -> BlockResult<Rc<dyn Process>> {
self.0.execute()
SystemExecution::execute(self.0)
}
}

Expand All @@ -126,6 +130,10 @@ impl SystemBuilding for System {
fn connect<M: Message>(&mut self, source: &OutputPort<M>, target: &InputPort<M>) -> bool {
self.0.connect(source, target)
}

fn validate(&self) -> BlockResult<()> {
self.0.validate()
}
}

impl AllBlocks for System {}
Expand Down
4 changes: 2 additions & 2 deletions lib/protoflow-blocks/tests/json_roundtrip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ fn json_roundtrip() -> Result<(), ()> {
system.connect(&decode.output, &encode.input);
system.connect(&encode.output, &output);

let thread = std::thread::spawn(|| system.execute().unwrap().join().unwrap());
let process = system.execute().unwrap();

let message = output.recv().unwrap().unwrap();

thread.join().unwrap();
process.join().unwrap();

assert_eq!(input_bytes, message);

Expand Down
87 changes: 69 additions & 18 deletions lib/protoflow-core/src/input_port.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
// This is free and unencumbered software released into the public domain.

use crate::{
prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData},
InputPortID, Message, MessageReceiver, Port, PortID, PortResult, PortState, System, Transport,
prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData, RwLock},
InputPortID, Message, MessageReceiver, Port, PortError, PortID, PortResult, PortState, System,
Transport,
};

#[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct InputPort<T: Message> {
pub(crate) id: InputPortID,
pub(crate) transport: Arc<dyn Transport>,
pub(crate) state: Arc<RwLock<InputPortState>>,
_phantom: PhantomData<T>,
}

impl<T: Message> InputPort<T> {
pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
let runtime = system.runtime.as_ref();
let transport = runtime.transport.clone();
let id = system.connection_config.borrow_mut().add_input();
let connection = Default::default();
let state = Arc::new(RwLock::new(InputPortState { id, connection }));
Self {
_phantom: PhantomData,
id: transport.open_input().unwrap(),
transport,
state,
}
}

pub fn close(&mut self) -> PortResult<bool> {
self.transport.close(PortID::Input(self.id))
let mut state = self.state.write();
let InputPortConnection::Running(ref transport) = state.connection else {
return Ok(false);
};
transport.close(PortID::Input(state.id))?;
state.connection = InputPortConnection::Closed;
Ok(true)
}

pub fn recv(&self) -> PortResult<Option<T>> {
match self.transport.recv(self.id)? {
let state = self.state.read();
let InputPortConnection::Running(ref transport) = state.connection else {
return Err(PortError::Disconnected);
};

match transport.recv(state.id)? {
None => Ok(None), // EOS (port closed)
Some(encoded_message) => {
if encoded_message.len() == 0 {
if encoded_message.is_empty() {
Ok(None) // EOS (port disconnected)
} else {
match T::decode_length_delimited(encoded_message) {
Expand All @@ -44,7 +55,12 @@ impl<T: Message> InputPort<T> {
}

pub fn try_recv(&self) -> PortResult<Option<T>> {
match self.transport.try_recv(self.id)? {
let state = self.state.read();
let InputPortConnection::Running(ref transport) = state.connection else {
return Err(PortError::Disconnected);
};

match transport.try_recv(state.id)? {
None => Ok(None), // EOS
Some(encoded_message) => match T::decode(encoded_message) {
Ok(message) => Ok(Some(message)),
Expand All @@ -68,13 +84,18 @@ impl<T: Message> MaybeLabeled for InputPort<T> {

impl<T: Message> Port for InputPort<T> {
fn id(&self) -> PortID {
PortID::Input(self.id)
PortID::Input(self.state.read().id)
}

fn state(&self) -> PortState {
self.transport
.state(PortID::Input(self.id))
.unwrap_or(PortState::Closed)
let state = self.state.read();
match state.connection {
InputPortConnection::Closed => PortState::Closed,
InputPortConnection::Ready => PortState::Open,
InputPortConnection::Running(ref transport) => transport
.state(PortID::Input(state.id))
.unwrap_or(PortState::Closed),
}
}

fn close(&mut self) -> PortResult<bool> {
Expand All @@ -94,12 +115,42 @@ impl<T: Message> MessageReceiver<T> for InputPort<T> {

impl<T: Message> fmt::Display for InputPort<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "→{}", self.id)
write!(f, "→{}", self.id())
}
}

impl<T: Message> fmt::Debug for InputPort<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("InputPort").field("id", &self.id).finish()
f.debug_struct("InputPort")
.field("state", &self.state.read())
.finish()
}
}

#[derive(Clone, Debug)]
pub(crate) struct InputPortState {
pub(crate) id: InputPortID,
pub(crate) connection: InputPortConnection,
}

#[derive(Clone, Default)]
pub(crate) enum InputPortConnection {
#[default]
Ready,
Running(Arc<dyn Transport>),
Closed,
}

impl core::fmt::Debug for InputPortConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"InputPortConnection::{}",
match self {
Self::Ready => "Ready",
Self::Running(_) => "Running",
Self::Closed => "Closed",
}
)
}
}
77 changes: 61 additions & 16 deletions lib/protoflow-core/src/output_port.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,49 @@
// This is free and unencumbered software released into the public domain.

use crate::{
prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData},
Message, MessageSender, OutputPortID, Port, PortID, PortResult, PortState, System, Transport,
prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData, RwLock},
Message, MessageSender, OutputPortID, Port, PortError, PortID, PortResult, PortState, System,
Transport,
};

#[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct OutputPort<T: Message> {
pub(crate) id: OutputPortID,
pub(crate) transport: Arc<dyn Transport>,
pub(crate) state: Arc<RwLock<OutputPortState>>,
_phantom: PhantomData<T>,
}

impl<T: Message> OutputPort<T> {
pub fn new<X: Transport + Default>(system: &System<X>) -> Self {
let runtime = system.runtime.as_ref();
let transport = runtime.transport.clone();
let id = system.connection_config.borrow_mut().add_output();
let connection = Default::default();
let state = Arc::new(RwLock::new(OutputPortState { id, connection }));
Self {
_phantom: PhantomData,
id: transport.open_output().unwrap(),
transport,
state,
}
}

pub fn close(&mut self) -> PortResult<bool> {
self.transport.close(PortID::Output(self.id))
let mut state = self.state.write();
let OutputPortConnection::Running(ref transport) = state.connection else {
return Ok(false);
};
transport.close(PortID::Output(state.id))?;
state.connection = OutputPortConnection::Closed;
Ok(true)
}

pub fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()>
where
T: 'a,
{
let state = self.state.read();
let OutputPortConnection::Running(ref transport) = state.connection else {
return Err(PortError::Disconnected);
};
let message: &T = message.into();
let bytes = Bytes::from(message.encode_length_delimited_to_vec());
self.transport.send(self.id, bytes)
transport.send(state.id, bytes)
}
}

Expand All @@ -51,13 +61,18 @@ impl<T: Message> MaybeLabeled for OutputPort<T> {

impl<T: Message> Port for OutputPort<T> {
fn id(&self) -> PortID {
PortID::Output(self.id)
PortID::Output(self.state.read().id)
}

fn state(&self) -> PortState {
self.transport
.state(PortID::Output(self.id))
.unwrap_or(PortState::Closed)
let state = self.state.read();
match state.connection {
OutputPortConnection::Closed => PortState::Closed,
OutputPortConnection::Ready => PortState::Open,
OutputPortConnection::Running(ref transport) => transport
.state(PortID::Output(state.id))
.unwrap_or(PortState::Closed),
}
}

fn close(&mut self) -> PortResult<bool> {
Expand All @@ -76,12 +91,42 @@ impl<T: Message> MessageSender<T> for OutputPort<T> {

impl<T: Message> fmt::Display for OutputPort<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}→", self.id)
write!(f, "{}→", self.id())
}
}

impl<T: Message> fmt::Debug for OutputPort<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("OutputPort").field("id", &self.id).finish()
f.debug_struct("OutputPort")
.field("state", &self.state.read())
.finish()
}
}

#[derive(Clone, Debug)]
pub(crate) struct OutputPortState {
pub(crate) id: OutputPortID,
pub(crate) connection: OutputPortConnection,
}

#[derive(Clone, Default)]
pub(crate) enum OutputPortConnection {
#[default]
Ready,
Running(Arc<dyn Transport>),
Closed,
}

impl core::fmt::Debug for OutputPortConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"OutputPortConnection::{}",
match self {
Self::Ready => "Ready",
Self::Running(_) => "Running",
Self::Closed => "Closed",
}
)
}
}
3 changes: 3 additions & 0 deletions lib/protoflow-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub type Instant = Duration;
#[doc(hidden)]
pub use bytes;

#[doc(hidden)]
pub use parking_lot::RwLock;

#[doc(hidden)]
pub use prost;

Expand Down
Loading