-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] add driver/executor pod in Spark #3016
base: master
Are you sure you want to change the base?
Changes from 7 commits
8cc081d
7793398
b21d1e3
00d0c3a
1b7c1c9
32f6aa9
167a390
9bdae27
8c826c0
d6b752b
900113f
68bef7d
5a12025
30cff7c
e17ee57
7f4e00b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,6 +139,7 @@ | |
pod_template: "PodTemplate", | ||
primary_container: "task_models.Container", | ||
settings: SerializationSettings, | ||
task_type: str = "", | ||
) -> Dict[str, Any]: | ||
# import here to avoid circular import | ||
from kubernetes.client import ApiClient, V1PodSpec | ||
|
@@ -169,15 +170,18 @@ | |
# with the values given to ContainerTask. | ||
# The attributes include: image, command, args, resource, and env (env is unioned) | ||
|
||
is_primary = False | ||
if container.name == cast(PodTemplate, pod_template).primary_container_name: | ||
if container.image is None: | ||
# Copy the image from primary_container only if the image is not specified in the pod spec. | ||
container.image = primary_container.image | ||
else: | ||
container.image = get_registerable_container_image(container.image, settings.image_config) | ||
|
||
container.command = primary_container.command | ||
container.args = primary_container.args | ||
if task_type != "spark": | ||
# for spark driver/executor, do not use the command and args from task podTemplate | ||
container.command = primary_container.command | ||
container.args = primary_container.args | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider extracting Spark container logic
Consider extracting the Spark-specific container command/args logic into a separate helper function to improve code organization and readability. The current nested if condition makes the code harder to follow. Code suggestionCheck the AI-generated fix before applying
Code Review Run #27c6ae Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||
|
||
limits, requests = {}, {} | ||
for resource in primary_container.resources.limits: | ||
|
@@ -192,9 +196,14 @@ | |
container.env = [V1EnvVar(name=key, value=val) for key, val in primary_container.env.items()] + ( | ||
container.env or [] | ||
) | ||
is_primary = True | ||
else: | ||
container.image = get_registerable_container_image(container.image, settings.image_config) | ||
|
||
if task_type == "spark" and not is_primary: | ||
# for spark driver/executor, only take the primary container | ||
continue | ||
|
||
final_containers.append(container) | ||
cast(V1PodSpec, pod_template.pod_spec).containers = final_containers | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use this function to create a k8sPod from podTemplate.
flytekit/flytekit/models/task.py
Lines 1079 to 1083 in 2ef875c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the information! I changed using this function and remove
task_type
in_serialize_pod_spec