#!/usr/bin/python# -*- coding: utf-8 -*-## Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com># Licensed under LGPLv3, see file LICENSE in this source tree.#"""RecurringTask.py - execute a recurring task"""importasyncioimportfunctoolsimportinspectfromtypingimportAny,Callable,Coroutine,Tuple,Unionfrom..core.utils.notesimportnote_and_logfrom.TaskManagerimportTask
[docs]@note_and_logclassRecurringTask(Task):""" Start a recurring task (a function passed) """def__init__(self,fnc:Union[Tuple[Callable,Any],Callable,Coroutine],delay:Union[int,float]=60,name:str="recurring",minimum_delay:Union[int,float]=5,)->None:""" :param fnc: a function or a tuple (function, args) :param delay: (int) Delay between reads executions :returns: Nothing """self.fnc_args=Noneself.delay=delayifisinstance(fnc,tuple):self.func,self.fnc_args=fncelifhasattr(fnc,"__call__"):self.func=fncelse:raiseValueError("You must pass a function or a tuple (function,args) to this...")Task.__init__(self,name=name,delay=delay,minimum_delay=minimum_delay)
[docs]asyncdeftask(self)->None:# Prefer awaiting async callables; offload sync callables with to_thread.def_is_async_callable(fn:Callable)->bool:ifinspect.iscoroutinefunction(fn):returnTruecall=getattr(fn,"__call__",None)ifcallandinspect.iscoroutinefunction(call):returnTrueifisinstance(fn,functools.partial):return_is_async_callable(fn.func)returnFalseifself.fnc_argsisnotNone:if_is_async_callable(self.func):awaitself.func(self.fnc_args)else:try:awaitasyncio.to_thread(self.func,self.fnc_args)except(RuntimeError,AttributeError):# Fallback for environments without to_thread or loop quirksloop=asyncio.get_running_loop()awaitloop.run_in_executor(None,self.func,self.fnc_args)else:if_is_async_callable(self.func):awaitself.func()else:try:awaitasyncio.to_thread(self.func)except(RuntimeError,AttributeError):loop=asyncio.get_running_loop()awaitloop.run_in_executor(None,self.func)