Source code for BAC0.core.app.ScriptApplication

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
SimpleApplication
=================

A basic BACnet application (bacpypes BIPSimpleApplication) for interacting with
the bacpypes BACnet stack.  It enables the base-level BACnet functionality
(i.e. device discovery), meaning it can send and receive WhoIs and IAm messages.

Additional functionality is enabled by inheriting this application, and then
extending it with more functions. [See BAC0.scripts for more examples of this.]

"""
import typing as t

# --- standard Python modules ---
from collections import defaultdict
from typing import Any, Dict, Optional

from bacpypes.apdu import IAmRequest, ReadRangeACK, SimpleAckPDU

# --- 3rd party modules ---
from bacpypes.app import ApplicationIOController
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
from bacpypes.bvllservice import (
    BIPBBMD,
    AnnexJCodec,
    BIPForeign,
    BIPSimple,
    UDPMultiplexer,
)
from bacpypes.comm import Client, bind
from bacpypes.constructeddata import (
    Array,
    List,
    SequenceOfAny,
)
from bacpypes.core import deferred
from bacpypes.errors import ExecutionError, RejectException
from bacpypes.iocb import IOCB
from bacpypes.local.device import LocalDeviceObject
from bacpypes.netservice import NetworkServiceAccessPoint
from bacpypes.object import PropertyError
from bacpypes.pdu import Address
from bacpypes.service.cov import ChangeOfValueServices

# basic services
from bacpypes.service.device import WhoHasIHaveServices, WhoIsIAmServices
from bacpypes.service.object import (
    ReadWritePropertyMultipleServices,
    ReadWritePropertyServices,
)

from ..functions.Discover import NetworkServiceElementWithRequests

# --- this application's modules ---
from ..utils.notes import note_and_log

# ------------------------------------------------------------------------------


class common_mixin:
    """
    Handlers for messages coming from the network that are not generated
    from a request we made.

    The class that mixes this in must provide the attributes used below:
    ``_log``, ``i_am_counter``, ``i_have_counter``, ``who_is_counter``,
    ``_last_i_am_received``, ``_last_i_have_received``, ``localDevice``,
    ``iam_req`` and ``subscription_contexts``, as well as the methods
    ``request_io``, ``response`` and ``get_object_id``.
    """

    def do_IAmRequest(self, apdu):
        """Given an I-Am request, cache it."""
        self._log.debug("do_IAmRequest %r", apdu)

        # build a key from the source, just use the instance number
        key = (str(apdu.pduSource), apdu.iAmDeviceIdentifier[1])
        self.i_am_counter[key] += 1
        self._last_i_am_received.append(key)

    def do_IHaveRequest(self, apdu):
        """Given an I-Have request, cache it."""
        self._log.debug("do_IHaveRequest %r", apdu)

        # build a key from the source, using object name
        key = (str(apdu.pduSource), apdu.objectName)
        self.i_have_counter[key] += 1
        self._last_i_have_received.append(key)

    def do_WhoIsRequest(self, apdu):
        """
        Respond to a Who-Is request.

        The request is counted exactly once in ``who_is_counter``.  An I-Am
        is sent back to the requester unless the local device instance falls
        outside the (optional) instance range carried by the request.
        """
        source = str(apdu.pduSource)
        low_limit = apdu.deviceInstanceRangeLowLimit
        high_limit = apdu.deviceInstanceRangeHighLimit

        # build a key from the source and parameters
        key = (source, low_limit, high_limit)
        self._log.debug(
            "do_WhoIsRequest from %s | %s to %s", source, low_limit, high_limit
        )

        # count the times this has been received (once per request)
        self.who_is_counter[key] += 1

        # stay silent when our instance number is outside the requested range;
        # the local device is only consulted when a limit was actually given
        if low_limit is not None and self.localDevice.objectIdentifier[1] < low_limit:
            return
        if high_limit is not None and self.localDevice.objectIdentifier[1] > high_limit:
            return

        # generate an I-Am addressed to whoever asked
        self._log.debug("Responding to Who is by a Iam")
        self.iam_req.pduDestination = apdu.pduSource
        iocb = IOCB(self.iam_req)  # make an IOCB
        deferred(self.request_io, iocb)

    def do_ConfirmedCOVNotificationRequest(self, apdu):
        """
        Handle a confirmed COV notification.

        When the notification matches a known subscription context (same
        process identifier and same source address) it is acknowledged with
        a SimpleAckPDU and both the shared ``"context_callback"`` entry of
        ``subscription_contexts`` and the context's own callback (if any)
        are invoked.  Otherwise a warning is logged and nothing is sent.
        """
        # look up the process identifier
        context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
        if not context or apdu.pduSource != context.address:
            self._log.warning(
                "Unsollicited COV Notification received from %s (%s). Have you restarted the application recently ?",
                apdu.pduSource,
                apdu,
            )
            # this is turned into cancel_cov request and sent back to the client
            return

        # now tell the context object
        elements = context.cov_notification(apdu)

        # success: send a confirmation
        response = SimpleAckPDU(context=apdu)
        self.response(response)

        self._log.debug("Confirmed COV Notification: %s", elements)
        self.subscription_contexts["context_callback"](elements)

        # execute callback
        if context.callback is not None:
            context.callback(elements=elements)

    def do_UnconfirmedCOVNotificationRequest(self, apdu):
        """
        Handle an unconfirmed COV notification.

        Notifications that do not match a known subscription context (same
        process identifier and same source address) are silently ignored.
        """
        # look up the process identifier
        context = self.subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
        if not context or apdu.pduSource != context.address:
            return

        # now tell the context object
        elements = context.cov_notification(apdu)

        self._log.debug("Unconfirmed COV Notification: %s", elements)
        self.subscription_contexts["context_callback"](elements)

        # execute callback
        if context.callback is not None:
            context.callback(elements=elements)

    def do_ReadRangeRequest(self, apdu):
        """
        Answer a ReadRange request with a ReadRangeACK holding the whole value.

        :raises ExecutionError: the object is unknown, or the property is
            neither a list nor (when an array index is given) an array of
            lists
        :raises PropertyError: reading the property returned ``None``
        :raises RejectException: the request carries none of byPosition,
            bySequenceNumber or byTime
        """
        self._log.debug("do_ReadRangeRequest %r", apdu)

        # extract the object identifier
        objId = apdu.objectIdentifier

        # get the object
        obj = self.get_object_id(objId)
        self._log.debug(" - object: %r", obj)
        if not obj:
            raise ExecutionError(errorClass="object", errorCode="unknownObject")

        # get the datatype
        datatype = obj.get_datatype(apdu.propertyIdentifier)
        self._log.debug(" - datatype: %r", datatype)

        # must be a list, or an array of lists
        if not (
            issubclass(datatype, List)
            or (
                apdu.propertyArrayIndex is not None
                and issubclass(datatype, Array)
                and issubclass(datatype.subtype, List)
            )
        ):
            raise ExecutionError(errorClass="property", errorCode="propertyIsNotAList")

        # get the value
        self._log.debug(apdu.__dict__)
        value = obj.ReadProperty(apdu.propertyIdentifier, apdu.propertyArrayIndex)
        self._log.debug(f" - value: {value!r} | of type {type(value)}")
        if value is None:
            raise PropertyError(apdu.propertyIdentifier)
        if isinstance(value, List):
            self._log.debug(" - value is a list of: %r", datatype.subtype)

        # the requested range is only logged; one of the three selectors
        # must nevertheless be present
        requested = apdu.range
        if requested.byPosition:
            self._log.debug(" - range_by_position: %r", requested.byPosition)
        elif requested.bySequenceNumber:
            self._log.debug(
                " - range_by_sequence_number: %r", requested.bySequenceNumber
            )
        elif requested.byTime:
            self._log.debug(" - range_by_time: %r", requested.byTime)
        else:
            raise RejectException("missingRequiredParameter")

        # this is an ack
        resp = ReadRangeACK(context=apdu)
        resp.objectIdentifier = objId
        resp.propertyIdentifier = apdu.propertyIdentifier
        resp.propertyArrayIndex = apdu.propertyArrayIndex

        resp.resultFlags = [1, 1, 0]
        resp.itemCount = len(value)

        # save the result in the item data
        item_data = SequenceOfAny()
        item_data.cast_in(value)
        resp.itemData = item_data
        self._log.debug(" - itemData : %r", resp.itemData)
        self._log.debug(" - resp: %r", resp)

        # return the result
        self.response(resp)
@note_and_log
class BAC0Application(
    common_mixin,
    ApplicationIOController,
    WhoIsIAmServices,
    WhoHasIHaveServices,
    ReadWritePropertyServices,
    ReadWritePropertyMultipleServices,
    ChangeOfValueServices,
):
    """
    Basic BACnet/IP application processing BACnet requests over a
    ``BIPSimple`` link layer.

    See BAC0.scripts.BasicScript for more details.
    """

    def __init__(
        self,
        localDevice: LocalDeviceObject,
        localAddress: Address,
        networkNumber: int = None,
        bbmdAddress=None,
        bbmdTTL: int = 0,
        deviceInfoCache=None,
        aseID=None,
        iam_req: Optional[IAmRequest] = None,
        subscription_contexts: Optional[Dict[Any, Any]] = None,
    ) -> None:
        """
        Assemble the application, network and BACnet/IP layers.

        :param localDevice: local device object
        :param localAddress: local IP address; anything that is not already
            an ``Address`` is converted with ``Address(localAddress)``
        :param networkNumber: passed as ``net`` when binding the NSAP
        :param bbmdAddress: accepted but not used by this class
        :param bbmdTTL: accepted but not used by this class
        :param deviceInfoCache: forwarded to ``ApplicationIOController``
        :param aseID: forwarded to ``ApplicationIOController``
        :param iam_req: I-Am request stored as ``self.iam_req``
        :param subscription_contexts: stored as-is, to support CoV
        """
        ApplicationIOController.__init__(
            self, localDevice, deviceInfoCache, aseID=aseID
        )

        self.iam_req = iam_req

        # keep the local address around, subclasses might find it useful
        self.localAddress = (
            localAddress
            if isinstance(localAddress, Address)
            else Address(localAddress)
        )

        self.networkNumber = networkNumber

        # application decoder
        self.asap = ApplicationServiceAccessPoint()

        # the state machine access point receives the device object so it
        # can know if it should support segmentation
        self.smap = StateMachineAccessPoint(localDevice)

        # segmentation state machines share the application's device
        # information cache
        self.smap.deviceInfoCache = self.deviceInfoCache

        # network service access point
        self.nsap = NetworkServiceAccessPoint()

        # generic network layer service element for the NSAP
        self.nse = NetworkServiceElementWithRequests()
        bind(self.nse, self.nsap)

        # top layers
        bind(self, self.asap, self.smap, self.nsap)

        # generic BIP stack, bound to the Annex J server on the UDP
        # multiplexer
        self.bip = BIPSimple()
        self.annexj = AnnexJCodec()
        self.mux = UDPMultiplexer(self.localAddress)

        # bottom layers
        bind(self.bip, self.annexj, self.mux.annexJ)

        # attach the BIP stack to the network
        self.nsap.bind(self.bip, net=self.networkNumber, address=self.localAddress)

        self.i_am_counter: t.Dict[t.Tuple[str, int], int] = defaultdict(int)
        self.i_have_counter = defaultdict(int)
        self.who_is_counter = defaultdict(int)

        # keep track of requests to line up responses
        self._request = None
        self._last_i_am_received = []
        self._last_i_have_received = []

        # to support CoV
        self.subscription_contexts = subscription_contexts

    def close_socket(self):
        """Close the socket(s) by delegating to the UDP multiplexer."""
        self.mux.close_socket()

    def request(self, apdu):
        """Remember *apdu* in ``self._request``, then forward it along."""
        self._request = apdu
        super().request(apdu)
@note_and_log
class BAC0ForeignDeviceApplication(
    common_mixin,
    ApplicationIOController,
    WhoIsIAmServices,
    WhoHasIHaveServices,
    ReadWritePropertyServices,
    ReadWritePropertyMultipleServices,
    ChangeOfValueServices,
):
    """
    Defines a basic BACnet/IP application to process BACnet requests, using
    a ``BIPForeign`` link layer built from a BBMD address and TTL.

    See BAC0.scripts.BasicScript for more details.
    """

    def __init__(
        self,
        localDevice,
        localAddress,
        networkNumber: Optional[int] = None,
        bbmdAddress=None,
        bbmdTTL=0,
        deviceInfoCache=None,
        aseID=None,
        iam_req=None,
        subscription_contexts=None,
    ):
        """
        Assemble the application, network and BACnet/IP layers.

        :param localDevice: local device object
        :param localAddress: local IP address; anything that is not already
            an ``Address`` is converted with ``Address(localAddress)``
        :param networkNumber: stored as ``self.networkNumber`` (as
            ``BAC0Application`` does); the NSAP is bound without a network
            number
        :param bbmdAddress: passed to ``BIPForeign``
        :param bbmdTTL: passed to ``BIPForeign``
        :param deviceInfoCache: forwarded to ``ApplicationIOController``
        :param aseID: forwarded to ``ApplicationIOController``
        :param iam_req: I-Am request stored as ``self.iam_req``
        :param subscription_contexts: stored as-is, to support CoV
        """
        ApplicationIOController.__init__(
            self, localDevice, deviceInfoCache, aseID=aseID
        )

        self.iam_req = iam_req

        # local address might be useful for subclasses
        if isinstance(localAddress, Address):
            self.localAddress = localAddress
        else:
            self.localAddress = Address(localAddress)

        # keep the value instead of silently dropping the argument, so the
        # attribute exists on this class just like on BAC0Application
        self.networkNumber = networkNumber

        # include a application decoder
        self.asap = ApplicationServiceAccessPoint()

        # pass the device object to the state machine access point so it
        # can know if it should support segmentation
        self.smap = StateMachineAccessPoint(localDevice)

        # the segmentation state machines need access to the same device
        # information cache as the application
        self.smap.deviceInfoCache = self.deviceInfoCache

        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()

        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElementWithRequests()
        bind(self.nse, self.nsap)

        # bind the top layers
        bind(self, self.asap, self.smap, self.nsap)

        # create a generic BIP stack, bound to the Annex J server
        # on the UDP multiplexer
        self.bip = BIPForeign(bbmdAddress, bbmdTTL)
        self.annexj = AnnexJCodec()
        self.mux = UDPMultiplexer(self.localAddress, noBroadcast=True)

        # bind the bottom layers
        bind(self.bip, self.annexj, self.mux.annexJ)

        # bind the NSAP to the stack, no network number
        self.nsap.bind(self.bip)

        self.i_am_counter = defaultdict(int)
        self.i_have_counter = defaultdict(int)
        self.who_is_counter = defaultdict(int)

        # keep track of requests to line up responses
        self._request = None
        self._last_i_am_received = []
        self._last_i_have_received = []

        # to support CoV
        self.subscription_contexts = subscription_contexts

    def close_socket(self):
        """Close the socket(s): pass to the multiplexer, then down to the sockets."""
        self.mux.close_socket()
class NullClient(Client):
    """Client whose ``confirmation`` discards whatever it is given."""

    def __init__(self, cid=None):
        """Initialise the underlying ``Client`` with the given *cid*."""
        super().__init__(cid=cid)

    def confirmation(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None
@note_and_log
class BAC0BBMDDeviceApplication(
    common_mixin,
    ApplicationIOController,
    WhoIsIAmServices,
    WhoHasIHaveServices,
    ReadWritePropertyServices,
    ReadWritePropertyMultipleServices,
    ChangeOfValueServices,
):
    """
    Defines a basic BACnet/IP application to process BACnet requests, using
    a ``BIPBBMD`` link layer whose peers come from a broadcast distribution
    table.

    See BAC0.scripts.BasicScript for more details.
    """

    # NOTE(review): not referenced anywhere in this class; kept because code
    # outside this file may read it
    bdt = []

    def __init__(
        self,
        localDevice,
        localAddress,
        networkNumber: Optional[int] = None,
        bdtable=None,
        deviceInfoCache=None,
        aseID=None,
        iam_req=None,
        subscription_contexts=None,
    ):
        """
        Assemble the application, network and BACnet/IP layers.

        :param localDevice: local device object
        :param localAddress: local IP address; anything that is not already
            an ``Address`` is converted with ``Address(localAddress)``
        :param networkNumber: stored as ``self.networkNumber`` (as
            ``BAC0Application`` does); the NSAP is bound without a network
            number
        :param bdtable: iterable of peer addresses, each registered through
            :meth:`add_peer`; ``None`` (the default) means no peers
        :param deviceInfoCache: forwarded to ``ApplicationIOController``
        :param aseID: forwarded to ``ApplicationIOController``
        :param iam_req: I-Am request stored as ``self.iam_req``
        :param subscription_contexts: stored as-is, to support CoV
        """
        # a fresh list per instance: a literal [] default would be shared by
        # every instance created without an explicit table
        self.bdtable = [] if bdtable is None else bdtable
        null_client = NullClient()

        ApplicationIOController.__init__(
            self, localDevice, deviceInfoCache, aseID=aseID
        )

        self.iam_req = iam_req

        # local address might be useful for subclasses
        if isinstance(localAddress, Address):
            self.localAddress = localAddress
        else:
            self.localAddress = Address(localAddress)

        # keep the value instead of silently dropping the argument, so the
        # attribute exists on this class just like on BAC0Application
        self.networkNumber = networkNumber

        # include a application decoder
        self.asap = ApplicationServiceAccessPoint()

        # pass the device object to the state machine access point so it
        # can know if it should support segmentation
        self.smap = StateMachineAccessPoint(localDevice)

        # the segmentation state machines need access to the same device
        # information cache as the application
        self.smap.deviceInfoCache = self.deviceInfoCache

        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()

        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElementWithRequests()
        bind(self.nse, self.nsap)

        # bind the top layers
        bind(self, self.asap, self.smap, self.nsap)

        # create a generic BIP stack, bound to the Annex J server
        # on the UDP multiplexer
        self.bip = BIPBBMD(self.localAddress)
        self.annexj = AnnexJCodec()
        self.mux = UDPMultiplexer(self.localAddress, noBroadcast=False)

        # bind the bottom layers, with a do-nothing client on top of the BIP
        bind(null_client, self.bip, self.annexj, self.mux.annexJ)

        # register every peer of the broadcast distribution table
        if self.bdtable:
            for bdtentry in self.bdtable:
                self.add_peer(bdtentry)

        # bind the NSAP to the stack, no network number
        self.nsap.bind(self.bip)

        self.i_am_counter = defaultdict(int)
        self.i_have_counter = defaultdict(int)
        self.who_is_counter = defaultdict(int)

        # keep track of requests to line up responses
        self._request = None
        self._last_i_am_received = []
        self._last_i_have_received = []

        # to support CoV
        self.subscription_contexts = subscription_contexts

    def add_peer(self, address):
        """
        Add *address* as a peer of the BBMD.

        The value is converted with ``Address(address)``; any exception
        raised by the conversion or by the BIP layer propagates unchanged.
        """
        self.bip.add_peer(Address(address))

    def remove_peer(self, address):
        """
        Remove *address* from the peers of the BBMD.

        The value is converted with ``Address(address)``; any exception
        raised by the conversion or by the BIP layer propagates unchanged.
        """
        self.bip.remove_peer(Address(address))

    def close_socket(self):
        """Close the socket(s): pass to the multiplexer, then down to the sockets."""
        self.mux.close_socket()