#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2017 by Christian Tremblay, P.Eng <christian.tremblay@servisysDeviceObject.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
"""
Device.py - describe a BACnet Device
"""
import asyncio
import logging
import os.path
# --- standard Python modules ---
from collections import namedtuple
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
# --- this application's modules ---
from bacpypes3.basetypes import ServicesSupported
from bacpypes3.errors import NoResponse
from bacpypes3.primitivedata import PropertyIdentifier
# from ...bokeh.BokehRenderer import BokehPlot
from ...db.sql import SQLMixin
from ...tasks.DoOnce import DoOnce
from ...tasks.Poll import DeviceOneShotPoll
from ..io.IOExceptions import (
BadDeviceDefinition,
DeviceNotConnected,
NoResponseFromController,
RemovedPointException,
SegmentationNotSupported,
WritePropertyException,
WrongParameter,
)
from ..utils.lookfordependency import pandas_if_available
from ..utils.notes import note_and_log
from .mixins.read_mixin import ReadProperty, ReadPropertyMultiple
from .Points import BooleanPoint, EnumPoint, NumericPoint, OfflinePoint, Point
from .Virtuals import VirtualPoint
_PANDAS, pd, _, _ = pandas_if_available()
# ------------------------------------------------------------------------------
[docs]
class DeviceProperties(object):
def __init__(self):
self.name: str = "Unknown"
self.address: Optional[str] = None
self.device_id: Optional[int] = None
self.network: Optional[Any] = None
self.pollDelay: Optional[int] = None
self.objects_list: Optional[List] = None
self.pss: ServicesSupported = ServicesSupported()
self.multistates: Optional[Dict] = None
self.db_name: Optional[str] = None
self.segmentation_supported: bool = True
self.history_size: Optional[int] = None
self.save_resampling: str = "1s"
self.clear_history_on_save: Optional[bool] = None
self.bacnet_properties: Dict[PropertyIdentifier, Any] = {}
self.auto_save: Optional[bool] = None
self.fast_polling: bool = False
self.vendor_id: int = 0
self.ping_failures: int = 0
@property
def asdict(self) -> Dict:
return self.__dict__
def __repr__(self):
return f"{self.asdict}"
[docs]
@note_and_log
class Device(SQLMixin):
"""
This class represents a BACnet device. It provides methods to read, write, simulate, and release
communication with the device on the network.
Parameters:
address (str, optional): The address of the device (e.g., '2:5'). Defaults to None.
device_id (int, optional): The BACnet device ID (boid). Defaults to None.
network (BAC0.scripts.ReadWriteScript.ReadWriteScript, optional): Defined by BAC0.connect(). Defaults to None.
poll (int, optional): If greater than 0, the device will poll every point each x seconds. Defaults to None.
from_backup (str, optional): SQLite backup file. Defaults to None.
segmentation_supported (bool, optional): When set to False, BAC0 will not use read property multiple to poll the device. Defaults to None.
object_list (list, optional): User can provide a custom object list for the creation of the device. The object list must be built using the same pattern returned by bacpypes when polling the objectList property. Defaults to None.
auto_save (bool or int, optional): If False or 0, auto_save is disabled. To activate, pass an integer representing the number of polls before auto_save is called. Will write the histories to SQLite db locally. Defaults to None.
clear_history_on_save (bool, optional): If set to True, will clear device history. Defaults to None.
"""
def __init__(
self,
address: Optional[str] = None,
device_id: Optional[int] = None,
network: Optional[Any] = None,
*,
poll: int = 10,
from_backup: Optional[str] = None, # filename of backup
segmentation_supported: bool = True,
object_list: Optional[List] = None,
auto_save: bool = False,
save_resampling: str = "1s",
clear_history_on_save: bool = False,
history_size: Optional[int] = None,
reconnect_on_failure: bool = True,
):
self.properties = DeviceProperties()
# self.initialized = False
self.properties.address = address
self.properties.device_id = device_id
self.properties.network = network
self.properties.pollDelay = poll
self.properties.fast_polling = True if poll < 10 else False
self.properties.name = ""
self.properties.vendor_id = 0
self.properties.objects_list = []
self.properties.pss = ServicesSupported()
self.properties.multistates = {}
self.properties.auto_save = auto_save
self.properties.save_resampling = save_resampling
self.properties.clear_history_on_save = clear_history_on_save
self.properties.history_size = history_size
self._reconnect_on_failure = reconnect_on_failure
self.segmentation_supported = segmentation_supported
self.custom_object_list = object_list
# self.db = None
# Todo : find a way to normalize the name of the db
self.properties.db_name = None
self.points = []
self._list_of_trendlogs = {}
self._polling_task = namedtuple("_polling_task", ["task", "running"])
self._polling_task.task = None
self._polling_task.running = False
self._find_overrides_progress = 0.0
self._find_overrides_running = False
self._release_overrides_progress = 0.0
self._release_overrides_running = False
self.creation_task = (
None # when sent to asyncio.create_tasks, it will be stored here
)
self.note("Controller initialized")
if from_backup:
filename = from_backup
db_name = filename.split(".")[0]
self.properties.network = None
if os.path.isfile(filename):
self.properties.db_name = db_name
else:
raise FileNotFoundError(f"Can't find {filename} on drive")
else:
if (
self.properties.network
and self.properties.address
and self.properties.device_id is not None
):
pass
else:
raise BadDeviceDefinition(
"Please provide address, device id and network or specify from_backup argument"
)
@property
def initialized(self):
if isinstance(self, DeviceConnected):
return True
return False
[docs]
async def new_state(self, newstate: Any) -> None:
"""
Changes the state of the device.
This method forms the basis of the state machine mechanism and is used to transition between device states.
It also calls the state initialization function.
:param newstate: The new state to transition to.
:type newstate: Any
:return: None
"""
self.log(
f"Changing {self.properties.device_id} state to {str(newstate).split('.')[-1]}",
level="info",
)
self.__class__ = newstate
await self._init_state()
async def _init_state(self) -> None:
"""
Execute additional code upon state modification
"""
raise NotImplementedError()
[docs]
def connect(self) -> None:
"""
Connect the device to the network
"""
raise NotImplementedError()
[docs]
def disconnect(self, save_on_disconnect=True, unregister=True) -> None:
asyncio.create_task(self._disconnect(save_on_disconnect, unregister))
async def _disconnect(self, save_on_disconnect=True, unregister=True) -> None:
raise NotImplementedError()
[docs]
def initialize_device_from_db(self) -> None:
raise NotImplementedError()
[docs]
def df(self, list_of_points: List[str], force_read: bool = True):
"""
Build a pandas DataFrame from a list of points. DataFrames are used to present and analyze data.
:param list_of_points: a list of point names as str
:returns: pd.DataFrame
"""
raise NotImplementedError()
@property
def simulated_points(self) -> Iterator[Point]:
"""
iterate over simulated points
:returns: points if simulated (out_of_service == True)
:rtype: BAC0.core.devices.Points.Point
"""
for each in self.points:
if each.properties.simulated[0]:
yield each
async def _buildPointList(self) -> None:
"""
Read all points from a device into a (Pandas) dataframe (Pandas). Items are
accessible by point name.
"""
raise NotImplementedError()
def __getitem__(
self, point_name: Union[str, List[str]]
) -> Point:
"""
Get a point from its name.
If a list is passed - a dataframe is returned.
:param point_name: (str) name of the point or list of point_names
:type point_name: str
:returns: (Point) the point (can be Numeric, Boolean or Enum) or pd.DataFrame
"""
raise NotImplementedError()
def __iter__(self) -> Point:
"""
When iterating a device, iterate points of it.
"""
raise NotImplementedError()
def __contains__(self, value: str) -> bool:
"When using in..."
raise NotImplementedError()
@property
def points_name(self) -> List[str]:
"""
When iterating a device, iterate points of it.
"""
raise NotImplementedError()
[docs]
def to_excel(self) -> None:
"""
Using xlwings, make a dataframe of all histories and save it
"""
raise NotImplementedError()
def __setitem__(self, point_name: str, value: float) -> None:
"""
Write, sim or ovr value
:param point_name: Name of the point to set
:param value: value to write to the point
:type point_name: str
:type value: float
"""
raise NotImplementedError()
def __len__(self) -> int:
"""
Will return number of points available
"""
raise NotImplementedError()
def _parseArgs(self, arg: str) -> Tuple[str, str]:
"""
Given a string, interpret the last word as the value, everything else is
considered to be the point name.
"""
args = arg.split()
pointName = " ".join(args[:-1])
value = args[-1]
return (pointName, value)
[docs]
def clear_histories(self) -> None:
for point in self.points:
point.clear_history()
[docs]
def update_history_size(self, size: Optional[int] = None) -> None:
for point in self.points:
point.properties.history_size = size
@property
def analog_units(self) -> Dict[str, str]:
raise NotImplementedError()
@property
def temperatures(self) -> Dict[str, str]:
raise NotImplementedError()
@property
def percent(self) -> Dict[str, str]:
raise NotImplementedError()
@property
def multi_states(self) -> Dict[str, str]:
raise NotImplementedError()
@property
def binary_states(self) -> Dict[str, str]:
raise NotImplementedError()
def _findPoint(self, name: str, force_read: bool = True) -> Point:
"""
Helper that retrieve point based on its name.
:param name: (str) name of the point
:param force_read: (bool) read value of the point each time the function is called.
:returns: Point object
:rtype: BAC0.core.devices.Point.Point (NumericPoint, EnumPoint or BooleanPoint)
"""
raise NotImplementedError()
[docs]
def find_point(self, objectType: str, objectAddress: float) -> Point:
"""
Find point based on type and address
"""
for point in self.points:
if (
point.properties.type == objectType
and float(point.properties.address) == objectAddress
):
return point
raise ValueError(f"{objectType} {objectAddress} doesn't exist in controller")
[docs]
def find_overrides(self, force: bool = False) -> None:
if self._find_overrides_running and not force:
self.log(
f"Already running ({self._find_overrides_progress:.1%})... please wait.",
level="warning",
)
return
lst = []
self._find_overrides_progress = 0.0
self._find_overrides_running = True
total = len(self.points)
async def _find_overrides() -> None:
self._log.info("Overrides are being checked, wait for completion message.")
for idx, point in enumerate(self.points):
if await point.is_overridden:
lst.append(point)
self._find_overrides_progress = idx / total
self._log.info(
"Override check ready, results available in device.properties.points_overridden"
)
self.properties.points_overridden = lst
self._find_overrides_running = False
self._find_overrides_progress = 1.0
self.do(_find_overrides)
[docs]
def find_overrides_progress(self) -> float:
return self._find_overrides_progress
[docs]
def release_all_overrides(self, force: bool = False) -> None:
if self._release_overrides_running and not force:
self.log(
f"Already running ({self._release_overrides_progress:.1%})... please wait.",
level="warning",
)
return
self._release_overrides_running = True
self._release_overrides_progress = 0.0
async def _release_all_overrides() -> None:
self.find_overrides()
# def _progress_bar(progress, width=40):
# filled = int(width * progress)
# bar = 'â–ˆ' * filled + '-' * (width - filled)
# return f"[{bar}] {progress*100:5.1f}%"
while self._find_overrides_running:
# Provide progress feedback (e.g., log or update UI)
# bar = _progress_bar(self._find_overrides_progress)
# print(f"\rOverride check progress: {bar}", end='', flush=True)
await asyncio.sleep(0.2) # Adjust sleep for responsiveness
if self.properties.points_overridden:
total = len(self.properties.points_overridden)
self.log("=================================", level="info")
self.log("Overrides found... releasing them", level="info")
self.log("=================================", level="info")
for idx, point in enumerate(self.properties.points_overridden):
self.log(f"Releasing {point}", level="info")
await point.release_ovr()
self._release_overrides_progress = (idx / total) / 2 + 0.5
else:
self.log("No override found", level="info")
self._release_overrides_running = False
self._release_overrides_progress = 1
self.do(_release_all_overrides)
[docs]
def do(self, func: Any) -> None:
DoOnce(func).start()
def __repr__(self) -> str:
return f"{self.properties.name} / Undefined"
# def device(*args: Any, **kwargs: Any) -> Device:
# dev = Device(*args, **kwargs)
# t = asyncio.create_task(dev.new_state(DeviceDisconnected))
# dev.creation_task = t
# while not t.done:
# pass
# return dev
[docs]
async def device(*args: Any, **kwargs: Any) -> Device:
dev = Device(*args, **kwargs)
await dev.new_state(DeviceDisconnected)
return dev
# @fix_docs
[docs]
class DeviceConnected(Device):
"""
Find a device on the BACnet network. Set its state to 'connected'.
Once connected, all subsequent commands use this BACnet connection.
"""
async def _init_state(self):
await self._buildPointList()
self.properties.network.register_device(self)
if self.properties.pollDelay == 0:
_poll = DeviceOneShotPoll(self)
_poll.start()
# self.initialized = True
async def _disconnect(self, save_on_disconnect=True, unregister=True):
self.log(
f"Wait while stopping polling for {self.properties.name}", level="info"
)
self.poll(command="stop")
if unregister:
self.properties.network.unregister_device(self)
self.properties.network = None
if save_on_disconnect:
self.log(f"Saving {self.properties.name} to database...", level="info")
await self.save()
if self.properties.db_name:
await self.new_state(DeviceFromDB)
else:
await self.new_state(DeviceDisconnected)
[docs]
async def connect(self, *args, db=None, **kwargs):
"""
A connected device can be switched to 'database mode' where the device will
not use the BACnet network but instead obtain its contents from a previously
stored database.
"""
if db:
self.poll(command="stop")
self.properties.db_name = db.split(".")[0]
await self.new_state(DeviceFromDB)
else:
self._log.warning(
"Already connected, provide db arg if you want to connect to db"
)
[docs]
def df(self, list_of_points, force_read=True):
"""
When connected, calling DF should force a reading on the network.
"""
his = []
for point in list_of_points:
try:
his.append(self._findPoint(point, force_read=force_read).history)
except ValueError as ve:
self.log(f"Value Error : {ve}", level=logging.DEBUG)
continue
if not _PANDAS:
return dict(zip(list_of_points, his))
return pd.DataFrame(dict(zip(list_of_points, his)))
async def _buildPointList(self):
"""
Upon connection to build the device point list and properties.
"""
try:
self.properties.pss.value = await self.properties.network.read(
"{} device {} protocolServicesSupported".format(
self.properties.address, self.properties.device_id
)
)
except NoResponseFromController as error:
self.log(f"Controller not found, aborting. ({error})", level="error")
return ("Not Found", "", [], [])
except SegmentationNotSupported:
self.log("Segmentation not supported", level="warning")
self.segmentation_supported = False
await self.new_state(DeviceDisconnected)
self.properties.name = str(
await self.properties.network.read(
f"{self.properties.address} device {self.properties.device_id} objectName"
)
)
self.properties.vendor_id = await self.properties.network.read(
"{} device {} vendorIdentifier".format(
self.properties.address, self.properties.device_id
)
)
self._log.info(
"Device {}:[{}] found... building points list".format(
self.properties.device_id, self.properties.name
)
)
try:
(
self.properties.objects_list,
self.points,
self._list_of_trendlogs,
) = await self._discoverPoints(self.custom_object_list)
if self.properties.pollDelay is not None and self.properties.pollDelay > 0:
self.poll(delay=self.properties.pollDelay)
self.update_history_size(size=self.properties.history_size)
self._log.info(
f"Device {self.properties.name} | {self.properties.device_id} ready, use device_name.points and start interact with it"
)
# self.clear_histories()
except NoResponseFromController:
self.log("Cannot retrieve object list, disconnecting...", level="error")
self.segmentation_supported = False
await self.new_state(DeviceDisconnected)
except IndexError:
if self._reconnect_on_failure:
self.log("Device creation failed... re-connecting", level="error")
await self.new_state(DeviceDisconnected)
else:
self.log("Device creation failed... disconnecting", level="error")
def __getitem__(self, point_name):
"""
Allows the syntax: device['point_name'] or device[list_of_points]
If calling a list, last value will be used (won't read on the network)
for performance reasons.
If calling a simple point, point will be read via BACnet.
"""
try:
if isinstance(point_name, list):
return self.df(point_name, force_read=False)
elif isinstance(point_name, tuple):
_type, _address = point_name
for point in self.points:
if point.properties.type == _type and str(
point.properties.address
) == str(_address):
return point
else:
try:
return self._findPoint(point_name, force_read=False)
except ValueError:
try:
return self._findTrend(point_name)
except ValueError:
try:
if "@prop_" in point_name:
point_name = point_name.split("prop_")[1]
return self.read_property(
("device", self.properties.device_id, point_name)
)
else:
raise ValueError()
except ValueError:
raise ValueError()
except ValueError as ve:
self.log(f"Can't find {point_name} | {ve}", level="error")
def __iter__(self):
yield from self.points
def __contains__(self, value):
"""
Allows the syntax:
if "point_name" in device:
"""
return value in self.points_name
@property
def pollable_points_name(self):
for each in self.points:
if not isinstance(each, VirtualPoint):
yield each.properties.name
else:
continue
@property
def points_name(self):
for each in self.points:
yield each.properties.name
def __setitem__(self, point_name, value):
"""
Allows the syntax:
device['point_name'] = value
"""
try:
loop = asyncio.get_event_loop()
loop.create_task(self._findPoint(point_name)._set(value))
except WritePropertyException as ve:
self.log(f"{ve}", level="error")
def __len__(self):
"""
Length of a device = number of points
"""
return len(self.points)
def _parseArgs(self, arg):
args = arg.split()
pointName = " ".join(args[:-1])
value = args[-1]
return (pointName, value)
@property
def analog_units(self):
"""
Shortcut to retrieve all analog points units [Used by Bokeh trending feature]
"""
au = []
us = []
for each in self.points:
if isinstance(each, NumericPoint):
au.append(each.properties.name)
us.append(each.properties.units_state)
return dict(zip(au, us))
@property
def temperatures(self):
for each in self.analog_units.items():
if "deg" in each[1]:
yield each
@property
def percent(self):
for each in self.analog_units.items():
if "percent" in each[1]:
yield each
@property
def multi_states(self):
ms = []
us = []
for each in self.points:
if isinstance(each, EnumPoint):
ms.append(each.properties.name)
us.append(each.properties.units_state)
return dict(zip(ms, us))
@property
def binary_states(self):
bs = []
us = []
for each in self.points:
if isinstance(each, BooleanPoint):
bs.append(each.properties.name)
us.append(each.properties.units_state)
return dict(zip(bs, us))
def _findPoint(self, name, force_read=False):
"""
Used by getter and setter functions
"""
for point in self.points:
if point.properties.name == name:
if force_read:
point.value
return point
raise ValueError(f"{name} doesn't exist in controller")
def _trendlogs(self):
for k, v in self._list_of_trendlogs.items():
name, trendlog = v
yield trendlog
@property
def trendlogs_names(self):
for each in self._trendlogs():
yield each.properties.object_name
@property
def trendlogs(self):
return list(self._trendlogs())
def _findTrend(self, name):
for trend in self._trendlogs():
if trend.properties.object_name == name:
return trend
raise ValueError(f"{name} doesn't exist in controller")
[docs]
async def read_property(self, prop) -> Any:
# if instance == -1:
# pass
if isinstance(prop, tuple):
_obj, _instance, _prop = prop
elif isinstance(prop, str):
_obj = "device"
_instance = self.properties.device_id
_prop = prop
else:
raise ValueError(
"Please provide property using tuple with object, instance and property"
)
try:
request = f"{self.properties.address} {_obj} {_instance} {_prop}"
val = await self.properties.network.read(
request, vendor_id=self.properties.vendor_id
)
except KeyError as error:
raise Exception(f"Unknown property : {error}")
return val
[docs]
async def write_property(self, prop, value, priority=None):
if prop == "description":
self.update_description(value)
else:
if priority is not None:
priority = f"- {priority}"
if isinstance(prop, tuple):
_obj, _instance, _prop = prop
else:
raise ValueError(
"Please provide property using tuple with object, instance and property"
)
try:
request = "{} {} {} {} {} {}".format(
self.properties.address, _obj, _instance, _prop, value, priority
)
val = await self.properties.network.write(
request, vendor_id=self.properties.vendor_id
)
except KeyError as error:
raise Exception(f"Unknown property : {error}")
return val
[docs]
async def update_bacnet_properties(self):
"""
Retrieve bacnet properties for this device
"""
try:
res = await self.properties.network.readMultiple(
"{} device {} all".format(
self.properties.address, str(self.properties.device_id)
),
vendor_id=self.properties.vendor_id,
show_property_name=True,
)
for each in res:
if not each:
continue
v, prop = each
self.properties.bacnet_properties[prop] = v
except Exception as e:
raise Exception(f"Problem reading : {self.properties.name} | {e}")
async def _bacnet_properties(self, update=False) -> Dict[PropertyIdentifier, Any]:
if not self.properties.bacnet_properties or update:
await self.update_bacnet_properties()
return self.properties.bacnet_properties
@property
async def bacnet_properties(self) -> Dict:
return await self._bacnet_properties(update=True)
[docs]
async def update_description(self, value):
await self.properties.network.send_text_write_request(
addr=self.properties.address,
obj_type="device",
obj_inst=int(self.device_id),
value=value,
prop_id="description",
)
self.properties.description = str(await self.read_property("description"))
[docs]
async def ping(self):
try:
if await self.read_property("objectName") == self.properties.name:
self.properties.ping_failures = 0
return True
else:
self.properties.ping_failures += 1
return False
except NoResponseFromController as e:
self.log(
f"{self.properties.name} ({self.properties.address})| Ping failure ({e}).",
level="error",
)
self.properties.ping_failures += 1
return False
def __repr__(self):
return f"{self.properties.name} / Connected"
# ------------------------------------------------------------------------------
[docs]
class RPDeviceConnected(DeviceConnected, ReadProperty):
"""
[Device state] If device is connected but doesn't support ReadPropertyMultiple
BAC0 will not poll such points automatically (since it would cause excessive network traffic).
Instead manual polling must be used as needed via the poll() function.
"""
def __str__(self):
return "connected [for ReadProperty]"
[docs]
class RPMDeviceConnected(DeviceConnected, ReadPropertyMultiple):
"""
[Device state] If device is connected and supports ReadPropertyMultiple
"""
def __str__(self):
return "connected [for ReadPropertyMultiple]"
# @fix_docs
[docs]
class DeviceDisconnected(Device):
"""
[Device state] Initial state of a device. Disconnected from BACnet.
"""
async def _init_state(self):
# self.initialized = False
await self.connect()
[docs]
async def connect(self, *args, network=None, db=None, **kwargs):
"""
Attempt to connect to device. If unable, attempt to connect to a controller database
(so the user can use previously saved data).
"""
if network:
self.properties.network = network
if not self.properties.network:
self.log("No network...calling DeviceFromDB", level="debug")
if db:
await self.new_state(DeviceFromDB)
self.log(
'You can reconnect to network using : "device.connect(network=bacnet)"',
level="info",
)
else:
try:
segmentation = await self.properties.network.read(
"{} device {} segmentationSupported".format(
self.properties.address, self.properties.device_id
)
)
self.segmentation_supported = (
False if segmentation and segmentation.numerator == 3 else True
)
if self.segmentation_supported:
await self.new_state(RPMDeviceConnected)
else:
await self.new_state(RPDeviceConnected)
except SegmentationNotSupported:
self.segmentation_supported = False
self._log.warning(
"Segmentation not supported.... expect slow responses."
)
await self.new_state(RPDeviceConnected)
except (NoResponseFromController, AttributeError) as error:
self._log.warning(f"BAC0 got no response from controller: {error}")
if self.properties.db_name:
await self.new_state(DeviceFromDB)
else:
self._log.warning(
"Offline: provide database name to load stored data."
)
self._log.warning("Ex. controller.connect(db = 'backup')")
[docs]
def df(self, list_of_points, force_read=True):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def simulated_points(self):
for each in self.points:
if each.properties.simulated:
yield each
def _buildPointList(self):
raise DeviceNotConnected("Must connect to BACnet or database")
# This should be a "read" function and rpm defined in state rpm
[docs]
def read_multiple(
self, points_list, *, points_per_request=25, discover_request=(None, 6)
):
raise DeviceNotConnected("Must connect to BACnet or database")
[docs]
def poll(self, command="start", *, delay=10):
raise DeviceNotConnected("Must connect to BACnet or database")
def __getitem__(self, point_name):
raise DeviceNotConnected("Must connect to BACnet or database")
def __iter__(self):
raise DeviceNotConnected("Must connect to BACnet or database")
def __contains__(self, value):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def points_name(self):
raise DeviceNotConnected("Must connect to BACnet or database")
[docs]
def to_excel(self):
raise DeviceNotConnected("Must connect to BACnet or database")
def __setitem__(self, point_name, value):
raise DeviceNotConnected("Must connect to BACnet or database")
def __len__(self):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def analog_units(self):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def temperatures(self):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def percent(self):
raise DeviceNotConnected("Must connect to BACnet or database")
@property
def multi_states(self):
raise DeviceNotConnected("Must connect to bacnet or database")
@property
def binary_states(self):
raise DeviceNotConnected("Must connect to BACnet or database")
def _discoverPoints(self, custom_object_list=None):
raise DeviceNotConnected("Must connect to BACnet or database")
def _findPoint(self, name, force_read=True):
raise DeviceNotConnected("Must connect to BACnet or database")
def __repr__(self):
return f"{self.properties.name} / Disconnected"
# ------------------------------------------------------------------------------
# @fix_docs
[docs]
class DeviceFromDB(DeviceConnected):
"""
[Device state] Where requests for a point's present value returns the last
valid value from the point's history.
"""
async def _init_state(self):
try:
await self.initialize_device_from_db()
except ValueError as e:
self.log(f"Problem with DB initialization : {e}", level="error")
# self.new_state(DeviceDisconnected)
raise
[docs]
async def connect(self, *args, network=None, from_backup=None, **kwargs):
"""
In DBState, a device can be reconnected to BACnet using:
device.connect(network=bacnet) (bacnet = BAC0.connect())
"""
if network and from_backup:
raise WrongParameter("Please provide network OR from_backup")
elif network:
self.log("Network provided... trying to connect", level="debug")
self.properties.network = network
try:
name = await self.properties.network.read(
"{} device {} objectName".format(
self.properties.address, self.properties.device_id
)
)
segmentation = await self.properties.network.read(
"{} device {} segmentationSupported".format(
self.properties.address, self.properties.device_id
)
)
segmentation_supported = False if segmentation.numerator == 3 else True
if name:
if segmentation_supported:
self.log("Segmentation supported, connecting...", level="debug")
await self.new_state(RPMDeviceConnected)
else:
self.log(
"Segmentation not supported, connecting...", level="debug"
)
await self.new_state(RPDeviceConnected)
# self.db.close()
except (NoResponseFromController, NoResponse):
self.log("Unable to connect, keeping DB mode active", level="error")
else:
self.log("Not connected, open DB", level="debug")
if from_backup:
self.properties.db_name = from_backup.split(".")[0]
await self._init_state()
[docs]
async def initialize_device_from_db(self):
self.log(f"Initializing DB for {self.properties.name}", level="info")
# Save important properties for reuse
if self.properties.db_name:
dbname = self.properties.db_name
try:
self._props = self.read_dev_prop(self.properties.db_name)
except ValueError:
raise ValueError(f"Can't find {self.properties.db_name} on drive")
else:
self.log(f"Missing argument DB for {self.properties.name}", level="info")
raise ValueError(
f"Please provide db name using device.load_db('name') for {self.properties.name}"
)
# network = self.properties.network
pss = self.properties.pss
self.points = []
for point in await self.points_from_sql(self.properties.db_name):
try:
self.points.append(OfflinePoint(self, point))
except RemovedPointException:
continue
self.properties = DeviceProperties()
self.properties.db_name = dbname
self.properties.address = self._props["address"]
self.properties.device_id = self._props["device_id"]
self.properties.network = None
self.properties.pollDelay = self._props["pollDelay"]
self.properties.name = self._props["name"]
self.properties.objects_list = self._props["objects_list"]
self.properties.pss = pss
self.properties.serving_chart = {}
self.properties.charts = []
self.properties.multistates = self._props["multistates"]
self.properties.auto_save = self._props["auto_save"]
self.properties.save_resampling = self._props["save_resampling"]
self.properties.clear_history_on_save = self._props["clear_history_on_save"]
self.properties.default_history_size = self._props["history_size"]
self.log(f"{self.properties.name} restored from db", level="info")
self.log(
'You can reconnect to network using : "device.connect(network=bacnet)"',
level="info",
)
@property
def simulated_points(self):
raise DeviceNotConnected("Must connect to BACnet or database")
def _buildPointList(self):
raise DeviceNotConnected("Must connect to BACnet or database")
# This should be a "read" function and rpm defined in state rpm
[docs]
def read_multiple(
self, points_list, *, points_per_request=25, discover_request=(None, 6)
):
raise DeviceNotConnected("Must connect to BACnet or database")
[docs]
def poll(self, command="start", *, delay=10):
raise DeviceNotConnected("Must connect to BACnet or database")
def __contains__(self, value):
raise DeviceNotConnected("Must connect to BACnet or database")
[docs]
def to_excel(self):
raise DeviceNotConnected("Must connect to BACnet or database")
def __setitem__(self, point_name, value):
raise DeviceNotConnected("Must connect to BACnet or database")
def _discoverPoints(self, custom_object_list=None):
raise DeviceNotConnected("Must connect to BACnet or database")
def __repr__(self):
return f"{self.properties.name} / Disconnected"
# ------------------------------------------------------------------------------
[docs]
class DeviceLoad(DeviceFromDB):
def __init__(self, filename=None):
if filename:
Device.__init__(self, None, None, None, from_backup=filename)
else:
raise Exception("Please provide backup file as argument")