Skip to content
This repository has been archived by the owner on Jun 2, 2020. It is now read-only.

Commit

Permalink
updated Readme
Browse files Browse the repository at this point in the history
  • Loading branch information
slimeth committed Feb 5, 2019
1 parent 2a81ad0 commit 271cead
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 165 deletions.
154 changes: 84 additions & 70 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
persipubsub
===========

.. image:: https://badges.frapsoft.com/os/mit/mit.png?v=103
:target: https://opensource.org/licenses/mit-license.php
:alt: MIT License
.. image:: https://api.travis-ci.com/Parquery/persipubsub.svg?branch=master
:target: https://api.travis-ci.com/Parquery/persipubsub.svg?branch=master
:alt: Build Status

.. image:: https://coveralls.io/repos/github/Parquery/persipubsub/badge.svg?branch=master
:target: https://coveralls.io/github/Parquery/persipubsub?branch=master
:alt: Coverage

.. image:: https://readthedocs.org/projects/persipubsub/badge/?version=latest
:target: https://persipubsub.readthedocs.io/en/latest/?badge=latest
:alt: Documentation Status

.. image:: https://badge.fury.io/py/persipubsub.svg
:target: https://badge.fury.io/py/persipubsub
Expand All @@ -12,9 +20,10 @@ persipubsub
.. image:: https://img.shields.io/pypi/pyversions/persipubsub.svg
:alt: PyPI - Python Version

.. image:: https://readthedocs.org/projects/persipubsub/badge/?version=latest
:target: https://persipubsub.readthedocs.io/en/latest/?badge=latest
:alt: Documentation Status
.. image:: https://badges.frapsoft.com/os/mit/mit.png?v=103
:target: https://opensource.org/licenses/mit-license.php
:alt: MIT License



