-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbt.py
311 lines (254 loc) · 10.1 KB
/
bt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import asyncio
import re
import subprocess
import time
import logging
from typing import Callable, List, Dict, Union, Tuple
from bleak import BleakClient, BleakScanner
# Name of a pooled future: a single str/int key, or a tuple holding any
# number of such keys (the pool methods iterate over tuples of any length).
NameType = Union[str, int, Tuple[Union[str, int], ...]]
def get_logger(verbose=False):
    """
    Configure logging and return the root logger.

    :param verbose: when true the level is DEBUG, otherwise INFO
    :return: the root logger, with its level set explicitly
    """
    threshold = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=threshold,
        format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
        datefmt='%H:%M:%S',
    )
    root = logging.getLogger()
    # The level is also applied directly to the root logger, independent of
    # what basicConfig() did.
    root.setLevel(threshold)
    return root
class FuturesPool:
    """
    Manage a collection of named futures.

    A name is registered with acquire(), resolved with set_result() and
    awaited with wait_for(). Every method that takes a name also accepts a
    tuple of names and applies itself to each element.
    """

    def __init__(self):
        self._futures: Dict[str, asyncio.Future] = {}

    def acquire(self, name: "NameType"):
        """
        Register a fresh future for ``name`` (or for each name in a tuple).

        :param name: a single key or a tuple of keys
        :return: a FutureContext that removes the name(s) from the pool on exit
        :raises AssertionError: if a future for the name is already registered
        """
        if isinstance(name, tuple):
            for single in name:
                self.acquire(single)
            return FutureContext(name, pool=self)

        assert name not in self._futures, "already waiting for %s" % name
        self._futures[name] = asyncio.Future()
        return FutureContext(name, pool=self)

    def set_result(self, name, value):
        """
        Resolve the future registered under ``name`` with ``value``.

        Does nothing if no future is registered under ``name``, or if that
        future is already done (resolved or cancelled); calling set_result()
        on a done future would raise InvalidStateError.
        """
        fut = self._futures.get(name)
        if fut is not None and not fut.done():
            fut.set_result(value)

    def clear(self):
        """Cancel every registered future and empty the pool."""
        for fut in self._futures.values():
            fut.cancel()
        self._futures.clear()

    def remove(self, name):
        """
        Drop ``name`` from the pool without touching its future.

        Unknown names are ignored. For a tuple of names each element is
        removed and a tuple (of ``None`` values) is returned.
        """
        if isinstance(name, tuple):
            return tuple(self.remove(n) for n in name)
        self._futures.pop(name, None)

    async def wait_for(self, name: "NameType", timeout):
        """
        Wait for the future registered under ``name`` and return its result.

        :param name: a single key, or a tuple of keys awaited concurrently
        :param timeout: timeout handed to asyncio.wait_for for each future
        :return: the future's result; for a tuple, the list of results in order
        :raises asyncio.TimeoutError: if the future is not resolved in time
        :raises asyncio.CancelledError: if the future was cancelled
        """
        if isinstance(name, tuple):
            tasks = [self.wait_for(n, timeout) for n in name]
            return await asyncio.gather(*tasks, return_exceptions=False)

        try:
            return await asyncio.wait_for(self._futures.get(name), timeout)
        finally:
            # The name leaves the pool whether the wait succeeded or failed.
            self.remove(name)
class FutureContext:
    """
    Context manager tied to one name (or tuple of names) in a FuturesPool.

    Entering does nothing; leaving removes the name(s) from the pool.
    """

    def __init__(self, name: "NameType", pool: "FuturesPool"):
        self.pool = pool
        self.name = name

    def __enter__(self):
        # Nothing to set up; ``with ... as x`` binds None.
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        owner, key = self.pool, self.name
        owner.remove(key)
async def bt_discovery():
    """
    Run a BleakScanner discovery and print every device it returns.

    Prints a placeholder line when the scan returns nothing.
    """
    print('BT Discovery:')
    found = await BleakScanner.discover()
    if not found:
        print(' - no devices found - ')
    for dev in found:
        print(f"BT Device {dev.name} address={dev.address}")
def bleak_version():
    """
    Return the version string of the bleak package.

    Reads ``bleak.__version__`` first and falls back to the installed
    package metadata when the import fails or the attribute is missing.

    :raises importlib.metadata.PackageNotFoundError: if the fallback finds
        no installed ``bleak`` distribution
    """
    try:
        import bleak
        return bleak.__version__
    except (ImportError, AttributeError):
        # Only the two expected failures trigger the fallback; anything else
        # (e.g. KeyboardInterrupt) propagates instead of being swallowed.
        from importlib.metadata import version
        return version('bleak')
