-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdns_functions.py
185 lines (149 loc) · 7.09 KB
/
dns_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import requests
import pydig
import json
from owl.notifications import Notifier
from owl.config import load_config
from owl.webserver import write_to_template
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
TIMEZONE = ZoneInfo(load_config('./config.json')['TIMEZONE'])
if load_config('./config.json')['NOTIFICATIONS']['ENABLE_NOTIFICATIONS']:
notification_service = Notifier()
else:
notification_service = None
def get_current_public_ip(provider: str = 'CLOUDFLARE') -> str:
if provider == 'CLOUDFLARE':
print(f'\tChecking public IP via https://cloudflare.com/cdn-cgi/trace')
ip_address = requests.get('https://cloudflare.com/cdn-cgi/trace')
ip_address_text = ip_address.text.split('\n')[2].replace('ip=', '')
else:
print(f'\tChecking public IP via https://ident.me')
ip_address = requests.get('https://ident.me')
ip_address_text = ip_address.text
print(f'\tMy public IP address is: {ip_address_text}')
return ip_address_text
def resolve_current_server_ip(url, nameservers='8.8.8.8') -> [str, str]:
nameservers = [nameservers]
resolver = pydig.Resolver(nameservers=nameservers)
local_ip = pydig.query(url, 'A')
public_ip = resolver.query(url, 'A')
# print(f'\tLocal IP address for {url} is: {local_ip[0]}')
# print(f'\tPublic IP address for {url} is: {public_ip[0]}')
return local_ip[0], public_ip[0]
def compare_ip(ip1, ip2) -> bool:
# print('\tComparing addresses:')
# print(f'\tIP address 1: {ip1}')
# print(f'\tIP address 2: {ip2}')
if ip1 == ip2:
return True
else:
return False
def set_ip(cloudflare, domain, current_ip: str):
"""
sets the ip in via cloudflare api
"""
zone_id = cloudflare['ZONE_ID']
api_key = cloudflare['API_KEY']
user_email = cloudflare['USER_EMAIL']
record_id = domain['RECORD_ID']
record_name = domain['RECORD_NAME']
print(f"\tCloudflare Zone ID is: {zone_id}")
print(f"\tCloudflare API Key is: {api_key}")
print(f"\tRecord ID is: {record_id}")
print(f"\tRecord Name is: {record_name}")
url = (
"https://api.cloudflare.com/client/v4/zones/%(zone_id)s/dns_records/%(record_id)s"
% {"zone_id": zone_id, "record_id": record_id}
)
headers = {
"X-Auth-Email": user_email,
"X-Auth-Key": api_key,
"Content-Type": "application/json",
}
payload = {"type": "A", "name": record_name, "content": current_ip}
response = requests.put(url, headers=headers, data=json.dumps(payload))
# print(response.status_code)
# print(response.json()['success'])
return response
def update_all_ip(current_ip):
data = load_config('./config.json')
cf = data
for domain in data['domains']:
print(f"{'':#<40}")
print(f"\tPerform Check if Update is necessary...")
domain_ip = resolve_current_server_ip(domain['RECORD_NAME'])[1]
check = compare_ip(current_ip, domain_ip)
domain['OLD_IP'] = domain_ip
if check:
if notification_service:
if data['NOTIFICATIONS']['ENABLE_NOTIFICATION_NO_CHANGE']:
print(f"\tIP for domain: {domain['RECORD_NAME']} is {domain_ip} which is the current public IP {current_ip}. No Update necessary!")
notification_service.send_success(f"\tIP for domain: {domain['RECORD_NAME']} is {domain_ip} which is the current public IP {current_ip}. No Update necessary!")
else:
print(f"\tIP for domain: {domain['RECORD_NAME']} is {domain_ip} which is the current public IP {current_ip}. No Update necessary! No Notification send due to configuration!")
domain['NEW_IP'] = domain_ip
domain['RESULT'] = f"No change"
### Placeholder
### Needs some work to load time from last successful update
domain_info = get_current_dns_entry_from_cf(cf, domain)
last_update = datetime.strptime(domain_info['result']['modified_on'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
domain['LAST_UPDATE'] = last_update.astimezone(TIMEZONE).strftime("%d.%m.%Y at %H:%M:%S")
else:
print(f"\tUpdating DynDNS IP for domain: {domain['RECORD_NAME']}...")
response = set_ip(cf, domain, current_ip)
if response.json()['success']:
print(f"\tIP was set successfully!")
if notification_service:
notification_service.send_success(f"IP for domain {domain['RECORD_NAME']} was successfully set to {current_ip}. Old IP was {domain_ip}")
domain['NEW_IP'] = current_ip
domain['RESULT'] = f"Update successfull"
domain_info = get_current_dns_entry_from_cf(cf, domain)
last_update = datetime.strptime(domain_info['result']['modified_on'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
domain['LAST_UPDATE'] = last_update.astimezone(TIMEZONE).strftime("%d.%m.%Y at %H:%M:%S")
else:
print(f"\tThere was an error, see below for more details")
print(f"\tResponse code was: {response.status_code}")
if notification_service:
notification_service.send_error(f"An error occurred while setting IP for domain {domain['RECORD_NAME']}, status code {response.status_code}")
print(f"\tResponse json is: {response.json()}")
domain['NEW_IP'] = f"Not set"
domain['RESULT'] = f"Error"
### Placeholder
### Needs some work to load time from last successful update
domain_info = get_current_dns_entry_from_cf(cf, domain)
last_update = datetime.strptime(domain_info['result']['modified_on'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
domain['LAST_UPDATE'] = last_update.astimezone(TIMEZONE).strftime("%d.%m.%Y at %H:%M:%S")
print('\tDone!')
print(f"{'':#<40}")
# Updating index.html
write_to_template(data['domains'])
print('\tUpdating index.html...')
print('\tDone!')
print(f"{'':#<40}")
def get_current_dns_entry_from_cf(cloudflare, domain):
zone_id = cloudflare['ZONE_ID']
api_key = cloudflare['API_KEY']
user_email = cloudflare['USER_EMAIL']
record_id = domain['RECORD_ID']
record_name = domain['RECORD_NAME']
print(f"\tCloudflare Zone ID is: {zone_id}")
print(f"\tCloudflare API Key is: {api_key}")
print(f"\tRecord ID is: {record_id}")
print(f"\tRecord Name is: {record_name}")
url = (
"https://api.cloudflare.com/client/v4/zones/%(zone_id)s/dns_records/%(record_id)s"
% {"zone_id": zone_id, "record_id": record_id}
)
headers = {
"X-Auth-Email": user_email,
"X-Auth-Key": api_key,
"Content-Type": "application/json",
}
response = requests.get(url, headers=headers)
# print(response.status_code)
return response.json()
if __name__ == '__main__':
print(f"{'':#<40}")
ip = get_current_public_ip()
update_all_ip(ip)
print(f"\tDone updating, sleep until next CRON schedule...")