forked from c0sm0void/ReVot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First commit after testing successfully
- Loading branch information
“minnonymous”
committed
Sep 22, 2020
1 parent
c2be9d9
commit b9da942
Showing
17 changed files
with
1,246 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
[[source]] | ||
|
||
url = "https://pypi.python.org/simple" | ||
verify_ssl = true | ||
name = "pypi" | ||
|
||
|
||
[packages] | ||
|
||
python-telegram-bot = "*" | ||
"bs4" = "*" | ||
requests = "*" | ||
pillow = "*" | ||
paramiko = "*" | ||
moviepy = "*" | ||
|
||
|
||
[dev-packages] | ||
|
||
|
||
|
||
[requires] | ||
|
||
python_version = "3.6" |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
#!/usr/bin/env bash | ||
set -e | ||
|
||
echo files changed: $(git diff $oldrev $newrev --diff-filter=ACDMR --name-only | wc -l) | ||
|
||
umask 002 | ||
|
||
git submodule sync && git submodule update --init --recursive | ||
echo "Restarting reverse_image_search_bot.service" | ||
systemctl --user restart reverse_image_search_bot.service | ||
echo "Restarted" |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import logging | ||
import os | ||
import sys | ||
from threading import Thread | ||
|
||
from telegram import Bot, TelegramError, Update | ||
from telegram.ext import CallbackQueryHandler, CommandHandler, Filters, MessageHandler, Updater | ||
|
||
from . import settings | ||
from .commands import best_match, callback_best_match, gif_image_search, group_image_reply_search, image_search_link, \ | ||
start, sticker_image_search, unknown | ||
|
||
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def error(bot: Bot, update: Update, error: TelegramError): | ||
"""Log all errors from the telegram bot api | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
error (:obj:`telegram.error.TelegramError`): Telegram Api TelegramError Object | ||
""" | ||
logger.warning('Update "%s" caused error "%s"' % (update, error)) | ||
|
||
|
||
def main(): | ||
updater = Updater(settings.TELEGRAM_API_TOKEN) | ||
dispatcher = updater.dispatcher | ||
|
||
def stop_and_restart(): | ||
"""Gracefully stop the Updater and replace the current process with a new one.""" | ||
updater.stop() | ||
os.execl(sys.executable, sys.executable, *sys.argv) | ||
|
||
def restart(bot: Bot, update: Update): | ||
"""Start the restarting process | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
update.message.reply_text('Bot is restarting...') | ||
logger.info('Gracefully restarting...') | ||
Thread(target=stop_and_restart).start() | ||
|
||
dispatcher.add_handler(CommandHandler("start", start)) | ||
dispatcher.add_handler(CommandHandler("help", start)) | ||
dispatcher.add_handler(CommandHandler('restart', restart, filters=Filters.user(username='@c0sm0s0'))) | ||
dispatcher.add_handler(CommandHandler('reply_search', group_image_reply_search)) | ||
dispatcher.add_handler(CommandHandler('best_match', best_match, pass_args=True)) | ||
dispatcher.add_handler(CallbackQueryHandler(callback_best_match)) | ||
|
||
dispatcher.add_handler(MessageHandler(Filters.sticker, sticker_image_search)) | ||
dispatcher.add_handler(MessageHandler(Filters.photo, image_search_link)) | ||
dispatcher.add_handler(MessageHandler(Filters.video | Filters.document, gif_image_search)) | ||
dispatcher.add_handler(MessageHandler(Filters.command, unknown)) | ||
|
||
# log all errors | ||
dispatcher.add_error_handler(error) | ||
|
||
updater.start_polling() | ||
logger.info('Started bot. Waiting for requests...') | ||
updater.idle() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
import io | ||
import os | ||
from tempfile import NamedTemporaryFile | ||
from uuid import uuid4 | ||
|
||
from PIL import Image | ||
from moviepy.video.io.VideoFileClip import VideoFileClip | ||
from telegram import Bot, ChatAction, InlineKeyboardButton, InlineKeyboardMarkup, Update | ||
from telegram.parsemode import ParseMode | ||
|
||
from reverse_image_search_bot.utils import dict_to_str | ||
from .image_search import BingReverseImageSearchEngine, \ | ||
GoogleReverseImageSearchEngine, IQDBReverseImageSearchEngine, \ | ||
TinEyeReverseImageSearchEngine, YandexReverseImageSearchEngine | ||
|
||
|
||
def start(bot: Bot, update: Update): | ||
"""Send Start / Help message to client. | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
reply = """*ReVot - Reverse Image Search Bot* | ||
*How to use me* | ||
Send me images or stickers and I will send you direct reverse image search links for IQDB, Google, TinEy, Yandex and Bing. | ||
*Commands* | ||
- /help, /start: show a help message with information about the bot and it's usage. | ||
Thank you for using. | ||
(Beta tester Minhajul \@-@/ and Abhyuday \0-0/) | ||
""" | ||
|
||
update.message.reply_text(reply, parse_mode=ParseMode.MARKDOWN) | ||
|
||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
image_dir = os.path.join(current_dir, 'images/example_usage.png') | ||
bot.send_photo(update.message.chat_id, photo=open(image_dir, 'rb'), caption='Example Usage') | ||
|
||
|
||
def group_image_reply_search(bot: Bot, update: Update): | ||
"""Reverse search for reply mentions to images in groups | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
print(update.message.reply_to_message.document.file_id) | ||
pass | ||
|
||
|
||
def gif_image_search(bot: Bot, update: Update): | ||
"""Send a reverse image search link for the GIF sent to us | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
update.message.reply_text('Please wait for your results ...') | ||
bot.send_chat_action(chat_id=update.message.chat_id, action=ChatAction.TYPING) | ||
|
||
document = update.message.document or update.message.video | ||
video = bot.getFile(document.file_id) | ||
|
||
with NamedTemporaryFile() as video_file: | ||
video.download(out=video_file) | ||
video_clip = VideoFileClip(video_file.name, audio=False) | ||
|
||
with NamedTemporaryFile(suffix='.gif') as gif_file: | ||
video_clip.write_gif(gif_file.name) | ||
|
||
dirname = os.path.dirname(gif_file.name) | ||
file_name = os.path.splitext(gif_file.name)[0] | ||
compressed_gif_path = os.path.join(dirname, file_name + '-min.gif') | ||
|
||
os.system('gifsicle -O3 --lossy=50 -o {dst} {src}'.format(dst=compressed_gif_path, src=gif_file.name)) | ||
if os.path.isfile(compressed_gif_path): | ||
general_image_search(bot, update, compressed_gif_path, 'gif') | ||
else: | ||
general_image_search(bot, update, gif_file.name, 'gif') | ||
|
||
|
||
def sticker_image_search(bot: Bot, update: Update): | ||
"""Send a reverse image search link for the image of the sticker sent to us | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
update.message.reply_text('Please wait for your results ...') | ||
bot.send_chat_action(chat_id=update.message.chat_id, action=ChatAction.TYPING) | ||
|
||
sticker_image = bot.getFile(update.message.sticker.file_id) | ||
converted_image = io.BytesIO() | ||
|
||
with io.BytesIO() as image_buffer: | ||
sticker_image.download(out=image_buffer) | ||
with io.BufferedReader(image_buffer) as image_file: | ||
pil_image = Image.open(image_file).convert("RGBA") | ||
pil_image.save(converted_image, 'png') | ||
|
||
general_image_search(bot, update, converted_image, 'png') | ||
|
||
|
||
def image_search_link(bot: Bot, update: Update): | ||
"""Send a reverse image search link for the image he sent us to the client | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
update.message.reply_text('Please wait for your results ...') | ||
bot.send_chat_action(chat_id=update.message.chat_id, action=ChatAction.TYPING) | ||
|
||
photo = bot.getFile(update.message.photo[-1].file_id) | ||
with io.BytesIO() as image_buffer: | ||
photo.download(out=image_buffer) | ||
with io.BufferedReader(image_buffer) as image_file: | ||
general_image_search(bot, update, image_file) | ||
|
||
|
||
def general_image_search(bot: Bot, update: Update, image_file, image_extension: str=None): | ||
"""Send a reverse image search link for the image sent to us | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
image_file: File like image to search for | ||
image_extension (:obj:`str`): What extension the image should have. Default is 'jpg' | ||
""" | ||
image_extension = image_extension or 'jpg' | ||
|
||
iqdb_search = IQDBReverseImageSearchEngine() | ||
google_search = GoogleReverseImageSearchEngine() | ||
tineye_search = TinEyeReverseImageSearchEngine() | ||
bing_search = BingReverseImageSearchEngine() | ||
yandex_search = YandexReverseImageSearchEngine() | ||
|
||
image_url = iqdb_search.upload_image(image_file, 'irs-' + str(uuid4())[:8] + '.' + image_extension) | ||
|
||
iqdb_url = iqdb_search.get_search_link_by_url(image_url) | ||
google_url = google_search.get_search_link_by_url(image_url) | ||
tineye_url = tineye_search.get_search_link_by_url(image_url) | ||
bing_url = bing_search.get_search_link_by_url(image_url) | ||
yandex_url = yandex_search.get_search_link_by_url(image_url) | ||
|
||
button_list = [[ | ||
InlineKeyboardButton(text='Best Match', callback_data='best_match ' + image_url) | ||
], [ | ||
InlineKeyboardButton(text='Go To Image', url=image_url) | ||
], [ | ||
InlineKeyboardButton(text='IQDB', url=iqdb_url), | ||
InlineKeyboardButton(text='GOOGLE', url=google_url), | ||
], [ | ||
InlineKeyboardButton(text='TINEYE', url=tineye_url), | ||
InlineKeyboardButton(text='BING', url=bing_url), | ||
], [ | ||
InlineKeyboardButton(text='YANDEX', url=yandex_url), | ||
]] | ||
|
||
reply = 'You can either use "Best Match" to get your best match right here or search for yourself.' | ||
reply_markup = InlineKeyboardMarkup(button_list) | ||
update.message.reply_text( | ||
text=reply, | ||
reply_markup=reply_markup | ||
) | ||
|
||
|
||
def callback_best_match(bot: Bot, update: Update): | ||
"""Find best matches for an image for a :class:`telegram.callbackquery.CallbackQuery`. | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
bot.answer_callback_query(update.callback_query.id, show_alert=False) | ||
url = update.callback_query.data.split(' ')[1] | ||
best_match(bot, update, [url, ]) | ||
|
||
|
||
def best_match(bot: Bot, update: Update, args: list): | ||
"""Find best matches for an image. | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
args (:obj:`list`): List of arguments passed by the user | ||
""" | ||
if not args: | ||
update.message.reply_text('You have to give me an URL to make this work.') | ||
return | ||
tineye = TinEyeReverseImageSearchEngine() | ||
iqdb = IQDBReverseImageSearchEngine() | ||
tineye.search_url = args[0] | ||
iqdb.search_url = args[0] | ||
|
||
chat_id = update.effective_chat.id | ||
message = bot.send_message(chat_id, 'Searching for best match on TinEye...') | ||
|
||
match = tineye.best_match | ||
if not match: | ||
bot.edit_message_text( | ||
text='Nothing found on TinEye, searching on IQDB...', | ||
chat_id=chat_id, | ||
message_id=message.message_id | ||
) | ||
match = iqdb.best_match | ||
|
||
if match: | ||
reply = ( | ||
'Best Match:\n' | ||
'Link: [{website_name}]({website})\n'.format( | ||
website_name=match['website_name'], | ||
website=match['website'], | ||
) | ||
) | ||
reply += dict_to_str(match, ignore=['website_name', 'website', 'image_url', 'thumbnail']) | ||
|
||
image_url = match.get('image_url', None) or match.get('website', None) | ||
thumbnail = match.get('image_url', None) or match.get('thumbnail', None) | ||
|
||
reply_markup = InlineKeyboardMarkup([[InlineKeyboardButton(text='Open', url=image_url), ], ]) | ||
bot.delete_message(chat_id, message.message_id) | ||
|
||
bot.send_photo(chat_id=chat_id, photo=thumbnail) | ||
bot.send_message( | ||
chat_id=chat_id, | ||
text=reply, | ||
reply_markup=reply_markup, | ||
parse_mode=ParseMode.MARKDOWN | ||
) | ||
else: | ||
bot.edit_message_text( | ||
text='Nothing found on TinEye nor IQDB.', | ||
chat_id=chat_id, | ||
message_id=message.message_id, | ||
) | ||
|
||
|
||
def unknown(bot: Bot, update: Update): | ||
"""Send a error message to the client if the entered command did not work. | ||
Args: | ||
bot (:obj:`telegram.bot.Bot`): Telegram Api Bot Object. | ||
update (:obj:`telegram.update.Update`): Telegram Api Update Object | ||
""" | ||
update.message.reply_text("Sorry, I didn't understand that command.") |
Oops, something went wrong.