Skip to content

Commit

Permalink
新的开始
Browse files Browse the repository at this point in the history
  • Loading branch information
djkcyl committed May 25, 2022
0 parents commit 2b069c5
Show file tree
Hide file tree
Showing 66 changed files with 14,381 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[flake8]
exclude =
test.py
max-line-length = 120
statistics = True
ignore =
W503,
E203
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
__pycache__/
library/browser/data

data/login_cache.json
data/subscription_list.json
data/white_list.json
data/bot_config.yaml
data/subscription_list copy.json
logs/
661 changes: 661 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BOT_Status = {
"liveing": [],
"offset": 0,
"init": False,
}
62 changes: 62 additions & 0 deletions core/bot_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import yaml

from pathlib import Path
from loguru import logger
from pydantic import AnyHttpUrl


class NoAliasDumper(yaml.SafeDumper):
def ignore_aliases(self, data):
return True


bot_config_file = Path("data/bot_config.yaml")
if bot_config_file.exists():
bot_config = yaml.load(bot_config_file.read_bytes(), Loader=yaml.FullLoader)
else:
logger.error("未找到配置文件,请检查配置文件(data/bot_group.yaml)是否存在")
exit()

if bot_config["master"] not in bot_config["admins"]:
logger.warning("管理员内未添加主人,已自动添加")
bot_config["admins"].append(bot_config["master"])


class BotConfig:
class Mirai:
account: int = bot_config["mirai"]["account"]
verify_key: str = bot_config["mirai"]["verify_key"]
mirai_host: AnyHttpUrl = bot_config["mirai"]["mirai_host"]

class Debug:
enable: bool = bot_config["debug"]["enable"]
groups: list[int] = bot_config["debug"]["groups"]

class Bilibili:
username: int = bot_config["bilibili"]["username"]
password: str = bot_config["bilibili"]["password"]

class Event:
mute: bool = bot_config["event"]["mute"]
permchange: bool = bot_config["event"]["permchange"]

name: str = bot_config["name"]
master: int = bot_config["master"]
admins: list[int] = bot_config["admins"]
access_control: bool = bot_config["access_control"]


def add_admin(admin: int):
if admin not in bot_config["admins"]:
bot_config["admins"].append(admin)
bot_config_file.write_text(yaml.dump(bot_config, Dumper=NoAliasDumper))


def remove_admin(admin: int):
if admin in bot_config["admins"]:
bot_config["admins"].remove(admin)
bot_config_file.write_text(yaml.dump(bot_config, Dumper=NoAliasDumper))


def save_config():
bot_config_file.write_text(yaml.dump(bot_config, Dumper=NoAliasDumper))
180 changes: 180 additions & 0 deletions core/control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# coding=utf-8
"""
Xenon 管理 https://github.com/McZoo/Xenon/blob/master/lib/control.py
"""

import time

from asyncio import Lock
from typing import Optional
from graia.saya import Channel
from collections import defaultdict
from typing import DefaultDict, Set, Tuple, Union
from graia.ariadne.model import Member, MemberPerm
from graia.broadcast.exceptions import ExecutionStop
from graia.ariadne.event.message import GroupMessage
from graia.broadcast.builtin.decorators import Depend

from core.bot_config import BotConfig

channel = Channel.current()


class Permission:
"""
用于管理权限的类,不应被实例化
"""

MASTER = 30
GROUP_ADMIN = 20
USER = 10
BANNED = 0
DEFAULT = USER

@classmethod
def get(cls, member: Union[Member, int]) -> int:
"""
获取用户的权限
:param user: 用户实例或QQ号
:return: 等级,整数
"""

if isinstance(member, Member):
user = member.id
user_permission = member.permission
if isinstance(member, int):
user = member
user_permission = cls.DEFAULT

if user == 80000000:
raise ExecutionStop()

if user in BotConfig.admins:
return cls.MASTER
elif user in []:
return cls.BANNED
elif user_permission in [MemberPerm.Administrator, MemberPerm.Owner]:
return cls.GROUP_ADMIN
else:
return cls.DEFAULT

@classmethod
def require(cls, level: int = DEFAULT) -> Depend:
"""
指示需要 `level` 以上等级才能触发,默认为至少 USER 权限
:param level: 限制等级
"""

def perm_check(event: GroupMessage):
member_level = cls.get(event.sender)

if (
member_level < cls.GROUP_ADMIN
and member_level >= level
and BotConfig.Debug.enable
and event.sender.group.id not in BotConfig.Debug.groups
or member_level < cls.GROUP_ADMIN
and member_level < level
):
raise ExecutionStop()

return Depend(perm_check)

@classmethod
def manual(cls, member: Member, level: int = DEFAULT) -> Depend:

member_level = cls.get(member.id)

if (
member_level < cls.GROUP_ADMIN
and member_level >= level
and BotConfig.Debug.enable
and member.group.id not in BotConfig.Debug.groups
or member_level < cls.GROUP_ADMIN
and member_level < level
):
raise ExecutionStop()


