-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathcheckIn_ZhangFei_All.py
899 lines (763 loc) · 34.5 KB
/
checkIn_ZhangFei_All.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
'''
new Env('掌上飞车全能版(多线程)')
cron: 10 0 * * *
Author : BNDou
Date : 2025-01-09 01:38:32
LastEditTime : 2025-01-13 20:39:25
FilePath : /Auto_Check_In/checkIn_ZhangFei_All.py
Description : 掌上飞车签到+购物+寻宝一体化脚本(多线程)
抓包说明:
(推荐方式)
开启抓包-进入签到页面-等待上方账号信息加载出来-停止抓包
选请求这个url的包-https://speed.qq.com/cp/
添加环境变量说明:
1. 基础Cookie变量(必需):
COOKIE_ZHANGFEI,多账户用 回车 或 && 分开
Cookie参数说明:
1. 基础参数(必需):
roleId=QQ号; userId=掌飞社区ID号; accessToken=xxx; appid=xxx; openid=xxx; areaId=xxx; token=xxx; speedqqcomrouteLine=xxx;
2. 功能控制参数(可选,默认均为false):
enable_signin=true/false; - 签到功能
enable_shopping=true/false; - 购物功能
enable_treasure=true/false; - 寻宝功能
3. 购物功能参数(enable_shopping=true时必需):
shopName=xxx; - 要购买的商品名称(掌飞商城里面的全称)
giftPackId=1-6; - 月签20和25天礼包选择,默认1
'''
import calendar
import datetime
import json
import os
import re
import sys
import threading
import time
from urllib.parse import unquote
import requests
try:
from utils.sendNotify import send
except Exception as err:
print(f'%s\n❌加载通知服务失败~' % err)
# 全局变量
isvip = 0 # 紫钻状态
class ZhangFeiUser:
"""掌上飞车用户类"""
def __init__(self, cookie_str):
self.user_data = {}
self.progress = 0 # 添加进度属性
self.status = "" # 添加状态描述
# 解析cookie字符串到user_data
for item in cookie_str.replace(" ", "").split(';'):
if item:
key, value = item.split('=')
self.user_data[key] = unquote(value)
# 设置功能控制参数默认值
self.user_data.setdefault('enable_signin', 'false')
self.user_data.setdefault('enable_shopping', 'false')
self.user_data.setdefault('enable_treasure', 'false')
self.user_data.setdefault('giftPackId', '1')
def is_feature_enabled(self, feature):
"""检查功能是否启用"""
return self.user_data.get(f'enable_{feature}', 'false').lower() == 'true'
def get_account_info(self):
"""获取账号显示信息"""
area_name = '电信区' if self.user_data.get('areaId') == '1' else '联通区' if self.user_data.get('areaId') == '2' else '电信2区'
return f"🚗账号 {self.user_data.get('roleId')} {area_name}"
def check_token(self, feature=""):
"""检查token是否有效"""
url = "https://bang.qq.com/app/speed/treasure/index"
params = {
"roleId": self.user_data.get('roleId'),
"areaId": self.user_data.get('areaId'),
"uin": self.user_data.get('roleId')
}
try:
response = requests.get(url, params=params)
if "登录态失效" in response.text:
print(f"❌账号{self.user_data.get('roleId')}登录失效,请重新获取token")
return False
return True
except Exception as e:
print(f"❌检查token时发生错误: {e}")
return False
class SignIn:
"""签到功能类"""
def __init__(self, user):
self.user = user
def get_sign_info(self):
"""获取签到信息"""
try:
flow = requests.get(
f"https://speed.qq.com/cp/{self.user.user_data['speedqqcomrouteLine']}/index.js"
)
html = flow.text
# 解析签到信息
flow_strings = re.findall(r"Milo.emit\(flow_(\d+)\)", html)
self.user.user_data.update({
"total_id": flow_strings[1],
"week_signIn": flow_strings[2:10],
"month_SignIn": flow_strings[10:15],
"task_id": flow_strings[-5:],
"iActivityId": re.findall(r"actId: '(\d+)'", html)[0]
})
return True
except Exception as e:
print(f"❌获取签到信息失败: {e}")
return False
def commit(self, sData):
"""提交签到信息"""
url = f"https://comm.ams.game.qq.com/ams/ame/amesvr?iActivityId={self.user.user_data.get('iActivityId')}"
headers = {
'Cookie': f"acctype=qc; access_token={self.user.user_data.get('accessToken')}; appid={self.user.user_data.get('appid')}; openid={self.user.user_data.get('openid')};"
}
# 根据不同类型设置iFlowId
if sData[0] == "witchDay": # 累计信息
iFlowId = self.user.user_data.get('total_id')
elif sData[0] == "signIn": # 签到
iFlowId = self.user.user_data.get('week_signIn')[datetime.datetime.now().weekday()]
elif sData[0] == "number": # 补签
iFlowId = self.user.user_data.get('week_signIn')[-1]
elif sData[0] == "giftPackId": # 月签
iFlowId = self.user.user_data.get('month_SignIn')[sData[-1]]
elif sData[0] == "task_id": # 任务
iFlowId = self.user.user_data.get('task_id')[sData[1]]
data = {
"curl_userId": self.user.user_data.get('userId'),
"curl_qq": self.user.user_data.get('roleId'),
"curl_token": self.user.user_data.get('token'),
"iActivityId": self.user.user_data.get('iActivityId'),
"iFlowId": iFlowId,
"g_tk": "1842395457",
"sServiceType": "speed",
sData[0]: sData[1]
}
response = requests.post(url, headers=headers, data=data)
response.encoding = "utf-8"
return response.json()
def get_out_value(self):
"""获取累计信息"""
ret = self.commit(['witchDay', (datetime.datetime.now().weekday() + 1)])
if ret['ret'] == '101':
print(f"❌账号{self.user.user_data.get('roleId')}登录失败,请检查账号信息是否正确")
return False
elif ret['ret'] == '700':
print(f"❌账号{self.user.user_data.get('roleId')} 请确认 token 是否有效")
return False
modRet = ret['modRet']
# 本周已签到天数
self.user.user_data["weekSignIn"] = modRet['sOutValue5']
# 周补签(资格剩余)
if (datetime.datetime.now().weekday() + 1) < 3:
weekSupplementarySignature = "0"
else:
weekBuqian = modRet['sOutValue7'].split(',')
if int(weekBuqian[1]) == 1:
weekSupplementarySignature = "0" # 已经使用资格
else:
weekSupplementarySignature = "1" if int(weekBuqian[0]) >= 3 else "0"
self.user.user_data["weekSupplementarySignature"] = weekSupplementarySignature
# 周补签状态
self.user.user_data["weekStatue"] = modRet['sOutValue2'].split(',')
# 本月已签到天数
monthSignIn = modRet['sOutValue4']
if int(monthSignIn) > 25:
monthSignIn = "25"
self.user.user_data["monthSignIn"] = monthSignIn
# 月签(资格剩余)
self.user.user_data["monthStatue"] = modRet['sOutValue1'].split(',')
return True
def sign_in(self):
"""签到"""
msg = ""
try:
ret = self.commit(['signIn', ''])
log = str(ret['modRet']['sMsg']) if ret['ret'] == '0' else str(ret['flowRet']['sMsg'])
if "网络故障" in log:
log = f"❌今日{datetime.datetime.now().strftime('{}月%d日').format(datetime.datetime.now().month)} 星期{datetime.datetime.now().weekday() + 1} 已签到"
elif "非常抱歉,请先登录!" in log:
log = f"❌今日{datetime.datetime.now().strftime('{}月%d日').format(datetime.datetime.now().month)} 星期{datetime.datetime.now().weekday() + 1} 非常抱歉,请先登录!"
else:
log = f"✅今日{datetime.datetime.now().strftime('{}月%d日').format(datetime.datetime.now().month)} 星期{datetime.datetime.now().weekday() + 1} {log}"
except Exception as err:
log = f"❌签到失败~{err}"
return log + "\n"
def week_supplementary_signature(self):
"""补签"""
msg = ""
try:
if self.user.user_data.get('weekSupplementarySignature') == "1":
for index, value in enumerate(self.user.user_data.get('weekStatue')):
if value == "1":
if (datetime.datetime.now().weekday() + 1) > index + 1:
# 补签
ret = self.commit(['number', (index + 1)])
log = str(ret['modRet']['sMsg']) if ret['ret'] == '0' else str(ret['flowRet']['sMsg'])
msg += f"✅补签:{log}\n"
except Exception as err:
msg = f"❌补签失败~{err}\n"
print(msg)
return msg
def month_sign_in(self):
"""获取月签礼包"""
msg = ""
for index, day in enumerate([5, 10, 15, 20, 25]):
if int(self.user.user_data.get('monthSignIn')) >= day:
if int(self.user.user_data.get('monthStatue')[index]) == 0:
# 如果未设置,默认领取第一个礼包
giftPackId = self.user.user_data.get('giftPackId', '1')
# 领取礼包
ret = self.commit(['giftPackId', giftPackId, index])
log = str(ret['modRet']['sMsg']) if ret['ret'] == '0' else str(ret['flowRet']['sMsg'])
log = f"✅累计签到{day}天:{log}"
msg += log + '\n'
return msg
def browse_backpack(self):
"""日常任务:浏览背包"""
url = f"https://mwegame.qq.com/yoyo/dnf/phpgameproxypass"
data = {
"uin": self.user.user_data.get('roleId'),
"userId": self.user.user_data.get('userId'),
"areaId": self.user.user_data.get('areaId'),
"token": self.user.user_data.get('token'),
"service": "dnf_getspeedknapsack",
"cGameId": "1003",
}
response = requests.post(url=url, data=data)
response.encoding = "utf-8"
return True if response.json()['returnMsg'] == '' else False
def task_gift(self):
"""日常任务:领取奖励"""
msg = ""
for index in range(len(self.user.user_data.get('task_id'))):
ret = self.commit(['task_id', index])
if ret['ret'] == '0':
log = str(ret['modRet']['sMsg'])
log = f"✅日常任务{index + 1}:{log}"
msg += log + '\n'
else:
log = str(ret['flowRet']['sMsg'])
log = f"❌日常任务{index + 1}:{log}"
return msg
def execute(self):
"""执行签到任务"""
msg = "签到\n"
# 获取签到信息
if not self.get_sign_info():
return msg + "❌获取签到信息失败\n"
# 获取累计信息
if not self.get_out_value():
return msg + "❌获取累计信息失败\n"
# 执行签到
msg += self.sign_in()
# 输出签到统计
msg += (f"本周签到{self.user.user_data.get('weekSignIn')}/7天,"
f"本月签到{self.user.user_data.get('monthSignIn')}/25天,"
f"有{self.user.user_data.get('weekSupplementarySignature')}天可补签\n")
# 补签
week_msg = self.week_supplementary_signature()
if week_msg:
msg += week_msg
# 领取月签奖励
month_msg = self.month_sign_in()
if month_msg:
msg += month_msg
# 日常任务:浏览背包
if self.browse_backpack():
msg += "✅日常任务:浏览背包成功\n"
# 日常任务:领取奖励
task_msg = self.task_gift()
if task_msg:
msg += task_msg
return msg
class Shopping:
"""购物功能类"""
def __init__(self, user):
self.user = user
def get_pack_info(self):
"""获取点券、消费券信息"""
url = f"https://bang.qq.com/app/speed/mall/main2"
params = {
'uin': self.user.user_data.get('roleId'),
'userId': self.user.user_data.get('userId'),
'areaId': self.user.user_data.get('areaId'),
'token': self.user.user_data.get('token'),
}
response = requests.get(url, params=params)
response.encoding = "utf-8"
try:
return {
'money': re.findall(r'<b id="super_money">(\d+)<', response.text)[0],
'coupons': re.findall(r'<b id="coupons">(\d+)<', response.text)[0]
}
except IndexError:
print("❌获取点券、消费券信息失败")
return None
def process_data(self, input_dict):
"""格式化道具信息"""
global isvip
if isvip > 0:
vip_discount = input_dict["iMemeberRebate"]
else:
vip_discount = input_dict["iCommonRebate"]
output_dict = {}
price_idx = {}
item = input_dict["szItems"][0]
# 准备工作:去除可能的逗号结尾
if item.get("ItemNum") == "":
item["ItemAvailPeriod"] = item["ItemAvailPeriod"][:-1]
# 对每个项目数量或可用期限和价格执行逻辑
item_array = item["ItemNum"].split(',') if item.get(
"ItemNum") else item["ItemAvailPeriod"].split(',')
# 构建 price_idx 词典信息
for index, value in enumerate(item_array):
if value:
key = value if item.get(
"ItemNum") else "99999999" if value == "-1" else str(
int(value) / 24)
item_price = input_dict["szPrices"][index]["SuperMoneyPrice"]
price_idx[key] = {
"index": str(index), # 价格索引
"price": str(int(item_price) * int(vip_discount) // 100)
}
# 构建最终结果对象,包括单位信息
output_dict[input_dict["szName"]] = {
"commodity_id": input_dict["iId"],
"price_idx": sorted(price_idx.items(),
key=lambda x: int(x[0]) if item.get("ItemNum") else float(x[0]),
reverse=True), # 道具可购买数量和价格由高到低排序
"unit": "个" if item.get("ItemNum") else "天" # 根据 ItemNum 存在与否确定单位
}
return output_dict
def search_shop(self):
"""搜索商品信息"""
url = f"https://bang.qq.com/app/speed/mall/search"
params = {
"uin": self.user.user_data.get('roleId'),
"userId": self.user.user_data.get('userId'),
"token": self.user.user_data.get('token'),
"start": "0",
"paytype": "1", # 按点券筛选
"order": "2", # 按点券筛选
"text": self.user.user_data.get('shopName')
}
headers = {"Referer": "https://bang.qq.com/app/speed/mall/main2"}
response = requests.post(url, params=params, headers=headers)
response.encoding = "utf-8"
if len(response.json()['data']) == 1:
return self.process_data(response.json()['data'][0])
return None
def get_shop_items(self, item_data, purse):
"""根据当前余额和道具价格生成购物列表"""
# 判断是否是月末
is_last_day = datetime.datetime.now().day == calendar.monthrange(
datetime.datetime.now().year, datetime.datetime.now().month)[1]
# 计算可用余额
money = (int(purse['money']) + int(purse['coupons'])) if is_last_day else int(purse['coupons'])
total = 0
shop_array = []
for item in item_data:
i = 0
while i < len(item_data[item]['price_idx']):
# 商品数量索引
shop_idx = item_data[item]['price_idx'][i][0]
# 如果购买的商品可以购买永久且当前余额可以购买永久
if (item_data[item]['price_idx'][i][0] == "99999999" and
money > int(item_data[item]['price_idx'][i][1]['price'])):
shop_array.append({
"name": item,
"count": "99999999",
"commodity_id": item_data[item]['commodity_id'],
"price_idx": shop_idx
})
item_data[item]['unit'] = "永久"
break
# 计算当前余额可以购买的最大道具数量
max_counts = money // int(item_data[item]['price_idx'][i][1]['price'])
if type(item_data[item]['price_idx'][i][0]) == str:
total += max_counts * int(float(item_data[item]['price_idx'][i][0]))
else:
total += max_counts * int(item_data[item]['price_idx'][i][0])
money -= max_counts * int(item_data[item]['price_idx'][i][1]['price'])
if max_counts:
# 将可购买的道具添加到购物列表
for _ in range(max_counts):
shop_array.append({
"name": item,
"count": item_data[item]['price_idx'][i][0],
"commodity_id": item_data[item]['commodity_id'],
"price_idx": item_data[item]['price_idx'][i][1]['index']
})
# 处理余额不足情况
if (money < int(item_data[item]['price_idx'][-1][1]['price']) and
not is_last_day and
money / int(item_data[item]['price_idx'][-1][1]['price']) > 0.5):
price_diff = int(item_data[item]['price_idx'][-1][1]['price']) - money
if price_diff < int(purse['money']):
total += int(item_data[item]['price_idx'][-1][0])
money = 0
shop_array.append({
"name": item,
"count": item_data[item]['price_idx'][-1][0],
"commodity_id": item_data[item]['commodity_id'],
"price_idx": item_data[item]['price_idx'][-1][1]['index']
})
i += 1
return shop_array, total, item_data[item]['unit']
def purchase(self, buy_info):
"""购买道具"""
url = "https://bang.qq.com/app/speed/mall/getPurchase"
headers = {"Referer": "https://bang.qq.com/app/speed/mall/detail2"}
data = {
"uin": self.user.user_data.get('roleId'),
"userId": self.user.user_data.get('userId'),
"areaId": self.user.user_data.get('areaId'),
"token": self.user.user_data.get('token'),
"pay_type": "1",
"commodity_id": buy_info['commodity_id'],
"price_idx": buy_info['price_idx']
}
# 延迟400毫秒执行,防止频繁
time.sleep(0.4)
response = requests.post(url, headers=headers, data=data)
response.encoding = "utf-8"
if "恭喜购买成功" in response.json()['msg']:
return int(buy_info['count']) if buy_info['count'] != "99999999" else 99999999
else:
print(f"❌{response.json()['msg']}")
return 0
def execute(self):
"""执行购物任务"""
msg = "购物\n"
# 检查是否设置了商品名称
if not self.user.user_data.get('shopName'):
return msg + "❌未设置shopName参数,请在cookie中添加shopName=商品名称\n"
# 获取余额信息
purse = self.get_pack_info()
if not purse:
return msg + "❌获取余额信息失败\n"
msg += f"💰当前余额: {purse['money']}点券 {purse['coupons']}消费券\n"
# 搜索商品信息
item_data = self.search_shop()
if not item_data:
msg += f"❌检测道具“{self.user.user_data.get('shopName')}”在商店中未售卖或不唯一\n"
return msg
# 生成购物列表
shop_array, total, unit = self.get_shop_items(item_data, purse)
if shop_array:
msg += f"✅预计可购买 {'' if total == 0 else total} {unit} {self.user.user_data.get('shopName')}\n"
success_buy_counts = 0
# 执行购买
for buy_info in shop_array:
success_buy_counts += self.purchase(buy_info)
# 统计购买结果
failed_buy_counts = total - (1 if success_buy_counts == 99999999 else success_buy_counts)
if success_buy_counts > 0:
success_buy_counts = "" if success_buy_counts == 99999999 else success_buy_counts
msg += f"✅成功购买 {success_buy_counts} {unit} {self.user.user_data.get('shopName')}\n"
if failed_buy_counts > 0:
msg += f"❌未购买成功 {failed_buy_counts} {unit}\n"
else:
msg += f"❌全部购买失败,共计 {total} {unit}\n"
else:
is_last_day = datetime.datetime.now().day == calendar.monthrange(datetime.datetime.now().year, datetime.datetime.now().month)[1]
msg += f"✅{'本月余额' if is_last_day else '今日消费券'}不足以购买 {self.user.user_data.get('shopName')}\n"
# 获取剩余余额
purse = self.get_pack_info()
msg += f"💰剩余 {purse['money']}点券 {purse['coupons']}消费券\n"
return msg
class TreasureHunt:
"""寻宝功能类"""
def __init__(self, user):
self.user = user
self.lock = threading.RLock()
def get_treasure_info(self):
"""获取寻宝信息"""
def extract(_html, _pattern):
match = re.search(_pattern, _html)
if match:
return json.loads(re.sub(r'^\((.*)\)$', r'\1', match.group(1)))
return None
url = "https://bang.qq.com/app/speed/treasure/index"
params = {
"roleId": self.user.user_data.get('roleId'),
"areaId": self.user.user_data.get('areaId'),
"uin": self.user.user_data.get('roleId')
}
response = requests.get(url, params=params)
response.encoding = 'utf-8'
# 获取用户信息
user_info = extract(response.text, r'window\.userInfo\s*=\s*eval\(\'([^\']+)\'\);')
if not user_info:
print("❌未找到用户信息")
return None
# 获取剩余寻宝次数
left_times = re.search(r'id="leftTimes">(\d+)</i>', response.text)
if not left_times:
print("❌未找到剩余寻宝次数")
return None
# 获取地图信息
map_info = extract(response.text, r'window\.mapInfo\s*=\s*eval\(\'([^\']+)\'\);')
if not map_info:
print("❌未找到地图信息")
return None
# 获取最高星级
star_info = user_info.get('starInfo', {})
star_keys = [key for key, value in star_info.items() if value == 1]
if not star_keys:
print("❌未找到已解锁的星级地图")
return None
star_id = max(star_keys)
# 返回整理后的信息
return {
'vip_flag': bool(user_info.get('vip_flag')),
'left_times': left_times.group(1),
'star_id': star_id,
'map_info': map_info
}
def get_treasure(self, iFlowId):
"""领取奖励"""
url = "https://act.game.qq.com/ams/ame/amesvr?ameVersion=0.3&iActivityId=468228"
headers = {
"Cookie": f"access_token={self.user.user_data.get('accessToken')}; acctype=qc; appid={self.user.user_data.get('appid')}; openid={self.user.user_data.get('openid')}"
}
data = {
'appid': self.user.user_data.get('appid'),
'sArea': self.user.user_data.get('areaId'),
'sRoleId': self.user.user_data.get('roleId'),
'accessToken': self.user.user_data.get('accessToken'),
'iActivityId': "468228",
'iFlowId': iFlowId,
'g_tk': '1842395457',
'sServiceType': 'bb'
}
response = requests.post(url, headers=headers, data=data)
response.encoding = "utf-8"
if response.json()['ret'] == '0':
return f"✅{response.json()['modRet']['sPackageName']}"
return '❌非常抱歉,您还不满足参加该活动的条件!'
def dig(self, status):
"""寻宝操作"""
url = f"https://bang.qq.com/app/speed/treasure/ajax/{status}DigTreasure"
headers = {
"Referer": "https://bang.qq.com/app/speed/treasure/index",
"Cookie": f"access_token={self.user.user_data.get('accessToken')}; acctype=qc; appid={self.user.user_data.get('appid')}; openid={self.user.user_data.get('openid')}"
}
data = {
"mapId": self.user.user_data.get('mapId'),
"starId": self.user.user_data.get('starId'),
"areaId": self.user.user_data.get('areaId'),
"type": self.user.user_data.get('type'),
"roleId": self.user.user_data.get('roleId'),
"userId": self.user.user_data.get('userId'),
"uin": self.user.user_data.get('roleId'),
"token": self.user.user_data.get('token')
}
response = requests.post(url, headers=headers, data=data)
return False if response.json()['res'] == 0 else True
def execute(self):
"""执行寻宝任务"""
msg = "寻宝\n"
# 获取寻宝信息
info = self.get_treasure_info()
if not info:
return msg + "❌获取寻宝信息失败\n"
# 更新用户数据
self.user.user_data.update({
'type': 2 if info['vip_flag'] else 1,
'starId': info['star_id']
})
# 获取今日大吉地图
luck_maps = [item for item in info['map_info'][info['star_id']] if item.get('isdaji') == 1]
if not luck_maps:
return msg + "❌未找到今日大吉地图\n"
self.user.user_data['mapId'] = luck_maps[0]['id']
# 输出基本信息
msg += f"💎紫钻用户:{'是' if info['vip_flag'] else '否'}\n"
msg += f"⭐最高地图解锁星级:{info['star_id']}\n"
msg += f"🌏今日大吉地图是[{luck_maps[0]['name']}]-地图ID是[{luck_maps[0]['id']}]\n"
msg += f"⏰剩余寻宝次数:{info['left_times']}\n"
# 星级地图对应的iFlowId
iFlowId_dict = {
'1': ['856152', '856155'],
'2': ['856156', '856157'],
'3': ['856158', '856159'],
'4': ['856160', '856161'],
'5': ['856162', '856163'],
'6': ['856164', '856165']
}
if info['left_times'] != "0":
# 每日5次寻宝
for n in range(5):
with self.lock:
# 开始寻宝
if self.dig('start'):
msg += f"❌第{n+1}次寻宝...对不起,当天的寻宝次数已用完\n"
break
# 寻宝倒计时
if self.user.user_data['type'] == 2:
time.sleep(10)
else:
time.sleep(600)
# 结束寻宝
self.dig('end')
# 领取奖励
for iflowid in iFlowId_dict[info['star_id']]:
reward = self.get_treasure(iflowid)
msg += reward + "\n"
else:
msg += "❌对不起,当天的寻宝次数已用完\n"
return msg
def update_progress(users):
"""更新并显示所有账号的进度"""
# 用于跟踪哪些账号已经显示过完成状态
shown_completed = set()
while True:
# 清除控制台
os.system('cls' if os.name == 'nt' else 'clear')
all_completed = True
any_output = False
for user in users:
if user.progress < 100:
# 显示未完成的账号
account_info = user.get_account_info()
progress_bar = '=' * int(user.progress / 2) + '>' + ' ' * (50 - int(user.progress / 2))
print(f"{account_info}")
print(f"[{progress_bar}] {user.progress}% {user.status}")
# print()
all_completed = False
any_output = True
elif user.progress == 100 and user not in shown_completed:
# 显示刚完成的账号(仅一次)
account_info = user.get_account_info()
progress_bar = '=' * 50
print(f"{account_info}")
print(f"[{progress_bar}] 100% {user.status}")
# print()
shown_completed.add(user)
any_output = True
# 如果没有任何输出,显示完成信息
if not any_output:
print("所有账号任务已完成!")
if all_completed:
break
time.sleep(1)
def process_account(user, msg_dict, user_index):
"""处理单个账号的任务"""
try:
# 初始化账号消息
account_msg = f"\n{user.get_account_info()}\n"
msg_dict[user_index] = [account_msg] # 使用列表存储当前账号的所有消息
# 检查token
if not user.check_token():
user.progress = 100
user.status = "登录失效"
msg_dict[user_index].append("❌登录失效,请重新获取token\n")
return
# 检查功能启用状态
enabled_features = []
# 检查签到功能
if user.is_feature_enabled('signin'):
enabled_features.append('signin')
else:
msg_dict[user_index].append("❌未启用签到功能,请在cookie中添加enable_signin=true;\n")
# 检查购物功能
if user.is_feature_enabled('shopping'):
enabled_features.append('shopping')
else:
msg_dict[user_index].append("❌未启用购物功能,请在cookie中添加enable_shopping=true;\n")
# 检查寻宝功能
if user.is_feature_enabled('treasure'):
enabled_features.append('treasure')
else:
msg_dict[user_index].append("❌未启用寻宝功能,请在cookie中添加enable_treasure=true;\n")
if not enabled_features:
user.progress = 100
user.status = "未启用功能"
return
# 显示已启用的功能
feature_names = {
'signin': '签到',
'shopping': '购物',
'treasure': '寻宝'
}
enabled_msg = "✅已启用功能:" + "、".join([feature_names[f] for f in enabled_features])
print(f"{user.get_account_info()}\n{enabled_msg}\n")
# 如果启用了购物功能,检查必需的参数
if 'shopping' in enabled_features and not user.user_data.get('shopName'):
msg = "❌已启用购物功能但未设置shopName参数\n"
msg_dict[user_index].append(msg)
user.progress = 100
user.status = "购物参数缺失"
return
total_features = len(enabled_features)
progress_per_feature = 100 / total_features
# 执行签到
if 'signin' in enabled_features:
user.status = "正在签到..."
sign_in = SignIn(user)
sign_msg = sign_in.execute()
msg_dict[user_index].append(sign_msg)
user.progress += progress_per_feature
# 执行购物
if 'shopping' in enabled_features:
user.status = "正在购物..."
shopping = Shopping(user)
shop_msg = shopping.execute()
msg_dict[user_index].append(shop_msg)
user.progress += progress_per_feature
# 执行寻宝
if 'treasure' in enabled_features:
user.status = "正在寻宝..."
treasure = TreasureHunt(user)
treasure_msg = treasure.execute()
msg_dict[user_index].append(treasure_msg)
user.progress = 100
user.status = "任务完成"
msg_dict[user_index].append("="*30 + "\n")
except Exception as e:
if user_index not in msg_dict:
msg_dict[user_index] = [account_msg]
msg_dict[user_index].append(f"❌执行出错: {str(e)}\n")
user.status = f"执行出错: {str(e)}"
user.progress = 100
def main():
"""主函数"""
# 获取环境变量
if "COOKIE_ZHANGFEI" not in os.environ:
print('❌未添加COOKIE_ZHANGFEI变量')
return
cookie_list = re.split('\n|&&', os.environ.get('COOKIE_ZHANGFEI'))
print(f"✅检测到共{len(cookie_list)}个飞车账号")
# 创建用户对象列表
users = [ZhangFeiUser(cookie) for cookie in cookie_list]
# 创建线程安全的消息字典
msg_dict = {}
# 创建并启动进度显示线程
progress_thread = threading.Thread(target=update_progress, args=(users,))
progress_thread.daemon = True
progress_thread.start()
# 创建并启动账号处理线程
threads = []
for i, user in enumerate(users):
t = threading.Thread(target=process_account, args=(user, msg_dict, i))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
# 等待进度显示线程完成最后一次更新
time.sleep(1)
# 按账号顺序合并消息
all_msg = ""
for i in range(len(users)):
if i in msg_dict:
all_msg += "".join(msg_dict[i])
# 发送通知
try:
send('掌上飞车全能版', all_msg)
except Exception as err:
print(f'❌发送通知失败: {err}')
if __name__ == "__main__":
print("----------掌上飞车全能版开始运行----------")
main()
print("----------掌上飞车全能版运行完毕----------")