Source code for BAC0.tasks.RecurringTask
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
RecurringTask.py - execute a recurring task
"""
from typing import Any, Callable, Tuple, Union
from ..core.utils.notes import note_and_log
from .TaskManager import Task
[docs]@note_and_log
class RecurringTask(Task):
"""
Start a recurring task (a function passed)
"""
def __init__(
self,
fnc: Union[Tuple[Callable, Any], Callable],
delay: int = 60,
name: str = "recurring",
) -> None:
"""
:param fnc: a function or a tuple (function, args)
:param delay: (int) Delay between reads executions
:returns: Nothing
"""
self.fnc_args = None
if isinstance(fnc, tuple):
self.func, self.fnc_args = fnc
elif hasattr(fnc, "__call__"):
self.func = fnc
else:
raise ValueError(
"You must pass a function or a tuple (function,args) to this..."
)
Task.__init__(self, name=name, delay=delay)
[docs] def task(self) -> None:
if self.fnc_args:
self.func(self.fnc_args)
else:
self.func()