Skip to content

Commit

Permalink
Refactor Celery Task (Netflix#9030)
Browse files Browse the repository at this point in the history
* Refactor celery_tasks

* global import of celery_tasks.app now possible with refactor
  • Loading branch information
castrapel authored Mar 15, 2021
1 parent 05cd9ec commit 7768e2c
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .run/Consoleme OSS - Local Celery Beat and Worker.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</ENTRIES>
</EXTENSION>
<option name="SCRIPT_NAME" value="celery" />
<option name="PARAMETERS" value="-A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8" />
<option name="PARAMETERS" value="-A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="true" />
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
When run in development mode (CONFIG_LOCATION=<location of development.yaml configuration file>). To run both the celery
beat scheduler and a worker simultaneously, and to have jobs kick off starting at the next minute, run the following
command: celery -A consoleme.celery.celery_tasks worker --loglevel=info -l DEBUG -B
command: celery -A consoleme.celery_tasks.celery_tasks worker --loglevel=info -l DEBUG -B
"""
from __future__ import absolute_import

import json # We use a separate SetEncoder here so we cannot use ujson
import sys
import time
Expand Down Expand Up @@ -104,7 +106,7 @@ def on_configure(self) -> None:
app.control.purge()

log = config.get_logger()
red = async_to_sync(RedisHandler().redis)()
red = RedisHandler().redis_sync()
aws = get_plugin_by_name(config.get("plugins.aws", "default_aws"))
auth = get_plugin_by_name(config.get("plugins.auth", "default_auth"))()
group_mapping = get_plugin_by_name(
Expand Down Expand Up @@ -1355,77 +1357,77 @@ def cache_credential_authorization_mapping() -> Dict:

schedule = {
"cache_roles_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_roles_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_roles_across_accounts",
"options": {"expires": 1000},
"schedule": schedule_45_minute,
},
"clear_old_redis_iam_cache": {
"task": "consoleme.celery.celery_tasks.clear_old_redis_iam_cache",
"task": "consoleme.celery_tasks.celery_tasks.clear_old_redis_iam_cache",
"options": {"expires": 180},
"schedule": schedule_6_hours,
},
"cache_policies_table_details": {
"task": "consoleme.celery.celery_tasks.cache_policies_table_details",
"task": "consoleme.celery_tasks.celery_tasks.cache_policies_table_details",
"options": {"expires": 1000},
"schedule": schedule_30_minute,
},
"report_celery_last_success_metrics": {
"task": "consoleme.celery.celery_tasks.report_celery_last_success_metrics",
"task": "consoleme.celery_tasks.celery_tasks.report_celery_last_success_metrics",
"options": {"expires": 60},
"schedule": schedule_minute,
},
"cache_managed_policies_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_managed_policies_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_managed_policies_across_accounts",
"options": {"expires": 1000},
"schedule": schedule_45_minute,
},
"cache_s3_buckets_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_s3_buckets_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_s3_buckets_across_accounts",
"options": {"expires": 300},
"schedule": schedule_45_minute,
},
"cache_sqs_queues_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_sqs_queues_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_sqs_queues_across_accounts",
"options": {"expires": 300},
"schedule": schedule_45_minute,
},
"cache_sns_topics_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_sns_topics_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_sns_topics_across_accounts",
"options": {"expires": 300},
"schedule": schedule_45_minute,
},
"cache_audit_table_details": {
"task": "consoleme.celery.celery_tasks.cache_audit_table_details",
"task": "consoleme.celery_tasks.celery_tasks.cache_audit_table_details",
"options": {"expires": 300},
"schedule": schedule_5_minutes,
},
"get_iam_role_limit": {
"task": "consoleme.celery.celery_tasks.get_iam_role_limit",
"task": "consoleme.celery_tasks.celery_tasks.get_iam_role_limit",
"options": {"expires": 300},
"schedule": schedule_24_hours,
},
"cache_cloudtrail_errors_by_arn": {
"task": "consoleme.celery.celery_tasks.cache_cloudtrail_errors_by_arn",
"task": "consoleme.celery_tasks.celery_tasks.cache_cloudtrail_errors_by_arn",
"options": {"expires": 300},
"schedule": schedule_1_hour,
},
"cache_resources_from_aws_config_across_accounts": {
"task": "consoleme.celery.celery_tasks.cache_resources_from_aws_config_across_accounts",
"task": "consoleme.celery_tasks.celery_tasks.cache_resources_from_aws_config_across_accounts",
"options": {"expires": 300},
"schedule": schedule_1_hour,
},
"cache_policy_requests": {
"task": "consoleme.celery.celery_tasks.cache_policy_requests",
"task": "consoleme.celery_tasks.celery_tasks.cache_policy_requests",
"options": {"expires": 1000},
"schedule": schedule_5_minutes,
},
"cache_cloud_account_mapping": {
"task": "consoleme.celery.celery_tasks.cache_cloud_account_mapping",
"task": "consoleme.celery_tasks.celery_tasks.cache_cloud_account_mapping",
"options": {"expires": 1000},
"schedule": schedule_1_hour,
},
"cache_credential_authorization_mapping": {
"task": "consoleme.celery.celery_tasks.cache_credential_authorization_mapping",
"task": "consoleme.celery_tasks.celery_tasks.cache_credential_authorization_mapping",
"options": {"expires": 1000},
"schedule": schedule_5_minutes,
},
Expand Down
2 changes: 1 addition & 1 deletion consoleme/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ async def authorization_flow(
try:
self.red.setex(
f"USER-{self.user}-CONSOLE-{console_only}",
config.get("dynamic_config.role_cache.cache_expiration", 500),
config.get("dynamic_config.role_cache.cache_expiration", 60),
json.dumps(
{
"groups": self.groups,
Expand Down
24 changes: 21 additions & 3 deletions consoleme/handlers/v2/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from policy_sentry.util.arns import parse_arn
from pydantic import ValidationError

from consoleme.celery_tasks.celery_tasks import app as celery_app
from consoleme.config import config
from consoleme.exceptions.exceptions import (
InvalidRequestParameter,
Expand All @@ -29,7 +30,6 @@
get_url_for_resource,
should_auto_approve_policy_v2,
)
from consoleme.lib.requests import cache_all_policy_requests
from consoleme.lib.timeout import Timeout
from consoleme.lib.v2.requests import (
generate_request_from_change_model_array,
Expand Down Expand Up @@ -60,6 +60,16 @@ class RequestHandler(BaseAPIV2Handler):

allowed_methods = ["POST"]

def on_finish(self) -> None:
if self.request.method != "POST":
return
celery_app.send_task(
"consoleme.celery_tasks.celery_tasks.cache_policy_requests"
)
celery_app.send_task(
"consoleme.celery_tasks.celery_tasks.cache_credential_authorization_mapping"
)

async def post(self):
"""
POST /api/v2/request
Expand Down Expand Up @@ -390,7 +400,6 @@ async def post(self):
)
self.write(response.json())
await self.finish()
await cache_all_policy_requests()
return


