#!/usr/bin/env python
# type: ignore
"""
Rebuilt Commandable
"""
from bacpypes.basetypes import (
BinaryPV,
ChannelValue,
DateTime,
DoorValue,
PriorityArray,
PriorityValue,
)
from bacpypes.debugging import ModuleLogger, bacpypes_debugging
from bacpypes.errors import ExecutionError
from bacpypes.local.object import CurrentPropertyListMixIn
from bacpypes.object import (
AccessDoorObject,
AnalogOutputObject,
AnalogValueObject,
BinaryOutputObject,
BinaryValueObject,
BitStringValueObject,
ChannelObject,
CharacterStringValueObject,
DatePatternValueObject,
DateTimePatternValueObject,
DateTimeValueObject,
DateValueObject,
IntegerValueObject,
LargeAnalogValueObject,
LightingOutputObject,
MultiStateOutputObject,
MultiStateValueObject,
OctetStringValueObject,
PositiveIntegerValueObject,
Property,
ReadableProperty,
TimePatternValueObject,
TimeValueObject,
WritableProperty,
register_object_type,
)
from bacpypes.primitivedata import (
BitString,
CharacterString,
Date,
Double,
Enumerated,
Integer,
OctetString,
Real,
Time,
Unsigned,
)
from bacpypes.task import OneShotTask
# some debugging
# module-wide switch read by every `if _debug:` guard in this file; 0 keeps
# the debug calls from running
_debug = 0
# module-level logger object built by ModuleLogger from this module's globals
_log = ModuleLogger(globals())
#
# Commandable
#
@bacpypes_debugging
def Commandable(
    datatype,
    presentValue="presentValue",
    priorityArray="priorityArray",
    relinquishDefault="relinquishDefault",
):
    """Build a mix-in class that makes a present value commandable.

    The generated class declares three properties: a writable present value
    and a readable relinquish default, both of ``datatype``, plus a readable
    ``PriorityArray``.  Its ``WriteProperty`` redirects present value writes
    into the priority array and then recomputes the present value.

    :param datatype: class of the present value and the relinquish default
    :param presentValue: identifier of the present value property
    :param priorityArray: identifier of the priority array property
    :param relinquishDefault: identifier of the relinquish default property
    :returns: the generated ``_Commando`` mix-in class
    """
    if _debug:
        Commandable._debug("Commandable %r ...", datatype)

    class _Commando(object):
        """Mix-in routing present value writes through the priority array."""

        properties = [
            WritableProperty(presentValue, datatype),
            ReadableProperty(priorityArray, PriorityArray),
            ReadableProperty(relinquishDefault, datatype),
        ]

        # name of the PriorityValue choice matching datatype; assigned once
        # the class body has been built (see the end of the factory)
        _pv_choice = None

        def __init__(self, **kwargs):
            """Initialize the object and fill in any missing property."""
            super(_Commando, self).__init__(**kwargs)

            # build a fallback value in case one is needed; enumerations are
            # translated through the datatype's _xlate_table
            fallback = datatype().value
            if issubclass(datatype, Enumerated):
                fallback = datatype._xlate_table[fallback]
            if _debug:
                Commandable._debug(" - default_value: %r", fallback)

            # was a present value provided?
            if presentValue not in kwargs:
                setattr(self, presentValue, fallback)

            # was a priority array provided?
            if priorityArray not in kwargs:
                setattr(self, priorityArray, PriorityArray())

            # was a relinquish default provided?
            if relinquishDefault not in kwargs:
                setattr(self, relinquishDefault, fallback)

        def _highest_priority_value(self):
            """Return ``(value, value_source)`` for the winning entry.

            Slots 1 through 16 are scanned in order and the first one whose
            ``null`` attribute is ``None`` wins; its source is reported as
            the placeholder ``"###"``.  When every slot is null the
            relinquish default is returned with a source of ``None``.
            """
            if _debug:
                Commandable._debug("_highest_priority_value")

            slots = getattr(self, priorityArray)
            for index in range(1, 17):
                slot = slots[index]

                # a slot holding a null carries no command, keep looking
                if slot.null is not None:
                    continue
                if _debug:
                    Commandable._debug(" - found at index: %r", index)

                value = getattr(slot, _Commando._pv_choice)
                value_source = "###"

                if issubclass(datatype, Enumerated):
                    value = datatype._xlate_table[value]
                    if _debug:
                        Commandable._debug(" - remapped enumeration: %r", value)
                break
            else:
                # nothing commanded, fall back to the relinquish default
                value = getattr(self, relinquishDefault)
                value_source = None

            if _debug:
                Commandable._debug(
                    " - value, value_source: %r, %r", value, value_source
                )

            # hand back what was found
            return value, value_source

        def WriteProperty(
            self, property, value, arrayIndex=None, priority=None, direct=False
        ):
            """Write a property, routing commands through the priority array.

            A write to the present value becomes a write to the priority
            array slot selected by ``priority`` (16 when omitted).  After a
            priority array write the highest priority value is recomputed
            and, only if it differs from the current present value, passed
            on as a present value write.  Any other property is passed on
            unchanged.

            :raises ExecutionError: ``writeAccessDenied`` for array index 0,
                ``invalidArrayIndex`` for an index outside 1..16
            """
            if _debug:
                Commandable._debug(
                    "WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
                    property,
                    value,
                    arrayIndex,
                    priority,
                    direct,
                )

            # a present value write is really a write to one priority slot
            if property == presentValue:
                if _debug:
                    Commandable._debug(
                        " - writing to %s, priority %r", presentValue, priority
                    )

                # no priority given means the lowest one
                if priority is None:
                    priority = 16
                if _debug:
                    Commandable._debug(
                        " - translate to priority array, index %d", priority
                    )

                # re-target the request at the priority array
                property, arrayIndex, priority = priorityArray, priority, None

            # update the priority array
            if property == priorityArray:
                if arrayIndex is not None:
                    if _debug:
                        Commandable._debug(
                            " - writing to %s, array index %d",
                            priorityArray,
                            arrayIndex,
                        )

                    # check the bounds
                    if arrayIndex == 0:
                        raise ExecutionError(
                            errorClass="property", errorCode="writeAccessDenied"
                        )
                    if arrayIndex < 1 or arrayIndex > 16:
                        raise ExecutionError(
                            errorClass="property", errorCode="invalidArrayIndex"
                        )

                    # update the specific priority value element
                    slot = getattr(self, priorityArray)[arrayIndex]
                    if _debug:
                        Commandable._debug(" - priority_value: %r", slot)

                    # exactly one of the null and the choice is set, the
                    # other one is cleared
                    writing_null = value == ()
                    if not writing_null:
                        if _debug:
                            Commandable._debug(" - write a value")
                        if issubclass(datatype, Enumerated):
                            value = datatype._xlate_table[value]
                            if _debug:
                                Commandable._debug(
                                    " - remapped enumeration: %r", value
                                )
                        slot.null = None
                        setattr(slot, _Commando._pv_choice, value)
                    else:
                        if _debug:
                            Commandable._debug(" - write a null")
                        slot.null = value
                        setattr(slot, _Commando._pv_choice, None)
                else:
                    if _debug:
                        Commandable._debug(" - writing entire %s", priorityArray)

                    # the whole array is replaced, pass the request along
                    super(_Commando, self).WriteProperty(
                        property,
                        value,
                        arrayIndex=arrayIndex,
                        priority=priority,
                        direct=direct,
                    )

                # find out which value wins now
                value, _source = self._highest_priority_value()

                # nothing more to do when the present value is unaffected
                if value == getattr(self, presentValue):
                    if _debug:
                        Commandable._debug(" - no present value change")
                    return

                # turn this into a present value change
                property = presentValue
                arrayIndex = priority = None

            # let the (possibly rewritten) request pass through
            if _debug:
                Commandable._debug(
                    " - super: %r %r arrayIndex=%r priority=%r",
                    property,
                    value,
                    arrayIndex,
                    priority,
                )
            super(_Commando, self).WriteProperty(
                property, value, arrayIndex=arrayIndex, priority=priority, direct=direct
            )

    # pick the first PriorityValue choice whose class datatype derives from,
    # defaulting to the constructed value choice
    _Commando._pv_choice = next(
        (
            element.name
            for element in PriorityValue.choiceElements
            if issubclass(datatype, element.klass)
        ),
        "constructedValue",
    )
    if _debug:
        Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)

    # hand back the generated class
    return _Commando
