-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmain.py
369 lines (327 loc) · 13.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
from win32api import GetSystemMetrics
from PIL import Image
import math
import os
import queue
import sys
import time
import yaml
import serial
import win32api
import win32con
import win32gui
import win32print
import threading
import copy
# Default configuration. Every value below may be overridden by the YAML
# config file loaded in the ``__main__`` section.

# Print a line for every touch event when True.
DEBUG = False
# Path of the edited zone-map image (one distinct colour per touch zone).
IMAGE_PATH = "./image/image_monitor.png"
# Serial port name.
COM_PORT = "COM33"
# Serial baud rate.
COM_BAUDRATE = 9600
# Radius, in image pixels, of the circle sampled around each touch point.
AREA_SCOPE = 120
# Number of points sampled on that circle.
AREA_POINT_NUM = 8
# Mirror touch coordinates on both axes (screen rotated by 180 degrees).
REVERSE_MONITOR = False
# Whether touch_thread pauses between iterations. Enable it if CPU usage is
# too high; disable it if swipes become very laggy. Disabled here.
TOUCH_THREAD_SLEEP_MODE = False
# Length of each touch_thread pause, in microseconds.
TOUCH_THREAD_SLEEP_DELAY = 10
# Custom screen width (0 means: use the primary screen's width).
# NOTE: this must be an assignment. The previous ``SCREEN_WIDTH: 0`` was a
# bare annotation that never bound the name.
SCREEN_WIDTH = 0
# Custom screen height (0 means: use the primary screen's height).
SCREEN_HEIGHT = 0
# Path of the window icon.
icon_path = './image/favicon.ico'
# Touch-zone layout used to build a touch frame: build_touch_package() turns
# each row into one byte, and the zone at index j of a row maps to bit j of
# that byte. Row order and position within a row are therefore significant.
exp_list = [
["A1", "A2", "A3", "A4", "A5", ],
["A6", "A7", "A8", "B1", "B2", ],
["B3", "B4", "B5", "B6", "B7", ],
["B8", "C1", "C2", "D1", "D2", ],
["D3", "D4", "D5", "D6", "D7", ],
["D8", "E1", "E2", "E3", "E4", ],
["E5", "E6", "E7", "E8", ],
]
# Maps a colour key in the "R-G-B" format produced by get_color_name() to the
# name of the touch zone painted with that colour in the zone-map image.
# The config file may replace this table at start-up.
exp_image_dict = {'41-65-93': 'A1', '87-152-13': 'A2', '213-109-81': 'A3', '23-222-55': 'A4', '69-203-71': 'A5',
'147-253-55': 'A6', '77-19-35': 'A7', '159-109-79': 'A8', '87-217-111': 'B1', '149-95-154': 'B2',
'97-233-9': 'B3', '159-27-222': 'B4', '152-173-186': 'B5', '192-185-149': 'B6', '158-45-23': 'B7',
'197-158-219': 'B8', '127-144-79': 'C1', '242-41-155': 'C2', '69-67-213': 'D1', '105-25-130': 'D2',
'17-39-170': 'D3', '97-103-203': 'D4', '113-25-77': 'D5', '21-21-140': 'D6', '155-179-166': 'D7',
'55-181-134': 'D8', '61-33-27': 'E1', '51-91-95': 'E2', '143-227-63': 'E3', '216-67-226': 'E4',
'202-181-245': 'E5', '99-11-183': 'E6', '75-119-224': 'E7', '182-19-85': 'E8'}
class SerialManager:
    """Serial link that reports touch-zone state and answers setup commands.

    Two worker threads are used:

    * ``touch_thread`` reads 6-byte command packets from the port and applies
      touch updates queued through ``change_touch``.
    * ``write_thread`` keeps re-sending the latest touch frame while
      ``startUp`` is True.

    A touch frame is ``0x28``, one byte per row of the zone matrix, ``0x29``.
    """

    def __init__(self):
        """Open the serial port and queue an initial all-released frame."""
        self.p1Serial = serial.Serial(COM_PORT, COM_BAUDRATE)
        # Reply template for setting commands: "(" + four echoed bytes + ")".
        self.settingPacket = bytearray([40, 0, 0, 0, 0, 41])
        # True once touch frames should be streamed to the port.
        self.startUp = False
        self.recvData = ""
        self.touchQueue = queue.Queue()
        # Guards now_touch_data / now_touch_keys and serialises touch writes.
        self.data_lock = threading.Lock()
        # Set by destroy() so that both worker loops can terminate.
        self._stop_event = threading.Event()
        self.touchThread = threading.Thread(target=self.touch_thread)
        self.writeThread = threading.Thread(target=self.write_thread)
        self.now_touch_data = b''
        self.now_touch_keys = []
        self.ping_touch_thread()

    def start(self):
        """Start both worker threads."""
        print(f"开始监听 {COM_PORT} 串口...")
        self.touchThread.start()
        self.writeThread.start()

    def ping_touch_thread(self):
        """Queue a frame with no zone pressed.

        ``exp_list`` holds zone names, none of which equals 1, so the frame
        built from it has every bit cleared.
        """
        self.touchQueue.put([self.build_touch_package(exp_list), []])

    def touch_thread(self):
        """Poll the port for commands and apply queued touch updates."""
        while not self._stop_event.is_set():
            if self.p1Serial.is_open:
                self.read_data(self.p1Serial)
            if not self.touchQueue.empty():
                self.update_touch(self.touchQueue.get())
            # Optional pause so the polling loop does not monopolise a core.
            if TOUCH_THREAD_SLEEP_MODE:
                microsecond_sleep(TOUCH_THREAD_SLEEP_DELAY)

    def write_thread(self):
        """Continuously re-send the current touch frame while started."""
        while not self._stop_event.is_set():
            # Minimal sleep: yields to other threads without adding latency.
            time.sleep(0.000001)
            if not self.startUp:
                continue
            with self.data_lock:
                self.send_touch(self.p1Serial, self.now_touch_data)

    def destroy(self):
        """Stop the worker threads and close the serial port.

        The worker loops watch ``_stop_event``; without that signal the join
        below could never return.
        """
        self._stop_event.set()
        current = threading.current_thread()
        for worker in (self.touchThread, self.writeThread):
            # Skip threads that were never started and avoid joining ourselves.
            if worker is not current and worker.is_alive():
                worker.join()
        self.p1Serial.close()

    def read_data(self, ser):
        """Read one 6-byte command packet from ``ser`` if one is available.

        ``>=`` is used rather than ``==`` so that the reader does not stall
        forever once more than six bytes have accumulated in the input
        buffer. One packet is consumed per call; the polling loop picks up
        the rest.
        """
        if ser.in_waiting >= 6:
            # latin-1 maps every byte to exactly one character, so a stray
            # non-ASCII byte cannot raise and ord() returns the raw value.
            self.recvData = ser.read(6).decode("latin-1")
            self.touch_setup(ser, self.recvData)

    def touch_setup(self, ser, data):
        """Handle one command packet; the command code is ``data[3]``.

        * ``'L'`` (76) or ``'E'`` (69): stop streaming touch frames.
        * ``'r'`` (114) or ``'k'`` (107): echo bytes 1-4 back inside
          ``settingPacket``.
        * ``'A'`` (65): start streaming touch frames.

        Any other code is ignored.
        """
        byte_data = ord(data[3])
        if byte_data in (76, 69):
            self.startUp = False
        elif byte_data in (114, 107):
            for i in range(1, 5):
                self.settingPacket[i] = ord(data[i])
            ser.write(self.settingPacket)
        elif byte_data == 65:
            self.startUp = True
            print("已连接到游戏")

    def send_touch(self, ser, data):
        """Write a raw touch frame to ``ser``."""
        ser.write(data)

    def build_touch_package(self, sl):
        """Encode a matrix of zone states as a touch frame.

        Args:
            sl: list of rows; an element equal to 1 means "pressed". Any
                other value (including zone-name strings) means "released".

        Returns:
            ``bytes``: ``0x28``, one byte per row with bit ``j`` set when
            ``row[j] == 1``, then ``0x29``.

        Raises:
            ValueError: if a row encodes to a value above 255.
        """
        row_bytes = [
            sum(1 << j for j, val in enumerate(row) if val == 1)
            for row in sl
        ]
        return bytes([0x28, *row_bytes, 0x29])

    def update_touch(self, s_temp):
        """Store and immediately send a queued ``[frame, keys]`` pair."""
        with self.data_lock:
            self.now_touch_data = s_temp[0]
            self.send_touch(self.p1Serial, s_temp[0])
            self.now_touch_keys = s_temp[1]
            print("Touch Keys:", s_temp[1])

    def change_touch(self, sl, touch_keys):
        """Queue a new touch state for the touch thread to apply.

        Args:
            sl: zone-state matrix accepted by ``build_touch_package``.
            touch_keys: names of the pressed zones (only printed and stored).
        """
        self.touchQueue.put([self.build_touch_package(sl), touch_keys])
