diff --git a/src/taskgraph/transforms/cached_tasks.py b/src/taskgraph/transforms/cached_tasks.py
index 57a55dffb..8f840c2dd 100644
--- a/src/taskgraph/transforms/cached_tasks.py
+++ b/src/taskgraph/transforms/cached_tasks.py
@@ -50,7 +50,7 @@ def format_task_digest(cached_task):
 
 
 @transforms.add
-def cache_task(config, tasks):
+async def cache_task(config, tasks):
     if taskgraph.fast:
         for task in tasks:
             yield task
@@ -61,7 +61,7 @@ def cache_task(config, tasks):
         if "cached_task" in task.attributes:
             digests[task.label] = format_task_digest(task.attributes["cached_task"])
 
-    for task in order_tasks(config, tasks):
+    for task in order_tasks(config, [t async for t in tasks]):
         cache = task.pop("cache", None)
         if cache is None:
             yield task
diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py
index 31d7eff82..b6bac16d7 100644
--- a/src/taskgraph/transforms/chunking.py
+++ b/src/taskgraph/transforms/chunking.py
@@ -50,8 +50,8 @@
 
 
 @transforms.add
-def chunk_tasks(config, tasks):
-    for task in tasks:
+async def chunk_tasks(config, tasks):
+    async for task in tasks:
         chunk_config = task.pop("chunk", None)
         if not chunk_config:
             yield task
diff --git a/src/taskgraph/transforms/code_review.py b/src/taskgraph/transforms/code_review.py
index bdb655b97..a6bfa9538 100644
--- a/src/taskgraph/transforms/code_review.py
+++ b/src/taskgraph/transforms/code_review.py
@@ -12,8 +12,8 @@
 
 
 @transforms.add
