Skip to content

Commit

Permalink
Qrexec policy daemon and tests
Browse files Browse the repository at this point in the history
Also includes rudimentary protocol documentation.
Contains only pure policy daemon that's not used
by anything yet.
references QubesOS/qubes-issues#5125
  • Loading branch information
marmarta committed Aug 5, 2019
1 parent 4894d70 commit e97c5a6
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 13 deletions.
35 changes: 35 additions & 0 deletions Documentation/qrexec-policy-daemon.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Qubes Policy Request Daemon
===========================

Protocol
^^^^^^^^

Request
-------

Newline-separated:

- domain_id=
- source=
- intended_target=
- service_and_arg=
- process_ident=

Optional arguments:

- assume_yes_for_ask=yes
- just_evaluate=yes

Newline-separated


Response
--------

result=allow/deny

newline + further

all not compliant responses are bad no good terrible

end of request/response - empty line
1 change: 1 addition & 0 deletions qrexec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
QUBESD_SOCK = '/var/run/qubesd.sock'

POLICYPATH = pathlib.Path('/etc/qubes/policy.d')
POLICYSOCKET = pathlib.Path('/var/run/qubes/policy.sock')
INCLUDEPATH = POLICYPATH / 'include'
POLICYSUFFIX = '.policy'
POLICYPATH_OLD = pathlib.Path('/etc/qubes-rpc/policy')
Expand Down
179 changes: 179 additions & 0 deletions qrexec/tests/qrexec_policy_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marta Marczykowska-Górecka
# <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#

import asyncio
from contextlib import suppress

import pytest
from unittest.mock import Mock
import functools

import unittest
import unittest.mock

from ..tools import qrexec_policy_daemon

class TestPolicyDaemon:
@pytest.fixture
def mock_request(self, monkeypatch):
mock_request = Mock()
monkeypatch.setattr('qrexec.tools.qrexec_policy_daemon.handle_request',
mock_request)
return mock_request

@pytest.fixture
async def async_server(self, tmp_path):
log = unittest.mock.Mock()
server = await asyncio.start_unix_server(
functools.partial(qrexec_policy_daemon.handle_client_connection,
log, "path"),
path=tmp_path / "socket.d")
yield server
server.close()

async def send_data(self, server, path, data):
task = asyncio.create_task(server.serve_forever())

reader, writer = await asyncio.open_unix_connection(
str(path / "socket.d"))
writer.write(data)
await writer.drain()

await reader.read()

writer.close()

task.cancel()

with suppress(asyncio.CancelledError):
await task


@pytest.mark.asyncio
async def test_simple_request(self, mock_request, async_server, tmp_path):

data = b'domain_id=a\n' \
b'source=b\n' \
b'intended_target=c\n' \
b'service_and_arg=d\n' \
b'process_ident=1 9\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_called_once_with(
domain_id='a', source='b', intended_target='c',
service_and_arg='d', process_ident='1 9', log=unittest.mock.ANY,
path="path")

@pytest.mark.asyncio
async def test_complex_request(self, mock_request, async_server, tmp_path):

data = b'domain_id=a\n' \
b'source=b\n' \
b'intended_target=c\n' \
b'service_and_arg=d\n' \
b'process_ident=9\n' \
b'assume_yes_for_ask=yes\n' \
b'just_evaluate=yes\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_called_once_with(
domain_id='a', source='b', intended_target='c',
service_and_arg='d', process_ident='9', log=unittest.mock.ANY,
assume_yes_for_ask=True, just_evaluate=True, path="path")

@pytest.mark.asyncio
async def test_complex_request2(self, mock_request, async_server, tmp_path):

data = b'domain_id=a\n' \
b'source=b\n' \
b'intended_target=c\n' \
b'service_and_arg=d\n' \
b'process_ident=9\n' \
b'assume_yes_for_ask=no\n' \
b'just_evaluate=no\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_called_once_with(
domain_id='a', source='b', intended_target='c',
service_and_arg='d', process_ident='9', log=unittest.mock.ANY,
assume_yes_for_ask=False, just_evaluate=False, path="path")

