-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
70 lines (54 loc) · 1.93 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import requests
# Module-level logger named after this module; used by the helpers below.
log = logging.getLogger(__name__)
def get_public_ip(service: str, *, timeout: float = 10.0) -> str:
    """
    Ask an HTTP "what is my IP" service for the caller's public IP address.

    The service is expected to answer with a JSON object that has an ``ip`` key.

    :param service: URL of the IP lookup service
    :param timeout: seconds to wait for the service before giving up
    :return: value of the ``ip`` key of the JSON response (``None`` if the key is absent)
    :raises requests.RequestException: on connection failure, timeout or a non-2xx response
    """
    # Without an explicit timeout, requests waits indefinitely on an
    # unresponsive service and the caller would hang with it.
    response = requests.get(url=service, timeout=timeout)
    # Fail loudly on HTTP errors instead of parsing an error page as a result.
    response.raise_for_status()
    return response.json().get("ip")
def get_records_from_aws(client, zone_id: str, domain_name: str) -> list:
    """
    Fetch record sets from an AWS Route53 hosted zone.

    Makes a single ``list_resource_record_sets`` call whose listing starts at
    ``domain_name`` / type ``A`` and returns the ``ResourceRecordSets`` entry of
    the response as-is. No filtering is done here and no follow-up calls are made.
    NOTE(review): if the service truncates long listings, only the first page
    is returned - confirm this is acceptable for the zones in use.

    :param client: boto3 client
    :param zone_id: AWS Route53 zone id
    :param domain_name: Domain name at which the listing starts
    :return: value stored under ``ResourceRecordSets`` in the response
             (``None`` if the key is absent)
    """
    listing_query = {
        "HostedZoneId": zone_id,
        "StartRecordName": domain_name,
        "StartRecordType": "A",
    }
    response = client.list_resource_record_sets(**listing_query)
    return response.get("ResourceRecordSets")
def get_records_to_update(records: list, current_public_ip: str) -> list:
    """
    Build the change entries needed to point A records at the current public IP.

    Each record of type ``A`` whose first ``ResourceRecords`` value differs from
    ``current_public_ip`` yields one ``UPSERT`` change that keeps the record's
    name and TTL and replaces its values with the current IP. Records of any
    other type, records already matching, and A records without
    ``ResourceRecords``/``TTL`` are skipped.

    :param records: list of record-set dicts; ``None`` is treated as empty
    :param current_public_ip: current public ip
    :return: list of changes (possibly empty)
    """
    changes = []
    for record in records or []:
        name = record.get("Name")
        record_type = record.get("Type")
        if record_type != "A":
            log.info("Ignoring record %s, wrong type: %s", name, record_type)
            continue

        # Some A records (e.g. Route53 alias records) have no ResourceRecords
        # or TTL; indexing them would crash, and they cannot be rewritten as a
        # plain value record anyway, so leave them alone.
        values = record.get("ResourceRecords")
        if not values or "TTL" not in record:
            log.info("Ignoring record %s, no ResourceRecords/TTL to update", name)
            continue

        # Only the first value is compared; the UPSERT below replaces the
        # whole value list with the single current IP.
        current_record_ip = values[0].get("Value")
        if current_public_ip == current_record_ip:
            log.info(
                "Current IP '%s' matches the IP resolved for record '%s'. No need to update.",
                current_public_ip,
                record,
            )
            continue

        log.info(
            "Current IP %s doesn't match record %s IP (%s). Adding to batch update.",
            current_public_ip,
            record["Name"],
            current_record_ip,
        )
        changes.append(
            {
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record["Name"],
                    "Type": "A",
                    "TTL": record["TTL"],
                    "ResourceRecords": [
                        {"Value": current_public_ip},
                    ],
                },
            }
        )
    return changes