import asyncio
from typing import List, Optional, Tuple, Union
from bacpypes3.app import Application
from bacpypes3.netservice import RouterEntryStatus
from bacpypes3.pdu import Address, GlobalBroadcast
from BAC0.core.app.asyncApp import BAC0Application
from ...core.utils.notes import note_and_log
# Router description as a tuple: ``(address, dnets)`` or
# ``(address, dnets, snet)``. This mirrors the annotation and unpacking used
# by ``Alias.use_router`` in this module.
_ROUTER_ADDRESS = Union[Address, str]  # router address, object or text form
_ROUTER_DNETS = Union[int, List[int]]  # one destination network or several
ROUTER_TUPLE_TYPE = Union[
    Tuple[_ROUTER_ADDRESS, _ROUTER_DNETS],
    Tuple[_ROUTER_ADDRESS, _ROUTER_DNETS, Optional[int]],
]
@note_and_log
class Alias:
    """
    Mixin exposing bacpypes3 network services on the BAC0 application.

    Bacpypes3 now offers a wide range of functions out of the box; this mixin
    brings them to the BAC0 app so they are easy to use.

    Every method goes through ``self.this_application.app`` (the bacpypes3
    application) and logs through ``self.log`` / ``self._log``. None of these
    attributes is defined in this class; they are presumably provided by the
    host class and by the ``note_and_log`` decorator -- confirm when reusing
    the mixin elsewhere.
    """

    async def who_is(self, address=None, low_limit=0, high_limit=4194303, timeout=3):
        """
        Send a WhoIs request and return the answers.

        WhoIs requests are sent to discover devices on the network. If an
        address is specified, the request is sent to that address. Otherwise
        ``None`` is handed to the application, which is expected to broadcast
        the request to the local network.

        :param address: (optional) The address to send the request to. Any
            truthy value is wrapped in ``Address``.
        :param low_limit: (optional) Lower limit forwarded to the request.
        :param high_limit: (optional) Upper limit forwarded to the request.
        :param timeout: (optional) Timeout forwarded to the request.
        :returns: List of IAm responses, as returned by the application.

        Example::

            import BAC0
            bacnet = BAC0.lite()
            await bacnet.who_is()
            await bacnet.who_is('2:5')
        """
        return await self.this_application.app.who_is(
            address=Address(address) if address else None,
            low_limit=low_limit,
            high_limit=high_limit,
            timeout=timeout,
        )

    def iam(self, address=None):
        """
        Send an IAm through the application.

        IAm are sent in response to a WhoIs request that matches our device
        ID, whose device range includes us, or is a broadcast. Content is
        defined by the script (deviceId, vendor, etc...).

        :param address: (optional) Where to send the IAm. A value that is not
            already an ``Address`` is converted, like in the other methods of
            this class. A falsy value is passed through unchanged.

        Example::

            iam()
        """
        _this_application: BAC0Application = self.this_application
        _app: Application = _this_application.app
        # Same normalisation as the sibling methods, so a textual address
        # such as '2:5' is not handed raw to the application.
        if address and not isinstance(address, Address):
            address = Address(address)
        self.log("do_iam", level="debug")
        _app.i_am(address=address)

    async def whois_router_to_network(
        self, network=None, *, destination=None, timeout=3, global_broadcast=False
    ):
        """
        Send a Who-Is-Router-To-Network request.

        This request is used to discover routers on the network that can
        route messages to a specific network. The answer contains information
        about the routers that can handle the routing.

        :param network: (optional) Network number forwarded to the request.
        :param destination: (optional, keyword-only) Where to send the
            request. A value that is not an ``Address`` is converted. An
            explicit destination always takes precedence over
            ``global_broadcast``.
        :param timeout: (optional, keyword-only) Seconds to wait for the
            answer before giving up.
        :param global_broadcast: (optional, keyword-only) When no destination
            is given, send to ``GlobalBroadcast()`` instead of leaving the
            destination unset.
        :returns: The result of the network service call, or an empty list
            when the request timed out.

        Example::

            await whois_router_to_network()
        """
        _this_application: BAC0Application = self.this_application
        _app: Application = _this_application.app

        if destination:
            # An explicit destination wins whatever its type; previously an
            # ``Address`` instance was silently replaced by a global broadcast
            # while a string was not.
            if not isinstance(destination, Address):
                destination = Address(destination)
        elif global_broadcast:
            destination = GlobalBroadcast()

        try:
            return await asyncio.wait_for(
                _app.nse.who_is_router_to_network(
                    destination=destination, network=network
                ),
                timeout,
            )
        except asyncio.TimeoutError:
            # No router answered in time: report it and return "nothing found".
            self.log(
                "Request timed out for whois_router_to_network, no response",
                level="warning",
            )
            return []

    async def init_routing_table(self, address=None):
        """
        irt <addr>

        Send an empty Initialize-Routing-Table message to an address, a router
        will return an acknowledgement with its routing table configuration.

        :param address: (optional) Destination of the message. A value that
            is not an ``Address`` is converted; ``None`` is forwarded as is.
        :returns: ``None``. The result of the underlying call is awaited but
            not returned.
        """
        self.log(f"Addr : {address}", level="info")
        _this_application: BAC0Application = self.this_application
        _app: Application = _this_application.app
        if address is not None and not isinstance(address, Address):
            address = Address(address)
        await _app.nse.initialize_routing_table(destination=address)

    async def use_router(
        self,
        router_infos: Union[
            Tuple[Union[Address, str], Union[int, List[int]]],
            Tuple[Union[Address, str], Union[int, List[int]], Optional[int]],
        ] = (None, [], None),
    ):
        """
        Register a router and the networks reachable through it.

        A WhoIs is first sent to the router address. Only when something
        answers are the router references updated, the router info cache
        filled and each destination network added to the application's
        learned networks. Otherwise a warning is logged and nothing changes.

        :param router_infos: ``(address, dnets)`` or ``(address, dnets, snet)``.
            ``address`` is converted to ``Address`` when needed, ``dnets`` may
            be a single network or a list of networks, and ``snet`` defaults
            to ``None`` when the third item is absent.
        :returns: ``None``.
        """
        address, dnets = router_infos[:2]
        # The third item (source network) is optional.
        try:
            snet = router_infos[2]
        except IndexError:
            snet = None

        _this_application: BAC0Application = self.this_application
        _app: Application = _this_application.app

        if not isinstance(address, Address):
            address = Address(address)
        if not isinstance(dnets, list):
            dnets = [dnets]

        # Probe the address before touching any routing state.
        response = await self.who_is(address=address)
        if not response:
            self._log.warning(f"Router not found at {address}")
            return

        self._log.info(f"Router found at {address}")
        self._log.info(
            f"Adding router reference -> Snet : {snet} Addr : {address} dnets : {dnets}"
        )
        await _app.nsap.update_router_references(
            snet=snet, address=address, dnets=dnets
        )

        # Kept on the instance, as before, in case other code reads ``_ric``.
        self._ric = _app.nsap.router_info_cache
        self._log.info(
            f"Updating router info cache -> Snet : {snet} Addr : {address} dnets : {dnets}"
        )
        for each in dnets:
            await self._ric.set_path_info(
                snet, each, address, RouterEntryStatus.available
            )
            _this_application._learnedNetworks.add(each)

    async def what_is_network_number(self, destination=None, timeout=3):
        """
        winn [ <addr> ]

        Send a What-Is-Network-Number message. If the address is unspecified
        the message is locally broadcast.

        :param destination: (optional) Where to send the message. A value
            that is not an ``Address`` is converted. When omitted, the
            underlying call is made without arguments.
        :param timeout: (optional) Seconds to wait for the answer before
            giving up.
        :returns: The result of the network service call, or ``None`` when
            the request timed out.
        """
        _this_application: BAC0Application = self.this_application
        _app: Application = _this_application.app

        if destination and not isinstance(destination, Address):
            destination = Address(destination)

        try:
            if destination:
                # ``destination`` used to be accepted and then ignored.
                # NOTE(review): assumes the bacpypes3 call takes
                # ``destination=`` like the other ``nse`` calls used in this
                # class -- confirm against the installed bacpypes3 version.
                request = _app.nse.what_is_network_number(destination=destination)
            else:
                request = _app.nse.what_is_network_number()
            return await asyncio.wait_for(request, timeout)
        except asyncio.TimeoutError:
            # Nobody answered in time: report it and return "unknown".
            self.log(
                "Request timed out for what_is_network_number, no response",
                level="warning",
            )
            return None

    async def whohas(
        self,
        object_id=None,
        object_name=None,
        low_limit=0,
        high_limit=4194303,
        destination=None,
        timeout=5,
    ):
        """
        Send a WhoHas request and return the answers.

        :param object_id: (optional) The object identifier to look for, if
            unused object_name must be present.
        :param object_name: (optional) The object name to look for, if unused
            object_id must be present.
        :param low_limit: (optional) Lower limit forwarded to the request.
        :param high_limit: (optional) Upper limit forwarded to the request.
        :param destination: (optional) The destination address, if empty local
            broadcast will be used. Any truthy value is wrapped in ``Address``.
        :param timeout: (optional) The timeout for the WhoHas.
        :returns: The response(s) returned by the application's ``who_has``.

        Example::

            import BAC0
            bacnet = BAC0.lite()
            await bacnet.whohas(object_name='SomeDevice')
        """
        return await self.this_application.app.who_has(
            object_identifier=object_id,
            object_name=object_name,
            low_limit=low_limit,
            high_limit=high_limit,
            address=Address(destination) if destination else None,
            timeout=timeout,
        )