@pytest.mark.asyncio
async def test_unfinished_request(
self, mock_request, async_server, tmp_path):

data = b'unfinished'

task = self.send_data(async_server, tmp_path, data)

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(task, timeout=2)

for task in asyncio.tasks.all_tasks():
task.cancel()

with suppress(asyncio.CancelledError):
await asyncio.sleep(1)

mock_request.assert_not_called()

@pytest.mark.asyncio
async def test_too_short_request(
self, mock_request, async_server, tmp_path):

data = b'domain_id=None\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_not_called()

@pytest.mark.asyncio
async def test_duplicate_arg(self, mock_request, async_server, tmp_path):

data = b'domain_id=a\n' \
b'source=b\n' \
b'intended_target=c\n' \
b'service_and_arg=d\n' \
b'process_ident=9\n' \
b'assume_yes_for_ask=no\n' \
b'just_evaluate=no\n' \
b'domain_id=a\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_not_called()

@pytest.mark.asyncio
async def test_wrong_arg(self, mock_request, async_server, tmp_path):

data = b'domains_id=a\n' \
b'source=b\n' \
b'intended_target=c\n' \
b'service_and_arg=d\n' \
b'process_ident=9\n' \
b'assume_yes_for_ask=no\n' \
b'just_evaluate=no\n\n'

await self.send_data(async_server, tmp_path, data)

mock_request.assert_not_called()
121 changes: 121 additions & 0 deletions qrexec/tools/qrexec_policy_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marta Marczykowska-Górecka
# <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#

import argparse
import functools
import pathlib
import asyncio
import logging

from .. import POLICYPATH, POLICYSOCKET

from .qrexec_policy_exec import handle_request

argparser = argparse.ArgumentParser(description='Evaluate qrexec policy daemon')

argparser.add_argument('--policy-path',
type=pathlib.Path, default=POLICYPATH,
help='Use alternative policy path')
argparser.add_argument('--socket-path',
type=pathlib.Path, default=POLICYSOCKET,
help='Use alternative policy socket path')

required_request_arguments = ('domain_id', 'source', 'intended_target',
'service_and_arg', 'process_ident')

optional_request_arguments = ('assume_yes_for_ask', 'just_evaluate')

allowed_request_arguments = required_request_arguments + \
optional_request_arguments


async def handle_client_connection(log, policy_path, reader, writer):

args = {}

try:
while True:
line = await reader.readline()
line = line.decode('ascii').rstrip('\n')

if not line:
break

argument, value = line.split('=', 1)
if argument in args:
log.error(
'error parsing policy request: '
'duplicate argument {}'.format(argument))
return
if argument not in allowed_request_arguments:
log.error(
'error parsing policy request: unknown argument {}'.format(
argument))
return

if argument in ('assume_yes_for_ask', 'just_evaluate'):
if value == 'yes':
value = True
elif value == 'no':
value = False
else:
log.error(
'error parsing policy request: invalid bool value '
'{} for argument {}'.format(value, argument))
return

args[argument] = value

if not all(arg in args for arg in required_request_arguments):
log.error(
'error parsing policy request: required argument missing')
return

result = handle_request(**args, log=log, path=policy_path)

writer.write(b"result=allow\n" if result else b"result=deny\n")

await writer.drain()

finally:
writer.close()


async def main(args=None):
args = argparser.parse_args(args)

log = logging.getLogger('policy')
log.setLevel(logging.INFO)
if not log.handlers:
handler = logging.handlers.SysLogHandler(address='/dev/log')
log.addHandler(handler)

server = await asyncio.start_unix_server(
functools.partial(handle_client_connection, log, args.policy_path),
path=args.socket_path)

await server.serve_forever()


if __name__ == '__main__':
asyncio.run(main())



Loading

0 comments on commit e97c5a6

Please sign in to comment.