#
# MinOnOffTask
#
@bacpypes_debugging
class MinOnOffTask(OneShotTask):
    """One-shot task that holds a binary object's state at priority 6.

    The task registers itself as a monitor of the object's ``presentValue``.
    When the value changes it writes the new state at priority 6 and
    schedules itself; when it runs it clears priority 6 again.
    """

    def __init__(self, binary_obj):
        """Attach the task to ``binary_obj`` and watch its present value.

        :param binary_obj: object providing ``_property_monitors``,
            ``WriteProperty`` and the ``minimumOnTime`` / ``minimumOffTime``
            properties
        """
        if _debug:
            MinOnOffTask._debug("__init__ %s", repr(binary_obj))
        OneShotTask.__init__(self)

        # save a reference to the object
        self.binary_obj = binary_obj

        # listen for changes to the present value
        self.binary_obj._property_monitors["presentValue"].append(
            self.present_value_change
        )

    def present_value_change(self, old_value, new_value):
        """Hold a newly assumed state for its minimum time.

        :param old_value: present value before the change
        :param new_value: present value after the change, ``"active"`` or
            ``"inactive"``
        :raises ValueError: if ``new_value`` is neither of those
        """
        if _debug:
            MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)

        # if there's no value change, skip all this
        if old_value == new_value:
            if _debug:
                MinOnOffTask._debug(" - no state change")
            return

        # The state written at priority 6 below is the NEW state, so the
        # hold time has to be the one belonging to that state: the minimum
        # on time once the object became active, the minimum off time once
        # it became inactive.
        if new_value == "active":
            task_delay = getattr(self.binary_obj, "minimumOnTime") or 0
            if _debug:
                MinOnOffTask._debug(" - minimum on: %r", task_delay)
        elif new_value == "inactive":
            task_delay = getattr(self.binary_obj, "minimumOffTime") or 0
            if _debug:
                MinOnOffTask._debug(" - minimum off: %r", task_delay)
        else:
            raise ValueError(
                "unrecognized present value for %r: %r"
                % (self.binary_obj.objectIdentifier, new_value)
            )

        # if there's no delay, don't bother
        if not task_delay:
            if _debug:
                MinOnOffTask._debug(" - no delay")
            return

        # set the value at priority 6
        self.binary_obj.WriteProperty("presentValue", new_value, priority=6)

        # install this to run, if there is a delay
        self.install_task(delta=task_delay)

    def process_task(self):
        """Release the hold by writing a null at priority 6."""
        if _debug:
            MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)

        # clear the value at priority 6
        self.binary_obj.WriteProperty("presentValue", (), priority=6)
