Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce dependency on create_collection_status_record tasks #3014

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions augur/application/cli/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@

worker_vmem_cap = get_value("Celery", 'worker_process_vmem_cap')

# create rabbit messages so if it failed on shutdown the queues are clean
cleanup_collection_status_and_rabbit(logger, ctx.obj.engine)

gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app --log-file gunicorn.log"
server = subprocess.Popen(gunicorn_command.split(" "))

Expand All @@ -109,7 +112,7 @@
logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')
logger.info(f"The API is available at '{api_response.json()['route']}'")

processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection)

Check warning on line 115 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:115:4: W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name)

if os.path.exists("celerybeat-schedule.db"):
logger.info("Deleting old task schedule")
Expand Down Expand Up @@ -180,7 +183,7 @@

try:
keypub.shutdown()
cleanup_after_collection_halt(logger, ctx.obj.engine)
cleanup_collection_status_and_rabbit(logger, ctx.obj.engine)
except RedisConnectionError:
pass

Expand Down Expand Up @@ -252,7 +255,7 @@
"""
Sends SIGTERM to all Augur server & worker processes
"""
logger = logging.getLogger("augur.cli")

Check warning on line 258 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:258:4: W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name)

augur_stop(signal.SIGTERM, logger, ctx.obj.engine)

Expand All @@ -265,7 +268,7 @@
"""
Stop collection tasks if they are running, block until complete
"""
processes = get_augur_processes()

Check warning on line 271 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:271:4: W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name)

stopped = []

Expand All @@ -275,7 +278,7 @@
stopped.append(p)
p.terminate()

if not len(stopped):

Check warning on line 281 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 C1802: Do not use `len(SEQUENCE)` without comparison to determine if a sequence is empty (use-implicit-booleaness-not-len) Raw Output: augur/application/cli/backend.py:281:7: C1802: Do not use `len(SEQUENCE)` without comparison to determine if a sequence is empty (use-implicit-booleaness-not-len)
logger.info("No collection processes found")
return

Expand All @@ -284,7 +287,7 @@

killed = []
while True:
for i in range(len(alive)):

Check warning on line 290 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 C0200: Consider using enumerate instead of iterating with range and len (consider-using-enumerate) Raw Output: augur/application/cli/backend.py:290:8: C0200: Consider using enumerate instead of iterating with range and len (consider-using-enumerate)
if alive[i].status() == psutil.STATUS_ZOMBIE:
logger.info(f"KILLING ZOMBIE: {alive[i].pid}")
alive[i].kill()
Expand All @@ -296,13 +299,13 @@
for i in reversed(killed):
alive.pop(i)

if not len(alive):

Check warning on line 302 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 C1802: Do not use `len(SEQUENCE)` without comparison to determine if a sequence is empty (use-implicit-booleaness-not-len) Raw Output: augur/application/cli/backend.py:302:11: C1802: Do not use `len(SEQUENCE)` without comparison to determine if a sequence is empty (use-implicit-booleaness-not-len)
break

logger.info(f"Waiting on [{', '.join(str(p.pid for p in alive))}]")
time.sleep(0.5)

cleanup_after_collection_halt(logger, ctx.obj.engine)
cleanup_collection_status_and_rabbit(logger, ctx.obj.engine)

@cli.command('kill')
@test_connection
Expand All @@ -313,11 +316,11 @@
"""
Sends SIGKILL to all Augur server & worker processes
"""
logger = logging.getLogger("augur.cli")

Check warning on line 319 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:319:4: W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name)
augur_stop(signal.SIGKILL, logger, ctx.obj.engine)


def augur_stop(signal, logger, engine):

Check warning on line 323 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'signal' from outer scope (line 12) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:323:15: W0621: Redefining name 'signal' from outer scope (line 12) (redefined-outer-name)

