diff --git a/src/middlewared/middlewared/plugins/auth.py b/src/middlewared/middlewared/plugins/auth.py index 0e0112f55c894..07e0012350966 100644 --- a/src/middlewared/middlewared/plugins/auth.py +++ b/src/middlewared/middlewared/plugins/auth.py @@ -328,7 +328,11 @@ def generate_onetime_password(self, data): passwd = OTPW_MANAGER.generate_for_uid(user_data[0]['uid']) return passwd - @api_method(AuthGenerateTokenArgs, AuthGenerateTokenResult, authorization_required=False) + @api_method( + AuthGenerateTokenArgs, AuthGenerateTokenResult, + audit='Generate authentication token for session', + authorization_required=False + ) @pass_app(rest=True) def generate_token(self, app, ttl, attrs, match_origin, single_use): """ @@ -360,6 +364,18 @@ def generate_token(self, app, ttl, attrs, match_origin, single_use): if ttl is None: ttl = 600 + # FIXME: we need to properly define attrs in the schema + if (job_id := attrs.get('job')) is not None: + job = self.middleware.jobs.get(job_id) + if not job: + raise CallError(f'{job_id}: job does not exist.') + + if error := job.credential_access_error(app.authenticated_credentials, None): + raise CallError(f'{job_id}: {error}') + + if job.pipes.output is None: + raise CallError(f'{job_id}: job is not suitable for download token') + token = self.token_manager.create( ttl, attrs, diff --git a/tests/api2/test_auth_token.py b/tests/api2/test_auth_token.py index cb234ca6caf86..68fdefcdd057d 100644 --- a/tests/api2/test_auth_token.py +++ b/tests/api2/test_auth_token.py @@ -5,14 +5,25 @@ import requests from middlewared.test.integration.assets.account import unprivileged_user as unprivileged_user_template +from middlewared.test.integration.assets.account import unprivileged_user_client from middlewared.test.integration.utils import call, client, ssh from middlewared.test.integration.utils.client import truenas_server from middlewared.test.integration.utils.shell import assert_shell_works +from middlewared.service_exception import CallError @pytest.fixture(scope="module") -def download_token(): - return call("auth.generate_token", 300, {"filename": "debug.txz", "job": 1020}, True) +def job_with_pipe(): + job_id, url = call("core.download", "config.save" , [], "debug.txz") + try: + yield job_id + finally: + call("core.job_abort", job_id) + + +@pytest.fixture(scope="module") +def download_token(job_with_pipe): + return call("auth.generate_token", 300, {"filename": "debug.txz", "job": job_with_pipe}, True) def test_download_auth_token_cannot_be_used_for_upload(download_token): @@ -111,3 +122,12 @@ def test_single_use_token(): with client(auth=None) as c: assert c.call("auth.login_with_token", token) assert not c.call("auth.login_with_token", token) + + +def test_token_job_validation(job_with_pipe): + with pytest.raises(CallError, match='job does not exist'): + call("auth.generate_token", 300, {'job': -1}) + + with unprivileged_user_client(roles=['READONLY_ADMIN']) as c: + with pytest.raises(CallError, match='Job is not owned by current session'): + c.call("auth.generate_token", 300, {'job': job_with_pipe})