#
# MinOnOff
#
@bacpypes_debugging
class MinOnOff(object):
    """Mix-in that equips an object with a ``MinOnOffTask``."""

    def __init__(self, **kwargs):
        """Initialize the rest of the class hierarchy, then create the task."""
        if _debug:
            MinOnOff._debug("__init__ ...")
        super(MinOnOff, self).__init__(**kwargs)

        # the timer task attaches itself to this object's present value
        timer_task = MinOnOffTask(self)
        self._min_on_off_task = timer_task
#
# Commandable Standard Objects
#
class AccessDoorObjectCmd(Commandable(DoorValue), AccessDoorObject):
    """Access door object with a commandable ``DoorValue`` present value."""
class AnalogOutputObjectCmd(Commandable(Real), AnalogOutputObject):
    """Analog output object with a commandable ``Real`` present value."""
class AnalogValueObjectCmd(Commandable(Real), AnalogValueObject):
    """Analog value object with a commandable ``Real`` present value."""
### class BinaryLightingOutputObjectCmd(Commandable(Real), BinaryLightingOutputObject):
### pass
class BinaryOutputObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
    """Binary output object with a commandable ``BinaryPV`` present value.

    The ``MinOnOff`` mix-in adds the minimum on/off timer task.
    """
class BinaryValueObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
    """Binary value object with a commandable ``BinaryPV`` present value.

    The ``MinOnOff`` mix-in adds the minimum on/off timer task.
    """
class BitStringValueObjectCmd(Commandable(BitString), BitStringValueObject):
    """Bit string value object with a commandable ``BitString`` present value."""
class CharacterStringValueObjectCmd(
    Commandable(CharacterString), CharacterStringValueObject
):
    """Character string value object with a commandable present value."""
class DateValueObjectCmd(Commandable(Date), DateValueObject):
    """Date value object with a commandable ``Date`` present value."""
class DatePatternValueObjectCmd(Commandable(Date), DatePatternValueObject):
    """Date pattern value object with a commandable ``Date`` present value."""
class DateTimeValueObjectCmd(Commandable(DateTime), DateTimeValueObject):
    """Date-time value object with a commandable ``DateTime`` present value."""
class DateTimePatternValueObjectCmd(
    Commandable(DateTime), DateTimePatternValueObject
):
    """Date-time pattern value object with a commandable present value."""
