From 6fe4dc194cccbc1df86b6259efe63ef19ab61b93 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Sun, 8 Oct 2017 00:07:21 -0400 Subject: [PATCH] version 0.16.4 released --- py25/bacpypes/__init__.py | 2 +- py25/bacpypes/app.py | 11 + py25/bacpypes/constructeddata.py | 5 + py25/bacpypes/pdu.py | 4 +- py25/bacpypes/primitivedata.py | 14 +- py27/bacpypes/__init__.py | 2 +- py27/bacpypes/app.py | 11 + py27/bacpypes/constructeddata.py | 5 + py27/bacpypes/pdu.py | 4 +- py27/bacpypes/primitivedata.py | 12 +- py34/bacpypes/__init__.py | 2 +- py34/bacpypes/app.py | 11 + py34/bacpypes/constructeddata.py | 5 + samples/MultipleReadPropertyHammer.py | 229 ++++++++++++++++++ samples/RandomAnalogValueSleep.py | 148 +++++++++++ sandbox/add_remove_property.py | 59 +++++ tests/test_pdu/test_address.py | 175 +++++++++++-- .../test_character_string.py | 9 +- tests/test_primitive_data/test_enumerated.py | 42 +++- .../test_object_identifier.py | 10 +- tests/test_primitive_data/test_object_type.py | 22 +- 21 files changed, 741 insertions(+), 41 deletions(-) create mode 100755 samples/MultipleReadPropertyHammer.py create mode 100644 samples/RandomAnalogValueSleep.py create mode 100644 sandbox/add_remove_property.py diff --git a/py25/bacpypes/__init__.py b/py25/bacpypes/__init__.py index c8b909c1..c391a4bf 100755 --- a/py25/bacpypes/__init__.py +++ b/py25/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.16.3' +__version__ = '0.16.4' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py25/bacpypes/app.py b/py25/bacpypes/app.py index 0dc4b464..059da41e 100755 --- a/py25/bacpypes/app.py +++ b/py25/bacpypes/app.py @@ -135,8 +135,12 @@ def get_device_info(self, key): current_info = DeviceInfo() current_info.address = key current_info._cache_keys = (None, key) + current_info._ref_count = 1 self.cache[key] = current_info + else: + if _debug: DeviceInfoCache._debug(" - reference bump") + current_info._ref_count += 1 if _debug: DeviceInfoCache._debug(" - current_info: %r", current_info) @@ -177,11 +181,18 @@ def release_device_info(self, info): has finished with the device information.""" if _debug: DeviceInfoCache._debug("release_device_info %r", info) + # this information record might be used by more than one SSM + if info._ref_count > 1: + if _debug: DeviceInfoCache._debug(" - multiple references") + info._ref_count -= 1 + return + cache_id, cache_address = info._cache_keys if cache_id is not None: del self.cache[cache_id] if cache_address is not None: del self.cache[cache_address] + if _debug: DeviceInfoCache._debug(" - released") bacpypes_debugging(DeviceInfoCache) diff --git a/py25/bacpypes/constructeddata.py b/py25/bacpypes/constructeddata.py index 2e589b8e..1d1cade5 100755 --- a/py25/bacpypes/constructeddata.py +++ b/py25/bacpypes/constructeddata.py @@ -600,6 +600,11 @@ def index(self, value): # not found raise ValueError("%r not in array" % (value,)) + def remove(self, item): + # find the index of the item and delete it + indx = self.index(item) + self.__delitem__(indx) + def encode(self, taglist): if _debug: ArrayOf._debug("(%r)encode %r", self.__class__.__name__, taglist) diff --git a/py25/bacpypes/pdu.py b/py25/bacpypes/pdu.py index 4ed3df52..825309ab 100755 --- a/py25/bacpypes/pdu.py +++ b/py25/bacpypes/pdu.py @@ -91,7 +91,7 @@ def decode_address(self, addr): self.addrAddr = struct.pack('B', addr) self.addrLen = 1 - elif isinstance(addr, str): + elif isinstance(addr, basestring): if _debug: Address._debug(" - str") m = ip_address_mask_port_re.match(addr) @@ -263,7 +263,7 @@ def decode_address(self, addr): addr, port = addr self.addrPort = int(port) - if isinstance(addr, str): + if isinstance(addr, basestring): if not addr: # when ('', n) is passed it is the local host address, but that # could be more than one on a multihomed machine, the empty string diff --git a/py25/bacpypes/primitivedata.py b/py25/bacpypes/primitivedata.py index 1335041c..64070aa9 100755 --- a/py25/bacpypes/primitivedata.py +++ b/py25/bacpypes/primitivedata.py @@ -1073,7 +1073,7 @@ def __init__(self, arg=None): # convert it to a string if you can self.value = self._xlate_table.get(arg, arg) - elif isinstance(arg, str): + elif isinstance(arg, basestring): if arg not in self._xlate_table: raise ValueError("undefined enumeration '%s'" % (arg,)) self.value = arg @@ -1088,7 +1088,7 @@ def __getitem__(self, item): def get_long(self): if isinstance(self.value, (int, long)): return self.value - elif isinstance(self.value, str): + elif isinstance(self.value, basestring): return long(self._xlate_table[self.value]) else: raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) @@ -1132,7 +1132,7 @@ def encode(self, tag): value = long(self.value) elif isinstance(self.value, long): value = self.value - elif isinstance(self.value, str): + elif isinstance(self.value, basestring): value = self._xlate_table[self.value] else: raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) @@ -1170,7 +1170,7 @@ def is_valid(cls, arg): value is wrong for the enumeration, the encoding will fail. """ return (isinstance(arg, (int, long)) and (arg >= 0)) or \ - isinstance(arg, str) + isinstance(arg, basestring) def __str__(self): return "%s(%s)" % (self.__class__.__name__, self.value) @@ -1607,7 +1607,7 @@ def set_tuple(self, objType, objInstance): objType = self.objectTypeClass._xlate_table.get(objType, objType) elif isinstance(objType, long): objType = self.objectTypeClass._xlate_table.get(objType, int(objType)) - elif isinstance(objType, str): + elif isinstance(objType, basestring): # make sure the type is known if objType not in self.objectTypeClass._xlate_table: raise ValueError("unrecognized object type '%s'" % (objType,)) @@ -1629,7 +1629,7 @@ def get_tuple(self): pass elif isinstance(objType, long): objType = int(objType) - elif isinstance(objType, str): + elif isinstance(objType, basestring): # turn it back into an integer objType = self.objectTypeClass()[objType] else: @@ -1680,7 +1680,7 @@ def __str__(self): # rip it apart objType, objInstance = self.value - if isinstance(objType, str): + if isinstance(objType, basestring): typestr = objType elif objType < 0: typestr = "Bad %d" % (objType,) diff --git a/py27/bacpypes/__init__.py b/py27/bacpypes/__init__.py index c8b909c1..c391a4bf 100755 --- a/py27/bacpypes/__init__.py +++ b/py27/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.16.3' +__version__ = '0.16.4' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py27/bacpypes/app.py b/py27/bacpypes/app.py index bd35a64d..3aa33ec6 100755 --- a/py27/bacpypes/app.py +++ b/py27/bacpypes/app.py @@ -135,8 +135,12 @@ def get_device_info(self, key): current_info = DeviceInfo() current_info.address = key current_info._cache_keys = (None, key) + current_info._ref_count = 1 self.cache[key] = current_info + else: + if _debug: DeviceInfoCache._debug(" - reference bump") + current_info._ref_count += 1 if _debug: DeviceInfoCache._debug(" - current_info: %r", current_info) @@ -177,11 +181,18 @@ def release_device_info(self, info): has finished with the device information.""" if _debug: DeviceInfoCache._debug("release_device_info %r", info) + # this information record might be used by more than one SSM + if info._ref_count > 1: + if _debug: DeviceInfoCache._debug(" - multiple references") + info._ref_count -= 1 + return + cache_id, cache_address = info._cache_keys if cache_id is not None: del self.cache[cache_id] if cache_address is not None: del self.cache[cache_address] + if _debug: DeviceInfoCache._debug(" - released") # # Application diff --git a/py27/bacpypes/constructeddata.py b/py27/bacpypes/constructeddata.py index 75818ad3..38e491c8 100755 --- a/py27/bacpypes/constructeddata.py +++ b/py27/bacpypes/constructeddata.py @@ -598,6 +598,11 @@ def index(self, value): # not found raise ValueError("%r not in array" % (value,)) + def remove(self, item): + # find the index of the item and delete it + indx = self.index(item) + self.__delitem__(indx) + def encode(self, taglist): if _debug: ArrayOf._debug("(%r)encode %r", self.__class__.__name__, taglist) diff --git a/py27/bacpypes/pdu.py b/py27/bacpypes/pdu.py index 55053617..eb848bd6 100755 --- a/py27/bacpypes/pdu.py +++ b/py27/bacpypes/pdu.py @@ -93,7 +93,7 @@ def decode_address(self, addr): self.addrAddr = struct.pack('B', addr) self.addrLen = 1 - elif isinstance(addr, str): + elif isinstance(addr, basestring): if _debug: Address._debug(" - str") m = ip_address_mask_port_re.match(addr) @@ -259,7 +259,7 @@ def decode_address(self, addr): addr, port = addr self.addrPort = int(port) - if isinstance(addr, str): + if isinstance(addr, basestring): if not addr: # when ('', n) is passed it is the local host address, but that # could be more than one on a multihomed machine, the empty string diff --git a/py27/bacpypes/primitivedata.py b/py27/bacpypes/primitivedata.py index 1047f43e..91c10d7d 100755 --- a/py27/bacpypes/primitivedata.py +++ b/py27/bacpypes/primitivedata.py @@ -1078,7 +1078,7 @@ def __init__(self, arg=None): # convert it to a string if you can self.value = self._xlate_table.get(arg, arg) - elif isinstance(arg, str): + elif isinstance(arg, basestring): if arg not in self._xlate_table: raise ValueError("undefined enumeration '%s'" % (arg,)) self.value = arg @@ -1093,7 +1093,7 @@ def __getitem__(self, item): def get_long(self): if isinstance(self.value, (int, long)): return self.value - elif isinstance(self.value, str): + elif isinstance(self.value, basestring): return long(self._xlate_table[self.value]) else: raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) @@ -1137,7 +1137,7 @@ def encode(self, tag): value = long(self.value) elif isinstance(self.value, long): value = self.value - elif isinstance(self.value, str): + elif isinstance(self.value, basestring): value = self._xlate_table[self.value] else: raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) @@ -1613,7 +1613,7 @@ def set_tuple(self, objType, objInstance): objType = self.objectTypeClass._xlate_table.get(objType, objType) elif isinstance(objType, long): objType = self.objectTypeClass._xlate_table.get(objType, int(objType)) - elif isinstance(objType, str): + elif isinstance(objType, basestring): # make sure the type is known if objType not in self.objectTypeClass._xlate_table: raise ValueError("unrecognized object type '%s'" % (objType,)) @@ -1635,7 +1635,7 @@ def get_tuple(self): pass elif isinstance(objType, long): objType = int(objType) - elif isinstance(objType, str): + elif isinstance(objType, basestring): # turn it back into an integer objType = self.objectTypeClass()[objType] else: @@ -1686,7 +1686,7 @@ def __str__(self): # rip it apart objType, objInstance = self.value - if isinstance(objType, str): + if isinstance(objType, basestring): typestr = objType elif objType < 0: typestr = "Bad %d" % (objType,) diff --git a/py34/bacpypes/__init__.py b/py34/bacpypes/__init__.py index ae4133b8..6692b8eb 100755 --- a/py34/bacpypes/__init__.py +++ b/py34/bacpypes/__init__.py @@ -18,7 +18,7 @@ # Project Metadata # -__version__ = '0.16.3' +__version__ = '0.16.4' __author__ = 'Joel Bender' __email__ = 'joel@carrickbender.com' diff --git a/py34/bacpypes/app.py b/py34/bacpypes/app.py index bd35a64d..3aa33ec6 100755 --- a/py34/bacpypes/app.py +++ b/py34/bacpypes/app.py @@ -135,8 +135,12 @@ def get_device_info(self, key): current_info = DeviceInfo() current_info.address = key current_info._cache_keys = (None, key) + current_info._ref_count = 1 self.cache[key] = current_info + else: + if _debug: DeviceInfoCache._debug(" - reference bump") + current_info._ref_count += 1 if _debug: DeviceInfoCache._debug(" - current_info: %r", current_info) @@ -177,11 +181,18 @@ def release_device_info(self, info): has finished with the device information.""" if _debug: DeviceInfoCache._debug("release_device_info %r", info) + # this information record might be used by more than one SSM + if info._ref_count > 1: + if _debug: DeviceInfoCache._debug(" - multiple references") + info._ref_count -= 1 + return + cache_id, cache_address = info._cache_keys if cache_id is not None: del self.cache[cache_id] if cache_address is not None: del self.cache[cache_address] + if _debug: DeviceInfoCache._debug(" - released") # # Application diff --git a/py34/bacpypes/constructeddata.py b/py34/bacpypes/constructeddata.py index 45dc2285..54b9f5a9 100755 --- a/py34/bacpypes/constructeddata.py +++ b/py34/bacpypes/constructeddata.py @@ -598,6 +598,11 @@ def index(self, value): # not found raise ValueError("%r not in array" % (value,)) + def remove(self, item): + # find the index of the item and delete it + indx = self.index(item) + self.__delitem__(indx) + def encode(self, taglist): if _debug: ArrayOf._debug("(%r)encode %r", self.__class__.__name__, taglist) diff --git a/samples/MultipleReadPropertyHammer.py b/samples/MultipleReadPropertyHammer.py new file mode 100755 index 00000000..7bfa4e8b --- /dev/null +++ b/samples/MultipleReadPropertyHammer.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python + +""" +Mutliple Read Property Hammer + +This application blasts a list of ReadPropertyRequest messages with no +regard to the number of simultaneous requests to the same device. The +ReadPointListApplication is constructed like the BIPSimpleApplication but +without the ApplicationIOController interface and sieve. +""" + +import os +from time import time as _time +from copy import copy as _copy + +from random import shuffle + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run +from bacpypes.comm import bind +from bacpypes.task import RecurringTask + +from bacpypes.pdu import Address + +from bacpypes.app import Application +from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint +from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement +from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer + +from bacpypes.apdu import ReadPropertyRequest + +from bacpypes.service.device import LocalDeviceObject, WhoIsIAmServices +from bacpypes.service.object import ReadWritePropertyServices + + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +args = None +this_application = None + +# settings +INTERVAL = float(os.getenv('INTERVAL', 10.0)) + +# point list, set according to your device +point_list = [ + ('10.0.1.21:47809', 'analogValue', 1, 'presentValue'), + ('10.0.1.21:47809', 'analogValue', 2, 'presentValue'), + ('10.0.1.21:47809', 'analogValue', 3, 'presentValue'), + ('10.0.1.21:47809', 'analogValue', 4, 'presentValue'), + ('10.0.1.21:47809', 'analogValue', 5, 'presentValue'), + ] + +# +# ReadPointListApplication +# + +@bacpypes_debugging +class ReadPointListApplication(Application, WhoIsIAmServices, ReadWritePropertyServices, RecurringTask): + + def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None): + if _debug: ReadPointListApplication._debug("__init__ %r %r deviceInfoCache=%r aseID=%r", localDevice, localAddress, deviceInfoCache, aseID) + global args + + Application.__init__(self, localDevice, deviceInfoCache, aseID=aseID) + RecurringTask.__init__(self, args.interval * 1000) + + # local address might be useful for subclasses + if isinstance(localAddress, Address): + self.localAddress = localAddress + else: + self.localAddress = Address(localAddress) + + # include a application decoder + self.asap = ApplicationServiceAccessPoint() + + # pass the device object to the state machine access point so it + # can know if it should support segmentation + self.smap = StateMachineAccessPoint(localDevice) + + # the segmentation state machines need access to the same device + # information cache as the application + self.smap.deviceInfoCache = self.deviceInfoCache + + # a network service access point will be needed + self.nsap = NetworkServiceAccessPoint() + + # give the NSAP a generic network layer service element + self.nse = NetworkServiceElement() + bind(self.nse, self.nsap) + + # bind the top layers + bind(self, self.asap, self.smap, self.nsap) + + # create a generic BIP stack, bound to the Annex J server + # on the UDP multiplexer + self.bip = BIPSimple() + self.annexj = AnnexJCodec() + self.mux = UDPMultiplexer(self.localAddress) + + # bind the bottom layers + bind(self.bip, self.annexj, self.mux.annexJ) + + # bind the BIP stack to the network, no network number + self.nsap.bind(self.bip) + + # install the task + self.install_task() + + # timer + self.start_time = None + + # pending requests + self.pending_requests = {} + + def process_task(self): + if _debug: ReadPointListApplication._debug("process_task") + global point_list + + # we might not have finished from the last round + if self.pending_requests: + if _debug: ReadPointListApplication._debug(" - %d pending", len(self.pending_requests)) + return + + # start the clock + self.start_time = _time() + + # make a copy of the point list and shuffle it + point_list_copy = _copy(point_list) + shuffle(point_list_copy) + + # loop through the points + for addr, obj_type, obj_inst, prop_id in point_list_copy: + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + if _debug: ReadPointListApplication._debug(" - request: %r", request) + + # send the request + self.request(request) + + # get the destination address from the pdu + request_key = request.pduDestination, request.apduInvokeID + if _debug: ReadPointListApplication._debug(" - request_key: %r", request_key) + + # make sure it's unused + if request_key in self.pending_requests: + raise RuntimeError("request key already used: %r" % (request_key,)) + + # add this to pending requests + self.pending_requests[request_key] = request + + def confirmation(self, apdu): + if _debug: ReadPointListApplication._debug("confirmation %r", apdu) + + # get the source address from the pdu + request_key = apdu.pduSource, apdu.apduInvokeID + if _debug: ReadPointListApplication._debug(" - request_key: %r", request_key) + + # make sure it's unused + if request_key not in self.pending_requests: + raise RuntimeError("request missing: %r" % (request_key,)) + + # this is no longer pending + del self.pending_requests[request_key] + + # we could be done with this interval + if not self.pending_requests: + elapsed_time = _time() - self.start_time + if _debug: ReadPointListApplication._debug(" - completed interval, %r seconds", elapsed_time) + + +# +# __main__ +# + +def main(): + global args, this_application + + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an option to override the interval time + parser.add_argument('--interval', type=float, + help="amount of time between intervals", + default=INTERVAL, + ) + + # parse the command line arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a simple application + this_application = ReadPointListApplication(this_device, args.ini.address) + + # get the services supported + services_supported = this_application.get_services_supported() + if _debug: _log.debug(" - services_supported: %r", services_supported) + + # let the device object know + this_device.protocolServicesSupported = services_supported.value + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/samples/RandomAnalogValueSleep.py b/samples/RandomAnalogValueSleep.py new file mode 100644 index 00000000..ea71cdbc --- /dev/null +++ b/samples/RandomAnalogValueSleep.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python + +""" +Random Value Property with Sleep + +This application is a server of analog value objects that return a random +number when the present value is read. This version has an additional +'sleep' time that slows down its performance. +""" + +import os +import random +import time + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Real +from bacpypes.object import AnalogValueObject, Property, register_object_type +from bacpypes.errors import ExecutionError + +from bacpypes.app import BIPSimpleApplication +from bacpypes.service.device import LocalDeviceObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# settings +SLEEP_TIME = float(os.getenv('SLEEP_TIME', 0.1)) +RANDOM_OBJECT_COUNT = int(os.getenv('RANDOM_OBJECT_COUNT', 10)) + +# globals +args = None + +# +# RandomValueProperty +# + +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=0.0, optional=True, mutable=False) + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + global args + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # sleep a little + time.sleep(args.sleep) + + # return a random value + value = random.random() * 100.0 + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + +bacpypes_debugging(RandomValueProperty) + +# +# Random Value Object Type +# + +class RandomAnalogValueObject(AnalogValueObject): + + properties = [ + RandomValueProperty('presentValue'), + ] + + def __init__(self, **kwargs): + if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +bacpypes_debugging(RandomAnalogValueObject) +register_object_type(RandomAnalogValueObject) + +# +# __main__ +# + +def main(): + global args + + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an option to override the sleep time + parser.add_argument('--sleep', type=float, + help="sleep before returning the value", + default=SLEEP_TIME, + ) + + # parse the command line arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=('device', int(args.ini.objectidentifier)), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # get the services supported + services_supported = this_application.get_services_supported() + if _debug: _log.debug(" - services_supported: %r", services_supported) + + # let the device object know + this_device.protocolServicesSupported = services_supported.value + + # make some random input objects + for i in range(1, RANDOM_OBJECT_COUNT+1): + ravo = RandomAnalogValueObject( + objectIdentifier=('analogValue', i), + objectName='Random-%d' % (i,), + ) + _log.debug(" - ravo: %r", ravo) + this_application.add_object(ravo) + + # make sure they are all there + _log.debug(" - object list: %r", this_device.objectList) + + _log.debug("running") + + run() + + _log.debug("fini") + +if __name__ == "__main__": + main() diff --git a/sandbox/add_remove_property.py b/sandbox/add_remove_property.py new file mode 100644 index 00000000..e5a90be7 --- /dev/null +++ b/sandbox/add_remove_property.py @@ -0,0 +1,59 @@ + +from bacpypes.basetypes import PropertyIdentifier +from bacpypes.constructeddata import ArrayOf +from bacpypes.object import AnalogValueObject + +# create an array of property identifiers datatype +ArrayOfPropertyIdentifier = ArrayOf(PropertyIdentifier) + +aopi = ArrayOfPropertyIdentifier() +aopi.append('objectName') +aopi.append('objectType') +aopi.append('description') +aopi.debug_contents() + +aopi.remove('objectType') +aopi.debug_contents() + +print("Create an Analog Value Object") +av = AnalogValueObject( + objectName='av-sample', + objectIdentifier=('analogValue', 1), + description="sample", + ) +av.debug_contents() +print("") + +print("Change the description") +av.description = "something else" +av.debug_contents() +print("") + + +# get the description property by the attribute name +description_property = av._attr_to_property('description') +print("description_property = %r" % (description_property,)) +print("") + +print("Delete the property") +av.delete_property(description_property) +print("...property deleted") + +try: + av.description = "this raises an exception" +except Exception as err: + print(repr(err)) +av.debug_contents() +print("") + +print("===== Add the property") +av.add_property(description_property) +print("...property added") + +try: + av.description = "this works" +except Exception as err: + print(repr(err)) +av.debug_contents() +print("") + diff --git a/tests/test_pdu/test_address.py b/tests/test_pdu/test_address.py index af382970..682a24e5 100644 --- a/tests/test_pdu/test_address.py +++ b/tests/test_pdu/test_address.py @@ -74,8 +74,8 @@ def test_address_int(self): with self.assertRaises(ValueError): Address(256) - def test_address_ipv4(self): - if _debug: TestAddress._debug("test_address_ipv4") + def test_address_ipv4_str(self): + if _debug: TestAddress._debug("test_address_ipv4_str") # test IPv4 local station address test_addr = Address("1.2.3.4") @@ -92,16 +92,42 @@ def test_address_ipv4(self): self.match_address(test_addr, 2, None, 6, '01020304bb7f') assert str(test_addr) == "0x01020304bb7f" - def test_address_eth(self): - if _debug: TestAddress._debug("test_address_eth") + def test_address_ipv4_unicode(self): + if _debug: TestAddress._debug("test_address_ipv4_unicode") + + # test IPv4 local station address + test_addr = Address(u"1.2.3.4") + self.match_address(test_addr, 2, None, 6, '01020304BAC0') + assert str(test_addr) == u"1.2.3.4" + + # test IPv4 local station address with non-standard port + test_addr = Address(u"1.2.3.4:47809") + self.match_address(test_addr, 2, None, 6, '01020304BAC1') + assert str(test_addr) == u"1.2.3.4:47809" + + # test IPv4 local station address with unrecognized port + test_addr = Address(u"1.2.3.4:47999") + self.match_address(test_addr, 2, None, 6, '01020304bb7f') + assert str(test_addr) == u"0x01020304bb7f" + + def test_address_eth_str(self): + if _debug: TestAddress._debug("test_address_eth_str") # test Ethernet local station address test_addr = Address("01:02:03:04:05:06") self.match_address(test_addr, 2, None, 6, '010203040506') assert str(test_addr) == "0x010203040506" - def test_address_local_station(self): - if _debug: TestAddress._debug("test_address_local_station") + def test_address_eth_unicode(self): + if _debug: TestAddress._debug("test_address_eth_unicode") + + # test Ethernet local station address + test_addr = Address(u"01:02:03:04:05:06") + self.match_address(test_addr, 2, None, 6, '010203040506') + assert str(test_addr) == u"0x010203040506" + + def test_address_local_station_str(self): + if _debug: TestAddress._debug("test_address_local_station_str") # test integer local station test_addr = Address("1") @@ -134,16 +160,58 @@ def test_address_local_station(self): self.match_address(test_addr, 2, None, 2, '0102') assert str(test_addr) == "0x0102" - def test_address_local_broadcast(self): - if _debug: TestAddress._debug("test_address_local_broadcast") + def test_address_local_station_unicode(self): + if _debug: TestAddress._debug("test_address_local_station_unicode") + + # test integer local station + test_addr = Address(u"1") + self.match_address(test_addr, 2, None, 1, '01') + assert str(test_addr) == u"1" + + test_addr = Address(u"254") + self.match_address(test_addr, 2, None, 1, 'fe') + assert str(test_addr) == u"254" + + # test bad integer string + with self.assertRaises(ValueError): + Address("256") + + # test modern hex string + test_addr = Address(u"0x01") + self.match_address(test_addr, 2, None, 1, '01') + assert str(test_addr) == u"1" + + test_addr = Address(u"0x0102") + self.match_address(test_addr, 2, None, 2, '0102') + assert str(test_addr) == u"0x0102" + + # test old school hex string + test_addr = Address(u"X'01'") + self.match_address(test_addr, 2, None, 1, '01') + assert str(test_addr) == u"1" + + test_addr = Address(u"X'0102'") + self.match_address(test_addr, 2, None, 2, '0102') + assert str(test_addr) == u"0x0102" + + def test_address_local_broadcast_str(self): + if _debug: TestAddress._debug("test_address_local_broadcast_str") # test local broadcast test_addr = Address("*") self.match_address(test_addr, 1, None, None, None) assert str(test_addr) == "*" - def test_address_remote_broadcast(self): - if _debug: TestAddress._debug("test_address_remote_broadcast") + def test_address_local_broadcast_unicode(self): + if _debug: TestAddress._debug("test_address_local_broadcast_unicode") + + # test local broadcast + test_addr = Address(u"*") + self.match_address(test_addr, 1, None, None, None) + assert str(test_addr) == u"*" + + def test_address_remote_broadcast_str(self): + if _debug: TestAddress._debug("test_address_remote_broadcast_str") # test remote broadcast test_addr = Address("1:*") @@ -154,8 +222,20 @@ def test_address_remote_broadcast(self): with self.assertRaises(ValueError): Address("65536:*") - def test_address_remote_station(self): - if _debug: TestAddress._debug("test_address_remote_station") + def test_address_remote_broadcast_unicode(self): + if _debug: TestAddress._debug("test_address_remote_broadcast_unicode") + + # test remote broadcast + test_addr = Address(u"1:*") + self.match_address(test_addr, 3, 1, None, None) + assert str(test_addr) == u"1:*" + + # test remote broadcast bad network + with self.assertRaises(ValueError): + Address("65536:*") + + def test_address_remote_station_str(self): + if _debug: TestAddress._debug("test_address_remote_station_str") # test integer remote station test_addr = Address("1:2") @@ -198,14 +278,66 @@ def test_address_remote_station(self): with self.assertRaises(ValueError): Address("65536:X'02'") - def test_address_global_broadcast(self): - if _debug: TestAddress._debug("test_address_global_broadcast") + def test_address_remote_station_unicode(self): + if _debug: TestAddress._debug("test_address_remote_station_unicode") + + # test integer remote station + test_addr = Address(u"1:2") + self.match_address(test_addr, 4, 1, 1, '02') + assert str(test_addr) == u"1:2" + + test_addr = Address(u"1:254") + self.match_address(test_addr, 4, 1, 1, 'fe') + assert str(test_addr) == u"1:254" + + # test bad network and node + with self.assertRaises(ValueError): + Address(u"65536:2") + with self.assertRaises(ValueError): + Address(u"1:256") + + # test modern hex string + test_addr = Address(u"1:0x02") + self.match_address(test_addr, 4, 1, 1, '02') + assert str(test_addr) == u"1:2" + + # test bad network + with self.assertRaises(ValueError): + Address(u"65536:0x02") + + test_addr = Address(u"1:0x0203") + self.match_address(test_addr, 4, 1, 2, '0203') + assert str(test_addr) == u"1:0x0203" + + # test old school hex string + test_addr = Address(u"1:X'02'") + self.match_address(test_addr, 4, 1, 1, '02') + assert str(test_addr) == u"1:2" + + test_addr = Address(u"1:X'0203'") + self.match_address(test_addr, 4, 1, 2, '0203') + assert str(test_addr) == u"1:0x0203" + + # test bad network + with self.assertRaises(ValueError): + Address(u"65536:X'02'") + + def test_address_global_broadcast_str(self): + if _debug: TestAddress._debug("test_address_global_broadcast_str") # test local broadcast test_addr = Address("*:*") self.match_address(test_addr, 5, None, None, None) assert str(test_addr) == "*:*" + def test_address_global_broadcast_unicode(self): + if _debug: TestAddress._debug("test_address_global_broadcast_unicode") + + # test local broadcast + test_addr = Address(u"*:*") + self.match_address(test_addr, 5, None, None, None) + assert str(test_addr) == u"*:*" + @bacpypes_debugging class TestLocalStation(unittest.TestCase, MatchAddressMixin): @@ -368,8 +500,8 @@ def test_global_broadcast(self): @bacpypes_debugging class TestAddressEquality(unittest.TestCase, MatchAddressMixin): - def test_address_equality(self): - if _debug: TestAddressEquality._debug("test_address_equality") + def test_address_equality_str(self): + if _debug: TestAddressEquality._debug("test_address_equality_str") assert Address(1) == LocalStation(1) assert Address("2") == LocalStation(2) @@ -377,3 +509,14 @@ def test_address_equality(self): assert Address("3:4") == RemoteStation(3, 4) assert Address("5:*") == RemoteBroadcast(5) assert Address("*:*") == GlobalBroadcast() + + def test_address_equality_unicode(self): + if _debug: TestAddressEquality._debug("test_address_equality_unicode") + + assert Address(1) == LocalStation(1) + assert Address(u"2") == LocalStation(2) + assert Address(u"*") == LocalBroadcast() + assert Address(u"3:4") == RemoteStation(3, 4) + assert Address(u"5:*") == RemoteBroadcast(5) + assert Address(u"*:*") == GlobalBroadcast() + diff --git a/tests/test_primitive_data/test_character_string.py b/tests/test_primitive_data/test_character_string.py index 15f66ec8..10430561 100644 --- a/tests/test_primitive_data/test_character_string.py +++ b/tests/test_primitive_data/test_character_string.py @@ -93,6 +93,13 @@ def test_character_string_str(self): assert obj.value == "hello" assert str(obj) == "CharacterString(0,X'68656c6c6f')" + def test_character_string_unicode(self): + if _debug: TestCharacterString._debug("test_character_string_unicode") + + obj = CharacterString(u"hello") + assert obj.value == u"hello" + assert str(obj) == "CharacterString(0,X'68656c6c6f')" + def test_character_string_tag(self): if _debug: TestCharacterString._debug("test_character_string_tag") @@ -126,4 +133,4 @@ def test_character_string_endec(self): obj = CharacterString(character_string_tag('')) character_string_endec("", '00') - character_string_endec("abc", '00616263') \ No newline at end of file + character_string_endec("abc", '00616263') diff --git a/tests/test_primitive_data/test_enumerated.py b/tests/test_primitive_data/test_enumerated.py index 67d823c6..31fe4c1c 100644 --- a/tests/test_primitive_data/test_enumerated.py +++ b/tests/test_primitive_data/test_enumerated.py @@ -18,6 +18,14 @@ _log = ModuleLogger(globals()) +class QuickBrownFox(Enumerated): + enumerations = { + 'quick': 0, + 'brown': 1, + 'fox': 2, + } + + @bacpypes_debugging def enumerated_tag(x): """Convert a hex string to an enumerated application tag.""" @@ -93,6 +101,38 @@ def test_enumerated_int(self): with self.assertRaises(ValueError): Enumerated(-1) + def test_enumerated_str(self): + if _debug: TestEnumerated._debug("test_enumerated_str") + + obj = QuickBrownFox('quick') + assert obj.value == 'quick' + assert str(obj) == "QuickBrownFox(quick)" + + with self.assertRaises(ValueError): + QuickBrownFox(-1) + with self.assertRaises(ValueError): + QuickBrownFox('lazyDog') + + tag = Tag(Tag.applicationTagClass, Tag.enumeratedAppTag, 1, xtob('01')) + obj = QuickBrownFox(tag) + assert obj.value == 'brown' + + def test_enumerated_unicode(self): + if _debug: TestEnumerated._debug("test_enumerated_unicode") + + obj = QuickBrownFox(u'quick') + assert obj.value == u'quick' + assert str(obj) == "QuickBrownFox(quick)" + + with self.assertRaises(ValueError): + QuickBrownFox(-1) + with self.assertRaises(ValueError): + QuickBrownFox(u'lazyDog') + + tag = Tag(Tag.applicationTagClass, Tag.enumeratedAppTag, 1, xtob('01')) + obj = QuickBrownFox(tag) + assert obj.value == u'brown' + def test_enumerated_tag(self): if _debug: TestEnumerated._debug("test_enumerated_tag") @@ -138,4 +178,4 @@ def test_enumerated_endec(self): enumerated_endec(8388608, '800000') enumerated_endec(2147483647, '7fffffff') - enumerated_endec(2147483648, '80000000') \ No newline at end of file + enumerated_endec(2147483648, '80000000') diff --git a/tests/test_primitive_data/test_object_identifier.py b/tests/test_primitive_data/test_object_identifier.py index d14f161c..c97a1444 100644 --- a/tests/test_primitive_data/test_object_identifier.py +++ b/tests/test_primitive_data/test_object_identifier.py @@ -97,6 +97,12 @@ def test_object_identifier_int(self): def test_object_identifier_tuple(self): if _debug: TestObjectIdentifier._debug("test_object_identifier_tuple") + obj = ObjectIdentifier(('analogInput', 0)) + assert obj.value == ('analogInput', 0) + + obj = ObjectIdentifier((u'analogInput', 0)) + assert obj.value == (u'analogInput', 0) + with self.assertRaises(ValueError): ObjectIdentifier((0, -1)) with self.assertRaises(ValueError): @@ -136,5 +142,7 @@ def test_object_identifier_endec(self): # test standard types object_identifier_endec(('analogInput', 0), '00000000') + object_identifier_endec((u'analogInput', 0), '00000000') + + # test vendor types - # test vendor types \ No newline at end of file diff --git a/tests/test_primitive_data/test_object_type.py b/tests/test_primitive_data/test_object_type.py index 62b8eb0d..dc6e4707 100644 --- a/tests/test_primitive_data/test_object_type.py +++ b/tests/test_primitive_data/test_object_type.py @@ -23,7 +23,7 @@ class MyObjectType(ObjectType): 'myAnalogInput': 128, 'myAnalogOutput': 129, 'myAnalogValue': 130, - } + } expand_enumerations(MyObjectType) @@ -113,6 +113,13 @@ def test_object_type_str(self): obj = ObjectType('analogInput') assert obj.value == 'analogInput' + def test_object_type_unicode(self): + if _debug: TestObjectType._debug("test_object_type_unicode") + + # known strings are accepted + obj = ObjectType(u'analogInput') + assert obj.value == u'analogInput' + def test_extended_object_type_int(self): if _debug: TestObjectType._debug("test_extended_object_type_int") @@ -137,6 +144,17 @@ def test_extended_object_type_str(self): with self.assertRaises(ValueError): MyObjectType('snork') + def test_extended_object_type_unicode(self): + if _debug: TestObjectType._debug("test_extended_object_type_unicode") + + # known strings are accepted + obj = MyObjectType(u'myAnalogInput') + assert obj.value == u'myAnalogInput' + + # unknown strings are rejected + with self.assertRaises(ValueError): + MyObjectType(u'snork') + def test_object_type_tag(self): if _debug: TestObjectType._debug("test_object_type_tag") @@ -174,4 +192,4 @@ def test_object_type_endec(self): object_type_endec('analogOutput', '01') object_type_endec(127, '7f') - object_type_endec(128, '80') \ No newline at end of file + object_type_endec(128, '80')