Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamz #400

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@
containers:
- args:
{pathways_worker_args}
env:
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-worker"
- name: NAMESPACE
value: "cloud_prod"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-worker
Expand Down Expand Up @@ -295,6 +310,20 @@
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-rm"
- name: NAMESPACE
value: "cloud_prod"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
Expand Down Expand Up @@ -329,6 +358,23 @@
containers:
- args:
{pathways_proxy_args}
env:
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-proxy"
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
Expand All @@ -352,6 +398,13 @@ def workload_create_pathways(args) -> None:
0 if successful and 1 otherwise.
"""
args.use_pathways = True
if args.headless:
xpk_print(
'Please use kubectl port forwarding to connect to the Pathways proxy.'
' kubectl get pods kubectl port-forward <proxy-pod-name> 29000:29000'
' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python'
" -c 'import pathwaysutils; import jax; print(jax.devices())'"
)
workload_create(args)


Expand All @@ -366,14 +419,6 @@ def workload_create(args) -> None:
"""
add_zone_and_project(args)

if args.headless:
xpk_print(
'Please use kubectl port forwarding to connect to the Pathways proxy.'
' kubectl get pods kubectl port-forward <proxy-pod-name> 29000:29000'
' JAX_PLATFORMS=proxy JAX_BACKEND_TARGET=grpc://127.0.0.1:29000 python'
" -c 'import pathwaysutils; import jax; print(jax.devices())'"
)

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)
Expand Down
35 changes: 31 additions & 4 deletions src/xpk/core/pathways.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def get_pathways_worker_args(args) -> str:
- --resource_manager_address={rm_address}
- --gcs_scratch_location={args.pathways_gcs_location}"""
if args.use_pathways:
if args.custom_pathways_worker_args:
yaml = append_custom_pathways_args(yaml, args.custom_pathways_worker_args)
return yaml.format(args=args, rm_address=get_rm_address(args))
else:
return ''
Expand All @@ -62,6 +64,10 @@ def get_pathways_proxy_args(args) -> str:
- --gcs_scratch_location={args.pathways_gcs_location}"""

if args.use_pathways:
if args.custom_pathways_proxy_server_args:
yaml = append_custom_pathways_args(
yaml, args.custom_pathways_proxy_server_args
)
return yaml.format(args=args, rm_address=get_rm_address(args))
else:
return ''
Expand Down Expand Up @@ -174,15 +180,12 @@ def ensure_pathways_workload_prerequisites(args, system) -> bool:

def get_pathways_unified_query_link(args) -> str:
"""Get the unified query link for the pathways workload."""
pw_suffixes = ['main', 'rm', 'proxy']
pw_pod_names = [f'"{args.workload}-{suffix}-0"' for suffix in pw_suffixes]
pw_pod_names_query = '%20OR%20'.join(pw_pod_names + ['worker-0-0'])
query_params = (
'resource.type%3D"k8s_container"%0A'
f'resource.labels.project_id%3D"{args.project}"%0A'
f'resource.labels.location%3D"{zone_to_region(args.zone)}"%0A'
f'resource.labels.cluster_name%3D"{args.cluster}"%0A'
f'resource.labels.pod_name:{pw_pod_names_query}%0A'
f'resource.labels.pod_name:"{args.workload}-"%0A'
'severity>%3DDEFAULT'
)

Expand All @@ -203,6 +206,8 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str:
- --instance_count={instance_count}
- --instance_type={instance_type}"""
if args.use_pathways:
if args.custom_pathways_server_args:
yaml = append_custom_pathways_args(yaml, args.custom_pathways_server_args)
return yaml.format(
args=args,
instance_count=args.num_slices,
Expand All @@ -212,6 +217,28 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str:
return ''


def append_custom_pathways_args(yaml, custom_args) -> str:
  """Append custom Pathways args to the YAML with proper indentation.

  The indentation of the appended list items is copied from the second line
  of the existing YAML, so the new entries line up with the existing ones.

  Args:
    yaml (string): existing yaml containing args, one list item per line.
    custom_args (string): whitespace-separated custom args, e.g.
      '--arg_1=xxx --arg2=yyy'.

  Returns:
    yaml (string): yaml with additional args appended. The input is returned
      unchanged when it has no non-empty second line to infer indentation
      from, or when custom_args contains no args.
  """
  lines = yaml.split('\n')
  # With fewer than two lines (or an empty second line) there is nothing to
  # derive the indentation from; return the YAML untouched instead of raising
  # IndexError on a single-line input.
  if len(lines) < 2 or not lines[1]:
    return yaml
  second_line = lines[1]
  # Calculate the indentation based on the second line of existing YAML.
  indentation = ' ' * (len(second_line) - len(second_line.lstrip()))
  # split() with no separator drops the empty tokens that repeated, leading or
  # trailing whitespace would otherwise turn into bare '- ' list items.
  appended = ''.join(
      f'\n{indentation}- {arg}' for arg in custom_args.split()
  )
  return yaml + appended


def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str:
"""
Create a user workload container for Pathways.
Expand Down
5 changes: 2 additions & 3 deletions src/xpk/parser/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,10 @@ def set_cluster_parser(cluster_parser):
'--enable-pathways',
action='store_true',
help=(
'DEPRECATING SOON!!! Please use `xpk cluster create-pathways`.'
' Enable cluster to accept Pathways workloads.'
'Please use `xpk cluster create-pathways` instead to'
' enable cluster to accept Pathways workloads.'
),
)

### Autoprovisioning arguments specific to "cluster create"
cluster_create_autoprovisioning_arguments = (
cluster_create_parser.add_argument_group(
Expand Down
142 changes: 82 additions & 60 deletions src/xpk/parser/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ def set_workload_parsers(workload_parser):
'Arguments for configuring autoprovisioning.',
)
)
workload_pathways_workload_arguments = workload_create_parser.add_argument_group(
'Pathways Image Arguments',
'If --use-pathways is provided, user wants to set up a'
'Pathways workload on xpk.',
)