``persipubsub`` implements a persistent, thread-safe and process-safe `lmdb
Expand Down Expand Up @@ -80,17 +89,31 @@ Usage

The usage of the library consists of two steps: deployment and runtime

Environment
-----------

For improve the accessibility of the library, an environment class lets you
create and initialize any ``persipubsub`` component which you need in
deployment or runtime step.

Initialize environment
^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python
import persipubsub.environment
env = persipubsub.environment.new_environment(path="/home/user/queue/")
Deployment
----------

In the deployment stage the library sets up the queue structure which is saved
in a JSON file.
In the deployment stage the library sets up the queue structure with the control.

config.json
^^^^^^^^^^^
Control
^^^^^^^

Publishers, subscribers and queues need to be defined before runtime in the
config JSON file.
A control unit to initialize and maintain queues.

.. note::

Expand All @@ -100,65 +123,50 @@ config JSON file.
prune_first, which deletes the oldest messages, and prune_last, which
deletes the latest messages.

.. code-block:: json
{
"pub": {
"out_queue": "/home/user/queues/queue",
"subscribers": ["sub"]
},
"sub": {
"in_queue": "/home/user/queues/queue"
},
"queues": {
"/home/user/queues/queue": {
"path": "/home/user/queues/queue",
"max_reader_num": 1024,
"max_db_num": 1024,
"max_db_size_bytes": 34359738368,
"subscribers": ["sub"],
"high-water-mark": {
"MSG_TIMEOUT_SECS": 600,
"MAX_MSGS_NUM": 10000,
"HWM_LMDB_SIZE_BYTES": 1000000,
"strategy": "prune_first"
}
}
}
}
Control
^^^^^^^

A control unit to initialize and maintain queues.

Initialize all queues
"""""""""""""""""""""
Initialize queue
""""""""""""""""

.. code-block:: python
import persipubsub.control
import persipubsub.environment
import persipubsub.queue
persipubsub.control.initialize_all_dbs(config_pth="/home/user/config.json")
env = persipubsub.environment.new_environment(path="/home/user/new-queue/")
# Initialize a queue with default values.
control = env.new_control()
# Define all optional parameters of the queue.
hwm = persipubsub.queue._HighWaterMark()
strategy = persipubsub.queue._Strategy.prune_first
control = env.new_control(subscriber_ids=["sub1", "sub2"], max_readers=2,
max_size=10*4096, high_watermark=hwm,
strategy=strategy)
Prune all dangling messages
"""""""""""""""""""""""""""

.. code-block:: python
import persipubsub.control
import persipubsub.environment
env = persipubsub.environment.new_environment(
path="/home/user/queue-with-dangling-messages/")
control = env.new_control()
persipubsub.control.prune_dangling_messages(
config_pth="/home/user/config.json")
control.prune_dangling_messages()
Clear all messages
""""""""""""""""""

.. code-block:: python
import persipubsub.control
import persipubsub.environment
env = persipubsub.environment.new_environment(
path="/home/user/queue-with-subscribers-and-messages/")
control = env.new_control()
persipubsub.control.clear_all_subs(config_pth="/home/user/config.json")
control.clear_all_subscribers()
Runtime
Expand All @@ -184,10 +192,11 @@ be initialized as following.

.. code-block:: python
import persipubsub.publisher
import persipubsub.environment
pub = persipubsub.publisher.Pub()
pub.init(pub_id="pub", config_pth="/home/user/config.json")
env = persipubsub.environment.new_environment(path="/home/user/queue/")
pub = env.new_publisher()
Send a message
""""""""""""""
Expand All @@ -197,7 +206,7 @@ Send a message
msg = "Hello there!".encode('utf-8')
pub.send(msg=msg)
# subscribers have now a message in the queue.
# Subscribers have now a message in the queue.
Send many messages at once
""""""""""""""""""""""""""
Expand All @@ -222,38 +231,43 @@ be initialized as following.

.. code-block:: python
import persipubsub.subscriber
import persipubsub.environment
env = persipubsub.environment.new_environment(path="/home/user/queue/")
sub = persipubsub.subscriber.Sub()
sub.init(sub_id="sub", config_pth="/home/user/config.json")
sub = env.new_subscriber(identifier="sub")
Receive a message
"""""""""""""""""

.. code-block:: python
# one message in queue
# One message in queue
with sub.receive() as msg:
# do something with the message
print(msg) # b'Hello there!'
# sub queue is now empty
# This subscriber's queue is now empty
Catch up with latest message
""""""""""""""""""""""""""""

In case when the subscriber's loop is spinning slower then the publisher's,
there is a possibility receive the latest message and discard the others ones.
Used in the case that a particular subscriber cares only about the very last
message and other subscribers care about all the messages in the queue.

.. note::
For the other use case, when you only want to store the latest message and all
subscribers are interested only in the latest, then use high water mark
max_msgs_num = 1.

.. code-block:: python
# many outdated messages in queue
sub.pop_to_top()
# most recent message left in queue
with sub.receive() as msg:
# do something with the message
# Many outdated messages in queue
with sub.receive_to_top() as msg:
# do something with the latest message
# sub queue is now empty
# This subscriber's queue is now empty.
Documentation
=============
Expand Down
1 change: 1 addition & 0 deletions docs/source/persipubsub/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ persipubsub

persipubsub.ctl
persipubsub.env
persipubsub.queue
persipubsub.publisher
persipubsub.subscriber
persipubsub
5 changes: 5 additions & 0 deletions docs/source/persipubsub/persipubsub.queue.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
High Water Mark and Strategy
****************************

.. automodule:: persipubsub.queue
:members:
24 changes: 12 additions & 12 deletions persipubsub/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def set_queue_parameters(max_reader_num: int, max_db_num: int,
db=queue_db)


def set_hwm(hwm: persipubsub.queue._HighWaterMark,
def set_hwm(hwm: persipubsub.queue.HighWaterMark,
env: lmdb.Environment) -> None:
"""
Set high water mark values for queue.
Expand All @@ -62,7 +62,7 @@ def set_hwm(hwm: persipubsub.queue._HighWaterMark,
db=queue_db)


def set_strategy(strategy: persipubsub.queue._Strategy,
def set_strategy(strategy: persipubsub.queue.Strategy,
env: lmdb.Environment) -> None:
"""
Set pruning strategy for queue.
Expand Down Expand Up @@ -124,10 +124,10 @@ def init(self,
subscriber_ids: Optional[Sequence[str]] = None,
max_readers: int = 1024,
max_size: int = 32 * 1024**3,
high_watermark: persipubsub.queue._HighWaterMark = persipubsub.
queue._HighWaterMark(),
strategy: persipubsub.queue._Strategy = persipubsub.queue.
_Strategy.prune_first) -> None:
high_watermark: persipubsub.queue.HighWaterMark = persipubsub.
queue.HighWaterMark(),
strategy: persipubsub.queue.Strategy = persipubsub.queue.Strategy.
prune_first) -> None:
"""
Initialize control with a (re)initialized queue.
Expand All @@ -154,8 +154,8 @@ def _reinitialize_queue(self) -> None:
"""Reinitialize the queue which is maintained by the control."""
self.queue = persipubsub.queue._Queue()
self.queue.init(path=self.path)
assert isinstance(self.queue.sub_list, List)
self.subscriber_ids = set(self.queue.sub_list)
assert isinstance(self.queue.subscriber_ids, List)
self.subscriber_ids = set(self.queue.subscriber_ids)

# pylint: disable=too-many-arguments
@icontract.require(lambda max_readers: max_readers >= 0)
Expand All @@ -169,9 +169,9 @@ def _initialize_queue(self,
max_readers: int = 1024,
max_size: int = 32 * 1024**3,
high_watermark: persipubsub.queue.
_HighWaterMark = persipubsub.queue._HighWaterMark(),
strategy: persipubsub.queue._Strategy = persipubsub.
queue._Strategy.prune_first) -> None:
HighWaterMark = persipubsub.queue.HighWaterMark(),
strategy: persipubsub.queue.Strategy = persipubsub.
queue.Strategy.prune_first) -> None:
"""
Initialize queue.
Expand Down Expand Up @@ -261,7 +261,7 @@ def prune_dangling_messages(self) -> None:
"""Prune all dangling messages from the lmdb."""
assert isinstance(self.queue, persipubsub.queue._Queue)
persipubsub.queue._prune_dangling_messages_for(
queue=self.queue, sub_list=list(self.subscriber_ids))
queue=self.queue, subscriber_ids=list(self.subscriber_ids))

# pylint: disable=too-many-locals
def _prune_all_messages_for(self, sub_id: str) -> None:
Expand Down
8 changes: 4 additions & 4 deletions persipubsub/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def new_control(self,
max_readers: int = 1024,
max_size: int = 32 * 1024**3,
high_watermark: persipubsub.queue.
_HighWaterMark = persipubsub.queue._HighWaterMark(),
strategy: persipubsub.queue._Strategy = persipubsub.queue.
_Strategy.prune_first) -> persipubsub.control.Control:
HighWaterMark = persipubsub.queue.HighWaterMark(),
strategy: persipubsub.queue.Strategy = persipubsub.queue.
Strategy.prune_first) -> persipubsub.control.Control:
"""
Fabricate a new control.
Expand Down Expand Up @@ -69,7 +69,7 @@ def new_subscriber(self,
:return: Subscriber to receive messages
"""
subscriber = persipubsub.subscriber.Subscriber()
subscriber.init(sub_id=identifier, path=self.path)
subscriber.init(identifier=identifier, path=self.path)
return subscriber


Expand Down
Loading

0 comments on commit 271cead

Please sign in to comment.