This repository has been archived by the owner on Sep 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 230
/
luna.py
120 lines (98 loc) · 2.78 KB
/
luna.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import re
import os
from asyncio import gather, get_event_loop, sleep
from aiohttp import ClientSession
from pyrogram import Client, filters, idle
from Python_ARQ import ARQ
# Settings come from a local config.py when one exists in the working
# directory; otherwise the names are taken from sample_config instead.
is_config = os.path.exists("config.py")
if not is_config:
    from sample_config import *
else:
    from config import *
# The bot client. The session name ":memory:" presumably selects a
# non-persistent Pyrogram session -- confirm against the Pyrogram docs.
# NOTE(review): api_id / api_hash are hard-coded here rather than read from
# the config module that supplies bot_token.
luna = Client(
    ":memory:",
    api_id=6,
    api_hash="eb06d4abfb49dc3eeb1aeb98ae0f581e",
    bot_token=bot_token,
)

# The integer before the first ":" of the token; the chat handler compares it
# with the sender id of replied-to messages to detect replies to this bot.
bot_id = int(bot_token.partition(":")[0])

# ARQ API client placeholder; main() assigns the real instance at startup.
arq = None
async def lunaQuery(query: str, user_id: int):
    """Ask the Luna chatbot endpoint for a reply to *query*.

    When LANGUAGE is not "en", the query is first translated to English via
    ``arq.translate`` and the chatbot's answer is translated back into
    LANGUAGE before being returned. With LANGUAGE == "en" both translation
    steps are skipped.

    Args:
        query: Text to send to the chatbot.
        user_id: Identifier passed through to ``arq.luna`` with the query.

    Returns:
        The chatbot reply (``.result`` of ``arq.luna``), translated into
        LANGUAGE when LANGUAGE is not "en".
    """
    if LANGUAGE != "en":
        to_english = await arq.translate(query, "en")
        query = to_english.result.translatedText

    answer = await arq.luna(query, user_id)
    resp = answer.result

    if LANGUAGE == "en":
        return resp

    localized = await arq.translate(resp, LANGUAGE)
    return localized.result.translatedText
async def type_and_send(message):
    """Show a "typing" action, fetch the chatbot reply and send it.

    The chatbot query runs concurrently with a 2 second sleep, so the reply
    is sent no sooner than 2 seconds after the typing action was started.
    Afterwards the chat action is reset with "cancel".

    Args:
        message: Incoming message; its stripped text is used as the query
            and the reply is sent in response to it.
    """
    chat_id = message.chat.id
    sender = message.from_user
    # Messages without a sender are attributed to user id 0.
    user_id = sender.id if sender else 0
    query = message.text.strip()
    client = message._client

    await client.send_chat_action(chat_id, "typing")
    # gather() returns results in argument order: [reply, None].
    results = await gather(lunaQuery(query, user_id), sleep(2))
    await message.reply_text(results[0])
    await client.send_chat_action(chat_id, "cancel")
@luna.on_message(filters.command("repo") & ~filters.edited)
async def repo(_, message):
    """Reply to the "repo" command with the GitHub and group links."""
    github_link = "[GitHub](https://github.com/thehamkercat/LunaChatBot)"
    group_link = " | [Group](t.me/PatheticProgrammers)"
    await message.reply_text(
        github_link + group_link,
        disable_web_page_preview=True,
    )
@luna.on_message(filters.command("help") & ~filters.edited)
async def start(_, message):
    """Reply to the "help" command with the list of available commands.

    A "typing" action is shown for 2 seconds before the help text is sent.
    """
    help_text = "/repo - Get Repo Link"
    target_chat = message.chat.id
    await luna.send_chat_action(target_chat, "typing")
    await sleep(2)
    await message.reply_text(help_text)
@luna.on_message(
    ~filters.private
    & filters.text
    # Both bot commands are excluded: this handler is registered in its own
    # group (69), so without the exclusion a "/repo" sent as a reply to the
    # bot would be answered by the command handler AND sent to the chatbot.
    & ~filters.command(["help", "repo"])
    & ~filters.edited,
    group=69,
)
async def chat(_, message):
    """Answer non-private text messages that are addressed to the bot.

    A message is answered when either:
      * it is a reply to a message whose sender id equals ``bot_id``, or
      * it is not a reply and its text contains "luna" (case-insensitive).

    Replies to other users, and replies to messages without a sender, are
    ignored.

    Args:
        _: The client that received the update (unused).
        message: The incoming text message.
    """
    replied_to = message.reply_to_message
    if replied_to:
        author = replied_to.from_user
        # Only replies that target one of this bot's own messages count.
        if not author or author.id != bot_id:
            return
    # The former pattern "[.|\n]{0,}luna[.|\n]{0,}" wrapped the keyword in
    # character classes that may match zero characters, so it was exactly a
    # case-insensitive search for "luna"; spell that out directly.
    elif not re.search(r"luna", message.text, flags=re.IGNORECASE):
        return
    await type_and_send(message)
@luna.on_message(
    filters.private & ~filters.command("help") & ~filters.edited
)
async def chatpm(_, message):
    """Answer private messages that carry text; ignore all others."""
    if message.text:
        await type_and_send(message)
async def main():
    """Start the bot and keep it running until idle() returns.

    Creates the shared aiohttp session, stores the ARQ client in the
    module-level ``arq`` used by ``lunaQuery``, starts the Pyrogram client
    and then blocks in ``idle()``.

    On the way out the client is stopped and the HTTP session is closed, so
    neither is leaked when the bot shuts down or startup fails part-way.
    """
    global arq
    # The context manager closes the session even if startup raises.
    async with ClientSession() as session:
        arq = ARQ(ARQ_API_BASE_URL, ARQ_API_KEY, session)
        await luna.start()
        try:
            print(
                "\n"
                "-----------------\n"
                "| Luna Started! |\n"
                "-----------------\n"
            )
            await idle()
        finally:
            # Disconnect cleanly once idle() returns or is interrupted.
            await luna.stop()
# Script entry point: run main() to completion on the event loop returned by
# asyncio.get_event_loop(). This executes at module import time.
loop = get_event_loop()
loop.run_until_complete(main())