-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmesh_subscriptions.py
109 lines (103 loc) · 3.7 KB
/
mesh_subscriptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
''' Bluetooth mesh cloud subscriptions module '''
from byte_codec import uint16
class Subscriptions():
    ''' Bluetooth mesh cloud subscriptions class

    Presents an interactive menu for subscribing to, unsubscribing from and
    listing mesh addresses. Requests are handed to the `publish` callable;
    the reply is delivered through `evt`, which stores the received address
    list and releases the semaphore that `menu` is waiting on.
    '''

    __MENU_CHOICES = [
        'Subscribe',
        'Unsubscribe',
        'Get subscription list'
    ]

    def __init__(self, sem, get_choice, publish):
        ''' Store the collaborators and start with an empty subscription list

        sem        -- object providing acquire() and release(); acquire() is
                      called after every publish and a falsy result aborts
                      the current menu action
        get_choice -- callable taking a list of choice strings and returning
                      the selected index, or None / -1 (presumably "cancel"
                      or "back" -- confirm against the caller)
        publish    -- callable taking one operation message (a dict)
        '''
        self.__sem = sem
        self.__get_choice = get_choice
        self.__publish = publish
        # Must use the same attribute name as the readers below, and must be
        # an empty list so that printing before the first event is safe.
        self.__subscriptions = []

    @staticmethod
    def __operation(op_type, addr=None):
        ''' Build an operation message of the given type

        The 'addressList' entry is only included when addr is given.
        '''
        operation = {'type': op_type}
        if addr is not None:
            operation['addressList'] = [{'address': addr}]
        return {
            'id': 'randomId',
            'type': 'operation',
            'operation': operation
        }

    def __request(self, status, message):
        ''' Print status, publish message and wait on the semaphore

        Returns whatever sem.acquire() returns.
        '''
        print(status)
        self.__publish(message)
        return self.__sem.acquire()

    def __print(self):
        ''' Print the most recently stored subscription list '''
        print('Subscribed Addresses:')
        for sub in self.__subscriptions:
            print(' - ' + uint16(sub['address']))
        if not self.__subscriptions:
            print(' No subscribed addresses')
        print()

    def __subscribe(self):
        ''' Prompt for an address and request a subscription to it

        Raises ValueError if the entered text is not an integer literal.
        '''
        # Base 0 lets int() accept prefixed literals such as 0x... as well
        # as plain decimal.
        addr = int(input('Enter unsigned 16-bit mesh address to subscribe to: '), 0)
        if self.__request('Subscribing...', self.__operation('subscribe', addr)):
            self.__print()

    def __unsubscribe(self):
        ''' Fetch the current list, let the user pick an entry and remove it

        Returns the value from get_choice when it is None or -1, otherwise
        None.
        '''
        # Refresh the list first so the user chooses from current entries.
        if not self.__request('Acquiring subscription list...',
                              self.__operation('subscribe_list_request')):
            return None
        choices = [uint16(sub['address']) for sub in self.__subscriptions]
        print('Which address would you to like to unsubscribe from?')
        choice = self.__get_choice(choices)
        if choice is None or choice == -1:
            return choice
        # NOTE(review): relies on uint16() producing text that int(..., 0)
        # can parse back -- confirm against byte_codec.
        addr = int(choices[choice], 0)
        if self.__request('Unsubscribing...', self.__operation('unsubscribe', addr)):
            self.__print()
        return None

    def __list(self):
        ''' Request the subscription list and print it '''
        if self.__request('Acquiring subscription list...',
                          self.__operation('subscribe_list_request')):
            self.__print()

    def menu(self):
        ''' Subscription configuration

        Shows the menu and runs the selected action. Returns the value from
        get_choice when it is None or -1 (at either the main menu or the
        unsubscribe address prompt); returns None in every other case.
        '''
        print('SUBSCRIPTION CONFIGURATION MENU')
        choice = self.__get_choice(self.__MENU_CHOICES)
        if choice is None or choice == -1:
            return choice
        if choice == 0:
            # Subscribe
            self.__subscribe()
        elif choice == 1:
            # Unsubscribe
            return self.__unsubscribe()
        elif choice == 2:
            # Get subscription list
            self.__list()
        return None

    def evt(self, event):
        ''' Receive subscription list from gateway

        Stores a copy of event['addressList'] and releases the semaphore so
        a menu action waiting in acquire() can continue.
        '''
        self.__subscriptions = event['addressList'].copy()
        self.__sem.release()