Expand Down Expand Up @@ -492,6 +501,16 @@ class RequestDetailHandler(BaseAPIV2Handler):

allowed_methods = ["GET", "PUT"]

def on_finish(self) -> None:
if self.request.method != "PUT":
return
celery_app.send_task(
"consoleme.celery_tasks.celery_tasks.cache_policy_requests"
)
celery_app.send_task(
"consoleme.celery_tasks.celery_tasks.cache_credential_authorization_mapping"
)

async def _get_extended_request(self, request_id, log_data):
dynamo = UserDynamoHandler(self.user)
requests = await dynamo.get_policy_requests(request_id=request_id)
Expand Down Expand Up @@ -668,7 +687,6 @@ async def put(self, request_id):
return
self.write(response.json())
await self.finish()
await cache_all_policy_requests()
return


Expand Down
2 changes: 1 addition & 1 deletion docker-compose-deploy-dockerhub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ services:
python scripts/retrieve_or_decode_configuration.py;
python scripts/initialize_dynamodb_oss.py;
python scripts/initialize_redis_oss.py --use_celery=true;
celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8'
celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8'
2 changes: 1 addition & 1 deletion docker-compose-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ services:
python scripts/retrieve_or_decode_configuration.py;
python scripts/initialize_dynamodb_oss.py;
python scripts/initialize_redis_oss.py --use_celery=true;
celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8'
celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8'
2 changes: 1 addition & 1 deletion docker-compose-dockerhub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:
python scripts/retrieve_or_decode_configuration.py;
python scripts/initialize_dynamodb_oss.py;
python scripts/initialize_redis_oss.py --use_celery=true;
celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8
celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8
'
depends_on:
- consoleme-redis
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ services:
python scripts/retrieve_or_decode_configuration.py;
python scripts/initialize_dynamodb_oss.py;
python scripts/initialize_redis_oss.py --use_celery=true;
celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8
celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8
'
depends_on:
- consoleme-redis
Expand Down
4 changes: 2 additions & 2 deletions scripts/initialize_redis_oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from asgiref.sync import async_to_sync

