diff --git a/project/tests/test_collector.py b/project/tests/test_collector.py deleted file mode 100644 index dd35410f..00000000 --- a/project/tests/test_collector.py +++ /dev/null @@ -1,99 +0,0 @@ -import cProfile -import sys - -from django.test import TestCase -from tests.util import DictStorage - -from silk.collector import DataCollector -from silk.config import SilkyConfig - -from .factories import RequestMinFactory - - -class TestCollector(TestCase): - def test_singleton(self): - a = DataCollector() - b = DataCollector() - c = DataCollector() - self.assertTrue(a == b == c) - - def test_query_registration(self): - mock_query = {} - DataCollector().register_query(mock_query) - self.assertIn(mock_query, list(DataCollector().queries.values())) - - def test_clear(self): - self.test_query_registration() - DataCollector().clear() - self.assertFalse(DataCollector().queries) - - def test_finalise(self): - request = RequestMinFactory() - DataCollector().configure(request) - with self.subTest("Default file-based storage"): - DataCollector().finalise() - file = DataCollector().request.prof_file - self.assertIsNotNone(file) - with file.storage.open(file.name) as f: - content = f.read() - self.assertTrue(content) - - # Some storages, such as S3Boto3Storage, don't support local file system path. - # Simulate this behaviour using DictStorage. - with self.subTest("Pathless storage"): - request.prof_file.storage = DictStorage() - DataCollector().finalise() - file = DataCollector().request.prof_file - self.assertIsNotNone(file) - with file.storage.open(file.name) as f: - content = f.read() - self.assertTrue(content) - self.assertGreater(len(content), 0) - - def test_configure_exception(self): - other_profiler = cProfile.Profile() - other_profiler.enable() - collector = DataCollector() - collector.configure() - other_profiler.disable() - if sys.version_info >= (3, 12): - self.assertEqual(collector.local.pythonprofiler, None) - else: - self.assertIsNotNone(collector.local.pythonprofiler) - collector.stop_python_profiler() - - def test_profile_file_name_with_disabled_extended_file_name(self): - SilkyConfig().SILKY_PYTHON_PROFILER_EXTENDED_FILE_NAME = False - request_path = 'normal/uri/' - resulting_prefix = self._get_prof_file_name(request_path) - self.assertEqual(resulting_prefix, '') - - def test_profile_file_name_with_enabled_extended_file_name(self): - - SilkyConfig().SILKY_PYTHON_PROFILER_EXTENDED_FILE_NAME = True - request_path = 'normal/uri/' - resulting_prefix = self._get_prof_file_name(request_path) - self.assertEqual(resulting_prefix, 'normal_uri_') - - def test_profile_file_name_with_path_traversal_and_special_char(self): - SilkyConfig().SILKY_PYTHON_PROFILER_EXTENDED_FILE_NAME = True - request_path = 'spÉciàl/.././大/uri/@É/' - resulting_prefix = self._get_prof_file_name(request_path) - self.assertEqual(resulting_prefix, 'special_uri_e_') - - def test_profile_file_name_with_long_path(self): - SilkyConfig().SILKY_PYTHON_PROFILER_EXTENDED_FILE_NAME = True - request_path = 'long/path/' + 'a' * 100 - resulting_prefix = self._get_prof_file_name(request_path) - # the path is limited to 50 char plus the last `_` - self.assertEqual(len(resulting_prefix), 51) - - @classmethod - def _get_prof_file_name(cls, request_path: str) -> str: - request = RequestMinFactory() - request.path = request_path - DataCollector().configure(request) - DataCollector().finalise() - file_path = DataCollector().request.prof_file.name - filename = file_path.rsplit('/')[-1] - return filename.replace(f"{request.id}.prof", "") diff --git a/project/tests/test_config_meta.py b/project/tests/test_config_meta.py deleted file mode 100644 index fd4566b1..00000000 --- a/project/tests/test_config_meta.py +++ /dev/null @@ -1,50 +0,0 @@ -from unittest.mock import NonCallableMock - -from django.test import TestCase - -from silk.collector import DataCollector -from silk.config import SilkyConfig -from silk.middleware import SilkyMiddleware -from silk.models import Request - -from .util import delete_all_models - - -def fake_get_response(): - def fake_response(): - return 'hello world' - return fake_response - - -class TestConfigMeta(TestCase): - def _mock_response(self): - response = NonCallableMock() - response.headers = {} - response.status_code = 200 - response.queries = [] - response.get = response.headers.get - response.content = '' - return response - - def _execute_request(self): - delete_all_models(Request) - DataCollector().configure(Request.objects.create()) - response = self._mock_response() - SilkyMiddleware(fake_get_response)._process_response('', response) - self.assertTrue(response.status_code == 200) - objs = Request.objects.all() - self.assertEqual(objs.count(), 1) - r = objs[0] - return r - - def test_enabled(self): - SilkyConfig().SILKY_META = True - r = self._execute_request() - self.assertTrue(r.meta_time is not None - or r.meta_num_queries is not None - or r.meta_time_spent_queries is not None) - - def test_disabled(self): - SilkyConfig().SILKY_META = False - r = self._execute_request() - self.assertFalse(r.meta_time) diff --git a/project/tests/test_db.py b/project/tests/test_db.py deleted file mode 100644 index 3157725d..00000000 --- a/project/tests/test_db.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -Test profiling of DB queries without mocking, to catch possible -incompatibility -""" -from django.shortcuts import reverse -from django.test import Client, TestCase - -from silk.collector import DataCollector -from silk.config import SilkyConfig -from silk.models import Request -from silk.profiling.profiler import silk_profile - -from .factories import BlindFactory - - -class TestDbQueries(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - BlindFactory.create_batch(size=5) - SilkyConfig().SILKY_META = False - - def test_profile_request_to_db(self): - DataCollector().configure(Request(reverse('example_app:index'))) - - with silk_profile(name='test_profile'): - resp = self.client.get(reverse('example_app:index')) - - DataCollector().profiles.values() - assert len(resp.context['blinds']) == 5 - - def test_profile_request_to_db_with_constraints(self): - DataCollector().configure(Request(reverse('example_app:create'))) - - resp = self.client.post(reverse('example_app:create'), {'name': 'Foo'}) - self.assertEqual(resp.status_code, 302) - - -class TestAnalyzeQueries(TestCase): - - @classmethod - def setUpClass(cls): - super().setUpClass() - BlindFactory.create_batch(size=5) - SilkyConfig().SILKY_META = False - SilkyConfig().SILKY_ANALYZE_QUERIES = True - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - SilkyConfig().SILKLY_ANALYZE_QUERIES = False - - def test_analyze_queries(self): - DataCollector().configure(Request(reverse('example_app:index'))) - client = Client() - - with silk_profile(name='test_profile'): - resp = client.get(reverse('example_app:index')) - - DataCollector().profiles.values() - assert len(resp.context['blinds']) == 5 - - -class TestAnalyzeQueriesExplainParams(TestAnalyzeQueries): - - @classmethod - def setUpClass(cls): - super().setUpClass() - SilkyConfig().SILKY_EXPLAIN_FLAGS = {'verbose': True} - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - SilkyConfig().SILKY_EXPLAIN_FLAGS = None diff --git a/project/tests/test_execute_sql.py b/project/tests/test_execute_sql.py deleted file mode 100644 index 7e9c5b20..00000000 --- a/project/tests/test_execute_sql.py +++ /dev/null @@ -1,120 +0,0 @@ -from unittest.mock import Mock, NonCallableMagicMock, NonCallableMock, patch - -from django.test import TestCase - -from silk.collector import DataCollector -from silk.models import Request, SQLQuery -from silk.sql import execute_sql - -from .util import delete_all_models - - -def mock_sql(): - mock_sql_query = Mock(spec_set=['_execute_sql', 'query', 'as_sql', 'connection']) - mock_sql_query._execute_sql = Mock() - mock_sql_query.query = NonCallableMock(spec_set=['model']) - mock_sql_query.query.model = Mock() - query_string = 'SELECT * from table_name' - mock_sql_query.as_sql = Mock(return_value=(query_string, ())) - - mock_sql_query.connection = NonCallableMock( - spec_set=['cursor', 'features', 'ops'], - cursor=Mock( - spec_set=['__call__'], - return_value=NonCallableMagicMock(spec_set=['__enter__', '__exit__', 'execute']) - ), - features=NonCallableMock( - spec_set=['supports_explaining_query_execution'], - supports_explaining_query_execution=True - ), - ops=NonCallableMock(spec_set=['explain_query_prefix']), - ) - - return mock_sql_query, query_string - - -def call_execute_sql(cls, request): - DataCollector().configure(request=request) - delete_all_models(SQLQuery) - cls.mock_sql, cls.query_string = mock_sql() - kwargs = { - 'one': 1, - 'two': 2 - } - cls.args = [1, 2] - cls.kwargs = kwargs - execute_sql(cls.mock_sql, *cls.args, **cls.kwargs) - - -class TestCallNoRequest(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - call_execute_sql(cls, None) - - def test_called(self): - self.mock_sql._execute_sql.assert_called_once_with(*self.args, **self.kwargs) - - def test_count(self): - self.assertEqual(0, len(DataCollector().queries)) - - -class TestCallRequest(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - call_execute_sql(cls, Request()) - - def test_called(self): - self.mock_sql._execute_sql.assert_called_once_with(*self.args, **self.kwargs) - - def test_count(self): - self.assertEqual(1, len(DataCollector().queries)) - - def test_query(self): - query = list(DataCollector().queries.values())[0] - self.assertEqual(query['query'], self.query_string) - - -class TestCallSilky(TestCase): - def test_no_effect(self): - DataCollector().configure() - sql, _ = mock_sql() - sql.query.model = NonCallableMagicMock(spec_set=['__module__']) - sql.query.model.__module__ = 'silk.models' - # No SQLQuery models should be created for silk requests for obvious reasons - with patch('silk.sql.DataCollector', return_value=Mock()) as mock_DataCollector: - execute_sql(sql) - self.assertFalse(mock_DataCollector().register_query.call_count) - - -class TestCollectorInteraction(TestCase): - def _query(self): - try: - query = list(DataCollector().queries.values())[0] - except IndexError: - self.fail('No queries created') - return query - - def test_request(self): - DataCollector().configure(request=Request.objects.create(path='/path/to/somewhere')) - sql, _ = mock_sql() - execute_sql(sql) - query = self._query() - self.assertEqual(query['request'], DataCollector().request) - - def test_registration(self): - DataCollector().configure(request=Request.objects.create(path='/path/to/somewhere')) - sql, _ = mock_sql() - execute_sql(sql) - query = self._query() - self.assertIn(query, DataCollector().queries.values()) - - def test_explain(self): - DataCollector().configure(request=Request.objects.create(path='/path/to/somewhere')) - sql, qs = mock_sql() - prefix = "EXPLAIN" - mock_cursor = sql.connection.cursor.return_value.__enter__.return_value - sql.connection.ops.explain_query_prefix.return_value = prefix - execute_sql(sql) - mock_cursor.execute.assert_called_once_with(f"{prefix} {qs}", ()) diff --git a/project/tests/test_silky_profiler.py b/project/tests/test_silky_profiler.py deleted file mode 100644 index 38262ec4..00000000 --- a/project/tests/test_silky_profiler.py +++ /dev/null @@ -1,124 +0,0 @@ -from time import sleep - -from django.test import TestCase - -from silk.collector import DataCollector -from silk.models import Request, _time_taken -from silk.profiling.profiler import silk_profile - -from .test_lib.mock_suite import MockSuite - - -class TestProfilerRequests(TestCase): - def test_context_manager_no_request(self): - DataCollector().configure() - with silk_profile(name='test_profile'): - sleep(0.1) - self.assertFalse(DataCollector().profiles) - - def test_decorator_no_request(self): - DataCollector().configure() - - @silk_profile() - def func(): - sleep(0.1) - - func() - profile = list(DataCollector().profiles.values())[0] - self.assertFalse(profile['request']) - - def test_context_manager_request(self): - DataCollector().configure(Request.objects.create(path='/to/somewhere')) - with silk_profile(name='test_profile'): - sleep(0.1) - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(DataCollector().request, profile['request']) - - def test_decorator_request(self): - DataCollector().configure(Request.objects.create(path='/to/somewhere')) - - @silk_profile() - def func(): - sleep(0.1) - - func() - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(DataCollector().request, profile['request']) - - -class TestProfilertContextManager(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - r = Request.objects.create() - DataCollector().configure(r) - with silk_profile(name='test_profile'): - sleep(0.1) - - def test_one_object(self): - self.assertEqual(len(DataCollector().profiles), 1) - - def test_name(self): - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(profile['name'], 'test_profile') - - def test_time_taken(self): - profile = list(DataCollector().profiles.values())[0] - time_taken = _time_taken(start_time=profile['start_time'], end_time=profile['end_time']) - self.assertGreaterEqual(time_taken, 100) - self.assertLess(time_taken, 110) - - -class TestProfilerDecorator(TestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - DataCollector().configure(Request.objects.create()) - - @silk_profile() - def func(): - sleep(0.1) - - func() - - def test_one_object(self): - self.assertEqual(len(DataCollector().profiles), 1) - - def test_name(self): - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(profile['name'], 'func') - - def test_time_taken(self): - profile = list(DataCollector().profiles.values())[0] - time_taken = _time_taken(start_time=profile['start_time'], end_time=profile['end_time']) - self.assertGreaterEqual(time_taken, 100) - self.assertLess(time_taken, 115) - - -class TestQueries(TestCase): - def test_no_queries_before(self): - DataCollector().configure(Request.objects.create()) - with silk_profile(name='test_no_queries_before_profile'): - mock_queries = MockSuite().mock_sql_queries(n=5, as_dict=True) - DataCollector().register_query(*mock_queries) - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(profile['name'], 'test_no_queries_before_profile') - queries = profile['queries'] - self.assertEqual(len(queries), 5) - for query in DataCollector().queries: - self.assertIn(query, queries) - - def test_queries_before(self): - """test that any queries registered before profiling begins are ignored""" - DataCollector().configure(Request.objects.create()) - DataCollector().register_query(*MockSuite().mock_sql_queries(n=2, as_dict=True)) - before = [x for x in DataCollector().queries] - with silk_profile(name='test_no_queries_before_profile'): - mock_queries = MockSuite().mock_sql_queries(n=5, as_dict=True) - DataCollector().register_query(*mock_queries) - profile = list(DataCollector().profiles.values())[0] - self.assertEqual(profile['name'], 'test_no_queries_before_profile') - queries = profile['queries'] - self.assertEqual(len(queries), 5) - for query in set(DataCollector().queries).difference(before): - self.assertIn(query, queries)