"Τὰ πάντα ῥεῖ καὶ οὐδὲν μένει" — Heraclitus
Protoflow is a Rust implementation of flow-based programming (FBP), with messages encoded as Protocol Buffers. It can be used to implement dataflow systems consisting of interconnected blocks that process messages.
Tip
🚧 We are building in public. This is presently under heavy construction.
- Implements a flow-based programming (FBP) dataflow scheduler.
- Constructs systems by connecting reusable components called blocks.
- Uses Protocol Buffers messages for inter-block communication.
- Currently offers a threaded runtime with an in-process transport.
- Planned support for pluggable runtimes (threaded, async, etc).
- Planned support for pluggable transports (in-process, socket, etc).
- Includes a command-line interface (CLI) for executing Protoflow blocks.
- Supports opting out of any feature using comprehensive feature flags.
- Adheres to the Rust API Guidelines in its naming conventions.
- 100% free and unencumbered public domain software.
- Rust 1.70+
cargo install protoflow
brew tap asimov-platform/tap
brew install protoflow --HEAD
For Rust examples, see the examples
directory. Good places to start are
the echo_lines
and count_lines
examples:
cargo run --example echo_lines < CHANGES.md
cargo run --example count_lines < README.md
The count_lines
example
use protoflow::{blocks::*, BlockResult};
pub fn main() -> BlockResult {
System::run(|s| {
let stdin = s.read_stdin();
let line_decoder = s.decode_lines();
s.connect(&stdin.output, &line_decoder.input);
let counter = s.count::<String>();
s.connect(&line_decoder.output, &counter.input);
let count_encoder = s.encode_lines();
s.connect(&counter.count, &count_encoder.input);
let stdout = s.write_stdout();
s.connect(&count_encoder.output, &stdout.input);
})
}
-
System: A collection of blocks that are connected together. Systems are the top-level entities in a Protoflow program.
-
Block: An encapsulated system component that processes messages. Blocks are the autonomous units of computation in a system.
-
Port: A named connection point on a block that sends or receives messages. Ports are the only interfaces through which blocks communicate with each other.
-
Message: A unit of data that flows between blocks in a system. Messages are Protocol Buffers packets that are processed by blocks.
The built-in blocks provided by Protoflow are listed below:
Block | Description |
---|---|
Buffer |
Stores all messages it receives. |
ConcatStrings |
Concatenates the received string messages, with an optional delimiter string inserted between each message. |
Const |
Sends a constant value. |
Count |
Counts the number of messages it receives, while optionally passing them through. |
Decode |
Decodes messages from a byte stream. |
DecodeCSV |
Decodes the received input bytes message into a structured CSV format, separating the header and rows as prost_types::Value . |
DecodeHex |
Decodes hexadecimal stream to byte stream. |
DecodeJSON |
Decodes JSON messages from a byte stream. |
Delay |
Passes messages through while delaying them by a fixed or random duration. |
Drop |
Discards all messages it receives. |
Encode |
Encodes messages to a byte stream. |
EncodeCSV |
Encodes the provided header and rows, given as prost_types::Value , into a CSV-formatted byte stream. |
EncodeHex |
Encodes a byte stream into hexadecimal form. |
EncodeJSON |
Encodes messages into JSON format. |
Hash |
Computes the cryptographic hash of a byte stream. |
Random |
Generates and sends a random value. |
ReadDir |
Reads file names from a file system directory. |
ReadEnv |
Reads the value of an environment variable. |
ReadFile |
Reads bytes from the contents of a file. |
ReadSocket |
Reads bytes from a TCP socket. |
ReadStdin |
Reads bytes from standard input (aka stdin). |
SplitString |
Splits the received input message, with an optional delimiter string parameter. |
WriteFile |
Writes or appends bytes to the contents of a file. |
WriteSocket |
Writes bytes to a TCP socket |
WriteStderr |
Writes bytes to standard error (aka stderr). |
WriteStdout |
Writes bytes to standard output (aka stdout). |
A block that simply stores all messages it receives.
block-beta
columns 4
Source space:2 Buffer
Source-- "input" -->Buffer
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Buffer block
class Source hidden
protoflow execute Buffer
A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message
block-beta
columns 7
Source space:2 ConcatStrings space:2 Sink
Source-- "input" -->ConcatStrings
ConcatStrings-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ConcatStrings block
class Source hidden
class Sink hidden
protoflow execute ConcatStrings delimiter=","
A block for sending a constant value.
block-beta
columns 4
Const space:2 Sink
Const-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Const block
class Sink hidden
protoflow execute Const value=Hello
A block that counts the number of messages it receives, while optionally passing them through.
block-beta
columns 7
Source space:2 Count space:2 Sink
space:7
space:7
space:3 Result space:3
Source-- "input" -->Count
Count-- "output" -->Sink
Count-- "count" -->Result
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Count block
class Source hidden
class Sink hidden
class Result hidden
protoflow execute Count
A block that decodes T
messages from a byte stream.
block-beta
columns 7
Source space:2 Decode space:2 Sink
Source-- "input" -->Decode
Decode-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Decode block
class Source hidden
class Sink hidden
protoflow execute Decode encoding=text
A block that decodes CSV files from a byte stream into a header and rows represented as prost_types::Value
block-beta
columns 7
space:5 Sink1 space:1
space:1 Source space:1 DecodeCSV space:3
space:5 Sink2 space:1
Source-- "input" -->DecodeCSV
DecodeCSV-- "header" -->Sink1
DecodeCSV-- "content" -->Sink2
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class DecodeCSV block
class Source hidden
class Sink1 hidden
class Sink2 hidden
protoflow execute DecodeCSV
A block that decodes a hexadecimal byte stream into bytes
block-beta
columns 7
Source space:2 DecodeHex space:2 Sink
Source-- "input" -->DecodeHex
DecodeHex-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class DecodeHex block
class Source hidden
class Sink hidden
protoflow execute DecodeHex
A block that decodes JSON messages from a byte stream.
block-beta
columns 7
Source space:2 DecodeJSON space:2 Sink
Source-- "input" -->DecodeJSON
DecodeJSON-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class DecodeJSON block
class Source hidden
class Sink hidden
protoflow execute DecodeJSON
A block that passes messages through while delaying them by a fixed or random duration.
block-beta
columns 7
Source space:2 Delay space:2 Sink
Source-- "input" -->Delay
Delay-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Delay block
class Source hidden
class Sink hidden
protoflow execute Delay fixed=2
A block that simply discards all messages it receives.
block-beta
columns 4
Source space:2 Drop
Source-- "input" -->Drop
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Drop block
class Source hidden
protoflow execute Drop
A block that encodes T
messages to a byte stream.
block-beta
columns 7
Source space:2 Encode space:2 Sink
Source-- "input" -->Encode
Encode-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Encode block
class Source hidden
class Sink hidden
protoflow execute Encode encoding=text
protoflow execute Encode encoding=protobuf
A block that encodes CSV files by converting a header and rows, provided as prost_types::Value
streams, into a byte stream
block-beta
columns 7
space:1 Source1 space:5
space:3 EncodeCSV space:1 Sink space:1
space:1 Source2 space:5
Source1-- "header" -->EncodeCSV
Source2-- "rows" -->EncodeCSV
EncodeCSV-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class EncodeCSV block
class Source1 hidden
class Source2 hidden
class Sink hidden
protoflow execute EncodeCSV
A block that encodes a byte stream into hexadecimal form.
block-beta
columns 7
Source space:2 EncodeHex space:2 Sink
Source-- "input" -->EncodeHex
EncodeHex-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class EncodeHex block
class Source hidden
class Sink hidden
protoflow execute EncodeHex
A block that encodes messages into JSON format.
block-beta
columns 7
Source space:2 EncodeJSON space:2 Sink
Source-- "input" -->EncodeJSON
EncodeJSON-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class EncodeJSON block
class Source hidden
class Sink hidden
protoflow execute EncodeJSON
A block that computes the cryptographic hash of a byte stream, while optionally passing it through.
block-beta
columns 7
Source space:2 Hash space:2 Sink
space:7
space:7
space:3 Result space:3
Source-- "input" -->Hash
Hash-- "output" -->Sink
Hash-- "hash" -->Result
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Hash block
class Source hidden
class Sink hidden
class Result hidden
protoflow execute Hash algorithm=blake3
A block for generating and sending a random value.
block-beta
columns 4
Random space:2 Sink
Random-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Random block
class Sink hidden
protoflow execute Random seed=42
A block that reads file names from a file system directory.
block-beta
columns 4
Config space:3
space:4
space:4
ReadDir space:2 Sink
Config-- "path" -->ReadDir
ReadDir-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadDir block
class Config hidden
class Sink hidden
protoflow execute ReadDir path=/tmp
A block that reads the value of an environment variable.
block-beta
columns 4
Config space:3
space:4
space:4
ReadEnv space:2 Sink
Config-- "name" -->ReadEnv
ReadEnv-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadEnv block
class Config hidden
class Sink hidden
protoflow execute ReadEnv name=TERM
A block that reads bytes from the contents of a file.
block-beta
columns 4
Config space:3
space:4
space:4
ReadFile space:2 Sink
Config-- "path" -->ReadFile
ReadFile-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadFile block
class Config hidden
class Sink hidden
protoflow execute ReadFile path=/tmp/file.txt
A block that reads bytes from a TCP socket.
block-beta
columns 4
ReadSocket space:2 Sink
ReadSocket-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadSocket block
class Sink hidden
protoflow execute ReadSocket connection=tcp://127.0.0.1:7077 buffer_size=1024
A block that reads bytes from standard input (aka stdin).
block-beta
columns 4
ReadStdin space:2 Sink
ReadStdin-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class ReadStdin block
class Sink hidden
protoflow execute ReadStdin < input.txt
A block that splits the received input message, with an optional delimiter string parameter
block-beta
columns 7
Source space:2 SplitString space:2 Sink
Source-- "input" -->SplitString
SplitString-- "output" -->Sink
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class SplitString block
class Source hidden
class Sink hidden
protoflow execute SplitString delimiter=","
A block that writes or appends bytes to the contents of a file.
block-beta
columns 4
space:3 Config
space:4
space:4
Source space:2 WriteFile
Config-- "path" -->WriteFile
Source-- "input" -->WriteFile
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteFile block
class Config hidden
class Source hidden
protoflow execute WriteFile path=/tmp/file.txt
A block that writes bytes to TCP socket.
block-beta
columns 4
Source space:2 WriteSocket
Source-- "input" -->WriteSocket
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteSocket block
class Source hidden
protoflow execute WriteSocket connection=tcp://127.0.0.1:7077 buffer_size=1024
A block that writes bytes to standard error (aka stderr).
block-beta
columns 4
Source space:2 WriteStderr
Source-- "input" -->WriteStderr
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteStderr block
class Source hidden
protoflow execute WriteStderr < input.txt 2> output.txt
A block that writes bytes to standard output (aka stdout).
block-beta
columns 4
Source space:2 WriteStdout
Source-- "input" -->WriteStdout
classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class WriteStdout block
class Source hidden
protoflow execute WriteStdout < input.txt > output.txt
git clone https://github.com/asimov-platform/protoflow.git
- Do your best to adhere to the existing coding conventions and idioms.
- Make sure to run
cargo fmt
prior to submitting your pull request. - Don't leave trailing whitespace on any line, and make sure all text files include a terminating newline character.
To add a new block type implementation, make sure to examine and amend:
- The block type reference (table and subsections) in this README.
- The appropriate subdirectory under
lib/protoflow-blocks/src/blocks/
, such ascore
,flow
,hash
,io
,math
,sys
, ortext
. - The
BlockTag
enum inlib/protoflow-blocks/src/block_tag.rs
, which lists the names of all available block types. - The
BlockConfig
enum inlib/protoflow-blocks/src/block_config.rs
, which implements block instantiation and Serde deserialization. - The system-building DSL in
lib/protoflow-blocks/src/system.rs
, which provides convenience builder methods for system definition. - The
build_stdio_system()
function inlib/protoflow-blocks/src/lib.rs
, which is used by the CLI to instantiate blocks for standard I/O. - The documented block diagrams and sequence diagrams under
lib/protoflow-blocks/doc/
, which are embedded in the README and docs.
Note
If a block implementation requires additional crate dependencies, it may be appropriate for that block availability to be featured-gated so as to enable developers to opt out of those dependencies.
- Blocks must not panic; use other error-handling strategies. Ideally, block
implementations should be robust and infallible. When that's not possible,
consider encoding errors by having the output message type be an enum (cf.
Rust's
Result
) or consider having a dedicated error output port. If truly necessary, abort block execution by returning aBlockError
. - Blocks should not generally spawn threads.
- Blocks should document their system resource requirements, if any.
- Blocks should use the
tracing
crate for logging any errors, warnings, and debug output. However, since tracing is an optional feature and dependency, do make sure to feature-gate any use of tracing behind a#[cfg(feature = "tracing")]
guard.