class IntegerValueObjectCmd(Commandable(Integer), IntegerValueObject):
    """Integer value object with a commandable ``Integer`` present value."""
class LargeAnalogValueObjectCmd(Commandable(Double), LargeAnalogValueObject):
    """Large analog value object with a commandable ``Double`` present value."""
class LightingOutputObjectCmd(Commandable(Real), LightingOutputObject):
    """Lighting output object with a commandable ``Real`` present value."""
class MultiStateOutputObjectCmd(Commandable(Unsigned), MultiStateOutputObject):
    """Multi-state output object with a commandable ``Unsigned`` present value."""
class MultiStateValueObjectCmd(Commandable(Unsigned), MultiStateValueObject):
    """Multi-state value object with a commandable ``Unsigned`` present value."""
class OctetStringValueObjectCmd(Commandable(OctetString), OctetStringValueObject):
    """Octet string value object with a commandable present value."""
class PositiveIntegerValueObjectCmd(
    Commandable(Unsigned), PositiveIntegerValueObject
):
    """Positive integer value object with a commandable ``Unsigned`` value."""
class TimeValueObjectCmd(Commandable(Time), TimeValueObject):
    """Time value object with a commandable ``Time`` present value."""
class TimePatternValueObjectCmd(Commandable(Time), TimePatternValueObject):
    """Time pattern value object with a commandable ``Time`` present value."""
#
# ChannelValueProperty
#
@bacpypes_debugging
class ChannelValueProperty(Property):
    """Required, mutable ``presentValue`` property typed as ``ChannelValue``.

    Writing through this property is not implemented yet.
    """

    def __init__(self):
        """Declare the property with no default value."""
        if _debug:
            ChannelValueProperty._debug("__init__")
        Property.__init__(
            self,
            "presentValue",
            ChannelValue,
            default=None,
            optional=False,
            mutable=True,
        )

    def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
        """Reject the write; channel writes are not implemented.

        :raises NotImplementedError: always
        """
        if _debug:
            ChannelValueProperty._debug(
                "WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
                obj,
                value,
                arrayIndex,
                priority,
                direct,
            )

        ### Clause 12.53.5, page 487
        raise NotImplementedError()
#
# ChannelObjectCmd
#
class ChannelObjectCmd(ChannelObject):
    """Channel object whose present value is a ``ChannelValueProperty``."""

    properties = [
        ChannelValueProperty(),
    ]
##
##
##
##
##
@register_object_type(vendor_id=999)
class LocalAnalogValueObjectCmd(CurrentPropertyListMixIn, AnalogValueObjectCmd):
    """Commandable analog value object registered under vendor id 999.

    Combines ``AnalogValueObjectCmd`` with ``CurrentPropertyListMixIn``.
    """
@register_object_type(vendor_id=999)
class LocalBinaryOutputObjectCmd(CurrentPropertyListMixIn, BinaryOutputObjectCmd):
    """Commandable binary output object registered under vendor id 999.

    Combines ``BinaryOutputObjectCmd`` with ``CurrentPropertyListMixIn``.
    """
@register_object_type(vendor_id=999)
class LocalDateValueObjectCmd(CurrentPropertyListMixIn, DateValueObjectCmd):
    """Commandable date value object registered under vendor id 999.

    Combines ``DateValueObjectCmd`` with ``CurrentPropertyListMixIn``.
    """
"""
#
# __main__
#
def main():
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug: _log.debug(" - this_device: %r", this_device)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a commandable analog value object, add to the device
avo1 = LocalAnalogValueObjectCmd(
objectIdentifier=('analogValue', 1),
objectName='avo1',
)
if _debug: _log.debug(" - avo1: %r", avo1)
this_application.add_object(avo1)
# make a commandable binary output object, add to the device
boo1 = LocalBinaryOutputObjectCmd(
objectIdentifier=('binaryOutput', 1),
objectName='boo1',
presentValue='inactive',
relinquishDefault='inactive',
minimumOnTime=5, # let it warm up
minimumOffTime=10, # let it cool off
)
if _debug: _log.debug(" - boo1: %r", boo1)
this_application.add_object(boo1)
# get the current date
today = Date().now()
# make a commandable date value object, add to the device
dvo1 = LocalDateValueObjectCmd(
objectIdentifier=('dateValue', 1),
objectName='dvo1',
presentValue=today.value,
)
if _debug: _log.debug(" - dvo1: %r", dvo1)
this_application.add_object(dvo1)
if _debug: _log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()
"""