diff --git a/parliament/__init__.py b/parliament/__init__.py index 5b57ca5..95a7bb6 100644 --- a/parliament/__init__.py +++ b/parliament/__init__.py @@ -1,7 +1,7 @@ """ This library is a linter for AWS IAM policies. """ -__version__ = "0.3.1" +__version__ = "0.3.2" import os import json @@ -44,6 +44,11 @@ def analyze_policy_string(policy_str, filepath=None): policy.analyze() return policy +class UnknownPrefixException(Exception): + pass + +class UnknownActionException(Exception): + pass def is_arn_match(resource_type, arn_format, resource): """ @@ -209,10 +214,10 @@ def expand_action(action, raise_exceptions=True): ) if not service_match and raise_exceptions: - raise ValueError("Unknown prefix {}".format(prefix)) + raise UnknownPrefixException("Unknown prefix {}".format(prefix)) if len(actions) == 0 and raise_exceptions: - raise ValueError("Unknown action {}:{}".format(prefix, unexpanded_action)) + raise UnknownActionException("Unknown action {}:{}".format(prefix, unexpanded_action)) return actions diff --git a/parliament/cli.py b/parliament/cli.py index 18affbb..d6acb96 100755 --- a/parliament/cli.py +++ b/parliament/cli.py @@ -9,17 +9,19 @@ from parliament import analyze_policy_string, enhance_finding -def print_finding( - finding, minimal_output=False, json_output=False, minimum_severity="LOW" -): +def is_finding_filtered(finding, minimum_severity="LOW"): + # Return True if the finding should not be displayed minimum_severity = minimum_severity.upper() finding = enhance_finding(finding) severity_choices = ["MUTE", "INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL"] if severity_choices.index(finding.severity) < severity_choices.index( minimum_severity ): - return + return True + return False + +def print_finding(finding, minimal_output=False, json_output=False): if minimal_output: print("{}".format(finding.issue)) elif json_output: @@ -37,8 +39,8 @@ def print_finding( ) else: print( - "{} - {} - {} - {}".format( - finding.severity, finding.title, finding.detail, finding.location + "{} - {} - {} - {} - {}".format( + finding.severity, finding.title, finding.description, finding.detail, finding.location ) ) @@ -140,18 +142,24 @@ def main(): elif args.file: with open(args.file) as f: contents = f.read() - policy = analyze_policy_string(contents) - for finding in policy.findings: - findings.extend(policy.findings) + policy = analyze_policy_string(contents, args.file) + findings.extend(policy.findings) else: parser.print_help() exit(-1) - if len(findings) == 0: + filtered_findings = [] + for finding in findings: + if not is_finding_filtered(finding, args.minimum_severity): + filtered_findings.append(finding) + + if len(filtered_findings) == 0: + # Return with exit code 0 if no findings return - for finding in policy.findings: - print_finding(finding, args.minimal, args.json, args.minimum_severity) + for finding in filtered_findings: + print_finding(finding, args.minimal, args.json) + # There were findings, so return with a non-zero exit code exit(1) diff --git a/parliament/config.yaml b/parliament/config.yaml index af19c00..97f2964 100644 --- a/parliament/config.yaml +++ b/parliament/config.yaml @@ -44,6 +44,16 @@ MISMATCHED_TYPE: severity: MEDIUM group: INVALID +UNKNOWN_ACTION: + title: Unknown action + severity: LOW + group: INVALID + +UNKNOWN_PREFIX: + title: Unknown prefix + severity: LOW + group: INVALID + UNKNOWN_CONDITION_FOR_ACTION: title: Unknown condition for action description: The given condition is not documented to work with the given action @@ -58,7 +68,7 @@ BAD_PATTERN_FOR_MFA: INVALID_SID: title: Invalid SID description: Statement Sid does match regex [0-9A-Za-z]* - severity: MEDIUM + severity: LOW group: INVALID INVALID_ARN: @@ -67,6 +77,11 @@ INVALID_ARN: severity: MEDIUM group: INVALID +NO_VERSION: + title: Policy does not contain a Version element + severity: LOW + group: NOT_IDEAL + INVALID_VERSION: title: Invalid Version description: "Unknown Version used. Version must be either 2012-10-17 or 2008-10-17" diff --git a/parliament/policy.py b/parliament/policy.py index 21f4f7c..d0121bc 100644 --- a/parliament/policy.py +++ b/parliament/policy.py @@ -164,17 +164,17 @@ def analyze(self): # Check Version if "Version" not in self.policy_json: self.add_finding( - "MALFORMED", detail="Policy does not contain a Version element" + "NO_VERSION" ) - return False - self.version = self.policy_json["Version"] - - if self.version not in ["2012-10-17", "2008-10-17"]: - self.add_finding("INVALID_VERSION", location={"string": self.version}) - elif self.version != "2012-10-17": - # TODO I should have a check so that if an older version is being used, - # and a variable is detected, it should be marked as higher severity. - self.add_finding("OLD_VERSION", location={"string": self.version}) + else: + self.version = self.policy_json["Version"] + + if self.version not in ["2012-10-17", "2008-10-17"]: + self.add_finding("INVALID_VERSION", location={"string": self.version}) + elif self.version != "2012-10-17": + # TODO I should have a check so that if an older version is being used, + # and a variable is detected, it should be marked as higher severity. + self.add_finding("OLD_VERSION", location={"string": self.version}) # Check Statements if "Statement" not in self.policy_json: diff --git a/parliament/statement.py b/parliament/statement.py index 4175e06..9330948 100644 --- a/parliament/statement.py +++ b/parliament/statement.py @@ -3,7 +3,7 @@ import fnmatch import re -from . import iam_definition, is_arn_match, expand_action +from . import iam_definition, is_arn_match, expand_action, UnknownActionException from .finding import Finding from .misc import make_list, ACCESS_DECISION @@ -50,7 +50,7 @@ def get_privilege_info(service, action): privilege_info["service_resources"] = service_info["resources"] privilege_info["service_conditions"] = service_info["conditions"] return privilege_info - raise Exception("Unknown action {}:{}".format(service, action)) + raise UnknownActionException("Unknown action {}:{}".format(service, action)) def get_arn_format(resource_type, service_resources): @@ -279,8 +279,10 @@ def in_actions(self, privilege_prefix, privilege_name): for action in make_list(self.stmt["Action"]): if action == "*" or action == "*:*": return True + + expanded_actions = expand_action(action, raise_exceptions=False) - for action_struct in expand_action(action, raise_exceptions=False): + for action_struct in expanded_actions: if ( action_struct["service"] == privilege_prefix and action_struct["action"] == privilege_name @@ -619,6 +621,7 @@ def analyze_statement(self): # Check Sid if "Sid" in self.stmt and not re.fullmatch("[0-9A-Za-z]*", self.stmt["Sid"]): + # The grammar is defined at https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_grammar.html self.add_finding("INVALID_SID", detail=self.stmt) return False @@ -686,6 +689,9 @@ def analyze_statement(self): try: # Given an action such as "s3:List*", return all the possible values it could have expanded_actions.extend(expand_action(action)) + except UnknownActionException as e: + self.add_finding("UNKNOWN_ACTION", detail=e, location={"string": self.stmt}) + return False except Exception as e: self.add_finding("EXCEPTION", detail=e, location={"string": self.stmt}) return False @@ -750,6 +756,7 @@ def analyze_statement(self): # https://github.com/SummitRoute/aws_managed_policies/blob/4b71905a9042e66b22bc3d2b9cb1378b1e1d239e/policies/AWSEC2SpotServiceRolePolicy#L21 if not has_malformed_resource and effect == "Allow": + actions_without_matching_resources = [] # Ensure the required resources for each action exist # Iterate through each action for action_struct in expanded_actions: @@ -767,12 +774,7 @@ def analyze_statement(self): if resource == "*": match_found = True if not match_found: - self.add_finding( - "RESOURCE_MISMATCH", - detail="No resources match for {}:{} which requires a resource format of *".format( - action_struct["service"], action_struct["action"] - ), - ) + actions_without_matching_resources.append({'action': '{}:{}'.format(action_struct["service"], action_struct["action"]), 'required_format': '*'}) # Iterate through the resources defined in the action definition for resource_type in privilege_info["resource_types"]: @@ -799,15 +801,12 @@ def analyze_statement(self): continue if not match_found: - self.add_finding( - "RESOURCE_MISMATCH", - detail="No resources match for {}:{} which requires a resource format of {} for the resource {}".format( - action_struct["service"], - action_struct["action"], - arn_format, - resource_type, - ), - ) + actions_without_matching_resources.append({'action': '{}:{}'.format(action_struct["service"], action_struct["action"]), 'required_format': arn_format}) + if actions_without_matching_resources: + self.add_finding( + "RESOURCE_MISMATCH", + detail=actions_without_matching_resources + ) # If conditions exist, it will be an element, which was previously made into a list if len(conditions) == 1: diff --git a/tests/unit/test_action_expansion.py b/tests/unit/test_action_expansion.py index 51c0b3d..105d170 100644 --- a/tests/unit/test_action_expansion.py +++ b/tests/unit/test_action_expansion.py @@ -7,6 +7,7 @@ assert_count_equal, ) +from parliament import UnknownPrefixException, UnknownActionException from parliament.statement import expand_action @@ -61,19 +62,19 @@ def test_exception_bad_service(self): try: expand_action("333:listallmybuckets") assert False - except ValueError as e: + except UnknownPrefixException as e: assert True def test_exception_bad_action(self): try: expand_action("s3:zzz") assert False - except ValueError as e: + except UnknownActionException as e: assert True def test_exception_bad_expansion(self): try: expand_action("s3:zzz*") assert False - except ValueError as e: + except UnknownActionException as e: assert True