def restart_script():
    """Replace the current process with a fresh run of this script.

    The script is re-executed with the same interpreter and no extra
    command-line arguments. This call does not return on success.
    """
    interpreter = sys.executable
    os.execv(interpreter, [interpreter, os.path.abspath(sys.argv[0])])
def microsecond_sleep(sleep_time):
    """Busy-wait for roughly ``sleep_time`` microseconds.

    The loop spins on ``time.perf_counter()`` instead of calling
    ``time.sleep``. One microsecond is subtracted as a fixed compensation
    for call overhead; that value may need tuning per machine.
    """
    compensation_us = 1.0
    deadline = time.perf_counter() + (sleep_time - compensation_us) / 1e6
    while True:
        if not time.perf_counter() < deadline:
            return
def get_colors_in_area(x, y):
    """Collect the distinct colour keys found around an image position.

    The pixel at ``(x, y)`` is sampled first, followed by ``AREA_POINT_NUM``
    points evenly spaced on a circle of radius ``AREA_SCOPE`` around it.
    Sample points outside the zone-map image are skipped.

    Args:
        x: horizontal position in zone-map image pixels.
        y: vertical position in zone-map image pixels.

    Returns:
        list of unique colour keys as produced by ``get_color_name``.
    """
    point_count = AREA_POINT_NUM
    step_deg = 360.0 / point_count

    # Centre point first, then the ring points in order of increasing angle.
    sample_points = [(x, y)]
    for k in range(point_count):
        theta = math.radians(k * step_deg)
        offset_x = int(AREA_SCOPE * math.cos(theta))
        offset_y = int(AREA_SCOPE * math.sin(theta))
        sample_points.append((x + offset_x, y + offset_y))

    found = set()  # a set, so each colour is reported once
    for px, py in sample_points:
        inside = 0 <= px < exp_image_width and 0 <= py < exp_image_height
        if inside:
            found.add(get_color_name(exp_image.getpixel((px, py))))
    return list(found)
