Source code for BAC0.tasks.Match
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Match.py - verify a point's status matches its commanded value.
Example:
Is a fan commanded to 'On' actually 'running'?
"""
# --- standard Python modules ---
# --- 3rd party modules ---
from ..core.utils.notes import note_and_log
# --- this application's modules ---
from .TaskManager import Task
# ------------------------------------------------------------------------------
@note_and_log
class Match(Task):
    """
    Match two properties of a BACnet Object (i.e. a point status with its command).

    Each run compares the most recent ``history`` entry of ``command`` with
    the most recent ``history`` entry of ``status``.  When they differ, the
    command value is handed to ``status._setitem``.

    :param status: point whose last value is checked and, on mismatch, written.
    :param command: point whose last value is used as the reference.
    :param delay: forwarded unchanged to ``Task.__init__``
        (presumably the interval between runs, in seconds -- confirm in Task).
    :param name: task name; when falsy, ``"Match on <status name>"`` is built
        from ``status.properties.name``.
    """

    def __init__(self, status=None, command=None, delay=5, name=None):
        # NOTE(review): _log is used before Task.__init__ runs; presumably it
        # is supplied by the note_and_log decorator -- confirm.
        self._log.debug(
            "Creating Match task for {} and {}. Delay : {}".format(
                command, status, delay
            )
        )
        if not name:
            name = "Match on " + status.properties.name
        self.command = command
        self.status = status
        Task.__init__(self, delay=delay, name=name)

    def task(self):
        """
        Write the last command value to the status point when they differ.

        A string command value containing ``":"`` (e.g. ``"1:On"``) is reduced
        to the segment following the first colon before being written.  Any
        other value is written unchanged.  Every failure is logged and
        swallowed so the next run can try again.
        """
        try:
            current = self.status.history[-1]
            commanded = self.command.history[-1]
            if current != commanded:
                # Only strings can carry a "<n>:<text>" prefix.  Testing
                # `":" in value` on a number raises TypeError, which used to
                # end up in the handler below on every run and prevented the
                # write from ever happening for non-string values.
                if isinstance(commanded, str) and ":" in commanded:
                    _val = commanded.split(":")[1]
                else:
                    _val = commanded
                self.status._setitem(_val)
        except Exception:
            # Deliberate best effort: report and let the next run retry.
            self._log.error(
                "Something wrong matching {} and {}... try again next time...".format(
                    self.command, self.status
                )
            )

    def stop(self):
        """
        Hand ``"auto"`` to ``status._setitem`` and then stop the task.

        NOTE(review): "auto" presumably releases the value written by
        ``task`` -- confirm against the point implementation.
        """
        self.status._setitem("auto")
        super().stop()
@note_and_log
class Match_Value(Task):
    """
    Keep a point equal to a given value.

    Each run reads the point (``lastValue`` when ``use_last_value`` is true,
    ``value`` otherwise) and, when the reading differs from the wanted value,
    passes the wanted value to ``point._set``.  ``value`` may be a plain value
    or a callable; a callable is invoked on every run to obtain the value.

    ``delay`` is forwarded to ``Task.__init__`` (presumably the number of
    seconds between runs, giving the controller and connected equipment time
    to respond -- confirm in Task).

    Example::

        Match_Value(On, <AI:1>, 5)

    i.e. does the fan value equal On after 5 seconds?
    """

    def __init__(
        self, value=None, point=None, delay=5, name=None, use_last_value=False
    ):
        self._log.debug("Creating MatchValue task for {} and {}".format(value, point))
        self.value = value
        self.point = point
        self.use_last_value = use_last_value
        # Build the default name only when the caller did not provide one.
        task_name = name or ("Match_Value on " + point.properties.name)
        Task.__init__(self, delay=delay, name=task_name)

    def task(self):
        """
        Compare the point reading with the wanted value and write on mismatch.

        Any exception is logged and swallowed so the next run can retry.
        """
        try:
            reading = self.point.lastValue if self.use_last_value else self.point.value
            wanted = self.value
            if hasattr(wanted, "__call__"):
                # A callable is resolved to its current result on each run.
                wanted = wanted()
            if wanted != reading:
                self.point._set(wanted)
        except Exception:
            self._log.error(
                "Something is wrong matching {} and {}... try again next time".format(
                    self.value, self.point
                )
            )

    def stop(self):
        """
        Pass ``"auto"`` to ``point._set`` and then stop the task.

        NOTE(review): "auto" presumably releases the value written by
        ``task`` -- confirm against the point implementation.
        """
        self.point._set("auto")
        super().stop()