#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
# Licensed under LGPLv3, see file LICENSE in this source tree.
#
# --- standard Python modules ---
# --- 3rd party modules ---
from collections import namedtuple
from typing import Any, Dict, List, Optional, Tuple, Union
from bacpypes3.primitivedata import Date, Time
# --- this application's modules ---
from ..utils.notes import note_and_log
from ..utils.lookfordependency import pandas_if_available
_PANDAS, pd, _, _ = pandas_if_available()
# ------------------------------------------------------------------------------
HistoryComponent = namedtuple("HistoryComponent", "index logdatum status choice")
[docs]
class TrendLogProperties(object):
"""
A container for trend properties
"""
def __init__(self):
self.device: Optional[Any] = None
self.oid: Optional[Any] = None
self.object_name: Optional[str] = None
self.description: str = ""
self.log_device_object_property: Optional[Any] = None
self.buffer_size: int = 0
self.record_count: int = 0
self.total_record_count: int = 0
self.log_interval: int = 0
self.statusFlags: Optional[Any] = None
self.status_flags: Dict[str, bool] = {
"in_alarm": False,
"fault": False,
"overridden": False,
"out_of_service": False,
}
self._history_components: List[HistoryComponent] = []
self._df = None
self.type: str = "TrendLog"
self.units_state: str = "None"
def __repr__(self):
return "{} | Descr : {} | Record count : {}".format(
self.object_name, self.description, self.record_count
)
@property
def name(self) -> Optional[str]:
return self.object_name
@note_and_log
class _TrendLog(TrendLogProperties):
"""
BAC0 simplification of TrendLog Object
"""
def __init__(
self,
OID: Any,
device: Optional[Any] = None,
read_log_on_creation: bool = True,
multiple_request: Optional[Any] = None,
):
self.properties: TrendLogProperties = TrendLogProperties()
self.properties.device = device
self.properties.oid = OID
self.update_properties_task: Optional[Any] = None
self._last_index: int = 0
if read_log_on_creation:
self.read_log_buffer_task: Optional[Any] = None
@staticmethod
def read_logDatum(logDatum: Any) -> Optional[Tuple[str, Any]]:
for k, v in logDatum.__dict__.items():
if v is None:
continue
else:
return (k, v)
return None
async def update_properties(self) -> None:
try:
(
self.properties.object_name,
self.properties.description,
self.properties.record_count,
self.properties.buffer_size,
self.properties.total_record_count,
self.properties.statusFlags,
self.properties.log_interval,
) = await self.properties.device.properties.network.readMultiple(
"{addr} trendLog {oid} objectName description recordCount bufferSize totalRecordCount statusFlags logInterval".format(
addr=self.properties.device.properties.address,
oid=str(self.properties.oid),
)
)
self.properties.description = str(self.properties.description)
except Exception as error:
raise Exception(f"Problem reading trendLog informations: {error}")
async def _total_record_count(self) -> int:
self.properties.total_record_count = (
await self.properties.device.properties.network.read(
"{addr} trendLog {oid} totalRecordCount".format(
addr=self.properties.device.properties.address,
oid=str(self.properties.oid),
)
)
)
return self.properties.total_record_count
async def read_log_buffer(self) -> None:
RECORDS = 10
log_buffer = set()
_actual_index = await self._total_record_count()
start = max(_actual_index - self.properties.record_count + 1, self._last_index)
_count = max(_actual_index - start, 0)
steps = int(_count / RECORDS) + int(1 if (_count % RECORDS) > 0 else 0)
self.log(f"Reading log : {start} {_count} {steps}", level="debug")
_from = start
for each in range(steps):
range_params = ("s", _from, Date("1979-01-01"), Time("00:00"), RECORDS)
_chunk = await self.properties.device.properties.network.readRange(
"{} trendLog {} logBuffer".format(
self.properties.device.properties.address, str(self.properties.oid)
),
range_params=range_params,
)
_from += len(_chunk)
for chunk in _chunk:
log_buffer.add(chunk)
self._last_index = _from
self.create_dataframe(log_buffer)
def create_dataframe(self, log_buffer: set) -> None:
for each in log_buffer:
year, month, day, dow = each.timestamp.date
year = year + 1900
hours, minutes, seconds, ms = each.timestamp.time
seconds = 0 if seconds == 255 else seconds
ms = 0 if ms == 255 else ms
_index = pd.to_datetime(
f"{year}-{month}-{day} {hours}:{minutes}:{seconds}.{ms}",
format="%Y-%m-%d %H:%M:%S.%f",
)
_choice, _logDatum = self.read_logDatum(each.logDatum)
_status = each.statusFlags
self._log.debug(f"{_index}, {_logDatum}, {_status}, {_choice}")
his_component = HistoryComponent(_index, _logDatum, _status, _choice)
if his_component not in self.properties._history_components:
self.properties._history_components.append(his_component)
if _PANDAS:
df = pd.DataFrame(
{
"index": [
each.index for each in self.properties._history_components
],
self.properties.object_name: [
each.logdatum for each in self.properties._history_components
],
"status": [
each.status for each in self.properties._history_components
],
"choice": [
each.choice for each in self.properties._history_components
],
}
)
df = df.set_index("index")
# df["choice"] = _choice
# df[self.properties.object_name] = df['logDatum']
self.properties._df = df
else:
# self.properties._history_components = (self.index, self.logdatum, self.status)
self._log.warning(
"Pandas not installed. Treating histories as simple list."
)
@property
async def history(self) -> Union[Dict, Any]:
await self.read_log_buffer()
if not _PANDAS or self.properties._df is None:
return dict(
zip(
[each.index for each in self.properties._history_components],
[each.logDatum for each in self.properties._history_components],
)
)
try:
if not self.properties.log_device_object_property:
self.properties.log_device_object_property = (
await self.properties.device.properties.network.read(
"{addr} trendLog {oid} logDeviceObjectProperty".format(
addr=self.properties.device.properties.address,
oid=str(self.properties.oid),
)
)
)
(
objectType,
objectAddress,
) = self.properties.log_device_object_property.objectIdentifier
logged_point = self.properties.device.find_point(objectType, objectAddress)
except (Exception, ValueError):
logged_point = None
serie = self.properties._df[self.properties.object_name].copy()
serie.units = logged_point.properties.units_state if logged_point else "n/a"
serie.name = ("{}/{}").format(
self.properties.device.properties.name, self.properties.object_name
)
if not logged_point:
serie.states = "unknown"
serie.datatype = None
else:
if logged_point.properties.name in self.properties.device.binary_states:
serie.states = "binary"
elif logged_point.properties.name in self.properties.device.multi_states:
serie.states = "multistates"
else:
serie.states = "analog"
serie.datatype = objectType
serie.description = self.properties.description
return serie.sort_index()
def chart(self, remove: bool = False) -> None:
"""
Add point to the bacnet trending list
"""
if not _PANDAS:
self._log.error(
"Pandas must be installed to use live chart feature. See documentation how how to run BAC0 in complete mode"
)
else:
if remove:
self.properties.device.properties.network.remove_trend(self)
else:
self.properties.device.properties.network.add_trend(self)
def __repr__(self):
return self.properties.__repr__()
[docs]
def TrendLog(*args, **kwargs) -> _TrendLog:
trend = _TrendLog(*args, **kwargs)
# update_properties_task = Task(trend.update_properties())
# update_properties_task.start()
# while not update_properties_task.done:
# pass
# if "read_log_on_creation" in args:
# trend.read_log_buffer_task = Task(trend.read_log_buffer())
return trend