def get_color_name(pixel):
    """Return the ``"R-G-B"`` key for a pixel.

    Only the first three channels are used, so an alpha channel (if any)
    is ignored.
    """
    red, green, blue = pixel[0], pixel[1], pixel[2]
    return "-".join(map(str, (red, green, blue)))
# def convert(touch_data):
# copy_exp_list = copy.deepcopy(exp_list)
# touch_data_values = list(touch_data.values())
# touch_keys = set()
# touched = 0
# for i in touch_data_values:
# touched += 1
# x = i["x"]
# y = i["y"]
# for rgb_str in get_colors_in_area(x, y):
# if not rgb_str in exp_image_dict:
# continue
# touch_keys.add(exp_image_dict[rgb_str])
# # print("Touched:", touched)
# # print("Touch Keys:", touch_keys)
# touch_keys_list = list(touch_keys)
# for i in range(len(copy_exp_list)):
# for j in range(len(copy_exp_list[i])):
# if copy_exp_list[i][j] in touch_keys_list:
# copy_exp_list[i][j] = 1
# else:
# copy_exp_list[i][j] = 0
# # print(copy_exp_list)
# serial_manager.change_touch(copy_exp_list, touch_keys_list)
def convert(touch_data):
    """Translate the active touches into zone states and queue them.

    Args:
        touch_data: mapping of touch id to a dict with ``"x"`` and ``"y"``
            positions in zone-map image pixels.

    Every colour found around each touch is looked up in ``exp_image_dict``;
    colours with no entry are ignored. The resulting 0/1 matrix has the same
    shape as ``exp_list`` and is handed to ``serial_manager.change_touch``
    together with the list of pressed zone names.
    """
    touch_keys = set()
    for point in touch_data.values():
        for rgb_str in get_colors_in_area(point["x"], point["y"]):
            if rgb_str in exp_image_dict:
                touch_keys.add(exp_image_dict[rgb_str])
    touch_keys_list = list(touch_keys)
    # Build the matrix straight from exp_list (no copy is needed because
    # exp_list is only read) and test membership against the set.
    zone_states = [
        [1 if zone in touch_keys else 0 for zone in row]
        for row in exp_list
    ]
    serial_manager.change_touch(zone_states, touch_keys_list)
