-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathclient_channels.py
129 lines (105 loc) · 5.6 KB
/
client_channels.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import logging
import time
from typing import Any, Dict
import socketio
from era_5g_interface.channels import DATA_ERROR_EVENT, DATA_NAMESPACE, CallbackInfoClient, Channels, ChannelType
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ClientChannels(Channels):
    """Client-side channels: wires received DATA_NAMESPACE events to user callbacks.

    On construction, one Socket.IO handler is registered per entry of the
    callbacks dictionary (JSON, LZ4-compressed JSON, or JPEG/H.264/HEVC image
    frames), plus a handler for DATA_ERROR_EVENT. Decoding and sending helpers
    are inherited from ``Channels``.
    """

    _callbacks_info: Dict[str, CallbackInfoClient]

    def __init__(
        self,
        sio: socketio.Client,
        callbacks_info: Dict[str, CallbackInfoClient],
        **kwargs,
    ):
        """Constructor.

        Args:
            sio (socketio.Client): Socketio Client object.
            callbacks_info (Dict[str, CallbackInfoClient]): Callbacks Info dictionary, key is custom event name.
            **kwargs: Forwarded unchanged to ``Channels.__init__``. According to the
                original documentation of this constructor these may include:
                disconnect_callback (Callable, optional): Triggered before _shutdown on unhandled exception.
                back_pressure_size (int, optional): Back pressure size - max size of eio.queue.qsize().
                recreate_coder_attempts_count (int): How many times try to recreate the video frame
                    encoder/decoder.
                stats (bool): Store output data sizes.
                extended_measuring (bool): Enable logging of measuring.

        Raises:
            ValueError: If a callback info entry has a channel type that is not
                JSON, JSON_LZ4, JPEG, H264 or HEVC.
        """
        super().__init__(sio, callbacks_info, **kwargs)

        self._sio.on(DATA_ERROR_EVENT, lambda data: self.data_error_callback(data), namespace=DATA_NAMESPACE)

        def make_handler(method_name: str, bound_event: str):
            # The receiving method is looked up on ``self`` at call time, and the event
            # name is frozen through a default argument so that every handler keeps the
            # event it was created for (avoids the late-binding closure pitfall).
            return lambda data, local_event=bound_event: getattr(self, method_name)(data, local_event)

        image_types = (ChannelType.JPEG, ChannelType.H264, ChannelType.HEVC)

        for event, callback_info in self._callbacks_info.items():
            logger.info(f"Creating client channels callback, type: {callback_info.type}, event: '{event}'")

            channel_type = callback_info.type
            if channel_type is ChannelType.JSON:
                receiver = "json_callback"
            elif channel_type is ChannelType.JSON_LZ4:
                receiver = "json_lz4_callback"
            elif channel_type in image_types:
                receiver = "image_callback"
            else:
                raise ValueError(f"Unknown channel type: {callback_info.type}")

            self._sio.on(event, make_handler(receiver, event), namespace=DATA_NAMESPACE)

    def _run_user_callback(
        self,
        cb_info: CallbackInfoClient,
        payload: Dict[str, Any],
        timestamp: Any,
        event: str,
        channel_label: str,
    ) -> Any:
        """Invoke a user callback, record measuring entries around it, and return its result.

        Args:
            cb_info (CallbackInfoClient): Callback info whose ``callback`` is invoked with ``payload``.
            payload (Dict[str, Any]): Data handed to the user callback.
            timestamp (Any): Value obtained from ``Channels.get_timestamp_from_data``; used as
                the key for all measuring entries.
            event (str): Event name.
            channel_label (str): Label passed to ``_shutdown`` when anything in the guarded
                section raises.

        Returns:
            Any: The callback's return value, or None if an exception was raised and
            ``_shutdown`` was called.
        """
        try:
            self._input_measuring.log_timestamp(timestamp, "before_callback_timestamp")
            result = cb_info.callback(payload)
            self._input_measuring.log_timestamp(timestamp, "after_callback_timestamp")
            self._input_measuring.log_measuring(timestamp, "eio_sid", self.get_client_eio_sid(None, DATA_NAMESPACE))
            self._input_measuring.log_measuring(timestamp, "event", event)
            self._input_measuring.store_measuring(timestamp)
        except Exception:
            # Any failure in the callback or in the measuring calls triggers shutdown;
            # the exception is not re-raised.
            self._shutdown(channel_label, event)
            return None
        return result

    def json_callback(self, data: Dict[str, Any], event: str) -> Any:
        """Receive general JSON data on DATA_NAMESPACE and pass it to the event's callback.

        Args:
            data (Dict[str, Any]): JSON data.
            event (str): Event name.

        Returns:
            Any: Whatever the registered callback returns, or None if it raised.
        """
        timestamp = Channels.get_timestamp_from_data(data)
        cb_info = self._callbacks_info[event]
        return self._run_user_callback(cb_info, data, timestamp, event, "JSON")

    def json_lz4_callback(self, data: bytes, event: str) -> Any:
        """Receive LZ4 compressed general JSON data on DATA_NAMESPACE.

        The payload is decoded with ``data_lz4_decode``; a falsy decode result is
        dropped, otherwise decode timings are logged and the decoded data is
        handed to ``json_callback``.

        Args:
            data (bytes): LZ4 compressed JSON data.
            event (str): Event name.

        Returns:
            Any: Result of ``json_callback``, or None when the decoded data is falsy.
        """
        decode_started_ns = time.perf_counter_ns()
        payload = super().data_lz4_decode(data, event)
        if not payload:
            return None

        decode_finished_ns = time.perf_counter_ns()
        # The decode start time serves as the fallback timestamp (passed as ``default``).
        timestamp = Channels.get_timestamp_from_data(payload, default=decode_started_ns)
        self._input_measuring.log_measuring(timestamp, "before_lz4_decode_timestamp", decode_started_ns)
        self._input_measuring.log_measuring(timestamp, "after_lz4_decode_timestamp", decode_finished_ns)
        return self.json_callback(payload, event)

    def image_callback(self, data: Dict[str, Any], event: str) -> Any:
        """Receive a JPEG, H.264 or HEVC encoded image on DATA_NAMESPACE.

        The frame is decoded with ``image_decode``; a falsy decode result is
        dropped, otherwise the decoded data is passed to the event's callback.

        Args:
            data (Dict[str, Any]): Received dictionary with frame data.
            event (str): Event name.

        Returns:
            Any: Whatever the registered callback returns, or None if decoding
            produced a falsy result or the callback raised.
        """
        timestamp = Channels.get_timestamp_from_data(data)
        cb_info = self._callbacks_info[event]

        self._input_measuring.log_timestamp(timestamp, "before_decode_timestamp")
        frame = super().image_decode(data, event)
        self._input_measuring.log_timestamp(timestamp, "after_decode_timestamp")

        if not frame:
            return None
        return self._run_user_callback(cb_info, frame, timestamp, event, "image")