Source code for BAC0.tasks.Poll

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Poll.py - create a Polling task to repeatedly read a point.
"""

import typing as t

# --- standard Python modules ---
import weakref

from ..core.utils.notes import note_and_log

# --- this application's modules ---
from .TaskManager import Task

if t.TYPE_CHECKING:
    from ..core.devices.Device import RPDeviceConnected, RPMDeviceConnected


# ------------------------------------------------------------------------------
[docs]class MultiplePollingFailures(Exception): pass
[docs]@note_and_log class SimplePoll(Task): """ Start a polling task to repeatedly read a point's Present_Value. ex. device['point_name'].poll(delay=60) """ def __init__(self, point, *, delay: int = 10) -> None: """ :param point: (BAC0.core.device.Points.Point) name of the point to read :param delay: (int) Delay between reads in seconds, defaults = 10sec A delay cannot be < 1sec This task is meant for single points, so BAC0 will allow short delays. This way, a fast polling is available for some points in a device that would not support segmentation. :returns: Nothing """ if delay < 1: delay = 1 if point.properties: self._point = point Task.__init__(self, name="rp_poll", delay=delay) else: raise ValueError("Provide a point object")
[docs] def task(self): self._point.value
[docs]@note_and_log class DevicePoll(Task): """ Start a polling task to repeatedly read a list of points from a device using ReadPropertyMultiple requests. """ def __init__( self, device: t.Union["RPMDeviceConnected", "RPDeviceConnected"], delay: int = 10, name: str = "", prefix: str = "basic_poll", ) -> None: """ :param device: (BAC0.core.devices.Device.Device) device to poll :param delay: (int) Delay between polls in seconds, defaults = 10sec A delay cannot be < 10sec For delays under 10s, use DeviceFastPoll class. :returns: Nothing """ self.failures = 0 self.MAX_FAILURES = 3 self._device = weakref.ref(device) Task.__init__(self, name="{}_{}".format(prefix, name), delay=delay) self._counter = 0 @property def device(self) -> t.Union["RPMDeviceConnected", "RPDeviceConnected", None]: return self._device()
[docs] def task(self) -> None: if self.device.properties.ping_failures > 0: self.device._log.warning( "{} ({}) | Ping failed, skipping polling for now. Resending a ping to speed up things".format( self.device.properties.name, self.device.properties.address ) ) self.device.ping() return try: if self.failures >= self.MAX_FAILURES: raise MultiplePollingFailures( "{} ({}) | Polling failed numerous times in a row... let see what we can do".format( self.device.properties.name, self.device.properties.address ) ) self.device.read_multiple( list(self.device.pollable_points_name), points_per_request=25 ) self._counter += 1 if self._counter == self.device.properties.auto_save: self.device.save(resampling=self.device.properties.save_resampling) if self.device.properties.clear_history_on_save: self.device.clear_histories() self._counter = 0 self.failures = 0 except AttributeError as e: # This error can be seen when defining a controller on a busy network... # When creation fail, polling is created and fail the first time... # So kill the task self.device._log.error( "{} ({}) | Something is wrong while creating the polling task.\nError: {} | Type : {}".format( self.device.properties.name, self.device.properties.address, e, type(e), ) ) # self.stop() self.failures += 1 except ValueError as e: self.failures += 1 self.device._log.error( "{} ({}) | Polling results contains a wrong value. Probably a communication error. Will skip this result and wait for the next cycle.\nError: {} | Type : {}".format( self.device.properties.name, self.device.properties.address, e, type(e), ) ) pass except MultiplePollingFailures as e: self.device._log.warning( "{} ({}) | Trying to ping device then we'll reset the number of failures and get back with polling\nError: {}| Type : {}".format( self.device.properties.name, self.device.properties.address, e, type(e), ) ) if self.device.ping(): self.failures = 0
[docs]@note_and_log class DeviceNormalPoll(DevicePoll): """ Start a normal polling task to repeatedly read a list of points from a device using ReadPropertyMultiple requests. Normal polling will limit the polling speed to 10 second minimum """ def __init__(self, device, delay=10, name=""): """ :param device: (BAC0.core.devices.Device.Device) device to poll :param delay: (int) Delay between polls in seconds, defaults = 10sec :returns: Nothing """ if delay < 10: delay = 10 self._log.info( "Device defined for normal polling with a delay of {}sec".format(delay) ) DevicePoll.__init__( self, device=device, name=name, delay=delay, prefix="rpm_normal_poll" )
[docs]@note_and_log class DeviceFastPoll(DevicePoll): """ Start a fast polling task to repeatedly read a list of points from a device using ReadPropertyMultiple requests. Delay allowed will be 0 to 10 seconds Normal polling will limit the polling speed to 10 second minimum Warning : Fast polling must be used with care or network flooding may occur """ def __init__(self, device, delay=1, name=""): """ :param device: (BAC0.core.devices.Device.Device) device to poll :param delay: (int) Delay between polls in seconds, defaults = 1sec :returns: Nothing """ if delay < 0: delay = 0.01 elif delay > 10: delay = 10 self._log.warning( "Device defined for fast polling with a delay of {}sec".format(delay) ) DevicePoll.__init__( self, device=device, name=name, delay=delay, prefix="rpm_fast_poll" )