diff --git a/lib/protoflow-blocks/src/blocks/sys/read_file.rs b/lib/protoflow-blocks/src/blocks/sys/read_file.rs index 50ec4a42..09490ed7 100644 --- a/lib/protoflow-blocks/src/blocks/sys/read_file.rs +++ b/lib/protoflow-blocks/src/blocks/sys/read_file.rs @@ -91,6 +91,7 @@ impl StdioSystem for ReadFile { #[cfg(test)] mod tests { extern crate std; + use super::ReadFile; use crate::{System, SystemBuilding, SystemExecution}; @@ -122,7 +123,7 @@ mod tests { system.connect(&path, &read_file.path); system.connect(&read_file.output, &output); - let thrd = std::thread::spawn(move || system.execute().and_then(|p| p.join()).unwrap()); + let process = system.execute().unwrap(); path.send(&temp_file.path().to_string_lossy().into()) .unwrap(); @@ -143,6 +144,6 @@ mod tests { "want EOS signal after path port is closed" ); - thrd.join().unwrap() + process.join().unwrap(); } } diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 1ac658c0..8d0da65a 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -100,8 +100,12 @@ impl fmt::Debug for System { } impl SystemExecution for System { + fn prepare(&self) -> BlockResult<()> { + SystemExecution::prepare(&self.0) + } + fn execute(self) -> BlockResult> { - self.0.execute() + SystemExecution::execute(self.0) } } @@ -126,6 +130,10 @@ impl SystemBuilding for System { fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool { self.0.connect(source, target) } + + fn validate(&self) -> BlockResult<()> { + self.0.validate() + } } impl AllBlocks for System {} diff --git a/lib/protoflow-blocks/tests/json_roundtrip.rs b/lib/protoflow-blocks/tests/json_roundtrip.rs index 99e802ab..4353814f 100644 --- a/lib/protoflow-blocks/tests/json_roundtrip.rs +++ b/lib/protoflow-blocks/tests/json_roundtrip.rs @@ -19,11 +19,11 @@ fn json_roundtrip() -> Result<(), ()> { system.connect(&decode.output, &encode.input); system.connect(&encode.output, &output); - let thread = std::thread::spawn(|| system.execute().unwrap().join().unwrap()); + let process = system.execute().unwrap(); let message = output.recv().unwrap().unwrap(); - thread.join().unwrap(); + process.join().unwrap(); assert_eq!(input_bytes, message); diff --git a/lib/protoflow-core/src/input_port.rs b/lib/protoflow-core/src/input_port.rs index d5673f19..71458d4c 100644 --- a/lib/protoflow-core/src/input_port.rs +++ b/lib/protoflow-core/src/input_port.rs @@ -1,37 +1,48 @@ // This is free and unencumbered software released into the public domain. use crate::{ - prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData}, - InputPortID, Message, MessageReceiver, Port, PortID, PortResult, PortState, System, Transport, + prelude::{fmt, Arc, Cow, MaybeLabeled, MaybeNamed, PhantomData, RwLock}, + InputPortID, Message, MessageReceiver, Port, PortError, PortID, PortResult, PortState, System, + Transport, }; #[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)] pub struct InputPort { - pub(crate) id: InputPortID, - pub(crate) transport: Arc, + pub(crate) state: Arc>, _phantom: PhantomData, } impl InputPort { pub fn new(system: &System) -> Self { - let runtime = system.runtime.as_ref(); - let transport = runtime.transport.clone(); + let id = system.connection_config.borrow_mut().add_input(); + let connection = Default::default(); + let state = Arc::new(RwLock::new(InputPortState { id, connection })); Self { _phantom: PhantomData, - id: transport.open_input().unwrap(), - transport, + state, } } pub fn close(&mut self) -> PortResult { - self.transport.close(PortID::Input(self.id)) + let mut state = self.state.write(); + let InputPortConnection::Running(ref transport) = state.connection else { + return Ok(false); + }; + transport.close(PortID::Input(state.id))?; + state.connection = InputPortConnection::Closed; + Ok(true) } pub fn recv(&self) -> PortResult> { - match self.transport.recv(self.id)? { + let state = self.state.read(); + let InputPortConnection::Running(ref transport) = state.connection else { + return Err(PortError::Disconnected); + }; + + match transport.recv(state.id)? { None => Ok(None), // EOS (port closed) Some(encoded_message) => { - if encoded_message.len() == 0 { + if encoded_message.is_empty() { Ok(None) // EOS (port disconnected) } else { match T::decode_length_delimited(encoded_message) { @@ -44,7 +55,12 @@ impl InputPort { } pub fn try_recv(&self) -> PortResult> { - match self.transport.try_recv(self.id)? { + let state = self.state.read(); + let InputPortConnection::Running(ref transport) = state.connection else { + return Err(PortError::Disconnected); + }; + + match transport.try_recv(state.id)? { None => Ok(None), // EOS Some(encoded_message) => match T::decode(encoded_message) { Ok(message) => Ok(Some(message)), @@ -68,13 +84,18 @@ impl MaybeLabeled for InputPort { impl Port for InputPort { fn id(&self) -> PortID { - PortID::Input(self.id) + PortID::Input(self.state.read().id) } fn state(&self) -> PortState { - self.transport - .state(PortID::Input(self.id)) - .unwrap_or(PortState::Closed) + let state = self.state.read(); + match state.connection { + InputPortConnection::Closed => PortState::Closed, + InputPortConnection::Ready => PortState::Open, + InputPortConnection::Running(ref transport) => transport + .state(PortID::Input(state.id)) + .unwrap_or(PortState::Closed), + } } fn close(&mut self) -> PortResult { @@ -94,12 +115,42 @@ impl MessageReceiver for InputPort { impl fmt::Display for InputPort { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "→{}", self.id) + write!(f, "→{}", self.id()) } } impl fmt::Debug for InputPort { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("InputPort").field("id", &self.id).finish() + f.debug_struct("InputPort") + .field("state", &self.state.read()) + .finish() + } +} + +#[derive(Clone, Debug)] +pub(crate) struct InputPortState { + pub(crate) id: InputPortID, + pub(crate) connection: InputPortConnection, +} + +#[derive(Clone, Default)] +pub(crate) enum InputPortConnection { + #[default] + Ready, + Running(Arc), + Closed, +} + +impl core::fmt::Debug for InputPortConnection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "InputPortConnection::{}", + match self { + Self::Ready => "Ready", + Self::Running(_) => "Running", + Self::Closed => "Closed", + } + ) } } diff --git a/lib/protoflow-core/src/output_port.rs b/lib/protoflow-core/src/output_port.rs index 71b61326..b4b9762a 100644 --- a/lib/protoflow-core/src/output_port.rs +++ b/lib/protoflow-core/src/output_port.rs @@ -1,39 +1,49 @@ // This is free and unencumbered software released into the public domain. use crate::{ - prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData}, - Message, MessageSender, OutputPortID, Port, PortID, PortResult, PortState, System, Transport, + prelude::{fmt, Arc, Bytes, Cow, MaybeLabeled, MaybeNamed, PhantomData, RwLock}, + Message, MessageSender, OutputPortID, Port, PortError, PortID, PortResult, PortState, System, + Transport, }; #[derive(Clone)] //, Debug, Eq, Ord, PartialEq, PartialOrd)] pub struct OutputPort { - pub(crate) id: OutputPortID, - pub(crate) transport: Arc, + pub(crate) state: Arc>, _phantom: PhantomData, } impl OutputPort { pub fn new(system: &System) -> Self { - let runtime = system.runtime.as_ref(); - let transport = runtime.transport.clone(); + let id = system.connection_config.borrow_mut().add_output(); + let connection = Default::default(); + let state = Arc::new(RwLock::new(OutputPortState { id, connection })); Self { _phantom: PhantomData, - id: transport.open_output().unwrap(), - transport, + state, } } pub fn close(&mut self) -> PortResult { - self.transport.close(PortID::Output(self.id)) + let mut state = self.state.write(); + let OutputPortConnection::Running(ref transport) = state.connection else { + return Ok(false); + }; + transport.close(PortID::Output(state.id))?; + state.connection = OutputPortConnection::Closed; + Ok(true) } pub fn send<'a>(&self, message: impl Into<&'a T>) -> PortResult<()> where T: 'a, { + let state = self.state.read(); + let OutputPortConnection::Running(ref transport) = state.connection else { + return Err(PortError::Disconnected); + }; let message: &T = message.into(); let bytes = Bytes::from(message.encode_length_delimited_to_vec()); - self.transport.send(self.id, bytes) + transport.send(state.id, bytes) } } @@ -51,13 +61,18 @@ impl MaybeLabeled for OutputPort { impl Port for OutputPort { fn id(&self) -> PortID { - PortID::Output(self.id) + PortID::Output(self.state.read().id) } fn state(&self) -> PortState { - self.transport - .state(PortID::Output(self.id)) - .unwrap_or(PortState::Closed) + let state = self.state.read(); + match state.connection { + OutputPortConnection::Closed => PortState::Closed, + OutputPortConnection::Ready => PortState::Open, + OutputPortConnection::Running(ref transport) => transport + .state(PortID::Output(state.id)) + .unwrap_or(PortState::Closed), + } } fn close(&mut self) -> PortResult { @@ -76,12 +91,42 @@ impl MessageSender for OutputPort { impl fmt::Display for OutputPort { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}→", self.id) + write!(f, "{}→", self.id()) } } impl fmt::Debug for OutputPort { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("OutputPort").field("id", &self.id).finish() + f.debug_struct("OutputPort") + .field("state", &self.state.read()) + .finish() + } +} + +#[derive(Clone, Debug)] +pub(crate) struct OutputPortState { + pub(crate) id: OutputPortID, + pub(crate) connection: OutputPortConnection, +} + +#[derive(Clone, Default)] +pub(crate) enum OutputPortConnection { + #[default] + Ready, + Running(Arc), + Closed, +} + +impl core::fmt::Debug for OutputPortConnection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "OutputPortConnection::{}", + match self { + Self::Ready => "Ready", + Self::Running(_) => "Running", + Self::Closed => "Closed", + } + ) } } diff --git a/lib/protoflow-core/src/prelude.rs b/lib/protoflow-core/src/prelude.rs index 95df8052..b587bc17 100644 --- a/lib/protoflow-core/src/prelude.rs +++ b/lib/protoflow-core/src/prelude.rs @@ -46,6 +46,9 @@ pub type Instant = Duration; #[doc(hidden)] pub use bytes; +#[doc(hidden)] +pub use parking_lot::RwLock; + #[doc(hidden)] pub use prost; diff --git a/lib/protoflow-core/src/system.rs b/lib/protoflow-core/src/system.rs index b8917b1d..471a5f73 100644 --- a/lib/protoflow-core/src/system.rs +++ b/lib/protoflow-core/src/system.rs @@ -1,12 +1,16 @@ // This is free and unencumbered software released into the public domain. use crate::{ - prelude::{fmt, Arc, Box, Bytes, PhantomData, Rc, String, VecDeque}, + prelude::{ + fmt, Arc, BTreeMap, BTreeSet, Box, Bytes, PhantomData, Rc, RefCell, RwLock, String, + ToString, VecDeque, + }, runtimes::StdRuntime, transports::MpscTransport, types::Any, - Block, BlockID, BlockResult, BoxedBlock, BoxedBlockType, InputPort, InputPortID, Message, - OutputPort, OutputPortID, PortID, PortResult, Process, Runtime, Transport, + Block, BlockError, BlockID, BlockResult, BoxedBlock, BoxedBlockType, InputPort, + InputPortConnection, InputPortID, InputPortState, Message, OutputPort, OutputPortConnection, + OutputPortID, OutputPortState, Port, PortID, PortResult, Process, Runtime, Transport, }; #[cfg(feature = "tokio")] @@ -57,9 +61,16 @@ pub trait SystemBuilding { /// /// Both ports must be of the same message type. fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool; + + /// Validates system for execution. + fn validate(&self) -> BlockResult<()>; } pub trait SystemExecution { + /// Prepare: + /// - Calls the transport layer to connect all the output->input ports. + /// The connections are defined by `SystemBuilding.connect()`. + fn prepare(&self) -> BlockResult<()>; /// Executes the system, returning the system process. fn execute(self) -> BlockResult>; } @@ -71,9 +82,30 @@ pub struct System { /// The registered blocks in the system. pub(crate) blocks: VecDeque, + pub(crate) connection_config: RefCell, + _phantom: PhantomData, } +#[derive(Default, Debug)] +pub(crate) struct SystemConnections { + pub(crate) outputs: BTreeMap>>, + pub(crate) inputs: BTreeMap>>, + pub(crate) connections: BTreeSet<(OutputPortID, InputPortID)>, +} + +impl SystemConnections { + pub(crate) fn add_output(&mut self) -> OutputPortID { + let id = self.outputs.len() + 1; + OutputPortID::try_from(id as isize).unwrap() + } + + pub(crate) fn add_input(&mut self) -> InputPortID { + let id = self.inputs.len() + 1; + InputPortID::try_from(-(id as isize)).unwrap() + } +} + pub type Subsystem = System; impl fmt::Debug for System { @@ -99,6 +131,7 @@ impl System { Self { runtime: runtime.clone(), blocks: VecDeque::new(), + connection_config: Default::default(), _phantom: PhantomData, } } @@ -109,11 +142,22 @@ impl System { } pub fn input(&self) -> InputPort { - InputPort::new(self) + let port = InputPort::new(self); + let state = port.state.clone(); + let id = state.read().id; + self.connection_config.borrow_mut().inputs.insert(id, state); + port } pub fn output(&self) -> OutputPort { - OutputPort::new(self) + let port = OutputPort::new(self); + let state = port.state.clone(); + let id = state.read().id; + self.connection_config + .borrow_mut() + .outputs + .insert(id, state); + port } pub fn block(&mut self, block: B) -> B { @@ -148,18 +192,16 @@ impl System { } pub fn connect(&self, source: &OutputPort, target: &InputPort) -> bool { - self.connect_by_id(PortID::Output(source.id), PortID::Input(target.id)) - .unwrap() + self.connect_by_id(source.id(), target.id()).unwrap() } #[doc(hidden)] pub fn connect_by_id(&self, source_id: PortID, target_id: PortID) -> PortResult { - let runtime = self.runtime.as_ref(); - let transport = runtime.transport.as_ref(); - transport.connect( + self.connection_config.borrow_mut().connections.insert(( OutputPortID(source_id.into()), InputPortID(target_id.into()), - ) + )); + Ok(true) } } @@ -184,10 +226,85 @@ impl SystemBuilding for System { fn connect(&mut self, source: &OutputPort, target: &InputPort) -> bool { System::connect(self, source, target) } + + fn validate(&self) -> BlockResult<()> { + Ok(()) // TODO + } } impl SystemExecution for System { + fn prepare(&self) -> BlockResult<()> { + // Prepare opens ports in the runtime's transport and connects them + // according to `self.connection_config`. + + let connection_config = self.connection_config.borrow(); + + // A map to go from the pre-created system port IDs to the actual transport port IDs. + let mut output_port_system_to_transport_id = BTreeMap::new(); + + // Open output ports in transport + for (system_id, state) in connection_config.outputs.iter() { + let transport_id = self + .runtime + .transport + .open_output() + .map_err(BlockError::PortError)?; + + output_port_system_to_transport_id.insert(system_id, transport_id); + + let mut state = state.write(); + // Update the port's state with the transport port ID. + state.id = transport_id; + // And give the port access to the transport. + state.connection = OutputPortConnection::Running(self.runtime.transport.clone()); + } + + // A map to go from the pre-created system port IDs to the actual transport port IDs. + let mut input_port_system_to_transport_id = BTreeMap::new(); + + // Open input ports in transport. + for (system_id, state) in connection_config.inputs.iter() { + let transport_id = self + .runtime + .transport + .open_input() + .map_err(BlockError::PortError)?; + + input_port_system_to_transport_id.insert(system_id, transport_id); + + let mut state = state.write(); + // Update the port's state with the transport port ID. + state.id = transport_id; + // And give the port access to the transport. + state.connection = InputPortConnection::Running(self.runtime.transport.clone()); + } + + // Connect all the ports. + for (system_out_id, system_in_id) in connection_config.connections.clone() { + let transport_out_id = output_port_system_to_transport_id.get(&system_out_id); + let transport_in_id = input_port_system_to_transport_id.get(&system_in_id); + + let Some((&transport_out_id, &transport_in_id)) = + Option::zip(transport_out_id, transport_in_id) + else { + // This is programmer error and the system will definitively not be ok for + // execution. + return Err(BlockError::Other( + "Failed to connect ports for execution".to_string(), + )); + }; + + self.runtime + .transport + .connect(transport_out_id, transport_in_id) + .map_err(BlockError::PortError)?; + } + + Ok(()) + } + fn execute(self) -> BlockResult> { - System::execute(self) + SystemExecution::prepare(&self)?; + self.execute() } } diff --git a/lib/protoflow/tests/mpsc.rs b/lib/protoflow/tests/mpsc.rs index bbf43cb4..72419469 100644 --- a/lib/protoflow/tests/mpsc.rs +++ b/lib/protoflow/tests/mpsc.rs @@ -4,13 +4,13 @@ use protoflow::{ blocks::{Const, Drop}, runtimes::StdRuntime, transports::MpscTransport, - Runtime, System, + System, SystemExecution, }; #[test] fn execute_mpsc_transport() -> Result<(), ()> { let transport = MpscTransport::new(); - let mut runtime = StdRuntime::new(transport).unwrap(); + let runtime = StdRuntime::new(transport).unwrap(); let mut system = System::new(&runtime); let constant = system.block(Const { output: system.output(), @@ -18,7 +18,7 @@ fn execute_mpsc_transport() -> Result<(), ()> { }); let blackhole = system.block(Drop::new(system.input())); system.connect(&constant.output, &blackhole.input); - let process = runtime.execute(system).unwrap(); + let process = SystemExecution::execute(system).unwrap(); process.join().unwrap(); Ok(()) } diff --git a/lib/protoflow/tests/zst.rs b/lib/protoflow/tests/zst.rs index cd5629c7..3948f20d 100644 --- a/lib/protoflow/tests/zst.rs +++ b/lib/protoflow/tests/zst.rs @@ -1,11 +1,13 @@ // This is free and unencumbered software released into the public domain. -use protoflow::{blocks::Const, runtimes::StdRuntime, transports::MpscTransport, Runtime, System}; +use protoflow::{ + blocks::Const, runtimes::StdRuntime, transports::MpscTransport, System, SystemExecution, +}; #[test] fn const_with_numeric_zero() -> Result<(), ()> { let transport = MpscTransport::new(); - let mut runtime = StdRuntime::new(transport).unwrap(); + let runtime = StdRuntime::new(transport).unwrap(); let mut system = System::new(&runtime); let constant: Const = system.block(Const { @@ -16,11 +18,9 @@ fn const_with_numeric_zero() -> Result<(), ()> { system.connect(&constant.output, &output); - std::thread::spawn(move || { - let process = runtime.execute(system).unwrap(); - process.join().unwrap(); - }); + let process = SystemExecution::execute(system).unwrap(); assert_eq!(output.recv(), Ok(Some(0))); // not Ok(None) + process.join().unwrap(); Ok(()) }