-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvideo_ui.py
169 lines (140 loc) · 6.22 KB
/
video_ui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""This user interface allow the user to take control of the drone in the same time it provides you
a live stream video of the front camera of the drone"""
import os
import tkinter as tk
from tkinter import PhotoImage, TclError
from time import sleep
from threading import Thread
from PIL import ImageTk
from toolbox import command_from_key
from tello_edu import TelloEDU
class VideoUI:
"""Embeded Tkinter ineterface and drone control"""
def __init__(self, drone):
self.drone = drone
self.pictures = []
self.video_thread = self.ka_thread = None
# UI
self.window_is_open = False
self.root = tk.Tk()
self.root.wm_title("VideoUI")
self.root.bind("<Key>", self.keys)
self.root.bind("<Control-Key>", self.quit)
self.root.protocol("WM_DELETE_WINDOW", self.quit)
self.panel = None
self.dir_path = os.path.dirname(os.path.realpath(__file__))
picture_path = os.path.sep.join((self.dir_path, 'pictures'))
if not self.drone.is_connected:
path = os.path.sep.join((picture_path, 'connection_failed.png'))
self.frame = PhotoImage(file=path)
else:
path = os.path.sep.join((picture_path, 'loading.png'))
self.frame = PhotoImage(file=path)
# Timeout to kill concurent threads
self.drone.command_socket.settimeout(11)
self.tkframe = None
self.panel = tk.Label(self.root, image=self.frame)
self.panel.pack()
def __del__(self):
"""Deleting the drone drone reference"""
print('UI deletion')
@property
def threads_alive(self):
"""Return the number of threads still alived"""
existing_threads = [thread for thread in (self.video_thread, self.ka_thread) if thread is not None]
return len([thread for thread in existing_threads if thread.isAlive()])
def show_bindings(self):
"""Display keybindings on the UI"""
tk.Label(self.root, text='⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀forward clockwise rotation : 4 ↻').pack()
tk.Label(self.root, text='⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⬆️ counter clockwise rotation : 6 ↺').pack()
tk.Label(self.root, text='left ⬅️ ➡️ right').pack()
tk.Label(self.root, text='⬇️').pack()
tk.Label(self.root, text='backward').pack()
tk.Label(self.root, text='').pack()
tk.Label(self.root, text='land : +⠀⠀⠀⠀⠀').pack()
tk.Label(self.root, text='takeoff : space bar').pack()
tk.Label(self.root, text='up : 8⠀⠀⠀⠀⠀').pack()
tk.Label(self.root, text='down : 2⠀⠀⠀⠀⠀').pack()
def quit(self, event=None):
"""Close the window"""
print('quit')
self.window_is_open = False
self.drone.end_connection = True
# Try to kill drone before
# sleep(2)
try:
self.root.destroy()
# If window has already been close ...
except TclError as tcl:
print(tcl)
def keys(self, event):
"""Bind keys to drone commands"""
key = event.char
keycode = event.keycode
if key == 'p':
self.pictures.append(self.drone.take_picture())
self.drone.save_pictures(self.pictures) # Rewriting each time
elif command_from_key(key) is not None:
command = command_from_key(key)
self.drone.execute_actions([f'0-{command}'])
elif command_from_key(keycode) is not None:
command = command_from_key(keycode)
self.drone.execute_actions([f'0-{command}'])
else:
print(f'{key} is not bind to an action')
def _keep_alive(self):
"""Simple thread to be sure the drone will not try to land after a short time without command received"""
while self.drone.is_connected and self.window_is_open:
self.drone.execute_actions(['0-command'])
sleep(10)
print('keep alive done')
self.drone.end_connection = True
path = os.path.sep.join((self.dir_path, 'offline.png'))
tk.Label(self.root, image=path).pack()
def _video_stream(self):
"""Image recepting thread"""
while self.drone.is_connected and self.window_is_open:
self.tkframe = self.frame
if self.drone.take_picture() is not None:
self.frame = self.drone.take_picture()
try:
# This may lead to blocking call ...
self.tkframe = ImageTk.PhotoImage(self.frame)
except RuntimeError:
print('Last frame was incomplete')
### Problem : blocking function call
#This should be an asynchronous call (indications about future image)
self.panel.configure(image=self.tkframe)
#This is the real affectation
self.panel.image = self.tkframe
self.drone.end_connection = True
print('video down')
# Test if panel is None after destruct
def open(self):
"""Main method used to open the UI"""
if self.drone.is_connected:
self.window_is_open = True
# Threads
self.ka_thread = Thread(target=self._keep_alive)
self.ka_thread.start()
self.video_thread = Thread(target=self._video_stream)
self.video_thread.start()
self.show_bindings()
self.root.mainloop()
self.drone.end_connection = True
print('windows done')
# Manually kill the thread (tkinter issue)
while self.drone.threads_alive or self.threads_alive:
sleep(1)
print(self.drone.threads_alive)
print(self.threads_alive)
print('not drone here')
del self.drone
if __name__ == "__main__":
# my_drone = Swarm(['192.168.10.1', ], video_stream=True)
my_drone = TelloEDU('192.168.10.1', video_stream=True)
vui = VideoUI(my_drone)
vui.open()
# Problem
# Windows needs to be killed after the drone disconnect else the software is trying to update the closed windows
# unding with a blocking call function with no warning