#!/usr/bin/python# -*- coding: utf-8 -*-## Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com># Licensed under LGPLv3, see file LICENSE in this source tree.#"""DoOnce.py - execute a task once"""importasynciofromtypingimportCallable,Optionalfrom..core.utils.notesimportnote_and_logfrom.TaskManagerimportOneShotTask
[docs]@note_and_logclassDoOnce(OneShotTask):""" Execute a function once, optionally awaiting it if it's a coroutine. Example:: device['point_name'].poll(delay=60) """def__init__(self,fn:Callable,args:Optional[str]=None):""" :param point: (BAC0.core.device.Points.Point) name of the point to read :param delay: (int) Delay between reads in seconds, defaults = 10sec A delay cannot be < 5sec (there are risks of overloading the device) :returns: Nothing """super().__init__(fn,args)
[docs]asyncdeftask(self):ifasyncio.iscoroutinefunction(self.fn):self._log.debug(f"Running {self.name} with args {self.args}")awaitself.fn(self.args)else:self.fn(self.args)