WIP
leplatrem committed Feb 10, 2025
1 parent 4bdcd6a commit e9617c4
Showing 16 changed files with 2,311 additions and 0 deletions.
37 changes: 37 additions & 0 deletions Cronjobs.Dockerfile
@@ -0,0 +1,37 @@
FROM python:3.13.1 AS build

ENV PIP_NO_CACHE_DIR=off \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    POETRY_HOME="/opt/poetry" \
    POETRY_NO_INTERACTION=1 \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    VIRTUAL_ENV=/opt/.venv \
    PATH="/opt/.venv/bin:$PATH" \
    PYTHONPATH="/app:$PYTHONPATH"

# Install Poetry
RUN python -m venv $POETRY_HOME && \
    $POETRY_HOME/bin/pip install poetry==2.0.1 && \
    $POETRY_HOME/bin/poetry --version

WORKDIR /opt
COPY pyproject.toml poetry.lock ./
RUN $POETRY_HOME/bin/poetry install --only cronjobs --no-root


FROM python:3.13.1

ENV PATH="/opt/.venv/bin:$PATH" \
    PYTHONUNBUFFERED=1 \
    VIRTUAL_ENV=/opt/.venv \
    PYTHONPATH="/app:$PYTHONPATH"

COPY --from=build $VIRTUAL_ENV $VIRTUAL_ENV

WORKDIR /lambda
RUN mkdir /lambda/commands
ADD commands/*.py /lambda/commands/

# ./cronjobs/run.sh, not ./bin/run.sh
ENTRYPOINT ["/app/run.sh"]
CMD ["help"]
146 changes: 146 additions & 0 deletions cronjobs/README.md
@@ -0,0 +1,146 @@
# Remote Settings Lambdas

A collection of scripts related to the Remote Settings service.

## Sentry

All commands use Sentry to report unexpected errors. Sentry can be configured with these environment variables, which are recommended but not required:

- `SENTRY_DSN`: The DSN from the "Client Keys" section in the project settings in Sentry.
- `SENTRY_ENV`: The environment to use for Sentry, e.g. dev, stage or prod.
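A minimal sketch, using only the standard library, of how these optional variables can be picked up (the `sentry_options` helper name is illustrative, not part of this commit):

```python
import os


def sentry_options(env=None):
    """Build optional Sentry init kwargs; an unset SENTRY_ENV adds nothing."""
    env = os.environ if env is None else env
    opts = {}
    if env.get("SENTRY_ENV"):
        opts["environment"] = env["SENTRY_ENV"]
    return opts


# When SENTRY_DSN is unset, the commands simply skip `sentry_sdk.init(...)`,
# and Sentry capture calls become no-ops.
```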

## Commands

Each command can be run either directly with Python:

```
$ python aws_lambda.py validate_signature
```

or via the Docker container:

```
$ docker run remote-settings-lambdas validate_signature
```


### refresh_signature

Environment config:

- ``SERVER``: server URL (default: ``http://localhost:8888/v1``)
- ``REFRESH_SIGNATURE_AUTH``: credentials, either ``user:pass`` or ``{access-token}`` (default: ``None``)
- ``REQUESTS_TIMEOUT_SECONDS``: Connection/Read timeout in seconds (default: ``2``)
- ``REQUESTS_NB_RETRIES``: Number of retries before failing (default: ``4``)
- ``MAX_SIGNATURE_AGE``: Refresh signatures that are older than this age in days (default: ``7``)

> **Note**:
> To force a refresh of all signatures, set ``MAX_SIGNATURE_AGE=0``.

Example:

```
$ REFRESH_SIGNATURE_AUTH=reviewer:pass python aws_lambda.py refresh_signature
Looking at /buckets/monitor/collections/changes:
Looking at /buckets/source/collections/source: to-review at 2018-03-05 13:56:08 UTC ( 1520258168885 )
Looking at /buckets/staging/collections/addons: Trigger new signature: signed at 2018-03-05 13:57:31 UTC ( 1520258251343 )
Looking at /buckets/staging/collections/certificates: Trigger new signature: signed at 2018-03-05 13:57:31 UTC ( 1520258251441 )
Looking at /buckets/staging/collections/plugins: Trigger new signature: signed at 2018-03-05 13:57:31 UTC ( 1520258251547 )
Looking at /buckets/staging/collections/gfx: Trigger new signature: signed at 2018-03-05 13:57:31 UTC ( 1520258251640 )
```


### backport_records

Backport the changes from one collection to another. This is useful if the new collection (*source*) has become the source of truth,
but there are still clients pulling data from the old collection (*destination*).

> Note: This lambda is not safe if other users can interact with the destination collection.

Environment config:

- ``SERVER``: server URL (default: ``http://localhost:8888/v1``)
- ``BACKPORT_RECORDS_SOURCE_AUTH``: authentication for source collection
- ``BACKPORT_RECORDS_DEST_AUTH``: authentication for destination collection (default: same as source)
- ``BACKPORT_RECORDS_SOURCE_BUCKET``: bucket id to read records from
- ``BACKPORT_RECORDS_SOURCE_COLLECTION``: collection id to read records from
- ``BACKPORT_RECORDS_SOURCE_FILTERS``: optional filters to apply when backporting records, as a JSON string (default: none, e.g. ``'{"min_age": 42}'``)
- ``BACKPORT_RECORDS_DEST_BUCKET``: bucket id to copy records to (default: same as source bucket)
- ``BACKPORT_RECORDS_DEST_COLLECTION``: collection id to copy records to (default: same as source collection)
- ``REQUESTS_TIMEOUT_SECONDS``: Connection/Read timeout in seconds (default: ``2``)
- ``REQUESTS_NB_RETRIES``: Number of retries before failing (default: ``4``)
- ``SAFE_HEADERS``: Add concurrency control headers to update requests (default: ``false``)
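Since ``BACKPORT_RECORDS_SOURCE_FILTERS`` carries a JSON object, it can be read along these lines (a sketch; the `parse_filters` helper name is hypothetical):

```python
import json
import os


def parse_filters(raw):
    """Decode an optional JSON-object env value; unset or empty means no filters."""
    return json.loads(raw) if raw else {}


filters = parse_filters(os.getenv("BACKPORT_RECORDS_SOURCE_FILTERS", ""))
```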

Example:

```
$ BACKPORT_RECORDS_SOURCE_AUTH=user:pass BACKPORT_RECORDS_SOURCE_BUCKET=blocklists BACKPORT_RECORDS_SOURCE_COLLECTION=certificates BACKPORT_RECORDS_DEST_BUCKET=security-state BACKPORT_RECORDS_DEST_COLLECTION=onecrl python3 aws_lambda.py backport_records
Batch #0: PUT /buckets/security-state/collections/onecrl/records/003234b2-f425-eae6-9596-040747dab2b9 - 201
Batch #1: PUT /buckets/security-state/collections/onecrl/records/00ac492e-04f7-ee6d-5fd2-bb12b97a4b7f - 201
Batch #2: DELETE /buckets/security-state/collections/onecrl/records/23 - 200
Done. 3 changes applied.
```

```
$ BACKPORT_RECORDS_SOURCE_AUTH=user:pass BACKPORT_RECORDS_SOURCE_BUCKET=blocklists BACKPORT_RECORDS_SOURCE_COLLECTION=certificates BACKPORT_RECORDS_DEST_BUCKET=security-state BACKPORT_RECORDS_DEST_COLLECTION=onecrl python3 aws_lambda.py backport_records
Records are in sync. Nothing to do.
```


### sync_megaphone

Send the current version of Remote Settings data to the Push server.

Does nothing if versions are in sync.

Environment config:

- ``SERVER``: Remote Settings server URL (default: ``http://localhost:8888/v1``)
- ``MEGAPHONE_URL``: Megaphone service URL
- ``MEGAPHONE_READER_AUTH``: Bearer token for Megaphone read access
- ``MEGAPHONE_BROADCASTER_AUTH``: Bearer token for Megaphone broadcaster access
- ``BROADCASTER_ID``: Push broadcaster ID (default: ``remote-settings``)
- ``CHANNEL_ID``: Push channel ID (default: ``monitor_changes``)

Example:

```
$ SERVER=https://settings.prod.mozaws.net/v1 MEGAPHONE_URL="https://push.services.mozilla.com/v1" MEGAPHONE_READER_AUTH="a-b-c" MEGAPHONE_BROADCASTER_AUTH="d-e-f" python aws_lambda.py sync_megaphone
```


## Test locally

```
$ make test
$ SERVER=https://firefox.settings.services.mozilla.com/v1/ python aws_lambda.py validate_signature
```

### Local Kinto server

The best way to obtain a local setup that looks like a writable Remote Settings instance is to follow [this tutorial](https://remote-settings.readthedocs.io/en/latest/tutorial-local-server.html).

It is possible to initialize the server with some fake data, for example using the Kinto Dist smoke tests:

```
$ bash /path/to/kinto-dist/tests/smoke-test.sh
```

## Releasing

1. Create a release on GitHub at https://github.com/mozilla-services/remote-settings-lambdas/releases/new
2. Create a new tag `X.Y.Z` (*this tag will be created from the target when you publish the release*)
3. Generate the release notes
4. Publish the release

## License

Apache 2.0

125 changes: 125 additions & 0 deletions cronjobs/aws_lambda.py
@@ -0,0 +1,125 @@
#!/usr/bin/env python
import glob
import importlib
import os
import sys

import sentry_sdk
from decouple import config


SENTRY_DSN = config("SENTRY_DSN", default=None)
SENTRY_ENV = config("SENTRY_ENV", default=None)
SERVER_URL = os.getenv("SERVER", "http://localhost:8888/v1")

if SENTRY_DSN:
    # Note! If you don't do `sentry_sdk.init(DSN)`, it will still work
    # to do things like calling `sentry_sdk.capture_exception(exception)`.
    # It just means it's a noop.
    env_option = {}
    if SENTRY_ENV:
        env_option = {"environment": SENTRY_ENV}
    if os.getenv("AWS_LAMBDA_FUNCTION_NAME"):
        # We're running in AWS. See https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
        from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

        integrations = [AwsLambdaIntegration()]
    elif os.getenv("FUNCTION_TARGET", os.getenv("GOOGLE_CLOUD_PROJECT")):
        # We're running in Google Cloud. See https://cloud.google.com/functions/docs/configuring/env-var
        from sentry_sdk.integrations.gcp import GcpIntegration

        integrations = [GcpIntegration()]
    else:
        raise RuntimeError("Could not determine Cloud environment for Sentry")
    sentry_sdk.init(SENTRY_DSN, integrations=integrations, **env_option)


def help_(**kwargs):
    """Show this help."""

    def white_bold(s):
        return f"\033[1m\x1b[37m{s}\033[0;0m"

    entrypoints = [
        os.path.splitext(os.path.basename(f))[0] for f in glob.glob("./commands/[a-z]*.py")
    ]
    commands = [
        getattr(importlib.import_module(f"commands.{entrypoint}"), entrypoint)
        for entrypoint in entrypoints
    ]
    func_listed = "\n - ".join([f"{white_bold(f.__name__)}: {f.__doc__}" for f in commands])
    print(
        f"""
Remote Settings lambdas.
Available commands:
 - {func_listed}
"""
    )


def run(command, event=None, context=None):
    if event is None:
        event = {"server": SERVER_URL}
    if context is None:
        context = {"sentry_sdk": sentry_sdk}

    if isinstance(command, str):
        # Import the command module and return its main function.
        mod = importlib.import_module(f"commands.{command}")
        command = getattr(mod, command)

    # Note! If the sentry_sdk was initialized with the platform integration,
    # it is now ready to automatically capture any unexpected exception.
    # See https://docs.sentry.io/platforms/python/guides/aws-lambda/
    # See https://docs.sentry.io/platforms/python/guides/gcp-functions/

    # Flag to force a failure, in order to test the Sentry integration.
    if event.get("force_fail") or os.getenv("FORCE_FAIL"):
        raise Exception("Found forced failure flag")

    command(event, context)


def backport_records(*args, **kwargs):
    return run("backport_records", *args, **kwargs)


def blockpages_generator(*args, **kwargs):
    return run("blockpages_generator", *args, **kwargs)


def refresh_signature(*args, **kwargs):
    return run("refresh_signature", *args, **kwargs)


def sync_megaphone(*args, **kwargs):
    return run("sync_megaphone", *args, **kwargs)


def build_bundles(*args, **kwargs):
    return run("build_bundles", *args, **kwargs)


def main(*args):
    # Run the function specified in the CLI argument:
    #
    #     $ AUTH=user:pass python aws_lambda.py refresh_signature
    #
    if not args or args[0] in ("help", "--help"):
        help_()
        return
    entrypoint = args[0]
    try:
        command = globals()[entrypoint]
    except KeyError:
        print(f"Unknown function {entrypoint!r}", file=sys.stderr)
        help_()
        return 1
    command()


if __name__ == "__main__":
    sys.exit(main(*sys.argv[1:]))
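The cloud-environment detection used for Sentry above can be sketched in isolation (the `detect_cloud` helper is illustrative, not part of this commit):

```python
def detect_cloud(env):
    """Mirror the AWS/GCP detection performed before sentry_sdk.init() above."""
    if env.get("AWS_LAMBDA_FUNCTION_NAME"):
        # Set by the AWS Lambda runtime.
        return "aws"
    if env.get("FUNCTION_TARGET") or env.get("GOOGLE_CLOUD_PROJECT"):
        # Set by Google Cloud Functions.
        return "gcp"
    raise RuntimeError("Could not determine Cloud environment for Sentry")
```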
79 changes: 79 additions & 0 deletions cronjobs/commands/__init__.py
@@ -0,0 +1,79 @@
import concurrent.futures
import os

import backoff
import kinto_http
import requests
from requests.adapters import TimeoutSauce


PARALLEL_REQUESTS = int(os.getenv("PARALLEL_REQUESTS", 4))
REQUESTS_TIMEOUT_SECONDS = float(os.getenv("REQUESTS_TIMEOUT_SECONDS", 2))
REQUESTS_NB_RETRIES = int(os.getenv("REQUESTS_NB_RETRIES", 4))
# Tuple membership, not `in "1yY"`: the substring test would also match "".
DRY_MODE = os.getenv("DRY_RUN", "0") in ("1", "y", "Y")

retry_timeout = backoff.on_exception(
    backoff.expo,
    (requests.exceptions.Timeout, requests.exceptions.ConnectionError),
    max_tries=REQUESTS_NB_RETRIES,
)


class CustomTimeout(TimeoutSauce):
    def __init__(self, *args, **kwargs):
        if kwargs["connect"] is None:
            kwargs["connect"] = REQUESTS_TIMEOUT_SECONDS
        if kwargs["read"] is None:
            kwargs["read"] = REQUESTS_TIMEOUT_SECONDS
        super().__init__(*args, **kwargs)


# Monkey-patch requests so every request gets the configured default timeouts.
requests.adapters.TimeoutSauce = CustomTimeout


class KintoClient(kinto_http.Client):
    """
    A Kinto client that retries requests that fail with a timeout or a 5XX
    response from the server.
    """

    def __init__(self, *args, **kwargs):
        kwargs.setdefault("retry", REQUESTS_NB_RETRIES)
        kwargs.setdefault("dry_mode", DRY_MODE)
        super().__init__(*args, **kwargs)

    @retry_timeout
    def server_info(self, *args, **kwargs):
        return super().server_info(*args, **kwargs)

    @retry_timeout
    def get_collection(self, *args, **kwargs):
        return super().get_collection(*args, **kwargs)

    @retry_timeout
    def get_records(self, *args, **kwargs):
        return super().get_records(*args, **kwargs)

    @retry_timeout
    def get_records_timestamp(self, *args, **kwargs):
        return super().get_records_timestamp(*args, **kwargs)

    @retry_timeout
    def get_changeset(self, *args, **kwargs):
        return super().get_changeset(*args, **kwargs)

    @retry_timeout
    def approve_changes(self, *args, **kwargs):
        return super().approve_changes(*args, **kwargs)

    @retry_timeout
    def request_review(self, *args, **kwargs):
        return super().request_review(*args, **kwargs)


def call_parallel(func, args_list, max_workers=PARALLEL_REQUESTS):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, *args) for args in args_list]
        results = [future.result() for future in futures]
    return results
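For illustration, a self-contained version of the `call_parallel` helper with a toy function, showing that results come back in `args_list` order:

```python
import concurrent.futures


def call_parallel(func, args_list, max_workers=4):
    # Submit all calls, then collect the results in submission order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, *args) for args in args_list]
        return [future.result() for future in futures]


def add(a, b):
    return a + b


print(call_parallel(add, [(1, 2), (3, 4), (5, 6)]))  # prints [3, 7, 11]
```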