Check warning on line 323 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:323:23: W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name)
"""
Stops augur with the given signal,
and cleans up collection if it was running
Expand All @@ -330,10 +333,10 @@
_broadcast_signal_to_processes(augur_processes, broadcast_signal=signal, given_logger=logger)

if "celery" in process_names:
cleanup_after_collection_halt(logger, engine)
cleanup_collection_status_and_rabbit(logger, engine)


def cleanup_after_collection_halt(logger, engine):
def cleanup_collection_status_and_rabbit(logger, engine):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0621: Redefining name 'logger' from outer scope (line 34) (redefined-outer-name)

clear_redis_caches()

connection_string = get_value("RabbitMQ", "connection_string")
Expand Down Expand Up @@ -482,7 +485,7 @@
pass
return augur_processes

def _broadcast_signal_to_processes(processes, broadcast_signal=signal.SIGTERM, given_logger=None):

Check warning on line 488 in augur/application/cli/backend.py

View workflow job for this annotation

GitHub Actions / runner / pylint

[pylint] reported by reviewdog 🐶 W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name) Raw Output: augur/application/cli/backend.py:488:35: W0621: Redefining name 'processes' from outer scope (line 469) (redefined-outer-name)
if given_logger is None:
_logger = logger
else:
Expand Down
6 changes: 5 additions & 1 deletion augur/tasks/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from augur.tasks.github.util.github_graphql_data_access import GithubGraphQlDataAccess
from augur.application.db.lib import get_group_by_name, get_repo_by_repo_git, get_github_repo_by_src_id, get_gitlab_repo_by_src_id
from augur.tasks.github.util.util import get_owner_repo
from augur.application.db.models.augur_operations import retrieve_owner_repos, FRONTEND_REPO_GROUP_NAME, RepoGroup
from augur.application.db.models.augur_operations import retrieve_owner_repos, FRONTEND_REPO_GROUP_NAME, RepoGroup, CollectionStatus
from augur.tasks.github.util.github_paginator import hit_api

from augur.application.db.models import UserRepo, Repo
Expand Down Expand Up @@ -235,6 +235,8 @@ def add_github_repo(logger, session, url, repo_group_id, group_id, repo_type, re
logger.error(f"Error while adding repo: Failed to insert user repo record. A record with a repo_id of {repo_id} and a group id of {group_id} needs to be added to the user repo table so that this repo shows up in the users group")
return

CollectionStatus.insert(session, logger, repo_id)


def get_gitlab_repo_data(gl_session, url: str, logger) -> bool:

Expand Down Expand Up @@ -281,6 +283,8 @@ def add_gitlab_repo(logger, session, url, repo_group_id, group_id, repo_src_id):
if not result:
logger.error(f"Error while adding repo: Failed to insert user repo record. A record with a repo_id of {repo_id} and a group id of {group_id} needs to be added to the user repo table so that this repo shows up in the users group")
return

CollectionStatus.insert(session, logger, repo_id)

# @celery.task
# def add_org_repo_list(user_id, group_name, urls):
Expand Down
7 changes: 5 additions & 2 deletions augur/tasks/init/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def setup_periodic_tasks(sender, **kwargs):
The tasks so that they are grouped by the module they are defined in
"""
from celery.schedules import crontab
from augur.tasks.start_tasks import augur_collection_monitor, augur_collection_update_weights
from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos
from augur.tasks.start_tasks import augur_collection_monitor
from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos, create_collection_status_records
from augur.tasks.git.facade_tasks import clone_repos
from augur.tasks.db.refresh_materialized_views import refresh_materialized_views
from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
Expand Down Expand Up @@ -232,6 +232,9 @@ def setup_periodic_tasks(sender, **kwargs):
logger.info(f"Setting 404 repos to be marked for retry on midnight each day")
sender.add_periodic_task(crontab(hour=0, minute=0),retry_errored_repos.s())

one_day_in_seconds = 24*60*60
sender.add_periodic_task(one_day_in_seconds, create_collection_status_records.s())

@after_setup_logger.connect
def setup_loggers(*args,**kwargs):
"""Override Celery loggers with our own."""
Expand Down
4 changes: 2 additions & 2 deletions augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,5 +378,5 @@ def create_collection_status_records(self):
CollectionStatus.insert(session, logger, repo[0])
repo = execute_sql(query).first()

#Check for new repos every seven minutes to be out of step with the clone_repos task
create_collection_status_records.si().apply_async(countdown=60*7)
# no longer recursively run this task because collection status records are added when repos are inserted
#create_collection_status_records.si().apply_async(countdown=60*7)
4 changes: 3 additions & 1 deletion augur/util/repo_load_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Dict

from augur.application.db.engine import DatabaseEngine
from augur.application.db.models import Repo, UserRepo, RepoGroup, UserGroup, User
from augur.application.db.models import Repo, UserRepo, RepoGroup, UserGroup, User, CollectionStatus
from augur.application.db.models.augur_operations import retrieve_owner_repos
from augur.application.db.util import execute_session_query

Expand Down Expand Up @@ -67,8 +67,10 @@ def add_cli_repo(self, repo_data: Dict[str, Any], from_org_list=False, repo_type
# if the repo doesn't exist it adds it
if "gitlab" in url:
repo_id = Repo.insert_gitlab_repo(self.session, url, repo_group_id, "CLI")
CollectionStatus.insert(self.session, logger, repo_id)
else:
repo_id = Repo.insert_github_repo(self.session, url, repo_group_id, "CLI", repo_type)
CollectionStatus.insert(self.session, logger, repo_id)

if not repo_id:
logger.warning(f"Invalid repo group id specified for {url}, skipping.")
Expand Down