-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
141 lines (111 loc) · 4.01 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
BioGUI entry point.
Copyright 2023 Mattia Orlandi, Pierangelo Maria Rapa
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import logging
import socket
import sys

from PySide6.QtCore import QObject, QThread, Signal

from biogui import BioGUI
class SocketListener(QObject):
    """
    Component handling remote acquisition management via a socket.

    A TCP server socket is opened on the given port; every incoming
    connection is expected to send a single command (``start`` or ``stop``),
    which is translated into the corresponding Qt signal.

    Parameters
    ----------
    port : int
        Socket port.
    parent : QObject or None, default=None
        Parent QObject, forwarded to the QObject constructor.

    Attributes
    ----------
    _port : int
        Socket port.
    _isRunning : bool
        Whether the socket is still running.

    Class attributes
    ----------------
    startCmdRecv : Signal
        Qt Signal emitted when the socket receives a start command.
    stopCmdRecv : Signal
        Qt Signal emitted when the socket receives a stop command.
    """

    startCmdRecv = Signal()
    stopCmdRecv = Signal()

    # Timeout (in seconds) of accept(): bounds how long run() takes to notice
    # that stop() was called while no client is connected.
    _ACCEPT_TIMEOUT_S = 1.0
    # Timeout (in seconds) of recv() on an accepted connection: a client that
    # connects without sending anything must not block the loop forever.
    _RECV_TIMEOUT_S = 5.0

    def __init__(self, port: int, parent=None):
        super().__init__(parent)

        self._port = port
        self._isRunning = True

    def run(self):
        """
        Listen for TCP connections until stop() is called or an error occurs.

        Each accepted connection is read once (up to 1024 bytes); the
        whitespace-stripped payload ``b"start"`` triggers ``startCmdRecv`` and
        ``b"stop"`` triggers ``stopCmdRecv``. Any other payload is ignored.
        An unexpected exception is logged and terminates the loop.
        """
        # Create a socket listener
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(("0.0.0.0", self._port))
            s.listen()

            # Put the socket in timeout mode, so that accept() returns
            # periodically and the running flag can be re-checked
            s.settimeout(self._ACCEPT_TIMEOUT_S)

            while self._isRunning:
                try:
                    conn, addr = s.accept()
                    logging.info(f"SocketListener: TCP connection from {addr}.")

                    # The context manager closes the connection on exit
                    with conn:
                        # The accepted socket does not inherit the timeout of
                        # the listening socket: set one explicitly, otherwise
                        # a silent client would block recv() (and thus the
                        # shutdown of this listener) indefinitely
                        conn.settimeout(self._RECV_TIMEOUT_S)

                        data = conn.recv(1024).strip()
                        if data == b"start":
                            logging.info("SocketListener: start command received.")
                            self.startCmdRecv.emit()
                        elif data == b"stop":
                            logging.info("SocketListener: stop command received.")
                            self.stopCmdRecv.emit()
                except socket.timeout:
                    # No connection (or no data) within the timeout:
                    # keep listening
                    continue
                except Exception as e:
                    logging.error(f"SocketListener: failed with exception {e}")
                    break

    def stop(self):
        """
        Request termination of the listening loop.

        Only the running flag is cleared: run() exits the next time it
        re-checks the flag, i.e., after the pending accept()/recv() returns
        or times out.
        """
        self._isRunning = False
if __name__ == "__main__":
    # Command-line interface: optional logging and optional remote control
    cliParser = argparse.ArgumentParser()
    cliParser.add_argument("--log", action="store_true", help="Whether to enable logs")
    cliParser.add_argument(
        "--rem_port",
        type=int,
        required=False,
        help="Port for remote acquisition management",
    )
    cliArgs = cliParser.parse_args()

    # Configure logging only on request
    if cliArgs.log:
        import logging

        logging.basicConfig(level=logging.INFO)

    app = BioGUI()

    remotePort = cliArgs.rem_port
    if remotePort:
        # The listener lives in a dedicated thread, so that waiting for
        # remote commands does not run in the thread that creates the app
        listenerThread = QThread(app)
        remoteListener = SocketListener(remotePort)
        remoteListener.moveToThread(listenerThread)

        def shutdownListener():
            """Stop the listening loop, then wait for its thread to finish."""
            remoteListener.stop()
            listenerThread.quit()
            listenerThread.wait()

        # Remote commands drive the streaming controls of the main controller
        remoteListener.startCmdRecv.connect(app.mainController.startStreaming)
        remoteListener.stopCmdRecv.connect(app.mainController.stopStreaming)

        # Thread life cycle: run the listener on start, clean up on finish
        listenerThread.started.connect(remoteListener.run)
        listenerThread.finished.connect(listenerThread.deleteLater)

        # Make sure the listener is shut down when the application quits
        app.aboutToQuit.connect(shutdownListener)

        listenerThread.start()

    sys.exit(app.exec())