def bt_stack_version():
    """
    Describe the Bluetooth stack in use.

    Runs ``bluetoothctl --version`` and parses a ``<major>.<minor>`` pair
    from its output.

    :return: ``'bluez-v<major>.<minor>'`` when a version could be parsed,
        otherwise ``'? (<BleakClient class name>)'``
    """
    version_match = None
    try:
        # get BlueZ version
        proc = subprocess.run(
            ["bluetoothctl", "--version"],
            stdout=subprocess.PIPE,
            timeout=5,  # never block the caller on a hung tool
        )
        version_match = re.search(rb"(\d+)\.(\d+)", proc.stdout)
    except (OSError, subprocess.SubprocessError):
        # bluetoothctl is missing, not executable, or timed out: fall through
        # to the generic answer below.
        pass

    if version_match is None:
        return '? (%s)' % BleakClient.__name__
    return 'bluez-v%i.%i' % tuple(map(int, version_match.groups()))
class BtBms:
    """
    Base class for a BMS device reached over Bluetooth LE.

    Wraps a bleak ``BleakClient`` and adds connect/pair handling, an optional
    keep-alive mode and a ``FuturesPool`` that is cleared whenever the
    connection goes away. Instances work as an async context manager:
    entering connects, leaving disconnects unless keep-alive is enabled.
    """

    def __init__(self, address: str, name: str, keep_alive=False, psk=None, adapter=None, verbose_log=False):
        """
        :param address: device address handed to BleakClient; an address
            starting with ``'test_'`` creates no client at all
        :param name: stored as ``self.name``
        :param keep_alive: when true, leaving the async context keeps the
            connection open
        :param psk: pairing key; when set, pairing is attempted after connect
        :param adapter: BT adapter hardware to use, e.g. ``hci0``, ``hci1``
        :param verbose_log: DEBUG log level, and services are enumerated
            after each connect
        """
        self.address = address
        self.name = name
        self.keep_alive = keep_alive
        self.verbose_log = verbose_log
        self.logger = get_logger(verbose_log)
        self._fetch_futures = FuturesPool()
        self._psk = psk
        self._connect_time = 0
        # Always set, because _connect_with_scanner() reads it.
        self._adapter = adapter

        if address.startswith('test_'):
            return

        kwargs = {}
        if psk:
            try:
                # Imported only to probe whether this bleak has a pairing agent.
                import bleak.backends.bluezdbus.agent  # noqa: F401
            except ImportError:
                self.logger.warning("this bleak version has no pairing agent, pairing with a pin will likely fail!")

        if adapter:  # hci0, hci1 (BT adapter hardware)
            kwargs['adapter'] = adapter

        self.client = BleakClient(address,
                                  handle_pairing=bool(psk),
                                  disconnected_callback=self._on_disconnect,
                                  **kwargs
                                  )

    async def start_notify(self, char_specifier, callback: Callable[[int, bytearray], None], **kwargs):
        """
        Subscribe ``callback`` to the first characteristic that accepts it.

        :param char_specifier: one specifier, or a list of candidates that
            are tried in order
        :param callback: notification handler passed to the client
        :param kwargs: forwarded to ``client.start_notify``
        :return: the specifier the subscription succeeded on
        :raises Exception: the error of the last candidate when all of them
            failed; the device's services are logged first
        """
        candidates = char_specifier if isinstance(char_specifier, list) else [char_specifier]
        last_error = None
        for cs in candidates:
            try:
                await self.client.start_notify(cs, callback, **kwargs)
                return cs
            except Exception as e:
                # Keep a reference: ``e`` is unbound once the handler ends.
                last_error = e
        await enumerate_services(self.client, self.logger)
        raise last_error

    def characteristic_uuid_to_handle(self, uuid: str, property: str) -> Union[str, int]:
        """
        Return the handle of the first characteristic that has ``uuid`` and
        lists ``property`` among its properties.

        :return: the characteristic handle, or ``uuid`` unchanged when no
            characteristic matches
        """
        for service in self.client.services:
            for char in service.characteristics:
                if char.uuid == uuid and property in char.properties:
                    return char.handle
        return uuid

    def _on_disconnect(self, client):
        """Disconnect callback given to the client: log and cancel pending futures."""
        if self.keep_alive and self._connect_time:
            self.logger.warning('BMS %s disconnected after %.1fs!', self.__str__(), time.time() - self._connect_time)
        try:
            self._fetch_futures.clear()
        except Exception as e:
            self.logger.warning('error clearing futures pool: %s', str(e) or type(e))

    async def _connect_client(self, timeout):
        """
        Connect the client, record the connect time and pair if a psk is set.

        :param timeout: passed to ``client.connect``
        """
        await self.client.connect(timeout=timeout)
        if self.verbose_log:
            await enumerate_services(self.client, logger=self.logger)
        self._connect_time = time.time()

        if not self._psk:
            return

        def get_passkey(device: str, pin, passkey):
            # Pairing callback: confirm a displayed pin/passkey, otherwise
            # answer with the configured psk.
            if pin:
                self.logger.info(f"Device {device} is displaying pin '{pin}'")
                return True
            if passkey:
                self.logger.info(f"Device {device} is displaying passkey '{passkey:06d}'")
                return True
            self.logger.info(f"Device {device} asking for psk, giving '{self._psk}'")
            return str(self._psk) or None

        self.logger.debug("Pairing %s using psk '%s'...", self.address, self._psk)
        res = await self.client.pair(callback=get_passkey)
        if not res:
            self.logger.error("Pairing failed!")

    @property
    def is_connected(self):
        """Whether the underlying client reports an active connection."""
        return self.client.is_connected

    async def connect(self, timeout=20):
        """
        Establish a BLE connection
        :param timeout:
        :return:
        """
        await self._connect_client(timeout=timeout)

    async def _connect_with_scanner(self, timeout=20):
        """
        Starts a bluetooth discovery and tries to establish a BLE connection with back off.
        This fixes connection errors for some BMS (jikong). Use instead of connect().

        Up to 8 attempts are made, each with half of ``timeout``; the scanner
        is stopped on every exit path.
        :param timeout:
        :return:
        """
        scanner_kw = {}
        if self._adapter:
            scanner_kw['adapter'] = self._adapter
        scanner = BleakScanner(**scanner_kw)
        self.logger.debug("starting scan")
        await scanner.start()

        try:
            attempt = 1
            while True:
                try:
                    discovered = {b.address for b in scanner.discovered_devices}
                    if self.client.address not in discovered:
                        raise Exception('Device %s not discovered. Make sure it in range and is not being controled by '
                                        'another application. (%s)' % (self.client.address, discovered))
                    self.logger.debug("connect attempt %d", attempt)
                    await self._connect_client(timeout=timeout / 2)
                    break
                except Exception as e:
                    await self.client.disconnect()
                    if attempt >= 8:
                        raise
                    self.logger.debug('retry %d after error %s', attempt, e)
                    # Exponential back off before the next attempt.
                    await asyncio.sleep(0.2 * (1.5 ** attempt))
                    attempt += 1
        finally:
            # Also reached when disconnect() fails or the task is cancelled,
            # so the scan is never left running.
            await scanner.stop()

    async def disconnect(self):
        """Disconnect the client and cancel all pending futures."""
        await self.client.disconnect()
        self._fetch_futures.clear()

    def __str__(self):
        return f'{self.__class__.__name__}({self.client.address})'

    async def __aenter__(self):
        """Connect, unless keep-alive is on and the client is already connected."""
        if self.keep_alive and self.is_connected:
            return
        await self.connect()

    async def __aexit__(self, *args):
        """Disconnect, unless keep-alive is on."""
        if self.keep_alive:
            return
        if self.client.is_connected:
            await self.disconnect()

    def __await__(self):
        # Awaiting the instance runs the same logic as leaving the context.
        return self.__aexit__().__await__()

    def set_keep_alive(self, keep):
        """Enable or disable keep-alive; enabling is logged."""
        if keep:
            self.logger.info("BMS %s keep alive enabled", self.__str__())
        self.keep_alive = keep

    def debug_data(self):
        """Return debug data; this base implementation returns None."""
        return None
async def enumerate_services(client: BleakClient, logger):
for service in client.services:
logger.info(f"[Service] {service}")
for char in service.characteristics:
if "read" in char.properties:
try:
value = bytes(await client.read_gatt_char(char.uuid))
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
except Exception as e:
logger.error(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
)
else:
value = None
logger.info(
f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
)
for descriptor in char.descriptors:
try:
value = bytes(
await client.read_gatt_descriptor(descriptor.handle)
)
logger.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
except Exception as e:
logger.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")