Ray pipeline #1

Open
wants to merge 70 commits into
base: develop
70 commits
2069a7b CHAIN PIPELINE (alindkhare, Oct 8, 2019)
c72517a change location (Oct 8, 2019)
02d63b4 removed errors (alindkhare, Oct 8, 2019)
1da01b7 included api calls in init (alindkhare, Oct 8, 2019)
ef36962 removed errors (alindkhare, Oct 8, 2019)
38dd377 removed errors (alindkhare, Oct 8, 2019)
0fb33bf internal kv fix (alindkhare, Oct 8, 2019)
6a0ba87 bug fix (alindkhare, Oct 8, 2019)
874afc6 bug fix (alindkhare, Oct 8, 2019)
f4a5752 temp (alindkhare, Oct 8, 2019)
462d002 temp (alindkhare, Oct 8, 2019)
22f6be5 temp (alindkhare, Oct 8, 2019)
2cffaef check (alindkhare, Oct 8, 2019)
1aa38a5 check (alindkhare, Oct 8, 2019)
5ee82ed check (alindkhare, Oct 8, 2019)
f83492b check (alindkhare, Oct 8, 2019)
514e7b4 check (alindkhare, Oct 8, 2019)
8056e39 check (alindkhare, Oct 8, 2019)
458f52a final (alindkhare, Oct 8, 2019)
c63c364 enabling DAG pipeline scheduling (alindkhare, Oct 9, 2019)
38cf462 bug fix (alindkhare, Oct 9, 2019)
878972e bug fix (alindkhare, Oct 9, 2019)
0d2e6e0 bug fix (alindkhare, Oct 9, 2019)
9e03872 Adding examples (alindkhare, Oct 9, 2019)
00fb3b9 Complex pipeline (alindkhare, Oct 9, 2019)
a872618 pipeline examples (alindkhare, Oct 9, 2019)
f272304 error handling (alindkhare, Oct 9, 2019)
73724d9 POST request functionality (alindkhare, Oct 9, 2019)
c90fd5e POST request functionality attempt 2 (alindkhare, Oct 9, 2019)
2e25544 POST request functionality attempt 3 (alindkhare, Oct 9, 2019)
c02fde5 POST request functionality attempt 4 (alindkhare, Oct 9, 2019)
09f3303 POST request functionality attempt 5 (alindkhare, Oct 9, 2019)
ecb28d8 POST request functionality attempt 6 (alindkhare, Oct 9, 2019)
75d3690 POST request functionality attempt 7 (alindkhare, Oct 9, 2019)
fc2c670 POST request functionality attempt 8 (alindkhare, Oct 9, 2019)
ca09f7e POST request functionality attempt 9 (alindkhare, Oct 9, 2019)
47e7311 POST Image Query and Resnet50 Example (alindkhare, Oct 10, 2019)
5bb7685 RESNET50 EXAMPLE (alindkhare, Oct 10, 2019)
07f2013 RESNET50 EXAMPLE (alindkhare, Oct 10, 2019)
f451823 setting device issue (alindkhare, Oct 10, 2019)
992e012 Increase ray init memory because of resnet (alindkhare, Oct 10, 2019)
c762712 bug fix (alindkhare, Oct 10, 2019)
61cbddf check (alindkhare, Oct 10, 2019)
f162cde check (alindkhare, Oct 10, 2019)
5f3526c r50 check (alindkhare, Oct 10, 2019)
99a2641 r50 check (alindkhare, Oct 10, 2019)
d463275 r50 check (alindkhare, Oct 10, 2019)
2022596 r50 attempt 1 (alindkhare, Oct 10, 2019)
974ec63 r50 attempt 2 (alindkhare, Oct 10, 2019)
b99993d r50 attempt 2 (alindkhare, Oct 10, 2019)
7a34d45 r50 attempt 3 (alindkhare, Oct 10, 2019)
94d2225 r50 attempt 4 (alindkhare, Oct 10, 2019)
808871d r50 attempt 5 (alindkhare, Oct 10, 2019)
15d2400 Resnet50 post query with pipeline RAY serve (alindkhare, Oct 10, 2019)
d375ed7 Ordered Inputs to models + Grouped Dag execution (alindkhare, Oct 11, 2019)
6f5be43 Changing examples for showing pipeline anonymity to models (alindkhare, Oct 11, 2019)
e47cc1b bug fix (alindkhare, Oct 11, 2019)
c126241 num gpu param addition (alindkhare, Oct 11, 2019)
b2f05bb attempt 1 fix (alindkhare, Oct 11, 2019)
2615b13 attempt 1 fix (alindkhare, Oct 11, 2019)
34c76af attempt 1 fix (alindkhare, Oct 11, 2019)
9f96d01 num returns fix (alindkhare, Oct 11, 2019)
41320e9 Disabling pipeline awareness to models + Ordered Inputs to models (alindkhare, Oct 11, 2019)
2ff6a1c Resnet50 POST Query (alindkhare, Oct 11, 2019)
38e4462 Resnet50 POST Query (alindkhare, Oct 11, 2019)
0a8bac7 Addition of edges order preserved example (alindkhare, Oct 11, 2019)
3849453 Handle + Add node + expose dependency (alindkhare, Oct 12, 2019)
ca12757 Bug fix (alindkhare, Oct 12, 2019)
0d2805e More example added (alindkhare, Oct 12, 2019)
9ad5125 Soft Real Time ML (alindkhare, Oct 13, 2019)
6 changes: 6 additions & 0 deletions python/ray/experimental/internal_kv.py
@@ -22,6 +22,12 @@ def _internal_kv_get(key):
     return worker.redis_client.hget(key, "value")


+def _internal_kv_exists(key):
+    worker = ray.worker.get_global_worker()
+    if worker.mode == ray.worker.LOCAL_MODE:
+        return key in _local
+    return worker.redis_client.hexists(key,"value")
+
 def _internal_kv_put(key, value, overwrite=False):
     """Globally associates a value with a given binary key.

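The hunk above adds `_internal_kv_exists`, which checks a process-local store in local mode and otherwise asks Redis whether the hash stored at `key` has a `value` field. A minimal sketch of that dispatch follows. It assumes a hypothetical `FakeRedis` stand-in and a hypothetical `kv_exists` helper (neither is part of Ray or redis-py) so that it runs without a cluster:

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for the Redis hash commands used here."""

    def __init__(self):
        self._hashes = {}

    def hset(self, name, field, value):
        self._hashes.setdefault(name, {})[field] = value

    def hexists(self, name, field):
        return field in self._hashes.get(name, {})


_local = {}  # process-local store consulted when running in local mode


def kv_exists(key, local_mode, redis_client):
    """Return True if `key` has a value, following the branch structure in the diff."""
    if local_mode:
        return key in _local
    return redis_client.hexists(key, "value")


client = FakeRedis()
client.hset(b"pipeline1", "value", b"payload")
_local[b"only-local"] = b"payload"

found_remote = kv_exists(b"pipeline1", local_mode=False, redis_client=client)
missing_remote = kv_exists(b"only-local", local_mode=False, redis_client=client)
found_local = kv_exists(b"only-local", local_mode=True, redis_client=client)
```

The two stores are independent: a key written only to the local dictionary is not visible through the Redis branch, and vice versa.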
10 changes: 5 additions & 5 deletions python/ray/experimental/serve/__init__.py
@@ -2,11 +2,11 @@
 if sys.version_info < (3, 0):
     raise ImportError("serve is Python 3 only.")

-from ray.experimental.serve.api import (init, create_backend, create_endpoint,
-                                        link, split, rollback, get_handle,
-                                        global_state)  # noqa: E402
+from ray.experimental.serve.api import (init,provision_pipeline, create_backend, create_endpoint_pipeline,create_no_http_service,
+                                        add_service_dependencies,link_service, get_handle,
+                                        global_state,get_service_dependencies,add_service)  # noqa: E402

 __all__ = [
-    "init", "create_backend", "create_endpoint", "link", "split", "rollback",
-    "get_handle", "global_state"
+    "init","provision_pipeline", "create_backend", "create_endpoint_pipeline","create_no_http_service","add_service_dependencies", "link_service",
+    "get_handle" ,"global_state","get_service_dependencies","add_service"
 ]
162 changes: 102 additions & 60 deletions python/ray/experimental/serve/api.py
@@ -37,7 +37,7 @@ def init(blocking=False, object_store_memory=int(1e8)):
global_state.wait_until_http_ready()


def create_endpoint(endpoint_name, route_expression, blocking=True):
def create_endpoint_pipeline(pipeline_name, route_expression, blocking=True):
"""Create a service endpoint given route_expression.

Args:
@@ -48,14 +48,18 @@ def create_endpoint(endpoint_name, route_expression, blocking=True):
blocking (bool): If true, the function will wait for service to be
registered before returning
"""
assert pipeline_name in global_state.provisioned_services

future = global_state.kv_store_actor_handle.register_service.remote(
route_expression, endpoint_name)
route_expression, pipeline_name)
if blocking:
ray.get(future)
global_state.registered_endpoints.add(endpoint_name)
global_state.registered_endpoints.add(pipeline_name)

def create_no_http_service(service_name):
global_state.registered_services.add(service_name)

def create_backend(func_or_class, backend_tag, *actor_init_args):
def create_backend(func_or_class, backend_tag, num_gpu,*actor_init_args):
"""Create a backend using func_or_class and assign backend_tag.

Args:
@@ -71,7 +75,7 @@ def create_backend(func_or_class, backend_tag, *actor_init_args):
elif inspect.isclass(func_or_class):
# Python inheritance order is right-to-left. We put RayServeMixin
# on the left to make sure its methods are not overriden.
@ray.remote
@ray.remote(num_gpus=num_gpu)
class CustomActor(RayServeMixin, func_or_class):
pass

@@ -89,8 +93,45 @@ class CustomActor(RayServeMixin, func_or_class):

global_state.registered_backends.add(backend_tag)

# def add_service_to_pipeline(pipeline_name,service_name,blocking=True):
# assert service_name in global_state.registered_services
# # assert pipeline_name in global_state.registered_endpoints

# future = global_state.kv_store_actor_handle_pipeline.add_node.remote(pipeline_name,service_name)
# if blocking:
# ray.get(future)

def add_service_dependencies(pipeline_name,service_name_1,service_name_2,blocking=True):
assert service_name_1 in global_state.registered_services
assert service_name_2 in global_state.registered_services
assert pipeline_name not in global_state.provisioned_services

future = global_state.kv_store_actor_handle_pipeline.add_edge.remote(pipeline_name,service_name_1,service_name_2)
if blocking:
ray.get(future)

def provision_pipeline(pipeline_name,blocking=True) :
assert pipeline_name not in global_state.provisioned_services
future = global_state.kv_store_actor_handle_pipeline.provision.remote(pipeline_name)
if blocking :
ray.get(future)

global_state.provisioned_services.add(pipeline_name)


def link(endpoint_name, backend_tag):
def get_service_dependencies(pipeline_name):
assert pipeline_name in global_state.provisioned_services
future = global_state.kv_store_actor_handle_pipeline.get_dependency.remote(pipeline_name)
return ray.get(future)
def add_service(pipeline_name,service_name,blocking = True):
assert pipeline_name not in global_state.provisioned_services
assert service_name in global_state.registered_services
future = global_state.kv_store_actor_handle_pipeline.add_node.remote(pipeline_name,service_name)
if blocking :
ray.get(future)


def link_service(service_name, backend_tag):
"""Associate a service endpoint with backend tag.

Example:
@@ -102,75 +143,76 @@ def link(endpoint_name, backend_tag):

>>> serve.split("service-name", {"backend:v1": 1.0})
"""
assert endpoint_name in global_state.registered_endpoints
assert service_name in global_state.registered_services
assert backend_tag in global_state.registered_backends

global_state.router_actor_handle.link.remote(endpoint_name, backend_tag)
global_state.policy_action_history[endpoint_name].append({backend_tag: 1})
global_state.router_actor_handle.link.remote(service_name, backend_tag)
global_state.policy_action_history[service_name].append({backend_tag: 1})


def split(endpoint_name, traffic_policy_dictionary):
"""Associate a service endpoint with traffic policy.
# def split(endpoint_name, traffic_policy_dictionary):
# """Associate a service endpoint with traffic policy.

Example:
# Example:

>>> serve.split("service-name", {
"backend:v1": 0.5,
"backend:v2": 0.5
})
# >>> serve.split("service-name", {
# "backend:v1": 0.5,
# "backend:v2": 0.5
# })

Args:
endpoint_name (str): A registered service endpoint.
traffic_policy_dictionary (dict): a dictionary maps backend names
to their traffic weights. The weights must sum to 1.
"""
# Args:
# endpoint_name (str): A registered service endpoint.
# traffic_policy_dictionary (dict): a dictionary maps backend names
# to their traffic weights. The weights must sum to 1.
# """

# Perform dictionary checks
assert endpoint_name in global_state.registered_endpoints
# # Perform dictionary checks
# assert endpoint_name in global_state.registered_endpoints

assert isinstance(traffic_policy_dictionary,
dict), "Traffic policy must be dictionary"
prob = 0
for backend, weight in traffic_policy_dictionary.items():
prob += weight
assert (backend in global_state.registered_backends
), "backend {} is not registered".format(backend)
assert np.isclose(
prob, 1,
atol=0.02), "weights must sum to 1, currently it sums to {}".format(
prob)
# assert isinstance(traffic_policy_dictionary,
# dict), "Traffic policy must be dictionary"
# prob = 0
# for backend, weight in traffic_policy_dictionary.items():
# prob += weight
# assert (backend in global_state.registered_backends
# ), "backend {} is not registered".format(backend)
# assert np.isclose(
# prob, 1,
# atol=0.02), "weights must sum to 1, currently it sums to {}".format(
# prob)

global_state.router_actor_handle.set_traffic.remote(
endpoint_name, traffic_policy_dictionary)
global_state.policy_action_history[endpoint_name].append(
traffic_policy_dictionary)
# global_state.router_actor_handle.set_traffic.remote(
# endpoint_name, traffic_policy_dictionary)
# global_state.policy_action_history[endpoint_name].append(
# traffic_policy_dictionary)


def rollback(endpoint_name):
"""Rollback a traffic policy decision.
# def rollback(endpoint_name):
# """Rollback a traffic policy decision.

Args:
endpoint_name (str): A registered service endpoint.
"""
assert endpoint_name in global_state.registered_endpoints
action_queues = global_state.policy_action_history[endpoint_name]
cur_policy, prev_policy = action_queues[-1], action_queues[-2]
# Args:
# endpoint_name (str): A registered service endpoint.
# """
# assert endpoint_name in global_state.registered_endpoints
# action_queues = global_state.policy_action_history[endpoint_name]
# cur_policy, prev_policy = action_queues[-1], action_queues[-2]

logger.warning("""
Current traffic policy is:
{cur_policy}
# logger.warning("""
# Current traffic policy is:
# {cur_policy}

Will rollback to:
{prev_policy}
""".format(
cur_policy=pformat_color_json(cur_policy),
prev_policy=pformat_color_json(prev_policy)))
# Will rollback to:
# {prev_policy}
# """.format(
# cur_policy=pformat_color_json(cur_policy),
# prev_policy=pformat_color_json(prev_policy)))

action_queues.pop()
global_state.router_actor_handle.set_traffic.remote(
endpoint_name, prev_policy)
# action_queues.pop()
# global_state.router_actor_handle.set_traffic.remote(
# endpoint_name, prev_policy)


def get_handle(endpoint_name):
def get_handle(pipeline_name):
"""Retrieve RayServeHandle for service endpoint to invoke it from Python.

Args:
@@ -179,9 +221,9 @@ def get_handle(endpoint_name):
Returns:
RayServeHandle
"""
assert endpoint_name in global_state.registered_endpoints
assert pipeline_name in global_state.provisioned_services

# Delay import due to it's dependency on global_state
from ray.experimental.serve.handle import RayServeHandle

return RayServeHandle(global_state.router_actor_handle, endpoint_name)
return RayServeHandle(global_state.kv_store_actor_handle_pipeline,global_state.router_actor_handle, pipeline_name)
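The new API builds a pipeline in stages: services are added as nodes (`add_service`), ordered with edges (`add_service_dependencies`), and then frozen by `provision_pipeline`, after which the asserts in those functions reject further changes. The actor behind `kv_store_actor_handle_pipeline` is not shown in this diff, so the following is only a sketch of one way such a registry could behave. `PipelineRegistry` and its use of a topological sort at provision time are assumptions for illustration, not the PR's implementation:

```python
from collections import defaultdict, deque


class PipelineRegistry:
    """Hypothetical in-process registry of pipeline DAGs (illustration only)."""

    def __init__(self):
        self._nodes = defaultdict(set)   # pipeline name -> service names
        self._edges = defaultdict(set)   # pipeline name -> (upstream, downstream)
        self._provisioned = {}           # pipeline name -> execution order

    def add_node(self, pipeline, service):
        assert pipeline not in self._provisioned, "pipeline is already provisioned"
        self._nodes[pipeline].add(service)

    def add_edge(self, pipeline, upstream, downstream):
        assert pipeline not in self._provisioned, "pipeline is already provisioned"
        self._nodes[pipeline].update((upstream, downstream))
        self._edges[pipeline].add((upstream, downstream))

    def provision(self, pipeline):
        """Freeze the pipeline and compute an order that respects every edge."""
        assert pipeline not in self._provisioned, "pipeline is already provisioned"
        indegree = {node: 0 for node in self._nodes[pipeline]}
        children = defaultdict(list)
        for upstream, downstream in self._edges[pipeline]:
            children[upstream].append(downstream)
            indegree[downstream] += 1
        # Kahn's algorithm; sorting keeps the result deterministic.
        ready = deque(sorted(n for n, d in indegree.items() if d == 0))
        order = []
        while ready:
            node = ready.popleft()
            order.append(node)
            for child in sorted(children[node]):
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        if len(order) != len(indegree):
            raise ValueError("pipeline contains a cycle")
        self._provisioned[pipeline] = order
        return order


registry = PipelineRegistry()
registry.add_node("pipeline1", "serve1")
registry.add_edge("pipeline1", "serve1", "serve2")
registry.add_edge("pipeline1", "serve1", "serve3")
registry.add_edge("pipeline1", "serve2", "serve4")
registry.add_edge("pipeline1", "serve3", "serve4")
order = registry.provision("pipeline1")
```

Rejecting edits after provisioning matches the `assert pipeline_name not in global_state.provisioned_services` checks in `add_service` and `add_service_dependencies`; computing the order once at provision time is a design choice of this sketch.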
2 changes: 1 addition & 1 deletion python/ray/experimental/serve/constants.py
@@ -1,2 +1,2 @@
 #: The interval which http server refreshes its routing table
-HTTP_ROUTER_CHECKER_INTERVAL_S = 2
+HTTP_ROUTER_CHECKER_INTERVAL_S = 1
@@ -1,7 +1,3 @@
"""
Example service that prints out http context.
"""

import time

import requests
@@ -10,19 +6,25 @@
from ray.experimental.serve.utils import pformat_color_json


def echo(context):
def echo1(context):
context["query_string"] += 'FROM MODEL1 -> '
return context
def echo2(context):
context += 'FROM MODEL2 -> '
return context


serve.init(blocking=True)

serve.create_endpoint("my_endpoint", "/echo", blocking=True)
serve.create_endpoint_pipeline("pipeline1", "/echo", blocking=True)

serve.create_backend(echo, "echo:v1")
serve.link("my_endpoint", "echo:v1")
serve.create_no_http_service("serve1")
serve.link_service("serve1", "echo:v1")
serve.add_service_to_pipeline("pipeline1","serve1")

while True:
resp = requests.get("http://127.0.0.1:8000/echo").json()
print(pformat_color_json(resp))

print("...Sleeping for 2 seconds...")
time.sleep(2)
time.sleep(2)
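The example defines two stages, `echo1` and `echo2`, each appending a marker to the query. How the router hands one stage's result to the next is not visible in this diff, so the sketch below only assumes that each stage receives its predecessor's return value. `run_chain` is a hypothetical helper, and `echo1` here returns the query string rather than the whole context so that the string append in `echo2` applies to it:

```python
from functools import reduce


def echo1(context):
    # First stage: read the query string from the HTTP-style context dict.
    return context["query_string"] + "FROM MODEL1 -> "


def echo2(text):
    # Later stage: receives the upstream stage's string output.
    return text + "FROM MODEL2 -> "


def run_chain(stages, request):
    """Feed `request` to the first stage and each result to the next stage."""
    return reduce(lambda value, stage: stage(value), stages, request)


result = run_chain([echo1, echo2], {"query_string": "hello "})
```

Under these assumptions `result` carries both markers in stage order, which is the behaviour the "FROM MODEL1 -> " and "FROM MODEL2 -> " strings in the example suggest.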
41 changes: 0 additions & 41 deletions python/ray/experimental/serve/examples/echo_actor.py

This file was deleted.
