Source code for BAC0.core.functions.DeviceCommunicationControl
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Reinitialize.py - creation of ReinitializeDeviceRequest
"""
# --- standard Python modules ---
from bacpypes.apdu import (
DeviceCommunicationControlRequest,
DeviceCommunicationControlRequestEnableDisable,
SimpleAckPDU,
)
from bacpypes.core import deferred
from bacpypes.iocb import IOCB
# --- 3rd party modules ---
from bacpypes.pdu import Address
from bacpypes.primitivedata import CharacterString, Unsigned16
from ...core.io.Read import find_reason
from ...core.utils.notes import note_and_log
from ..io.IOExceptions import ApplicationNotStarted, NoResponseFromController
[docs]@note_and_log
class DeviceCommunicationControl:
"""
Mixin to support DeviceCommunicationControl from BAC0 to other devices
"""
[docs] def dcc(self, address=None, duration=None, password=None, state=None):
"""
Will send DeviceCommunicationControl request
"""
if not self._started:
raise ApplicationNotStarted("BACnet stack not running - use startApp()")
if not address:
raise ValueError("Provide address for request")
if not state:
raise ValueError("Provide state ('enable', 'disable', 'disableInitiation'")
# build a request
request = DeviceCommunicationControlRequest()
request.enableDisable = (
DeviceCommunicationControlRequestEnableDisable.enumerations[state]
)
request.pduDestination = Address(address)
if duration:
request.duration = Unsigned16(duration)
request.password = CharacterString(password)
self._log.debug("{:>12} {}".format("- request:", request))
iocb = IOCB(request) # make an IOCB
# pass to the BACnet stack
deferred(self.this_application.request_io, iocb)
# Unconfirmed request...so wait until complete
iocb.wait() # Wait for BACnet response
if iocb.ioResponse: # successful response
apdu = iocb.ioResponse
if not isinstance(apdu, SimpleAckPDU): # expect an ACK
self._log.warning("Not an ack, see debug for more infos.")
self._log.debug("Not an ack. | APDU : {} / {}".format(apdu, type(apdu)))
return
if iocb.ioError: # unsuccessful: error/reject/abort
apdu = iocb.ioError
reason = find_reason(apdu)
raise NoResponseFromController("APDU Abort Reason : {}".format(reason))
self._log.info(
"DeviceCommunicationControl request sent to device : {}".format(address)
)