class Interval:
"""
用于冷却管理的类,不应被实例化
"""

last_exec: DefaultDict[int, Tuple[int, float]] = defaultdict(lambda: (1, 0.0))
sent_alert: Set[int] = set()
lock: Optional[Lock] = None

@classmethod
async def get_lock(cls):
if not cls.lock:
cls.lock = Lock()
return cls.lock

@classmethod
def require(
cls,
suspend_time: float = 10,
max_exec: int = 1,
override_level: int = Permission.MASTER,
):
"""
指示用户每执行 `max_exec` 次后需要至少相隔 `suspend_time` 秒才能再次触发功能
等级在 `override_level` 以上的可以无视限制
:param suspend_time: 冷却时间
:param max_exec: 在再次冷却前可使用次数
:param override_level: 可超越限制的最小等级
"""

async def cd_check(event: GroupMessage):
if Permission.get(event.sender) >= override_level:
return
current = time.time()
async with (await cls.get_lock()):
last = cls.last_exec[event.sender.id]
if current - cls.last_exec[event.sender.id][1] >= suspend_time:
cls.last_exec[event.sender.id] = (1, current)
if event.sender.id in cls.sent_alert:
cls.sent_alert.remove(event.sender.id)
return
elif last[0] < max_exec:
cls.last_exec[event.sender.id] = (last[0] + 1, current)
if event.sender.id in cls.sent_alert:
cls.sent_alert.remove(event.sender.id)
return
if event.sender.id not in cls.sent_alert:
cls.sent_alert.add(event.sender.id)
raise ExecutionStop()

return Depend(cd_check)

@classmethod
async def manual(
cls,
member: Union[Member, int],
suspend_time: float = 10,
max_exec: int = 1,
override_level: int = Permission.MASTER,
):
if Permission.get(member) >= override_level:
return
current = time.time()
async with (await cls.get_lock()):
last = cls.last_exec[member]
if current - cls.last_exec[member][1] >= suspend_time:
cls.last_exec[member] = (1, current)
if member in cls.sent_alert:
cls.sent_alert.remove(member)
return
elif last[0] < max_exec:
cls.last_exec[member] = (last[0] + 1, current)
if member in cls.sent_alert:
cls.sent_alert.remove(member)
return
if member not in cls.sent_alert:
cls.sent_alert.add(member)
raise ExecutionStop()
39 changes: 39 additions & 0 deletions core/group_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import json
from pathlib import Path
from typing import Union
from graia.ariadne.model import Group

from .bot_config import BotConfig


grouplist_file = Path("data/white_list.json")


if grouplist_file.exists():
data = json.loads(grouplist_file.read_text(encoding="utf-8"))
whitelist = data["white"]
else:
whitelist = []
grouplist_file.write_text(json.dumps({"white": []}, indent=2))


class GroupPermission:
def __init__(self, group: Union[int, Group]):
self.group_id = group.id if isinstance(group, Group) else int(group)

def can_join(self):
return self.group_id in whitelist if BotConfig.access_control else True

def add_to_whitelist(self):
if self.group_id in whitelist:
return False
whitelist.append(self.group_id)
grouplist_file.write_text(json.dumps({"white": whitelist}, indent=2))
return True

def remove_from_whitelist(self):
if self.group_id not in whitelist:
return False
whitelist.remove(self.group_id)
grouplist_file.write_text(json.dumps({"white": whitelist}, indent=2))
return True
19 changes: 19 additions & 0 deletions data/bot_config.exp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
mirai:
mirai_host: http://xxxxxxx:xxxx
verify_key: xxxxxxx
account: xxxxxxx
debug:
enable: false
groups:
- xxxxxxx
bilibili:
username: xxxxxxx
password: xxxxxxx
event:
mute: true
permchange: true
name: xxxxxxx
access_control: true
master: xxxxxxx
admins:
- xxxxxxx
32 changes: 32 additions & 0 deletions function/command/up/get_subscribe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from graia.saya import Channel
from graia.ariadne.app import Ariadne
from graia.ariadne.model import Group
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.saya.builtins.broadcast.schema import ListenerSchema
from graia.ariadne.message.parser.twilight import Twilight, RegexMatch

from library import get_group_sublist
from core.control import Interval, Permission

channel = Channel.current()


@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(RegexMatch(r"(查看)?(本群)?(订阅|关注)列表"))],
decorators=[Permission.require(Permission.GROUP_ADMIN), Interval.require()],
)
)
async def sub_list(app: Ariadne, group: Group):

sublist = get_group_sublist(group.id)
sublist_count = len(sublist)
if sublist_count == 0:
await app.sendGroupMessage(group, MessageChain.create("本群未订阅任何 UP"))
else:
await app.sendGroupMessage(
group,
MessageChain.create(f"本群共订阅 {sublist_count} 个 UP\n", "\n".join(sublist)),
)
Loading

0 comments on commit 2b069c5

Please sign in to comment.