-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplay_buffer.py
165 lines (135 loc) · 7.56 KB
/
replay_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import _thread
from collections import deque
from copy import deepcopy
from time import time, sleep

from metavision_core.event_io import EventsIterator
from metavision_sdk_core import PeriodicFrameGenerationAlgorithm
from metavision_sdk_ui import EventLoop, BaseWindow, Window, UIAction, UIKeyEvent, MTWindow
class Replay_buffer():
    """Show an event stream in a window and keep its latest slices for slow replay.

    ``run`` iterates the ``EventsIterator``, renders frames in a
    "Real-time Play" window and stores the most recent non-empty event slices
    in a bounded buffer.  Releasing SPACE while that buffer is full opens a
    second "Replay Window" that plays a snapshot of the buffer, paced by
    ``accum_time * slow_scale``.

    Attributes:
        events_iterator: the ``EventsIterator`` built from the constructor
            arguments.
        accum_time: accumulation time handed to the frame generators.
        global_counter: total number of events seen by ``run`` so far.
        global_max_t: last timestamp of the most recent non-empty slice.
        replay_time: length of the replay interval, in seconds.
        rply_buffer_size: number of slices the replay buffer keeps,
            ``int(replay_time * 1e6 / delta_t)``.
        rply_buffer: bounded ``deque`` with the latest event slices.
        height, width: values returned by ``events_iterator.get_size()``.
        should_clear: flag toggled by the keyboard handler; nothing in this
            class reads it.
        slow_scale: slow-down factor applied during replay.
    """

    def __init__(self, input_path="", start_ts=0, mode="delta_t", delta_t=20000, n_events=1000,
                 max_duration=None, relative_timestamps=False, accum_time=20000, replay_time=2, slow_scale=5):
        """Create the events iterator and an empty replay buffer.

        ``input_path``, ``start_ts``, ``mode``, ``delta_t``, ``n_events``,
        ``max_duration`` and ``relative_timestamps`` are forwarded unchanged
        to ``EventsIterator``.  ``delta_t`` is additionally used to derive how
        many slices cover ``replay_time`` seconds.
        """
        self.events_iterator = EventsIterator(input_path=input_path,
                                              start_ts=start_ts, mode=mode,
                                              delta_t=delta_t,
                                              n_events=n_events,
                                              max_duration=max_duration,
                                              relative_timestamps=relative_timestamps)
        self.accum_time = accum_time
        self.global_counter = 0  # how many events we processed
        self.global_max_t = 0  # the highest timestamp we processed
        self.replay_time = replay_time  # time interval to replay in [s]
        # the number of event slices the replay buffer keeps
        self.rply_buffer_size = int(replay_time * 1.0e6 / delta_t)
        # replay buffer holding the latest event slices of the interval
        self.rply_buffer = self._new_rply_buffer()
        self.height, self.width = self.events_iterator.get_size()  # Camera Geometry
        self.should_clear = False
        self.slow_scale = slow_scale

    def _new_rply_buffer(self):
        """Return an empty buffer that holds at most ``rply_buffer_size`` slices.

        A ``deque`` with ``maxlen`` discards its oldest entry on append once it
        is full, which replaces the former ``append`` + ``pop(0)`` on a list.
        A non-positive size yields a buffer that never stores anything.
        """
        return deque(maxlen=max(self.rply_buffer_size, 0))

    def run(self):
        """Play the stream in a window until it ends or the window is closed.

        Key releases handled by the window:
            Q / ESCAPE: request the window to close.
            SPACE: start a replay, but only when the replay buffer is full.
        """
        # Window - Graphical User Interface
        with Window(title="Real-time Play", width=4 * self.width, height=4 * self.height,
                    mode=BaseWindow.RenderMode.BGR) as window:
            def keyboard_cb(key, scancode, action, mods):
                if action != UIAction.RELEASE:
                    return
                if key == UIKeyEvent.KEY_Q or key == UIKeyEvent.KEY_ESCAPE:
                    window.set_close_flag()
                    self.should_clear = True
                # The buffer must be non-empty as well as full: with a size of
                # zero an empty buffer would otherwise count as "full".
                buffer_full = (len(self.rply_buffer) > 0
                               and len(self.rply_buffer) == self.rply_buffer_size)
                if key == UIKeyEvent.KEY_SPACE and buffer_full:
                    self.should_clear = False
                    print("--------Open Replay Window--------")
                    # replay_callback only starts a thread and returns, so the
                    # lines below run right after the replay has been launched.
                    self.replay_callback()
                    self.should_clear = True
                    print("--------Close Replay Window--------")

            window.set_keyboard_callback(keyboard_cb)

            # Event Frame Generator
            event_frame_gen = PeriodicFrameGenerationAlgorithm(self.width, self.height, self.accum_time)

            def on_cd_frame_cb(ts, cd_frame):
                window.show(cd_frame)

            event_frame_gen.set_output_callback(on_cd_frame_cb)

            # Wall-clock measurement of how long rply_buffer_size slices take.
            lap_start = time()
            slices_in_lap = 0
            # evs exposes ``.size`` and a ``'t'`` field holding timestamps
            for evs in self.events_iterator:
                # Dispatch system events to the window
                EventLoop.poll_and_dispatch()
                event_frame_gen.process_events(evs)

                has_events = evs.size != 0
                if has_events:
                    self.global_max_t = evs['t'][-1]  # timestamp of the last event of this slice
                    self.global_counter += evs.size
                    # update the replay buffer; the oldest slice drops out once full
                    self.rply_buffer.append(evs)

                # Checked for empty slices too, so the window can still be
                # closed while the stream delivers no events.
                if window.should_close():
                    break
                if not has_events:
                    continue

                # test the time
                slices_in_lap += 1
                if slices_in_lap == self.rply_buffer_size:
                    lap_end = time()
                    print("time to play a replay buffer in normal speed: {} s".format(lap_end - lap_start))
                    lap_start = time()
                    slices_in_lap = 0

    def replay_callback(self):
        """Replay a snapshot of the buffered slices in a separate window and thread.

        Returns immediately after starting the thread.  Does nothing when the
        buffer is empty, because there is nothing to show and the summary
        printed after the loop needs at least one slice.
        """
        if not self.rply_buffer:
            return
        # Snapshot, so the live loop can keep rotating the buffer during replay.
        rply_buffer = deepcopy(self.rply_buffer)

        def make_window():
            # NOTE(review): this creates a Window and polls the EventLoop from a
            # thread other than the one running ``run`` - confirm the UI SDK
            # allows that (MTWindow is imported by this file but never used).
            with Window(title="Replay Window", width=4 * self.width, height=4 * self.height,
                        mode=BaseWindow.RenderMode.BGR) as rply_window:
                def keyboard_cb(key, scancode, action, mods):
                    if action != UIAction.RELEASE:
                        return
                    if key == UIKeyEvent.KEY_ESCAPE or key == UIKeyEvent.KEY_Q:
                        print("--should close replay--")
                        rply_window.set_close_flag()

                rply_window.set_keyboard_callback(keyboard_cb)

                # Event Frame Generator, with the accumulation time divided by slow_scale
                rply_event_frame_gen = PeriodicFrameGenerationAlgorithm(
                    self.width, self.height, int(self.accum_time / self.slow_scale))

                def on_cd_frame_cb(ts, cd_frame):
                    rply_window.show(cd_frame)

                rply_event_frame_gen.set_output_callback(on_cd_frame_cb)

                # Wall-clock time each buffered slice should occupy, in seconds.
                time_per_iter = self.accum_time * 1.0e-6 * self.slow_scale
                print("time_per_iter: ", time_per_iter)
                t_rply1 = time()
                min_t = rply_buffer[0]['t'][0]  # timestamp of the first buffered event
                last_time = time()
                for evs in rply_buffer:
                    # Dispatch system events to the window
                    EventLoop.poll_and_dispatch()
                    rply_event_frame_gen.process_events(evs)
                    # Sleep for the rest of the slot instead of spinning: a busy
                    # loop here would burn a core and compete for the GIL with
                    # the live playback loop.
                    remaining = time_per_iter - (time() - last_time)
                    if remaining > 0:
                        sleep(remaining)
                    last_time = time()
                    if rply_window.should_close():
                        break
                max_t = evs['t'][-1]  # timestamp of the last event that was replayed
                print("Event elapse: ", max_t - min_t)
                t_rply2 = time()
                print("replay time: {} s".format(t_rply2 - t_rply1))

        _thread.start_new_thread(make_window, ())