Skip to content

Commit

Permalink
add consume mode
Browse files Browse the repository at this point in the history
  • Loading branch information
rerpha committed Dec 23, 2024
1 parent 737677f commit 82d5435
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 10 deletions.
71 changes: 71 additions & 0 deletions src/saluki/consume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import logging

from confluent_kafka import Consumer, TopicPartition
from saluki import try_to_deserialise_message

logger = logging.getLogger("saluki")


def consume(
    broker: str,
    topic: str,
    partition: int = 0,
    num_messages: int = 1,
    offset: int | None = None,
    go_forwards: bool = False,
) -> None:
    """
    Consume a fixed number of messages from one partition of a topic and log
    each message after attempting to deserialise it.

    When going forwards, consumption starts at ``offset``. When going
    backwards, consumption starts ``num_messages`` before ``offset`` (or
    before the partition's high watermark if no offset is given). The start
    position is never allowed to fall below the partition's low watermark.

    :param broker: the broker address, including the port
    :param topic: the topic to use
    :param partition: the partition to consume from (default is partition 0)
    :param num_messages: number of messages to consume
    :param offset: offset to consume from (forwards) or up to (backwards)
    :param go_forwards: whether to consume forwards or backwards
    :return: None
    """
    # Validate before creating the consumer so the early return cannot leak it.
    if go_forwards and offset is None:
        logger.error("Can't go forwards without an offset")
        return

    c = Consumer(
        {
            "bootstrap.servers": broker,
            "group.id": "saluki",
        }
    )

    # Everything that touches the consumer lives inside try/finally so the
    # consumer is closed even if the watermark lookup or the assignment fails.
    try:
        if go_forwards:
            start = offset
        else:
            watermarks = c.get_watermark_offsets(
                TopicPartition(topic, partition), cached=False
            )
            if watermarks is None:
                logger.error("Could not retrieve watermark offsets")
                return
            low, high = watermarks
            end = offset if offset is not None else high
            # Negative offsets are interpreted by Kafka as logical positions
            # (e.g. -1 means "end of partition"), so a naive subtraction that
            # goes below zero would silently seek somewhere else entirely.
            # Clamp to the earliest offset that still exists.
            start = max(end - num_messages, low)

        logger.info(f"starting at {start}")
        c.assign([TopicPartition(topic, partition, start)])

        try:
            logger.info(f"consuming {num_messages} messages")
            # NOTE(review): no timeout is passed, so this waits until
            # num_messages messages are available — confirm this is intended.
            msgs = c.consume(num_messages)
            for msg in msgs:
                if msg is None:
                    continue
                if msg.error():
                    logger.error("Consumer error: {}".format(msg.error()))
                    continue
                if partition is not None and msg.partition() != partition:
                    continue
                deserialised = try_to_deserialise_message(msg.value())
                logger.info(f"{msg.offset()}: {deserialised}")
        except Exception as e:
            logger.error(e)
    finally:
        logger.debug(f"closing consumer {c}")
        c.close()
4 changes: 2 additions & 2 deletions src/saluki/listen.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def listen(broker: str, topic: str, partition: int | None = None) -> None:
deserialised = try_to_deserialise_message(msg.value())
logger.info(f"{msg.offset()}: {deserialised}")
except KeyboardInterrupt:
logging.debug("finished listening")
logger.debug("finished listening")
finally:
logging.debug(f"closing consumer {c}")
logger.debug(f"closing consumer {c}")
c.close()
46 changes: 38 additions & 8 deletions src/saluki/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import logging
from logging import FileHandler


from saluki.consume import consume
from saluki.listen import listen

logger = logging.getLogger("saluki")
Expand All @@ -24,9 +26,7 @@ def main():
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument("-b", "--broker", required=True, type=str)
parent_parser.add_argument("-t", "--topic", required=True, type=str)
parent_parser.add_argument(
"-p", "--partition", required=False, type=int, default=None
)

parent_parser.add_argument(
"-X",
"--kafka-config",
Expand Down Expand Up @@ -60,23 +60,46 @@ def main():
_CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser]
)
consumer_mode_parser.add_argument(
"-m", "--messages", help="How many messages to go back", type=int, required=True
"-m",
"--messages",
help="How many messages to go back",
type=int,
required=False,
default=1,
)
consumer_mode_parser.add_argument(
"-o", "--offset", help="offset to consume from", type=int, required=True
"-o", "--offset", help="offset to consume from", type=int, required=False
)
consumer_mode_parser.add_argument(
"-s", "--schema", required=False, default="auto", type=str
)
consumer_mode_parser.add_argument(
"-g", "--go-forwards", required=False, action="store_true"
)
consumer_mode_parser.add_argument(
"-p", "--partition", required=False, type=int, default=0
)

_ = sub_parsers.add_parser(
listen_parser = sub_parsers.add_parser(
_LISTEN,
help="listen mode - listen until KeyboardInterrupt",
parents=[parent_parser, consumer_parser],
)
listen_parser.add_argument(
"-p", "--partition", required=False, type=int, default=None
)

# Producer mode - add this later
_ = sub_parsers.add_parser(_PRODUCE, help="producer mode", parents=[parent_parser])
producer_parser = sub_parsers.add_parser(
_PRODUCE, help="producer mode", parents=[parent_parser]
)
producer_parser.add_argument(
"-f",
"--filename",
help="JSON file to produce",
required=True,
type=argparse.FileType("r"),
)

if len(sys.argv) == 1:
parser.print_help()
Expand All @@ -89,7 +112,14 @@ def main():
if args.command == _LISTEN:
listen(args.broker, args.topic, args.partition)
elif args.command == _CONSUME:
raise NotImplementedError
consume(
args.broker,
args.topic,
args.partition,
args.messages,
args.offset,
args.go_forwards,
)
elif args.command == _PRODUCE:
raise NotImplementedError

Expand Down

0 comments on commit 82d5435

Please sign in to comment.