Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add asyncio task monitor for api services #419

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions iceprod/credentials/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from ..prom_utils import AsyncMonitor
from iceprod.rest.auth import authorization
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.util import nowstr, datetime2str
Expand Down Expand Up @@ -455,6 +456,7 @@ def __init__(self):

# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None
self.async_monitor = None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -521,6 +523,8 @@ async def start(self):
'version': version_string,
'type': 'credentials',
})
self.async_monitor = AsyncMonitor(labels={'type': 'credentials'})
await self.async_monitor.start()

for collection in self.indexes:
existing = await self.db[collection].index_information()
Expand All @@ -543,3 +547,5 @@ async def stop(self):
pass # ignore cancellations
finally:
self.refresh_service_task = None
if self.async_monitor:
await self.async_monitor.stop()
6 changes: 6 additions & 0 deletions iceprod/materialization/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from ..prom_utils import AsyncMonitor
from iceprod.rest.auth import authorization, attr_auth
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.util import nowstr, datetime2str
Expand Down Expand Up @@ -307,6 +308,7 @@ def __init__(self):

# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None
self.async_monitor = None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -368,6 +370,8 @@ async def start(self):
'version': version_string,
'type': 'materialization',
})
self.async_monitor = AsyncMonitor(labels={'type': 'materialization'})
await self.async_monitor.start()

for collection in self.indexes:
existing = await self.db[collection].index_information()
Expand All @@ -390,3 +394,5 @@ async def stop(self):
pass
finally:
self.materialization_service_task = None
if self.async_monitor:
await self.async_monitor.stop()
31 changes: 30 additions & 1 deletion iceprod/prom_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""
Some Prometheus utilities.
"""

import asyncio
import time

from prometheus_client import Histogram
from wipac_dev_tools.prometheus_tools import GlobalLabels,AsyncPromWrapper


class HistogramBuckets:
Expand Down Expand Up @@ -49,3 +50,31 @@ def on_finish(self):
path=self.request.path,
status=self.get_status(),
).observe(end_time - self._prom_start_time)


class AsyncMonitor(GlobalLabels):
SLEEP_TIME = 5

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task = None

async def start(self):
if self._task is None:
self._task = asyncio.create_task(self._monitor())

async def stop(self):
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
finally:
self._task = None

@AsyncPromWrapper(lambda self: self.prometheus.gauge('asyncio_tasks_running', 'Python asyncio tasks active'))
async def _monitor(self, prom_gauge):
while True:
prom_gauge.set(len(asyncio.all_tasks()))
asyncio.sleep(self.SLEEP_TIME)
6 changes: 6 additions & 0 deletions iceprod/rest/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from ..prom_utils import AsyncMonitor
from ..s3 import boto3, S3
from .base_handler import IceProdRestConfig

Expand Down Expand Up @@ -81,6 +82,7 @@ def __init__(self, s3_override=None):

# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None
self.async_monitor = None

s3conn = None
if s3_override:
Expand Down Expand Up @@ -135,6 +137,8 @@ async def start(self):
'version': version_string,
'type': 'api',
})
self.async_monitor = AsyncMonitor(labels={'type': 'api'})
await self.async_monitor.start()

for database in self.indexes:
db = self.db[database]
Expand All @@ -148,3 +152,5 @@ async def start(self):

async def stop(self):
await self.server.stop()
if self.async_monitor:
await self.async_monitor.stop()
7 changes: 6 additions & 1 deletion iceprod/website/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from iceprod.prom_utils import PromRequestMixin
from iceprod.prom_utils import AsyncMonitor, PromRequestMixin
from iceprod.roles_groups import GROUPS
from iceprod.core.config import CONFIG_SCHEMA as DATASET_SCHEMA
from iceprod.server.config import CONFIG_SCHEMA as SERVER_SCHEMA
Expand Down Expand Up @@ -712,6 +712,7 @@ def __init__(self):

# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None
self.async_monitor = None

if config['ICEPROD_CRED_CLIENT_ID'] and config['ICEPROD_CRED_CLIENT_SECRET']:
logging.info(f'enabling auth via {config["OPENID_URL"]} for aud "{config["OPENID_AUDIENCE"]}"')
Expand Down Expand Up @@ -802,6 +803,10 @@ async def start(self):
'version': version_string,
'type': 'website',
})
self.async_monitor = AsyncMonitor(labels={'type': 'website'})
await self.async_monitor.start()

async def stop(self):
await self.server.stop()
if self.async_monitor:
await self.async_monitor.stop()