Skip to content

Commit

Permalink
Convert all transforms to async
Browse files Browse the repository at this point in the history
  • Loading branch information
ahal committed Nov 30, 2023
1 parent fb7fa3a commit ba99553
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/cached_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def format_task_digest(cached_task):


@transforms.add
def cache_task(config, tasks):
async def cache_task(config, tasks):
if taskgraph.fast:
for task in tasks:
yield task
Expand All @@ -61,7 +61,7 @@ def cache_task(config, tasks):
if "cached_task" in task.attributes:
digests[task.label] = format_task_digest(task.attributes["cached_task"])

for task in order_tasks(config, tasks):
for task in order_tasks(config, [t async for t in tasks]):
cache = task.pop("cache", None)
if cache is None:
yield task
Expand Down
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@


@transforms.add
def chunk_tasks(config, tasks):
for task in tasks:
async def chunk_tasks(config, tasks):
async for task in tasks:
chunk_config = task.pop("chunk", None)
if not chunk_config:
yield task
Expand Down
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/code_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@


@transforms.add
def add_dependencies(config, jobs):
for job in jobs:
async def add_dependencies(config, jobs):
async for job in jobs:
job.setdefault("soft-dependencies", [])
job["soft-dependencies"] += [
dep_task.label
Expand Down
6 changes: 2 additions & 4 deletions src/taskgraph/transforms/docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@


@transforms.add
def fill_template(config, tasks):
async def fill_template(config, tasks):
available_packages = set()
for task in config.kind_dependencies_tasks.values():
if task.kind != "packages":
Expand All @@ -75,13 +75,11 @@ def fill_template(config, tasks):

context_hashes = {}

tasks = list(tasks)

if not taskgraph.fast and config.write_artifacts:
if not os.path.isdir(CONTEXTS_DIR):
os.makedirs(CONTEXTS_DIR)

for task in tasks:
async for task in tasks:
image_name = task.pop("name")
job_symbol = task.pop("symbol", None)
args = task.pop("args", {})
Expand Down
8 changes: 4 additions & 4 deletions src/taskgraph/transforms/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def wrap(func):


@transforms.add
def process_fetch_job(config, jobs):
async def process_fetch_job(config, jobs):
# Converts fetch-url entries to the job schema.
for job in jobs:
async for job in jobs:
typ = job["fetch"]["type"]
name = job["name"]
fetch = job.pop("fetch")
Expand All @@ -103,15 +103,15 @@ def configure_fetch(config, typ, name, fetch):


@transforms.add
def make_task(config, jobs):
async def make_task(config, jobs):
# Fetch tasks are idempotent and immutable. Have them live for
# essentially forever.
if config.params["level"] == "3":
expires = "1000 years"
else:
expires = "28 days"

for job in jobs:
async for job in jobs:
name = job["name"]
artifact_prefix = job.get("artifact-prefix", "public")
env = job.get("env", {})
Expand Down
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/from_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@


@transforms.add
def from_deps(config, tasks):
for task in tasks:
async def from_deps(config, tasks):
async for task in tasks:
# Setup and error handling.
from_deps = task.pop("from-deps")
kind_deps = config.config.get("kind-dependencies", [])
Expand Down
24 changes: 12 additions & 12 deletions src/taskgraph/transforms/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@


@transforms.add
def rewrite_when_to_optimization(config, jobs):
for job in jobs:
async def rewrite_when_to_optimization(config, jobs):
async for job in jobs:
when = job.pop("when", {})
if not when:
yield job
Expand All @@ -132,8 +132,8 @@ def rewrite_when_to_optimization(config, jobs):


@transforms.add
def set_implementation(config, jobs):
for job in jobs:
async def set_implementation(config, jobs):
async for job in jobs:
impl, os = worker_type_implementation(config.graph_config, job["worker-type"])
if os:
job.setdefault("tags", {})["os"] = os
Expand All @@ -148,8 +148,8 @@ def set_implementation(config, jobs):


@transforms.add
def set_label(config, jobs):
for job in jobs:
async def set_label(config, jobs):
async for job in jobs:
if "label" not in job:
if "name" not in job:
raise Exception("job has neither a name nor a label")
Expand All @@ -160,8 +160,8 @@ def set_label(config, jobs):


@transforms.add
def add_resource_monitor(config, jobs):
for job in jobs:
async def add_resource_monitor(config, jobs):
async for job in jobs:
if job.get("attributes", {}).get("resource-monitor"):
worker_implementation, worker_os = worker_type_implementation(
config.graph_config, job["worker-type"]
Expand Down Expand Up @@ -204,13 +204,13 @@ def get_attribute(dict, key, attributes, attribute_name):


@transforms.add
def use_fetches(config, jobs):
async def use_fetches(config, jobs):
artifact_names = {}
aliases = {}
extra_env = {}

jobs = [j async for j in jobs]
if config.kind in ("toolchain", "fetch"):
jobs = list(jobs)
for job in jobs:
run = job.get("run", {})
label = job["label"]
Expand Down Expand Up @@ -353,12 +353,12 @@ def cmp_artifacts(a):


@transforms.add
def make_task_description(config, jobs):
async def make_task_description(config, jobs):
"""Given a build description, create a task description"""
# import plugin modules first, before iterating over jobs
import_sibling_modules(exceptions=("common.py",))

for job in jobs:
async for job in jobs:
# always-optimized tasks never execute, so have no workdir
if job["worker"]["implementation"] in ("docker-worker", "generic-worker"):
job["run"].setdefault("workdir", "/builds/worker")
Expand Down
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def _convert_content(content):


@transforms.add
def add_notifications(config, tasks):
for task in tasks:
async def add_notifications(config, tasks):
async for task in tasks:
label = "{}-{}".format(config.kind, task["name"])
if "notifications" in task:
notify = _convert_legacy(config, task.pop("notifications"), label)
Expand Down
50 changes: 25 additions & 25 deletions src/taskgraph/transforms/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,11 @@ def build_dummy_payload(config, task, task_def):


@transforms.add
def set_implementation(config, tasks):
async def set_implementation(config, tasks):
"""
Set the worker implementation based on the worker-type alias.
"""
for task in tasks:
async for task in tasks:
worker = task.setdefault("worker", {})
if "implementation" in task["worker"]:
yield task
Expand All @@ -855,8 +855,8 @@ def set_implementation(config, tasks):


@transforms.add
def set_defaults(config, tasks):
for task in tasks:
async def set_defaults(config, tasks):
async for task in tasks:
task.setdefault("always-target", False)
task.setdefault("optimization", None)
task.setdefault("needs-sccache", False)
Expand Down Expand Up @@ -899,8 +899,8 @@ def set_defaults(config, tasks):


@transforms.add
def task_name_from_label(config, tasks):
for task in tasks:
async def task_name_from_label(config, tasks):
async for task in tasks:
if "label" not in task:
if "name" not in task:
raise Exception("task has neither a name nor a label")
Expand All @@ -911,8 +911,8 @@ def task_name_from_label(config, tasks):


@transforms.add
def validate(config, tasks):
for task in tasks:
async def validate(config, tasks):
async for task in tasks:
validate_schema(
task_description_schema,
task,
Expand Down Expand Up @@ -949,8 +949,8 @@ def add_generic_index_routes(config, task):


@transforms.add
def process_treeherder_metadata(config, tasks):
for task in tasks:
async def process_treeherder_metadata(config, tasks):
async for task in tasks:
routes = task.get("routes", [])
extra = task.get("extra", {})
task_th = task.get("treeherder")
Expand Down Expand Up @@ -1021,8 +1021,8 @@ def process_treeherder_metadata(config, tasks):


@transforms.add
def add_index_routes(config, tasks):
for task in tasks:
async def add_index_routes(config, tasks):
async for task in tasks:
index = task.get("index", {})

# The default behavior is to rank tasks according to their tier
Expand Down Expand Up @@ -1053,8 +1053,8 @@ def add_index_routes(config, tasks):


@transforms.add
def build_task(config, tasks):
for task in tasks:
async def build_task(config, tasks):
async for task in tasks:
level = str(config.params["level"])

provisioner_id, worker_type = get_worker_type(
Expand Down Expand Up @@ -1215,24 +1215,24 @@ def build_task(config, tasks):


@transforms.add
def add_github_checks(config, tasks):
async def add_github_checks(config, tasks):
"""
For git repositories, add checks route to all tasks.
This will be replaced by a configurable option in the future.
"""
if config.params["repository_type"] != "git":
for task in tasks:
async for task in tasks:
yield task

for task in tasks:
async for task in tasks:
task["task"]["routes"].append("checks")
yield task


@transforms.add
def chain_of_trust(config, tasks):
for task in tasks:
async def chain_of_trust(config, tasks):
async for task in tasks:
if task["task"].get("payload", {}).get("features", {}).get("chainOfTrust"):
image = task.get("dependencies", {}).get("docker-image")
if image:
Expand All @@ -1246,12 +1246,12 @@ def chain_of_trust(config, tasks):


@transforms.add
def check_task_identifiers(config, tasks):
async def check_task_identifiers(config, tasks):
"""Ensures that all tasks have well defined identifiers:
``^[a-zA-Z0-9_-]{1,38}$``
"""
e = re.compile("^[a-zA-Z0-9_-]{1,38}$")
for task in tasks:
async for task in tasks:
for attrib in ("workerType", "provisionerId"):
if not e.match(task["task"][attrib]):
raise Exception(
Expand All @@ -1263,9 +1263,9 @@ def check_task_identifiers(config, tasks):


@transforms.add
def check_task_dependencies(config, tasks):
async def check_task_dependencies(config, tasks):
"""Ensures that tasks don't have more than 100 dependencies."""
for task in tasks:
async for task in tasks:
number_of_dependencies = (
len(task["dependencies"])
+ len(task["if-dependencies"])
Expand Down Expand Up @@ -1311,7 +1311,7 @@ def check_caches_are_volumes(task):


@transforms.add
def check_run_task_caches(config, tasks):
async def check_run_task_caches(config, tasks):
"""Audit for caches requiring run-task.
run-task manages caches in certain ways. If a cache managed by run-task
Expand All @@ -1336,7 +1336,7 @@ def check_run_task_caches(config, tasks):

suffix = _run_task_suffix()

for task in tasks:
async for task in tasks:
payload = task["task"].get("payload", {})
command = payload.get("command") or [""]

Expand Down
4 changes: 2 additions & 2 deletions src/taskgraph/transforms/task_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@


@transforms.add
def render_task(config, jobs):
for job in jobs:
async def render_task(config, jobs):
async for job in jobs:
sub_config = job.pop("task-context")
params_context = {}
for var, path in sub_config.pop("from-parameters", {}).items():
Expand Down

0 comments on commit ba99553

Please sign in to comment.