async def run(self):
    """Subscribe to change-of-value notifications and dispatch them.

    Opens a ``change_of_value`` context on ``self._app`` for
    ``self.address`` / ``self.obj_identifier`` and then loops, reading
    ``(property_identifier, property_value)`` pairs from the subscription
    context and handing each one to ``self.callback`` (as keyword
    arguments) until ``self.cov_fini`` is set.

    * A coroutine-function callback is scheduled as a task and not awaited.
    * Any other callable is invoked synchronously.
    * A non-``None``, non-callable callback is reported through the logger.

    The loop waits on the stop event *and* the next notification at the
    same time, so setting ``self.cov_fini`` ends the subscription promptly
    even when the remote device sends no further notifications.

    Returns:
        None. Any ``Exception`` raised while subscribing or dispatching is
        logged as an error and ends the subscription; it is not re-raised.
    """
    self._lite._log.debug(
        f"Subscribing to COV for {self.address} | {self.obj_identifier} -> {self._app}"
    )
    # The event loop only keeps weak references to tasks, so hold strong
    # references to in-flight async callbacks while this coroutine runs.
    callback_tasks = set()
    try:
        async with self._app.change_of_value(
            self.address,
            self.obj_identifier,
            self.process_identifier,
            self.confirmed,
            self.lifetime,
        ) as scm:
            cov_fini_task_monitor = asyncio.create_task(self.cov_fini.wait())
            incoming = None
            try:
                while not self.cov_fini.is_set():
                    incoming: asyncio.Future = asyncio.ensure_future(scm.get_value())
                    # Wake up on whichever comes first: a stop request or
                    # a new notification. Waiting on `incoming` alone would
                    # leave a stop request unnoticed until the next
                    # notification arrives.
                    done, pending = await asyncio.wait(
                        [cov_fini_task_monitor, incoming],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in pending:
                        # The stop monitor is reused across iterations;
                        # only the abandoned get_value() call is cancelled.
                        if task is not cov_fini_task_monitor:
                            task.cancel()

                    if incoming in done:
                        property_identifier, property_value = incoming.result()
                        self._lite._log.debug(
                            f"COV notification received for {self.address}|{self.obj_identifier} | {property_identifier} -> {type(property_identifier)} with value {property_value} | {property_value} -> {type(property_value)}"
                        )
                        if self.callback is not None:
                            self._lite._log.debug(
                                f"Calling callback for {property_identifier}"
                            )
                            if asyncio.iscoroutinefunction(self.callback):
                                callback_task = asyncio.create_task(
                                    self.callback(
                                        property_identifier=property_identifier,
                                        property_value=property_value,
                                    )
                                )
                                callback_tasks.add(callback_task)
                                callback_task.add_done_callback(
                                    callback_tasks.discard
                                )
                            elif callable(self.callback):
                                self.callback(
                                    property_identifier=property_identifier,
                                    property_value=property_value,
                                )
                            else:
                                self._lite._log.error(
                                    f"Callback {self.callback} is not callable"
                                )
                await cov_fini_task_monitor
            finally:
                # Do not leave helper tasks behind when the loop is left
                # through an exception or a cancellation of run() itself.
                for task in (cov_fini_task_monitor, incoming):
                    if task is not None and not task.done():
                        task.cancel()
    except Exception as e:
        self._lite._log.error(f"Error in COV subscription : {e}")
def stop(self):
    """Request the end of this COV subscription.

    Emits a debug message naming the subscribed address and object, then
    sets ``self.cov_fini``, the flag the ``run`` loop checks to decide
    when to exit.
    """
    logger = self._lite._log
    logger.debug(
        f"Stopping COV subscription class for {self.address} | {self.obj_identifier}"
    )
    self.cov_fini.set()