diff --git a/src/jukebox/run_rpc_tool.py b/src/jukebox/run_rpc_tool.py index 47f861af5..40a3391b1 100644 --- a/src/jukebox/run_rpc_tool.py +++ b/src/jukebox/run_rpc_tool.py @@ -3,397 +3,487 @@ Command Line Interface to the Jukebox RPC Server A command line tool for sending RPC commands to the running jukebox app. -This uses the same interface as the WebUI. Can be used for additional control -or for debugging. - -The tool features auto-completion and command history. - -The list of available commands is fetched from the running Jukebox service. - -.. todo: - - kwargs support - +Features auto-completion, command history, and RPC command execution. +Supports JSON arguments for complex data structures. """ import argparse -import zmq +import json +from dataclasses import dataclass +from typing import List, Dict, Any, Tuple import curses import curses.ascii +import zmq import jukebox.rpc.client as rpc -# Developers note: Scripting at it's dirty end :-) +@dataclass +class CliState: + """Encapsulates CLI state and configuration""" + url: str + client: rpc.RpcClient + rpc_help: Dict[str, Dict[str, str]] = None + candidates: List[str] = None + history: List[str] = None + prompt: str = '> ' + + def __post_init__(self): + self.rpc_help = {} + self.candidates = [] + self.history = [''] + + +class CommandParser: + """Handles parsing and execution of RPC commands with JSON and quoted string support""" + + @staticmethod + def parse_command(cmd: str) -> Tuple[List[str], List[Any], Dict[str, Any]]: + """ + Parse command string into command parts, positional args, and keyword args + Returns: (command_parts, args, kwargs) + """ + # Split while preserving quotes and JSON structures + parts = CommandParser._split_preserving_json(cmd.strip()) + if not parts: + return [], [], {} + + # Split cmd on '.' into package.plugin.method + command_parts = [v for v in parts[0].split('.') if len(v) > 0] + + # Process remaining parts into args and kwargs + args = [] + kwargs = {} + seen_keys = set() # Track seen keyword argument names + + for part in parts[1:]: + # Check if part is a kwarg (contains '=') + if '=' in part: + key, value = part.split('=', 1) + key = key.strip() + value = value.strip() + + # Check for duplicate keyword arguments + if key in seen_keys: + raise ValueError(f"Duplicate keyword argument: {key}") + seen_keys.add(key) + + # Handle the value based on its format + kwargs[key] = CommandParser._parse_value(value) + else: + # Handle as positional argument + args.append(CommandParser._parse_value(part)) + + return command_parts, args, kwargs + + @staticmethod + def _parse_value(value: str) -> Any: + """Parse a value string into appropriate type""" + # Strip quotes if present (only if matching quotes at start and end) + if (value.startswith('"') and value.endswith('"')) or \ + (value.startswith("'") and value.endswith("'")): + value = value[1:-1] + + # Try to parse as JSON if it looks like JSON + if value.startswith('{') or value.startswith('['): + try: + return json.loads(value) + except json.JSONDecodeError: + # If JSON parsing fails, continue with other parsing attempts + pass + + # Try to convert to number if appropriate + return CommandParser.convert_to_number(value) + + @staticmethod + def _split_preserving_json(cmd: str) -> List[str]: # noqa: C901 + """Split command string while preserving quoted strings and JSON structures""" + parts = [] + current = [] + brace_count = 0 + bracket_count = 0 + in_single_quotes = False + in_double_quotes = False + escape = False + + for char in cmd: + if escape: + current.append(char) + escape = False + continue + + if char == '\\': + escape = True + current.append(char) + continue + + if char == '"' and not in_single_quotes: + in_double_quotes = not in_double_quotes + current.append(char) + continue + + if char == "'" and not in_double_quotes: + in_single_quotes = not in_single_quotes + current.append(char) + continue + + if not in_single_quotes and not in_double_quotes: + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + elif char == '[': + bracket_count += 1 + elif char == ']': + bracket_count -= 1 + elif char.isspace() and brace_count == 0 and bracket_count == 0: + if current: + parts.append(''.join(current)) + current = [] + continue + + current.append(char) + + if current: + parts.append(''.join(current)) + + # Validate quote matching + if in_single_quotes: + raise ValueError("Unmatched single quote") + if in_double_quotes: + raise ValueError("Unmatched double quote") + + return parts + + @staticmethod + def convert_to_number(value: str) -> Any: + """Convert string to number if possible""" + # Try integer + try: + return int(value) + except ValueError: + pass + + # Try float + try: + return float(value) + except ValueError: + pass + + # Try hex + if value.isalnum() and value.startswith('0x'): + try: + return int(value, base=16) + except ValueError: + pass + + return value -# Careful: curses and default outputs don't mix! -# In case you'll get an error, most likely your terminal may become funny -# Best bet: Just don't configure any logger at all! -# import logging -# import misc.loggingext -# logger = misc.loggingext.configure_default(logging.ERROR) +class JukeboxCli: + """Main CLI class handling user interaction and command execution""" -url: str -client: rpc.RpcClient -rpc_help = {} -candidates = [] -history = [''] -prompt = '> ' + def __init__(self, url: str): + self.state = CliState(url=url, client=rpc.RpcClient(url)) + self.command_parser = CommandParser() + def update_help(self, scr) -> None: + """Update available RPC commands from server""" + try: + rpc_help_tmp = self.state.client.enque('misc', 'rpc_cmd_help') + self.state.rpc_help = {k: rpc_help_tmp[k] for k in sorted(rpc_help_tmp.keys())} + except Exception: + self._show_connection_error(scr) + return + + # Add CLI specific commands + self._add_cli_commands() + self.state.candidates = list(self.state.rpc_help.keys()) + + def _add_cli_commands(self) -> None: + """Add CLI-specific commands to help""" + cli_commands = { + "help": {'description': "Print RPC Server command list (all commands that start with ...)", + 'signature': "(cmd_starts_with='')"}, + 'usage': {'description': "Usage help and key bindings", 'signature': "()"}, + 'exit': {'description': "Exit RPC Client", 'signature': "()"} + } + self.state.rpc_help.update(cli_commands) + + def execute_command(self, scr, cmd: str) -> None: + """Execute a command and display results""" + if not cmd.strip(): + return + + if cmd == 'help': + self._show_help(scr) + return + elif cmd == 'usage': + self._show_usage(scr) + return + elif cmd == 'exit': + return + + command_parts, args, kwargs = self.command_parser.parse_command(cmd) + + if not (2 <= len(command_parts) <= 3): + scr.addstr(":: Error = Ill-formatted command\n") + return -def add_cli(): - global rpc_help - rpc_help["help"] = {'description': "Print RPC Server command list (all commands that start with ...)", - 'signature': "(cmd_starts_with='')"} - rpc_help['usage'] = {'description': "Usage help and key bindings", 'signature': "()"} - rpc_help['exit'] = {'description': "Exit RPC Client", 'signature': "()"} + method = command_parts[2] if len(command_parts) == 3 else None + try: + response = self.state.client.enque( + command_parts[0], + command_parts[1], + method, + args=args, + kwargs=kwargs + ) + scr.addstr(f"\n:: Response =\n{response}\n\n") + except zmq.error.Again: + self._show_connection_error(scr) + except Exception as e: + scr.addstr(f":: Exception response =\n{e}\n") -def get_help(scr): - global rpc_help - global candidates - rpc_help = {} - try: - rpc_help_tmp = client.enque('misc', 'rpc_cmd_help') - except Exception: + def run(self, scr) -> None: + """Main CLI loop""" + self._setup_screen(scr) + self._show_welcome(scr) + self.update_help(scr) + self._show_usage(scr) + + cmd = '' + while cmd != 'exit': + cmd = self._get_input(scr) + scr.addstr("\n") + self.execute_command(scr, cmd) + + def _setup_screen(self, scr) -> None: + """Configure screen settings""" + scr.idlok(True) + scr.scrollok(True) + curses.noecho() + + def _show_connection_error(self, scr) -> None: + """Display connection error message""" scr.addstr("\n\n" + '-' * 70 + "\n") scr.addstr("Could not reach RPC Server. Jukebox running? Correct Port?\n") scr.addstr('-' * 70 + "\n\n") scr.refresh() - else: - # Sort the commands (Python 3.7 has ordered entries in dicts!) - rpc_help = {k: rpc_help_tmp[k] for k in sorted(rpc_help_tmp.keys())} - add_cli() - candidates = rpc_help.keys() - - -def format_help(scr, topic): - global rpc_help - # Always update help, in case Jukebox App has been restarted in between - scr.erase() - get_help(scr) - max_y, max_x = scr.getmaxyx() - scr.addstr("Available commands:\n\n") - for key, value in rpc_help.items(): - sign: str = value['signature'] - sign = sign[sign.find('('):] - func = f"{key}{sign}" - # print(f"{func:50}: {value['description']}") - if key.startswith(topic): - scr.addstr(f"{func:50}: {value['description']}\n") - [y, x] = scr.getyx() - if y == max_y - 1: - scr.addstr("--HIT A KEY TO CONTINUE--") - scr.getch() - scr.erase() - scr.addstr("\n") - scr.refresh() - - -def format_welcome(scr): - scr.addstr("\n\n" + '-' * 70 + "\n") - scr.addstr("RPC Tool\n") - scr.addstr('-' * 70 + "\n") - scr.addstr(f"Connection url: '{client.address}'\n") - try: - jukebox_version = client.enque('misc', 'get_version') - except Exception: - jukebox_version = "unknown" - scr.addstr(f"Jukebox version: {jukebox_version}\n") - scr.addstr(f"Pyzmq version: {zmq.pyzmq_version()}; ZMQ version: {zmq.zmq_version()}; has draft API: {zmq.DRAFT_API}\n") - scr.addstr('-' * 70 + "\n") - - -def format_usage(scr): - scr.addstr("\n\nUsage:\n") - scr.addstr(" > cmd [arg1] [arg2] [arg3]\n") - scr.addstr("e.g.\n") - scr.addstr(' > volume.ctrl.set_volume 50\n') - - # General content playback examples (for webapp/CLI) - scr.addstr("\nPlaying content (using play_content):\n") - scr.addstr(' > player.ctrl.play_content \'{"artist":"Pink Floyd","album":"The Wall"}\' album\n') - scr.addstr(' > player.ctrl.play_content "/music/classical" folder true\n') - scr.addstr(' > player.ctrl.play_content "/music/favorites/track.mp3" single\n') - - # Update the reader examples section - scr.addstr("\nPlaying content from physical readers (using play_from_reader):\n") - scr.addstr(' > player.ctrl.play_from_reader \'{"artist":"Pink Floyd","album":"The Wall"}\' album false toggle\n') - scr.addstr(' > player.ctrl.play_from_reader "/music/classical" folder true replay\n') - scr.addstr(' > player.ctrl.play_from_reader "/music/stories" folder false none\n') - - scr.addstr("\n") - scr.addstr("Quoting:\n") - scr.addstr(" - Use single quotes (\') for JSON content\n") - scr.addstr(' - Use double quotes (") for simple string arguments containing spaces\n') - scr.addstr(' - Escape quotes within quoted strings with \\\n') - scr.addstr("\n") - scr.addstr("Content Types:\n") - scr.addstr(" - album: requires JSON with artist and album\n") - scr.addstr(" - single: direct path to audio file\n") - scr.addstr(" - folder: path to folder (optional recursive flag)\n") - scr.addstr("\n") - scr.addstr("Second Swipe Actions (for play_card):\n") - scr.addstr(" - none: disable second swipe\n") - scr.addstr(" - toggle: toggle play/pause\n") - scr.addstr(" - play: start playing\n") - scr.addstr(" - skip: next track\n") - scr.addstr(" - rewind: restart playlist\n") - scr.addstr(" - replay: restart folder\n") - scr.addstr(" - replay_if_stopped: restart if stopped\n") - scr.addstr("\n") - scr.addstr("Use for auto-completion of commands!\n") - scr.addstr("Use / for command history!\n") - scr.addstr("\n") - scr.addstr("Type help , to get a list of all commands'\n") - scr.addstr("Type usage , to get this usage help'\n") - scr.addstr("\n") - scr.addstr("After Jukebox app restart, call help once to update command list from jukebox app\n") - scr.addstr("\n") - scr.addstr("To exit, press Ctrl-D or type 'exit'\n") - scr.addstr("\n") - scr.refresh() - - -def get_common_beginning(strings): - """ - Return the strings that are common to the beginning of each string in the strings list. - """ - result = [] - limit = min([len(s) for s in strings]) - for i in range(limit): - chs = set([s[i] for s in strings]) - if len(chs) == 1: - result.append(chs.pop()) - else: - break - return ''.join(result) - - -def autocomplete(msg): - # logger.debug(f"Autocomplete {msg}") - # Get all stings that match the beginning - # candidates = ["ap1", 'ap2', 'appbbb3', 'appbbb4', 'appbbb5', 'appbbb6', 'exit'] - matches = [s for s in candidates if s.startswith(msg)] - if len(matches) == 0: - # Matches is empty: nothing found - return msg, matches - common = get_common_beginning(matches) - return common, matches - - -def is_printable(ch: int): - return 32 <= ch <= 127 - - -def reprompt(scr, msg, y, x): - scr.move(y, 0) - scr.clrtoeol() - scr.addstr(prompt) - scr.addstr(msg) - scr.move(y, x) - - -def get_input(scr): # noqa: C901 - curses.noecho() - ch = 0 - msg = '' - ihist = '' - hidx = len(history) - [y, x] = scr.getyx() - reprompt(scr, msg, y, len(prompt) + len(msg)) - scr.refresh() - while ch != ord(b'\n'): + + def _show_welcome(self, scr) -> None: + """Display welcome message and connection information""" + scr.addstr("\n\n" + '-' * 70 + "\n") + scr.addstr("RPC Tool\n") + scr.addstr('-' * 70 + "\n") + scr.addstr(f"Connection url: '{self.state.client.address}'\n") + try: - ch = scr.getch() - except KeyboardInterrupt: - msg = 'exit' - break - [y, x] = scr.getyx() - pos = x - len(prompt) - if ch == ord(b'\t'): - msg, matches = autocomplete(msg) - if len(matches) > 1: - scr.addstr('\n') - scr.addstr(', '.join(matches)) - scr.addstr('\n') - scr.clrtobot() - reprompt(scr, msg, y, len(prompt) + len(msg)) - if ch == ord(b'\n'): - break - if ch == 4: - msg = 'exit' - break - elif ch == curses.KEY_BACKSPACE or ch == 127: - if pos > 0: - scr.delch(y, x - 1) - msg = msg[0:pos - 1] + msg[pos:] - elif ch == curses.KEY_DC: - scr.delch(y, x) - msg = msg[0:pos] + msg[pos + 1:] - elif ch == curses.KEY_LEFT: - if pos > 0: - scr.move(y, x - 1) - elif ch == curses.KEY_RIGHT: - if pos < len(msg): - scr.move(y, x + 1) - elif ch == curses.KEY_HOME: - scr.move(y, len(prompt)) - elif ch == curses.KEY_END: - scr.move(y, len(prompt) + len(msg)) - elif ch == curses.KEY_UP: - if hidx == len(history): - ihist = msg - hidx = max(hidx - 1, 0) - msg = history[hidx] - reprompt(scr, msg, y, len(prompt) + len(msg)) - elif ch == curses.KEY_DOWN: - hidx = min(hidx + 1, len(history)) - if hidx == len(history): - msg = ihist - else: - msg = history[hidx] - reprompt(scr, msg, y, len(prompt) + len(msg)) - elif is_printable(ch): - msg = msg[0:pos] + curses.ascii.unctrl(ch) + msg[pos:] - reprompt(scr, msg, y, x + 1) - # else: - # print(f" {ch} -- {type(ch)}") + jukebox_version = self.state.client.enque('misc', 'get_version') + except Exception: + jukebox_version = "unknown" + + scr.addstr(f"Jukebox version: {jukebox_version}\n") + scr.addstr(f"Pyzmq version: {zmq.pyzmq_version()}; ZMQ version: {zmq.zmq_version()}; " + f"has draft API: {zmq.DRAFT_API}\n") + scr.addstr('-' * 70 + "\n") scr.refresh() - scr.refresh() - history.append(msg) - return msg + def _show_help(self, scr, topic: str = '') -> None: + """Display help information for commands""" + scr.erase() + self.update_help(scr) + max_y, max_x = scr.getmaxyx() + scr.addstr("Available commands:\n\n") + + for key, value in self.state.rpc_help.items(): + if not key.startswith(topic): + continue + + sign: str = value['signature'] + sign = sign[sign.find('('):] + func = f"{key}{sign}" + scr.addstr(f"{func:50}: {value['description']}\n") + + # Handle pagination + y, x = scr.getyx() + if y == max_y - 1: + scr.addstr("--HIT A KEY TO CONTINUE--") + scr.getch() + scr.erase() -def tonum(string_value): - ret = string_value - try: - ret = int(string_value) - except ValueError: - pass - else: - return ret - try: - ret = float(string_value) - except ValueError: - pass - else: - return ret - if string_value.isalnum() and string_value.startswith('0x'): - try: - ret = int(string_value, base=16) - except ValueError: - pass - else: - return ret - return ret - - -def main(scr): - global candidates - scr.idlok(True) - scr.scrollok(True) - format_welcome(scr) - get_help(scr) - format_usage(scr) - cmd = '' - while cmd != 'exit': - cmd = get_input(scr) scr.addstr("\n") - # Split on whitespaces to separate cmd and arg list - dec = [v for v in cmd.strip().split(' ') if len(v) > 0] - if len(dec) == 0: - continue - elif dec[0] == 'help': - topic = '' - if len(dec) > 1: - topic = dec[1] - format_help(scr, topic) - continue - elif dec[0] == 'usage': - format_usage(scr) - continue - # scr.addstr(f"\n{cmd}\n") - # Split cmd on '.' into package.plugin.method - # Remove duplicate '.' along the way - sl = [v for v in dec[0].split('.') if len(v) > 0] - fargs = [tonum(a) for a in dec[1:]] - scr.addstr(f"\n:: Command = {sl}, args = {fargs}\n") - response = None - method = None - if not (2 <= len(sl) <= 3): - scr.addstr(":: Error = Ill-formatted command\n") - continue - if len(sl) == 3: - method = sl[2] - try: - response = client.enque(sl[0], sl[1], method, args=fargs) - except zmq.error.Again: - scr.addstr("\n\n" + '-' * 70 + "\n") - scr.addstr("Could not reach RPC Server. Jukebox running? Correct Port?\n") - scr.addstr('-' * 70 + "\n\n") - scr.refresh() - except Exception as e: - scr.addstr(f":: Exception response =\n{e}\n") - else: - scr.addstr(f"\n:: Response =\n{response}\n\n") + scr.refresh() + + def _show_usage(self, scr) -> None: + """Display usage information and key bindings""" + scr.addstr("\n\nUsage:\n") + scr.addstr(" > cmd [arg1] [arg2] [kwarg1=value1]\n") + scr.addstr("Examples:\n") + scr.addstr(" > volume.ctrl.set_volume 50\n") + example = ( + ' > player.ctrl.play_from_reader ' + 'content={"albumartist": "Taylor Swift", "album": "Fearless"} ' + 'content_type=album\n' + ) + scr.addstr(example) + scr.addstr("\nSupported argument formats:\n") + scr.addstr(" - Simple values (strings, numbers)\n") + scr.addstr(" - Hexadecimal numbers (0x...)\n") + scr.addstr(" - JSON objects and arrays for keyword arguments\n") + scr.addstr("Note: JSON must be valid and properly quoted\n") + scr.addstr("\n") + scr.addstr("Use for auto-completion of commands!\n") + scr.addstr("Use / for command history!\n") + scr.addstr("\n") + scr.addstr("Type help , to get a list of all commands\n") + scr.addstr("Type usage , to get this usage help\n") + scr.addstr("\n") + scr.addstr("To exit, press Ctrl-D or type 'exit'\n") + scr.addstr("\n") + scr.refresh() + def _get_common_beginning(self, strings: List[str]) -> str: + """Find common prefix among a list of strings""" + if not strings: + return "" -def runcmd(cmd): - """ - Just run a command. - Right now duplicates more or less main() - :todo remove duplication of code - """ - - # Split on whitespaces to separate cmd and arg list - dec = [v for v in cmd.strip().split(' ') if len(v) > 0] - if len(dec) == 0: - return - # Split cmd on '.' into package.plugin.method - # Remove duplicate '.' along the way - sl = [v for v in dec[0].split('.') if len(v) > 0] - fargs = [tonum(a) for a in dec[1:]] - response = None - method = None - if not (2 <= len(sl) <= 3): - print(":: Error = Ill-formatted command\n") - return - if len(sl) == 3: - method = sl[2] - try: - response = client.enque(sl[0], sl[1], method, args=fargs) - except zmq.error.Again: - print("\n\n" + '-' * 70 + "\n") - print("Could not reach RPC Server. Jukebox running? Correct Port?\n") - print('-' * 70 + "\n\n") - return - except Exception as e: - print(f":: Exception response =\n{e}\n") - return - else: - print(f"\n:: Response =\n{response}\n\n") + result = [] + limit = min(len(s) for s in strings) + for i in range(limit): + chars = set(s[i] for s in strings) + if len(chars) == 1: + result.append(chars.pop()) + else: + break + + return ''.join(result) + + def _autocomplete(self, msg: str) -> Tuple[str, List[str]]: + """Handle command autocompletion""" + matches = [s for s in self.state.candidates if s.startswith(msg)] + if not matches: + return msg, matches + + common = self._get_common_beginning(matches) + return common, matches + + def _is_printable(self, ch: int) -> bool: + """Check if character is printable""" + return 32 <= ch <= 127 + + def _reprompt(self, scr, msg: str, y: int, x: int) -> None: + """Redraw prompt and message""" + scr.move(y, 0) + scr.clrtoeol() + scr.addstr(self.state.prompt) + scr.addstr(msg) + scr.move(y, x) + + def _get_input(self, scr) -> str: # noqa: C901 + """Handle user input with history and autocompletion""" + ch = 0 + msg = '' + ihist = '' + hidx = len(self.state.history) + + y, x = scr.getyx() + self._reprompt(scr, msg, y, len(self.state.prompt) + len(msg)) + scr.refresh() -if __name__ == '__main__': + while ch != ord(b'\n'): + try: + ch = scr.getch() + except KeyboardInterrupt: + return 'exit' + + y, x = scr.getyx() + pos = x - len(self.state.prompt) + + if ch == ord(b'\t'): + msg, matches = self._autocomplete(msg) + if len(matches) > 1: + scr.addstr('\n') + scr.addstr(', '.join(matches)) + scr.addstr('\n') + scr.clrtobot() + self._reprompt(scr, msg, y, len(self.state.prompt) + len(msg)) + + elif ch == ord(b'\n'): + break + elif ch == 4: # Ctrl-D + return 'exit' + elif ch in (curses.KEY_BACKSPACE, 127): + if pos > 0: + scr.delch(y, x - 1) + msg = msg[0:pos - 1] + msg[pos:] + elif ch == curses.KEY_DC: + scr.delch(y, x) + msg = msg[0:pos] + msg[pos + 1:] + elif ch == curses.KEY_LEFT: + if pos > 0: + scr.move(y, x - 1) + elif ch == curses.KEY_RIGHT: + if pos < len(msg): + scr.move(y, x + 1) + elif ch == curses.KEY_HOME: + scr.move(y, len(self.state.prompt)) + elif ch == curses.KEY_END: + scr.move(y, len(self.state.prompt) + len(msg)) + elif ch == curses.KEY_UP: + if hidx == len(self.state.history): + ihist = msg + hidx = max(hidx - 1, 0) + msg = self.state.history[hidx] + self._reprompt(scr, msg, y, len(self.state.prompt) + len(msg)) + elif ch == curses.KEY_DOWN: + hidx = min(hidx + 1, len(self.state.history)) + msg = ihist if hidx == len(self.state.history) else self.state.history[hidx] + self._reprompt(scr, msg, y, len(self.state.prompt) + len(msg)) + elif self._is_printable(ch): + msg = msg[0:pos] + curses.ascii.unctrl(ch) + msg[pos:] + self._reprompt(scr, msg, y, x + 1) + + scr.refresh() + + self.state.history.append(msg) + return msg + + +def main(): + """CLI entry point with argument parsing""" default_tcp = 5555 default_ws = 5556 - url = f"tcp://localhost:{default_tcp}" - argparser = argparse.ArgumentParser(description='The Jukebox RPC command line tool', - epilog=f'Default connection: {url}') - port_group = argparser.add_mutually_exclusive_group() - port_group.add_argument("-w", "--websocket", - help=f"Use websocket protocol on PORT [default: {default_ws}]", - nargs='?', const=default_ws, - metavar="PORT", default=None) - port_group.add_argument("-t", "--tcp", - help=f"Use tcp protocol on PORT [default: {default_tcp}]", - nargs='?', const=default_tcp, - metavar="PORT", default=None) - port_group.add_argument("-c", "--command", - help="Send command to Jukebox server", - default=None) - args = argparser.parse_args() - + default_url = f"tcp://localhost:{default_tcp}" + + parser = argparse.ArgumentParser( + description='The Jukebox RPC command line tool', + epilog=f'Default connection: {default_url}' + ) + + port_group = parser.add_mutually_exclusive_group() + port_group.add_argument( + "-w", "--websocket", + help=f"Use websocket protocol on PORT [default: {default_ws}]", + nargs='?', const=default_ws, + metavar="PORT", default=None + ) + port_group.add_argument( + "-t", "--tcp", + help=f"Use tcp protocol on PORT [default: {default_tcp}]", + nargs='?', const=default_tcp, + metavar="PORT", default=None + ) + port_group.add_argument( + "-c", "--command", + help="Send command to Jukebox server", + default=None + ) + + args = parser.parse_args() + + url = default_url if args.websocket is not None: url = f"ws://localhost:{args.websocket}" elif args.tcp is not None: @@ -401,12 +491,17 @@ def runcmd(cmd): print(f">>> RPC Client connect on {url}") - client = rpc.RpcClient(url) + cli = JukeboxCli(url) if args.command is not None: - runcmd(args.command) - exit(0) + # Handle single command execution + cli.execute_command(None, args.command) else: - curses.wrapper(main) + # Run interactive CLI + curses.wrapper(cli.run) print(">>> RPC Client exited!") + + +if __name__ == '__main__': + main()