Source code for BAC0.core.functions.Schedule

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
import typing as t

# --- standard Python modules ---
from datetime import time as dt_time

from bacpypes3.apdu import WritePropertyRequest
from bacpypes3.app import Application
from bacpypes3.basetypes import DailySchedule, TimeValue
from bacpypes3.constructeddata import Any, ArrayOf

# --- 3rd party modules ---
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import Enumerated, Integer, Real

from ...core.app.asyncApp import BAC0Application
from ...core.utils.notes import note_and_log


[docs] @note_and_log class Schedule: """ Everything you need to write to a schedule """ WeeklySchedule = ArrayOf(DailySchedule) schedules = {} days = [ "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", ] schedule_example_multistate = { "states": {"Occupied": 1, "UnOccupied": 2, "Standby": 3, "Not Set": 4}, "week": { "monday": [("1:00", "Occupied"), ("17:00", "UnOccupied")], "tuesday": [("2:00", "Occupied"), ("17:00", "UnOccupied")], "wednesday": [("3:00", "Occupied"), ("17:00", "UnOccupied")], "thursday": [("4:00", "Occupied"), ("17:00", "UnOccupied")], "friday": [("5:00", "Occupied"), ("17:00", "UnOccupied")], "saturday": [("6:00", "Occupied"), ("17:00", "UnOccupied")], "sunday": [("7:00", "Occupied"), ("17:00", "UnOccupied")], }, } schedule_example_binary = { "states": {"inactive": 0, "active": 1}, "week": { "monday": [("1:00", "active"), ("16:00", "inactive")], "tuesday": [("2:00", "active"), ("16:00", "inactive")], "wednesday": [("3:00", "active"), ("16:00", "inactive")], "thursday": [("4:00", "active"), ("16:00", "inactive")], "friday": [("5:00", "active"), ("16:00", "inactive")], "saturday": [("6:00", "active"), ("16:00", "inactive")], "sunday": [("7:00", "active"), ("16:00", "inactive")], }, } schedule_example_analog = { "states": "analog", "week": { "monday": [("1:00", 22), ("18:00", 19)], "tuesday": [("2:00", 22), ("18:00", 19)], "wednesday": [("3:00", 22), ("18:00", 19)], "thursday": [("4:00", 22), ("18:00", 19)], "friday": [("5:00", 22), ("18:00", 19)], "saturday": [("6:00", 22), ("18:00", 19)], "sunday": [("7:00", 22), ("18:00", 19)], }, }
[docs] def create_weeklySchedule(self, dict_schedule, object_reference=None): """ From a structured dict (see schedule_example), create a WeeklySchedule an ArrayOf(DailySchedule) """ ds = dict_schedule if object_reference is not None: self.schedules[object_reference] = ds def _set_value(v): try: if dict_schedule["states"].lower() == "analog": return Real(v) except AttributeError: if dict_schedule["states"] == {"inactive": 0, "active": 1}: return Integer(dict_schedule["states"][v]) else: return Enumerated(dict_schedule["states"][v]) daily_schedules = [] for day in Schedule.days: list_of_events = dict_schedule["week"][day] _daily_schedule = [ TimeValue(time=event[0], value=_set_value(event[1])) for event in list_of_events ] daily_schedules.append(DailySchedule(daySchedule=_daily_schedule)) return Schedule.WeeklySchedule(daily_schedules)
[docs] def make_weeklySchedule_request(self, destination, object_instance, weeklySchedule): request = WritePropertyRequest( objectIdentifier=("schedule", object_instance), propertyIdentifier="weeklySchedule", ) address = Address(destination) request.pduDestination = address request.propertyValue = Any(weeklySchedule) request.priority = 15 return request
[docs] def send_weeklyschedule_request(self, request, timeout=10): _this_application: BAC0Application = self.this_application _app: Application = _this_application.app self.log(f"{'- request:':>12} {request}", level="debug") _app.request(request) self._log.info( f"Schedule Write request sent to device : {request.pduDestination}" )
[docs] def write_weeklySchedule(self, destination, schedule_instance, schedule): weeklyschedule = self.create_weeklySchedule(schedule) request = self.make_weeklySchedule_request( destination=destination, object_instance=schedule_instance, weeklySchedule=weeklyschedule, ) self.send_weeklyschedule_request(request)
[docs] async def read_weeklySchedule(self, address, schedule_instance): """ This function will turn the weeklySchedule received into a human readable dict. This dict can then be modified and used to write back to controller using bacnet.write_weeklySchedule """ try: ( _schedule, object_references, reliability, priority, presentValue, ) = await self.readMultiple( "{} schedule {} weeklySchedule listOfObjectPropertyReferences reliability priorityForWriting presentValue".format( address, schedule_instance ) ) except Exception: # Read Multiple not supported... try single prop read def _read(prop): return self.read(f"{address} schedule {schedule_instance} {prop}") try: _schedule = _read("weeklySchedule") object_references = _read("listOfObjectPropertyReferences") reliability = _read("reliability") priority = _read("priorityForWriting") presentValue = _read("presentValue") except Exception: raise () schedule: t.Dict[str, t.Union[t.Dict, t.List]] = {} _state_text: t.Union[str, range, t.List[str], None] = None offset_MV = 0 if len(object_references) == 0 else 1 try: first_obj_id = object_references[0] obj_type, obj_instance = first_obj_id.objectIdentifier if "binary" in str(obj_type): _state_text = ["inactive", "active"] elif "multi" in str(obj_type): _state_text = await self.read( f"{address} {obj_type} {obj_instance} stateText" ) elif "analog" in str(obj_type): _state_text = "analog" schedule["object_references"] = [ objectId.objectIdentifier for objectId in object_references ] schedule["references_names"] = [ await self.read( "{} {} {} objectName".format( address, each.objectIdentifier[0], each.objectIdentifier[1] ) ) for each in object_references ] except Exception as error: self.log(f"Error {error}", level="error") # Not used in device, not linked... _state_text = range(255) schedule["object_references"] = [] schedule["references_names"] = [] # re-ordered to make type inference possible, but not sure the logic here # is 100% sound (ignored type warnings both look like actual problems). # keeping logic intact and ignoring warnings for now sched_states = {} if _state_text == ["inactive", "active"]: for i, each in enumerate(_state_text): sched_states[each] = i presentValue = "{} ({})".format( list(sched_states.keys())[int(presentValue.get_value())], presentValue.get_value(), ) elif _state_text == "analog": sched_states = _state_text # type: ignore[assignment] if presentValue is not None: presentValue = f"{presentValue.get_value()}" else: try: for i, each in enumerate(_state_text): # type: ignore[arg-type] sched_states[each] = i + offset_MV presentValue = "{} ({})".format( list(sched_states.keys())[ int(presentValue.get_value()) - offset_MV ], presentValue.get_value(), ) except TypeError: presentValue = presentValue.get_value() schedule["states"] = sched_states schedule["reliability"] = reliability schedule["priority"] = priority schedule["presentValue"] = presentValue schedule["week"] = self.decode_weeklySchedule(_schedule, _state_text, offset_MV) return schedule
[docs] def decode_weeklySchedule(self, weeklySchedule, states, offset_MV): week = {} for i, day in enumerate(Schedule.days): week[day] = self.decode_dailySchedule(weeklySchedule[i], states, offset_MV) return week
[docs] def decode_dailySchedule(self, dailySchedule, states, offset_MV): events = [] for each in dailySchedule.daySchedule: hour, minute, second, hundreth = each.time _time = dt_time(hour, minute, second, hundreth).strftime("%H:%M") try: _value = None if each.value.get_value() is not None: if states == ["inactive", "active"]: _value = states[int(each.value.get_value())] elif states == "analog": _value = float(each.value.get_value()) else: _value = states[int(each.value.get_value()) - offset_MV] events.append((_time, _value)) except IndexError: events.append((_time, f"??? ({each.value.get_value()})")) return events