from consoleme.celery import celery_tasks as celery
from consoleme.celery_tasks import celery_tasks as celery
from consoleme.lib.account_indexers import get_account_id_to_name_mapping
from consoleme_default_plugins.plugins.celery_tasks import (
celery_tasks as default_celery_tasks,
Expand Down Expand Up @@ -32,7 +32,7 @@ def str2bool(v):
if args.use_celery:
# Initialize Redis locally. If use_celery is set to `True`, you must be running a celery beat and worker. You can
# run this locally with the following command:
# `celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=8`
# `celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=8`

celery.cache_roles_across_accounts()
celery.cache_s3_buckets_across_accounts()
Expand Down
4 changes: 2 additions & 2 deletions terraform/central-account/templates/notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ stopasgroup=true
you'll want similar systemd/supervisor configurations for Celery scheduler and worker (commands incoming)

```bash
/apps/consoleme/bin/celery -A consoleme.celery.celery_tasks beat -l DEBUG --pidfile /tmp/celery.pid
/apps/consoleme/bin/celery -A consoleme.celery_tasks.celery_tasks beat -l DEBUG --pidfile /tmp/celery.pid

/apps/consoleme/bin/celery -A consoleme.celery.celery_tasks worker -l DEBUG -E --pidfile /tmp/celery.pid --max-memory-per-child=1000000 --max-tasks-per-child 50 --soft-time-limit 3600 --concurrency=10 -O fair
/apps/consoleme/bin/celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -E --pidfile /tmp/celery.pid --max-memory-per-child=1000000 --max-tasks-per-child 50 --soft-time-limit 3600 --concurrency=10 -O fair
```

(Only bring up one scheduler. You can bring up N workers and enable autoscaling if desired)
Expand Down
2 changes: 1 addition & 1 deletion terraform/central-account/templates/userdata.sh
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ RestartSec=1
WorkingDirectory=/apps/consoleme
Environment=CONFIG_LOCATION=${CONFIG_LOCATION}
Environment=EC2_REGION=${region}
ExecStart=/usr/bin/env /apps/consoleme/env/bin/python3.8 /apps/consoleme/env/bin/celery -A consoleme.celery.celery_tasks worker -l DEBUG -B -E --concurrency=15
ExecStart=/usr/bin/env /apps/consoleme/env/bin/python3.8 /apps/consoleme/env/bin/celery -A consoleme.celery_tasks.celery_tasks worker -l DEBUG -B -E --concurrency=15
[Install]
WantedBy=multi-user.target
Expand Down
4 changes: 2 additions & 2 deletions tests/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class TestCelerySync(TestCase):
def setUp(self):
from consoleme.celery import celery_tasks as celery
from consoleme.celery_tasks import celery_tasks as celery

self.celery = celery

Expand Down Expand Up @@ -73,7 +73,7 @@ def test_cache_roles_for_account(self):
self.assertEqual(
res,
{
"function": "consoleme.celery.celery_tasks.cache_roles_across_accounts",
"function": "consoleme.celery_tasks.celery_tasks.cache_roles_across_accounts",
"cache_key": "test_cache_roles_for_account",
"num_roles": 0,
"num_accounts": 1,
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def create_default_resources(s3, iam, redis, iam_sync_roles, iamrole_table):
s3_key=config.get("cache_roles_across_accounts.all_roles_combined.s3.file"),
)
return
from consoleme.celery.celery_tasks import cache_roles_for_account
from consoleme.celery_tasks.celery_tasks import cache_roles_for_account
from consoleme.lib.account_indexers import get_account_id_to_name_mapping
from consoleme.lib.redis import RedisHandler

Expand Down Expand Up @@ -760,7 +760,7 @@ def mock_exception_stats():

@pytest.fixture(autouse=True, scope="session")
def mock_celery_stats(mock_exception_stats):
p = patch("consoleme.celery.celery_tasks.stats")
p = patch("consoleme.celery_tasks.celery_tasks.stats")

yield p.start()

Expand Down Expand Up @@ -799,7 +799,7 @@ def populate_caches(
):
from asgiref.sync import async_to_sync

from consoleme.celery import celery_tasks as celery
from consoleme.celery_tasks import celery_tasks as celery
from consoleme.lib.account_indexers import get_account_id_to_name_mapping
from consoleme_default_plugins.plugins.celery_tasks import (
celery_tasks as default_celery_tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class TestCloudCredentialAuthorizationMapping(unittest.IsolatedAsyncioTestCase):
@classmethod
def setUpClass(cls):
from consoleme.celery.celery_tasks import (
from consoleme.celery_tasks.celery_tasks import (
cache_roles_across_accounts,
cache_roles_for_account,
)
Expand Down

0 comments on commit 7768e2c

Please sign in to comment.