#!/usr/bin/python# -*- coding: utf-8 -*-## Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com># Licensed under LGPLv3, see file LICENSE in this source tree.#"""Match.py - verify a point's status matches its commanded value.Example: Is a fan commanded to 'On' actually 'running'?"""# --- standard Python modules ---# --- 3rd party modules ---importasynciofrom..core.io.IOExceptionsimportNotReadyErrorfrom..core.utils.notesimportnote_and_log# --- this application's modules ---from.TaskManagerimportTask# ------------------------------------------------------------------------------
[docs]@note_and_logclassMatch(Task):""" Match two properties of a BACnet Object (i.e. a point status with its command). """def__init__(self,status=None,command=None,delay=5,name=None):self._log.debug(f"Creating Match task for {command} and {status}. Delay : {delay}")ifnotname:name="Match on "+status.properties.nameself.command=commandself.status=statusTask.__init__(self,delay=delay,name=name)
[docs]asyncdeftask(self):if(self.status.properties.network.initializedisFalseorself.status.properties.network.initializedisNoneorself.statusisNone):raiseNotReadyError(f"{self.status} is not ready")if(self.command.properties.network.initializedisFalseorself.command.properties.network.initializedisNoneorself.commandisNone):raiseNotReadyError(f"{self.command} is not ready")try:ifself.status.history[-1]!=self.command.history[-1]:_val=(self.command.history[-1].split(":")[1]if":"inself.command.history[-1]elseself.command.history[-1])self.log(f"Match value is {_val}",level="debug")awaitself.status._setitem(_val.replace(" ",""))except(NotReadyError,TypeError)aserror:self.log(f"Problem executing match value task {self.status.name} -> {self.command.name} : {error}",level="warning",)exceptExceptionaserror:self._log.error(f"Something wrong matching {self.command.properties.name} and {self.status.properties.name}... try again next time...\nError:{error}")awaitasyncio.sleep(1)
[docs]@note_and_logclassMatch_Value(Task):""" Verify a point's Present_Value equals the given value after a delay of X seconds. Thus giving the BACnet controller (and connected equipment) time to respond to the command. Match_Value(On, <AI:1>, 5) i.e. Does Fan value = On after 5 seconds. """def__init__(self,value=None,point=None,delay=5,name=None,use_last_value=False):# self.log(f"Creating MatchValue task for {value} and {point}", level="debug")# if not isinstance(value, (float, int, str, bool)) or not hasattr(self.value, "__call__"):# raise ValueError("Value must be a float, int, str or bool OR must be a callable function that returns one of these types.")self.value=valueself.point=pointself.use_last_value=use_last_valueifnotname:name="Match_Value on "+point.properties.nameTask.__init__(self,delay=delay,name=name)
[docs]asyncdeftask(self):if(self.point.properties.device.initializedisFalseorself.point.properties.device.initializedisNoneorself.pointisNone):raiseNotReadyError(f"{self.point} is not ready")try:ifself.use_last_value:_point=self.point.lastValueelse:_point=awaitself.point.valuevalue=self.value()ifhasattr(self.value,"__call__")elseself.valueifvalue!=_point:awaitself.point._set(value=value)except(NotReadyError,TypeError)aserror:self.log(f"Problem executing match value task on {self.point.properties.name} -> {value}: {error}",level="warning",)awaitasyncio.sleep(1)exceptExceptionaserror:self.log(f"Something is wrong matching {self.value} and {self.point.properties.name}... try again next time {error}",level="error",)awaitasyncio.sleep(1)
asyncdef_before_stop(self):try:awaitself.point._set("auto")except(ValueError,TypeError):self.log("Could not set {} to auto. If this is a network input, it is normal as we didn't have to override or simulate".format(self.point),level="warning",)