-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
__init__.py
893 lines (780 loc) · 36.7 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
"""
Custom integration to integrate Bermuda BLE Trilateration with Home Assistant.
For more details about this integration, please refer to
https://github.com/agittins/bermuda
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from datetime import timedelta
import voluptuous as vol
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import MONOTONIC_TIME
from homeassistant.components.bluetooth import BluetoothChange
from homeassistant.components.bluetooth import BluetoothScannerDevice
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
from homeassistant.components.bluetooth.active_update_coordinator import (
PassiveBluetoothDataUpdateCoordinator,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_HOME
from homeassistant.const import STATE_NOT_HOME
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import Config
from homeassistant.core import HomeAssistant
from homeassistant.core import SupportsResponse
from homeassistant.core import callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import area_registry
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import slugify
from homeassistant.util.dt import get_age
from homeassistant.util.dt import now
from .const import ADVERT_FRESHTIME
from .const import CONF_ATTENUATION
from .const import CONF_DEVICES
from .const import CONF_DEVTRACK_TIMEOUT
from .const import CONF_MAX_RADIUS
from .const import CONF_REF_POWER
from .const import CONFDATA_SCANNERS
from .const import DEFAULT_ATTENUATION
from .const import DEFAULT_DEVTRACK_TIMEOUT
from .const import DEFAULT_MAX_RADIUS
from .const import DEFAULT_REF_POWER
from .const import DOMAIN
from .const import HIST_KEEP_COUNT
from .const import PLATFORMS
from .const import SIGNAL_DEVICE_NEW
from .const import STARTUP_MESSAGE
# from typing import TYPE_CHECKING
# from bthome_ble import BTHomeBluetoothDeviceData
# if TYPE_CHECKING:
# from bleak.backends.device import BLEDevice
SCAN_INTERVAL = timedelta(seconds=5)
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
_LOGGER: logging.Logger = logging.getLogger(__package__)
async def async_setup(
hass: HomeAssistant, config: Config
): # pylint: disable=unused-argument;
"""Setting up this integration using YAML is not supported."""
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up this integration using UI."""
if hass.data.get(DOMAIN) is None:
hass.data.setdefault(DOMAIN, {})
_LOGGER.info(STARTUP_MESSAGE)
coordinator = BermudaDataUpdateCoordinator(hass, entry)
await coordinator.async_refresh()
if not coordinator.last_update_success:
raise ConfigEntryNotReady
hass.data[DOMAIN][entry.entry_id] = coordinator
for platform in PLATFORMS:
coordinator.platforms.append(platform)
hass.async_add_job(
hass.config_entries.async_forward_entry_setup(entry, platform)
)
entry.add_update_listener(async_reload_entry)
return True
def rssi_to_metres(rssi, ref_power=None, attenuation=None):
"""Convert instant rssi value to a distance in metres
Based on the information from
https://mdpi-res.com/d_attachment/applsci/applsci-10-02003/article_deploy/applsci-10-02003.pdf?version=1584265508
attenuation: a factor representing environmental attenuation
along the path. Will vary by humidity, terrain etc.
ref_power: db. measured rssi when at 1m distance from rx. The will
be affected by both receiver sensitivity and transmitter
calibration, antenna design and orientation etc.
TODO: the ref_power and attenuation figures can/should probably be mapped
against each receiver and transmitter for variances. We could also fine-
tune the attenuation in real time based on changing values coming from
known-fixed beacons (eg thermometers, window sensors etc)
"""
if ref_power is None:
return False
# ref_power = self.ref_power
if attenuation is None:
return False
# attenuation= self.attenuation
distance = 10 ** ((ref_power - rssi) / (10 * attenuation))
return distance
class BermudaPBDUCoordinator(
PassiveBluetoothDataUpdateCoordinator,
):
"""Class for receiving bluetooth adverts in realtime
Looks like this needs to be run through setup, with a specific
BLEDevice (address) already specified, so won't do "all adverts"
We (plan to) use it to capture each monitored addresses' events so we can update more
frequently than UPDATE_INTERVAL, and respond in realtime.
The co-ordinator will create one of these for each device we create
sensors for, and this will just call the coordinator's update routine
whenever something changes.
WORK IN PROGRESS: this doesn't currently do anything.
"""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
address: str,
device: BermudaDevice,
coordinator: BermudaDataUpdateCoordinator,
) -> None:
"""Init"""
super().__init__(
hass=hass,
logger=logger,
address=address,
mode=bluetooth.BluetoothScanningMode.PASSIVE,
connectable=False,
)
self.device = device
self.coordinator = coordinator
@callback
def _async_handle_unavailable(
self, service_info: BluetoothServiceInfoBleak
) -> None:
return super()._async_handle_unavailable(service_info)
@callback
def _async_handle_bluetooth_event(
self, service_info: BluetoothServiceInfoBleak, change: BluetoothChange
) -> None:
_LOGGER.warning(
"Update triggered by device %s (this is a good thing)", self.device.name
)
self.coordinator.async_refresh()
return super()._async_handle_bluetooth_event(service_info, change)
class BermudaDeviceScanner(dict):
"""Represents details from a scanner relevant to a specific device
A BermudaDevice will contain 0 or more of these depending on whether
it has been "seen" by that scanner.
Note that details on a scanner itself are BermudaDevice instances
in their own right.
"""
def __init__(
self,
device_address: str,
scandata: BluetoothScannerDevice,
area_id: str,
options,
):
# I am declaring these just to control their order in the dump,
# which is a bit silly, I suspect.
self.name: str = scandata.scanner.name
self.area_id: str = area_id
self.stamp: float = 0
self.hist_stamp = []
self.rssi: float = None
self.hist_rssi = []
self.hist_distance = []
self.hist_interval = []
self.stale_update_count = (
0 # How many times we did an update but no new stamps were found.
)
self.tx_power: float = None
# Just pass the rest on to update...
self.update(device_address, scandata, area_id, options)
def update(
self,
device_address: str,
scandata: BluetoothScannerDevice,
area_id: str,
options,
):
# We over-write pretty much everything, except our locally-preserved stats.
#
# In case the scanner has changed it's details since startup though:
self.name: str = scandata.scanner.name
self.area_id: str = area_id
# Only remote scanners log timestamps here (local usb adaptors do not),
if hasattr(scandata.scanner, "_discovered_device_timestamps"):
# Found a remote scanner which has timestamp history...
scanner_sends_stamps = True
# FIXME: Doesn't appear to be any API to get this otherwise...
# pylint: disable-next=protected-access
stamps = scandata.scanner._discovered_device_timestamps
# In this dict all MAC address keys are upper-cased
uppermac = device_address.upper()
if uppermac in stamps:
if stamps[uppermac] > self.stamp:
have_new_stamp = True
new_stamp = stamps[uppermac]
else:
# We have no updated advert in this run.
have_new_stamp = False
self.stale_update_count += 1
else:
# This shouldn't happen, as we shouldn't have got a record
# of this scanner if it hadn't seen this device.
_LOGGER.error(
"Scanner %s has no stamp for %s - very odd.",
scandata.scanner.source,
device_address,
)
have_new_stamp = False
else:
# Not a bluetooth_proxy device / remote scanner, but probably a USB Bluetooth adaptor.
# We don't get advertisement timestamps from bluez, so currently there's no way to
# reliably include it in our calculations.
scanner_sends_stamps = False
# But if the rssi has changed from last time, consider it "new"
if self.rssi != scandata.advertisement.rssi:
# Since rssi has changed, we'll consider this "new", but
# since it could be pretty much any age, make it a multiple
# of freshtime. This means it can still be useful for home/away
# detection in device_tracker, but won't factor in to area localisation.
have_new_stamp = True
new_stamp = MONOTONIC_TIME() - (ADVERT_FRESHTIME * 4)
else:
have_new_stamp = False
if len(self.hist_stamp) == 0 or have_new_stamp:
# We load anything the first time 'round, but from then on
# we are more cautious.
self.rssi: float = scandata.advertisement.rssi
self.hist_rssi.insert(0, self.rssi)
self.rssi_distance: float = rssi_to_metres(
self.rssi,
options.get(CONF_REF_POWER, DEFAULT_REF_POWER),
options.get(CONF_ATTENUATION, DEFAULT_ATTENUATION),
)
self.hist_distance.insert(0, self.rssi_distance)
self.hist_interval.insert(0, new_stamp - self.stamp)
# Stamp will be faked from above if required.
if have_new_stamp:
self.stamp = new_stamp
self.hist_stamp.insert(0, self.stamp)
# Safe to update these values regardless of stamps...
self.adapter: str = scandata.scanner.adapter
self.source: str = scandata.scanner.source
if (
self.tx_power is not None
and scandata.advertisement.tx_power != self.tx_power
):
# Not really an erorr, we just don't account for this happening -
# I want to know if it does.
# AJG 2024-01-11: This does happen. Looks like maybe apple devices?
# Changing from warning to debug to quiet users' logs.
_LOGGER.debug(
"Device changed TX-POWER! That was unexpected: %s %sdB",
device_address,
scandata.advertisement.tx_power,
)
self.tx_power: float = scandata.advertisement.tx_power
self.adverts: dict[str, bytes] = scandata.advertisement.service_data.items()
self.scanner_sends_stamps = scanner_sends_stamps
self.options = options
# Trim our history lists
for histlist in (
self.hist_distance,
self.hist_interval,
self.hist_rssi,
self.hist_stamp,
):
del histlist[HIST_KEEP_COUNT:]
def to_dict(self):
"""Convert class to serialisable dict for dump_devices"""
out = {}
for var, val in vars(self).items():
if var == "adverts":
val = {}
for uuid, thebytes in self.adverts:
val[uuid] = thebytes.hex()
out[var] = val
return out
class BermudaDevice(dict):
"""This class is to represent a single bluetooth "device" tracked by Bermuda.
"device" in this context means a bluetooth receiver like an ESPHome
running bluetooth_proxy or a bluetooth transmitter such as a beacon,
a thermometer, watch or phone etc.
We're not storing this as an Entity because we don't want all devices to
become entities in homeassistant, since there might be a _lot_ of them.
"""
def __init__(self, address, options):
"""Initial (empty) data"""
self.name: str = None
self.local_name: str = None
self.prefname: str = None # "preferred" name - ideally local_name
self.address: str = address
self.options = options
self.unique_id: str = None # mac address formatted.
self.area_id: str = None
self.area_name: str = None
self.area_distance: float = None # how far this dev is from that area
self.area_rssi: float = None # rssi from closest scanner
self.area_scanner: str = None # name of closest scanner
self.zone: str = STATE_UNAVAILABLE # STATE_HOME or STATE_NOT_HOME
self.manufacturer: str = None
self.connectable: bool = False
self.is_scanner: bool = False
self.entry_id: str = None # used for scanner devices
self.create_sensor: bool = False # Create/update a sensor for this device
self.create_sensor_done: bool = (
False # If we have requested the sensor be created
)
self.last_seen: float = (
0 # stamp from most recent scanner spotting. MONOTONIC_TIME
)
self.scanners: dict[str, BermudaDeviceScanner] = {}
def add_scanner(
self, scanner_device: BermudaDevice, discoveryinfo: BluetoothScannerDevice
):
"""Add/Replace a scanner entry on this device, indicating a received advertisement"""
if format_mac(scanner_device.address) in self.scanners:
# Device already exists, update it
self.scanners[format_mac(scanner_device.address)].update(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id,
self.options,
)
else:
self.scanners[format_mac(scanner_device.address)] = BermudaDeviceScanner(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id,
self.options,
)
device_scanner = self.scanners[format_mac(scanner_device.address)]
# Let's see if we should update our last_seen based on this...
if self.last_seen < device_scanner.stamp:
self.last_seen = device_scanner.stamp
def to_dict(self):
"""Convert class to serialisable dict for dump_devices"""
out = {}
for var, val in vars(self).items():
if var == "scanners":
scanout = {}
for address, scanner in self.scanners.items():
scanout[address] = scanner.to_dict()
val = scanout
out[var] = val
return out
class BermudaDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching data from the Bluetooth component.
Since we are not actually using an external API and only computing local
data already gathered by the bluetooth integration, the update process is
very cheap, and the processing process (currently) rather cheap.
Future work / algo's etc to keep in mind:
https://en.wikipedia.org/wiki/Triangle_inequality
- with distance to two rx nodes, we can apply min and max bounds
on the distance between them (less than the sum, more than the
difference). This could allow us to iterively approximate toward
the rx layout, esp as devices move between (and right up to) rx.
- bear in mind that rssi errors are typically attenuation-only.
This means that we should favour *minimum* distances as being
more accurate, both when weighting measurements from distant
receivers, and when whittling down a max distance between
receivers (but beware of the min since that uses differences)
https://mdpi-res.com/d_attachment/applsci/applsci-10-02003/article_deploy/applsci-10-02003.pdf?version=1584265508
- lots of good info and ideas.
TODO / IDEAS:
- when we get to establishing a fix, we can apply a path-loss factor to
a calculated vector based on previously measured losses on that path.
We could perhaps also fine-tune that with real-time measurements from
fixed beacons to compensate for environmental factors.
- An "obstruction map" or "radio map" could provide field strength estimates
at given locations, and/or hint at attenuation by counting "wall crossings"
for a given vector/path.
"""
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
) -> None:
"""Initialize."""
# self.config_entry = entry
self.platforms = []
self.config_entry = entry
super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL)
# First time around we freshen the restored scanner info by
# forcing a scan of the captured info.
self._do_full_scanner_init = True
self.options = {}
if hasattr(entry, "options"):
# Firstly, on some calls (specifically during reload after settings changes)
# we seem to get called with a non-existant config_entry.
# Anyway... if we DO have one, convert it to a plain dict so we can
# serialise it properly when it goes into the device and scanner classes.
for key, val in entry.options.items():
if key in (
CONF_ATTENUATION,
CONF_DEVICES,
CONF_DEVTRACK_TIMEOUT,
CONF_MAX_RADIUS,
CONF_REF_POWER,
):
self.options[key] = val
self.devices: dict[str, BermudaDevice] = {}
self.updaters: dict[str, BermudaPBDUCoordinator] = {}
self.area_reg = area_registry.async_get(hass)
# Restore the scanners saved in config entry data. We maintain
# a list of known scanners so we can
# restore the sensor states even if we don't have a full set of
# scanner receipts in the discovery data.
self.scanner_list = []
if hasattr(entry, "data"):
for address, saved in entry.data.get(CONFDATA_SCANNERS, {}).items():
scanner = self._get_or_create_device(address)
for key, value in saved.items():
setattr(scanner, key, value)
self.scanner_list.append(address)
hass.services.async_register(
DOMAIN,
"dump_devices",
self.service_dump_devices,
vol.Schema({vol.Optional("addresses"): cv.string}),
SupportsResponse.ONLY,
)
def sensor_created(self, address):
"""Allows sensor platform to report back that sensors have been set up"""
dev = self._get_device(address)
if dev is not None:
dev.create_sensor_done = True
self.updaters[address] = pduc = BermudaPBDUCoordinator(
self.hass, _LOGGER, address, dev, self
)
_LOGGER.debug("Registering PDUC for %s", dev.name)
self.config_entry.async_on_unload(pduc.async_start())
else:
_LOGGER.warning("Very odd, we got sensor_created for non-tracked device")
def _get_device(self, address: str) -> BermudaDevice:
"""Search for a device entry based on mac address"""
mac = format_mac(address)
# format_mac tries to return a lower-cased, colon-separated mac address.
# failing that, it returns the original unaltered.
if mac in self.devices:
return self.devices[mac]
return None
def _get_or_create_device(self, address: str) -> BermudaDevice:
device = self._get_device(address)
if device is None:
mac = format_mac(address)
self.devices[mac] = device = BermudaDevice(
address=mac, options=self.options
)
device.address = mac
device.unique_id = mac
return device
async def _async_update_data(self):
"""Update data on known devices.
This works only with local data, so should be cheap to run
(no network requests made etc).
"""
for service_info in bluetooth.async_discovered_service_info(self.hass, False):
# Note that some of these entries are restored from storage,
# so we won't necessarily find (immediately, or perhaps ever)
# scanner entries for any given device.
# Get/Create a device entry
device = self._get_or_create_device(service_info.address)
# Check if it's broadcasting an Apple Inc manufacturing data (ID: 0x004C)
for (
company_code,
man_data,
) in (
service_info.advertisement.manufacturer_data.items()
): # .get(0x004C, None)
if company_code == 0x00E0: # 224 Google
_LOGGER.debug(
"Found Google Device: %s %s", device.address, man_data.hex()
)
#
elif company_code == 0x004C: # 76 Apple Inc
_LOGGER.debug(
"Found Apple Manufacturer data: %s %s",
device.address,
man_data.hex(),
)
device.prefname = man_data.hex()
if man_data[:2] == b"\x02\x15": # 0x0215: # iBeacon packet
uuid = man_data[2:18].hex().upper()
major = int.from_bytes(man_data[18:20], byteorder="big")
minor = int.from_bytes(man_data[20:22], byteorder="big")
power = int.from_bytes([man_data[22]], signed=True)
_LOGGER.debug(
"Device %s is iBeacon with UUID %s %s %s %sdB",
device.address,
uuid,
major,
minor,
power,
)
# ce12cbeb2dbe448bb057c1fe9804b45f00640001c5
else:
_LOGGER.debug(
"Found unknown manufacturer %d data: %s %s",
company_code,
device.address,
man_data.hex(),
)
# We probably don't need to do all of this every time, but we
# want to catch any changes, eg when the system learns the local
# name etc.
device.name = device.name or service_info.device.name
device.local_name = (
device.local_name or service_info.advertisement.local_name
)
device.manufacturer = device.manufacturer or service_info.manufacturer
device.connectable = service_info.connectable
# Try to make a nice name for prefname.
# TODO: Add support for user-defined name.
if device.prefname is None or device.prefname.startswith(DOMAIN + "_"):
device.prefname = (
device.name
or device.local_name
or DOMAIN + "_" + slugify(device.address)
)
# Work through the scanner entries...
matched_scanners = bluetooth.async_scanner_devices_by_address(
self.hass, service_info.address, False
)
for discovered in matched_scanners:
scanner_device = self._get_device(discovered.scanner.source)
if scanner_device is None:
# The receiver doesn't have a device entry yet, let's refresh
# all of them in this batch...
self._refresh_scanners(matched_scanners, self._do_full_scanner_init)
self._do_full_scanner_init = False
scanner_device = self._get_device(discovered.scanner.source)
if scanner_device is None:
# Highly unusual. If we can't find an entry for the scanner
# maybe it's from an integration that's not yet loaded, or
# perhaps it's an unexpected type that we don't know how to
# find.
_LOGGER.error(
"Failed to find config for scanner %s, this is probably a bug.",
discovered.scanner.source,
)
continue
# Replace the scanner entry on the current device
device.add_scanner(scanner_device, discovered)
# Update whether the device has been seen recently, for device_tracker:
if (
MONOTONIC_TIME()
- self.options.get(CONF_DEVTRACK_TIMEOUT, DEFAULT_DEVTRACK_TIMEOUT)
< device.last_seen
):
device.zone = STATE_HOME
else:
device.zone = STATE_NOT_HOME
if device.address.upper() in self.options.get(CONF_DEVICES, []):
# This is a device we track. Set it up:
device.create_sensor = True
# FIXME: If a tracked device isn't present at start-up, the sensor
# entities don't get created (via the add_entries call in sensor.py etc)
# and we don't have a mechanism to trigger that later. What you see in
# the gui is a "restored" entity, marked as no longer being provided by
# the integration. I think *here* might be the place to fix that.
self._refresh_areas_by_min_distance()
# We might need to freshen deliberately on first start this if no new scanners
# were discovered in the first scan update. This is likely if nothing has changed
# since the last time we booted.
if self._do_full_scanner_init:
self._refresh_scanners([], self._do_full_scanner_init)
self._do_full_scanner_init = False
# The devices are all updated now (and any new scanners seen have been added),
# so let's ensure any devices that we create sensors for are set up ready to go.
# We don't do this sooner because we need to ensure we have every active scanner
# already loaded up.
for address in self.options.get(CONF_DEVICES, []):
device = self._get_device(format_mac(address))
if device is not None:
if not device.create_sensor_done:
_LOGGER.debug("Firing device_new for %s", device.name)
# self.hass.async_run_job(
async_dispatcher_send(
self.hass, SIGNAL_DEVICE_NEW, device.address, self.scanner_list
)
# )
# let the sensor platform do it intead: device.create_sensor_done = True
# end of async update
def dt_mono_to_datetime(self, stamp) -> datetime:
"""Given a monotonic timestamp, convert to datetime object"""
age = MONOTONIC_TIME() - stamp
return now() - timedelta(seconds=age)
def dt_mono_to_age(self, stamp) -> str:
"""Convert monotonic timestamp to age (eg: "6 seconds ago")"""
return get_age(self.dt_mono_to_datetime(stamp))
def _refresh_areas_by_min_distance(self):
"""Set area for ALL devices based on closest beacon"""
for device in self.devices.values():
if device.is_scanner is not True:
self._refresh_area_by_min_distance(device)
def _refresh_area_by_min_distance(self, device: BermudaDevice):
"""Very basic Area setting by finding closest beacon to a given device"""
assert device.is_scanner is not True
closest_scanner: BermudaDeviceScanner = None
for scanner in device.scanners.values():
# whittle down to the closest beacon inside max range
if scanner.rssi_distance < self.options.get(
CONF_MAX_RADIUS, DEFAULT_MAX_RADIUS
): # It's inside max_radius...
if closest_scanner is None:
# no encumbent, we win! (unless we don't have a stamp to validate our claim)
# FIXME: This effectively excludes HCI/usb adaptors currently since we
# haven't found a way to get ad timestamps from HA's bluez yet.
if scanner.stamp > 0:
closest_scanner = scanner
else:
# is it fresh enough to win on proximity alone?
is_fresh_enough = (
scanner.stamp > closest_scanner.stamp - ADVERT_FRESHTIME
)
# is it so much fresher that it wins outright?
is_fresher = (
scanner.stamp > closest_scanner.stamp + ADVERT_FRESHTIME
)
# is it closer?
is_closer = scanner.rssi_distance < closest_scanner.rssi_distance
if is_fresher or (
is_closer and is_fresh_enough
): # This scanner is closer, and the advert is still fresh in comparison..
closest_scanner = scanner
if closest_scanner is not None:
# We found a winner
old_area = device.area_name
device.area_id = closest_scanner.area_id
areas = self.area_reg.async_get_area(device.area_id)
if hasattr(areas, "name"):
device.area_name = areas.name
else:
# Wasn't a single area entry. Let's freak out.
_LOGGER.warning(
"Could not discern area from scanner %s: %s."
"Please assign an area then reload this integration",
closest_scanner.name,
areas,
)
device.area_name = f"No area: {closest_scanner.name}"
device.area_distance = closest_scanner.rssi_distance
device.area_rssi = closest_scanner.rssi
device.area_scanner = closest_scanner.name
if old_area != device.area_name and device.create_sensor:
_LOGGER.debug("Device %s now in %s", device.name, device.area_name)
else:
# Not close to any scanners!
device.area_id = None
device.area_name = None
device.area_distance = None
device.area_rssi = None
device.area_scanner = None
def _refresh_scanners(
self, scanners: list[BluetoothScannerDevice], do_full_scan=False
):
"""Refresh our local (and saved) list of scanners (BLE Proxies)"""
addresses = set()
update_scannerlist = False
for scanner in scanners:
addresses.add(scanner.scanner.source.upper())
# If we are doing a full scan, add all the known
# scanner addresses to the list, since that will cover
# the scanners that have been restored from config.data
if do_full_scan:
for address in self.scanner_list:
addresses.add(address)
if len(addresses) > 0:
# FIXME: Really? This can't possibly be a sensible nesting of loops.
# should probably look at the API. Anyway, we are checking any devices
# that have a "mac" or "bluetooth" connection,
for dev_entry in self.hass.data["device_registry"].devices.data.values():
for dev_connection in dev_entry.connections:
if dev_connection[0] in ["mac", "bluetooth"]:
found_address = dev_connection[1].upper()
if found_address in addresses:
scandev = self._get_device(found_address)
if scandev is None:
# It's a new scanner, we will need to update our saved config.
_LOGGER.debug("New Scanner: %s", found_address)
update_scannerlist = True
scandev = self._get_or_create_device(found_address)
scandev_orig = scandev
scandev.area_id = dev_entry.area_id
scandev.entry_id = dev_entry.id
if dev_entry.name_by_user is not None:
scandev.name = dev_entry.name_by_user
else:
scandev.name = dev_entry.name
areas = self.area_reg.async_get_area(dev_entry.area_id)
if hasattr(areas, "name"):
scandev.area_name = areas.name
else:
_LOGGER.warning(
"No area name for while updating scanner %s",
scandev.name,
)
scandev.is_scanner = True
if scandev_orig != scandev:
# something changed, let's update the saved list.
update_scannerlist = True
if update_scannerlist:
# We need to update our saved list of scanners in config data.
self.scanner_list = []
scanners: dict[str, str] = {}
for device in self.devices.values():
if device.is_scanner:
scanners[device.address] = device.to_dict()
self.scanner_list.append(device.address)
_LOGGER.debug(
"Replacing config data scanners was %s now %s",
self.config_entry.data.get(CONFDATA_SCANNERS, {}),
scanners,
)
self.hass.config_entries.async_update_entry(
self.config_entry,
data={**self.config_entry.data, CONFDATA_SCANNERS: scanners},
)
async def service_dump_devices(self, call): # pylint: disable=unused-argument;
"""Return a dump of beacon advertisements by receiver"""
out = {}
addresses_input = call.data.get("addresses", "")
if addresses_input != "":
addresses = addresses_input.upper().split()
else:
addresses = []
for address, device in self.devices.items():
if len(addresses) == 0 or address.upper() in addresses:
out[address] = device.to_dict()
return out
async def async_remove_config_entry_device(
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: DeviceEntry
) -> bool:
"""Remove a config entry from a device."""
coordinator: BermudaDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
address = None
for ident in device_entry.identifiers:
try:
if ident[0] == DOMAIN:
# the identifier should be the mac address, and
# may have "_range" or some other per-sensor suffix. Just grab
# the mac address part.
address = ident[1][:17]
except KeyError:
pass
if address is not None:
try:
coordinator.devices[format_mac(address)].create_sensor = False
except KeyError:
_LOGGER.warning("Failed to locate device entry for %s", address)
return True
return False
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Handle removal of an entry."""
coordinator = hass.data[DOMAIN][entry.entry_id]
unloaded = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, platform)
for platform in PLATFORMS
if platform in coordinator.platforms
]
)
)
if unloaded:
hass.data[DOMAIN].pop(entry.entry_id)
return unloaded
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload config entry."""
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)