def get_real_resolution():
    """Return ``(width, height)`` from the screen DC's desktop capabilities.

    The values are the ``DESKTOPHORZRES`` / ``DESKTOPVERTRES`` device
    capabilities of the device context obtained with ``GetDC(0)``.
    """
    hDC = win32gui.GetDC(0)
    try:
        w = win32print.GetDeviceCaps(hDC, win32con.DESKTOPHORZRES)
        h = win32print.GetDeviceCaps(hDC, win32con.DESKTOPVERTRES)
    finally:
        # A DC obtained with GetDC must be released, otherwise the handle
        # leaks on every call.
        win32gui.ReleaseDC(0, hDC)
    return w, h
def get_screen_size():
    """Return the ``(width, height)`` used as the touch-screen size.

    Each dimension comes from ``GetSystemMetrics`` (index 0 for width,
    index 1 for height) unless the matching ``SCREEN_WIDTH`` /
    ``SCREEN_HEIGHT`` override is non-zero, in which case the override wins.
    """
    detected = (GetSystemMetrics(0), GetSystemMetrics(1))
    overrides = (SCREEN_WIDTH, SCREEN_HEIGHT)
    width, height = (
        found if custom == 0 else custom
        for found, custom in zip(detected, overrides)
    )
    return width, height
def getevent():
    """Run the touch overlay window and forward finger events.

    Creates a pygame window of ``MONITOR_SIZE``, makes it a layered window
    with an alpha of 1 (out of 255), then loops over pygame events. Finger
    positions are scaled into zone-map image pixels and the set of active
    touches is passed to ``convert`` after every finger event.

    The function returns when the window receives a QUIT event.
    """
    # Active touches: str(finger id) -> {"x": ..., "y": ...} in image pixels.
    touch_data = {}
    pygame.init()
    icon = pygame.image.load(icon_path)
    pygame.display.set_icon(icon)
    screen_width, screen_height = MONITOR_SIZE
    screen = pygame.display.set_mode((screen_width, screen_height))
    pygame.display.set_caption("maimai-windows-touch-panel")
    fill_color = (128, 128, 128)
    hwnd = pygame.display.get_wm_info()["window"]
    win32gui.SetWindowLong(
        hwnd, win32con.GWL_EXSTYLE,
        win32gui.GetWindowLong(hwnd, win32con.GWL_EXSTYLE) | win32con.WS_EX_LAYERED)
    # LWA_ALPHA with alpha 1 makes the whole window almost fully transparent.
    win32gui.SetLayeredWindowAttributes(hwnd, win32api.RGB(*fill_color), 1, win32con.LWA_ALPHA)
    screen.fill(fill_color)
    clock = pygame.time.Clock()

    finger_labels = {
        pygame.FINGERDOWN: "Touch Down",
        pygame.FINGERUP: "Touch Up",
        pygame.FINGERMOTION: "Touch Motion",
    }
    while True:
        clock.tick(6000)
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # A bare `break` here would only leave the inner for-loop and
                # the window could never be closed. Release every zone so no
                # touch stays stuck, then shut the window down.
                convert({})
                pygame.quit()
                return
            if event.type not in finger_labels:
                continue

            touch_id = event.finger_id
            # Finger coordinates are normalised; scale them to screen pixels.
            touch_x, touch_y = event.x * screen_width, event.y * screen_height
            if not REVERSE_MONITOR:
                clac_touch_x = int(touch_x * x_scale)
                clac_touch_y = int(touch_y * y_scale)
            else:
                clac_touch_x = int((MONITOR_SIZE[0] - touch_x) * x_scale)
                clac_touch_y = int((MONITOR_SIZE[1] - touch_y) * y_scale)

            if event.type == pygame.FINGERUP:
                # Tolerate a release for a finger that was never tracked.
                touch_data.pop(str(touch_id), None)
            else:
                touch_data[str(touch_id)] = {"x": clac_touch_x, "y": clac_touch_y}
            convert(touch_data)

            if DEBUG:
                print(f"{finger_labels[event.type]}: ID={touch_id}, X={clac_touch_x}, Y={clac_touch_y}")
