Skip to content

Commit

Permalink
Simplify and fix asyncio warnings in core and example
Browse files Browse the repository at this point in the history
  • Loading branch information
samuell committed Mar 1, 2021
1 parent 7c58e8c commit b32239c
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
6 changes: 3 additions & 3 deletions examples/string_processing_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import flowbase


def main():
async def main():
net = flowbase.Network()

# Initialize components
Expand Down Expand Up @@ -32,7 +32,7 @@ def main():
printer.in_lines = stringjoiner.out_lines

# Run the full event loop
net.run()
await net.run()


class HiSayer:
Expand Down Expand Up @@ -97,4 +97,4 @@ async def run(self):


if __name__ == "__main__":
main()
flowbase.run(main(), debug=True)
2 changes: 1 addition & 1 deletion flowbase/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from flowbase.flowbase import Process, Network, Port
from flowbase.flowbase import Process, Network, Port, run
19 changes: 9 additions & 10 deletions flowbase/flowbase.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
"""
Copyright (c) 2020 Samuel Lampa <[email protected]>
Copyright (c) 2021 Samuel Lampa <[email protected]>
"""

import asyncio
import typing


def run(awaitable, **kwargs):
asyncio.run(awaitable, **kwargs)


class Process:
def run(self):
async def run(self):
raise NotImplementedError(
f"str(type(self)) can not be used directly, but must be subclassed"
)


class Network(Process):
_processes = {}
_driver_process = None

def __init__(self):
self._loop = asyncio.get_event_loop()

def add_process(self, name: str, process: Process):
self._processes[name] = process
self._loop.create_task(process.run())
self._driver_process = process
asyncio.create_task(process.run())

def run(self):
self._loop.run_until_complete(self._driver_process.run())
async def run(self):
[await p.run() for _, p in self._processes.items()]


class Port(asyncio.Queue):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

setup(
name="flowbase",
version="0.0.2",
version="0.0.3",
description="Python implementation of the FlowBase Flow-based Programming micro-framework idea (see flowbase.org)",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit b32239c

Please sign in to comment.