Skip to content

Commit

Permalink
Merge tag 'upstream/0.16'
Browse files Browse the repository at this point in the history
Upstream version 0.16
  • Loading branch information
cajus committed May 18, 2012
2 parents 111e103 + afa8744 commit f6eb39f
Show file tree
Hide file tree
Showing 11 changed files with 1,304 additions and 382 deletions.
2 changes: 1 addition & 1 deletion PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 1.0
Name: qpid-tools
Version: 0.14
Version: 0.16
Summary: Diagnostic and management tools for Apache Qpid brokers.
Home-page: http://qpid.apache.org/
Author: Apache Qpid
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
from distutils.core import setup

setup(name="qpid-tools",
version="0.14",
version="0.16",
author="Apache Qpid",
author_email="[email protected]",
package_dir={'' : 'src/py'},
packages=["qpidtoollibs"],
scripts=["src/py/qpid-cluster",
"src/py/qpid-cluster-store",
"src/py/qpid-config",
"src/py/qpid-ha",
"src/py/qpid-printevents",
"src/py/qpid-queue-stats",
"src/py/qpid-route",
Expand Down
2 changes: 1 addition & 1 deletion src/py/qmf-tool
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class QmfData:
self.conn_options = conn_options
self.qmf_options = qmf_options
self.agent_filter = '[]'
self.connection = cqpid.Connection(self.url, self.conn_options)
self.connection = cqpid.Connection(self.url, **self.conn_options)
self.connection.open()
self.session = qmf2.ConsoleSession(self.connection, self.qmf_options)
self.session.setAgentFilter(self.agent_filter)
Expand Down
245 changes: 144 additions & 101 deletions src/py/qpid-config

Large diffs are not rendered by default.

159 changes: 159 additions & 0 deletions src/py/qpid-ha
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import qmf.console, optparse, sys, time, os
from qpid.management import managementChannel, managementClient
from qpid.messaging import Connection
from qpid.messaging import Message as QpidMessage
from qpidtoollibs.broker import BrokerAgent
try:
from uuid import uuid4
except ImportError:
from qpid.datatypes import uuid4

# QMF address for the HA broker object.
HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"

class Command:
commands = {}

def __init__(self, name, help, args=[]):
Command.commands[name] = self
self.name = name
self.args = args
usage="%s [options] %s\n\n%s"%(name, " ".join(args), help)
self.help = help
self.op=optparse.OptionParser(usage)
self.op.add_option("-b", "--broker", metavar="<url>", help="Connect to broker at <url>")

def execute(self):
opts, args = self.op.parse_args()
if len(args) != len(self.args)+1:
self.op.print_help()
raise Exception("Wrong number of arguments")
broker = opts.broker or "localhost:5672"
connection = Connection.establish(broker, client_properties={"qpid.ha-admin":1})
qmf_broker = BrokerAgent(connection)
ha_broker = qmf_broker.getHaBroker()
if not ha_broker: raise Exception("HA module is not loaded on broker at %s"%broker)
try: return self.do_execute(qmf_broker, ha_broker, opts, args)
finally: connection.close()

def do_execute(self, qmf_broker, opts, args):
raise Exception("Command '%s' is not yet implemented"%self.name)

class PromoteCmd(Command):
def __init__(self):
Command.__init__(self, "promote","Promote broker from backup to primary")
def do_execute(self, qmf_broker, ha_broker, opts, args):
qmf_broker._method("promote", {}, HA_BROKER)
PromoteCmd()

class ReadyCmd(Command):
def __init__(self):
Command.__init__(self, "ready", "Test if a backup broker is ready.\nReturn 0 if broker is a ready backup, non-0 otherwise.")
self.op.add_option(
"--wait", type="int", metavar="<seconds>", default=None,
help="Wait up to <seconds> for broker to be ready. 0 means wait forever.")
def do_execute(self, qmf_broker, ha_broker, opts, args):
if (ha_broker.status == "backup"): return
if (ha_broker.status != "catch-up"):
raise Exception("Broker is not a backup, status is '%s'"%ha_broker.status)
if (opts.wait is None): return 1
delay = 0.1
timeout = time.time() + opts.wait
while opts.wait == 0 or time.time() < timeout:
time.sleep(delay)
delay = min(2*delay, 1)
ha_broker = qmf_broker.getHaBroker()
if (ha_broker.status == "backup"): return
return 1
ReadyCmd()

class ReplicateCmd(Command):
def __init__(self):
Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
def do_execute(self, qmf_broker, ha_broker, opts, args):
qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
ReplicateCmd()

class SetCmd(Command):
def __init__(self):
Command.__init__(self, "set", "Set HA configuration settings")
def add(optname, metavar, type, help):
self.op.add_option(optname, metavar=metavar, type=type, help=help, action="store")
add("--brokers", "<url>", "string", "HA brokers use <url> to connect to each other")
add("--public-brokers", "<url>", "string", "Clients use <url> to connect to HA brokers")
add("--backups", "<n>", "int", "Expect <n> backups to be running"),

def do_execute(self, qmf_broker, ha_broker, opts, args):
if (opts.brokers): qmf_broker._method("setBrokers", {"url":opts.brokers}, HA_BROKER)
if (opts.public_brokers): qmf_broker._method("setPublicBrokers", {"url":opts.public_brokers}, HA_BROKER)
if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER)

SetCmd()

class QueryCmd(Command):
def __init__(self):
Command.__init__(self, "query", "Print HA configuration settings")

def do_execute(self, qmf_broker, ha_broker, opts, args):
hb = ha_broker
for x in [("Status:", hb.status),
("Brokers URL:", hb.brokers),
("Public URL:", hb.publicBrokers),
("Expected Backups:", hb.expectedBackups)
]:
print "%-20s %s"%(x[0], x[1])
QueryCmd()

def print_usage(prog):
print "usage: %s <command> [<arguments>]\n\nCommands are:\n"%prog
for name, command in Command.commands.iteritems():
help = command.help
print " %-12s %s."%(name, help.split(".")[0])
print "\nFor help with a command type: %s <command> --help\n"%prog

def find_command(args):
"""Find a command among the arguments and options"""
for arg in args:
if arg in Command.commands:
return Command.commands[arg]
return None

def main(argv):
try:
args=argv[1:]
if args and args[0] == "--help-all":
for c in Command.commands.itervalues():
c.op.print_help(); print
return 1
command = find_command(args)
if not command:
print_usage(os.path.basename(argv[0]));
return 1;
if command.execute(): return 1
except Exception, e:
print e
return 1

if __name__ == "__main__":
sys.exit(main(sys.argv))
10 changes: 8 additions & 2 deletions src/py/qpid-route
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Config:
self._ack = 0
self._connTimeout = 10
self._client_sasl_mechanism = None
self._ha_admin = False

config = Config()

Expand Down Expand Up @@ -96,7 +97,7 @@ def OptionsAndArguments(argv):
parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")

parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")

parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
opts, encArgs = parser.parse_args(args=argv)

try:
Expand Down Expand Up @@ -128,6 +129,9 @@ def OptionsAndArguments(argv):
if opts.transport:
config._transport = opts.transport

if opts.ha_admin:
config._ha_admin = True

if opts.ack:
config._ack = opts.ack

Expand All @@ -143,7 +147,9 @@ class RouteManager:
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism)
client_properties = {}
if config._ha_admin: client_properties["qpid.ha-admin"] = 1
self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism, client_properties=client_properties)
self.broker._waitForStable()
self.agent = self.broker.getBrokerAgent()

Expand Down
Loading

0 comments on commit f6eb39f

Please sign in to comment.