-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtgtg-notification.py
222 lines (169 loc) Β· 6.48 KB
/
tgtg-notification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import os
import json
import base64
import time
import random
import asyncio
from dotenv import load_dotenv
from tgtg import TgtgClient
from pushbullet import Pushbullet
from telegram import Bot
load_dotenv()
# Initialize Too Good To Go client
TGTG_CREDENTIALS = os.getenv("TGTG_CREDENTIALS")
if TGTG_CREDENTIALS == None or TGTG_CREDENTIALS == "":
print("TGTG_CREDTENIALS not set")
exit(1)
try:
tgtg_json = base64.b64decode(TGTG_CREDENTIALS).decode("utf-8")
tgtg_credentials = json.loads(tgtg_json)
except:
print("TGTG_CREDENTIALS is not a valid json")
exit()
tgtg_client = TgtgClient(
access_token=tgtg_credentials["access_token"],
refresh_token=tgtg_credentials["refresh_token"],
user_id=tgtg_credentials["user_id"],
cookie=tgtg_credentials["cookie"],
)
# Initialize Pushbullet client
pushbullet_client = None
PUSHBULLET_TOKEN = os.getenv("PUSHBULLET_TOKEN")
if PUSHBULLET_TOKEN == None or PUSHBULLET_TOKEN == "":
print("No Pushbullet token provided, Pushbullet notifications will not be sent")
else:
PUSHBULLET_CHAT_ID = (
os.getenv("PUSHBULLET_CHAT_ID")
if os.getenv("PUSHBULLET_CHAT_ID") != ""
else None
)
PUSHBULLET_DEVICE_ID = (
os.getenv("PUSHBULLET_DEVICE_ID")
if os.getenv("PUSHBULLET_DEVICE_ID") != ""
else None
)
pushbullet_client = Pushbullet(PUSHBULLET_TOKEN)
# Initialize Telegram client
telegram_client = None
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
if TELEGRAM_TOKEN == None or TELEGRAM_TOKEN == "":
print("No Telegram token provided, Telegram notifications will not be sent")
elif TELEGRAM_CHAT_ID == None or TELEGRAM_CHAT_ID == "":
print("No Telegram chat id provided, Telegram notifications will not be sent")
else:
telegram_client = Bot(TELEGRAM_TOKEN)
async_event_loop = asyncio.new_event_loop()
# Send telegram message
def send_telegram_message(title, message):
async def telegram_message():
await telegram_client.send_message(
chat_id=TELEGRAM_CHAT_ID,
text=f"{title}\n{message}",
)
async_event_loop.run_until_complete(telegram_message())
# Send notification to Pushbullet and Telegram
def send_notification(title, message, link):
if pushbullet_client != None:
pushbullet_client.push_link(
title=title,
url=link,
body=message,
chat=PUSHBULLET_CHAT_ID,
device=PUSHBULLET_DEVICE_ID,
)
if telegram_client != None:
send_telegram_message(title, f"{message}\n{link}")
# Send message to Pushbullet and Telegram
def send_message(title, message):
if pushbullet_client != None:
pushbullet_client.push_note(
title=title,
body=message,
chat=PUSHBULLET_CHAT_ID,
device=PUSHBULLET_DEVICE_ID,
)
if telegram_client != None:
send_telegram_message(title, message)
# Check if items are available
def check_items(prev_available_items, is_send_notification=True):
# Get items
items = tgtg_client.get_items()
# Check if items are available
available_items = list()
for item in items:
if item["items_available"] > 0:
available_items.append(item)
# Check if new items are available
new_available_items = list()
for item in available_items:
item_id = item["item"]["item_id"]
if item_id not in prev_available_items:
new_available_items.append(item)
elif item["items_available"] > prev_available_items[item_id]:
new_available_items.append(item)
# Send notification if new items are available
if len(new_available_items) > 0 and is_send_notification:
for item in new_available_items:
item_url = f"https://share.toogoodtogo.com/item/{item['item']['item_id']}"
notification_title = "π Too Good To Go - Item Watcher"
notification_message = f"{item['display_name']}: {item['items_available']}"
send_notification(notification_title, notification_message, item_url)
print("Item found", notification_message)
# If no items are available
if len(new_available_items) == 0:
print("No new items found")
# Update previous available items
prev_available_items.clear()
for item in available_items:
prev_available_items[item["item"]["item_id"]] = item["items_available"]
if __name__ == "__main__":
print("Starting Too Good To Go notification script")
print("----------------------------------")
# Create dictionary for previous available items
prev_available_items = dict()
# Send message to Pushbullet and Telegram
send_message("π Too Good To Go - Item Watcher", "The Item Watcher has started.")
# Create error counter
error_count = 0
# Check if items are available every 30 seconds
while True:
try:
print("Checking for items...")
# Check if items are available
check_items(prev_available_items, True)
# Reset error counter
error_count = 0
# Wait 30 seconds plus a random number of seconds between 0 and 5 seconds
sleep_time = 30 + random.uniform(0, 5)
print(f"Waiting {sleep_time:.2f} seconds...\n")
time.sleep(sleep_time)
except KeyboardInterrupt:
print("Keyboard interrupt detected, stopping script")
# Send message to Pushbullet and Telegram
send_message(
"π Too Good To Go - Item Watcher",
"The Item Watcher has stopped by the user.",
)
exit()
except Exception as e:
print(f"An error occured: {e}\n")
# Increase error counter
error_count += 1
# Send message to Pushbullet and Telegram
send_message(
"π Too Good To Go - Item Watcher",
f"An error occured: {e}\n\nError count: {error_count}",
)
# Stop script if error count is 5
if error_count == 5:
print("Error count is 5, stopping script")
# Send message to Pushbullet and Telegram
send_message(
"π Too Good To Go - Item Watcher",
"The Item Watcher has stopped because the error count is 5.",
)
exit(1)
# Wait 60 seconds
print("Waiting 60 seconds...\n")
time.sleep(60)