
Commit

Remove worker service, and use pod ip instead. (sql-machine-learning#2275)

* remove worker service

* remove def

* fix
skydoorkai authored Sep 1, 2020
1 parent 34f1d4e commit 9c39637
Showing 7 changed files with 43 additions and 78 deletions.
31 changes: 0 additions & 31 deletions elasticdl/python/common/k8s_client.py
@@ -27,8 +27,6 @@
from elasticdl_client.common.k8s_client import append_pod_ip_to_env

_PS_SERVICE_PORT = 2222
_WORKER_SERVICE_PORT = 3333
_FTLIB_GOSSIP_CONTAINER_PORT = 7946


def get_worker_pod_name(job_name, worker_id):
@@ -101,14 +99,6 @@ def _get_service_address(self, service_name, port):
def get_worker_pod_name(self, worker_id):
return get_worker_pod_name(self.job_name, worker_id)

def get_worker_service_name(self, worker_id):
return self.get_worker_pod_name(worker_id)

def get_worker_service_address(self, worker_id):
return self._get_service_address(
self.get_worker_service_name(worker_id), _WORKER_SERVICE_PORT
)

def get_ps_pod_name(self, ps_id):
return get_ps_pod_name(self.job_name, ps_id)

@@ -151,17 +141,6 @@ def get_ps_service(self, ps_id):
logger.warning("Exception when reading PS service: %s\n" % e)
return None

def get_worker_service(self, worker_id):
try:
return self.client.read_namespaced_service(
# Worker service has the same name as pod name
name=self.get_worker_service_name(worker_id),
namespace=self.namespace,
)
except client.api_client.ApiException as e:
logger.warning("Exception when reading worker service: %s\n" % e)
return None

def _create_ps_worker_pod(self, pod_name, type_key, index_key, **kargs):
# Find the master pod that will be used as the owner reference
# for the ps or worker pod.
@@ -246,16 +225,6 @@ def create_ps_service(self, ps_id):
owner=self.get_ps_pod(ps_id),
)

def create_worker_service(self, worker_id):
return self._create_service(
name=self.get_worker_service_name(worker_id),
port=_WORKER_SERVICE_PORT,
target_port=_WORKER_SERVICE_PORT,
replica_type="worker",
replica_index=worker_id,
owner=self.get_worker_pod(worker_id),
)

def _create_service(self, **kargs):
labels = self._get_common_labels()

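With the helpers above removed, nothing in the job creates or resolves a per-worker Kubernetes Service any more; the master records each worker's pod IP from pod watch events instead (see the instance manager changes below). As an illustration only, not part of this commit, the same field can be read on demand with the official kubernetes Python client:

# Minimal sketch (not from this repository): a worker pod's IP is available
# on the pod status itself, which is what makes the per-worker Service
# created by create_worker_service() unnecessary.
from kubernetes import client, config
from kubernetes.client.rest import ApiException

config.load_incluster_config()  # use config.load_kube_config() outside the cluster
core_v1 = client.CoreV1Api()

def get_pod_ip(pod_name, namespace="default"):
    """Return the IP assigned to the pod, or None if it has none yet."""
    try:
        pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        return pod.status.pod_ip  # None while the pod is still Pending
    except ApiException:
        return None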
52 changes: 33 additions & 19 deletions elasticdl/python/master/k8s_instance_manager.py
@@ -101,7 +101,7 @@ def __init__(

# Protects followed variables, which are accessed from event_cb.
self._lock = threading.Lock()
# worker id to (pod name, phase) mapping
# worker id to (pod name, ip, phase) mapping
# phase: None/Pending/Running/Succeeded/Failed/Unknown
# None: worker was just launched, haven't received event yet.
# Pending: worker pod not started yet
@@ -110,7 +110,7 @@ def __init__(
# no issue.
# Failed: worker pod is killed for some reason
# Unknown: unknown
self._worker_pods_phase = {}
self._worker_pods_ip_phase = {}
# pod name to worker id mapping
self._worker_pod_name_to_id = {}

@@ -164,8 +164,7 @@ def _start_worker(self, worker_id):
)
name = pod.metadata.name
self._worker_pod_name_to_id[name] = worker_id
self._worker_pods_phase[worker_id] = (name, None)
self._k8s_client.create_worker_service(worker_id)
self._worker_pods_ip_phase[worker_id] = (name, None, None)

def _start_ps(self, ps_id):
logger.info("Starting PS: %d" % ps_id)
@@ -217,7 +216,7 @@ def start_parameter_servers(self):
def _remove_worker(self, worker_id):
logger.info("Removing worker: %d", worker_id)
with self._lock:
if worker_id not in self._worker_pods_phase:
if worker_id not in self._worker_pods_ip_phase:
logger.error("Unknown worker id: %s" % worker_id)
return

Expand All @@ -236,7 +235,7 @@ def _remove_ps(self, ps_id):
def stop_relaunch_and_remove_workers(self):
with self._lock:
self._relaunch_deleted_live_worker = False
for worker_id in self._worker_pods_phase:
for worker_id in self._worker_pods_ip_phase:
self._k8s_client.delete_worker(worker_id)

def stop_relaunch_and_remove_all_ps(self):
@@ -247,7 +246,9 @@ def stop_relaunch_and_remove_all_ps(self):

def get_worker_counter(self):
with self._lock:
return Counter([v for _, v in self._worker_pods_phase.values()])
return Counter(
[v for _, _, v in self._worker_pods_ip_phase.values()]
)

def get_ps_counter(self):
with self._lock:
@@ -265,6 +266,7 @@ def _event_cb(self, event):
return

pod_name = evt_obj.metadata.name
pod_ip = evt_obj.status.pod_ip
phase = evt_obj.status.phase
if pod_name == self._k8s_client.get_master_pod_name():
# No need to care about master pod
@@ -311,9 +313,13 @@ def _event_cb(self, event):

if pod_name in self._worker_pod_name_to_id:
worker_id = self._worker_pod_name_to_id.get(pod_name)
self._worker_pods_phase[worker_id] = (pod_name, phase)
self._worker_pods_ip_phase[worker_id] = (
pod_name,
pod_ip,
phase,
)
if evt_type == "DELETED" or relaunch_failed_pod:
del self._worker_pods_phase[worker_id]
del self._worker_pods_ip_phase[worker_id]
del self._worker_pod_name_to_id[pod_name]

# If a deleted pod was not "Succeeded", relaunch a worker.
@@ -323,7 +329,11 @@ def _event_cb(self, event):
)
else:
workers_failed = []
for pod_name, phase in self._worker_pods_phase.values():
for (
pod_name,
_,
phase,
) in self._worker_pods_ip_phase.values():
workers_failed.append(phase == PodStatus.FAILED)
self.all_workers_failed = all(workers_failed)

@@ -359,30 +369,34 @@ def _event_cb(self, event):

def get_alive_workers(self):
alive_workers = []
for pod_name, phase in self._worker_pods_phase.values():
for pod_name, _, phase in self._worker_pods_ip_phase.values():
if phase == PodStatus.RUNNING:
alive_workers.append(pod_name)
return alive_workers

def get_worker_pod_ip(self, worker_id):
if worker_id not in self._worker_pods_ip_phase:
return None
_, pod_ip, _ = self._worker_pods_ip_phase[worker_id]
return pod_ip

def _get_alive_worker_addr(self):
alive_workers = self.get_alive_workers()
worker_service_addrs = []
worker_addrs = []
worker_start_times = []
for pod_name in alive_workers:
pod = self._k8s_client.get_pod(pod_name)
worker_start_times.append(pod.status.start_time)
worker_id = self._worker_pod_name_to_id[pod_name]
service_addr_port = self._k8s_client.get_worker_service_address(
worker_id
)
worker_service_addrs.append(service_addr_port.split(":")[0])
pod_ip = self.get_worker_pod_ip(worker_id)
worker_addrs.append(pod_ip)

# Sort worker addrs by start time. Then the master will assign
# the rank according to the order in addrs list.
worker_service_addrs = [
x for _, x in sorted(zip(worker_start_times, worker_service_addrs))
worker_addrs = [
x for _, x in sorted(zip(worker_start_times, worker_addrs))
]
return worker_service_addrs
return worker_addrs

@property
def ps_addrs(self):
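The bookkeeping change above is the core of the commit: the per-worker record grows from (pod name, phase) to (pod name, pod IP, phase), the IP is taken from each pod watch event, and get_worker_pod_ip() serves addresses from that cache. A standalone sketch of the pattern, with illustrative class and method names that are not part of the repository:

import threading

class WorkerRegistry:
    """Toy stand-in for the bookkeeping the instance manager now keeps."""

    def __init__(self):
        self._lock = threading.Lock()
        # worker_id -> (pod_name, pod_ip, phase), updated from pod watch events
        self._worker_pods_ip_phase = {}

    def on_pod_event(self, worker_id, pod_name, pod_ip, phase):
        with self._lock:
            self._worker_pods_ip_phase[worker_id] = (pod_name, pod_ip, phase)

    def get_worker_pod_ip(self, worker_id):
        with self._lock:
            entry = self._worker_pods_ip_phase.get(worker_id)
            return entry[1] if entry else None

    def alive_worker_ips(self, start_times):
        """IPs of Running workers ordered by pod start time, mirroring how
        _get_alive_worker_addr keeps rank assignment stable."""
        with self._lock:
            running = [
                (start_times[wid], ip)
                for wid, (_, ip, phase) in self._worker_pods_ip_phase.items()
                if phase == "Running"
            ]
        return [ip for _, ip in sorted(running)]

registry = WorkerRegistry()
registry.on_pod_event(0, "job-worker-0", "10.0.0.5", "Running")
registry.on_pod_event(1, "job-worker-1", "10.0.0.7", "Running")
print(registry.alive_worker_ips({0: 2.0, 1: 1.0}))  # ['10.0.0.7', '10.0.0.5']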
4 changes: 1 addition & 3 deletions elasticdl/python/master/servicer.py
@@ -149,9 +149,7 @@ def get_worker_liveness_time(self, worker_id):

def get_comm_rank(self, request, _):
worker_id = request.worker_id
k8s_client = self._instance_manager._k8s_client
worker_address_port = k8s_client.get_worker_service_address(worker_id)
worker_host = worker_address_port.split(":")[0]
worker_host = self._instance_manager.get_worker_pod_ip(worker_id)

res = elasticdl_pb2.GetCommRankResponse()
res.rank_id = self._rendezvous_server.get_worker_host_rank(worker_host)
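After this change the servicer resolves a worker's host purely through the instance manager's pod-IP cache and passes it to the rendezvous server. A self-contained toy version of that host-to-rank lookup (SimpleRendezvous is a made-up stand-in, not the real rendezvous server):

class SimpleRendezvous:
    """Toy rendezvous: a worker's rank is its index in the host list."""

    def __init__(self, worker_hosts):
        # Hosts are expected to arrive already ordered by pod start time.
        self._hosts = list(worker_hosts)

    def get_worker_host_rank(self, host):
        return self._hosts.index(host)

rendezvous = SimpleRendezvous(["172.0.0.1", "172.0.0.2"])
assert rendezvous.get_worker_host_rank("172.0.0.2") == 1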
18 changes: 0 additions & 18 deletions elasticdl/python/tests/k8s_client_test.py
@@ -105,24 +105,6 @@ def test_client(self):
worker.metadata.labels[k8s.ELASTICDL_REPLICA_INDEX_KEY], str(i)
)

# Start 3 worker services
for i in range(3):
c.create_worker_service(i)

# Check worker services
for i in range(3):
service = c.get_worker_service(i)
self.assertIsNotNone(service)
self.assertEqual(
service.spec.selector[k8s.ELASTICDL_JOB_KEY], c.job_name
)
self.assertEqual(
service.spec.selector[k8s.ELASTICDL_REPLICA_TYPE_KEY], "worker"
)
self.assertEqual(
service.spec.selector[k8s.ELASTICDL_REPLICA_INDEX_KEY], str(i)
)

# Start 2 ps pods
for i in range(2):
_ = c.create_ps(
7 changes: 5 additions & 2 deletions elasticdl/python/tests/k8s_instance_manager_test.py
@@ -155,7 +155,10 @@ def test_relaunch_worker_pod(self):
current_workers = set()
live_workers = set()
with instance_manager._lock:
for k, (_, phase) in instance_manager._worker_pods_phase.items():
for (
k,
(_, _, phase),
) in instance_manager._worker_pods_ip_phase.items():
current_workers.add(k)
if phase in ["Running", "Pending"]:
live_workers.add(k)
@@ -169,7 +172,7 @@
break
time.sleep(1)
with instance_manager._lock:
for k in instance_manager._worker_pods_phase:
for k in instance_manager._worker_pods_ip_phase:
if k not in range(num_workers, num_workers * 2):
found = True
else:
8 changes: 4 additions & 4 deletions elasticdl/python/tests/servicer_test.py
@@ -130,11 +130,11 @@ def test_get_comm_rank(self):
["172.0.0.1", "172.0.0.2"]
)

k8s_client = Mock()
k8s_client.get_worker_service_address = MagicMock(
return_value="172.0.0.1:8080"
mock_instance_manager = Mock()
mock_instance_manager.get_worker_pod_ip = MagicMock(
return_value="172.0.0.1"
)
self.master.instance_manager = Mock(_k8s_client=k8s_client)
self.master.instance_manager = mock_instance_manager
master_servicer = MasterServicer(
3, evaluation_service=None, master=self.master
)
1 change: 0 additions & 1 deletion elasticdl_client/common/k8s_client.py
@@ -27,7 +27,6 @@
ELASTICDL_JOB_KEY = "elasticdl-job-name"
ELASTICDL_REPLICA_TYPE_KEY = "elasticdl-replica-type"
ELASTICDL_REPLICA_INDEX_KEY = "elasticdl-replica-index"
_FTLIB_GOSSIP_CONTAINER_PORT = 7946


def get_master_pod_name(job_name):
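This client module is also where append_pod_ip_to_env (imported at the top of elasticdl/python/common/k8s_client.py above) lives. The standard Kubernetes mechanism for exposing a pod's own IP inside its container is the downward API; a hedged sketch with the kubernetes Python client, where the env var name MY_POD_IP and the helper are illustrative rather than the repository's actual implementation:

from kubernetes import client

def pod_ip_env_var(name="MY_POD_IP"):
    """Build an env var whose value is filled in with the pod's own IP."""
    return client.V1EnvVar(
        name=name,
        value_from=client.V1EnvVarSource(
            field_ref=client.V1ObjectFieldSelector(field_path="status.podIP")
        ),
    )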
