Source code for BAC0.core.io.Read

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Read.py - creation of ReadProperty and ReadPropertyMultiple requests

    Used while defining an app:
    Example::

        class BasicScript(WhoisIAm, ReadProperty)

    Class::

        ReadProperty()
            def read()
            def readMultiple()

"""

import asyncio
import re
import logging

# --- standard Python modules ---
import typing as t

# from bacpypes3.core import deferred
# from bacpypes.iocb import IOCB, TimeoutError
# from bacpypes3.object import get_object_class, registered_object_types
from bacpypes3.apdu import (
    AbortPDU,
    AbortReason,
    ErrorRejectAbortNack,
    PropertyReference,
    Range,
    ReadAccessSpecification,
    ReadPropertyMultipleRequest,
    ReadRangeACK,
    ReadRangeRequest,
    RejectPDU,
    RejectReason,
)
from bacpypes3.app import Application
from bacpypes3.basetypes import (
    DateTime,
    EngineeringUnits,
    PropertyIdentifier,
    RangeByPosition,
    RangeBySequenceNumber,
    RangeByTime,
)
from bacpypes3.errors import NoResponse, ObjectError
from bacpypes3.object import get_vendor_info

# --- 3rd party modules ---
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import Date, ObjectIdentifier, Tag, Time

from BAC0.core.app.asyncApp import BAC0Application

from ..utils.notes import note_and_log

# --- this application's modules ---
from .IOExceptions import (
    ApplicationNotStarted,
    NoResponseFromController,
    ReadRangeException,
    SegmentationNotSupported,
    UnknownObjectError,
    UnknownPropertyError,
    UnrecognizedService,
)

# ------------------------------------------------------------------------------


ReadValue = t.Union[float, str, t.List]
rpm_request_pattern = r"(?P<request>(?P<Object>[0-9A-Za-z-]+:\d+)[, ]+[(\[ ](?P<Properties>(?P<Property>[0-9A-Za-z-]+(\[\d+\])*[, ]*)+)[)\]]*)"


[docs] @note_and_log class ReadProperty: """ Defines BACnet Read functions: readProperty and readPropertyMultiple. Data exchange is made via a Queue object A timeout of 10 seconds allows detection of invalid device or communciation errors. """ # Attributes provided by the application that mixes this class in. # These are declared so static type checkers understand the mixin's # dependency on the hosting application instance. _started: bool this_application: BAC0Application # Logging and helper methods provided by the host application log: t.Callable[..., None] log_title: t.Callable[..., None] _log: logging.Logger
[docs] async def read( self, args: str, arr_index: t.Optional[int] = None, vendor_id: int = 0, bacoid=None, timeout: int = 10, show_property_name: bool = False, ) -> t.Any: """ Build a ReadProperty request, wait for the answer and return the value :param args: String with <addr> <type> <inst> <prop> [ <indx> ] :returns: data read from device (str representing data like 10 or True) *Example*:: import BAC0 myIPAddr = '192.168.1.10/24' bacnet = BAC0.connect(ip = myIPAddr) bacnet.read('2:5 analogInput 1 presentValue') Requests the controller at (Network 2, address 5) for the presentValue of its analog input 1 (AI:1). """ if not self._started: raise ApplicationNotStarted("BACnet stack not running - use startApp()") _this_application: BAC0Application = self.this_application _app: Application = _this_application.app args_split = args.split() ( device_address, object_identifier, property_identifier, property_array_index, ) = self.build_rp_request( args_split, arr_index=arr_index, vendor_id=vendor_id, bacoid=bacoid ) self.log_title("Read property", args_split) # Do I know you ? dic = await self.this_application.app.device_info_cache.get_device_info( device_address ) if dic is None: _iam = await self.this_application.app.who_is(address=device_address) failures = 0 while _iam == []: # retry failures += 1 await asyncio.sleep(1) _iam = await self.this_application.app.who_is(address=device_address) if failures > 5: self.log( f"Trouble with Iam... Response received from {device_address} = {_iam}", level="error", ) raise NoResponseFromController await self.this_application.app.device_info_cache.set_device_info(_iam[0]) dic = await self.this_application.app.device_info_cache.get_device_info( device_address ) self.log(f"Device Info Cache : {dic}", level="debug") try: response = await _app.read_property( device_address, object_identifier, property_identifier, property_array_index, ) except ErrorRejectAbortNack as err: response = err if "unknown-property" in str(err.reason): if "description" in args: self._log.warning( "The description property is not implemented in the device. " "Using a default value for internal needs." ) return "n/a" elif "inactiveText" in args: self._log.warning( "The inactiveText property is not implemented in the device. " "Using a default value of Off for internal needs." ) return "False" elif "activeText" in args: self._log.warning( "The activeText property is not implemented in the device. " "Using a default value of On for internal needs." ) return "True" if "units" in args: self._log.warning( "The units property is not implemented in the device. We will consider noUnits" "Using a default value for internal needs. Please note that units is a required property for BACnet objects like analog values. The device you are reading from may be non-compliant." ) return EngineeringUnits("noUnits") else: raise UnknownPropertyError(f"Unknown property {args}") else: self.log(f"Error : {err}", level="error") except ObjectError: raise UnknownObjectError(f"Unknown object {args}") # except bufferOverflow except NoResponse: raise NoResponseFromController if not isinstance(response, ErrorRejectAbortNack): return response
def _split_the_read_request(self, args, arr_index): """ When a device doesn't support segmentation, this function will split the request according to the length of the predicted result which can be known when reading the array_index number 0. This can be a very long process as some devices count a large number of properties without supporting segmentation (FieldServers are a good example) """ # parameter arr_index appears to be unused in this function? nmbr_obj = self.read(args, arr_index=0) return [self.read(args, arr_index=i) for i in range(1, nmbr_obj + 1)] # type: ignore
[docs] async def readMultiple( self, args: str, request_dict=None, vendor_id: int = 0, timeout: int = 10, show_property_name: bool = False, from_regex=False, ) -> t.Union[t.Dict, t.List]: """Build a ReadPropertyMultiple request, wait for the answer and return the values :param args: String with <addr> ( <type> <inst> ( <prop> [ <indx> ] )... )... :returns: data read from device (str representing data like 10 or True) *Example*:: import BAC0 myIPAddr = '192.168.1.10/24' bacnet = BAC0.connect(ip = myIPAddr) bacnet.readMultiple('2:5 analogInput 1 presentValue units') Requests the controller at (Network 2, address 5) for the (presentValue and units) of its analog input 1 (AI:1). """ if not self._started: raise ApplicationNotStarted("BACnet stack not running - use startApp()") _this_application: BAC0Application = self.this_application _app: Application = _this_application.app if request_dict is not None: address, parameter_list = await self.build_rpm_request_from_dict( request_dict, vendor_id ) elif from_regex: address, parameter_list = await self.build_rpm_request_from_regex( args, vendor_id=vendor_id ) self.log_title(f"Read Multiple for {address} | params : {parameter_list}") else: args_list = args.split() address, parameter_list = await self.build_rpm_request( args_list, vendor_id=vendor_id ) self.log_title("Read Multiple", args_list) # Force DeviceInfoCache dic = await self.this_application.app.device_info_cache.get_device_info(address) if dic is None: _iam = await self.this_application.app.who_is(address=address) await self.this_application.app.device_info_cache.set_device_info(_iam[0]) dic = await self.this_application.app.device_info_cache.get_device_info( address ) self.log(f"Device Info Cache : {dic}", level="debug") values: t.List = [] dict_values: t.Dict[str, t.List[t.Tuple[str, t.Any]]] = {} self.log(f"Parameter list : {parameter_list}", level="debug") try: # build an ReadPropertyMultiple request response = await _app.read_property_multiple(address, parameter_list) self.log(f"Response : {response}", level="debug") except ErrorRejectAbortNack as err: # construction error response = err self._log.exception(f"exception: {err.reason}") if "segmentation-not-supported" in str(err.reason): raise SegmentationNotSupported if "unrecognized-service" in str(err.reason): raise UnrecognizedService() if "unknown-object" in str(err.reason): self.log(f"Unknown object {args}", level="warning") raise UnknownObjectError(f"Unknown object {args}") if "unknown-property" in str(err.reason): values.append(()) # type: ignore[arg-type] return values if "no-response" in str(err.reason): # values.append("") # type: ignore[arg-type] # return values # try again try: response = await _app.read_property_multiple( address, parameter_list ) except ErrorRejectAbortNack as err: raise err if not isinstance(response, ErrorRejectAbortNack): """ TODO : Need improvement here and look for the property identifier that is coming from the response Then we'll be able to support multiple properties for the same object in the read multiple function """ for ( object_identifier, property_identifier, property_array_index, property_value, ) in response: self._log.debug( "{!r:<20} {!r:<20} {!r:<30} {!r:<20}".format( property_identifier, property_array_index, property_value, # datatype, "", ) ) if str(object_identifier) not in dict_values: dict_values[str(object_identifier)] = [] if show_property_name: values.append((str(property_value), int(property_identifier))) dict_values[str(object_identifier)].append( (str(property_identifier), (property_value, property_identifier)) ) else: values.append(property_value) dict_values[str(object_identifier)].append( (property_identifier, property_value)) if request_dict is not None: return dict_values else: return values return values
[docs] def build_rp_request( self, args: t.List[str], arr_index=None, vendor_id: int = 0, bacoid=None ) -> t.Tuple: try: addr, obj_type_str, obj_inst_str, prop_id_str = args[:4] object_identifier = ObjectIdentifier((obj_type_str, int(obj_inst_str))) except ValueError: addr, obj_type_str, prop_id_str = args[:3] object_identifier = ObjectIdentifier(obj_type_str) device_address = Address(addr) # TODO : This part needs work to find proprietary objects # obj_type = self.__get_obj_type(obj_type_str, vendor_id) prop_id: t.Union[int, str] if prop_id_str.isdigit(): prop_id = int(prop_id_str) elif "@prop_" in prop_id_str or "@idx:" in prop_id_str: if "@idx" in prop_id_str: prop_id, arr_index_str = prop_id_str.split("@idx:") arr_index = int(arr_index_str) else: prop_id = prop_id_str.split("@prop_")[1] else: prop_id = prop_id_str # type: ignore prop_id = PropertyIdentifier(prop_id) if arr_index is None: arr_index = int(args[4]) if len(args) == 5 else arr_index params = (device_address, object_identifier, prop_id, arr_index) self.log(f"{'REQUEST':<20} {params!r}", level="debug") return params
[docs] async def build_rpm_request( self, args: t.List[str], vendor_id: int = 0 ) -> ReadPropertyMultipleRequest: """ Build request from args """ property_array_index = None _this_application: BAC0Application = self.this_application _app: Application = _this_application.app self.log(args, level="debug") vendor_id = vendor_id address = Address(args.pop(0)) # get information about the device from the cache device_info = await _app.device_info_cache.get_device_info(address) # using the device info, look up the vendor information if device_info: vendor_info = get_vendor_info(device_info.vendor_identifier) else: vendor_info = get_vendor_info(vendor_id) parameter_list = [] while args: # get the object identifier and using the vendor information, look # up the class obj_id_arg: t.Union[int, str] = args.pop(0) obj_id: t.Union[int, str] obj_instance: t.Union[int, str] if obj_id_arg.isdigit(): obj_id = int(obj_id_arg) elif "@obj_" in str(obj_id_arg): obj_id = int(obj_id_arg.split("@obj_")[1]) else: obj_id = obj_id_arg if ":" not in str(obj_id_arg): obj_instance = args.pop(0) else: obj_id = obj_id_arg.split(":")[0] obj_instance = int(obj_id_arg.split(":")[1]) try: object_identifier = vendor_info.object_identifier((obj_id, obj_instance)) except UnboundLocalError: self._log.error(obj_id_arg) raise object_class = vendor_info.get_object_class(object_identifier[0]) if not object_class: await self.response(f"unrecognized object type: {object_identifier}") return properties_list = [] while args: # now get the property type from the class if "@obj_" in args[0]: break elif "@prop_" in args[0] or "@idx_" in args[0]: if "@idx_" in args[0]: prop_id, arr_index = args.pop(0).split("@idx_") else: prop_id = args.pop(0).split("@prop_")[1] else: prop_id = args.pop(0) try: property_identifier = vendor_info.property_identifier(prop_id) except ValueError: try: property_identifier = PropertyIdentifier(prop_id) except ValueError: break # probably another object if property_identifier not in ( PropertyIdentifier.all, PropertyIdentifier.required, PropertyIdentifier.optional, # "objectName", # "objectType", # "objectIdentifier", # "polarity", ): property_type = object_class.get_property_type(property_identifier) if not property_type: await _app.response( f"unrecognized property: {property_identifier}" ) return # check for a property array index if args and args[0].isdigit() and arr_index is None: property_array_index = int(args.pop(0)) # save this as a parameter properties_list.append((property_identifier, property_array_index)) elif property_array_index is not None: properties_list.append((property_identifier, arr_index)) else: properties_list.append(property_identifier) # crude check to see if the next thing is an object identifier if args and ((":" in args[0]) or ("," in args[0]) or ("-" in args[0])): break parameter_list.append(object_identifier) parameter_list.append(properties_list) if not parameter_list: await _app.response("object identifier expected") return else: return (address, parameter_list)
[docs] async def build_rpm_request_from_regex(self, args, vendor_id=0): pattern = re.compile(rpm_request_pattern) address = Address(args.split()[0]) result = re.findall(pattern, args) request = [] for each in result: _, object_identifier, properties, _, _ = each request.append(ObjectIdentifier(object_identifier)) request.append([PropertyReference(x) for x in properties.split()]) self.log(f"RPM Request from Regex : {address} | {request}", level="debug") return (address, request)
[docs] async def build_rpm_request_from_dict(self, request_dict, vendor_id=0): """ Read property multiple allow to read a lot of properties with only one request The existing RPM function is made using a string that must be created using bacpypes console style and is hard to automate. This new version will be an attempt to improve that:: _rpm = {'address': '11:2', 'objects': {'analogInput:1': ['presentValue', 'description', 'unit', 'objectList@idx:0'], 'analogInput:2': ['presentValue', 'description', 'unit', 'objectList@idx:0'], }, vendor_id: 842 } """ address = Address(request_dict["address"]) objects = request_dict["objects"] _this_application: BAC0Application = self.this_application _app: Application = _this_application.app # get information about the device from the cache device_info = await _app.device_info_cache.get_device_info(address) # using the device info, look up the vendor information if device_info: vendor_info = get_vendor_info(device_info.vendor_identifier) else: vendor_info = get_vendor_info(0) parameter_list = [] arr_index = None for obj, list_of_properties in objects.items(): object_identifier = ObjectIdentifier(obj) properties_list = [] for each in list_of_properties: if "@obj_" in each: break elif "@prop_" in each or "@idx" in each: if "@idx_" in each: prop_id, arr_index = each.split("@idx_") else: prop_id = int(each.split("@prop_")[1]) # there was no idx else: prop_id = each property_identifier = vendor_info.property_identifier(prop_id) if arr_index: properties_list.append((property_identifier, arr_index)) else: properties_list.append(property_identifier) parameter_list.append(object_identifier) parameter_list.append(properties_list) if not parameter_list: await _app.response("object identifier expected") return return (address, parameter_list)
[docs] def build_rrange_request( self, args, range_params=None, arr_index=None, vendor_id=0, bacoid=None ): addr, obj_type, obj_inst, prop_id = args[:4] vendor_id = vendor_id bacoid = bacoid if obj_type.isdigit(): obj_type = int(obj_type) obj_inst = int(obj_inst) if prop_id.isdigit(): prop_id = int(prop_id) # build a request request = ReadRangeRequest( objectIdentifier=(obj_type, obj_inst), propertyIdentifier=prop_id ) request.pduDestination = Address(addr) if range_params is not None: range_type, first, date, time, count = range_params if range_type == "p": rbp = RangeByPosition(referenceIndex=int(first), count=int(count)) request.range = Range(byPosition=rbp) elif range_type == "s": rbs = RangeBySequenceNumber( referenceSequenceNumber=int(first), count=int(count) ) request.range = Range(bySequenceNumber=rbs) elif range_type == "t": rbt = RangeByTime( referenceTime=DateTime(date=Date(date), time=Time(time)), count=int(count), ) request.range = Range(byTime=rbt) elif range_type == "x": # should be missing required parameter request.range = Range() else: raise ValueError(f"unknown range type: {range_type!r}") if len(args) == 5: request.propertyArrayIndex = int(args[4]) self.log(f"{'REQUEST':<20} {request!r}", level="debug") return request
[docs] async def readRange( self, args, range_params=None, arr_index=None, vendor_id=0, bacoid=None, timeout=10, ): """ Build a ReadRangeRequest request, wait for the answer and return the value :param args: String with <addr> <type> <inst> <prop> [ <indx> ] :param range_params: parameters defining how to query the range, a list of five elements :returns: data read from device (list of LogRecords) Range parameters (five elements): - range_type (str): one of ['p', 's', 't'] - 'p' (RangeByPosition) uses (first, count) - 's' (RangeBySequenceNumber) uses (first, count) - 't' (RangeByTime) filters by the given time and uses (date, time, count) - first (int): first element when querying by Position or Sequence Number - date (str): "YYYY-mm-DD" passed to bacpypes3.primitivedata.Date constructor - time (str): "HH:MM:SS" passed to bacpypes3.primitivedata.Time constructor - count (int): number of elements to return; negative numbers reverse the search direction *Example*:: import BAC0 from bacpypes.basetypes import Date, Time myIPAddr = '192.168.1.10/24' bacnet = BAC0.connect(ip=myIPAddr) log_records = bacnet.readRange('2:5 trendLog 1 logBuffer', range_params=('t', None, '2023-05-12', '12:00:00', 2)) for log_record in log_records: print(Date(log_record.timestamp.date), Time(log_record.timestamp.time), log_record.logDatum.realValue) # Date(2023-5-12 fri) Time(12:10:00.00) 130.331 # Date(2023-5-12 fri) Time(12:20:00.00) 134.123 log_records = bacnet.readRange('2:5 trendLog 1 logBuffer', range_params=('t', None, '2023-05-12', '12:00:00', -2)) for log_record in log_records: print(Date(log_record.timestamp.date), Time(log_record.timestamp.time), log_record.logDatum.realValue) # Date(2023-5-12 fri) Time(11:40:00.00) 123.4 # Date(2023-5-12 fri) Time(11:50:00.00) 125.1213 """ if not self._started: raise ApplicationNotStarted("BACnet stack not running - use startApp()") _this_application: BAC0Application = self.this_application _app: Application = _this_application.app args_split = args.split() self.log_title("Read range ", args_split) # get information about the device from the cache device_info = await _app.device_info_cache.get_device_info( Address(args_split[0]) ) # using the device info, look up the vendor information if device_info: vendor_info = get_vendor_info(device_info.vendor_identifier) else: vendor_info = get_vendor_info(0) try: # build ReadProperty request request = self.build_rrange_request( args_split, range_params=range_params, arr_index=arr_index, vendor_id=vendor_id, bacoid=bacoid, ) self.log(f"{'request':<20} {request!r}", level="debug") except ReadRangeException as error: # construction error self._log.exception(f"exception: {error!r}") response = await _app.request(request) if isinstance(response, ErrorRejectAbortNack): return response if not isinstance(response, ReadRangeACK): return None object_class = vendor_info.get_object_class(response.objectIdentifier[0]) datatype = object_class.get_property_type(response.propertyIdentifier) value = response.itemData.cast_out(datatype) return value
[docs] async def read_priority_array(self, addr, obj, obj_instance) -> t.List: pa = await self.read(f"{addr} {obj} {obj_instance} priorityArray") res = [pa] for each in range(1, 17): _pa = pa[each] # type: ignore[index] for k, v in _pa.__dict__.items(): if v is not None: res.append(v) return res
[docs] def find_reason(apdu): try: if apdu is TimeoutError: return "Timeout" elif apdu.pduType == RejectPDU.pduType: reasons = RejectReason.enumerations elif apdu.pduType == AbortPDU.pduType: reasons = AbortReason.enumerations else: _code = None try: _code = apdu.errorCode except AttributeError: try: _code = apdu.errorType.errorCode except AttributeError: raise ValueError("Cannot find reason...") if _code: return f"{_code}" code = apdu.apduAbortRejectReason try: return [k for k, v in reasons.items() if v == code][0] except IndexError: return code except KeyError as err: return f"KeyError: {type(apdu)} has no key {err.args[0]!r}"
[docs] def cast_datatype_from_tag(propertyValue, obj_id, prop_id): try: tag_list = propertyValue.tagList.tagList if tag_list[0].tagClass == 0: tag = tag_list[0].tagNumber datatype = Tag._app_tag_class[tag] else: from bacpypes3.constructeddata import ArrayOf subtype_tag = propertyValue.tagList.tagList[0].tagList[0].tagNumber datatype = ArrayOf(Tag._app_tag_class[subtype_tag]) value = {f"{obj_id}_{prop_id}": propertyValue.cast_out(datatype)} except Exception: value = {f"{obj_id}_{prop_id}": propertyValue} return value
[docs] def build_read_access_spec(obj_type, obj_instance, property_reference_list): return ReadAccessSpecification( objectIdentifier=(obj_type, obj_instance), listOfPropertyReferences=property_reference_list, )
[docs] def build_property_reference_list(obj_type, list_of_properties): property_reference_list = [] for prop in list_of_properties: idx = None if "@idx:" in prop: prop, idx = prop.split("@idx:") prop_id = validate_property_id(obj_type, prop) prop_reference = PropertyReference(propertyIdentifier=prop_id) if idx: prop_reference.propertyArrayIndex = int(idx) property_reference_list.append(prop_reference) return property_reference_list
[docs] def validate_property_id(obj_type, prop_id): if prop_id in PropertyIdentifier.enumerations: if prop_id in ( "all", "required", "optional", "objectName", "objectType", "objectIdentifier", "polarity", ): return prop_id # elif validate_datatype(obj_type, prop_id) is not None: # return prop_id else: raise ValueError( f"invalid property for object type : {obj_type} | {prop_id}" ) elif "@prop_" in prop_id: return int(prop_id.split("_")[1]) else: raise ValueError(f"{prop_id} is an invalid property for {obj_type}")
# def validate_datatype(obj_type, prop_id, vendor_id=842): # return get_datatype(obj_type, prop_id, vendor_id=vendor_id) if not None else False