Source code for BAC0.core.functions.Discover

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Discover.py

Classes needed to make discovering functions on a BACnet network

"""
# --- standard Python modules ---
import time

# --- 3rd party modules ---
from bacpypes.apdu import (
    IAmRequest,
    WhoHasLimits,
    WhoHasObject,
    WhoHasRequest,
    WhoIsRequest,
)
from bacpypes.core import deferred
from bacpypes.iocb import IOCB, IOController, SieveQueue
from bacpypes.netservice import NetworkServiceElement
from bacpypes.npdu import (
    IAmRouterToNetwork,
    InitializeRoutingTable,
    InitializeRoutingTableAck,
    NetworkNumberIs,
    RejectMessageToNetwork,
    WhatIsNetworkNumber,
    WhoIsRouterToNetwork,
)
from bacpypes.pdu import Address, GlobalBroadcast, LocalBroadcast
from bacpypes.primitivedata import CharacterString, ObjectIdentifier

from ...core.utils.notes import note_and_log

# --- this application's modules ---
from ..io.IOExceptions import ApplicationNotStarted


# ------------------------------------------------------------------------------
[docs]@note_and_log class NetworkServiceElementWithRequests(IOController, NetworkServiceElement): """ This class will add the capability to send requests at network level And capability to read responses for NPDU Deals with IOCB so the request can be deferred to task manager """ def __init__(self): NetworkServiceElement.__init__(self) IOController.__init__(self) # no pending request self._request = None self._iartn = [] self._learnedNetworks = set() self.queue_by_address = {} self._routing_table = {}
[docs] def process_io(self, iocb): # get the destination address from the pdu adapter, npdu = iocb.args[0] destination_address = npdu.pduDestination # look up the queue queue = self.queue_by_address.get(destination_address, None) if not queue: queue = SieveQueue(self.request, address=destination_address) self.queue_by_address[destination_address] = queue # ask the queue to process the request queue.request_io(iocb)
def _net_complete(self, npdu): # look up the queue queue = self.queue_by_address.get(npdu.pduDestination, None) if not queue: return # make sure it has an active iocb if not queue.active_iocb: return # this request is complete if isinstance( npdu, ( None.__class__, IAmRouterToNetwork, InitializeRoutingTableAck, NetworkNumberIs, ), ): queue.complete_io(queue.active_iocb, npdu) elif isinstance(npdu, RejectMessageToNetwork): queue.abort_io(queue.active_iocb, npdu) else: raise RuntimeError("unrecognized NPDU type") # if the queue is empty and idle, forget about the controller if not queue.ioQueue.queue and not queue.active_iocb: del self.queue_by_address[npdu.pduDestination]
[docs] def request(self, arg): adapter, npdu = arg # save a copy of the request self._request = npdu # forward it along NetworkServiceElement.request(self, adapter, npdu)
[docs] def indication(self, adapter, npdu): if isinstance(npdu, IAmRouterToNetwork): # if isinstance(self._request, WhoIsRouterToNetwork): address, netlist = str(npdu.pduSource), npdu.iartnNetworkList self._log.info("{} router to {}".format(address, netlist)) self._iartn.append(address) self._routing_table[address] = netlist for each in npdu.iartnNetworkList: self._learnedNetworks.add(int(each)) elif isinstance(npdu, InitializeRoutingTableAck): self._log.info("{} routing table".format(npdu.pduSource)) for rte in npdu.irtaTable: self._log.info( " {} {} {}".format(rte.rtDNET, rte.rtPortID, rte.rtPortInfo) ) elif isinstance(npdu, NetworkNumberIs): self._log.info( "{} network number is {}".format(npdu.pduSource, npdu.nniNet) ) self._learnedNetworks.add(int(npdu.nniNet)) elif isinstance(npdu, RejectMessageToNetwork): self._log.warning( "{} Rejected message to network (reason : {})".format( npdu.pduSource, rejectMessageToNetworkReasons[npdu.rmtnRejectionReason], ) ) # forward it along NetworkServiceElement.indication(self, adapter, npdu)
[docs] def response(self, adapter, npdu): # forward it along NetworkServiceElement.response(self, adapter, npdu)
[docs] def confirmation(self, adapter, npdu): # forward it along self._net_complete(npdu) NetworkServiceElement.confirmation(self, adapter, npdu)
[docs]@note_and_log class Discover: """ Define BACnet WhoIs and IAm functions. """
[docs] def whois(self, *args, global_broadcast=False, destination=None): """ Build a WhoIs request :param args: string built as [ <addr>] [ <lolimit> <hilimit> ] **optional** :returns: discoveredDevices as a defaultdict(int) Example:: whois(global_broadcast=True) # WhoIs broadcast globally. Every device will respond with an IAm whois('2:5') # WhoIs looking for the device at (Network 2, Address 5) whois('10 1000') # WhoIs looking for devices in the ID range (10 - 1000) """ if not self._started: raise ApplicationNotStarted("BACnet stack not running - use startApp()") if args: args = args[0].split() msg = args if args else "any" self._log.debug("do_whois {!r}".format(msg)) # build a request request = WhoIsRequest() if (len(args) == 1) or (len(args) == 3): self._log.info("{:>12} {}".format("- discovered addr:", args)) request.pduDestination = Address(args[0]) del args[0] else: if global_broadcast: request.pduDestination = GlobalBroadcast() else: request.pduDestination = LocalBroadcast() if len(args) == 2: try: request.deviceInstanceRangeLowLimit = int(args[0]) request.deviceInstanceRangeHighLimit = int(args[1]) except ValueError: pass self._log.debug("{:>12} {}".format("- request:", request)) if destination: request.pduDestination = Address(destination) iocb = IOCB(request) # make an IOCB self.this_application._last_i_am_received = [] # pass to the BACnet stack deferred(self.this_application.request_io, iocb) iocb.wait() # Wait for BACnet response if iocb.ioResponse: # successful response pass if iocb.ioError: # unsuccessful: error/reject/abort pass for each in range(100): time.sleep(1 / 1000) self.discoveredDevices = self.this_application.i_am_counter return self.this_application._last_i_am_received
def _iam_request(self, destination=None): """ Build the IOCB request for a I Am """ try: # build a response request = IAmRequest() request.pduDestination = destination if destination else GlobalBroadcast() # fill the response with details about us (from our device object) request.iAmDeviceIdentifier = self.this_device.objectIdentifier request.maxAPDULengthAccepted = self.this_device.maxApduLengthAccepted request.segmentationSupported = self.this_device.segmentationSupported request.vendorID = self.this_device.vendorIdentifier self._log.debug("{:>12} {}".format("- request:", request)) return request except Exception as error: self._log.error("exception: {!r}".format(error)) raise
[docs] def iam(self, destination=None): """ Build an IAm response. IAm are sent in response to a WhoIs request that; matches our device ID, whose device range includes us, or is a broadcast. Content is defined by the script (deviceId, vendor, etc...) :returns: bool Example:: iam() """ self._log.debug("do_iam") try: # build a response request = self._iam_request(destination=destination) iocb = IOCB(request) # make an IOCB deferred(self.this_application.request_io, iocb) iocb.wait() return True except Exception as error: self._log.error("exception: {!r}".format(error)) return False
[docs] def whois_router_to_network(self, network=None, *, destination=None): # build a request try: request = WhoIsRouterToNetwork() if network: request.wirtnNetwork = int(network) if destination: request.pduDestination = Address(destination) self._log.debug( "WhoIsRouterToNetwork Destination : {}".format(destination) ) else: request.pduDestination = LocalBroadcast() except: self._log.error("WhoIsRouterToNetwork : invalid arguments") return iocb = IOCB((self.this_application.nsap.local_adapter, request)) # make an IOCB iocb.set_timeout(2) deferred(self.this_application.nse.request_io, iocb) iocb.wait() try: self.init_routing_table(str(self.this_application.nse._iartn.pop())) except IndexError: pass
[docs] def init_routing_table(self, address): """ irt <addr> Send an empty Initialize-Routing-Table message to an address, a router will return an acknowledgement with its routing table configuration. """ # build a request self._log.info("Addr : {}".format(address)) try: request = InitializeRoutingTable() request.pduDestination = Address(address) except: self._log.error("invalid arguments") return iocb = IOCB((self.this_application.nsap.local_adapter, request)) # make an IOCB iocb.set_timeout(2) deferred(self.this_application.nse.request_io, iocb) iocb.wait()
[docs] def what_is_network_number(self, destination=None): """ winn [ <addr> ] Send a What-Is-Network-Number message. If the address is unspecified the message is locally broadcast. """ # build a request request = WhatIsNetworkNumber() if destination: request.pduDestination = Address(destination) else: request.pduDestination = LocalBroadcast() iocb = IOCB((self.this_application.nsap.local_adapter, request)) # make an IOCB iocb.set_timeout(2) deferred(self.this_application.nse.request_io, iocb) iocb.wait()
[docs] def whohas( self, object_id=None, object_name=None, instance_range_low_limit=0, instance_range_high_limit=4194303, destination=None, global_broadcast=False, ): """ Object ID : analogInput:1 Object Name : string Instance Range Low Limit : 0 Instance Range High Limit : 4194303 destination (optional) : If empty, local broadcast will be used. global_broadcast : False """ obj_id = ObjectIdentifier(object_id) if object_name and not object_id: obj_name = CharacterString(object_name) obj = WhoHasObject(objectName=obj_name) elif object_id and not object_name: obj = WhoHasObject(objectIdentifier=obj_id) else: obj_name = CharacterString(object_name) obj = WhoHasObject(objectIdentifier=obj_id, objectName=obj_name) limits = WhoHasLimits( deviceInstanceRangeLowLimit=instance_range_low_limit, deviceInstanceRangeHighLimit=instance_range_high_limit, ) request = WhoHasRequest(object=obj, limits=limits) if destination: request.pduDestination = Address(destination) else: if global_broadcast: request.pduDestination = GlobalBroadcast() else: request.pduDestination = LocalBroadcast() iocb = IOCB(request) # make an IOCB iocb.set_timeout(2) deferred(self.this_application.request_io, iocb) iocb.wait() iocb = IOCB(request) # make an IOCB self.this_application._last_i_have_received = [] if iocb.ioResponse: # successful response pass if iocb.ioError: # unsuccessful: error/reject/abort pass time.sleep(3) # self.discoveredObjects = self.this_application.i_am_counter return self.this_application._last_i_have_received
rejectMessageToNetworkReasons = [ "Other Error", "The router is not direclty connected to DNET and cannot find a router to DNET on any direclty connected network using Who-Is-Router-To-Network messages", "The tour is busy and unable to accept messages for the specified DNET at the present time", "It is an unknown network layer message", "The message is too long to be routed to this DNET", "The source message was rejected due to a BACnet security error and that error cannot be forwarded to the source device", "The source message was rejected due to errors in the addressing. The length of th DADR or SADR was determined to be invalid", ]