Source code for BAC0.scripts.Base

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Base class used by BAC0 scripts to build and run the local BACnet/IP device.
"""
import asyncio
import random
import sys
import typing as t
from collections import defaultdict

# --- BACnet stack (bacpypes3) modules ---
from bacpypes3.basetypes import DeviceStatus, HostNPort, ObjectTypesSupported
from bacpypes3.json.util import sequence_to_json
from bacpypes3.local.device import DeviceObject
from bacpypes3.local.networkport import NetworkPortObject
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import CharacterString, ObjectIdentifier
from bacpypes3.vendor import VendorInfo, get_vendor_info

# --- this application's modules ---
from ..core.app.asyncApp import (
    BAC0Application,
)  # BAC0BBMDDeviceApplication,; BAC0ForeignDeviceApplication,
from ..core.functions.GetIPAddr import validate_ip_address
from ..core.functions.TimeSync import TimeHandler
from ..core.io.IOExceptions import InitializationError, UnknownObjectError
from ..core.utils.notes import note_and_log
from ..tasks.TaskManager import stopAllTasks

# --- 3rd party modules ---

# ------------------------------------------------------------------------------


@note_and_log
class LocalObjects(object):
    """
    Dictionary-style access to the objects of the local device.

    ``local_objects[(obj_type, instance)]`` looks an object up by identifier,
    ``local_objects["name"]`` looks it up by name.
    """

    def __init__(self, device):
        # `device` is expected to expose `this_application.app`
        self.device = device

    def __getitem__(self, obj):
        """
        Return the local object matching `obj`.

        :param obj: a ``(obj_type, instance)`` tuple or an object name (str)
        :raises UnknownObjectError: when the lookup returns ``None`` or when
            `obj` is neither a tuple nor a string
        """
        if isinstance(obj, tuple):
            obj_type, instance = obj
            found = self.device.this_application.app.get_object_id(
                (obj_type, instance)
            )
        elif isinstance(obj, str):
            found = self.device.this_application.app.get_object_name(obj)
        else:
            # Unsupported key type: nothing is looked up
            found = None

        if found is not None:
            return found
        raise UnknownObjectError(f"Can't find {obj} in local device")
def charstring(val):
    """
    Wrap `val` in a ``CharacterString`` when it is a ``str``.

    Any other value is returned untouched.
    """
    if not isinstance(val, str):
        return val
    return CharacterString(val)
class DiscoveredDevice(t.TypedDict):
    """
    Typed dictionary describing one discovered device.

    This is the value type of ``Base.discoveredDevices``.
    """

    # Object identifier of the device
    object_instance: ObjectIdentifier
    # Address of the device
    address: Address
    # Set of network numbers associated with the device
    network_number: t.Set[int]
    # Vendor identifier and vendor name of the device
    vendor_id: int
    vendor_name: str
@note_and_log
class Base:
    """
    Build a running BACnet/IP device that accepts WhoIs and IAm requests.

    Initialization requires some minimal information about the local device.
    The constructor stores the settings on the instance and then calls
    :meth:`startApp` to create the BACnet application.

    :param localIPAddr: address (with mask) of the local device. An address
        already present in ``Base._used_ips`` is refused.
    :param networkNumber: stored as ``self.networkNumber``.
    :param localObjName: object name of the local device.
    :param deviceId: device instance. When ``None`` (or an empty string) an
        identifier is picked at random, starting at 3056177.
    :param firmwareRevision: stored as ``self.firmwareRevision``.
    :param maxAPDULengthAccepted: stored as ``self.maxAPDULengthAccepted``.
    :param maxSegmentsAccepted: stored as ``self.maxSegmentsAccepted``.
    :param segmentationSupported: stored as ``self.segmentationSupported``.
    :param bbmdAddress: when not ``None`` the device runs in "foreign" mode
        and registers as a foreign device to this host.
    :param bbmdTTL: lifetime, in seconds, of the foreign device registration.
    :param bdtable: when provided (and `bbmdAddress` is ``None``) the device
        runs in "bbmd" mode and the BDT is populated.
    :param modelName: model name of the local device.
    :param vendorId: vendor identifier of the local device.
    :param vendorName: stored as ``self.vendorName``.
    :param description: stored as ``self.description``.
    :param location: stored as ``self.location``.
    :param timezone: time zone given to ``TimeHandler``.
    :param json_file: forwarded to ``BAC0Application``.
    :raises InitializationError: when the address is already used, is
        rejected by ``validate_ip_address`` or when the application fails
        to start.
    """

    # Addresses currently used by Base instances (shared by all instances)
    _used_ips: t.Set[Address] = set()
    _last_cov_identifier = 0
    _running_cov_tasks = {}

    def __init__(
        self,
        localIPAddr: Address = Address("127.0.0.1/24"),
        networkNumber: t.Optional[int] = None,
        localObjName: str = "BAC0",
        deviceId: t.Optional[int] = None,
        firmwareRevision: str = "".join(sys.version.split("|")[:2]),
        maxAPDULengthAccepted: str = "1024",
        maxSegmentsAccepted: str = "1024",
        segmentationSupported: str = "segmentedBoth",
        bbmdAddress: t.Optional[str] = None,
        bbmdTTL: t.Optional[int] = 0,
        bdtable: t.Optional[list] = None,
        modelName: str = "BAC0 Scripting Tool",
        vendorId: int = 842,
        vendorName: str = "SERVISYS inc.",
        description: str = "http://christiantremblay.github.io/BAC0/",
        location: str = "Bromont, Québec",
        timezone: str = "America/Montreal",
        json_file: t.Optional[str] = None,
    ):
        self.log("Configurating app", level="debug")

        # Register Servisys
        try:
            _BAC0_vendor = VendorInfo(vendorId)
        except RuntimeError:
            # Raised when the script is re-run; reuse the vendor info that
            # already exists for this identifier.
            _BAC0_vendor = get_vendor_info(vendorId)
        _BAC0_vendor.register_object_class(
            ObjectTypesSupported.networkPort, NetworkPortObject
        )
        _BAC0_vendor.register_object_class(ObjectTypesSupported.device, DeviceObject)

        self.timehandler = TimeHandler(tz=timezone)

        self.response = None
        self._initialized = False
        self._started = False
        self._stopped = False

        # Common tail of the two address-related error messages
        ip_hint = (
            "Check if another software is using port 47808 on this network "
            "interface. If so, you can define multiple IP per interface. "
            "Or specify another IP using BAC0.lite(ip='IP/mask')"
        )
        if localIPAddr in Base._used_ips:
            raise InitializationError(
                f"IP Address provided ({localIPAddr}) already used by BAC0. {ip_hint}"
            )
        if not validate_ip_address(localIPAddr):
            raise InitializationError(
                f"IP Address provided ({localIPAddr}) invalid. {ip_hint}"
            )
        self.localIPAddr = localIPAddr

        self.networkNumber = networkNumber

        # Test against None (not truthiness) so that an explicit device
        # instance of 0 is honoured instead of being replaced by a random id.
        if deviceId is None or deviceId == "":
            self.Boid = 3056177 + int(random.uniform(0, 1000))
        else:
            self.Boid = int(deviceId)

        self.segmentationSupported = segmentationSupported
        self.maxSegmentsAccepted = maxSegmentsAccepted
        self.localObjName = localObjName
        self.local_objects = LocalObjects(device=self)

        self.maxAPDULengthAccepted = maxAPDULengthAccepted
        self.vendorId = vendorId
        self.vendorName = charstring(vendorName)
        self.modelName = charstring(modelName)
        self.description = charstring(description)
        self.location = charstring(location)

        self.discoveredDevices: t.Optional[t.Dict[str, DiscoveredDevice]] = None
        self.systemStatus = DeviceStatus(1)

        self.bbmdAddress = bbmdAddress
        self.bbmdTTL = bbmdTTL
        self.bdtable = bdtable

        self.firmwareRevision = firmwareRevision
        self._ric = {}
        self.subscription_contexts = {}
        # Cannot reference db.InfluxDB directly since it's an optional import
        self.database: t.Optional[t.Any] = None
        self.json_file = json_file

        try:
            self.startApp()
        except InitializationError as error:
            # Keep the original exception as the cause of the new one
            raise InitializationError(
                f"Gros probleme : {error}. Address requested : {localIPAddr}"
            ) from error

    def startApp(self):
        """
        Define the local device and create the BACnet application.

        The mode is "foreign" when a BBMD address was provided, "bbmd" when
        a BDT was provided, "normal" otherwise. In "bbmd" mode the BDT is
        populated; in "foreign" mode the device registers to the BBMD.

        :raises InitializationError: when an ``OSError`` occurs
        :raises ValueError: in "foreign" mode when the BBMD address or the
            TTL is missing
        """
        self.log("Create Local Device", level="debug")
        try:
            app_type = "BACnet/IP App"

            class config(defaultdict):
                "Simple class to mimic args dot retrieval"

                def __init__(self, cfg):
                    for k, v in cfg.items():
                        self[k] = v

                def __getattr__(self, key):
                    return self[key]

            if self.bbmdAddress is not None:
                mode = "foreign"
            elif self.bdtable:
                mode = "bbmd"
            else:
                mode = "normal"

            cfg = {
                "BAC0": {
                    "bbmdAddress": self.bbmdAddress,
                    "bdt": self.bdtable,
                    "ttl": self.bbmdTTL,
                },
                "device": {
                    "object-name": self.localObjName,
                    # "firmware-revision": self.firmwareRevision,
                    "vendor-identifier": self.vendorId,
                    # NOTE(review): hard-coded, self.vendorName is not used
                    # here - confirm this is intended.
                    "vendor-name": "Servisys inc.",
                    "object-identifier": f"device,{self.Boid}",
                    "object-list": [f"device,{self.Boid}", "network-port,1"],
                    "model-name": self.modelName,
                    # "max-apdu-length-accepted": self.maxAPDULengthAccepted,
                    # "max-segments-accepted": self.maxSegmentsAccepted,
                    # "location": self.location,
                    # "description": self.description
                },
                "network-port": {
                    "ip-address": str(self.localIPAddr),
                    "ip-subnet-mask": str(self.localIPAddr.netmask),
                    "bacnet-ip-udp-port": self.localIPAddr.addrPort,
                    # NOTE(review): self.networkNumber is not forwarded
                    # here - confirm this is intended.
                    "network-number": None,
                    "fd-bbmd-address": sequence_to_json(HostNPort(self.bbmdAddress)),
                    "fd-subscription-lifetime": self.bbmdTTL,
                    "bacnet-ip-mode": mode,
                },
            }
            if mode == "bbmd":
                # bdt_json_seq = [f"BDTEntry({addr})" for addr in self.bdtable]
                cfg["network-port"]["bbmdBroadcastDistributionTable"] = self.bdtable

            _cfg = config(cfg)

            self.this_application = BAC0Application(
                _cfg, self.localIPAddr, json_file=self.json_file
            )
            if mode == "bbmd":
                self._log.info(f"Populating BDT with {self.bdtable}")
                self.this_application.populate_bdt()

            if mode == "foreign":
                # Validate before announcing the registration
                if self.bbmdAddress is None or self.bbmdTTL is None:
                    raise ValueError("Missing bbmdAddress and/or bbmdTTL")
                self._log.info(
                    f"Registering as a foreign device to host {self.bbmdAddress} for {self.bbmdTTL} seconds"
                )
                self.this_application.register_as_foreign_device_to(
                    host=self.bbmdAddress, lifetime=self.bbmdTTL
                )

            self.log("Starting", level="debug")
            self._initialized = True

            try:
                Base._used_ips.add(self.localIPAddr)
                self.log(f"Registered as {app_type} | mode {mode}", level="info")
                self._started = True
            except OSError as error:
                self.log(f"Error opening socket: {error}", level="warning")
                raise InitializationError(
                    f"Error opening socket: {error}"
                ) from error
            self.log("Running", level="debug")
        except OSError as error:
            self.log(f"an error has occurred: {error}", level="error")
            raise InitializationError(f"Error starting app: {error}") from error
        self.log("finally", level="debug")

    def register_foreign_device(self, addr=None, ttl=0):
        """Not implemented: always raises ``NotImplementedError``."""
        # self.this_application.register_to_bbmd(addr, ttl)
        raise NotImplementedError()

    def unregister_foreign_device(self):
        """Ask the application to unregister from the BBMD."""
        self.this_application.unregister_from_bbmd()

    def disconnect(self) -> asyncio.Task:
        """
        Schedule :meth:`_disconnect` as an asyncio task and return the task.

        ``asyncio.create_task`` needs a running event loop.
        """
        task = asyncio.create_task(self._disconnect())
        return task

    async def _disconnect(self):
        """
        Stop the BACnet stack. Free the IP socket.
        """
        self.log("Stopping All running tasks", level="debug")
        await stopAllTasks()
        self.log("Stopping BACnet stack", level="debug")
        # Freeing socket
        self.this_application.app.close()

        self._stopped = True
        self._started = False
        # Make the address available again for another instance
        Base._used_ips.discard(self.localIPAddr)
        self.log("BACnet stopped", level="info")

    @property
    def routing_table(self):
        """
        Routing Table will give all the details about routers and how they
        connect BACnet networks together.

        It's a decoded presentation of what the router info cache of the
        application contains.

        Returns a dict with the address of routers (as a string) as key.
        """

        class Router:
            def __init__(self, snet, address, dnets, path=None):
                self.source_network: int = snet
                self.address: Address = address
                self.destination_networks: set = dnets
                self.path: list = path

            def __repr__(self):
                return "Source Network: {} | Address: {} | Destination Networks: {} | Path: {}".format(
                    self.source_network,
                    self.address,
                    self.destination_networks,
                    self.path,
                )

        self._routers = {}

        self._ric = self.this_application.app.nsap.router_info_cache

        # One entry per router, keyed by its address
        for router, dnets in self._ric.router_dnets.items():
            snet, address = router
            self._routers[str(address)] = Router(snet, address, dnets, path=[])

        # Attach every known path (and its status) to the matching router
        for path, router_info in self._ric.path_info.items():
            router_address, router_status = router_info
            self._routers[str(router_address)].path.append((path, router_status))

        return self._routers

    @classmethod
    def extract_value_from_primitive_data(cls, value):
        """
        Convert `value` to the matching plain Python type.

        ``float``, ``int`` and ``str`` instances (including subclasses) are
        converted with ``float()``, ``int()`` and ``str()``. A ``bool`` and
        any other value are returned unchanged.
        """
        # bool is a subclass of int: return it as is so that True/False are
        # not turned into 1/0 by the int branch below.
        if isinstance(value, bool):
            return value
        if isinstance(value, float):
            return float(value)
        if isinstance(value, int):
            return int(value)
        if isinstance(value, str):
            return str(value)
        return value