Release 0.81.0 #2988
Changes from all commits
@@ -17,6 +17,8 @@
 from augur.tasks.start_tasks import augur_collection_monitor, create_collection_status_records
 from augur.tasks.git.facade_tasks import clone_repos
+from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler
+from augur.tasks.gitlab.gitlab_api_key_handler import GitlabApiKeyHandler
 from augur.tasks.data_analysis.contributor_breadth_worker.contributor_breadth_worker import contributor_breadth_model
 from augur.application.db.models import UserRepo
 from augur.application.db.session import DatabaseSession
@@ -25,6 +27,8 @@
 from augur.application.cli import test_connection, test_db_connection, with_database, DatabaseContext
 from augur.application.cli._cli_util import _broadcast_signal_to_processes, raise_open_file_limit, clear_redis_caches, clear_rabbitmq_messages
+
+from keyman.KeyClient import KeyClient, KeyPublisher

[pylint] reported by reviewdog 🐶

 logger = AugurLogger("augur", reset_logfiles=False).get_logger()

 @click.group('server', short_help='Commands for controlling the backend API server & data collection workers')
@@ -51,6 +55,26 @@ def start(ctx, development):
         logger.error("Failed to raise open file limit!")
         raise e

+    keypub = KeyPublisher()
+
+    orchestrator = subprocess.Popen("python keyman/Orchestrator.py".split())
+
+    # Wait for orchestrator startup
+    if not keypub.wait(republish=True):
+        logger.critical("Key orchestrator did not respond in time")
+        return
+
+    # load keys
+    ghkeyman = GithubApiKeyHandler(logger)
+    glkeyman = GitlabApiKeyHandler(logger)
+
+    for key in ghkeyman.keys:
+        keypub.publish(key, "github_rest")
+        keypub.publish(key, "github_graphql")
+
+    for key in glkeyman.keys:
+        keypub.publish(key, "gitlab_rest")
+
     if development:
         os.environ["AUGUR_DEV"] = "1"
         logger.info("Starting in development mode")
@@ -94,6 +118,8 @@ def start(ctx, development):
         if p:
             p.terminate()

+    keypub.shutdown()
+
     if celery_beat_process:
         logger.info("Shutting down celery beat process")
         celery_beat_process.terminate()
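Condensed, the hunks above give the server start command a key-orchestration lifecycle: launch the keyman orchestrator, wait for it, publish every GitHub and GitLab key into its namespace, and shut the publisher down when the server stops. The sketch below only restates the calls visible in this diff; the keyman package, the KeyPublisher API, the Orchestrator.py path, and the helper function name are taken from (or invented for) this illustration, not verified independently.

import subprocess
from keyman.KeyClient import KeyPublisher   # as imported in the hunk above

def publish_collection_keys(logger, github_keys, gitlab_keys):
    """Sketch of the startup flow added above: start the key orchestrator,
    wait for it to come up, then publish every API key into its namespace."""
    keypub = KeyPublisher()
    orchestrator = subprocess.Popen("python keyman/Orchestrator.py".split())

    # Block until the orchestrator responds; bail out otherwise.
    if not keypub.wait(republish=True):
        logger.critical("Key orchestrator did not respond in time")
        return None, None

    for key in github_keys:
        # Mirroring the diff: a GitHub token is published to both the REST and GraphQL pools.
        keypub.publish(key, "github_rest")
        keypub.publish(key, "github_graphql")

    for key in gitlab_keys:
        keypub.publish(key, "gitlab_rest")

    # The caller later runs keypub.shutdown() (and terminates the orchestrator
    # process) during server shutdown, as the last hunk above does.
    return keypub, orchestrator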
@@ -0,0 +1,100 @@
# SPDX-License-Identifier: MIT
import logging
import click
import sqlalchemy as s
from datetime import datetime
import httpx
from collections import Counter

from augur.application.cli import test_connection, test_db_connection

from augur.application.db.engine import DatabaseEngine
from augur.tasks.github.util.github_api_key_handler import GithubApiKeyHandler


logger = logging.getLogger(__name__)

@click.group("github", short_help="Github utilities")
def cli():
    pass

@cli.command("api-keys")
@test_connection
@test_db_connection
def update_api_key():
    """
    Get the ratelimit of Github API keys
    """

    with DatabaseEngine() as engine, engine.connect() as connection:

        get_api_keys_sql = s.sql.text(
            """
            SELECT value as github_key from config Where section_name='Keys' AND setting_name='github_api_key'
            UNION All
            SELECT access_token as github_key from worker_oauth ORDER BY github_key DESC;
            """
        )

        result = connection.execute(get_api_keys_sql).fetchall()
        keys = [x[0] for x in result]

        with httpx.Client() as client:

            invalid_keys = []
            valid_key_data = []
            for key in keys:
                core_key_data, graphql_key_data = GithubApiKeyHandler.get_key_rate_limit(client, key)
                if core_key_data is None or graphql_key_data is None:
                    invalid_keys.append(key)
                else:
                    valid_key_data.append((key, core_key_data, graphql_key_data))

            valid_key_data = sorted(valid_key_data, key=lambda x: x[1]["requests_remaining"])

            core_request_header = "Core Requests Left"
            core_reset_header = "Core Reset Time"
            graphql_request_header = "Graphql Requests Left"
            graphql_reset_header = "Graphql Reset Time"
            print(f"{'Key'.center(40)} {core_request_header} {core_reset_header} {graphql_request_header} {graphql_reset_header}")
            for key, core_key_data, graphql_key_data in valid_key_data:
                core_requests = str(core_key_data['requests_remaining']).center(len(core_request_header))
                core_reset_time = str(epoch_to_local_time_with_am_pm(core_key_data["reset_epoch"])).center(len(core_reset_header))

                graphql_requests = str(graphql_key_data['requests_remaining']).center(len(graphql_request_header))
                graphql_reset_time = str(epoch_to_local_time_with_am_pm(graphql_key_data["reset_epoch"])).center(len(graphql_reset_header))

                print(f"{key} | {core_requests} | {core_reset_time} | {graphql_requests} | {graphql_reset_time} |")

            valid_key_list = [x[0] for x in valid_key_data]
            duplicate_keys = find_duplicates(valid_key_list)
            if len(duplicate_keys) > 0:
                print("\n\nWARNING: There are duplicate keys this will slow down collection")
                print("Duplicate keys".center(40))
                for key in duplicate_keys:
                    print(key)

            if len(invalid_keys) > 0:
                invalid_key_header = "Invalid Keys".center(40)
                print("\n")
                print(invalid_key_header)
                for key in invalid_keys:
                    print(key)
                print("")

    engine.dispose()

def epoch_to_local_time_with_am_pm(epoch):
    local_time = datetime.fromtimestamp(epoch)
    formatted_time = local_time.strftime('%I:%M %p')  # 12-hour clock with AM/PM; no date component
    return formatted_time

def find_duplicates(lst):
    counter = Counter(lst)
    return [item for item, count in counter.items() if count > 1]
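A hedged usage sketch for the new command: assuming this file is picked up as the `github` CLI group (the module path below is a guess), it can be exercised with click's test runner, and on an installed instance would correspond to something like `augur github api-keys`. The command hits both the Augur database and the GitHub rate-limit API, so it needs a working config.

# Sketch only: the import path of the new CLI module is assumed, not confirmed.
from click.testing import CliRunner

from augur.application.cli.github import cli  # hypothetical module path for the file above

runner = CliRunner()
result = runner.invoke(cli, ["api-keys"])  # prints one row per valid key, then duplicate and invalid keys
print(result.output)

# The duplicate-detection helper is self-contained and easy to check directly:
# find_duplicates(["a", "b", "a"]) -> ["a"]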
@@ -3,37 +3,47 @@
 import sqlalchemy as s

[pylint] reported by reviewdog 🐶

 from sqlalchemy.sql import text
 from abc import ABC, abstractmethod
 from datetime import datetime, timedelta, timezone

 from augur.tasks.init.celery_app import celery_app as celery
 from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
 from augur.application.db.data_parse import *
 from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
 from augur.tasks.github.util.github_task_session import GithubTaskManifest

[pylint] reported by reviewdog 🐶

 from augur.tasks.github.util.util import get_owner_repo
 from augur.tasks.util.worker_util import remove_duplicate_dicts
-from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, CollectionStatus
-from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine
+from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, Repo

[pylint] reported by reviewdog 🐶

+from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected

[pylint] reported by reviewdog 🐶

 platform_id = 1

 @celery.task(base=AugurCoreRepoCollectionTask)
-def collect_events(repo_git: str):
+def collect_events(repo_git: str, full_collection: bool):

     logger = logging.getLogger(collect_events.__name__)

     owner, repo = get_owner_repo(repo_git)

     logger.debug(f"Collecting Github events for {owner}/{repo}")

+    if full_collection:
+        core_data_last_collected = None
+    else:
+        repo_id = get_repo_by_repo_git(repo_git).repo_id
+
+        # subtract 2 days to ensure all data is collected
+        core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)
+
     key_auth = GithubRandomKeyAuth(logger)

     if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
         collection_strategy = BulkGithubEventCollection(logger)
     else:
         collection_strategy = ThoroughGithubEventCollection(logger)

-    collection_strategy.collect(repo_git, key_auth)
+    collection_strategy.collect(repo_git, key_auth, core_data_last_collected)

 def bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
@@ -60,7 +70,7 @@ def __init__(self, logger):
         self._data_source = "Github API"

     @abstractmethod
-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):
         pass

     def _insert_issue_events(self, events):
@@ -97,7 +107,7 @@ def __init__(self, logger):

         super().__init__(logger)

-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):

         repo_obj = get_repo_by_repo_git(repo_git)
         repo_id = repo_obj.repo_id

@@ -106,7 +116,7 @@ def collect(self, repo_git, key_auth):
         self.repo_identifier = f"{owner}/{repo}"

         events = []
-        for event in self._collect_events(repo_git, key_auth):
+        for event in self._collect_events(repo_git, key_auth, since):
             events.append(event)

         # making this a decent size since process_events retrieves all the issues and prs each time
@@ -117,15 +127,21 @@ def collect(self, repo_git, key_auth):
         if events:
             self._process_events(events, repo_id)

-    def _collect_events(self, repo_git: str, key_auth):
+    def _collect_events(self, repo_git: str, key_auth, since):

         owner, repo = get_owner_repo(repo_git)

         url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"

         github_data_access = GithubDataAccess(key_auth, self._logger)

-        return github_data_access.paginate_resource(url)
+        for event in github_data_access.paginate_resource(url):
+
+            yield event
+
+            # return if last event on the page was updated before the since date
+            if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since:
+                return

     def _process_events(self, events, repo_id):
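The early return above relies on GitHub's ISO-8601 timestamps ending in "Z". A minimal, self-contained illustration of that cutoff comparison (the values are made up for the example):

from datetime import datetime, timezone

since = datetime(2024, 1, 10, tzinfo=timezone.utc)   # example cutoff
created_at = "2024-01-01T12:00:00Z"                  # GitHub-style event timestamp

# datetime.fromisoformat() before Python 3.11 rejects a trailing "Z", hence the replace()
event_time = datetime.fromisoformat(created_at.replace("Z", "+00:00")).replace(tzinfo=timezone.utc)

stop_paginating = event_time < since                 # True: the event predates the cutoff
print(stop_paginating)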
@@ -248,26 +264,30 @@ class ThoroughGithubEventCollection(GithubEventCollection):
     def __init__(self, logger):
         super().__init__(logger)

-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):

         repo_obj = get_repo_by_repo_git(repo_git)
         repo_id = repo_obj.repo_id

         owner, repo = get_owner_repo(repo_git)
         self.repo_identifier = f"{owner}/{repo}"

-        self._collect_and_process_issue_events(owner, repo, repo_id, key_auth)
-        self._collect_and_process_pr_events(owner, repo, repo_id, key_auth)
+        self._collect_and_process_issue_events(owner, repo, repo_id, key_auth, since)
+        self._collect_and_process_pr_events(owner, repo, repo_id, key_auth, since)

-    def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
+    def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth, since):

         engine = get_engine()

         with engine.connect() as connection:

             # TODO: Remove src id if it ends up not being needed
             query = text(f"""
-                select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
+                select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
+                from issues
+                where repo_id={repo_id}
+                and updated_at > timestamptz(timestamp '{since}')
+                order by created_at desc;
             """)

             issue_result = connection.execute(query).fetchall()
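The query above interpolates repo_id and since into the SQL with an f-string. As a hedged alternative sketch (not what this PR does), the same filter can be expressed with SQLAlchemy bound parameters, letting the driver handle the timestamp literal; repo_id, since, and connection are assumed to come from the surrounding method:

# Alternative sketch: bind values instead of f-string interpolation.
from sqlalchemy.sql import text

query = text("""
    select issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
    from issues
    where repo_id = :repo_id
      and updated_at > :since
    order by created_at desc;
""").bindparams(repo_id=repo_id, since=since)

issue_result = connection.execute(query).fetchall()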
@@ -309,14 +329,18 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
                 events.clear()

-    def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
+    def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth, since):

         engine = get_engine()

         with engine.connect() as connection:

             query = text(f"""
-                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
+                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id
+                from pull_requests
+                where repo_id={repo_id}
+                and pr_updated_at > timestamptz(timestamp '{since}')
+                order by pr_created_at desc;
             """)

             pr_result = connection.execute(query).fetchall()
[pylint] reported by reviewdog 🐶
W0611: Unused KeyClient imported from keyman.KeyClient (unused-import)
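Assuming nothing else in the server CLI module uses KeyClient, the warning can be resolved by importing only the publisher:

# Sketch of the lint fix; only applies if KeyClient is truly unused in this module.
from keyman.KeyClient import KeyPublisher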