Skip to content

Commit

Permalink
implement partition for listen
Browse files Browse the repository at this point in the history
  • Loading branch information
rerpha committed Dec 23, 2024
1 parent af008b8 commit 737677f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
9 changes: 7 additions & 2 deletions src/saluki/listen.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import logging

from confluent_kafka import Consumer
from confluent_kafka import Consumer, TopicPartition
from saluki import try_to_deserialise_message

logger = logging.getLogger("saluki")


def listen(broker: str, topic: str) -> None:
def listen(broker: str, topic: str, partition: int | None = None) -> None:
"""
Listen to a topic and deserialise each message
:param broker: the broker address, including the port
:param topic: the topic to use
:param partition: the partition to listen to (default is all partitions in a given topic)
:return: None
"""
c = Consumer(
Expand All @@ -20,6 +21,8 @@ def listen(broker: str, topic: str) -> None:
}
)
c.subscribe([topic])
if partition is not None:
c.assign([TopicPartition(topic, partition)])
try:
logger.info(f"listening to {broker}/{topic}")
while True:
Expand All @@ -29,6 +32,8 @@ def listen(broker: str, topic: str) -> None:
if msg.error():
logger.error("Consumer error: {}".format(msg.error()))
continue
if partition is not None and msg.partition() != partition:
continue
deserialised = try_to_deserialise_message(msg.value())
logger.info(f"{msg.offset()}: {deserialised}")
except KeyboardInterrupt:
Expand Down
19 changes: 13 additions & 6 deletions src/saluki/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ def main():
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument("-b", "--broker", required=True, type=str)
parent_parser.add_argument("-t", "--topic", required=True, type=str)
parent_parser.add_argument(
"-p", "--partition", required=False, type=int, default=None
)
parent_parser.add_argument(
"-X",
"--kafka-config",
help="kafka options to pass through to librdkafka",
required=False,
default="",
)
parent_parser.add_argument(
"-f",
"-l",
"--log-file",
help="filename to output all data to",
required=False,
default=None,
Expand All @@ -55,9 +60,11 @@ def main():
_CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser]
)
consumer_mode_parser.add_argument(
"-m", help="How many messages to go back", type=int
"-m", "--messages", help="How many messages to go back", type=int, required=True
)
consumer_mode_parser.add_argument(
"-o", "--offset", help="offset to consume from", type=int, required=True
)
consumer_mode_parser.add_argument("-o", help="offset to consume from", type=int)
consumer_mode_parser.add_argument(
"-s", "--schema", required=False, default="auto", type=str
)
Expand All @@ -76,11 +83,11 @@ def main():
sys.exit(1)
args = parser.parse_args()

if args.f:
logger.addHandler(FileHandler(args.f.name))
if args.log_file:
logger.addHandler(FileHandler(args.log_file.name))

if args.command == _LISTEN:
listen(args.broker, args.topic)
listen(args.broker, args.topic, args.partition)
elif args.command == _CONSUME:
raise NotImplementedError
elif args.command == _PRODUCE:
Expand Down

0 comments on commit 737677f

Please sign in to comment.