-def add_dependencies(config, jobs):
-    for job in jobs:
+async def add_dependencies(config, jobs):
+    async for job in jobs:
         job.setdefault("soft-dependencies", [])
         job["soft-dependencies"] += [
             dep_task.label
diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py
index d0c5b9c97..17ba3e7fb 100644
--- a/src/taskgraph/transforms/docker_image.py
+++ b/src/taskgraph/transforms/docker_image.py
@@ -65,7 +65,7 @@
 
 
 @transforms.add
-def fill_template(config, tasks):
+async def fill_template(config, tasks):
     available_packages = set()
     for task in config.kind_dependencies_tasks.values():
         if task.kind != "packages":
@@ -75,13 +75,11 @@ def fill_template(config, tasks):
 
     context_hashes = {}
 
-    tasks = list(tasks)
-
     if not taskgraph.fast and config.write_artifacts:
         if not os.path.isdir(CONTEXTS_DIR):
             os.makedirs(CONTEXTS_DIR)
 
-    for task in tasks:
+    async for task in tasks:
         image_name = task.pop("name")
         job_symbol = task.pop("symbol", None)
         args = task.pop("args", {})
diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py
index bcb8ff38a..e24c0183f 100644
--- a/src/taskgraph/transforms/fetch.py
+++ b/src/taskgraph/transforms/fetch.py
@@ -78,9 +78,9 @@ def wrap(func):
 
 
 @transforms.add
-def process_fetch_job(config, jobs):
+async def process_fetch_job(config, jobs):
     # Converts fetch-url entries to the job schema.
-    for job in jobs:
+    async for job in jobs:
         typ = job["fetch"]["type"]
         name = job["name"]
         fetch = job.pop("fetch")
@@ -103,7 +103,7 @@ def configure_fetch(config, typ, name, fetch):
 
 
 @transforms.add
-def make_task(config, jobs):
+async def make_task(config, jobs):
     # Fetch tasks are idempotent and immutable. Have them live for
     # essentially forever.
     if config.params["level"] == "3":
@@ -111,7 +111,7 @@ def make_task(config, jobs):
     else:
         expires = "28 days"
 
-    for job in jobs:
+    async for job in jobs:
         name = job["name"]
         artifact_prefix = job.get("artifact-prefix", "public")
         env = job.get("env", {})
diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py
index 337d68e4b..c476c1038 100644
--- a/src/taskgraph/transforms/from_deps.py
+++ b/src/taskgraph/transforms/from_deps.py
@@ -113,8 +113,8 @@
 
 
 @transforms.add
-def from_deps(config, tasks):
-    for task in tasks:
+async def from_deps(config, tasks):
+    async for task in tasks:
         # Setup and error handling.
         from_deps = task.pop("from-deps")
         kind_deps = config.config.get("kind-dependencies", [])
diff --git a/src/taskgraph/transforms/job/__init__.py b/src/taskgraph/transforms/job/__init__.py
index d86eff3ef..5ad6def7d 100644
--- a/src/taskgraph/transforms/job/__init__.py
+++ b/src/taskgraph/transforms/job/__init__.py
@@ -112,8 +112,8 @@
 
 
 @transforms.add
-def rewrite_when_to_optimization(config, jobs):
-    for job in jobs:
+async def rewrite_when_to_optimization(config, jobs):
+    async for job in jobs:
         when = job.pop("when", {})
         if not when:
             yield job
@@ -132,8 +132,8 @@ def rewrite_when_to_optimization(config, jobs):
 
 
 @transforms.add
-def set_implementation(config, jobs):
-    for job in jobs:
+async def set_implementation(config, jobs):
+    async for job in jobs:
         impl, os = worker_type_implementation(config.graph_config, job["worker-type"])
         if os:
             job.setdefault("tags", {})["os"] = os
@@ -148,8 +148,8 @@ def set_implementation(config, jobs):
 
 
 @transforms.add
-def set_label(config, jobs):
-    for job in jobs:
+async def set_label(config, jobs):
+    async for job in jobs:
         if "label" not in job:
             if "name" not in job:
                 raise Exception("job has neither a name nor a label")
@@ -160,8 +160,8 @@ def set_label(config, jobs):
 
 
 @transforms.add
-def add_resource_monitor(config, jobs):
-    for job in jobs:
+async def add_resource_monitor(config, jobs):
+    async for job in jobs:
         if job.get("attributes", {}).get("resource-monitor"):
             worker_implementation, worker_os = worker_type_implementation(
                 config.graph_config, job["worker-type"]
@@ -204,13 +204,13 @@ def get_attribute(dict, key, attributes, attribute_name):
 
 
 @transforms.add
-def use_fetches(config, jobs):
+async def use_fetches(config, jobs):
     artifact_names = {}
     aliases = {}
     extra_env = {}
+    jobs = [j async for j in jobs]
 
     if config.kind in ("toolchain", "fetch"):
-        jobs = list(jobs)
         for job in jobs:
             run = job.get("run", {})
             label = job["label"]
@@ -353,12 +353,12 @@ def cmp_artifacts(a):
 
 
 @transforms.add
-def make_task_description(config, jobs):
+async def make_task_description(config, jobs):
     """Given a build description, create a task description"""
     # import plugin modules first, before iterating over jobs
     import_sibling_modules(exceptions=("common.py",))
 
-    for job in jobs:
+    async for job in jobs:
         # always-optimized tasks never execute, so have no workdir
         if job["worker"]["implementation"] in ("docker-worker", "generic-worker"):
             job["run"].setdefault("workdir", "/builds/worker")
diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py
index a61e7999c..8e9b4229a 100644
--- a/src/taskgraph/transforms/notify.py
+++ b/src/taskgraph/transforms/notify.py
@@ -140,8 +140,8 @@ def _convert_content(content):
 
 
 @transforms.add
-def add_notifications(config, tasks):
-    for task in tasks:
+async def add_notifications(config, tasks):
+    async for task in tasks:
         label = "{}-{}".format(config.kind, task["name"])
         if "notifications" in task:
             notify = _convert_legacy(config, task.pop("notifications"), label)
diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py
index c55de7851..b7f20fdff 100644
--- a/src/taskgraph/transforms/task.py
+++ b/src/taskgraph/transforms/task.py
@@ -831,11 +831,11 @@ def build_dummy_payload(config, task, task_def):
 
 
 @transforms.add
-def set_implementation(config, tasks):
+async def set_implementation(config, tasks):
     """
     Set the worker implementation based on the worker-type alias.
     """
-    for task in tasks:
+    async for task in tasks:
         worker = task.setdefault("worker", {})
         if "implementation" in task["worker"]:
             yield task
@@ -855,8 +855,8 @@ def set_implementation(config, tasks):
 
 
 @transforms.add
-def set_defaults(config, tasks):
-    for task in tasks:
+async def set_defaults(config, tasks):
+    async for task in tasks:
         task.setdefault("always-target", False)
         task.setdefault("optimization", None)
         task.setdefault("needs-sccache", False)
@@ -899,8 +899,8 @@ def set_defaults(config, tasks):
 
 
 @transforms.add
-def task_name_from_label(config, tasks):
-    for task in tasks:
+async def task_name_from_label(config, tasks):
+    async for task in tasks:
         if "label" not in task:
             if "name" not in task:
                 raise Exception("task has neither a name nor a label")
@@ -911,8 +911,8 @@ def task_name_from_label(config, tasks):
 
 
 @transforms.add
-def validate(config, tasks):
-    for task in tasks:
+async def validate(config, tasks):
+    async for task in tasks:
         validate_schema(
             task_description_schema,
             task,
@@ -949,8 +949,8 @@ def add_generic_index_routes(config, task):
 
 
 @transforms.add
-def process_treeherder_metadata(config, tasks):
-    for task in tasks:
+async def process_treeherder_metadata(config, tasks):
+    async for task in tasks:
         routes = task.get("routes", [])
         extra = task.get("extra", {})
         task_th = task.get("treeherder")
@@ -1021,8 +1021,8 @@ def process_treeherder_metadata(config, tasks):
 
 
 @transforms.add
-def add_index_routes(config, tasks):
-    for task in tasks:
+async def add_index_routes(config, tasks):
+    async for task in tasks:
         index = task.get("index", {})
 
         # The default behavior is to rank tasks according to their tier
@@ -1053,8 +1053,8 @@ def add_index_routes(config, tasks):
 
 
 @transforms.add
-def build_task(config, tasks):
-    for task in tasks:
+async def build_task(config, tasks):
+    async for task in tasks:
         level = str(config.params["level"])
 
         provisioner_id, worker_type = get_worker_type(
@@ -1215,24 +1215,24 @@ def build_task(config, tasks):
 
 
 @transforms.add
-def add_github_checks(config, tasks):
+async def add_github_checks(config, tasks):
     """
     For git repositories, add checks route to all tasks.
 
     This will be replaced by a configurable option in the future.
     """
     if config.params["repository_type"] != "git":
-        for task in tasks:
+        async for task in tasks:
             yield task
 
-    for task in tasks:
+    async for task in tasks:
         task["task"]["routes"].append("checks")
         yield task
 
 
 @transforms.add
-def chain_of_trust(config, tasks):
-    for task in tasks:
+async def chain_of_trust(config, tasks):
+    async for task in tasks:
         if task["task"].get("payload", {}).get("features", {}).get("chainOfTrust"):
             image = task.get("dependencies", {}).get("docker-image")
             if image:
@@ -1246,12 +1246,12 @@ def chain_of_trust(config, tasks):
 
 
 @transforms.add
-def check_task_identifiers(config, tasks):
+async def check_task_identifiers(config, tasks):
     """Ensures that all tasks have well defined identifiers:
     ``^[a-zA-Z0-9_-]{1,38}$``
     """
     e = re.compile("^[a-zA-Z0-9_-]{1,38}$")
-    for task in tasks:
+    async for task in tasks:
         for attrib in ("workerType", "provisionerId"):
             if not e.match(task["task"][attrib]):
                 raise Exception(
@@ -1263,9 +1263,9 @@ def check_task_identifiers(config, tasks):
 
 
 @transforms.add
-def check_task_dependencies(config, tasks):
+async def check_task_dependencies(config, tasks):
     """Ensures that tasks don't have more than 100 dependencies."""
-    for task in tasks:
+    async for task in tasks:
         number_of_dependencies = (
             len(task["dependencies"])
             + len(task["if-dependencies"])
@@ -1311,7 +1311,7 @@ def check_caches_are_volumes(task):
 
 
 @transforms.add
-def check_run_task_caches(config, tasks):
+async def check_run_task_caches(config, tasks):
     """Audit for caches requiring run-task.
 
     run-task manages caches in certain ways. If a cache managed by run-task
@@ -1336,7 +1336,7 @@ def check_run_task_caches(config, tasks):
 
     suffix = _run_task_suffix()
 
-    for task in tasks:
+    async for task in tasks:
         payload = task["task"].get("payload", {})
 
         command = payload.get("command") or [""]
diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py
index 5c7ed6af8..14db94dda 100644
--- a/src/taskgraph/transforms/task_context.py
+++ b/src/taskgraph/transforms/task_context.py
@@ -81,8 +81,8 @@
 
 
 @transforms.add
-def render_task(config, jobs):
-    for job in jobs:
+async def render_task(config, jobs):
+    async for job in jobs:
         sub_config = job.pop("task-context")
         params_context = {}
         for var, path in sub_config.pop("from-parameters", {}).items():