diff --git a/src/saluki/listen.py b/src/saluki/listen.py index cb2e512..528e12c 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -1,16 +1,17 @@ import logging -from confluent_kafka import Consumer +from confluent_kafka import Consumer, TopicPartition from saluki import try_to_deserialise_message logger = logging.getLogger("saluki") -def listen(broker: str, topic: str) -> None: +def listen(broker: str, topic: str, partition: int | None = None) -> None: """ Listen to a topic and deserialise each message :param broker: the broker address, including the port :param topic: the topic to use + :param partition: the partition to listen to (default is all partitions in a given topic) :return: None """ c = Consumer( @@ -20,6 +21,8 @@ def listen(broker: str, topic: str) -> None: } ) c.subscribe([topic]) + if partition is not None: + c.assign([TopicPartition(topic, partition)]) try: logger.info(f"listening to {broker}/{topic}") while True: @@ -29,6 +32,8 @@ def listen(broker: str, topic: str) -> None: if msg.error(): logger.error("Consumer error: {}".format(msg.error())) continue + if partition is not None and msg.partition() != partition: + continue deserialised = try_to_deserialise_message(msg.value()) logger.info(f"{msg.offset()}: {deserialised}") except KeyboardInterrupt: diff --git a/src/saluki/main.py b/src/saluki/main.py index 7092563..39b982d 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -24,14 +24,19 @@ def main(): parent_parser = argparse.ArgumentParser(add_help=False) parent_parser.add_argument("-b", "--broker", required=True, type=str) parent_parser.add_argument("-t", "--topic", required=True, type=str) + parent_parser.add_argument( + "-p", "--partition", required=False, type=int, default=None + ) parent_parser.add_argument( "-X", + "--kafka-config", help="kafka options to pass through to librdkafka", required=False, default="", ) parent_parser.add_argument( - "-f", + "-l", + "--log-file", help="filename to output all data to", required=False, default=None, @@ -55,9 +60,11 @@ def main(): _CONSUME, help="consumer mode", parents=[parent_parser, consumer_parser] ) consumer_mode_parser.add_argument( - "-m", help="How many messages to go back", type=int + "-m", "--messages", help="How many messages to go back", type=int, required=True + ) + consumer_mode_parser.add_argument( + "-o", "--offset", help="offset to consume from", type=int, required=True ) - consumer_mode_parser.add_argument("-o", help="offset to consume from", type=int) consumer_mode_parser.add_argument( "-s", "--schema", required=False, default="auto", type=str ) @@ -76,11 +83,11 @@ def main(): sys.exit(1) args = parser.parse_args() - if args.f: - logger.addHandler(FileHandler(args.f.name)) + if args.log_file: + logger.addHandler(FileHandler(args.log_file.name)) if args.command == _LISTEN: - listen(args.broker, args.topic) + listen(args.broker, args.topic, args.partition) elif args.command == _CONSUME: raise NotImplementedError elif args.command == _PRODUCE: