-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathota-handling.py
139 lines (112 loc) · 4.71 KB
/
ota-handling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# SPDX-License-Identifier: MIT
# Copyright (C) 2024 Avnet
# Authors: Nikola Markovic <[email protected]> et al.
import os
import random
import subprocess
import sys
import threading
import time
import urllib.request
from typing import Optional
from avnet.iotconnect.sdk.lite import Client, DeviceConfig, Callbacks, DeviceConfigError
from avnet.iotconnect.sdk.lite import __version__ as SDK_VERSION
from avnet.iotconnect.sdk.sdklib.mqtt import C2dOta, C2dAck
"""
In this demo we demonstrate a simple example of how an OTA could be handled.
We read the list of OTA URLs then and if we see a wheel, we install it. If we see a zip ot a zipped tar, we extract.
Then we restart the process so that it can pick up on newly replaced files.
While we should have more robust handling for all of these steps in production, this example is simplified so
that
"""
dl_thread: Optional[threading.Thread] = None
# can only exit from main thread, so use this flag or synchronize threads
need_restart = False
def exit_and_restart():
print("") # Print a blank line so it doesn't look as confusing in the output.
sys.stdout.flush()
# This way to restart the process seems to work reliably.
# It is best to drive the main application with a runner, like a system service,
# a cron job or custom simple driver script that keeps restarting the main application python process on exit
os.execv(sys.executable, [sys.executable, __file__] + [sys.argv[0]])
def subprocess_run_with_print(args):
print("Running command:", ' '.join(args))
subprocess.run(args, check=True)
def download(msg: C2dOta):
global dl_thread
error_msg = None
c.send_ota_ack(msg, C2dAck.OTA_DOWNLOADING)
for url in msg.urls:
print("Downloading OTA file %s from %s" % (url.file_name, url.url))
try:
urllib.request.urlretrieve(url.url, url.file_name)
except Exception as e:
print("Encountered download error", e)
error_msg = "Download error for %s" % url.file_name
break
try:
if url.file_name.endswith(".whl"):
# Force install could help with testing and allowing package downgrades
subprocess_run_with_print(("python3", "-m", "pip", "install", "--force-reinstall", url.file_name))
elif url.file_name.endswith(".zip"):
subprocess_run_with_print(("unzip", "-oqq", url.file_name))
elif url.file_name.endswith(".tgz") or url.file_name.endswith(".tar.gz"):
subprocess_run_with_print(("tar", "-zxf", url.file_name))
else:
print("ERROR: Unhandled file format for file %s" % url.file_name)
error_msg = "Processing error for %s" % url.file_name
break
except subprocess.CalledProcessError:
print("ERROR: Failed to install %s" % url.file_name)
error_msg = "Install error for %s" % url.file_name
break
if error_msg is not None:
c.send_ota_ack(msg, C2dAck.OTA_FAILED, error_msg)
print('Encountered a download processing error "%s". Not restarting.' % error_msg) # In hopes that someone pushes a better update
else:
global need_restart
print("OTA successful. Will restart the application at next main loop iteration...")
c.send_ota_ack(msg, C2dAck.OTA_DOWNLOAD_DONE)
need_restart = True
dl_thread = None
def on_ota(msg: C2dOta):
global dl_thread
if dl_thread is not None:
print("Received OTA while download is still in progress")
return
# We just print the URL. The actual handling of the OTA request would be project specific.
print("Starting OTA downloads for version %s" % msg.version)
dl_thread = threading.Thread(target=download, args=[msg])
dl_thread.start()
try:
device_config = DeviceConfig.from_iotc_device_config_json_file(
device_config_json_path="iotcDeviceConfig.json",
device_cert_path="device-cert.pem",
device_pkey_path="device-pkey.pem"
)
c = Client(
config=device_config,
callbacks=Callbacks(
ota_cb=on_ota,
)
)
while True:
if not c.is_connected():
print('(re)connecting...')
c.connect()
if not c.is_connected():
print('Unable to connect. Exiting.') # Still unable to connect after 100 (default) re-tries.
sys.exit(2)
c.send_telemetry({
'sdk_version': SDK_VERSION,
'random': random.randint(0, 100)
})
if need_restart:
exit_and_restart()
time.sleep(10)
except DeviceConfigError as dce:
print(dce)
sys.exit(1)
except KeyboardInterrupt:
print("Exiting.")
sys.exit(0)