
[DO NOT MERGE] [Only for the long-running PW job] Pathways gcsfuse integration #401

Draft · wants to merge 8 commits into base: develop
234 changes: 161 additions & 73 deletions src/xpk/commands/workload.py
@@ -117,6 +117,10 @@
xpk.google.com/workload: {args.workload}
annotations:
{storage_annotations}
gke-gcsfuse/volumes: "true"
gke-gcsfuse/cpu-limit: "500m"
gke-gcsfuse/memory-limit: "350Gi"
gke-gcsfuse/ephemeral-storage-limit: "40Gi"
spec:
schedulerName: {args.scheduler}
restartPolicy: Never
@@ -130,10 +134,22 @@
dnsPolicy: ClusterFirstWithHostNet
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
- name: gke-gcsfuse-sidecar
image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64
{container}
serviceAccountName: {service_account}
volumes:
{volumes}
- name: gcs-ckpt-pvc
persistentVolumeClaim:
claimName: ckpt-bucket-pvc
- name: gcs-dataset-pvc
persistentVolumeClaim:
claimName: cached-dataset-bucket-pvc
- name: gke-gcsfuse-cache
emptyDir:
medium: Memory
sizeLimit: 100Gi
"""


@@ -260,7 +276,112 @@
operator: "All"
targetReplicatedJobs:
- {args.targetReplicatedJob}
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-rm"
- name: NAMESPACE
value: "cloud_prod"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
env:
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-proxy"
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
- name: worker
replicas: {args.num_slices}
template:
@@ -277,10 +398,16 @@
metadata:
annotations:
{storage_annotations}
gke-gcsfuse/volumes: "true"
gke-gcsfuse/cpu-limit: "500m"
gke-gcsfuse/memory-limit: "350Gi"
gke-gcsfuse/ephemeral-storage-limit: "40Gi"
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
serviceAccountName: {service_account}
containers:
- name: gke-gcsfuse-sidecar
image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64
- args:
{pathways_worker_args}
image: {args.server_image}
@@ -298,6 +425,12 @@
volumeMounts:
- mountPath: /tmp
name: shared-tmp
- mountPath: /tmp/gcsfuse
name: gcs-ckpt-pvc
readOnly: false
- mountPath: /tmp/dataset
name: gcs-dataset-pvc
readOnly: false
{storage_volume_mounts}
env:
# Workaround for v6e
@@ -321,6 +454,20 @@
fieldPath: "metadata.labels['jobset.sigs.k8s.io/job-index']"
- name: MEGASCALE_COORDINATOR_ADDRESS
value: "$(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-$(MEGASCALE_SLICE_ID)-0.$(JOBSET_NAME)"
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-worker"
- name: NAMESPACE
value: "cloud_prod"
{pathways_sidecar_container}
nodeSelector:
{accelerator_label}
@@ -334,79 +481,17 @@
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: gcs-ckpt-pvc
persistentVolumeClaim:
claimName: ckpt-bucket-pvc
- name: gcs-dataset-pvc
persistentVolumeClaim:
claimName: cached-dataset-bucket-pvc
- name: gke-gcsfuse-cache
emptyDir:
medium: Memory
sizeLimit: 100Gi
{storage_volumes}
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
"""


@@ -534,7 +619,10 @@ def workload_create(args) -> None:
- PodFailurePolicy"""
restart_on_exit_codes = get_restart_exit_codes(args)
restart_on_exit_codes = ','.join(map(str, restart_on_exit_codes))
-  pod_failure_policy = f"""
+  if args.use_pathways:
+    pod_failure_policy = ''
+  else:
+    pod_failure_policy = f"""
podFailurePolicy:
rules:
- action: FailJob
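With this change, Pathways workloads omit the podFailurePolicy block entirely, while other workloads keep it. The rendered block follows the Kubernetes podFailurePolicy API; a minimal sketch of its shape (the operator and exit-code values below are assumptions; the template is truncated here):

podFailurePolicy:
  rules:
    - action: FailJob              # fail the Job outright...
      onExitCodes:
        operator: NotIn            # ...unless the container exit code is in
        values: [42, 127]          # restart_on_exit_codes (assumed values)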
12 changes: 12 additions & 0 deletions src/xpk/core/docker_resources.py
@@ -227,6 +227,12 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str:
"""
volume_mount_yaml = """- mountPath: /dev/shm
name: dshm-2
- mountPath: /tmp/dataset
name: gcs-dataset-pvc
readOnly: false
- mountPath: /tmp/gcsfuse
name: gcs-ckpt-pvc
readOnly: false
"""

if args.ramdisk_directory != '':
@@ -237,6 +243,12 @@
if args.use_pathways:
volume_mount_yaml = """- mountPath: /tmp
name: shared-tmp
- mountPath: /tmp/dataset
name: gcs-dataset-pvc
readOnly: false
- mountPath: /tmp/gcsfuse
name: gcs-ckpt-pvc
readOnly: false
"""
elif (
system.accelerator_type == AcceleratorType['TPU']
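Together with the volumes added in workload.py above, the rendered worker pod pairs each new mount with its claim; a sketch of the effective wiring (names taken from this PR):

# container side (from get_volume_mounts)
volumeMounts:
  - mountPath: /tmp/dataset
    name: gcs-dataset-pvc
  - mountPath: /tmp/gcsfuse
    name: gcs-ckpt-pvc
# pod side (from the workload template)
volumes:
  - name: gcs-dataset-pvc
    persistentVolumeClaim:
      claimName: cached-dataset-bucket-pvc
  - name: gcs-ckpt-pvc
    persistentVolumeClaim:
      claimName: ckpt-bucket-pvc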
2 changes: 1 addition & 1 deletion src/xpk/core/nodepool.py
@@ -318,7 +318,7 @@ def run_gke_node_pool_create_command(
create_commands.append(command)
create_task_names.append(task)

-  desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np']
+  desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np', 'high-mem-pool']
if args.enable_pathways:
# Pathways needs CPU nodepools in addition to TPU nodepools
for node_pool_name in desired_pw_cpu_node_pools: