Skip to content

Commit

Permalink
V1 - Cleanup some of the function rules (#2983)
Browse files Browse the repository at this point in the history
* Cleanup rules and reduce redundant code
* Cleanup Fn::Cidr function
* Add errors to resolvers when values can't be resolved
* Increase test coverage
  • Loading branch information
kddejong authored Dec 21, 2023
1 parent e175d61 commit df45aea
Show file tree
Hide file tree
Showing 23 changed files with 378 additions and 807 deletions.
16 changes: 14 additions & 2 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ def __post_init__(self, parameter) -> None:

self.default = None
self.allowed_values = []
self.min_value = None
self.max_value = None
# SSM Parameter defaults and allowed values point to
# SSM paths not to the actual values
if self.type.startswith("AWS::SSM::Parameter::"):
Expand All @@ -203,6 +205,8 @@ def __post_init__(self, parameter) -> None:
else:
self.default = parameter.get("Default")
self.allowed_values = parameter.get("AllowedValues")
self.min_value = parameter.get("MinValue")
self.max_value = parameter.get("MaxValue")

def ref(self, context: Context) -> Iterable[Any]:
if self.allowed_values:
Expand All @@ -213,6 +217,12 @@ def ref(self, context: Context) -> Iterable[Any]:
if self.default:
yield self.default

if self.min_value:
yield self.min_value

if self.max_value:
yield self.max_value


@dataclass
class Resource(_Ref):
Expand Down Expand Up @@ -244,14 +254,16 @@ class _MappingSecondaryKey:
This class holds a mapping value
"""

keys: Dict[str, List[Any] | str] = field(init=False, default_factory=dict)
keys: Dict[str, List[Any] | str | int | float] = field(
init=False, default_factory=dict
)
instance: InitVar[Any]

def __post_init__(self, instance) -> None:
if not isinstance(instance, dict):
raise ValueError("Secondary keys must be a object")

Check warning on line 264 in src/cfnlint/context/context.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/context/context.py#L264

Added line #L264 was not covered by tests
for k, v in instance.items():
if isinstance(v, (str, list)):
if isinstance(v, (str, list, int, float)):
self.keys[k] = v

def value(self, secondary_key: str):
Expand Down
121 changes: 83 additions & 38 deletions src/cfnlint/jsonschema/_resolvers_cfn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import regex as re

from cfnlint.helpers import AVAILABILITY_ZONES
from cfnlint.jsonschema import Validator
from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.jsonschema._typing import ResolutionResult
from cfnlint.jsonschema._utils import equal


def unresolvable(validator: Validator, instance: Any) -> ResolutionResult:
Expand All @@ -24,13 +25,13 @@ def ref(validator: Validator, instance: Any) -> ResolutionResult:
if not isinstance(instance, (str, dict)):
return

for instance, _ in validator.resolve_value(instance):
for instance, _, _ in validator.resolve_value(instance):
if validator.is_type(instance, "string"):
# if the ref is to pseudo-parameter or parameter we can validate the values
if instance in validator.context.ref_values:
# Ref: AWS::NoValue returns None making this fail
if validator.context.ref_values[instance] is not None:
yield validator.context.ref_values[instance], deque([])
yield validator.context.ref_values[instance], deque([]), None
return
if instance in validator.context.parameters:
if validator.context.parameters[instance].allowed_values:
Expand All @@ -39,11 +40,19 @@ def ref(validator: Validator, instance: Any) -> ResolutionResult:
):
yield value, deque(
["Parameters", instance, "AllowedValues", index]
)
), None
if validator.context.parameters[instance].default is not None:
yield validator.context.parameters[instance].default, deque(
["Parameters", instance, "Default"]
)
), None
if validator.context.parameters[instance].min_value is not None:
yield validator.context.parameters[instance].min_value, deque(
["Parameters", instance, "MinValue"]
), None
if validator.context.parameters[instance].max_value is not None:
yield validator.context.parameters[instance].max_value, deque(
["Parameters", instance, "MaxValue"]
), None
return
return

Expand All @@ -54,61 +63,97 @@ def find_in_map(validator: Validator, instance: Any) -> ResolutionResult:
if len(instance) not in [3, 4]:
return

for map_name, _ in validator.resolve_value(instance[0]):
default_value = None
if len(instance) == 4:
options = instance[3]
if validator.is_type(options, "object"):
if "DefaultValue" in options:
for value, _, _ in validator.resolve_value(options["DefaultValue"]):
yield value, deque([]), None
default_value = value

for map_name, _, _ in validator.resolve_value(instance[0]):
if not validator.is_type(map_name, "string"):
continue
for top_level_key, _ in validator.resolve_value(instance[1]):
for top_level_key, _, _ in validator.resolve_value(instance[1]):
if not validator.is_type(top_level_key, "string"):
continue
for second_level_key, _ in validator.resolve_value(instance[2]):
for second_level_key, _, _ in validator.resolve_value(instance[2]):
if not validator.is_type(second_level_key, "string"):
continue
try:
mappings = list(validator.context.mappings.keys())
if not default_value and all(
not (equal(map_name, each)) for each in mappings
):
yield None, deque([0]), ValidationError(
f"{map_name!r} is not one of {mappings!r}", path=[0]
)
continue

top_level_keys = list(
validator.context.mappings[map_name].keys.keys()
)
if not default_value and all(
not (equal(top_level_key, each)) for each in top_level_keys
):
yield None, deque([1]), ValidationError(
f"{top_level_key!r} is not one of {top_level_keys!r}",
path=[0],
)
continue

second_level_keys = list(
validator.context.mappings[map_name]
.keys[top_level_key]
.keys.keys()
)
if not default_value and all(
not (equal(second_level_key, each))
for each in second_level_keys
):
yield None, deque([2]), ValidationError(
f"{second_level_key!r} is not one of {second_level_keys!r}",
path=[0],
)
continue

for value in validator.context.mappings[map_name].find_in_map(
top_level_key,
second_level_key,
):
yield value, deque(
["Mappings", map_name, top_level_key, second_level_key]
)
), None
except KeyError:
pass

if len(instance) == 4:
options = instance[3]
if not validator.is_type(options, "object"):
return
if "DefaultValue" not in options:
return
for value, _ in validator.resolve_value(options["DefaultValue"]):
yield value, deque([])


def get_azs(validator: Validator, instance: Any) -> ResolutionResult:
if not isinstance(instance, (str, dict)):
return

for instance, _ in validator.resolve_value(instance):
for instance, _, _ in validator.resolve_value(instance):
if validator.is_type(instance, "string"):
if instance == "":
instance = validator.context.region
# if the ref is to pseudo-parameter or parameter we can validate the values
if instance in AVAILABILITY_ZONES:
yield AVAILABILITY_ZONES.get(instance), deque([])
yield AVAILABILITY_ZONES.get(instance), deque([]), None


def _join_expansion(validator: Validator, instances: Any) -> Iterator[Any]:
if len(instances) == 0:
return

if len(instances) == 1:
for value, _ in validator.resolve_value(instances[0]):
for value, _, _ in validator.resolve_value(instances[0]):
if not isinstance(value, (str, int, float, bool)):
raise ValueError("Incorrect value type for {value!r}")
yield [value]
return

for value, _ in validator.resolve_value(instances[0]):
for value, _, _ in validator.resolve_value(instances[0]):
if not isinstance(value, (str, int, float, bool)):
raise ValueError("Incorrect value type for {value!r}")
for values in _join_expansion(validator, instances[1:]):
Expand All @@ -122,14 +167,14 @@ def join(validator: Validator, instance: Any) -> ResolutionResult:
if not len(instance) == 2:
return

for delimiter, _ in validator.resolve_value(instance[0]):
for delimiter, _, _ in validator.resolve_value(instance[0]):
if not validator.is_type(delimiter, "string"):
continue
for values, _ in validator.resolve_value(instance[1]):
for values, _, _ in validator.resolve_value(instance[1]):
if not validator.is_type(values, "array"):
continue
for value in _join_expansion(validator, values):
yield delimiter.join(value), deque([])
yield delimiter.join(value), deque([]), None


def select(validator: Validator, instance: Any) -> ResolutionResult:
Expand All @@ -143,8 +188,8 @@ def select(validator: Validator, instance: Any) -> ResolutionResult:
indexes = validator.resolve_value(instance[0])
objs = validator.resolve_value(instance[1])

for i, _ in indexes:
for obj, _ in objs:
for i, _, _ in indexes:
for obj, _, _ in objs:
try:
i = int(i)
except ValueError:
Expand All @@ -153,7 +198,7 @@ def select(validator: Validator, instance: Any) -> ResolutionResult:
continue
if len(obj) <= i:
continue
yield obj[i], deque([])
yield obj[i], deque([]), None


def split(validator: Validator, instance: Any) -> ResolutionResult:
Expand All @@ -162,14 +207,14 @@ def split(validator: Validator, instance: Any) -> ResolutionResult:
if not len(instance) == 2:
return

for delimiter, _ in validator.resolve_value(instance[0]):
for source_string, _ in validator.resolve_value(instance[1]):
for delimiter, _, _ in validator.resolve_value(instance[0]):
for source_string, _, _ in validator.resolve_value(instance[1]):
if not validator.is_type(delimiter, "string"):
continue
if not validator.is_type(source_string, "string"):
continue

yield source_string.split(delimiter), deque([])
yield source_string.split(delimiter), deque([]), None


def _sub_parameter_expansion(
Expand All @@ -182,13 +227,13 @@ def _sub_parameter_expansion(

if len(parameters) == 1:
for key, value in parameters.items():
for resolved_value, _ in validator.resolve_value(value):
for resolved_value, _, _ in validator.resolve_value(value):
yield {key: resolved_value}
return

key = list(parameters.keys())[0]
value = parameters.pop(key)
for resolved_value, _ in validator.resolve_value(value):
for resolved_value, _, _ in validator.resolve_value(value):
for values in _sub_parameter_expansion(validator, parameters):
yield dict({key: resolved_value}, **values)

Expand All @@ -205,7 +250,7 @@ def _replace(matchobj):
raise ValueError(f"No matches for {matchobj.group(2)!r}")

try:
yield re.sub(sub_regex, _replace, string), deque([])
yield re.sub(sub_regex, _replace, string), deque([]), None
except ValueError:
return

Expand Down Expand Up @@ -249,14 +294,14 @@ def if_(validator: Validator, instance: Any) -> ResolutionResult:
return

for i in [1, 2]:
for value, value_path in validator.resolve_value(instance[i]):
for value, value_path, err in validator.resolve_value(instance[i]):
value_path.appendleft(i)
yield value, value_path
yield value, value_path, err


def to_json_string(validator: Validator, instance: Any) -> ResolutionResult:
for value, _ in validator.resolve_value(instance):
yield json.dumps(value), deque([])
for value, _, err in validator.resolve_value(instance):
yield json.dumps(value), deque([]), err


# not all functions need to be resolved. These functions
Expand Down
4 changes: 3 additions & 1 deletion src/cfnlint/jsonschema/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
"""
# Code is taken from jsonschema package and adapted CloudFormation use
# https://github.com/python-jsonschema/jsonschema
from __future__ import annotations

from typing import Any, Callable, Deque, Dict, Iterator, Optional, Tuple

from cfnlint.jsonschema.exceptions import ValidationError

ValidationResult = Iterator[ValidationError]
V = Optional[Callable[[Any, Any, Any, Dict[str, Any]], ValidationResult]]
ResolutionResult = Iterator[Tuple[Any, Deque]]
ResolutionResult = Iterator[Tuple[Any, Deque, Optional[ValidationError]]]
Loading

0 comments on commit df45aea

Please sign in to comment.