Postgres extension that enables FetchQ capabilities.
- Quick Start
- Queue Basics
- FIFO Workflow
- Planned Workflow
- Consuming a Queue
- Handling a Document
- Data Structure
- Fail-safe Mechanism
- Quick Start (local)
- Development
- API
Fetchq is distributed as a Docker image, you can download from: https://hub.docker.com/r/fetchq/fetchq
You can run it from any Docker-enabled machine with the following command:
docker run --rm \
--name fetchq \
-p 5432:5432 \
-e "POSTGRES_PASSWORD=postgres" \
fetchq/fetchq:13.3-4.0.1
This runs a PostgreSQL instance with Fetchq installed and ready to work.
The first step in using Fetchq is to assert a queue:
SELECT * FROM fetchq.queue_create('q1');
This command will return a queue_id
and a was_created
flag that tells you whether the queue did exists or not.
Once you are done with the queue and want to drop it, you can run the following command:
SELECT * FROM fetchq.queue_drop('q1');
First-in-first-out is a classic queue paradigm where a document is appendend at the end of the queue, and the queue is processed from the top.
In Fetchq a document is a simple JSON and you can append it using the following command:
SELECT * FROM fetchq.doc_append('q1', '{"foo": 123}', 0, 0);
The signature of the function is:
fetchq.doc_append(
queueName,
document,
version,
priority
): subject
NOTE: more about version and priority later.
With this method you and append the same document multiple times, if you need to insert unique documents please refer to the next paragraph.
The following method inserts a document in the queue at an arbitrary point that is decided by the expected execution date:
SELECT * FROM fetchq.doc_push('q1', 'doc1', 0, 0, NOW() + INTERVAL '1m', '{"foo": 123}');
The signature of the function is:
fetchq.doc_push(
queueName,
subject,
version,
priority,
nextIteration,
document
): queued_docs
--
When you insert a document using the doc_push
method you are in control of its subject. That is a string and it is unique in the queue.
If you try to push the same subject twice, the second request will silently fail and you get a queued_docs=0
response.
The nextIteration
is a point in time before which it is guaranteed that the document will NOT be executed.
As any queue systems, documents get processed by some workers. So Fetchq will make the document visible for processing only when nextIteration
is in the past.
👉 A classic trick that I use when I want to run a document righ away in a planned queue is to set this parameter some hundred centuries in the past.
The version parameter is an integer that should refer to the document's payload schema.
When you ask Fetchq for a document to process, you can specify the version you are interesed into.
👉 This makes it possible to progressively migrate document's payload schemas through time.
The priority is an integer that is used to choose which document to iterate first.
The higher the priority, the earlier the processing.
The priority is independend from the nextIteration
. Among all the documents due for execution, documents with the higher priority get processed first.
The reason behind the "fetch" in "Fetchq" is that basic clients implement a simple polling mechanism to ask:
what do I do now?
A lot of efforts were made to make this question easy to answer from the db point of view. Partial indexes were the key to success.
Here is the query that yelds documents due for execution:
SELECT * FROM fetchq.doc_pick('q1', 0, 1, '5s');
The signature of the function is:
fetchq.doc_pick(
queueName,
version,
limit,
lockDuration
): documentRow
When you pick a document you can target a spefici version of the payload. This is just a filter on the queue that make sure that you get documents of a particular type that you expect.
A document's payload is stored in JSONB format, you can manipulate it, add indexes to it and use it for very complex things... but you can't enforce its structure. It's JSON! The version is a small tool to circumvent this issue.
Set the maximum amount of documents that you plan to pick for execution.
It's the maximum expected execution time. It's expressed as PostgreSQL interval like 5s
or 1 minute
.
Once you are done processin the document, you should communicate back to Fetchq what do you plan to do with it.
Here are the options:
[[ TO BE COMPLETED ]]
SELECT * FROM fetchq.doc_drop('q1', 'doc1');
[[ TO BE COMPLETED ]]
SELECT * FROM fetchq.doc_complete('q1', 'doc1');
[[ TO BE COMPLETED ]]
SELECT * FROM fetchq.doc_reschedule('q1', 'doc1', NOW() + INTERVAL '1d');
[[ TO BE COMPLETED ]]
SELECT * FROM fetchq.doc_kill('q1', 'doc1');
[[ TO BE COMPLETED ]]
Fetchq at its core is just a clever data structure and a bunch of indexes. It doent's "live" inside PostgreSQL.
In order to update document's statuses and prepare them for execution, there is the need to run some maintenance jobs periodically.
The following command will execute the maintenance job on every queue in the database, for a maximum of 100 documents in each queue:
SELECT * FROM fetchq.mnt_run_all(100);
NOTE: A document that is scheduled for future execution must go through a maintenance iteration before it becomes available for processing.
👉 A classic Fetchq client runs this job in the background.
The more often you run this job, the more responsive the queue. It means that documents that becomes "due for execution" are made "available for execution" quickly.
That comes with a cost in terms of memory and, mostly, CPU. When running Fetchq in production we suggest to monitor its server's memory and CPU so to find a good balance between performances and resources consumption.
Fetchq ships with a fail-safe mechanism that makes sure that a document gets re-executed in case of failure. It's based on a clever usage of the document's status
and nextExecution
.
When a document gets picked its status
is set to "active" and its nextIteration
is set in the future for the amount of time specified by the lockDuration
.
The maintenance job monitor the queue's data for active documents who's nextIteration
is overdue. Those documents are considered "orphans", that is, documents that were left pending without an explicit handling action.
When this condition presents, Fetchq re-instate those documents as "pending" and they will get processed once again.
Fetchq will try to re-iterate an orphan document for a maximum of 5 times, then will mark that document's status
as "killed" and stop trying.
You can change this threshold for a specific queue:
SELECT * FROM fetchq.queue_set_max_attempts('q1', 3);
[[ TO BE COMPLETED ]]
You can easily run the production version of Fetchq using Docker on a Linux machine:
make production
At this point you can connect to:
postgres://postgres:${POSTGRES_PASSWORD:-postgres}@postgres:5432/postgres
This will give you an empty PostgreSQL instance with Fetchq installed as an extension and initialized as well.
From here, you can just start creating queues and handling documents:
select * from fetchq.queue_create('foo');
select * from fetchq.doc_append('foo', '{"a":1}', 0, 0);
make start
will run postgresmake stop
will kill postgresmake test-run
will build the extension and run the tests against a running Postgres instancemake test
runs a full test agains a newly created database instancemake init
re-initialize a running Postgres instance
In order to actually run the tests while develping:
make reset
make start-pg
make test-run