diff --git a/.flake8 b/.flake8 index 9f420e2a..a2c7f574 100644 --- a/.flake8 +++ b/.flake8 @@ -110,6 +110,8 @@ per-file-ignores = # additionally test docstrings don't need param lists (DAR, DCO020): tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 + tests/_temporary_private_inject_api_test.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS226, WPS430, WPS436, WPS441, WPS442, WPS450, WPS201 + src/awx_plugins/interfaces/_temporary_private_inject_api.py: ANN001,ANN201,B950,C901,CCR001,D103,E800,LN001,LN002,Q003,WPS110,WPS111,WPS118,WPS125,WPS204,WPS210,WPS211,WPS213,WPS221,WPS226,WPS231,WPS232,WPS319,WPS323,WPS336,WPS337,WPS361,WPS421,WPS429,WPS430,WPS431,WPS436,WPS442,WPS503,WPS507,WPS516,WPS226 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 49274211..29837be4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -196,6 +196,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.13 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.13 @@ -214,6 +216,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.12 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.12 @@ -232,6 +236,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.11 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.11 @@ -255,5 +261,6 @@ repos: - pytest-mock # needed by pylint-pytest since it picks up pytest's args - pytest-xdist # needed by pylint-pytest since it picks up pytest's args - Sphinx # needed by the Sphinx extension stub + - PyYaml # ModuleNotFoundError: No module named 'yaml' without this ... diff --git a/.pylintrc.toml b/.pylintrc.toml index 23964222..e898d4f0 100644 --- a/.pylintrc.toml +++ b/.pylintrc.toml @@ -422,6 +422,7 @@ disable = [ "useless-import-alias", # MyPy requires the opposite "wrong-import-order", # isort-handled: https://github.com/pylint-dev/pylint/issues/9977 "wrong-import-position", # isort-handled: https://github.com/pylint-dev/pylint/issues/9977 + "relative-beyond-top-level", # Developer preference ] # Enable the message, report, category or checker with the given id(s). You can diff --git a/_type_stubs/awx/main/models/credential.pyi b/_type_stubs/awx/main/models/credential.pyi index 28177f9e..97cc2e75 100644 --- a/_type_stubs/awx/main/models/credential.pyi +++ b/_type_stubs/awx/main/models/credential.pyi @@ -1,19 +1,36 @@ from typing import Callable +from awx_plugins.interfaces._temporary_private_api import ( # noqa: WPS436 + EnvVarsType, + InjectorDefinitionType, + InputDefinitionType, +) from awx_plugins.interfaces._temporary_private_credential_api import ( # noqa: WPS436 Credential, - GenericOptionalPrimitiveType, ) class ManagedCredentialType: + namespace: str + name: str + kind: str + inputs: InputDefinitionType + injectors: InjectorDefinitionType = None + managed: bool = False + custom_injectors: Callable[ + [ + Credential, + EnvVarsType, str, + ], str | None, + ] | None = None + def __init__( self, namespace: str, name: str, kind: str, - inputs: dict[str, list[dict[str, bool | str] | str]], - injectors: dict[str, dict[str, str]] | None = None, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType = None, managed: bool = False, - custom_injector: Callable[[Credential, dict[str, GenericOptionalPrimitiveType], str], str | None] | None = None, + custom_injectors: Callable[['Credential', EnvVarsType, str], str | None] | None = None, ): ... diff --git a/docs/conf.py b/docs/conf.py index 239b7a35..db0eb34a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -212,4 +212,7 @@ # Ref: https://stackoverflow.com/a/30624034/595220 nitpick_ignore = [ # temporarily listed ('role', 'reference') pairs that Sphinx cannot resolve + ('py:class', 'ExtraVarsType'), + ('py:class', 'EnvVarsType'), + ('py:class', 'jinja2.sandbox.ImmutableSandboxedEnvironment'), ] diff --git a/src/awx_plugins/interfaces/_temporary_private_api.py b/src/awx_plugins/interfaces/_temporary_private_api.py index d8dc162b..10b1cd54 100644 --- a/src/awx_plugins/interfaces/_temporary_private_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_api.py @@ -4,17 +4,22 @@ The hope is that it will be refactored into something more standardized. """ -from collections.abc import Callable +from collections.abc import Callable, Mapping +from typing import Union from ._temporary_private_credential_api import ( # noqa: WPS436 Credential as Credential, - GenericOptionalPrimitiveType, ) -InputSchemaValueType = list[dict[str, str | bool]] -InputSchemaType = dict[str, InputSchemaValueType] +InputDefinitionValueType = list[dict[str, str | bool]] +InputDefinitionType = dict[str, InputDefinitionValueType] +InjectorDefinitionBaseType = dict[str, dict[str, str]] +InjectorDefinitionType = Union[InjectorDefinitionBaseType, None] + +EnvVarsValueType = Mapping[str, 'EnvVarsType'] | list['EnvVarsType'] | str +EnvVarsType = dict[str, EnvVarsValueType] try: # pylint: disable-next=unused-import @@ -37,10 +42,10 @@ class ManagedCredentialType: # type: ignore[no-redef] # noqa: WPS440 kind: str """Plugin category.""" - inputs: InputSchemaType + inputs: InputDefinitionType """UI input fields schema.""" - injectors: dict[str, dict[str, str]] | None = None + injectors: InjectorDefinitionType = None """Injector hook parameters.""" managed: bool = False @@ -49,36 +54,9 @@ class ManagedCredentialType: # type: ignore[no-redef] # noqa: WPS440 custom_injectors: Callable[ [ Credential, - dict[str, GenericOptionalPrimitiveType], str, + EnvVarsType, str, ], str | None, ] | None = None """Function to call as an alternative to the templated injection.""" - def inject_credential( # noqa: WPS211 - self: 'ManagedCredentialType', - credential: Credential, - env: dict[str, GenericOptionalPrimitiveType], - safe_env: dict[str, GenericOptionalPrimitiveType], - args: list[GenericOptionalPrimitiveType], - private_data_dir: str, - ) -> None: # noqa: DAR101 - """Transform credential data into runtime data. - - This inject_credential is the entry point within this - project. Outside of this project, in AWX, the standalone - inject_credential is called directly. Once the above awx - import hack is removed the duplication can be removed. - """ - from ._temporary_private_inject_api import ( # noqa: WPS433, WPS436 - inject_credential, - ) - inject_credential( - self, - credential, - env, - safe_env, - args, - private_data_dir, - ) - __all__ = () # noqa: WPS410 diff --git a/src/awx_plugins/interfaces/_temporary_private_credential_api.py b/src/awx_plugins/interfaces/_temporary_private_credential_api.py index e4bb8f50..fa5960d6 100644 --- a/src/awx_plugins/interfaces/_temporary_private_credential_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_credential_api.py @@ -6,6 +6,8 @@ GenericOptionalPrimitiveType = bool | str | int | float | None # noqa: WPS465 """Generic type for input values.""" +CredentialInputType = dict[str, GenericOptionalPrimitiveType] + class Credential: """Input supplied by the user. @@ -16,9 +18,9 @@ class Credential: def __init__( self: 'Credential', - inputs: dict[str, GenericOptionalPrimitiveType] | None = None, + inputs: CredentialInputType | None = None, ) -> None: - self._inputs: dict[str, GenericOptionalPrimitiveType] = inputs or {} + self._inputs: CredentialInputType = inputs or {} def get_input( self: 'Credential', diff --git a/src/awx_plugins/interfaces/_temporary_private_inject_api.py b/src/awx_plugins/interfaces/_temporary_private_inject_api.py index 74d77cd6..e884f481 100644 --- a/src/awx_plugins/interfaces/_temporary_private_inject_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_inject_api.py @@ -1,23 +1,37 @@ -"""Injection makes use of plugins.""" +"""Injectors exercise plugins.""" import os import re import stat import tempfile +from collections.abc import Mapping +from types import SimpleNamespace -from jinja2 import sandbox +from jinja2.sandbox import ImmutableSandboxedEnvironment from yaml import safe_dump as yaml_safe_dump from awx_plugins.interfaces._temporary_private_container_api import ( get_incontainer_path, ) -from ._temporary_private_api import ManagedCredentialType +from ._temporary_private_api import EnvVarsType, ManagedCredentialType from ._temporary_private_credential_api import ( Credential, GenericOptionalPrimitiveType, ) +# pylint: disable-next=too-few-public-methods +class TowerNamespace(SimpleNamespace): + """Dummy class.""" + + filename: str | SimpleNamespace | None = None + + +TowerNamespaceValueType = TowerNamespace | GenericOptionalPrimitiveType +ExtraVarsType = Mapping[str, 'ExtraVarsType'] | list['ExtraVarsType'] | str + +ArgsType = list[str] + HIDDEN_PASSWORD = '*' * 10 SENSITIVE_ENV_VAR_NAMES = 'API|TOKEN|KEY|SECRET|PASS' @@ -50,8 +64,8 @@ def build_safe_env( - env: dict[str, GenericOptionalPrimitiveType], -) -> dict[str, GenericOptionalPrimitiveType]: + env: EnvVarsType, +) -> EnvVarsType: """Obscure potentially sensitive env values. Given a set of environment variables, execute a set of heuristics to @@ -61,7 +75,7 @@ def build_safe_env( :returns: Sanitized environment variables. """ safe_env = dict(env) - for env_k, env_val in safe_env.items(): + for env_k, env_v in safe_env.items(): is_special = ( env_k == 'AWS_ACCESS_KEY_ID' or ( @@ -72,16 +86,21 @@ def build_safe_env( ) if is_special: continue - elif HIDDEN_PASSWORD_RE.search(env_k): + if HIDDEN_PASSWORD_RE.search(env_k): safe_env[env_k] = HIDDEN_PASSWORD - elif isinstance(env_val, str) and HIDDEN_URL_PASSWORD_RE.match(env_val): + elif isinstance(env_v, str) and HIDDEN_URL_PASSWORD_RE.match(env_v): safe_env[env_k] = HIDDEN_URL_PASSWORD_RE.sub( - HIDDEN_PASSWORD, env_val, + HIDDEN_PASSWORD, env_v, ) return safe_env def secret_fields(cred_type: ManagedCredentialType) -> list[str]: + """List of fields that are sensitive from the credential type. + + :param cred_type: Where the secret field descriptions live + :return: list of secret field names + """ return [ str(field['id']) for field in cred_type.inputs.get('fields', []) @@ -89,43 +108,83 @@ def secret_fields(cred_type: ManagedCredentialType) -> list[str]: ] +def _build_extra_vars( + sandbox: ImmutableSandboxedEnvironment, + namespace: dict[str, TowerNamespaceValueType], + node: Mapping[str, str | list[str]] | list[str] | str, +) -> ExtraVarsType: + """Execute template to generate extra vars. + + :param sandbox: jinja2 sandbox environment + :param namespace: variables available to the jinja2 sandbox + :param node: extra vars for this iteration + :return: filled in extra vars node + """ + if isinstance(node, Mapping): + return { + str(_build_extra_vars(sandbox, namespace, entry)): + _build_extra_vars(sandbox, namespace, v) + for entry, v in node.items() + } + if isinstance(node, list): + return [_build_extra_vars(sandbox, namespace, entry) for entry in node] + return sandbox.from_string(node).render(**namespace) + + +def _build_extra_vars_file( + extra_vars: ExtraVarsType, + private_dir: str, +) -> str: + """Serialize extra vars out to a file. + + :param extra_vars: python dict to serialize + :param private_dir: base directory to create file in + :return: path to the file + """ + handle, path = tempfile.mkstemp( + dir=os.path.join(private_dir, 'env'), + ) + f = os.fdopen(handle, 'w') + f.write(yaml_safe_dump(extra_vars)) + f.close() + os.chmod(path, stat.S_IRUSR) + return path + + +# pylint: disable-next=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements def inject_credential( cred_type: ManagedCredentialType, credential: Credential, - env: dict[str, GenericOptionalPrimitiveType], - safe_env: dict[str, GenericOptionalPrimitiveType], - args: list[GenericOptionalPrimitiveType], + env: EnvVarsType, + safe_env: EnvVarsType, + args: ArgsType, private_data_dir: str, ) -> None: + # pylint: disable=unidiomatic-typecheck """Inject credential data. - Inject credential data into the environment variables and - arguments passed to `ansible-playbook` - - :param cred_type: an instance of ManagedCredentialType - :param credential: a :class:`awx.main.models.Credential` instance - :param env: a dictionary of environment variables used in - the `ansible-playbook` call. This method adds - additional environment variables based on - custom `env` injectors defined on this - CredentialType. - :param safe_env: a dictionary of environment variables stored - in the database for the job run - (`UnifiedJob.job_env`); secret values should - be stripped - :param args: a list of arguments passed to - `ansible-playbook` in the style of - `subprocess.call(args)`. This method appends - additional arguments based on custom - `extra_vars` injectors defined on this - CredentialType. - :param private_data_dir: a temporary directory to store files generated - by `file` injectors (like config files or key - files) + Inject credential data into the environment variables and arguments + passed to ansible-playbook + + :param cred_type: an instance of ManagedCredentialType + :param credential: credential holding the input to be used + :param env: a dictionary of environment variables used in the + ansible-playbook call. This method adds additional environment + variables based on custom env injectors defined on this + CredentialType. + :param safe_env: a dictionary of environment variables stored in the + database for the job run secret values should be stripped + :param args: a list of arguments passed to ansible-playbook in the + style of subprocess.call(args). This method appends additional + arguments based on custom extra_vars injectors defined on this + CredentialType. + :param private_data_dir: a temporary directory to store files + generated by file injectors (like config files or key files) + :returns: None """ if not cred_type.injectors: if cred_type.managed and cred_type.custom_injectors: - injected_env: dict[str, GenericOptionalPrimitiveType] = {} + injected_env: EnvVarsType = {} cred_type.custom_injectors( credential, injected_env, private_data_dir, ) @@ -133,21 +192,19 @@ def inject_credential( safe_env.update(build_safe_env(injected_env)) return - class TowerNamespace: - """Dummy class.""" - tower_namespace = TowerNamespace() # maintain a normal namespace for building the ansible-playbook # arguments (env and args) - namespace: dict[str, TowerNamespace | GenericOptionalPrimitiveType] = { + namespace: dict[str, TowerNamespaceValueType] = { 'tower': tower_namespace, } # maintain a sanitized namespace for building the DB-stored arguments # (safe_env) - safe_namespace: dict[str, TowerNamespace | GenericOptionalPrimitiveType] = { - 'tower': tower_namespace, } + safe_namespace: dict[str, TowerNamespaceValueType] = { + 'tower': tower_namespace, + } # build a normal namespace with secret values decrypted (for # ansible-playbook) and a safe namespace with secret values hidden (for @@ -170,8 +227,9 @@ class TowerNamespace: for field in cred_type.inputs.get('fields', []): field_id = str(field['id']) + field_type_is_bool = field['type'] == 'boolean' # default missing boolean fields to False - if field['type'] == 'boolean' and field_id not in credential.get_input_keys(): + if field_type_is_bool and field_id not in credential.get_input_keys(): namespace[field_id] = False safe_namespace[field_id] = False # make sure private keys end with a \n @@ -186,27 +244,29 @@ class TowerNamespace: # special `tower` template namespace so the filename can be # referenced in other injectors - sandbox_env = sandbox.ImmutableSandboxedEnvironment() # type: ignore[misc] + sandbox_env = ImmutableSandboxedEnvironment() + + file: str | None = None + files: dict[str, str] = {} for file_label, file_tmpl in file_tmpls.items(): data: str = sandbox_env.from_string(file_tmpl).render( **namespace, - ) # type: ignore[misc] + ) env_dir = os.path.join(private_data_dir, 'env') - _, path = tempfile.mkstemp(dir=env_dir) - with open(path, 'w') as f: + path = tempfile.mkstemp(dir=env_dir)[1] + with open(path, 'w') as f: # pylint: disable=unspecified-encoding f.write(data) os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) container_path = get_incontainer_path(path, private_data_dir) # determine if filename indicates single file or many if file_label.find('.') == -1: - tower_namespace.filename = container_path + file = container_path else: - if not hasattr(tower_namespace, 'filename'): - tower_namespace.filename = TowerNamespace() - file_label = file_label.split('.')[1] - setattr(tower_namespace.filename, file_label, container_path) + files[file_label.split('.')[1]] = container_path + + tower_namespace.filename = file or SimpleNamespace(**files) for env_var, tmpl in cred_type.injectors.get('env', {}).items(): if env_var in ENV_BLOCKLIST: @@ -216,37 +276,17 @@ class TowerNamespace: tmpl, ).render(**safe_namespace) - if 'INVENTORY_UPDATE_ID' not in env: - # awx-manage inventory_update does not support extra_vars via -e - def build_extra_vars( - node: dict[str, str | list[str]] | list[str] | str, - ) -> dict[str, str] | list[str] | str: - if isinstance(node, dict): - return { - build_extra_vars(k): build_extra_vars(v) for k, - v in node.items() - } - elif isinstance(node, list): - return [build_extra_vars(x) for x in node] - else: - return sandbox_env.from_string(node).render(**namespace) - - def build_extra_vars_file(vars, private_dir: str) -> str: - handle, path = tempfile.mkstemp( - dir=os.path.join(private_dir, 'env'), - ) - f = os.fdopen(handle, 'w') - f.write(yaml_safe_dump(vars)) - f.close() - os.chmod(path, stat.S_IRUSR) - return path - - extra_vars = build_extra_vars( - cred_type.injectors.get( - 'extra_vars', {}, - ), + extra_vars = _build_extra_vars( + sandbox_env, + namespace, + cred_type.injectors.get( + 'extra_vars', {}, + ), + ) + if extra_vars: + path = _build_extra_vars_file(extra_vars, private_data_dir) + container_path = get_incontainer_path(path, private_data_dir) + args.extend( + # pylint: disable-next=consider-using-f-string + ['-e', '@%s' % container_path], ) - if extra_vars: - path = build_extra_vars_file(extra_vars, private_data_dir) - container_path = get_incontainer_path(path, private_data_dir) - args.extend(['-e', '@%s' % container_path]) diff --git a/tests/_temporary_private_api_test.py b/tests/_temporary_private_api_test.py index 8d6cc712..2b6aab89 100644 --- a/tests/_temporary_private_api_test.py +++ b/tests/_temporary_private_api_test.py @@ -1,11 +1,8 @@ """Tests for the temporarily hosted private helpers.""" -from awx_plugins.interfaces._temporary_private_api import ( - ManagedCredentialType, -) +from awx_plugins.interfaces._temporary_private_api import ManagedCredentialType def test_managed_credential_type_instantiation() -> None: """Check that managed credential type can be instantiated.""" - assert ManagedCredentialType('', '', '', {}) diff --git a/tests/_temporary_private_inject_api_test.py b/tests/_temporary_private_inject_api_test.py index 68235e3b..d1e89814 100644 --- a/tests/_temporary_private_inject_api_test.py +++ b/tests/_temporary_private_inject_api_test.py @@ -1,150 +1,246 @@ +"""Tests for injector interface.""" + import os -from pathlib import Path -import jinja2 -import pytest import shutil import tempfile -import yaml +from collections.abc import Generator +from pathlib import Path +from unittest import mock # pylint: disable=preferred-module + +import pytest + +import jinja2 +from yaml import safe_load as yaml_safe_load -from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT -from awx_plugins.interfaces._temporary_private_credential_api import Credential from awx_plugins.interfaces._temporary_private_api import ( + EnvVarsType, + InjectorDefinitionType, + InputDefinitionType, ManagedCredentialType, ) +from awx_plugins.interfaces._temporary_private_container_api import ( + CONTAINER_ROOT, +) +from awx_plugins.interfaces._temporary_private_credential_api import ( + Credential, + CredentialInputType, +) from awx_plugins.interfaces._temporary_private_inject_api import ( HIDDEN_PASSWORD, + ArgsType, + inject_credential, ) + +# pylint: disable=redefined-outer-name def to_host_path(path: str, private_data_dir: str) -> str: """Convert container path to host path. Given a path inside of the EE container, this gives the absolute path on the host machine within the private_data_dir. - """ + :param path: container path + :param private_data_dir: runtime directory + :raises ValueError: When private_data_dir is not an absolute path + :raises ValueError: path must be a subdir of the container root dir + :return: Absolute path of private_data_dir on the container host + """ if not os.path.isabs(private_data_dir): - raise RuntimeError('The private_data_dir path must be absolute') - if CONTAINER_ROOT != path and Path( - CONTAINER_ROOT, - ) not in Path(path).resolve().parents: - raise RuntimeError( - f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}', ) + raise ValueError('The private_data_dir path must be absolute') + is_subdir_of_container = ( + CONTAINER_ROOT == path + or Path(CONTAINER_ROOT) in Path(path).resolve().parents + ) + if not is_subdir_of_container: + raise ValueError( + f'Cannot convert path {path}, not a subdir of {CONTAINER_ROOT}', + ) return path.replace(CONTAINER_ROOT, private_data_dir, 1) -def read_extra_vars(private_data_dir: str, args: list[str]) -> dict[str, str]: - fname = to_host_path(args[1][1:], private_data_dir) - with open(fname) as f: - return yaml.safe_load(f) - - -def assert_dict_subset(subset, full_dict): - """Recursively asserts that `subset` is a subset of `full_dict`.""" - - for key, value in subset.items(): - assert key in full_dict, f"Key '{key}' not found in full_dict" - if isinstance(value, dict): - assert isinstance( - full_dict[key], dict, - ), f"Key '{key}' is not a dictionary in full_dict" - assert_dict_subset(value, full_dict[key]) - else: - assert value == full_dict[key], f"Value mismatch for key '{key}': {value} != { - full_dict[key] - }" - - @pytest.fixture -def private_data_dir(): +def private_data_dir() -> Generator[str, None, None]: + """Simulate ansible-runner directory backed runtime parameters. + + :yield: runtime directory + """ private_data = tempfile.mkdtemp(prefix='awx_') for subfolder in ('inventory', 'env'): runner_subfolder = os.path.join(private_data, subfolder) - if not os.path.exists(runner_subfolder): - os.mkdir(runner_subfolder) + os.mkdir(runner_subfolder) yield private_data - shutil.rmtree(private_data, True) + shutil.rmtree(private_data, ignore_errors=True) -def test_managed_credential_type_inject_cred() -> None: - """Check basic env var injection.""" +def test_to_host_path_abs_path() -> None: + """Check relative path results in an error.""" + with pytest.raises(ValueError, match='.*path must be absolute'): + to_host_path('', 'is/not/absolute/') + + +def test_to_host_path_subdir() -> None: + """Check path must be a subdir of the container dir.""" + with pytest.raises(ValueError, match='.* not a subdir .*'): + to_host_path('not_a_subdir_of_CONTAINER_ROOT', '/is/absolute/') + +@pytest.mark.parametrize( + ( + 'inputs', + 'injectors', + 'cred_inputs', + 'expected_env_vars', + ), + ( + pytest.param( + { + 'fields': [ + { + 'id': 'my_field_name', + 'label': 'My field name', + 'type': 'string', + }, + ], + }, + {'env': {'FIELD_NAME': '{{my_field_name}}'}}, + {'my_field_name': 'just_another_value'}, + {'FIELD_NAME': 'just_another_value'}, + id='fields-env', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_var', + 'label': 'My var name', + 'type': 'string', + }, + ], + }, + {'env': {'VAR_NAME': '{{my_var}}'}}, + {'var_name': ''}, + {'VAR_NAME': ''}, + id='fields-env-missing-input', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_ssh_key', + 'label': 'My ssh key', + 'type': 'string', + 'format': 'ssh_private_key', + }, + ], + }, + {'env': {'MY_SSH_KEY': '{{my_ssh_key}}'}}, + {'my_ssh_key': 'super_secret'}, + {'MY_SSH_KEY': 'super_secret\n'}, + id='field-format-ssh-private-key-add-newline', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_rsa_key', + 'label': 'My rsa key', + 'type': 'string', + 'format': 'ssh_private_key', + }, + ], + }, + {'env': {'RSA_THING': '{{my_rsa_key}}'}}, + {'my_rsa_key': 'secret_rsa_key\n'}, + {'RSA_THING': 'secret_rsa_key\n'}, + id='field-format-ssh-private-key-do-not-add-newline', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'api_oauth_token', + 'label': 'API Oauth Token', + 'type': 'string', + }, + ], + }, + {'env': {'JOB_ID': 'reserved'}}, + {'api_oauth_token': 'ABC789'}, + {}, + id='reserved-env-var', + ), + ), +) +def test_injectors_with_env_vars( + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_env_vars: dict[str, str], +) -> None: + """Check basic env var injection.""" cred_type = ManagedCredentialType( namespace='animal', name='dog', kind='companion', - managed=False, - inputs={ - 'fields': [ - { - 'id': 'my_pet_name', - 'label': 'My pet name', - 'type': 'string', - }, - ], - }, - injectors={'env': {'PET_NAME': '{{my_pet_name}}'}}, - ) - cred = Credential( - inputs={ - 'my_pet_name': 'birdie', - }, + managed=True, + inputs=inputs, + injectors=injectors, ) + cred = Credential(inputs=cred_inputs) - env = {} - cred_type.inject_credential(cred, env, {}, [], '') + env: EnvVarsType = {} + inject_credential(cred_type, cred, env, {}, [], '') - assert env['PET_NAME'] == 'birdie' + assert expected_env_vars.items() == env.items() def test_injectors_with_jinja_syntax_error( - private_data_dir, -): + private_data_dir: str, +) -> None: + """Check malicious jinja is not allowed.""" cred_type = ManagedCredentialType( - kind='cloud', - name='SomeCloud', + kind='cloudx', + name='SomeCloudy', namespace='foo', managed=False, - inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]}, - injectors={'env': {'MY_CLOUD_API_TOKEN': '{{api_token.foo()}}'}}, + inputs={ + 'fields': [ + {'id': 'api_oauth', 'label': 'API Token', 'type': 'string'}, + ], + }, + injectors={'env': {'MY_CLOUD_OAUTH': '{{api_oauth.foo()}}'}}, ) - credential = Credential(inputs={'api_token': 'ABC123'}) + credential = Credential(inputs={'api_oauth': 'ABC123'}) with pytest.raises(jinja2.exceptions.UndefinedError): - cred_type.inject_credential(credential, {}, {}, [], private_data_dir) - - -def test_injectors_with_reserved_env_var(private_data_dir): - cred_type = ManagedCredentialType( - kind='cloud', - name='SomeCloud', - namespace='foo', - managed=False, - inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]}, - injectors={'env': {'JOB_ID': 'reserved'}}, - ) - credential = Credential(inputs={'api_token': 'ABC123'}) + inject_credential(cred_type, credential, {}, {}, [], private_data_dir) - env = {} - cred_type.inject_credential(credential, env, {}, [], private_data_dir) - assert 'JOB_ID' not in env - - -def test_injectors_with_secret_field(private_data_dir): +def test_injectors_with_secret_field(private_data_dir: str) -> None: + """Check that secret values are obscured.""" cred_type = ManagedCredentialType( - kind='cloud', - name='SomeCloud', + kind='clouda', + name='SomeCloudb', namespace='foo', managed=False, - inputs={'fields': [{'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True}]}, + inputs={ + 'fields': [ + { + 'id': 'password', + 'label': 'Password', + 'type': 'string', + 'secret': True, + }, + ], + }, injectors={'env': {'MY_CLOUD_PRIVATE_VAR': '{{password}}'}}, ) credential = Credential(inputs={'password': 'SUPER-SECRET-123'}) - env = {} - safe_env = {} - cred_type.inject_credential( - credential, env, safe_env, [], private_data_dir, + env: EnvVarsType = {} + safe_env: EnvVarsType = {} + inject_credential( + cred_type, credential, env, safe_env, [], private_data_dir, ) assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123' @@ -153,51 +249,135 @@ def test_injectors_with_secret_field(private_data_dir): @pytest.mark.parametrize( - ('inputs', 'injectors', 'cred_inputs', 'expected_extra_vars'), + ( + 'inputs', + 'injectors', + 'cred_inputs', + 'expected_extra_vars', + ), ( pytest.param( - {'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]}, - {'extra_vars': {'api_token': '{{api_token}}'}}, - {'api_token': 'ABC123'}, - {'api_token': 'ABC123'}, + { + 'fields': [ + { + 'id': 'api_secret', + 'label': 'API Secret', + 'type': 'string', + }, + ], + }, + {'extra_vars': {'api_secret': '{{api_secret}}'}}, + {'api_secret': 'ABC123'}, + {'api_secret': 'ABC123'}, id='happy-path', ), pytest.param( - {'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]}, + { + 'fields': [ + { + 'id': 'turbo_button', + 'label': 'Turbo Button', + 'type': 'boolean', + }, + ], + }, {'extra_vars': {'turbo_button': '{{turbo_button}}'}}, {'turbo_button': True}, {'turbo_button': 'True'}, id='boolean', ), pytest.param( - {'fields': [{'id': 'host', 'label': 'Host', 'type': 'string'}]}, + { + 'fields': [ + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, {'extra_vars': {'auth': {'host': '{{host}}'}}}, - {'host': 'example.com'}, - {'auth': {'host': 'example.com'}}, - id='nested', + {'host': 'foo.example.com'}, + {'auth': {'host': 'foo.example.com'}}, + id='nested-dict', ), pytest.param( - {'fields': [{'id': 'environment', 'label': 'Environment', 'type': 'string'}, {'id': 'host', 'label': 'Host', 'type': 'string'}]}, + { + 'fields': [ + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, + { + 'extra_vars': { + 'auth': { + 'host': [ + '{{host_1}}', + '{{host_2}}', + '{{host_3}}', + ], + }, + }, + }, + {'host_1': 'a.com', 'host_2': 'b.com', 'host_3': 'c.com'}, + {'auth': {'host': ['a.com', 'b.com', 'c.com']}}, + id='nested-list', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'environment', + 'label': 'Environment', + 'type': 'string', + }, + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, {'extra_vars': {'{{environment}}_auth': {'host': '{{host}}'}}}, {'environment': 'test', 'host': 'example.com'}, {'test_auth': {'host': 'example.com'}}, id='templated-key', ), pytest.param( - {'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]}, - {'extra_vars': {'turbo_button': '{% if turbo_button %}FAST!{% else %}SLOW!{% endif %}'}}, - {'turbo_button': True}, - {'turbo_button': 'FAST!'}, + { + 'fields': [ + { + 'id': 'trubo', + 'label': 'Turbo Button', + 'type': 'boolean', + }, + ], + }, + { + 'extra_vars': { + 'turbo': '{% if turbo %}FAST!{% else %}SLOW!{% endif %}', + }, + }, + {'turbo': True}, + {'turbo': 'FAST!'}, id='templated-bool', ), ), ) -def test_custom_environment_injectors_with_extra_vars( - private_data_dir, inputs, injectors, cred_inputs, expected_extra_vars, -): +def test_injectors_with_extra_vars( + private_data_dir: str, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_extra_vars: dict[str, str], +) -> None: + """Check extra vars are injected in a file.""" cred_type = ManagedCredentialType( - kind='cloud', - name='SomeCloud', + kind='cloudc', + name='SomeCloudd', namespace='foo', managed=False, inputs=inputs, @@ -205,12 +385,13 @@ def test_custom_environment_injectors_with_extra_vars( ) credential = Credential(inputs=cred_inputs) - args = [] - cred_type.inject_credential(credential, {}, {}, args, private_data_dir) + args: ArgsType = [] + inject_credential(cred_type, credential, {}, {}, args, private_data_dir) + extra_vars_fname = to_host_path(args[1][1:], private_data_dir) + with open(extra_vars_fname, encoding='utf-8') as extra_vars_file: + extra_vars = yaml_safe_load(extra_vars_file) - extra_vars = read_extra_vars(private_data_dir, args) - - assert_dict_subset(expected_extra_vars, extra_vars) + assert expected_extra_vars.items() <= extra_vars.items() @pytest.mark.parametrize( @@ -233,9 +414,9 @@ def test_custom_environment_injectors_with_extra_vars( 'file': {'template': '[mycloud]\n{{api_token}}'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}, }, - {'api_token': 'ABC123'}, + {'api_token': 'ABC456'}, { - 'MY_CLOUD_INI_FILE': '[mycloud]\nABC123', + 'MY_CLOUD_INI_FILE': '[mycloud]\nABC456', }, id='ini-file', ), @@ -243,11 +424,11 @@ def test_custom_environment_injectors_with_extra_vars( {'fields': []}, { 'file': {'template': 'Iñtërnâtiônàlizætiøn'}, - 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}, + 'env': {'MY_PERSONAL_INI_FILE': '{{tower.filename}}'}, }, {}, { - 'MY_CLOUD_INI_FILE': 'Iñtërnâtiônàlizætiøn', + 'MY_PERSONAL_INI_FILE': 'Iñtërnâtiônàlizætiøn', }, id='unicode', ), @@ -288,16 +469,17 @@ def test_custom_environment_injectors_with_extra_vars( ), ), ) -def test_custom_environment_injectors_with_file( - private_data_dir, - inputs, - injectors, - cred_inputs, - expected_file_content, -): +def test_injectors_with_file( + private_data_dir: str, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_file_content: dict[str, str], +) -> None: + """Check data flows from credential into a file.""" cred_type = ManagedCredentialType( - kind='cloud', - name='SomeCloud', + kind='cloude', + name='SomeCloudf', namespace='foo', managed=False, inputs=inputs, @@ -305,10 +487,92 @@ def test_custom_environment_injectors_with_file( ) credential = Credential(inputs=cred_inputs) - env = {} - cred_type.inject_credential(credential, env, {}, [], private_data_dir) + env: EnvVarsType = {} + inject_credential(cred_type, credential, env, {}, [], private_data_dir) for env_fname_key, expected_content in expected_file_content.items(): - path = to_host_path(env[env_fname_key], private_data_dir) - with open(path) as f: - assert f.read() == expected_content + path = to_host_path(str(env[env_fname_key]), private_data_dir) + with open(path, encoding='utf-8') as injected_file: + assert injected_file.read() == expected_content + + +@pytest.mark.parametrize( + 'managed', (True, False), # noqa: WPS425 +) +def test_custom_injectors(private_data_dir: str, managed: bool) -> None: + """Check that custom injectors is used when defined.""" + injector = mock.Mock() + cred_type = ManagedCredentialType( + kind='cloudh', + name='SomeCloudi', + namespace='foo', + managed=managed, + inputs={'fields': []}, + custom_injectors=injector, + ) + credential = Credential(inputs={}) + + env: EnvVarsType = {} + inject_credential(cred_type, credential, env, {}, [], private_data_dir) + + if managed: + injector.assert_called_once() + else: + injector.assert_not_called() + + +@pytest.mark.parametrize( + ( + 'custom_injectors_env', + 'expected_safe_env', + ), + ( + pytest.param( + {'foo': 'bar'}, + {'foo': 'bar'}, + ), + pytest.param( + {'MY_SPECIAL_TOKEN': 'foobar'}, + {'MY_SPECIAL_TOKEN': '**********'}, + ), + pytest.param( + {'ANSIBLE_MY_SPECIAL_TOKEN': 'foobar'}, + {'ANSIBLE_MY_SPECIAL_TOKEN': 'foobar'}, + ), + pytest.param( + {'FOO': 'https://my-username:my-password@foo.com'}, + {'FOO': '**********'}, + ), + ), +) +def test_custom_injectors_safe_env( + private_data_dir: str, + custom_injectors_env: EnvVarsType, + expected_safe_env: EnvVarsType, +) -> None: + """Check that special env vars are obscured in safe env.""" + def custom_injectors(_cr: Credential, env: EnvVarsType, _pd: str) -> None: + env |= custom_injectors_env + + cred_type = ManagedCredentialType( + kind='cloud', + name='SomeCloud', + namespace='foo', + managed=True, + inputs={}, + custom_injectors=custom_injectors, + ) + cred = Credential(inputs={}) + + env: EnvVarsType = {} + safe_env: EnvVarsType = {} + inject_credential( + cred_type, + cred, + env, + safe_env, + [], + private_data_dir, + ) + + assert safe_env.items() == expected_safe_env.items()