Skip to content

Commit

Permalink
Add new conditional parameter for read (#322)
Browse files Browse the repository at this point in the history
* Add new conditional parameter for read

* Add docs for new read parameter

* Fix cases where message is NULL when reading with conditional

* Add missing function ending

* Fix tests
  • Loading branch information
Neptune650 authored Oct 23, 2024
1 parent be39b73 commit 6a9984e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 13 deletions.
22 changes: 19 additions & 3 deletions docs/api/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Read 1 or more messages from a queue. The VT specifies the amount of time in sec
pgmq.read(
queue_name text,
vt integer,
qty integer)
qty integer,
conditional jsonb DEFAULT '{}')

RETURNS SETOF <a href="../types/#message_record">pgmq.message_record</a>
</code>
Expand All @@ -92,8 +93,11 @@ RETURNS SETOF <a href="../types/#message_record">pgmq.message_record</a>
| queue_name | text | The name of the queue |
| vt | integer | Time in seconds that the message become invisible after reading |
| qty | integer | The number of messages to read from the queue. Defaults to 1 |
| conditional | jsonb | Filters the messages by their json content. Defaults to '{}' - no filtering |

Example:
Examples:

Read messages from a queue

```sql
select * from pgmq.read('my_queue', 10, 2);
Expand All @@ -104,6 +108,16 @@ select * from pgmq.read('my_queue', 10, 2);
(2 rows)
```

Read a message from a queue with message filtering

```sql
select * from pgmq.read('my_queue', 10, 2, '{"hello": "world_1"}');
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
(1 row)
```

---

### read_with_poll
Expand All @@ -119,7 +133,8 @@ Same as read(). Also provides convenient long-poll functionality.
vt integer,
qty integer,
max_poll_seconds integer DEFAULT 5,
poll_interval_ms integer DEFAULT 100
poll_interval_ms integer DEFAULT 100,
conditional jsonb DEFAULT '{}'
)
RETURNS SETOF <a href="../types/#message_record">pgmq.message_record</a>
</code>
Expand All @@ -134,6 +149,7 @@ RETURNS SETOF <a href="../types/#message_record">pgmq.message_record</a>
| qty | integer | The number of messages to read from the queue. Defaults to 1. |
| max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. |
| poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. |
| conditional | jsonb | Filters the messages by their json content. Defaults to '{}' - no filtering |

Example:

Expand Down
105 changes: 105 additions & 0 deletions pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,108 @@
-- read
-- reads a number of messages from a queue, setting a visibility timeout on them
DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER);
CREATE FUNCTION pgmq.read(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;

---- read_with_poll
---- reads a number of messages from a queue, setting a visibility timeout on them
DROP FUNCTION IF EXISTS pgmq.read_with_poll(TEXT, INTEGER, INTEGER, INTEGER, INTEGER);
CREATE FUNCTION pgmq.read_with_poll(
queue_name TEXT,
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER DEFAULT 5,
poll_interval_ms INTEGER DEFAULT 100,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
r pgmq.message_record;
stop_at TIMESTAMP;
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds);
LOOP
IF (SELECT clock_timestamp() >= stop_at) THEN
RETURN;
END IF;

sql := FORMAT(
$QUERY$
WITH cte AS
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
)
UPDATE pgmq.%I m
SET
vt = clock_timestamp() + %L,
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);

FOR r IN
EXECUTE sql USING qty
LOOP
RETURN NEXT r;
END LOOP;
IF FOUND THEN
RETURN;
ELSE
PERFORM pg_sleep(poll_interval_ms / 1000);
END IF;
END LOOP;
END;
$$ LANGUAGE plpgsql;

DROP FUNCTION IF EXISTS pgmq.drop_queue(TEXT, BOOLEAN);

CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN)
Expand Down
20 changes: 14 additions & 6 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ $$ LANGUAGE plpgsql;
CREATE FUNCTION pgmq.read(
queue_name TEXT,
vt INTEGER,
qty INTEGER
qty INTEGER,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
Expand All @@ -70,7 +71,10 @@ BEGIN
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp()
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
Expand All @@ -83,7 +87,7 @@ BEGIN
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, qtable, make_interval(secs => vt)
qtable, conditional, qtable, make_interval(secs => vt)
);
RETURN QUERY EXECUTE sql USING qty;
END;
Expand All @@ -96,7 +100,8 @@ CREATE FUNCTION pgmq.read_with_poll(
vt INTEGER,
qty INTEGER,
max_poll_seconds INTEGER DEFAULT 5,
poll_interval_ms INTEGER DEFAULT 100
poll_interval_ms INTEGER DEFAULT 100,
conditional JSONB DEFAULT '{}'
)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
Expand All @@ -117,7 +122,10 @@ BEGIN
(
SELECT msg_id
FROM pgmq.%I
WHERE vt <= clock_timestamp()
WHERE vt <= clock_timestamp() AND CASE
WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer
ELSE 1
END = 1
ORDER BY msg_id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
Expand All @@ -130,7 +138,7 @@ BEGIN
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
$QUERY$,
qtable, qtable, make_interval(secs => vt)
qtable, conditional, qtable, make_interval(secs => vt)
);
FOR r IN
Expand Down
26 changes: 24 additions & 2 deletions pgmq-extension/test/expected/base.out
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ SELECT * from pgmq.send('test_default_queue', '{"hello": "world"}');
(1 row)

-- read message
-- vt=2, limit=1
-- vt=0, limit=1
\set msg_id 1
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1);
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 0, 1);
?column?
----------
t
(1 row)

-- read message using conditional
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1, '{"hello": "world"}');
?column?
----------
t
Expand All @@ -93,6 +100,21 @@ SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10
t
(1 row)

-- set VT to 5 seconds again for another read_with_poll test
SELECT vt > clock_timestamp() + '4 seconds'::interval
FROM pgmq.set_vt('test_default_queue', :msg_id, 5);
?column?
----------
t
(1 row)

-- read again, now using poll to block until message is ready
SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10, 100, '{"hello": "world"}');
?column?
----------
t
(1 row)

-- after reading it, set VT to now
SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0);
?column?
Expand Down
14 changes: 12 additions & 2 deletions pgmq-extension/test/sql/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ SELECT pgmq.create('test_default_queue');
SELECT * from pgmq.send('test_default_queue', '{"hello": "world"}');

-- read message
-- vt=2, limit=1
-- vt=0, limit=1
\set msg_id 1
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1);
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 0, 1);

-- read message using conditional
SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1, '{"hello": "world"}');

-- set VT to 5 seconds
SELECT vt > clock_timestamp() + '4 seconds'::interval
Expand All @@ -39,6 +42,13 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1);
-- read again, now using poll to block until message is ready
SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10);

-- set VT to 5 seconds again for another read_with_poll test
SELECT vt > clock_timestamp() + '4 seconds'::interval
FROM pgmq.set_vt('test_default_queue', :msg_id, 5);

-- read again, now using poll to block until message is ready
SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10, 100, '{"hello": "world"}');

-- after reading it, set VT to now
SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0);

Expand Down

0 comments on commit 6a9984e

Please sign in to comment.