Skip to content

Commit

Permalink
additional sample application from samples branch
Browse files Browse the repository at this point in the history
  • Loading branch information
JoelBender committed Feb 10, 2016
2 parents e2b4d90 + 749b934 commit 9584df6
Showing 1 changed file with 220 additions and 0 deletions.
220 changes: 220 additions & 0 deletions samples/WhoIsIAmForeign.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
#!/usr/bin/python

"""
This application presents a 'console' prompt to the user asking for Who-Is and I-Am
commands which create the related APDUs, then lines up the coorresponding I-Am
for incoming traffic and prints out the contents.
"""

import sys

from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd

from bacpypes.core import run

from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.app import LocalDeviceObject, BIPForeignApplication

from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.basetypes import ServicesSupported
from bacpypes.errors import DecodingError

# some debugging
_debug = 0
_log = ModuleLogger(globals())

# globals
this_device = None
this_application = None
this_console = None

#
# WhoIsIAmApplication
#

class WhoIsIAmApplication(BIPForeignApplication):

def __init__(self, *args):
if _debug: WhoIsIAmApplication._debug("__init__ %r", args)
BIPForeignApplication.__init__(self, *args)

# keep track of requests to line up responses
self._request = None

def request(self, apdu):
if _debug: WhoIsIAmApplication._debug("request %r", apdu)

# save a copy of the request
self._request = apdu

# forward it along
BIPForeignApplication.request(self, apdu)

def confirmation(self, apdu):
if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu)

# forward it along
BIPForeignApplication.confirmation(self, apdu)

def indication(self, apdu):
if _debug: WhoIsIAmApplication._debug("indication %r", apdu)

if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)):
device_type, device_instance = apdu.iAmDeviceIdentifier
if device_type != 'device':
raise DecodingError("invalid object type")

if (self._request.deviceInstanceRangeLowLimit is not None) and \
(device_instance < self._request.deviceInstanceRangeLowLimit):
pass
elif (self._request.deviceInstanceRangeHighLimit is not None) and \
(device_instance > self._request.deviceInstanceRangeHighLimit):
pass
else:
# print out the contents
sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n')
sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n')
sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n')
sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n')
sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n')
sys.stdout.flush()

# forward it along
BIPForeignApplication.indication(self, apdu)

bacpypes_debugging(WhoIsIAmApplication)

#
# WhoIsIAmConsoleCmd
#

class WhoIsIAmConsoleCmd(ConsoleCmd):

def do_whois(self, args):
"""whois [ <addr>] [ <lolimit> <hilimit> ]"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args)

try:
# build a request
request = WhoIsRequest()
if (len(args) == 1) or (len(args) == 3):
request.pduDestination = Address(args[0])
del args[0]
else:
request.pduDestination = GlobalBroadcast()

if len(args) == 2:
request.deviceInstanceRangeLowLimit = int(args[0])
request.deviceInstanceRangeHighLimit = int(args[1])
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)

# give it to the application
this_application.request(request)

except Exception as err:
WhoIsIAmConsoleCmd._exception("exception: %r", err)

def do_iam(self, args):
"""iam"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args)

try:
# build a request
request = IAmRequest()
request.pduDestination = GlobalBroadcast()

# set the parameters from the device object
request.iAmDeviceIdentifier = this_device.objectIdentifier
request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted
request.segmentationSupported = this_device.segmentationSupported
request.vendorID = this_device.vendorIdentifier
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)

# give it to the application
this_application.request(request)

except Exception as err:
WhoIsIAmConsoleCmd._exception("exception: %r", err)

def do_rtn(self, args):
"""rtn <addr> <net> ... """
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args)

# safe to assume only one adapter
adapter = this_application.nsap.adapters[0]
if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter)

# provide the address and a list of network numbers
router_address = Address(args[0])
network_list = [int(arg) for arg in args[1:]]

# pass along to the service access point
this_application.nsap.add_router_references(adapter, router_address, network_list)

bacpypes_debugging(WhoIsIAmConsoleCmd)

#
# main
#

def main():
global this_device, this_application, this_console

# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()

if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)

# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
if _debug: _log.debug(" - this_device: %r", this_device)

# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1

# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value

# make a simple application
this_application = WhoIsIAmApplication(
this_device, args.ini.address,
Address(args.ini.foreignbbmd),
int(args.ini.foreignttl),
)
if _debug: _log.debug(" - this_application: %r", this_application)

# get the services supported
services_supported = this_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)

# let the device object know
this_device.protocolServicesSupported = services_supported.value

# make a console
this_console = WhoIsIAmConsoleCmd()
if _debug: _log.debug(" - this_console: %r", this_console)

_log.debug("running")

run()

_log.debug("finally")

if __name__ == "__main__":
main()

0 comments on commit 9584df6

Please sign in to comment.