From 6a9984ed7856459b82417659effa9ec3553a556b Mon Sep 17 00:00:00 2001 From: Neptune Date: Wed, 23 Oct 2024 14:40:11 -0500 Subject: [PATCH] Add new conditional parameter for read (#322) * Add new conditional parameter for read * Add docs for new read parameter * Fix cases where message is NULL when reading with conditional * Add missing function ending * Fix tests --- docs/api/sql/functions.md | 22 ++++- pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql | 105 ++++++++++++++++++++++ pgmq-extension/sql/pgmq.sql | 20 +++-- pgmq-extension/test/expected/base.out | 26 +++++- pgmq-extension/test/sql/base.sql | 14 ++- 5 files changed, 174 insertions(+), 13 deletions(-) diff --git a/docs/api/sql/functions.md b/docs/api/sql/functions.md index f5612293..0091712e 100644 --- a/docs/api/sql/functions.md +++ b/docs/api/sql/functions.md @@ -79,7 +79,8 @@ Read 1 or more messages from a queue. The VT specifies the amount of time in sec pgmq.read( queue_name text, vt integer, - qty integer) + qty integer, + conditional jsonb DEFAULT '{}') RETURNS SETOF pgmq.message_record @@ -92,8 +93,11 @@ RETURNS SETOF pgmq.message_record | queue_name | text | The name of the queue | | vt | integer | Time in seconds that the message become invisible after reading | | qty | integer | The number of messages to read from the queue. Defaults to 1 | +| conditional | jsonb | Filters the messages by their json content. Defaults to '{}' - no filtering | -Example: +Examples: + +Read messages from a queue ```sql select * from pgmq.read('my_queue', 10, 2); @@ -104,6 +108,16 @@ select * from pgmq.read('my_queue', 10, 2); (2 rows) ``` +Read a message from a queue with message filtering + +```sql +select * from pgmq.read('my_queue', 10, 2, '{"hello": "world_1"}'); + msg_id | read_ct | enqueued_at | vt | message +--------+---------+-------------------------------+-------------------------------+---------------------- + 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} +(1 row) +``` + --- ### read_with_poll @@ -119,7 +133,8 @@ Same as read(). Also provides convenient long-poll functionality. vt integer, qty integer, max_poll_seconds integer DEFAULT 5, - poll_interval_ms integer DEFAULT 100 + poll_interval_ms integer DEFAULT 100, + conditional jsonb DEFAULT '{}' ) RETURNS SETOF pgmq.message_record @@ -134,6 +149,7 @@ RETURNS SETOF pgmq.message_record | qty | integer | The number of messages to read from the queue. Defaults to 1. | | max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. | | poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. | +| conditional | jsonb | Filters the messages by their json content. Defaults to '{}' - no filtering | Example: diff --git a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql index 39abc086..6a007640 100644 --- a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql +++ b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql @@ -1,3 +1,108 @@ +-- read +-- reads a number of messages from a queue, setting a visibility timeout on them +DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER); +CREATE FUNCTION pgmq.read( + queue_name TEXT, + vt INTEGER, + qty INTEGER, + conditional JSONB DEFAULT '{}' +) +RETURNS SETOF pgmq.message_record AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + WITH cte AS + ( + SELECT msg_id + FROM pgmq.%I + WHERE vt <= clock_timestamp() AND CASE + WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer + ELSE 1 + END = 1 + ORDER BY msg_id ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE pgmq.%I m + SET + vt = clock_timestamp() + %L, + read_ct = read_ct + 1 + FROM cte + WHERE m.msg_id = cte.msg_id + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + $QUERY$, + qtable, conditional, qtable, make_interval(secs => vt) + ); + RETURN QUERY EXECUTE sql USING qty; +END; +$$ LANGUAGE plpgsql; + +---- read_with_poll +---- reads a number of messages from a queue, setting a visibility timeout on them +DROP FUNCTION IF EXISTS pgmq.read_with_poll(TEXT, INTEGER, INTEGER, INTEGER, INTEGER); +CREATE FUNCTION pgmq.read_with_poll( + queue_name TEXT, + vt INTEGER, + qty INTEGER, + max_poll_seconds INTEGER DEFAULT 5, + poll_interval_ms INTEGER DEFAULT 100, + conditional JSONB DEFAULT '{}' +) +RETURNS SETOF pgmq.message_record AS $$ +DECLARE + r pgmq.message_record; + stop_at TIMESTAMP; + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds); + LOOP + IF (SELECT clock_timestamp() >= stop_at) THEN + RETURN; + END IF; + + sql := FORMAT( + $QUERY$ + WITH cte AS + ( + SELECT msg_id + FROM pgmq.%I + WHERE vt <= clock_timestamp() AND CASE + WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer + ELSE 1 + END = 1 + ORDER BY msg_id ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE pgmq.%I m + SET + vt = clock_timestamp() + %L, + read_ct = read_ct + 1 + FROM cte + WHERE m.msg_id = cte.msg_id + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + $QUERY$, + qtable, conditional, qtable, make_interval(secs => vt) + ); + + FOR r IN + EXECUTE sql USING qty + LOOP + RETURN NEXT r; + END LOOP; + IF FOUND THEN + RETURN; + ELSE + PERFORM pg_sleep(poll_interval_ms / 1000); + END IF; + END LOOP; +END; +$$ LANGUAGE plpgsql; + DROP FUNCTION IF EXISTS pgmq.drop_queue(TEXT, BOOLEAN); CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index b1182d20..87c29e25 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -57,7 +57,8 @@ $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.read( queue_name TEXT, vt INTEGER, - qty INTEGER + qty INTEGER, + conditional JSONB DEFAULT '{}' ) RETURNS SETOF pgmq.message_record AS $$ DECLARE @@ -70,7 +71,10 @@ BEGIN ( SELECT msg_id FROM pgmq.%I - WHERE vt <= clock_timestamp() + WHERE vt <= clock_timestamp() AND CASE + WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer + ELSE 1 + END = 1 ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED @@ -83,7 +87,7 @@ BEGIN WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; $QUERY$, - qtable, qtable, make_interval(secs => vt) + qtable, conditional, qtable, make_interval(secs => vt) ); RETURN QUERY EXECUTE sql USING qty; END; @@ -96,7 +100,8 @@ CREATE FUNCTION pgmq.read_with_poll( vt INTEGER, qty INTEGER, max_poll_seconds INTEGER DEFAULT 5, - poll_interval_ms INTEGER DEFAULT 100 + poll_interval_ms INTEGER DEFAULT 100, + conditional JSONB DEFAULT '{}' ) RETURNS SETOF pgmq.message_record AS $$ DECLARE @@ -117,7 +122,10 @@ BEGIN ( SELECT msg_id FROM pgmq.%I - WHERE vt <= clock_timestamp() + WHERE vt <= clock_timestamp() AND CASE + WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer + ELSE 1 + END = 1 ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED @@ -130,7 +138,7 @@ BEGIN WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; $QUERY$, - qtable, qtable, make_interval(secs => vt) + qtable, conditional, qtable, make_interval(secs => vt) ); FOR r IN diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 3af09001..7426fdaa 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -64,9 +64,16 @@ SELECT * from pgmq.send('test_default_queue', '{"hello": "world"}'); (1 row) -- read message --- vt=2, limit=1 +-- vt=0, limit=1 \set msg_id 1 -SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 0, 1); + ?column? +---------- + t +(1 row) + +-- read message using conditional +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1, '{"hello": "world"}'); ?column? ---------- t @@ -93,6 +100,21 @@ SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10 t (1 row) +-- set VT to 5 seconds again for another read_with_poll test +SELECT vt > clock_timestamp() + '4 seconds'::interval + FROM pgmq.set_vt('test_default_queue', :msg_id, 5); + ?column? +---------- + t +(1 row) + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10, 100, '{"hello": "world"}'); + ?column? +---------- + t +(1 row) + -- after reading it, set VT to now SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0); ?column? diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index 3e03fabb..d5606631 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -25,9 +25,12 @@ SELECT pgmq.create('test_default_queue'); SELECT * from pgmq.send('test_default_queue', '{"hello": "world"}'); -- read message --- vt=2, limit=1 +-- vt=0, limit=1 \set msg_id 1 -SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 0, 1); + +-- read message using conditional +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1, '{"hello": "world"}'); -- set VT to 5 seconds SELECT vt > clock_timestamp() + '4 seconds'::interval @@ -39,6 +42,13 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); -- read again, now using poll to block until message is ready SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10); +-- set VT to 5 seconds again for another read_with_poll test +SELECT vt > clock_timestamp() + '4 seconds'::interval + FROM pgmq.set_vt('test_default_queue', :msg_id, 5); + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10, 100, '{"hello": "world"}'); + -- after reading it, set VT to now SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0);