#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Write.py - creation of WriteProperty requests
Used while defining an app
Example::
class BasicScript(WhoisIAm, WriteProperty)
Class::
WriteProperty()
def write()
"""
import re
from bacpypes3.apdu import ErrorRejectAbortNack
from bacpypes3.app import Application
from bacpypes3.basetypes import PropertyIdentifier
# --- 3rd party modules ---
from bacpypes3.debugging import ModuleLogger
from bacpypes3.errors import PropertyError
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import Null, ObjectIdentifier
from BAC0.tasks.DoOnce import DoOnce
from ..app.asyncApp import BAC0Application
from ..utils.notes import note_and_log
# --- this application's modules ---
from .IOExceptions import (
ApplicationNotStarted,
NoResponseFromController,
WritePropertyException,
)
# ------------------------------------------------------------------------------
# some debugging
_debug = 0
_LOG = ModuleLogger(globals())

# Grammar of a write request string, written as adjacent raw-string literals
# (one per field) so that each named group can be read on its own line.
# The concatenated value is the full pattern used by ``write_pattern``.
WRITE_REGEX = (
    # <address>: dotted-quad IPv4 with optional "/mask" and ":port" ...
    r"(?P<address>(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
    # ... or the "<digits>:<digits>" form
    r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:/\d{1,2})?(?::\d+)?|\d+:\d+)"
    # <objId>: object type (optionally "@obj_"-prefixed) and instance,
    # separated by ":" or a space
    r" (?P<objId>(@obj_)?[-\w:]*[: ]*\d*)"
    # <propId>: property name or number, optionally "@prop_"-prefixed
    r" (?P<propId>(@prop_)?\w*(-\w*)?)"
    # <value>: optional value to write
    r"[ ]?(?P<value>-?[\w\d]*\.?\d*)?"
    # <indx>: optional array index, "-" as placeholder
    r"[ ]?(?P<indx>-|\d*)?"
    # <priority>: optional priority, 0 to 16
    r"[ ]?(?P<priority>(1[0-6]|[0-9]))?"
)
write_pattern = re.compile(WRITE_REGEX)
[docs]
@note_and_log
class WriteProperty:
"""
Defines BACnet Write functions: WriteProperty [WritePropertyMultiple not supported]
"""
[docs]
def write(self, args):
    """
    Schedule a write without waiting for it.

    Wraps the ``_write`` coroutine function and the request string in a
    ``DoOnce`` task and starts that task.

    :param args: request string, handed unchanged to ``_write``
    """
    DoOnce(fn=self._write, args=args).start()
async def _write(self, args):
    """Build a WriteProperty request, send it, and return the stack's answer.

    :param args: String with <addr> <type> <inst> <prop> <value> [ <indx> ] - [ <priority> ]
    :returns: whatever the application's ``write_property`` call returns.
        The strings ``"-no property type-"`` and ``"-no object class-"``
        are logged as errors before being returned.
    :raises ApplicationNotStarted: if the BACnet stack is not running.
    :raises NoResponseFromController: if ``write_property`` raises an
        ``ErrorRejectAbortNack``.
    :raises ValueError: if ``write_property`` raises ``ValueError``
        (re-raised with an "Invalid value for property" message), or if
        the request string cannot be parsed.
    :raises WritePropertyException, PropertyError: logged, then re-raised.

    *Example*::

        import BAC0
        bacnet = BAC0.lite()
        bacnet.write('2:5 analogValue 1 presentValue 100 - 8')

    Direct the controller at (Network 2, address 5) to write 100 to the presentValue of
    its analogValue 1 (AV:1) at priority 8
    """
    if not self._started:
        raise ApplicationNotStarted("BACnet stack not running - use startApp()")
    _this_application: BAC0Application = self.this_application
    _app: Application = _this_application.app

    self.log_title("Write property", args)

    # Parsing happens outside the try block on purpose: a malformed request
    # string must surface as the parser's own ValueError, not be relabelled
    # as an "invalid value for property".
    request = self.build_wp_request(args)
    (
        device_address,
        object_identifier,
        property_identifier,
        value,
        property_array_index,
        priority,
    ) = request

    try:
        response = await _app.write_property(
            device_address,
            object_identifier,
            property_identifier,
            value,
            property_array_index,
            priority,
        )
    except ErrorRejectAbortNack as err:
        self.log(f"exception: {err}", level="error")
        raise NoResponseFromController(f"APDU Abort Reason : {err}") from err
    except ValueError as err:
        self.log(f"exception: {err}", level="error")
        raise ValueError(f"Invalid value for property : {err}") from err
    except (WritePropertyException, PropertyError) as error:
        # construction error
        self.log(f"exception: {error}", level="error")
        # ``response`` was never bound on this path; returning it used to
        # replace the real error with an UnboundLocalError. Propagate the
        # original exception instead.
        raise
    except TypeError as error:
        # construction error
        self.log(
            f"Type Error exception: {error} | Requests = {request}", level="error"
        )
        # NOTE(review): this returns the literal string "response", which
        # looks like a placeholder. Kept as is because callers are not
        # visible from here - confirm what they expect before changing it.
        return "response"

    if response == "-no property type-" or response == "-no object class-":
        self.log(
            f"exception: {response!r}, you probably need to define proprietary objects and property for this vendor (or import the specific module)",
            level="error",
        )
    return response
@classmethod
def _parse_wp_args(cls, args):
    """
    Utility to parse the string of the request.

    Supports @obj_ and @prop_ syntax for object type and property id, useful with proprietary objects and properties.

    :param args: request string, e.g. ``'2:5 analogValue 1 presentValue 100 - 8'``
    :returns: tuple ``(address, obj_type, obj_inst, prop_id, value, priority, indx)``.
        ``obj_type`` and ``prop_id`` are converted to ``int`` when numeric,
        ``obj_inst`` is always an ``int``, the value ``"null"`` becomes a
        ``Null`` instance, and ``priority`` / ``indx`` are ``int`` or ``None``.
    :raises ValueError: if the string does not match the request pattern or
        its object identifier cannot be split into a type and a numeric
        instance.
    """
    match = write_pattern.search(args)
    if match is None:
        raise ValueError(f"Invalid request | {args}")

    address = match.group("address")
    obj_id = match.group("objId")
    prop_id = match.group("propId")
    value = match.group("value")
    indx = match.group("indx")
    priority = match.group("priority")

    # The object identifier is "<type>:<instance>" or "<type> <instance>".
    separator = ":" if ":" in obj_id else " "
    try:
        obj_type, obj_inst = obj_id.split(separator)
        if obj_type.isdigit():
            obj_type = int(obj_type)
        elif "@obj_" in obj_type:
            obj_type = int(obj_type.split("_")[1])
        obj_inst = int(obj_inst)
    except ValueError as err:
        # A missing instance, a doubled separator or a non-numeric instance
        # would otherwise surface as an unpacking/int() error that does not
        # mention the request at all.
        raise ValueError(
            f"Invalid request | {args} (bad object identifier {obj_id!r})"
        ) from err

    if "@prop_" in prop_id:
        prop_id = prop_id.split("_")[1]
    if prop_id.isdigit():
        prop_id = int(prop_id)

    if value == "null":
        value = Null(())
    # "-" is the placeholder for "no array index"; empty/absent means the same.
    indx = None if indx in (None, "", "-") else int(indx)
    priority = None if priority is None else int(priority)

    return (address, obj_type, obj_inst, prop_id, value, priority, indx)
[docs]
def build_wp_request(self, args, vendor_id=0):
    """
    Turn a write request string into the argument tuple for a WriteProperty call.

    The string is parsed with ``WriteProperty._parse_wp_args``; the address,
    object and property parts are then wrapped in ``Address``,
    ``ObjectIdentifier`` and ``PropertyIdentifier``.

    :param args: request string
    :param vendor_id: accepted but not used by this method
    :returns: tuple ``(device_address, object_identifier, property_identifier,
        value, indx, priority)``
    """
    parsed = WriteProperty._parse_wp_args(args)
    address, obj_type, obj_inst, prop_id, value, priority, indx = parsed

    # Wrappers are built in the same order as before so that, for an invalid
    # request, the same constructor is the first one to complain.
    oid = ObjectIdentifier((obj_type, obj_inst))
    pid = PropertyIdentifier(prop_id)
    destination = Address(address)

    request = (destination, oid, pid, value, indx, priority)

    self.log_subtitle("Creating Request")
    header = f"{'indx':<20} {'priority':<20} {'datatype':<20} {'value':<20}"
    self.log(header, level="debug")
    self.log(f"{'REQUEST':<20} {request}", level="debug")
    return request
'''
def writeMultiple(self, addr=None, args=None, vendor_id=0, timeout=10):
"""Build a WritePropertyMultiple request, wait for an answer
:param addr: destination of request (ex. '2:3' or '192.168.1.2')
:param args: list of String with <type> <inst> <prop> <value> [ <indx> ] - [ <priority> ]
:param vendor_id: Mandatory for registered proprietary object and properties
:param timeout: used by IOCB to discard request if timeout reached
:returns: return status [True if ok, False if not]
*Example*::
import BAC0
bacnet = BAC0.lite()
r = ['analogValue 1 presentValue 100','analogValue 2 presentValue 100','analogValue 3 presentValue 100 - 8','@obj_142 1 @prop_1042 True']
bacnet.writeMultiple(addr='2:5',args=r,vendor_id=842)
# or
# bacnet.writeMultiple('2:5',r)
"""
if not self._started:
raise ApplicationNotStarted("BACnet stack not running - use startApp()")
self.log_title("Write property multiple", args)
try:
# build a WritePropertyMultiple request
iocb = IOCB(self.build_wpm_request(args, vendor_id=vendor_id, addr=addr))
iocb.set_timeout(timeout)
# pass to the BACnet stack
deferred(self.this_application.request_io, iocb)
self.log(f"{'iocb':<20} {iocb!r}", level='debug')
except WritePropertyException as error:
# construction error
self._log.exception(f"exception: {error!r}")
iocb.wait() # Wait for BACnet response
if iocb.ioResponse: # successful response
apdu = iocb.ioResponse
if not isinstance(apdu, SimpleAckPDU): # expect an ACK
self.log("Not an ack, see debug for more infos.", level='warning')
self.log(f"Not an ack. | APDU : {apdu} / {type(apdu)}", level='debug')
return
if iocb.ioError: # unsuccessful: error/reject/abort
apdu = iocb.ioError
reason = find_reason(apdu)
raise NoResponseFromController(f"APDU Abort Reason : {reason}")
def build_wpm_request(self, args, vendor_id=0, addr=None):
if not addr:
raise ValueError("Please provide addr")
address = Address(addr)
self.log_subtitle("Creating Write Multiple Request")
self.log(f"{'indx':<20} {'priority':<20} {'datatype':<20} {'value':<20}", level='debug')
was = []
listOfObjects = []
for each in args:
property_values = []
if isinstance(each, str):
(
object_identifier,
property_identifier,
value,
priority,
indx,
) = self._parse_wp_args(each)
elif isinstance(each, tuple):
# Supported but not really the best choice as the parser will
# catch some edge cases for you
object_identifier, property_identifier, value, priority, indx = each
else:
raise ValueError("Wrong encoding of request")
existingObject = next(
(obj for obj in was if obj.objectIdentifier == (obj_type, obj_inst)),
None,
)
if existingObject is None:
property_values.append(
PropertyValue(
propertyIdentifier=prop_id,
propertyArrayIndex=indx,
value=value,
priority=priority,
)
)
was.append(
WriteAccessSpecification(
objectIdentifier=(obj_type, obj_inst),
listOfProperties=property_values,
)
)
else:
existingObject.listOfProperties.append(
PropertyValue(
propertyIdentifier=prop_id,
propertyArrayIndex=indx,
value=value,
priority=priority,
)
)
datatype = get_datatype(obj_type, prop_id, vendor_id=vendor_id)
self._log.debug(
f"{indx!r:<20} {priority!r:<20} {datatype!r:<20} {value!r:<20}"
)
# build a request
request = WritePropertyMultipleRequest(listOfWriteAccessSpecs=was)
request.pduDestination = Address(addr)
self.log(f"{'REQUEST':<20} {request}", level='debug')
return request
'''