workload_vertex_tensorboard_arguments = (
workload_create_parser.add_argument_group(
'Vertex Tensorboard Arguments',
Expand Down Expand Up @@ -151,6 +147,15 @@ def set_workload_parsers(workload_parser):
),
)

workload_create_parser_optional_arguments.add_argument(
'--use-pathways',
action='store_true',
help=(
'Please use `xpk workload create-pathways` instead to'
' create Pathways workloads.'
),
)

# Autoprovisioning workload arguments
workload_create_autoprovisioning_arguments.add_argument(
'--on-demand',
Expand Down Expand Up @@ -178,16 +183,6 @@ def set_workload_parsers(workload_parser):
),
)

# Pathways workload arguments
workload_pathways_workload_arguments.add_argument(
'--use-pathways',
action='store_true',
help=(
'DECRATING SOON!!! Please use `xpk workload create-pathways` instead.'
' Provide this argument to create Pathways workloads.'
),
)

# "workload create-pathways" command parser.
workload_create_pathways_parser = workload_subcommands.add_parser(
'create-pathways', help='Create a new job.'
Expand Down Expand Up @@ -230,6 +225,45 @@ def set_workload_parsers(workload_parser):
help='The tpu type to use, v5litepod-16, etc.',
)

### "workload create-pathways" Optional arguments, specific to Pathways
workload_create_pathways_parser_optional_arguments.add_argument(
'--headless',
action='store_true',
help=(
'Please provide this argument to create Pathways workloads in'
' headless mode. This arg can only be used in `xpk workload'
' create-pathways`.'
),
)
workload_create_pathways_parser_optional_arguments.add_argument(
'--proxy-server-image',
type=str,
default=(
'us-docker.pkg.dev/cloud-tpu-v2-images/pathways/proxy_server:latest'
),
help=(
'Please provide the proxy server image for Pathways. This arg can'
' only be used in `xpk workload create-pathways`.'
),
)
workload_create_pathways_parser_optional_arguments.add_argument(
'--server-image',
type=str,
default='us-docker.pkg.dev/cloud-tpu-v2-images/pathways/server:latest',
help=(
'Please provide the server image for Pathways. This arg can only be'
' used in `xpk workload create-pathways`.'
),
)
workload_create_pathways_parser_optional_arguments.add_argument(
'--pathways-gcs-location',
type=str,
default='gs://cloud-pathways-staging/tmp',
help=(
'Please provide the GCS location to store Pathways artifacts. This'
' arg can only be used in `xpk workload create-pathways`.'
),
)
workload_create_pathways_parser_optional_arguments.add_argument(
'--command',
type=str,
Expand All @@ -244,6 +278,39 @@ def set_workload_parsers(workload_parser):
required=False,
)

workload_create_pathways_parser_optional_arguments.add_argument(
'--custom-pathways-server-args',
type=str,
default=None,
help=(
'Provide custom Pathways server args as follows -'
" --custom-pathways-server-args='--arg_1=xxx --arg2=yyy'"
),
required=False,
)

workload_create_pathways_parser_optional_arguments.add_argument(
'--custom-pathways-proxy-server-args',
type=str,
default=None,
help=(
'Provide custom Pathways proxy server args as follows -'
" --custom-pathways-proxy-server-args='--arg_1=xxx --arg2=yyy'"
),
required=False,
)

workload_create_pathways_parser_optional_arguments.add_argument(
'--custom-pathways-worker-args',
type=str,
default=None,
help=(
'Provide custom Pathways worker args as follows -'
" --custom-pathways-worker-args='--arg_1=xxx --arg2=yyy'"
),
required=False,
)

add_shared_workload_create_required_arguments([
workload_create_parser_required_arguments,
workload_create_pathways_parser_required_arguments,
Expand Down Expand Up @@ -522,51 +589,6 @@ def add_shared_workload_create_optional_arguments(args_parsers):
' default on Pathways workloads.'
),
)
custom_parser.add_argument(
'--headless',
action='store_true',
help=(
'Please provide this argument to create Pathways workloads in'
' headless mode. This arg can only be used in `xpk workload'
' create-pathways`(preferred) or `xpk workload create'
' --use-pathways.` (--use-pathways will be deprecated soon).'
),
)
custom_parser.add_argument(
'--proxy-server-image',
type=str,
default=(
'us-docker.pkg.dev/cloud-tpu-v2-images/pathways/proxy_server:latest'
),
help=(
'Please provide the proxy server image for Pathways. This arg can'
' only be used in `xpk workload create-pathways`(preferred) or `xpk'
' workload create --use-pathways.` (--use-pathways will be'
' deprecated soon).'
),
)
custom_parser.add_argument(
'--server-image',
type=str,
default='us-docker.pkg.dev/cloud-tpu-v2-images/pathways/server:latest',
help=(
'Please provide the server image for Pathways. This arg can only be'
' used in `xpk workload create-pathways`(preferred) or `xpk'
' workload create --use-pathways.` (--use-pathways will be'
' deprecated soon).'
),
)
custom_parser.add_argument(
'--pathways-gcs-location',
type=str,
default='gs://cloud-pathways-staging/tmp',
help=(
'Please provide the GCS location to store Pathways artifacts. This'
' arg can only be used in `xpk workload create-pathways`(preferred)'
' or `xpk workload create --use-pathways.` (--use-pathways will be'
' deprecated soon).'
),
)
custom_parser.add_argument(
'--ramdisk-directory',
type=str,
Expand Down
Loading