diff --git a/augur/application/cli/backend.py b/augur/application/cli/backend.py index 2b5ec6904..69f93aa6e 100644 --- a/augur/application/cli/backend.py +++ b/augur/application/cli/backend.py @@ -86,6 +86,9 @@ def start(ctx, disable_collection, development, pidfile, port): worker_vmem_cap = get_value("Celery", 'worker_process_vmem_cap') + # clean up rabbit messages so if it failed on shutdown the queues are clean + cleanup_collection_status_and_rabbit(logger, ctx.obj.engine) + gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app --log-file gunicorn.log" server = subprocess.Popen(gunicorn_command.split(" ")) @@ -180,7 +183,7 @@ def start(ctx, disable_collection, development, pidfile, port): try: keypub.shutdown() - cleanup_after_collection_halt(logger, ctx.obj.engine) + cleanup_collection_status_and_rabbit(logger, ctx.obj.engine) except RedisConnectionError: pass @@ -302,7 +305,7 @@ def stop_collection(ctx): logger.info(f"Waiting on [{', '.join(str(p.pid for p in alive))}]") time.sleep(0.5) - cleanup_after_collection_halt(logger, ctx.obj.engine) + cleanup_collection_status_and_rabbit(logger, ctx.obj.engine) @cli.command('kill') @test_connection @@ -330,10 +333,10 @@ def augur_stop(signal, logger, engine): _broadcast_signal_to_processes(augur_processes, broadcast_signal=signal, given_logger=logger) if "celery" in process_names: - cleanup_after_collection_halt(logger, engine) + cleanup_collection_status_and_rabbit(logger, engine) -def cleanup_after_collection_halt(logger, engine): +def cleanup_collection_status_and_rabbit(logger, engine): clear_redis_caches() connection_string = get_value("RabbitMQ", "connection_string") diff --git a/augur/tasks/frontend.py b/augur/tasks/frontend.py index f526d9041..cd2d3e32b 100644 --- a/augur/tasks/frontend.py +++ b/augur/tasks/frontend.py @@ -9,7 +9,7 @@ from augur.tasks.github.util.github_graphql_data_access import GithubGraphQlDataAccess from augur.application.db.lib import get_group_by_name, 
get_repo_by_repo_git, get_github_repo_by_src_id, get_gitlab_repo_by_src_id from augur.tasks.github.util.util import get_owner_repo -from augur.application.db.models.augur_operations import retrieve_owner_repos, FRONTEND_REPO_GROUP_NAME, RepoGroup +from augur.application.db.models.augur_operations import retrieve_owner_repos, FRONTEND_REPO_GROUP_NAME, RepoGroup, CollectionStatus from augur.tasks.github.util.github_paginator import hit_api from augur.application.db.models import UserRepo, Repo @@ -235,6 +235,8 @@ def add_github_repo(logger, session, url, repo_group_id, group_id, repo_type, re logger.error(f"Error while adding repo: Failed to insert user repo record. A record with a repo_id of {repo_id} and a group id of {group_id} needs to be added to the user repo table so that this repo shows up in the users group") return + CollectionStatus.insert(session, logger, repo_id) + def get_gitlab_repo_data(gl_session, url: str, logger) -> bool: @@ -281,6 +283,8 @@ def add_gitlab_repo(logger, session, url, repo_group_id, group_id, repo_src_id): if not result: logger.error(f"Error while adding repo: Failed to insert user repo record. 
A record with a repo_id of {repo_id} and a group id of {group_id} needs to be added to the user repo table so that this repo shows up in the users group") return + + CollectionStatus.insert(session, logger, repo_id) # @celery.task # def add_org_repo_list(user_id, group_name, urls): diff --git a/augur/tasks/init/celery_app.py b/augur/tasks/init/celery_app.py index 1be45b1f0..5026244f8 100644 --- a/augur/tasks/init/celery_app.py +++ b/augur/tasks/init/celery_app.py @@ -201,8 +201,8 @@ def setup_periodic_tasks(sender, **kwargs): The tasks so that they are grouped by the module they are defined in """ from celery.schedules import crontab - from augur.tasks.start_tasks import augur_collection_monitor, augur_collection_update_weights - from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos + from augur.tasks.start_tasks import augur_collection_monitor + from augur.tasks.start_tasks import non_repo_domain_tasks, retry_errored_repos, create_collection_status_records from augur.tasks.git.facade_tasks import clone_repos from augur.tasks.db.refresh_materialized_views import refresh_materialized_views from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model @@ -232,6 +232,9 @@ def setup_periodic_tasks(sender, **kwargs): logger.info(f"Setting 404 repos to be marked for retry on midnight each day") sender.add_periodic_task(crontab(hour=0, minute=0),retry_errored_repos.s()) + one_day_in_seconds = 24*60*60 + sender.add_periodic_task(one_day_in_seconds, create_collection_status_records.s()) + @after_setup_logger.connect def setup_loggers(*args,**kwargs): """Override Celery loggers with our own.""" diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py index 3ba30ed70..2a697a0ea 100644 --- a/augur/tasks/start_tasks.py +++ b/augur/tasks/start_tasks.py @@ -378,5 +378,5 @@ def create_collection_status_records(self): CollectionStatus.insert(session, logger, repo[0]) repo = 
execute_sql(query).first() - #Check for new repos every seven minutes to be out of step with the clone_repos task - create_collection_status_records.si().apply_async(countdown=60*7) + # no longer recursively run this task because collection status records are added when repos are inserted + #create_collection_status_records.si().apply_async(countdown=60*7) diff --git a/augur/util/repo_load_controller.py b/augur/util/repo_load_controller.py index 7021a215f..af46ce326 100644 --- a/augur/util/repo_load_controller.py +++ b/augur/util/repo_load_controller.py @@ -5,7 +5,7 @@ from typing import Any, Dict from augur.application.db.engine import DatabaseEngine -from augur.application.db.models import Repo, UserRepo, RepoGroup, UserGroup, User +from augur.application.db.models import Repo, UserRepo, RepoGroup, UserGroup, User, CollectionStatus from augur.application.db.models.augur_operations import retrieve_owner_repos from augur.application.db.util import execute_session_query @@ -67,8 +67,10 @@ def add_cli_repo(self, repo_data: Dict[str, Any], from_org_list=False, repo_type # if the repo doesn't exist it adds it if "gitlab" in url: repo_id = Repo.insert_gitlab_repo(self.session, url, repo_group_id, "CLI") + CollectionStatus.insert(self.session, logger, repo_id) else: repo_id = Repo.insert_github_repo(self.session, url, repo_group_id, "CLI", repo_type) + CollectionStatus.insert(self.session, logger, repo_id) if not repo_id: logger.warning(f"Invalid repo group id specified for {url}, skipping.")