if __name__ == "__main__":
    # Must be set before pygame is imported so its banner is suppressed.
    os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
    import pygame

    # Config file: first command-line argument, else ./config.yaml.
    yaml_file_path = 'config.yaml'
    if len(sys.argv) > 1:
        yaml_file_path = sys.argv[1]
    if os.path.isfile(yaml_file_path):
        print("使用配置文件:", yaml_file_path)
        with open(yaml_file_path, 'r', encoding='utf-8') as file:
            # An empty YAML document loads as None.
            c = yaml.safe_load(file) or {}
        # A key missing from the file keeps its built-in default, so config
        # files written before an option existed still load.
        DEBUG = c.get("DEBUG", DEBUG)
        IMAGE_PATH = c.get("IMAGE_PATH", IMAGE_PATH)
        COM_PORT = c.get("COM_PORT", COM_PORT)
        COM_BAUDRATE = c.get("COM_BAUDRATE", COM_BAUDRATE)
        AREA_SCOPE = c.get("AREA_SCOPE", AREA_SCOPE)
        AREA_POINT_NUM = c.get("AREA_POINT_NUM", AREA_POINT_NUM)
        REVERSE_MONITOR = c.get("REVERSE_MONITOR", REVERSE_MONITOR)
        TOUCH_THREAD_SLEEP_MODE = c.get("TOUCH_THREAD_SLEEP_MODE", TOUCH_THREAD_SLEEP_MODE)
        TOUCH_THREAD_SLEEP_DELAY = c.get("TOUCH_THREAD_SLEEP_DELAY", TOUCH_THREAD_SLEEP_DELAY)
        exp_image_dict = c.get("exp_image_dict", exp_image_dict)
        # 0 means "detect automatically" in get_screen_size().
        SCREEN_WIDTH = c.get("SCREEN_WIDTH", 0)
        SCREEN_HEIGHT = c.get("SCREEN_HEIGHT", 0)
    else:
        print("未找到配置文件, 使用默认配置")

    MONITOR_SIZE = get_screen_size()
    exp_image = Image.open(IMAGE_PATH)
    exp_image_width, exp_image_height = exp_image.size
    # Factors that convert screen pixels into zone-map image pixels.
    x_scale = exp_image_width / MONITOR_SIZE[0]
    y_scale = exp_image_height / MONITOR_SIZE[1]
    print(f"定位图路径: {IMAGE_PATH}")
    print(f"定位图大小: [{exp_image_width}, {exp_image_height}]")
    print(f"触摸屏幕大小: {MONITOR_SIZE}")
    print(f"X轴缩放比例: {x_scale}")
    print(f"Y轴缩放比例: {y_scale}")
    print(('已' if REVERSE_MONITOR else '未') + "开启屏幕反转")

    serial_manager = SerialManager()
    serial_manager.start()
    threading.Thread(target=getevent).start()

    # Console commands: start / reverse / restart.
    while True:
        try:
            input_str = input().strip()
        except EOFError:
            # stdin was closed: stop reading commands. The worker threads
            # keep the program running.
            break
        if not input_str:
            continue
        if input_str == 'start':
            serial_manager.startUp = True
            print("已连接到游戏")
        elif input_str == 'reverse':
            REVERSE_MONITOR = not REVERSE_MONITOR
            print("已" + ('开启' if REVERSE_MONITOR else '关闭') + "屏幕反转")
        elif input_str == 'restart':
            restart_script()
        else:
            print("未知的输入")