Skip to content

Commit

Permalink
Prevent auth token generation for other jobs
Browse files Browse the repository at this point in the history
This commit adds validation that API user request to generate
an authentication token for a job only succeeds if the job
was initiated by the currently authenticated user.

This commit also adds auditing for when authenticated sessions
generate API tokens.
  • Loading branch information
anodos325 committed Jan 28, 2025
1 parent 7d0fed2 commit 8939a08
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/middlewared/middlewared/plugins/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ def generate_onetime_password(self, data):
passwd = OTPW_MANAGER.generate_for_uid(user_data[0]['uid'])
return passwd

@api_method(AuthGenerateTokenArgs, AuthGenerateTokenResult, authorization_required=False)
@api_method(
AuthGenerateTokenArgs, AuthGenerateTokenResult,
audit='Generate authentication token for session',
authorization_required=False
)
@pass_app(rest=True)
def generate_token(self, app, ttl, attrs, match_origin, single_use):
"""
Expand Down Expand Up @@ -360,6 +364,15 @@ def generate_token(self, app, ttl, attrs, match_origin, single_use):
if ttl is None:
ttl = 600

# FIXME: we need to properly define attrs in the schema
if (job_id := attrs.get('job')) is not None:
if job_id not in self.middleware.jobs:
raise CallError(f'{job_id}: job does not exist.')

job = self.middleware.jobs[job_id]
if error := job.credential_access_error(app.authenticated_credential, None)
raise CallError(f'{job_id}: {error}')

token = self.token_manager.create(
ttl,
attrs,
Expand Down
12 changes: 12 additions & 0 deletions tests/api2/test_auth_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import requests

from middlewared.test.integration.assets.account import unprivileged_user as unprivileged_user_template
from middlewared.test.integration.assets.account import unprivileged_user_client
from middlewared.test.integration.utils import call, client, ssh
from middlewared.test.integration.utils.client import truenas_server
from middlewared.test.integration.utils.shell import assert_shell_works
from middlewared.service_exception import CallError


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -111,3 +113,13 @@ def test_single_use_token():
with client(auth=None) as c:
assert c.call("auth.login_with_token", token)
assert not c.call("auth.login_with_token", token)


def test_token_job_validation():
with pytest.raises(CallError, match='job does not exist'):
call("auth.generate_token", 300, {'job': -1})

with unprivileged_user_client(roles=['READONLY_ADMIN']) as c:
job_id = call("smb.synchronize_passdb")
with pytest.raises(CallError, match='Job is not owned by current session'):
c.call("auth.generate_token", 300, {'job': job_id})

0 comments on commit 8939a08

Please sign in to comment.