Source code for BAC0.tasks.TaskManager

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
TaskManager.py - creation of threads used for repetitive tasks.

A key building block for point simulation.
"""

import asyncio
import time
from random import random
import typing as t

# --- 3rd party modules ---
# --- this application's modules ---
from ..core.utils.notes import note_and_log

# ------------------------------------------------------------------------------


[docs] async def stopAllTasks(): Task._log.info("Stopping all tasks") for each in Task.tasks: each.aio_task.cancel() Task._log.info("Ok all tasks stopped") await Task.clean_tasklist(all=True) return True
[docs] @note_and_log class Task(object): tasks = [] # high_latency = 60 -> delay * 2
[docs] @classmethod async def clean_tasklist(cls, all: bool = False) -> None: """ Async-clean the task list. If all=True, cancel and await all running tasks, otherwise remove finished tasks only. """ if all: cls._log.debug("Cleaning tasks list (cancel & await all)") # capture current tasks snapshot to avoid mid-iteration mutation running = [ t for t in list(cls.tasks) if getattr(t, "aio_task", None) is not None ] # cancel all running aio tasks for t in running: try: t.aio_task.cancel() except Exception: pass # await completion (collect exceptions, including CancelledError) if running: await asyncio.gather( *(t.aio_task for t in running), return_exceptions=True ) cls.tasks = [] else: # keep only tasks that are not done remaining = [] for each in list(cls.tasks): if getattr(each, "done", False): cls._log.debug(f"Removing task {each.name}") else: remaining.append(each) cls.tasks = remaining
[docs] @classmethod def number_of_tasks(cls): return len(cls.tasks)
def __init__( self, fn: t.Any = None, name: t.Optional[str] = None, delay: float = 0, minimum_delay: float = 5, ): # delay = 0 -> one shot self.id: int = id(self) self.args: t.Any = None self.name: str = name if name is not None else f"Task_{self.id}" if isinstance(fn, tuple): self.fn, self.args = fn else: self.fn = fn minimum_delay = minimum_delay + 0.1 if minimum_delay == 0 else minimum_delay if delay > 0: self.delay = delay if delay >= minimum_delay else minimum_delay else: self.delay = 0 self.previous_execution: t.Optional[float] = None self.average_execution_delay: float = 0 self.average_latency: float = 0 self.next_execution: float = time.time() + delay + (random() / 10) self.execution_time: float = 0.0 self.count: int = 0 self._kwargs: t.Optional[t.Dict[str, t.Any]] = None self._task: t.Optional[asyncio.Task] = None self.aio_task: t.Optional[asyncio.Task] = None
[docs] async def task(self): raise NotImplementedError("Must be implemented")
[docs] async def execute(self): if self.delay > 0: self.log( f"Installing recurring task {self.name} (id:{self.id})", level="info" ) while True: self.count += 1 _start_time: float = time.time() self.log( f"Executing : {self.name} | Count : {self.count}", level="debug" ) self.log(f"Start Time : {_start_time}", level="debug") if self.previous_execution: self.log( f"Previous execution : {self.previous_execution}", level="debug" ) else: self.log("First Run", level="debug") self.average_latency = ( self.average_latency + (_start_time - self.next_execution) ) / 2 try: # if self.fn and self.args is not None: # await self.fn(self.args) # elif self.fn: # await self.fn() # else: if self._kwargs is not None: await self.task(**self._kwargs) else: await self.task() except Exception as error: self.log( f"An exception occured while running the task {self.name} (id:{self.id}) : {error}", level="error", ) if self.previous_execution: _total = self.average_execution_delay + ( _start_time - self.previous_execution ) self.average_execution_delay = _total / 2 else: self.average_execution_delay = self.delay # self.log('Stat for task {}'.format(self), level='info') if self.average_latency > (self.delay * 2): self.log(f"High latency for {self.name}", level="warning") self.log(f"Stats : {self}", level="warning") self.execution_time = time.time() - _start_time self.log(f"Execution Time : {self.execution_time}", level="debug") self.previous_execution = _start_time self.next_execution = time.time() + self.delay await asyncio.sleep(self.delay - (random() / 10)) else: # one shot self.log(f"Running one shot task {self.name} (id:{self.id})", level="info") if self.fn and self.args is not None: await self.fn(self.args) elif self.fn: await self.fn() else: if self._kwargs is not None: await self.task(**self._kwargs) else: await self.task()
[docs] def start(self): self.aio_task = asyncio.create_task(self.execute(), name=f"aio{self.name}") Task.tasks.append(self)
[docs] def stop(self): for each in Task.tasks: if each.id == self.id: each.aio_task.cancel() Task.tasks.remove(each) return True
@property def done(self): if self.aio_task is not None: return self.aio_task.done() else: return False @property def last_time(self): return time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(self.previous_execution) ) @property def next_time(self): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.next_execution)) @property def latency(self): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.average_latency))
[docs] def is_alive(self): return
def __repr__(self): return "{:<40} | Avg exec delay : {:.2f} sec | Avg latency : {:.2f} sec | last executed : {} | Next Time : {}".format( self.name, self.average_execution_delay, self.average_latency, self.last_time, self.next_time, ) def __lt__(self, other): # list sort use __lt__... little cheat to reverse list already return self.next_execution > other.next_execution def __eq__(self, other): # list remove use __eq__... so compare with id if isinstance(other, Task): return self.id == other.id else: return self.id == other
[docs] @note_and_log class OneShotTask(Task): def __init__(self, fn=None, args=None, name=None): self.fn = fn self.name = name or fn.__name__ self.args = args Task._log.debug(f"Creating OneShotTask {self.name} with args: {self.args}") super().__init__(fn=self.fn, name=self.name, delay=0)