forked from awslabs/open-data-registry
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathext.py
138 lines (101 loc) · 4.25 KB
/
ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import re
import yaml
import requests
from urllib3.exceptions import InsecureRequestWarning
# Suppress the warning on Verify=False requests
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


def _load_yaml(filename):
    """Parse ``filename`` with ``yaml.safe_load`` and return the result.

    The file is opened in a ``with`` block so the handle is closed as soon
    as parsing finishes, and it is decoded as UTF-8 rather than with the
    platform's default encoding.
    """
    with open(filename, encoding='utf-8') as stream:
        return yaml.safe_load(stream)


# Allowed values, loaded once at import time from files in the working directory
tags = _load_yaml('tags.yaml')
# 'aws-pds' is always accepted as a tag, in addition to whatever tags.yaml lists
tags.append('aws-pds')
resources = _load_yaml('resources.yaml')
services = _load_yaml('services.yaml')

# An ARN in the 'aws' or 'aws-iso' partition followed by four more colon-separated fields
arn_regex = re.compile(r"^arn:(aws|aws-iso):.+:.*:.*:.+$")
# A host name with an optional http(s):// scheme and an optional path
host_regex = re.compile(r"^(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,63})(\/.*)*\/?$")
# Same shape as host_regex, but underscores are also allowed in the host part
controlled_access_regex = re.compile(r"^(https?:\/\/)?([\da-z\.\-\_]+)\.([a-z\.]{2,63})(\/.*)*\/?$")
# A Markdown-style link: [text](http(s)://url)
explore_regex = re.compile(r"^\[.+\]\(https?:\/\/[\w\d.\-\/#]+\)$")
def ext_tags(value, rule_obj, path):
    """Check that ``value`` is one of the entries in the module-level ``tags``.

    Args:
        value: The tag to look up.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the tag is known; otherwise prints a message and
        returns False.
    """
    is_known = value in tags
    if not is_known:
        print('Invalid tag!', value)
    return is_known
def ext_resources(value, rule_obj, path):
    """Check that ``value`` is one of the entries in the module-level ``resources``.

    Args:
        value: The resource to look up.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the resource is known; otherwise prints a message and
        returns False.
    """
    is_known = value in resources
    if not is_known:
        print('Invalid resource!', value)
    return is_known
def ext_services(value, rule_obj, path):
    """Check that ``value`` is one of the entries in the module-level ``services``.

    Args:
        value: The service to look up.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the service is known; otherwise prints a message and
        returns False.
    """
    is_known = value in services
    if not is_known:
        print('Invalid service!', value)
    return is_known
def ext_resources_arn(value, rule_obj, path):
    """Check that ``value`` fully matches the module-level ``arn_regex``.

    Args:
        value: The ARN string to validate.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the whole string matches; otherwise prints a message
        showing an example ARN and returns False.
    """
    if arn_regex.fullmatch(value):
        return True
    print("ARN '{}' is not valid, it should look like arn:aws:s3:::yourbucket".format(value))
    return False
def ext_resources_host(value, rule_obj, path):
    """Check that ``value`` fully matches the module-level ``host_regex``.

    Args:
        value: The host string to validate.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the whole string matches; otherwise prints a message and
        returns False.
    """
    if host_regex.fullmatch(value) is not None:
        return True
    print("Host '{}' is not valid".format(value))
    return False
def ext_resources_controlled_access(value, rule_obj, path):
    """Check that ``value`` fully matches ``controlled_access_regex``.

    Args:
        value: The controlled-access string to validate.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the whole string matches; otherwise prints a message and
        returns False.
    """
    if controlled_access_regex.fullmatch(value) is not None:
        return True
    print("Controlled Access string '{}' is not valid".format(value))
    return False
def ext_resources_explore(value, rule_obj, path):
    """Check that ``value`` is a single link matching ``explore_regex``.

    The pattern accepts a Markdown-style link: bracketed text followed by a
    parenthesised http(s) URL.

    Args:
        value: The link string to validate.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the whole string matches; otherwise prints a message and
        returns False.
    """
    if explore_regex.fullmatch(value) is not None:
        return True
    print("Explore string '{}' is not a valid link".format(value))
    return False
def ext_valid_bucket_regions(value, rule_obj, path):
    """Validate a resource entry and, for public S3 buckets, its region.

    The entry must be a dict with ``Type``, ``Description`` and ``Region``
    keys, and ``Explore`` (when present) must be a list.  When ``Type`` is
    ``'S3 Bucket'`` and the ARN starts with ``arn:aws:s3:::``, a HEAD
    request is sent to ``https://<bucket>.s3.amazonaws.com`` and the
    ``x-amz-bucket-region`` response header is compared, ignoring case,
    with the entry's ``Region``.

    Args:
        value: The resource entry to validate.
        rule_obj: Not used by this check.
        path: Not used by this check.

    Returns:
        True when the entry passes every check that applies to it.  False
        otherwise, after printing a message that explains the failure.
        Network failures and timeouts are reported as a failed check
        rather than raised.
    """
    # Validate required fields in resources
    if not isinstance(value, dict):
        print("Did not receive a resources dictionary...")
        return False

    for field in ('Type', 'Description', 'Region'):
        if field not in value:
            print("{} is a required resources field".format(field))
            return False

    if 'Explore' in value and not isinstance(value['Explore'], list):
        print("Explore must be an array of links")
        return False

    # Only S3 buckets get the region check below
    if value['Type'] != 'S3 Bucket':
        return True

    # A missing or non-string ARN is a validation failure, not a crash
    arn = value.get('ARN')
    if not isinstance(arn, str):
        print("ARN is a required resources field for S3 buckets")
        return False

    parts = arn.split(':::')
    if parts[0] != 'arn:aws:s3':
        # This is probably not on public aws so we can't check
        return True

    # Keep only the bucket name, dropping any key prefix after the first '/'
    bucket = parts[1].split('/')[0] if len(parts) > 1 else ''
    if not bucket:
        print("ARN '{}' does not name a bucket".format(arn))
        return False

    url = "https://{}.s3.amazonaws.com".format(bucket)

    # Get the headers for this bucket.
    # Verify=False because the wildcard matching doesn't work for buckets with '.'
    try:
        # The timeout keeps an unresponsive endpoint from hanging validation forever
        r = requests.head(url, verify=False, timeout=30)
    except requests.exceptions.RequestException as err:
        print("Could not reach bucket {}: {}".format(bucket, err))
        return False

    if r.status_code == requests.codes.not_found:
        print("Bucket {} doesn't exist or there was a momentary glitch".format(bucket))
        return False

    if 'x-amz-bucket-region' not in r.headers:
        print("Bucket region missing from request header?")
        return False

    region = r.headers['x-amz-bucket-region']
    if value['Region'].lower() != region.lower():
        print('The region for bucket {} is listed as {} but is actually {}'.format(bucket, value['Region'], region))
        return False

    return True