From fd5649de4d8497d8853f45c0c01d4d7d0d47a604 Mon Sep 17 00:00:00 2001 From: jiangzhh Date: Fri, 26 Apr 2024 23:34:54 +0800 Subject: [PATCH] fix: tags --- .gitignore | 3 +- README.md | 6 +- bot.py | 271 ++++++++++++++++++++++++++-------------------------- database.py | 27 +++--- 4 files changed, 152 insertions(+), 155 deletions(-) diff --git a/.gitignore b/.gitignore index 594c1376..83b8f448 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ __pycache__ .vscode .DS_Store .idea -.venv \ No newline at end of file +.venv +*.sqlite \ No newline at end of file diff --git a/README.md b/README.md index 8f637209..ffe3db91 100644 --- a/README.md +++ b/README.md @@ -72,18 +72,18 @@ Finally, run the bot: (files such as records and logs are located in `~/.tg_sear ```sh # op1. docker-compose docker-compose up -d -# op2. simple way (Python >=3.10) +# op2. simple way (Python >=3.9) pip install -r requirements.txt && python3 bot.py ``` ## Development -I use python-3.10.9 for development. Please use python <= 3.10 for development. In addition, it is recommended to use python virtual environment development to avoid unnecessary problems. The following are my development steps for reference only: +I use python-3.9.13 for development. Please use python <= 3.9 for development. In addition, it is recommended to use python virtual environment development to avoid unnecessary problems. The following are my development steps for reference only: ```shell git clone https://github.com/akynazh/tg-search-bot.git cd tg-search-bot -~/.pyenv/versions/3.10.9/bin/python -m venv .venv +~/.pyenv/versions/3.9.13/bin/python -m venv .venv source ./.venv/bin/activate pip3 install -r requirements.txt ``` diff --git a/bot.py b/bot.py index b8bacaa4..24dbdbcd 100644 --- a/bot.py +++ b/bot.py @@ -35,8 +35,12 @@ apihelper.proxy = BOT_CFG.proxy_json BOT = telebot.TeleBot(BOT_CFG.tg_bot_token) BOT_DB = BotFileDb(PATH_RECORD_FILE) -BOT_CACHE_DB = BotCacheDb(host=BOT_CFG.redis_host, port=BOT_CFG.redis_port, - password=BOT_CFG.redis_password, use_cache=BOT_CFG.use_cache) +BOT_CACHE_DB = BotCacheDb( + host=BOT_CFG.redis_host, + port=BOT_CFG.redis_port, + password=BOT_CFG.redis_password, + use_cache=BOT_CFG.use_cache, +) BASE_UTIL = jvav.BaseUtil(BOT_CFG.proxy_addr) DMM_UTIL = jvav.DmmUtil(BOT_CFG.proxy_addr_dmm) JAVBUS_UTIL = jvav.JavBusUtil(BOT_CFG.proxy_addr) @@ -163,7 +167,7 @@ def send_msg_fail_reason_op(self, reason: str, op: str): 执行结果: 失败, {reason} Q_Q""" ) - def check_success(self, code: int, op: str) -> bool: + def check_success(self, code: int, op: str): """检查状态码, 确认请求是否成功 :param int code: 状态码 @@ -180,7 +184,7 @@ def check_success(self, code: int, op: str) -> bool: self.send_msg_code_op(code=502, op=op) return False - def create_btn_by_key(self, key_type: str, obj) -> InlineKeyboardButton: + def create_btn_by_key(self, key_type: str, obj): """根据按钮种类创建按钮 :param str key_type: 按钮种类 @@ -202,14 +206,14 @@ def create_btn_by_key(self, key_type: str, obj) -> InlineKeyboardButton: ) def send_msg_btns( - self, - max_btn_per_row: int, - max_row_per_msg: int, - key_type: str, - title: str, - objs: list, - extra_btns=[], - page_btns=[], + self, + max_btn_per_row: int, + max_row_per_msg: int, + key_type: str, + title: str, + objs: list, + extra_btns=[], + page_btns=[], ): """发送按钮消息 @@ -255,8 +259,8 @@ def send_msg_btns( self.send_msg(msg=title, markup=markup) def get_page_elements( - self, objs: list, page: int, col: int, row: int, key_type: str - ) -> tuple[list, list, str]: + self, objs: list, page: int, col: int, row: int, key_type: str + ): """获取当前页对象列表, 分页按钮列表, 数量标题 :param list objs: 所有对象 @@ -280,7 +284,7 @@ def get_page_elements( page = page_count # 获取当前页对象字典 start_idx = (page - 1) * record_count_per_page - objs = objs[start_idx: start_idx + record_count_per_page] + objs = objs[start_idx : start_idx + record_count_per_page] # 获取按键列表 if page == 1: to_previous = 1 @@ -290,8 +294,7 @@ def get_page_elements( to_next = page_count else: to_next = page + 1 - btn_to_first = InlineKeyboardButton( - text="<<", callback_data=f"1:{key_type}") + btn_to_first = InlineKeyboardButton(text="<<", callback_data=f"1:{key_type}") btn_to_previous = InlineKeyboardButton( text="<", callback_data=f"{to_previous}:{key_type}" ) @@ -312,7 +315,7 @@ def get_page_elements( title, ) - def check_if_enable_nsfw(self) -> bool: + def check_if_enable_nsfw(self): if BOT_CFG.enable_nsfw == "0": self.send_msg("[NSFW] FORBIDDEN!") return False @@ -321,7 +324,9 @@ def check_if_enable_nsfw(self) -> bool: def get_stars_record(self, page=1): record, is_star_exists, _ = BOT_DB.check_has_record() if not record or not is_star_exists: - self.send_msg_fail_reason_op(reason="尚无演员收藏记录", op="获取演员收藏记录") + self.send_msg_fail_reason_op( + reason="尚无演员收藏记录", op="获取演员收藏记录" + ) return stars = record["stars"] stars.reverse() @@ -346,8 +351,10 @@ def get_stars_record(self, page=1): def get_star_detail_record_by_name_id(self, star_name: str, star_id: str): record, is_stars_exists, is_avs_exists = BOT_DB.check_has_record() if not record: - self.send_msg(reason="尚无该演员收藏记录", - op=f"获取演员 {star_name} 的更多信息") + self.send_msg( + reason="尚无该演员收藏记录", + op=f"获取演员 {star_name} 的更多信息", + ) return avs = [] star_avs = [] @@ -404,7 +411,9 @@ def get_star_detail_record_by_name_id(self, star_name: str, star_id: str): def get_avs_record(self, page=1): record, _, is_avs_exists = BOT_DB.check_has_record() if not record or not is_avs_exists: - self.send_msg_fail_reason_op(reason="尚无番号收藏记录", op="获取番号收藏记录") + self.send_msg_fail_reason_op( + reason="尚无番号收藏记录", op="获取番号收藏记录" + ) return avs = [av["id"] for av in record["avs"]] avs.reverse() @@ -451,84 +460,78 @@ def get_av_detail_record_by_id(self, id: str): markup.row(btn) self.send_msg(msg=f"{id}", markup=markup) - def search_bts(self, q) -> list[dict[str, str | Any]] | None: - def append_trackers() -> str: + def search_bts(self, q): + def append_trackers(): """Returns the base tracker list""" trackers = [ - 'udp://tracker.coppersurfer.tk:6969/announce', - 'udp://tracker.openbittorrent.com:6969/announce', - 'udp://9.rarbg.to:2710/announce', - 'udp://9.rarbg.me:2780/announce', - 'udp://9.rarbg.to:2730/announce', - 'udp://tracker.opentrackr.org:1337', - 'http://p4p.arenabg.com:1337/announce', - 'udp://tracker.torrent.eu.org:451/announce', - 'udp://tracker.tiny-vps.com:6969/announce', - 'udp://open.stealth.si:80/announce', + "udp://tracker.coppersurfer.tk:6969/announce", + "udp://tracker.openbittorrent.com:6969/announce", + "udp://9.rarbg.to:2710/announce", + "udp://9.rarbg.me:2780/announce", + "udp://9.rarbg.to:2730/announce", + "udp://tracker.opentrackr.org:1337", + "http://p4p.arenabg.com:1337/announce", + "udp://tracker.torrent.eu.org:451/announce", + "udp://tracker.tiny-vps.com:6969/announce", + "udp://open.stealth.si:80/announce", ] trackers = [quote(tr) for tr in trackers] - return '&tr='.join(trackers) + return "&tr=".join(trackers) - def category_name(category) -> str: + def category_name(category): """Translates the category code to a name""" - names = [ - '', - 'audio', - 'video', - 'apps', - 'games', - 'nsfw', - 'other' - ] + names = ["", "audio", "video", "apps", "games", "nsfw", "other"] category = int(category[0]) category = category if category < len(names) - 1 else -1 return names[category] - def size_as_str(size) -> str: + def size_as_str(size): """Formats the file size in bytes to kb, mb or gb accordingly""" size = int(size) size_str = f"{size} b" if size >= 1024: size_str = f"{(size / 1024):.2f} kb" - if size >= 1024 ** 2: + if size >= 1024**2: size_str = f"{(size / 1024 ** 2):.2f} mb" - if size >= 1024 ** 3: + if size >= 1024**3: size_str = f"{(size / 1024 ** 3):.2f} gb" return size_str - def magnet_link(ih, name) -> str: + def magnet_link(ih, name): """Creates the magnet URI""" - return f'magnet:?xt=urn:btih:{ih}&dn={quote(name)}&tr={append_trackers()}' + return f"magnet:?xt=urn:btih:{ih}&dn={quote(name)}&tr={append_trackers()}" agent = BASE_UTIL.ua() url = f"https://apibay.org/q.php?q={quote(q)}" - results = get(url, headers={'agent': agent}) + results = get(url, headers={"agent": agent}) if not results.status_code == 200: return None matches = [] data = results.json() - if data and 'no results' in data[0]['name'].lower(): + if data and "no results" in data[0]["name"].lower(): return matches for d in data: - matches.append({ - 'seeders': d['seeders'], - 'leechers': d['leechers'], - 'name': d['name'], - 'category': category_name(d['category']), - 'size': size_as_str(d['size']), - 'magnet': magnet_link(d['info_hash'], d['name']), - }) + matches.append( + { + "seeders": d["seeders"], + "leechers": d["leechers"], + "name": d["name"], + "category": category_name(d["category"]), + "size": size_as_str(d["size"]), + "magnet": magnet_link(d["info_hash"], d["name"]), + } + ) return matches def get_av_by_id( - self, - id: str, - send_to_pikpak=False, - is_nice=True, - is_uncensored=True, - magnet_max_count=3, - not_send=False, - ) -> Optional[dict]: + self, + id: str, + send_to_pikpak=False, + is_nice=True, + is_uncensored=True, + magnet_max_count=3, + not_send=False, + ): """根据番号获取 av :param str id: 番号 @@ -550,9 +553,9 @@ def get_av_by_id( if not av or not_send: with concurrent.futures.ThreadPoolExecutor() as executor: if not not_send: - futures[ - executor.submit(DMM_UTIL.get_score_by_id, id) - ] = 0 # 获取 av 评分 + futures[executor.submit(DMM_UTIL.get_score_by_id, id)] = ( + 0 # 获取 av 评分 + ) futures[ executor.submit( JAVBUS_UTIL.get_av_by_id, @@ -596,8 +599,7 @@ def get_av_by_id( key=id, value=av, type=BotCacheDb.TYPE_AV, expire=3600 * 24 * 1 ) else: - BOT_CACHE_DB.set_cache( - key=id, value=av, type=BotCacheDb.TYPE_AV) + BOT_CACHE_DB.set_cache(key=id, value=av, type=BotCacheDb.TYPE_AV) else: av_score = av["score"] is_cache = True @@ -686,8 +688,8 @@ def get_av_by_id( ) msg += stars_msg # 标签 - if av_tags != "": - av_tags = av_tags.replace("<", "").replace(">", "") + if av_tags: + av_tags = " ".join(av_tags).replace("<", "").replace(">", "") msg += f"""【标签】{av_tags} """ # 其它 @@ -722,7 +724,8 @@ def get_av_by_id( text="截图", callback_data=f"{av_id}:{BotKey.KEY_GET_SAMPLE_BY_ID}" ) more_btn = InlineKeyboardButton( - text="更多磁链", callback_data=f"{av_id}:{BotKey.KEY_GET_MORE_MAGNETS_BY_ID}" + text="更多磁链", + callback_data=f"{av_id}:{BotKey.KEY_GET_MORE_MAGNETS_BY_ID}", ) if len(av_magnets) != 0: markup = InlineKeyboardMarkup().row(sample_btn, pv_btn, fv_btn, more_btn) @@ -800,7 +803,9 @@ def send_magnet_to_pikpak(self, magnet: str, id: str): :param str magnet: 磁链 :param str id: 磁链对应的番号 """ - op_send_magnet_to_pikpak = f"发送番号 {id} 的磁链 A: {magnet} 到 pikpak" + op_send_magnet_to_pikpak = ( + f"发送番号 {id} 的磁链 A: {magnet} 到 pikpak" + ) if self.send_msg_to_pikpak(magnet): self.send_msg_success_op(op_send_magnet_to_pikpak) else: @@ -816,8 +821,7 @@ def get_sample_by_id(self, id: str): code, samples = JAVBUS_UTIL.get_samples_by_id(id) if not self.check_success(code, op_get_sample): return - BOT_CACHE_DB.set_cache(key=id, value=samples, - type=BotCacheDb.TYPE_SAMPLE) + BOT_CACHE_DB.set_cache(key=id, value=samples, type=BotCacheDb.TYPE_SAMPLE) # 发送图片列表 samples_imp = [] sample_error = False @@ -825,18 +829,17 @@ def get_sample_by_id(self, id: str): samples_imp.append(InputMediaPhoto(sample)) if len(samples_imp) == 10: # 图片数目达到 10 张则发送一次 try: - BOT.send_media_group( - chat_id=BOT_CFG.tg_chat_id, media=samples_imp) + BOT.send_media_group(chat_id=BOT_CFG.tg_chat_id, media=samples_imp) samples_imp = [] except Exception: sample_error = True self.send_msg_fail_reason_op( - reason="图片解析失败", op=op_get_sample) + reason="图片解析失败", op=op_get_sample + ) break if samples_imp != [] and not sample_error: try: - BOT.send_media_group( - chat_id=BOT_CFG.tg_chat_id, media=samples_imp) + BOT.send_media_group(chat_id=BOT_CFG.tg_chat_id, media=samples_imp) except Exception: self.send_msg_fail_reason_op(reason="图片解析失败", op=op_get_sample) @@ -877,8 +880,7 @@ def watch_av_by_id(self, id: str, type: str): from_site = "avgle" pv_src = pv_avgle pv_cache = {"from_site": from_site, "src": pv_src} - BOT_CACHE_DB.set_cache( - key=id, value=pv_cache, type=BotCacheDb.TYPE_PV) + BOT_CACHE_DB.set_cache(key=id, value=pv_cache, type=BotCacheDb.TYPE_PV) else: from_site = pv["from_site"] pv_src = pv["src"] @@ -916,8 +918,7 @@ def watch_av_by_id(self, id: str, type: str): if code != 200: self.send_msg(f"MissAv 视频地址: {BASE_URL_MISS_AV}/{id}") return - BOT_CACHE_DB.set_cache( - key=id, value=video, type=BotCacheDb.TYPE_FV) + BOT_CACHE_DB.set_cache(key=id, value=video, type=BotCacheDb.TYPE_FV) self.send_msg( f"""MissAv 视频地址: {BASE_URL_MISS_AV}/{id} @@ -925,7 +926,7 @@ def watch_av_by_id(self, id: str, type: str): """ ) - def search_star_by_name(self, star_name: str) -> bool: + def search_star_by_name(self, star_name: str): if not self.check_if_enable_nsfw(): return False op_search_star = f"搜索演员 {star_name}" @@ -936,8 +937,7 @@ def search_star_by_name(self, star_name: str) -> bool: code, star = JAVBUS_UTIL.check_star_exists(star_name) if not self.check_success(code, op_search_star): return - BOT_CACHE_DB.set_cache( - key=star_name, value=star, type=BotCacheDb.TYPE_STAR) + BOT_CACHE_DB.set_cache(key=star_name, value=star, type=BotCacheDb.TYPE_STAR) if star_name_origin != star_name: BOT_CACHE_DB.set_cache( key=star_name_origin, @@ -947,8 +947,7 @@ def search_star_by_name(self, star_name: str) -> bool: star_id = star["star_id"] star_name = star["star_name"] if BOT_DB.check_star_exists_by_id(star_id=star_id): - self.get_star_detail_record_by_name_id( - star_name=star_name, star_id=star_id) + self.get_star_detail_record_by_name_id(star_name=star_name, star_id=star_id) return True markup = InlineKeyboardMarkup() markup.row( @@ -986,11 +985,9 @@ def get_top_stars(self, page=1): code, stars = DMM_UTIL.get_top_stars(page) if not self.check_success(code, op_get_top_stars): return - BOT_CACHE_DB.set_cache(key=page, value=stars, - type=BotCacheDb.TYPE_RANK) + BOT_CACHE_DB.set_cache(key=page, value=stars, type=BotCacheDb.TYPE_RANK) stars_tmp = [None] * 80 - stars = stars_tmp[: ((page - 1) * 20)] + stars + \ - stars_tmp[((page - 1) * 20):] + stars = stars_tmp[: ((page - 1) * 20)] + stars + stars_tmp[((page - 1) * 20) :] col, row = 4, 5 objs, page_btns, title = self.get_page_elements( objs=stars, page=page, col=4, row=5, key_type=BotKey.KEY_GET_TOP_STARS @@ -1009,10 +1006,10 @@ def send_msg_to_pikpak(self, msg): async def send(): try: async with Client( - name=PATH_SESSION_FILE, - api_id=BOT_CFG.tg_api_id, - api_hash=BOT_CFG.tg_api_hash, - proxy=BOT_CFG.proxy_json_pikpak, + name=PATH_SESSION_FILE, + api_id=BOT_CFG.tg_api_id, + api_hash=BOT_CFG.tg_api_hash, + proxy=BOT_CFG.proxy_json_pikpak, ) as app: return await app.send_message(PIKPAK_BOT_NAME, msg) except Exception as e: @@ -1030,8 +1027,7 @@ def get_more_magnets_by_id(self, id: str): if not av: return magnets = av["magnets"] - BOT_CACHE_DB.set_cache(key=id, value=magnets, - type=BotCacheDb.TYPE_MAGNET) + BOT_CACHE_DB.set_cache(key=id, value=magnets, type=BotCacheDb.TYPE_MAGNET) msg = "" for magnet in magnets: magnet_tags = "" @@ -1056,8 +1052,7 @@ def get_more_magnets_by_id(self, id: str): def get_star_new_avs_by_name_id(self, star_name: str, star_id: str): op_get_star_new_avs = f"获取 {star_name} 最新 av" - ids = BOT_CACHE_DB.get_cache( - key=star_id, type=BotCacheDb.TYPE_NEW_AVS_OF_STAR) + ids = BOT_CACHE_DB.get_cache(key=star_id, type=BotCacheDb.TYPE_NEW_AVS_OF_STAR) if not ids: code, ids = JAVBUS_UTIL.get_new_ids_by_star_id(star_id=star_id) if not self.check_success(code, op_get_star_new_avs): @@ -1080,7 +1075,7 @@ def get_star_new_avs_by_name_id(self, star_name: str, star_id: str): markup.row(*btns[4:]) self.send_msg(msg=title, markup=markup) - def get_star_ja_name_by_zh_name(self, star_name: str) -> str: + def get_star_ja_name_by_zh_name(self, star_name: str): if langdetect.detect(star_name) == "ja": return star_name star_ja_name = BOT_CACHE_DB.get_cache( @@ -1113,13 +1108,18 @@ def send_bts(self, q, bts): 磁链: {bt['magnet']} """ - self.send_msg(res, markup=InlineKeyboardMarkup().row( - InlineKeyboardButton("转到 PikPak 云盘", url=f"{BASE_URL_TG}/{PIKPAK_BOT_NAME}"))) + self.send_msg( + res, + markup=InlineKeyboardMarkup().row( + InlineKeyboardButton( + "转到 PikPak 云盘", url=f"{BASE_URL_TG}/{PIKPAK_BOT_NAME}" + ) + ), + ) def random_get_new_av(self): page = random.randint(1, JAVLIB_UTIL.MAX_RANK_PAGE) - ids = BOT_CACHE_DB.get_cache( - key=page, type=BotCacheDb.TYPE_JLIB_PAGE_NEW_AVS) + ids = BOT_CACHE_DB.get_cache(key=page, type=BotCacheDb.TYPE_JLIB_PAGE_NEW_AVS) if not ids: code, ids = JAVLIB_UTIL.get_random_ids_from_rank_by_page( page=page, list_type=1 @@ -1136,8 +1136,7 @@ def random_get_new_av(self): def random_get_nice_av(self): page = random.randint(1, JAVLIB_UTIL.MAX_RANK_PAGE) - ids = BOT_CACHE_DB.get_cache( - key=page, type=BotCacheDb.TYPE_JLIB_PAGE_NICE_AVS) + ids = BOT_CACHE_DB.get_cache(key=page, type=BotCacheDb.TYPE_JLIB_PAGE_NICE_AVS) if not ids: code, ids = JAVLIB_UTIL.get_random_ids_from_rank_by_page( page=page, list_type=0 @@ -1158,8 +1157,7 @@ def random_get_nice_star_avs(self, star_name_ori): ) if not avs: star_name_ja = self.get_star_ja_name_by_zh_name(star_name_ori) - code, avs = DMM_UTIL.get_nice_avs_by_star_name( - star_name=star_name_ja) + code, avs = DMM_UTIL.get_nice_avs_by_star_name(star_name=star_name_ja) if self.check_success(code, f"获取演员 {star_name_ori} 的高分 av"): avs = avs[:60] BOT_CACHE_DB.set_cache( @@ -1190,7 +1188,7 @@ def handle_callback(call): LOG.info(f"处理回调: {call.data}") s = call.data.rfind(":") content = call.data[:s] - key_type = call.data[s + 1:] + key_type = call.data[s + 1 :] if key_type == BotKey.KEY_WATCH_PV_BY_ID: bot_utils.watch_av_by_id(id=content, type=0) elif key_type == BotKey.KEY_WATCH_FV_BY_ID: @@ -1204,18 +1202,19 @@ def handle_callback(call): star_name = tmp[0] star_id = tmp[1] code, id = JAVBUS_UTIL.get_id_by_star_id(star_id=star_id) - if bot_utils.check_success(code, f"随机获取演员 {star_name} 的 av"): + if bot_utils.check_success( + code, f"随机获取演员 {star_name} 的 av" + ): bot_utils.get_av_by_id(id=id) elif key_type == BotKey.KEY_GET_NEW_AVS_BY_STAR_NAME_ID: tmp = content.split("|") star_name = tmp[0] star_id = tmp[1] - bot_utils.get_star_new_avs_by_name_id( - star_name=star_name, star_id=star_id) + bot_utils.get_star_new_avs_by_name_id(star_name=star_name, star_id=star_id) elif key_type == BotKey.KEY_RECORD_STAR_BY_STAR_NAME_ID: s = content.find("|") star_name = content[:s] - star_id = content[s + 1:] + star_id = content[s + 1 :] if BOT_DB.record_star_by_name_id(star_name=star_name, star_id=star_id): bot_utils.get_star_detail_record_by_name_id( star_name=star_name, star_id=star_id @@ -1239,7 +1238,7 @@ def handle_callback(call): elif key_type == BotKey.KEY_GET_STAR_DETAIL_RECORD_BY_STAR_NAME_ID: s = content.find("|") bot_utils.get_star_detail_record_by_name_id( - star_name=content[:s], star_id=content[s + 1:] + star_name=content[:s], star_id=content[s + 1 :] ) elif key_type == BotKey.KEY_GET_AV_DETAIL_RECORD_BY_ID: bot_utils.get_av_detail_record_by_id(id=content) @@ -1259,21 +1258,23 @@ def handle_callback(call): bot_utils.send_msg_success_op(op_undo_record_av) else: bot_utils.send_msg_fail_reason_op( - reason="文件解析出错", op=op_undo_record_av) + reason="文件解析出错", op=op_undo_record_av + ) elif key_type == BotKey.KEY_UNDO_RECORD_STAR_BY_STAR_NAME_ID: s = content.find("|") op_undo_record_star = f"取消收藏演员 {content[:s]}" - if BOT_DB.undo_record_star_by_id(star_id=content[s + 1:]): + if BOT_DB.undo_record_star_by_id(star_id=content[s + 1 :]): bot_utils.send_msg_success_op(op_undo_record_star) else: bot_utils.send_msg_fail_reason_op( - reason="文件解析出错", op=op_undo_record_star) + reason="文件解析出错", op=op_undo_record_star + ) elif key_type == BotKey.KEY_SEARCH_STAR_BY_NAME: star_name = content star_name_alias = "" idx_alias = star_name.find("(") if idx_alias != -1: - star_name_alias = star_name[idx_alias + 1: -1] + star_name_alias = star_name[idx_alias + 1 : -1] star_name = star_name[:idx_alias] if not bot_utils.search_star_by_name(star_name) and star_name_alias != "": bot_utils.send_msg(f"尝试搜索演员{star_name}的别名{star_name_alias}......") @@ -1324,11 +1325,12 @@ def handle_message(message): bot_utils.get_avs_record() elif msg_cmd == "/record": if not os.path.exists(PATH_RECORD_FILE): - bot_utils.send_msg_fail_reason_op(reason="尚无收藏记录", op="获取收藏记录文件") + bot_utils.send_msg_fail_reason_op( + reason="尚无收藏记录", op="获取收藏记录文件" + ) return BOT.send_document( - chat_id=BOT_CFG.tg_chat_id, document=types.InputFile( - PATH_RECORD_FILE) + chat_id=BOT_CFG.tg_chat_id, document=types.InputFile(PATH_RECORD_FILE) ) elif msg_cmd == "/rank": bot_utils.get_top_stars(1) @@ -1351,13 +1353,10 @@ def handle_message(message): bot_utils.send_msg_fail_reason_op(reason="未找到结果", op=op) return if BOT_CFG.enable_nsfw == "0": - bts = list( - filter(lambda bt: bt['category'] != "nsfw", bts)) - bts = list( - filter(lambda bt: "nsfw" not in bt['name'].lower(), bts)) + bts = list(filter(lambda bt: bt["category"] != "nsfw", bts)) + bts = list(filter(lambda bt: "nsfw" not in bt["name"].lower(), bts)) bts = bts[:5] - BOT_CACHE_DB.set_cache( - key=msg, value=bts, type=BotCacheDb.TYPE_BT) + BOT_CACHE_DB.set_cache(key=msg, value=bts, type=BotCacheDb.TYPE_BT) bot_utils.send_bts(msg, bts) else: ids = [id.lower() for id in ids] @@ -1365,8 +1364,7 @@ def handle_message(message): ids_msg = ", ".join(ids) bot_utils.send_msg(f"检测到番号: {ids_msg}, 开始搜索......") for i, id in enumerate(ids): - threading.Thread(target=bot_utils.get_av_by_id, - args=(id,)).start() + threading.Thread(target=bot_utils.get_av_by_id, args=(id,)).start() @BOT.callback_query_handler(func=lambda call: True) @@ -1397,8 +1395,7 @@ def main(): except Exception as e: LOG.error(f"无法连接到机器人: {e}") return - BOT.set_my_commands([types.BotCommand(cmd, BOT_CMDS[cmd]) - for cmd in BOT_CMDS]) + BOT.set_my_commands([types.BotCommand(cmd, BOT_CMDS[cmd]) for cmd in BOT_CMDS]) BOT.infinity_polling() diff --git a/database.py b/database.py index ef7d3855..fe34664b 100644 --- a/database.py +++ b/database.py @@ -11,7 +11,7 @@ def __init__(self, path_record_file: str): self.path_record_file = path_record_file pass - def check_has_record(self) -> tuple[dict, bool, bool]: + def check_has_record(self): """检查是否有收藏记录, 如果有则返回记录 :return tuple[dict, bool, bool]: 收藏记录, 演员记录是否存在, 番号记录是否存在 @@ -33,16 +33,16 @@ def check_has_record(self) -> tuple[dict, bool, bool]: is_stars_exists = False is_avs_exists = False if ( - "stars" in record.keys() - and record["stars"] != [] - and len(record["stars"]) > 0 + "stars" in record.keys() + and record["stars"] != [] + and len(record["stars"]) > 0 ): is_stars_exists = True if "avs" in record.keys() and record["avs"] != [] and len(record["avs"]) > 0: is_avs_exists = True return record, is_stars_exists, is_avs_exists - def check_star_exists_by_id(self, star_id: str) -> bool: + def check_star_exists_by_id(self, star_id: str): """根据演员 id 确认收藏记录中演员是否存在 :param str star_id: 演员 id @@ -56,7 +56,7 @@ def check_star_exists_by_id(self, star_id: str) -> bool: if star["id"].lower() == star_id.lower(): return True - def check_id_exists(self, id: str) -> bool: + def check_id_exists(self, id: str): """根据番号确认收藏记录中番号是否存在 :param str id: 番号 @@ -70,7 +70,7 @@ def check_id_exists(self, id: str) -> bool: if av["id"].lower() == id.lower(): return True - def renew_record(self, record: dict) -> bool: + def renew_record(self, record: dict): """更新记录 :param dict record: 新的记录 @@ -86,7 +86,7 @@ def renew_record(self, record: dict) -> bool: LOG.error(f"更新收藏记录文件失败: {e}") return False - def record_star_by_name_id(self, star_name: str, star_id: str) -> bool: + def record_star_by_name_id(self, star_name: str, star_id: str): """记录演员 :param str star_name: 演员名称 @@ -111,7 +111,7 @@ def record_star_by_name_id(self, star_name: str, star_id: str) -> bool: record["stars"] = stars return self.renew_record(record) - def record_id_by_id_stars(self, id: str, stars: list) -> bool: + def record_id_by_id_stars(self, id: str, stars: list): """记录番号 :param str id: 番号 @@ -136,7 +136,7 @@ def record_id_by_id_stars(self, id: str, stars: list) -> bool: record["avs"] = avs return self.renew_record(record) - def undo_record_star_by_id(self, star_id: str) -> bool: + def undo_record_star_by_id(self, star_id: str): """取消收藏演员 :param str star_id: 演员id @@ -160,7 +160,7 @@ def undo_record_star_by_id(self, star_id: str) -> bool: return self.renew_record(record) return True - def undo_record_id(self, id: str) -> bool: + def undo_record_id(self, id: str): """取消收藏番号 :param str id: 番号 @@ -235,8 +235,7 @@ class BotCacheDb: "prefix": "jlib-page-new-avs-", "expire": 3600 * 24 * 2, } - CACHE_STAR_JA_NAME = {"prefix": "star-ja-name-", - "expire": 3600 * 24 * 30 * 6} + CACHE_STAR_JA_NAME = {"prefix": "star-ja-name-", "expire": 3600 * 24 * 30 * 6} CACHE_NEW_AVS_OF_STAR = { "prefix": "new-avs-of-star-", "expire": 3600 * 24 * 12, @@ -340,7 +339,7 @@ def set_cache(self, key: str, value, type: int, expire=None): except Exception as e: LOG.error(f"设置缓存: {cache_key} 失败: {e}") - def get_cache(self, key, type: int) -> any: + def get_cache(self, key, type: int): """获取缓存 :param str key: 键