diff --git a/PKG-INFO b/PKG-INFO index 1d89f53..c29e9b0 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.0 Name: qpid-tools -Version: 0.14 +Version: 0.16 Summary: Diagnostic and management tools for Apache Qpid brokers. Home-page: http://qpid.apache.org/ Author: Apache Qpid diff --git a/setup.py b/setup.py index dd849dc..85e6a54 100755 --- a/setup.py +++ b/setup.py @@ -20,12 +20,15 @@ from distutils.core import setup setup(name="qpid-tools", - version="0.14", + version="0.16", author="Apache Qpid", author_email="dev@qpid.apache.org", + package_dir={'' : 'src/py'}, + packages=["qpidtoollibs"], scripts=["src/py/qpid-cluster", "src/py/qpid-cluster-store", "src/py/qpid-config", + "src/py/qpid-ha", "src/py/qpid-printevents", "src/py/qpid-queue-stats", "src/py/qpid-route", diff --git a/src/py/qmf-tool b/src/py/qmf-tool index 8413ca2..db51c96 100755 --- a/src/py/qmf-tool +++ b/src/py/qmf-tool @@ -266,7 +266,7 @@ class QmfData: self.conn_options = conn_options self.qmf_options = qmf_options self.agent_filter = '[]' - self.connection = cqpid.Connection(self.url, self.conn_options) + self.connection = cqpid.Connection(self.url, **self.conn_options) self.connection.open() self.session = qmf2.ConsoleSession(self.connection, self.qmf_options) self.session.setAgentFilter(self.agent_filter) diff --git a/src/py/qpid-config b/src/py/qpid-config index bb49b9d..1308df7 100755 --- a/src/py/qpid-config +++ b/src/py/qpid-config @@ -18,12 +18,18 @@ # specific language governing permissions and limitations # under the License. # +import pdb import os from optparse import OptionParser, OptionGroup, IndentedHelpFormatter import sys import locale -from qmf.console import Session + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpid.messaging import Connection +from qpidtoollibs import BrokerAgent usage = """ Usage: qpid-config [OPTIONS] @@ -36,15 +42,16 @@ Usage: qpid-config [OPTIONS] qpid-config [OPTIONS] bind [binding-key] [-f -|filename] [all|any] k1=v1 [, k2=v2...] - qpid-config [OPTIONS] unbind [binding-key]""" + qpid-config [OPTIONS] unbind [binding-key] + qpid-config [OPTIONS] reload-acl""" description = """ Examples: $ qpid-config add queue q $ qpid-config add exchange direct d -a localhost:5672 -$ qpid-config exchanges -a 10.1.1.7:10000 -$ qpid-config queues -a guest/guest@broker-host:10000 +$ qpid-config exchanges -b 10.1.1.7:10000 +$ qpid-config queues -b guest/guest@broker-host:10000 Add Exchange values: @@ -55,7 +62,7 @@ Add Exchange values: xml XML Exchange - allows content filtering using an XQuery -Queue Limit Actions +Queue Limit Actions: none (default) - Use broker's default policy reject - Reject enqueued messages @@ -63,12 +70,14 @@ Queue Limit Actions ring - Replace oldest unacquired message with new ring-strict - Replace oldest message, reject if oldest is acquired -Queue Ordering Policies +Replication levels: - fifo (default) - First in, first out - lvq - Last Value Queue ordering, allows queue browsing - lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" + none - no replication + configuration - replicate queue and exchange existence and bindings, but not messages. + all - replicate configuration and messages +""" +REPLICATE_LEVELS= ["none", "configuration", "all"] class Config: def __init__(self): @@ -77,8 +86,9 @@ class Config: self._connTimeout = 10 self._ignoreDefault = False self._altern_ex = None - self._passive = False self._durable = False + self._replicate = None + self._ha_admin = False self._clusterDurable = False self._if_empty = True self._if_unused = True @@ -87,8 +97,8 @@ class Config: self._maxQueueSize = None self._maxQueueCount = None self._limitPolicy = None - self._order = None self._msgSequence = False + self._lvq_key = None self._ive = False self._eventGeneration = None self._file = None @@ -100,6 +110,7 @@ class Config: self._msgGroupHeader = None self._sharedMsgGroup = False self._extra_arguments = [] + self._start_replica = None self._returnCode = 0 config = Config() @@ -110,8 +121,7 @@ MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" +LVQ_KEY = "qpid.last_value_queue_key" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" @@ -121,14 +131,18 @@ FLOW_STOP_SIZE = "qpid.flow_stop_size" FLOW_RESUME_SIZE = "qpid.flow_resume_size" MSG_GROUP_HDR_KEY = "qpid.group_header_key" SHARED_MSG_GROUP = "qpid.shared_msg_group" +REPLICATE = "qpid.replicate" #There are various arguments to declare that have specific program #options in this utility. However there is now a generic mechanism for #passing arguments as well. The SPECIAL_ARGS list contains the #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, - MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] +SPECIAL_ARGS=[ + FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE, + LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION, + FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, + MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE] class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -146,7 +160,7 @@ class JHelpFormatter(IndentedHelpFormatter): def Usage(): print usage - exit(-1) + sys.exit(-1) def OptionsAndArguments(argv): """ Set global variables for options, return arguments """ @@ -160,8 +174,8 @@ def OptionsAndArguments(argv): group1 = OptionGroup(parser, "General Options") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") - group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]") + group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="
", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:]") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) @@ -171,8 +185,9 @@ def OptionsAndArguments(argv): group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") - group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") + group2.add_option("--replicate", action="store", metavar="", help="Enable automatic replication in a HA cluster. is 'none', 'configuration' or 'all').") + group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group2) group3 = OptionGroup(parser, "Options for Adding Queues") @@ -182,7 +197,7 @@ def OptionsAndArguments(argv): group3.add_option("--max-queue-size", action="store", type="int", metavar="", help="Maximum in-memory queue size as bytes") group3.add_option("--max-queue-count", action="store", type="int", metavar="", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="", help="Action to take when queue limit is reached") - group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="", help="Queue ordering policy") + group3.add_option("--lvq-key", action="store", metavar="", help="Last Value Queue key") group3.add_option("--generate-queue-events", action="store", type="int", metavar="", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") group3.add_option("--flow-stop-size", action="store", type="int", metavar="", help="Turn on sender flow control when the number of queued bytes exceeds this value.") @@ -198,6 +213,7 @@ def OptionsAndArguments(argv): help="Allow message group consumption across multiple consumers.") group3.add_option("--argument", dest="extra_arguments", action="append", default=[], metavar="", help="Specify a key-value pair to add to queue arguments") + group3.add_option("--start-replica", metavar="", help="Start replication from the same-named queue at ") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -224,11 +240,11 @@ def OptionsAndArguments(argv): except: args = encArgs - if opts.bindings: + if opts.recursive: config._recursive = True - if opts.broker_addr: - config._host = opts.broker_addr - if opts.timeout: + if opts.broker: + config._host = opts.broker + if opts.timeout is not None: config._connTimeout = opts.timeout if config._connTimeout == 0: config._connTimeout = None @@ -236,28 +252,31 @@ def OptionsAndArguments(argv): config._ignoreDefault = True if opts.alternate_exchange: config._altern_ex = opts.alternate_exchange - if opts.passive: - config._passive = True if opts.durable: config._durable = True + if opts.replicate: + if not opts.replicate in REPLICATE_LEVELS: + raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS))) + config._replicate = opts.replicate + if opts.ha_admin: config._ha_admin = True if opts.cluster_durable: config._clusterDurable = True if opts.file: config._file = opts.file - if opts.file_count: + if opts.file_count is not None: config._fileCount = opts.file_count - if opts.file_size: + if opts.file_size is not None: config._fileSize = opts.file_size - if opts.max_queue_size != None: + if opts.max_queue_size is not None: config._maxQueueSize = opts.max_queue_size - if opts.max_queue_count: + if opts.max_queue_count is not None: config._maxQueueCount = opts.max_queue_count if opts.limit_policy: config._limitPolicy = opts.limit_policy - if opts.order: - config._order = opts.order if opts.sequence: config._msgSequence = True + if opts.lvq_key: + config._lvq_key = opts.lvq_key if opts.ive: config._ive = True if opts.generate_queue_events: @@ -271,13 +290,13 @@ def OptionsAndArguments(argv): config._if_unused = False if opts.sasl_mechanism: config._sasl_mechanism = opts.sasl_mechanism - if opts.flow_stop_size: + if opts.flow_stop_size is not None: config._flowStopSize = opts.flow_stop_size - if opts.flow_resume_size: + if opts.flow_resume_size is not None: config._flowResumeSize = opts.flow_resume_size - if opts.flow_stop_count: + if opts.flow_stop_count is not None: config._flowStopCount = opts.flow_stop_count - if opts.flow_resume_count: + if opts.flow_resume_count is not None: config._flowResumeCount = opts.flow_resume_count if opts.group_header: config._msgGroupHeader = opts.group_header @@ -285,6 +304,8 @@ def OptionsAndArguments(argv): config._sharedMsgGroup = True if opts.extra_arguments: config._extra_arguments = opts.extra_arguments + if opts.start_replica: + config._start_replica = opts.start_replica return args @@ -331,27 +352,24 @@ def snarf_header_args(args): class BrokerManager: def __init__(self): self.brokerName = None - self.qmf = None + self.conn = None self.broker = None - self.mechanism = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + client_properties={} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.conn = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties) + self.broker = BrokerAgent(self.conn) def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) + if self.conn: + self.conn.close() def Overview(self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) + exchanges = self.broker.getAllExchanges() + queues = self.broker.getAllQueues() + print "Total Exchanges: %d" % len(exchanges) etype = {} for ex in exchanges: if ex.type not in etype: @@ -362,16 +380,16 @@ class BrokerManager: print "%15s: %d" % (typ, etype[typ]) print - print " Total Queues: %d" % len (queues) + print " Total Queues: %d" % len(queues) durable = 0 for queue in queues: if queue.durable: durable = durable + 1 print " durable: %d" % durable - print " non-durable: %d" % (len (queues) - durable) + print " non-durable: %d" % (len(queues) - durable) def ExchangeList(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) @@ -398,31 +416,35 @@ class BrokerManager: args = ex.arguments if not args: args = {} if ex.durable: print "--durable", + if REPLICATE in args: print "--replicate=%s" % args[REPLICATE], if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, + print "--alternate-exchange=%s" % ex.altExchange, print def ExchangeListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for ex in exchanges: if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): + if bind.exchangeRef == ex.name: qname = "" queue = self.findById(queues, bind.queueRef) if queue != None: qname = queue.name - print " bind [%s] => %s" % (bind.bindingKey, qname) + if bind.arguments: + print " bind [%s] => %s %s" % (bind.bindingKey, qname, bind.arguments) + else: + print " bind [%s] => %s" % (bind.bindingKey, qname) def QueueList(self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + queues = self.broker.getAllQueues() caption = "Queue Name" maxNameLen = len(caption) found = False @@ -447,6 +469,7 @@ class BrokerManager: args = q.arguments if not args: args = {} if q.durable: print "--durable", + if REPLICATE in args: print "--replicate=%s" % args[REPLICATE], if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", if q.autoDelete: print "auto-del", if q.exclusive: print "excl", @@ -455,11 +478,10 @@ class BrokerManager: if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, + print "--alternate-exchange=%s" % q.altExchange, if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE], if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE], if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT], @@ -469,14 +491,14 @@ class BrokerManager: print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for queue in queues: if self.match(queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.getObjectId(): + if bind.queueRef == queue.name: ename = "" ex = self.findById(exchanges, bind.exchangeRef) if ex != None: @@ -484,7 +506,10 @@ class BrokerManager: if ename == "": if config._ignoreDefault: continue ename = "''" - print " bind [%s] => %s" % (bind.bindingKey, ename) + if bind.arguments: + print " bind [%s] => %s %s" % (bind.bindingKey, ename, bind.arguments) + else: + print " bind [%s] => %s" % (bind.bindingKey, ename) def AddExchange(self, args): if len(args) < 2: @@ -492,20 +517,31 @@ class BrokerManager: etype = args[0] ename = args[1] declArgs = {} + for a in config._extra_arguments: + r = a.split("=", 1) + if len(r) == 2: value = r[1] + else: value = None + declArgs[r[0]] = value + if config._msgSequence: declArgs[MSG_SEQUENCE] = 1 if config._ive: declArgs[IVE] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + if config._replicate: + declArgs[REPLICATE] = config._replicate + self.broker.addExchange(etype, ename, declArgs) + def DelExchange(self, args): if len(args) < 1: Usage() ename = args[0] - self.broker.getAmqpSession().exchange_delete(exchange=ename) + self.broker.delExchange(ename) + def AddQueue(self, args): if len(args) < 1: @@ -522,9 +558,9 @@ class BrokerManager: declArgs[FILECOUNT] = config._fileCount declArgs[FILESIZE] = config._fileSize - if config._maxQueueSize != None: + if config._maxQueueSize is not None: declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize - if config._maxQueueCount: + if config._maxQueueCount is not None: declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount if config._limitPolicy: if config._limitPolicy == "none": @@ -538,25 +574,20 @@ class BrokerManager: elif config._limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" - if config._clusterDurable: + if config._clusterDurable: declArgs[CLUSTER_DURABLE] = 1 - if config._order: - if config._order == "fifo": - pass - elif config._order == "lvq": - declArgs[LVQ] = 1 - elif config._order == "lvq-no-browse": - declArgs[LVQNB] = 1 + if config._lvq_key: + declArgs[LVQ_KEY] = config._lvq_key if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration - if config._flowStopSize: + if config._flowStopSize is not None: declArgs[FLOW_STOP_SIZE] = config._flowStopSize - if config._flowResumeSize: + if config._flowResumeSize is not None: declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize - if config._flowStopCount: + if config._flowStopCount is not None: declArgs[FLOW_STOP_COUNT] = config._flowStopCount - if config._flowResumeCount: + if config._flowResumeCount is not None: declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount if config._msgGroupHeader: @@ -564,17 +595,21 @@ class BrokerManager: if config._sharedMsgGroup: declArgs[SHARED_MSG_GROUP] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) - + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + if config._replicate: + declArgs[REPLICATE] = config._replicate + self.broker.addQueue(qname, declArgs) + if config._start_replica: # Start replication + self.broker._method("replicate", {"broker":config._start_replica, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker") def DelQueue(self, args): if len(args) < 1: Usage() qname = args[0] - self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused) + self.broker.delQueue(qname) def Bind(self, args): @@ -587,7 +622,7 @@ class BrokerManager: key = args[2] # query the exchange to determine its type. - res = self.broker.getAmqpSession().exchange_query(ename) + res = self.broker.getExchange(ename) # type of the xchg determines the processing of the rest of # argv. if it's an xml xchg, we want to find a file @@ -596,7 +631,7 @@ class BrokerManager: # map containing key/value pairs. if neither of those, extra # args are ignored. ok = True - _args = None + _args = {} if res.type == "xml": # this checks/imports the -f arg [ok, xquery] = snarf_xquery_args() @@ -610,10 +645,7 @@ class BrokerManager: if not ok: sys.exit(1) - self.broker.getAmqpSession().exchange_bind(queue=qname, - exchange=ename, - binding_key=key, - arguments=_args) + self.broker.bind(ename, qname, key, _args) def Unbind(self, args): if len(args) < 2: @@ -623,11 +655,20 @@ class BrokerManager: key = "" if len(args) > 2: key = args[2] - self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key) + self.broker.unbind(ename, qname, key) + + def ReloadAcl(self): + try: + self.broker.reloadAclFile() + except Exception, e: + if str(e).find('No object found') != -1: + print "Failed: ACL Module Not Loaded in Broker" + else: + raise def findById(self, items, id): for item in items: - if item.getObjectId() == id: + if item.name == id: return item return None @@ -685,6 +726,8 @@ def main(argv=None): bm.Bind(args[1:]) elif cmd == "unbind": bm.Unbind(args[1:]) + elif cmd == "reload-acl": + bm.ReloadAcl() else: Usage() except KeyboardInterrupt: @@ -713,9 +756,9 @@ def main(argv=None): if e.__class__.__name__ != "Timeout": print "Failed: %s: %s" % (e.__class__.__name__, e) return 1 - return config._returnCode + if __name__ == "__main__": sys.exit(main()) diff --git a/src/py/qpid-ha b/src/py/qpid-ha new file mode 100755 index 0000000..5c757f3 --- /dev/null +++ b/src/py/qpid-ha @@ -0,0 +1,159 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import qmf.console, optparse, sys, time, os +from qpid.management import managementChannel, managementClient +from qpid.messaging import Connection +from qpid.messaging import Message as QpidMessage +from qpidtoollibs.broker import BrokerAgent +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +# QMF address for the HA broker object. +HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker" + +class Command: + commands = {} + + def __init__(self, name, help, args=[]): + Command.commands[name] = self + self.name = name + self.args = args + usage="%s [options] %s\n\n%s"%(name, " ".join(args), help) + self.help = help + self.op=optparse.OptionParser(usage) + self.op.add_option("-b", "--broker", metavar="", help="Connect to broker at ") + + def execute(self): + opts, args = self.op.parse_args() + if len(args) != len(self.args)+1: + self.op.print_help() + raise Exception("Wrong number of arguments") + broker = opts.broker or "localhost:5672" + connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1}) + qmf_broker = BrokerAgent(connection) + ha_broker = qmf_broker.getHaBroker() + if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker) + try: return self.do_execute(qmf_broker, ha_broker, opts, args) + finally: connection.close() + + def do_execute(self, qmf_broker, opts, args): + raise Exception("Command '%s' is not yet implemented"%self.name) + +class PromoteCmd(Command): + def __init__(self): + Command.__init__(self, "promote","Promote broker from backup to primary") + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("promote", {}, HA_BROKER) +PromoteCmd() + +class ReadyCmd(Command): + def __init__(self): + Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.") + self.op.add_option( + "--wait", type="int", metavar="", default=None, + help="Wait up to for broker to be ready. 0 means wait forever.") + def do_execute(self, qmf_broker, ha_broker, opts, args): + if (ha_broker.status == "backup"): return + if (ha_broker.status != "catch-up"): + raise Exception("Broker is not a backup, status is '%s'"%ha_broker.status) + if (opts.wait is None): return 1 + delay = 0.1 + timeout = time.time() + opts.wait + while opts.wait == 0 or time.time() < timeout: + time.sleep(delay) + delay = min(2*delay, 1) + ha_broker = qmf_broker.getHaBroker() + if (ha_broker.status == "backup"): return + return 1 +ReadyCmd() + +class ReplicateCmd(Command): + def __init__(self): + Command.__init__(self, "replicate", "Set up replication from on to on the current broker.", ["", ""]) + def do_execute(self, qmf_broker, ha_broker, opts, args): + qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER) +ReplicateCmd() + +class SetCmd(Command): + def __init__(self): + Command.__init__(self, "set", "Set HA configuration settings") + def add(optname, metavar, type, help): + self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store") + add("--brokers", "", "string", "HA brokers use to connect to each other") + add("--public-brokers", "", "string", "Clients use to connect to HA brokers") + add("--backups", "", "int", "Expect backups to be running"), + + def do_execute(self, qmf_broker, ha_broker, opts, args): + if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER) + if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER) + if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) + +SetCmd() + +class QueryCmd(Command): + def __init__(self): + Command.__init__(self, "query", "Print HA configuration settings") + + def do_execute(self, qmf_broker, ha_broker, opts, args): + hb = ha_broker + for x in [("Status:", hb.status), + ("Brokers URL:", hb.brokers), + ("Public URL:", hb.publicBrokers), + ("Expected Backups:", hb.expectedBackups) + ]: + print "%-20s %s"%(x[0], x[1]) +QueryCmd() + +def print_usage(prog): + print "usage: %s []\n\nCommands are:\n"%prog + for name, command in Command.commands.iteritems(): + help = command.help + print " %-12s %s."%(name, help.split(".")[0]) + print "\nFor help with a command type: %s --help\n"%prog + +def find_command(args): + """Find a command among the arguments and options""" + for arg in args: + if arg in Command.commands: + return Command.commands[arg] + return None + +def main(argv): + try: + args=argv[1:] + if args and args[0] == "--help-all": + for c in Command.commands.itervalues(): + c.op.print_help(); print + return 1 + command = find_command(args) + if not command: + print_usage(os.path.basename(argv[0])); + return 1; + if command.execute(): return 1 + except Exception, e: + print e + return 1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/src/py/qpid-route b/src/py/qpid-route index f90416d..0316c24 100755 --- a/src/py/qpid-route +++ b/src/py/qpid-route @@ -62,6 +62,7 @@ class Config: self._ack = 0 self._connTimeout = 10 self._client_sasl_mechanism = None + self._ha_admin = False config = Config() @@ -96,7 +97,7 @@ def OptionsAndArguments(argv): parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="", help="Transport to use for links, defaults to tcp") parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") - + parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") opts, encArgs = parser.parse_args(args=argv) try: @@ -128,6 +129,9 @@ def OptionsAndArguments(argv): if opts.transport: config._transport = opts.transport + if opts.ha_admin: + config._ha_admin = True + if opts.ack: config._ack = opts.ack @@ -143,7 +147,9 @@ class RouteManager: self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism) + client_properties = {} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties) self.broker._waitForStable() self.agent = self.broker.getBrokerAgent() diff --git a/src/py/qpid-stat b/src/py/qpid-stat index ce3f5d1..5a816ba 100755 --- a/src/py/qpid-stat +++ b/src/py/qpid-stat @@ -21,13 +21,18 @@ import os from optparse import OptionParser, OptionGroup -from time import sleep ### debug import sys import locale import socket import re -from qmf.console import Session, Console -from qpid.disp import Display, Header, Sorter +from qpid.messaging import Connection + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs import BrokerAgent +from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong + class Config: def __init__(self): @@ -37,8 +42,8 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._cluster_detail = False self._sasl_mechanism = None + self._ha_admin = False config = Config() @@ -47,50 +52,45 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") + parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") - group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("--sasl-mechanism", action="store", type="string", metavar="", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", - action="store_const", const="b", dest="show") - group2.add_option("-c", "--connections", help="Show Connections", - action="store_const", const="c", dest="show") - group2.add_option("-e", "--exchanges", help="Show Exchanges", - action="store_const", const="e", dest="show") - group2.add_option("-q", "--queues", help="Show Queues", - action="store_const", const="q", dest="show") - group2.add_option("-u", "--subscriptions", help="Show Subscriptions", - action="store_const", const="u", dest="show") - group2.add_option("-S", "--sort-by", metavar="", - help="Sort by column name") - group2.add_option("-I", "--increasing", action="store_true", default=False, - help="Sort by increasing value (default = decreasing)") - group2.add_option("-L", "--limit", default=50, metavar="", - help="Limit output to n rows") - group2.add_option("-C", "--cluster", action="store_true", default=False, - help="Display per-broker cluster detail.") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") + group2.add_option("-S", "--sort-by", metavar="", help="Sort by column name") + group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") + group2.add_option("-L", "--limit", type="int", default=50, metavar="", help="Limit output to n rows") + parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by + config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit - config._cluster_detail = opts.cluster config._sasl_mechanism = opts.sasl_mechanism - - if args: - config._host = args[0] + config._ha_admin = opts.ha_admin return args @@ -119,86 +119,26 @@ class IpAddr: bestAddr = addrPort return bestAddr -class Broker(object): - def __init__(self, qmf, broker): - self.broker = broker - - agents = qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a - - bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] - self.currentTime = bobj.getTimestamps()[0] - try: - self.uptime = bobj.uptime - except: - self.uptime = 0 - self.connections = {} - self.sessions = {} - self.exchanges = {} - self.queues = {} - self.subscriptions = {} - package = "org.apache.qpid.broker" - - list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) - for conn in list: - if not conn.shadow: - self.connections[conn.getObjectId()] = conn - - list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) - for sess in list: - if sess.connectionRef in self.connections: - self.sessions[sess.getObjectId()] = sess - - list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) - for exchange in list: - self.exchanges[exchange.getObjectId()] = exchange - - list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) - for queue in list: - self.queues[queue.getObjectId()] = queue - - list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent) - for subscription in list: - self.subscriptions[subscription.getObjectId()] = subscription - - def getName(self): - return self.broker.getUrl() - - def getCurrentTime(self): - return self.currentTime - - def getUptime(self): - return self.uptime - -class BrokerManager(Console): +class BrokerManager: def __init__(self): self.brokerName = None - self.qmf = None + self.connection = None self.broker = None - self.brokers = [] self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.mechanism = mechanism - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + client_properties={} + if config._ha_admin: client_properties["qpid.ha-admin"] = 1 + self.connection = Connection.establish(self.url, sasl_mechanisms=mechanism, client_properties=client_properties) + self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - if self.broker: - self.qmf.delBroker(self.broker) - else: - for b in self.brokers: self.qmf.delBroker(b.broker) + connection.close() except: pass @@ -238,62 +178,63 @@ class BrokerManager(Console): hosts.append(bestUrl) return hosts - def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): - if len(subs) == 0: - return - this = subs[0] - remaining = subs[1:] - newindent = indent + " " - if this == 'b': - pass - elif this == 'c': - if broker: - for oid in broker.connections: - iconn = broker.connections[oid] - self.printConnSub(indent, broker.getName(), iconn) - self.displaySubs(remaining, newindent, broker=broker, conn=iconn, - sess=sess, exchange=exchange, queue=queue) - elif this == 's': - pass - elif this == 'e': - pass - elif this == 'q': - pass - print - - def displayBroker(self, subs): + def displayBroker(self): disp = Display(prefix=" ") heads = [] - heads.append(Header('broker')) - heads.append(Header('cluster')) heads.append(Header('uptime', Header.DURATION)) - heads.append(Header('conn', Header.KMG)) - heads.append(Header('sess', Header.KMG)) - heads.append(Header('exch', Header.KMG)) - heads.append(Header('queue', Header.KMG)) + heads.append(Header('connections', Header.COMMAS)) + heads.append(Header('sessions', Header.COMMAS)) + heads.append(Header('exchanges', Header.COMMAS)) + heads.append(Header('queues', Header.COMMAS)) rows = [] - for broker in self.brokers: - if self.cluster: - ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) - else: - ctext = "" - row = (broker.getName(), ctext, broker.getUptime(), - len(broker.connections), len(broker.sessions), - len(broker.exchanges), len(broker.queues)) - rows.append(row) - title = "Brokers" - if config._sortcol: - sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) + broker = self.broker.getBroker() + connections = self.getConnectionMap() + sessions = self.getSessionMap() + exchanges = self.getExchangeMap() + queues = self.getQueueMap() + row = (broker.getUpdateTime() - broker.getCreateTime(), + len(connections), len(sessions), + len(exchanges), len(queues)) + rows.append(row) + disp.formattedTable('Broker Summary:', heads, rows) + + if 'queueCount' not in broker.values: + return - def displayConn(self, subs): + print + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', broker.msgDepth, broker.byteDepth]) + rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues]) + rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues]) + rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues]) + rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues]) + rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues]) + rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues]) + rows.append(['acquires', broker.acquires, None]) + rows.append(['releases', broker.releases, None]) + rows.append(['discards-no-route', broker.discardsNoRoute, None]) + rows.append(['discards-ttl-expired', broker.discardsTtl, None]) + rows.append(['discards-limit-overflow', broker.discardsOverflow, None]) + rows.append(['discards-ring-overflow', broker.discardsRing, None]) + rows.append(['discards-lvq-replace', broker.discardsLvq, None]) + rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None]) + rows.append(['discards-purged', broker.discardsPurge, None]) + rows.append(['reroutes', broker.reroutes, None]) + rows.append(['abandoned', broker.abandoned, None]) + rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None]) + disp.formattedTable('Aggregate Broker Statistics:', heads, rows) + + + def displayConn(self): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header('client-addr')) heads.append(Header('cproc')) heads.append(Header('cpid')) @@ -303,25 +244,20 @@ class BrokerManager(Console): heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.connections: - conn = broker.connections[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(conn.address) - row.append(conn.remoteProcessName) - row.append(conn.remotePid) - row.append(conn.authIdentity) - row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) - idle = broker.getCurrentTime() - conn.getTimestamps()[0] - row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) - row.append(conn.framesFromClient) - row.append(conn.framesToClient) - rows.append(row) + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() + for conn in connections: + row = [] + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getUpdateTime() - conn.getCreateTime()) + row.append(broker.getUpdateTime() - conn.getUpdateTime()) + row.append(conn.msgsFromClient) + row.append(conn.msgsToClient) + rows.append(row) title = "Connections" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -329,14 +265,12 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displaySession(self, subs): + def displaySession(self): disp = Display(prefix=" ") - def displayExchange(self, subs): + def displayExchange(self): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("exchange")) heads.append(Header("type")) heads.append(Header("dur", Header.Y)) @@ -348,26 +282,21 @@ class BrokerManager(Console): heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.exchanges: - ex = broker.exchanges[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(ex.name) - row.append(ex.type) - row.append(ex.durable) - row.append(ex.bindingCount) - row.append(ex.msgReceives) - row.append(ex.msgRoutes) - row.append(ex.msgDrops) - row.append(ex.byteReceives) - row.append(ex.byteRoutes) - row.append(ex.byteDrops) - rows.append(row) + exchanges = self.broker.getAllExchanges() + for ex in exchanges: + row = [] + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) title = "Exchanges" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -375,11 +304,9 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + def displayQueues(self): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("queue")) heads.append(Header("dur", Header.Y)) heads.append(Header("autoDel", Header.Y)) @@ -393,28 +320,23 @@ class BrokerManager(Console): heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.queues: - q = broker.queues[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(q.name) - row.append(q.durable) - row.append(q.autoDelete) - row.append(q.exclusive) - row.append(q.msgDepth) - row.append(q.msgTotalEnqueues) - row.append(q.msgTotalDequeues) - row.append(q.byteDepth) - row.append(q.byteTotalEnqueues) - row.append(q.byteTotalDequeues) - row.append(q.consumerCount) - row.append(q.bindingCount) - rows.append(row) + queues = self.broker.getAllQueues() + for q in queues: + row = [] + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) title = "Queues" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -422,46 +344,102 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displaySubscriptions(self, subs): + + def displayQueue(self, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + + disp = Display(prefix=" ") + heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) + + + def displaySubscriptions(self): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header("subscription")) + heads.append(Header("subscr")) heads.append(Header("queue")) - heads.append(Header("connection")) - heads.append(Header("processName")) - heads.append(Header("processId")) - heads.append(Header("browsing", Header.Y)) - heads.append(Header("acknowledged", Header.Y)) - heads.append(Header("exclusive", Header.Y)) + heads.append(Header("conn")) + heads.append(Header("procName")) + heads.append(Header("procId")) + heads.append(Header("browse", Header.Y)) + heads.append(Header("acked", Header.Y)) + heads.append(Header("excl", Header.Y)) heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.subscriptions: - s = broker.subscriptions[oid] - row = [] - try: - if self.cluster: - row.append(broker.getName()) - row.append(s.name) - row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name) - connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid) - row.append(s.browsing) - row.append(s.acknowledged) - row.append(s.exclusive) - row.append(s.creditMode) - row.append(s.delivered) - rows.append(row) - except: - pass + subscriptions = self.broker.getAllSubscriptions() + sessions = self.getSessionMap() + connections = self.getConnectionMap() + for s in subscriptions: + row = [] + try: + row.append(s.name) + row.append(s.queueRef) + session = sessions[s.sessionRef] + connection = connections[session.connectionRef] + row.append(connection.address) + row.append(connection.remoteProcessName) + row.append(connection.remotePid) + row.append(s.browsing) + row.append(s.acknowledged) + row.append(s.exclusive) + row.append(s.creditMode) + row.append(s.delivered) + rows.append(row) + except: + pass title = "Subscriptions" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -469,34 +447,75 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) - elif main == 'c': self.displayConn(subs) - elif main == 's': self.displaySession(subs) - elif main == 'e': self.displayExchange(subs) - elif main == 'q': self.displayQueue(subs) - elif main == 'u': self.displaySubscriptions(subs) - - def display(self): - if config._cluster_detail or config._types[0] == 'b': - # always show cluster detail when dumping broker stats - self._getCluster() - if self.cluster: - memberList = self.cluster.members.split(";") - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - if config._host.find("@") > 0: - authString = config._host.split("@")[0] + "@" + def displayMemory(self): + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value', Header.COMMAS)] + rows = [] + memory = self.broker.getMemory() + for k,v in memory.values.items(): + if k != 'name': + rows.append([k, v]) + disp.formattedTable('Broker Memory Statistics:', heads, rows) + + def displayAcl(self): + acl = self.broker.getAcl() + if not acl: + print "ACL Policy Module is not installed" + return + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value')] + rows = [] + rows.append(['policy-file', acl.policyFile]) + rows.append(['enforcing', YN(acl.enforcingAcl)]) + rows.append(['has-transfer-acls', YN(acl.transferAcl)]) + rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)]) + rows.append(['acl-denials', Commas(acl.aclDenyCount)]) + disp.formattedTable('ACL Policy Statistics:', heads, rows) + + def getExchangeMap(self): + exchanges = self.broker.getAllExchanges() + emap = {} + for e in exchanges: + emap[e.name] = e + return emap + + def getQueueMap(self): + queues = self.broker.getAllQueues() + qmap = {} + for q in queues: + qmap[q.name] = q + return qmap + + def getSessionMap(self): + sessions = self.broker.getAllSessions() + smap = {} + for s in sessions: + smap[s.name] = s + return smap + + def getConnectionMap(self): + connections = self.broker.getAllConnections() + cmap = {} + for c in connections: + cmap[c.address] = c + return cmap + + def displayMain(self, names, main): + if main == 'g': self.displayBroker() + elif main == 'c': self.displayConn() + elif main == 's': self.displaySession() + elif main == 'e': self.displayExchange() + elif main == 'q': + if len(names) >= 1: + self.displayQueue(names[0]) else: - authString = "" - for host in hostList: - b = self.qmf.addBroker(authString + host, config._connTimeout) - self.brokers.append(Broker(self.qmf, b)) - else: - self.brokers.append(Broker(self.qmf, self.broker)) + self.displayQueues() + elif main == 'u': self.displaySubscriptions() + elif main == 'm': self.displayMemory() + elif main == 'acl': self.displayAcl() - self.displayMain(config._types[0], config._types[1:]) + def display(self, names): + self.displayMain(names, config._types) def main(argv=None): @@ -506,7 +525,7 @@ def main(argv=None): try: bm.SetBroker(config._host, config._sasl_mechanism) - bm.display() + bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt: diff --git a/src/py/qpid-tool b/src/py/qpid-tool index df8b7e3..af948b1 100755 --- a/src/py/qpid-tool +++ b/src/py/qpid-tool @@ -252,6 +252,20 @@ class QmfData(Console): elif tokens[0].isdigit(): self.showObjectById(int(tokens[0])) + def _build_object_name(self, obj): + values = [] + for p,v in obj.getProperties(): + if p.name != "vhostRef" and p.index == 1: + if p.name == "brokerRef": # reference to broker + values.append('org.apache.qpid.broker:broker:amqp-broker') + else: + values.append(str(v)) + + object_key = ",".join(values) + class_key = obj.getClassKey(); + return class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key + + def do_call(self, data): tokens = data.split() if len(tokens) < 2: @@ -290,10 +304,8 @@ class QmfData(Console): object_id = obj.getObjectId(); if not object_id.isV2 and obj.getAgent().isV2: - object_key = ",".join([str(v) for p, v in obj.getProperties() if p.name != "vhostRef" and p.index == 1]) - class_key = obj.getClassKey(); - object_name = class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key - object_id = ObjectId.create(object_id.agentName, object_name) + object_name = self._build_object_name(obj) + object_id = ObjectId.create(object_id.agentName, object_name) self.session._sendMethodRequest(self.broker, obj.getClassKey(), object_id, methodName, args) diff --git a/src/py/qpidtoollibs/__init__.py b/src/py/qpidtoollibs/__init__.py new file mode 100644 index 0000000..2815bac --- /dev/null +++ b/src/py/qpidtoollibs/__init__.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpidtoollibs.broker import * +from qpidtoollibs.disp import * + diff --git a/src/py/qpidtoollibs/broker.py b/src/py/qpidtoollibs/broker.py new file mode 100644 index 0000000..c2340de --- /dev/null +++ b/src/py/qpidtoollibs/broker.py @@ -0,0 +1,394 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import Message +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ + self.sess.close() + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def _doNameQuery(self, object_id): + query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, oid): + obj = self._doNameQuery(oid) + if obj: + return cls(self, obj) + return None + + def _getSingleObject(self, cls): + # + # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because + # of a bug that used to be in the broker whereby by-name queries did not return the + # object timestamps. + # + objects = self._getAllBrokerObjects(cls) + if objects: return objects[0] + return None + + def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ + return self._getSingleObject(Broker) + + + def getCluster(self): + return self._getSingleObject(Cluster) + + def getHaBroker(self): + return self._getSingleObject(HaBroker) + + def getAllConnections(self): + return self._getAllBrokerObjects(Connection) + + def getConnection(self, oid): + return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) + + def getAllSessions(self): + return self._getAllBrokerObjects(Session) + + def getSession(self, oid): + return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) + + def getAllSubscriptions(self): + return self._getAllBrokerObjects(Subscription) + + def getSubscription(self, oid): + return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) + + def getAllExchanges(self): + return self._getAllBrokerObjects(Exchange) + + def getExchange(self, name): + return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) + + def getAllQueues(self): + return self._getAllBrokerObjects(Queue) + + def getQueue(self, name): + return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) + + def getAllBindings(self): + return self._getAllBrokerObjects(Binding) + + def getAllLinks(self): + return self._getAllBrokerObjects(Link) + + def getAcl(self): + return self._getSingleObject(Acl) + + def echo(self, sequence, body): + """Request a response to test the path to the management broker""" + pass + + def connect(self, host, port, durable, authMechanism, username, password, transport): + """Establish a connection to another broker""" + pass + + def queueMoveMessages(self, srcQueue, destQueue, qty): + """Move messages from one queue to another""" + pass + + def setLogLevel(self, level): + """Set the log level""" + pass + + def getLogLevel(self): + """Get the log level""" + pass + + def setTimestampConfig(self, receive): + """Set the message timestamping configuration""" + pass + + def getTimestampConfig(self): + """Get the message timestamping configuration""" + pass + + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) + + def reloadAclFile(self): + self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") + + def create(self, _type, name, properties, strict): + """Create an object of the specified type""" + pass + + def delete(self, _type, name, options): + """Delete an object of the specified type""" + pass + + def query(self, _type, oid): + """Query the current state of an object""" + return self._getBrokerObject(self, _type, oid) + + +class BrokerObject(object): + def __init__(self, broker, content): + self.broker = broker + self.content = content + self.values = content['_values'] + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + if value.__class__ == dict and '_object_name' in value: + full_name = value['_object_name'] + colon = full_name.find(':') + if colon > 0: + full_name = full_name[colon+1:] + colon = full_name.find(':') + if colon > 0: + return full_name[colon+1:] + return value + + def getObjectId(self): + return self.content['_object_id']['_object_name'] + + def getAttributes(self): + return self.values + + def getCreateTime(self): + return self.content['_create_ts'] + + def getDeleteTime(self): + return self.content['_delete_ts'] + + def getUpdateTime(self): + return self.content['_update_ts'] + + def update(self): + """ + Reload the property values from the agent. + """ + refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) + if refreshed: + self.content = refreshed.content + self.values = self.content['_values'] + else: + raise Exception("No longer exists on the broker") + +class Broker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Cluster(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class HaBroker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Memory(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Connection(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def close(self): + pass + +class Session(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Subscription(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "subscription name undefined" + +class Exchange(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Binding(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "Binding key: %s" % self.values['bindingKey'] + +class Queue(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def purge(self, request): + """Discard all or some messages on a queue""" + self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) + + def reroute(self, request, useAltExchange, exchange, filter={}): + """Remove all or some messages on this queue and route them to an exchange""" + self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, + "org.apache.qpid.broker:queue:%s" % self.name) + +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Acl(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) diff --git a/src/py/qpidtoollibs/disp.py b/src/py/qpidtoollibs/disp.py new file mode 100644 index 0000000..a0c7737 --- /dev/null +++ b/src/py/qpidtoollibs/disp.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +def YN(val): + if val: + return 'Y' + return 'N' + +def Commas(value): + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + +def TimeLong(value): + return strftime("%c", gmtime(value / 1000000000)) + +def TimeShort(value): + return strftime("%X", gmtime(value / 1000000000)) + + +class Header: + """ """ + NONE = 1 + KMG = 2 + YN = 3 + Y = 4 + TIME_LONG = 5 + TIME_SHORT = 6 + DURATION = 7 + COMMAS = 8 + + def __init__(self, text, format=NONE): + self.text = text + self.format = format + + def __repr__(self): + return self.text + + def __str__(self): + return self.text + + def formatted(self, value): + try: + if value == None: + return '' + if self.format == Header.NONE: + return value + if self.format == Header.KMG: + return self.num(value) + if self.format == Header.YN: + if value: + return 'Y' + return 'N' + if self.format == Header.Y: + if value: + return 'Y' + return '' + if self.format == Header.TIME_LONG: + return TimeLong(value) + if self.format == Header.TIME_SHORT: + return TimeShort(value) + if self.format == Header.DURATION: + if value < 0: value = 0 + sec = value / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + if self.format == Header.COMMAS: + return Commas(value) + except: + return "?" + + def numCell(self, value, tag): + fp = float(value) / 1000. + if fp < 10.0: + return "%1.2f%c" % (fp, tag) + if fp < 100.0: + return "%2.1f%c" % (fp, tag) + return "%4d%c" % (value / 1000, tag) + + def num(self, value): + if value < 1000: + return "%4d" % value + if value < 1000000: + return self.numCell(value, 'k') + value /= 1000 + if value < 1000000: + return self.numCell(value, 'm') + value /= 1000 + return self.numCell(value, 'g') + + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__(self, spacing=2, prefix=" "): + self.tableSpacing = spacing + self.tablePrefix = prefix + self.timestampFormat = "%X" + + def formattedTable(self, title, heads, rows): + fRows = [] + for row in rows: + fRow = [] + col = 0 + for cell in row: + fRow.append(heads[col].formatted(cell)) + col += 1 + fRows.append(fRow) + headtext = [] + for head in heads: + headtext.append(head.text) + self.table(title, headtext, fRows) + + def table(self, title, heads, rows): + """ Print a table with autosized columns """ + + # Pad the rows to the number of heads + for row in rows: + diff = len(heads) - len(row) + for idx in range(diff): + row.append("") + + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + cellWidth = len (unicode (row[col])) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + line = line + unicode (row[col]) + if col < len (heads) - 1: + for i in range (width - len (unicode (row[col]))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) + + def duration(self, nsec): + if nsec < 0: nsec = 0 + sec = nsec / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + +class Sortable: + """ """ + def __init__(self, row, sortIndex): + self.row = row + self.sortIndex = sortIndex + if sortIndex >= len(row): + raise Exception("sort index exceeds row boundary") + + def __cmp__(self, other): + return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) + + def getRow(self): + return self.row + +class Sorter: + """ """ + def __init__(self, heads, rows, sortCol, limit=0, inc=True): + col = 0 + for head in heads: + if head.text == sortCol: + break + col += 1 + if col == len(heads): + raise Exception("sortCol '%s', not found in headers" % sortCol) + + list = [] + for row in rows: + list.append(Sortable(row, col)) + list.sort() + if not inc: + list.reverse() + count = 0 + self.sorted = [] + for row in list: + self.sorted.append(row.getRow()) + count += 1 + if count == limit: + break + + def getSorted(self): + return self.sorted