-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathconnect.py
125 lines (108 loc) · 5.3 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
import bilibiliCilent
from task import Task
import time
import printer
def CurrentTime():
currenttime = int(time.time())
return currenttime
class connect():
__slots__ = ('danmuji', 'roomid')
instance = None
def __new__(cls, roomid=None):
if not cls.instance:
cls.instance = super(connect, cls).__new__(cls)
cls.instance.danmuji = None
cls.instance.roomid = roomid
return cls.instance
async def run(self):
self.danmuji = bilibiliCilent.DanmuPrinter(self.roomid, 0)
while True:
print('# 正在启动直播监控弹幕姬')
time_start = int(CurrentTime())
connect_results = await self.danmuji.connectServer()
# print(connect_results)
if not connect_results:
continue
task_main = asyncio.ensure_future(self.danmuji.ReceiveMessageLoop())
task_heartbeat = asyncio.ensure_future(self.danmuji.HeartbeatLoop())
finished, pending = await asyncio.wait([task_main, task_heartbeat], return_when=asyncio.FIRST_COMPLETED)
print('主弹幕姬异常或主动断开,正在处理剩余信息')
time_end = int(CurrentTime())
if not task_heartbeat.done():
task_heartbeat.cancel()
task_terminate = asyncio.ensure_future(self.danmuji.close_connection())
await asyncio.wait(pending)
await asyncio.wait([task_terminate])
printer.info(['主弹幕姬退出,剩余任务处理完毕'], True)
if time_end - time_start < 5:
print('# 当前网络不稳定,为避免频繁不必要尝试,将自动在5秒后重试')
await asyncio.sleep(5)
async def reconnect(self, roomid):
self.roomid = roomid
print('已经切换roomid')
if self.danmuji is not None:
self.danmuji.roomid = roomid
await self.danmuji.close_connection()
class RaffleConnect():
def __init__(self, areaid):
self.danmuji = None
self.roomid = None
self.areaid = areaid
async def run(self):
self.danmuji = bilibiliCilent.DanmuRaffleHandler(self.roomid, self.areaid)
while True:
self.danmuji.roomid = await Task().call_right_now('get_one', self.areaid)
printer.info(['# 正在启动抽奖监控弹幕姬'], True)
time_start = int(CurrentTime())
connect_results = await self.danmuji.connectServer()
# print(connect_results)
if not connect_results:
continue
task_main = asyncio.ensure_future(self.danmuji.ReceiveMessageLoop())
task_heartbeat = asyncio.ensure_future(self.danmuji.HeartbeatLoop())
task_checkarea = asyncio.ensure_future(self.danmuji.CheckArea())
finished, pending = await asyncio.wait([task_main, task_heartbeat, task_checkarea], return_when=asyncio.FIRST_COMPLETED)
printer.info([f'{self.areaid}号弹幕姬异常或主动断开,正在处理剩余信息'], True)
time_end = int(CurrentTime())
if not task_heartbeat.done():
task_heartbeat.cancel()
if not task_checkarea.done():
task_checkarea.cancel()
task_terminate = asyncio.ensure_future(self.danmuji.close_connection())
await asyncio.wait(pending)
await asyncio.wait([task_terminate])
printer.info([f'{self.areaid}号弹幕姬退出,剩余任务处理完毕'], True)
if time_end - time_start < 5:
print('# 当前网络不稳定,为避免频繁不必要尝试,将自动在5秒后重试')
await asyncio.sleep(5)
class YjConnection():
def __init__(self, roomid):
self.danmuji = None
self.roomid = roomid
self.areaid = -1
async def run(self):
if not self.roomid:
return
self.danmuji = bilibiliCilent.YjMonitorHandler(self.roomid, self.areaid)
while True:
print('# 正在启动直播监控弹幕姬')
time_start = int(CurrentTime())
connect_results = await self.danmuji.connectServer()
# print(connect_results)
if not connect_results:
continue
task_main = asyncio.ensure_future(self.danmuji.ReceiveMessageLoop())
task_heartbeat = asyncio.ensure_future(self.danmuji.HeartbeatLoop())
finished, pending = await asyncio.wait([task_main, task_heartbeat], return_when=asyncio.FIRST_COMPLETED)
print('主弹幕姬异常或主动断开,正在处理剩余信息')
time_end = int(CurrentTime())
if not task_heartbeat.done():
task_heartbeat.cancel()
task_terminate = asyncio.ensure_future(self.danmuji.close_connection())
await asyncio.wait(pending)
await asyncio.wait([task_terminate])
printer.info(['主弹幕姬退出,剩余任务处理完毕'], True)
if time_end - time_start < 5:
print('# 当前网络不稳定,为避免频繁不必要尝试,将自动在5秒后重试')
await asyncio.sleep(5)