Source code for BAC0.scripts.Lite

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
ReadWriteScript - extended version of BasicScript.py

As everything is handled by the BasicScript, select the additional features you want::

    # Create a class that implements a basic script with read and write functions
    from BAC0.scripts.BasicScript import BasicScript
    from BAC0.core.io.Read import ReadProperty
    from BAC0.core.io.Write import WriteProperty
    class ReadWriteScript(BasicScript,ReadProperty,WriteProperty)

Once the class is created, create the local object and use it::

    bacnet = ReadWriteScript(localIPAddr = '192.168.1.10')
    bacnet.read('2:5 analogInput 1 presentValue)

"""
import typing as t

# --- standard Python modules ---
import weakref
from collections import namedtuple

from ..core.devices.Device import RPDeviceConnected, RPMDeviceConnected
from ..core.devices.Points import Point
from ..core.devices.Trends import TrendLog
from ..core.devices.Virtuals import VirtualPoint
from ..core.functions.Calendar import Calendar
from ..core.functions.cov import CoV
from ..core.functions.DeviceCommunicationControl import DeviceCommunicationControl
from ..core.functions.Discover import Discover
from ..core.functions.GetIPAddr import HostIP
from ..core.functions.Reinitialize import Reinitialize
from ..core.functions.Schedule import Schedule
from ..core.functions.Text import TextMixin
from ..core.functions.TimeSync import TimeSync
from ..core.io.IOExceptions import (
    NoResponseFromController,
    NumerousPingFailures,
    Timeout,
    UnrecognizedService,
)
from ..core.io.Read import ReadProperty
from ..core.io.Simulate import Simulation
from ..core.io.Write import WriteProperty
from ..core.utils.notes import note_and_log
from ..infos import __version__ as version

# --- this application's modules ---
from ..scripts.Base import Base
from ..tasks.RecurringTask import RecurringTask
from ..tasks.UpdateCOV import Update_local_COV

try:
    from ..db.influxdb import ConnectionError, InfluxDB

    INFLUXDB = True
except ImportError:
    INFLUXDB = False

from bacpypes.pdu import Address

# ------------------------------------------------------------------------------


[docs]@note_and_log class Lite( Base, Discover, ReadProperty, WriteProperty, Simulation, TimeSync, Reinitialize, DeviceCommunicationControl, CoV, Schedule, Calendar, TextMixin, ): """ Build a BACnet application to accept read and write requests. [Basic Whois/IAm functions are implemented in parent BasicScript class.] Once created, execute a whois() to build a list of available controllers. Initialization requires information on the local device. :param ip='127.0.0.1': Address must be in the same subnet as the BACnet network [BBMD and Foreign Device - not supported] """ def __init__( self, ip: t.Optional[str] = None, port: t.Optional[int] = None, mask: t.Optional[int] = None, bbmdAddress=None, bbmdTTL: int = 0, bdtable=None, ping: bool = True, ping_delay: int = 300, db_params: t.Optional[t.Dict[str, t.Any]] = None, **params, ) -> None: self._log.info( "Starting BAC0 version {} ({})".format( version, self.__module__.split(".")[-1] ) ) self._log.info("Use BAC0.log_level to adjust verbosity of the app.") self._log.info("Ex. BAC0.log_level('silence') or BAC0.log_level('error')") self._log.debug("Configurating app") self._registered_devices = weakref.WeakValueDictionary() # Ping task will deal with all registered device and disconnect them if they do not respond. self._ping_task = RecurringTask( self.ping_registered_devices, delay=ping_delay, name="Ping Task" ) if ping: self._ping_task.start() if ip is None: host = HostIP(port) ip_addr = host.address else: try: ip, subnet_mask_and_port = ip.split("/") try: mask_s, port_s = subnet_mask_and_port.split(":") mask = int(mask_s) port = int(port_s) except ValueError: mask = int(subnet_mask_and_port) except ValueError: ip = ip if not mask: mask = 24 if not port: port = 47808 ip_addr = Address("{}/{}:{}".format(ip, mask, port)) self._log.info( f"Using ip : {ip_addr} on port {ip_addr.addrPort} | broadcast : {ip_addr.addrBroadcastTuple[0]}" ) Base.__init__( self, localIPAddr=ip_addr, bbmdAddress=bbmdAddress, bbmdTTL=bbmdTTL, bdtable=bdtable, **params, ) self._log.info("Device instance (id) : {boid}".format(boid=self.Boid)) self.bokehserver = False self._points_to_trend = weakref.WeakValueDictionary() # Announce yourself self.iam() # Do what's needed to support COV self._update_local_cov_task = namedtuple( "_update_local_cov_task", ["task", "running"] ) self._update_local_cov_task.task = Update_local_COV( self, delay=1, name="Update Local COV Task" ) self._update_local_cov_task.task.start() self._update_local_cov_task.running = True self._log.info("Update Local COV Task started (required to support COV)") # Activate InfluxDB if params are available if db_params and INFLUXDB: try: self.database = ( InfluxDB(db_params) if db_params["name"].lower() == "influxdb" else None ) self._log.info( "Connection made to InfluxDB bucket : {}".format( self.database.bucket ) ) except ConnectionError: self._log.error( "Unable to connect to InfluxDB. Please validate parameters" ) @property def known_network_numbers(self): """ This function will read the table of known network numbers gathered by the NetworkServiceElement. It will also look into the discoveredDevices property and add any network number that would not be in the NSE table. """ if self.discoveredDevices: for each in self.discoveredDevices: addr, instance = each net = addr.split(":")[0] if ":" in addr else None if net: try: self.this_application.nse._learnedNetworks.add(int(net)) except ValueError: pass # proabbly a IP address with a specified port other than 0xBAC0 return self.this_application.nse._learnedNetworks
[docs] def discover( self, networks: t.Union[str, t.List[int], int] = "known", limits: t.Tuple[int, int] = (0, 4194303), global_broadcast: bool = False, reset: bool = False, ): """ Discover is meant to be the function used to explore the network when we connect. It will trigger whois request using the different options provided with parameters. By default, a local broadcast will be used. This is required as in big BACnet network, global broadcast can lead to network flood and loss of data. If not parameters are given, BAC0 will try to : * Find the network on which it is * Find routers for other networks (accessible via local broadcast) * Detect "known networks" * Use the list of known networks and create whois request to find all devices on those networks This should be sufficient for most cases. Once discovery is done, user may access the list of "discovered devices" using :: bacnet.discoveredDevices :param networks (list, integer) : A simple integer or a list of integer representing the network numbers used to issue whois request. :param limits (tuple) : tuple made of 2 integer, the low limit and the high limit. Those are the device instances used in the creation of the whois request. Min : 0 ; Max : 4194303 :param global_broadcast (boolean) : If set to true, a global broadcast will be used for the whois. Use with care. """ if reset: self.discoveredDevices = None found = [] _networks = [] deviceInstanceRangeLowLimit, deviceInstanceRangeHighLimit = limits # Try to find on which network we are self.what_is_network_number() # Try to find local routers... self.whois_router_to_network() self._log.info("Found those networks : {}".format(self.known_network_numbers)) if networks: if isinstance(networks, list): # we'll make multiple whois... for network in networks: if network < 65535: _networks.append(network) elif networks == "known": _networks = self.known_network_numbers.copy() else: if isinstance(networks, int) and networks < 65535: _networks.append(networks) if _networks: for network in _networks: self._log.info("Discovering network {}".format(network)) _res = self.whois( "{}:* {} {}".format( network, deviceInstanceRangeLowLimit, deviceInstanceRangeHighLimit, ), global_broadcast=global_broadcast, ) for each in _res: found.append(each) else: self._log.info( "No BACnet network found, attempting a simple whois using provided device instances limits ({} - {})".format( deviceInstanceRangeLowLimit, deviceInstanceRangeHighLimit ) ) _res = self.whois( "{} {}".format( deviceInstanceRangeLowLimit, deviceInstanceRangeHighLimit ), global_broadcast=global_broadcast, ) for each in _res: found.append(each) return found
[docs] def register_device( self, device: t.Union[RPDeviceConnected, RPMDeviceConnected] ) -> None: oid = id(device) self._registered_devices[oid] = device
[docs] def ping_registered_devices(self) -> None: """ Registered device on a network (self) are kept in a list (registered_devices). This function will allow pinging thoses device regularly to monitor them. In case of disconnected devices, we will disconnect the device (which will save it). Then we'll ping again until reconnection, where the device will be bring back online. To permanently disconnect a device, an explicit device.disconnect(unregister=True [default value]) will be needed. This way, the device won't be in the registered_devices list and BAC0 won't try to ping it. """ for each in self.registered_devices: if isinstance(each, RPDeviceConnected) or isinstance( each, RPMDeviceConnected ): try: self._log.debug( "Ping {}|{}".format( each.properties.name, each.properties.address ) ) each.ping() if each.properties.ping_failures > 3: raise NumerousPingFailures except NumerousPingFailures: self._log.warning( "{}|{} is offline, disconnecting it.".format( each.properties.name, each.properties.address ) ) each.disconnect(unregister=False) else: device_id = each.properties.device_id addr = each.properties.address name = self.read("{} device {} objectName".format(addr, device_id)) if name == each.properties.name: each.properties.ping_failures = 0 self._log.info( "{}|{} is back online, reconnecting.".format( each.properties.name, each.properties.address ) ) each.connect(network=self) each.poll(delay=each.properties.pollDelay)
@property def registered_devices(self): """ Devices that have been created using BAC0.device(args) """ return list(self._registered_devices.values())
[docs] def unregister_device(self, device): """ Remove from the registered list """ oid = id(device) try: del self._registered_devices[oid] except KeyError: pass
[docs] def add_trend(self, point_to_trend: t.Union[Point, TrendLog, VirtualPoint]) -> None: """ Add point to the list of histories that will be handled by Bokeh Argument provided must be of type Point or TrendLog ex. bacnet.add_trend(controller['point_name']) """ if ( isinstance(point_to_trend, Point) or isinstance(point_to_trend, TrendLog) or isinstance(point_to_trend, VirtualPoint) ): oid = id(point_to_trend) self._points_to_trend[oid] = point_to_trend else: raise TypeError("Please provide point containing history")
[docs] def remove_trend( self, point_to_remove: t.Union[Point, TrendLog, VirtualPoint] ) -> None: """ Remove point from the list of histories that will be handled by Bokeh Argument provided must be of type Point or TrendLog ex. bacnet.remove_trend(controller['point_name']) """ if ( isinstance(point_to_remove, Point) or isinstance(point_to_remove, TrendLog) or isinstance(point_to_remove, VirtualPoint) ): oid = id(point_to_remove) else: raise TypeError("Please provide point or trendLog containing history") if oid in self._points_to_trend.keys(): del self._points_to_trend[oid]
@property def devices(self) -> t.List[t.Tuple[str, str, str, int]]: """ This property will create a good looking table of all the discovered devices seen on the network. For that, some requests will be sent over the network to look for name, manufacturer, etc and in big network, this could be a long process. """ lst = [] for device in list(self.discoveredDevices or {}): try: deviceName, vendorName = self.readMultiple( "{} device {} objectName vendorName".format(device[0], device[1]) ) except (UnrecognizedService, ValueError): self._log.warning( "Unrecognized service for {} | {}".format(device[0], device[1]) ) try: deviceName = self.read( "{} device {} objectName".format(device[0], device[1]) ) vendorName = self.read( "{} device {} vendorName".format(device[0], device[1]) ) except NoResponseFromController: self._log.warning("No response from {}".format(device)) continue except (NoResponseFromController, Timeout): self._log.warning("No response from {}".format(device)) continue lst.append((deviceName, vendorName, device[0], device[1])) return lst # type: ignore[return-value] @property def trends(self) -> t.List[t.Any]: """ This will present a list of all registered trends used by Bokeh Server """ return list(self._points_to_trend.values())
[docs] def disconnect(self) -> None: self._log.debug("Disconnecting") for each in self.registered_devices: each.disconnect() super().disconnect()
def __repr__(self) -> str: return "Bacnet Network using ip {} with device id {}".format( self.localIPAddr, self.Boid ) def __getitem__(self, boid_or_localobject): item = self.this_application.objectName[boid_or_localobject] if item is None: for device in self._registered_devices: if str(device.properties.device_id) == str(boid_or_localobject): return device self._log.error("{} not found".format(boid_or_localobject)) else: return item