David/noasync arbitrary objectref #14

Open: wants to merge 2 commits into base branch arbitrary-objectref.
71 changes: 71 additions & 0 deletions python/ray/experimental/noasync_serve/client.go
@@ -0,0 +1,71 @@
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
	"time"
)

// unixMilli converts a time.Time to milliseconds since the Unix epoch.
func unixMilli(t time.Time) float64 {
	return float64(t.Round(time.Millisecond).UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)))
}

// MakeRequest posts the payload to url, measures the elapsed time, and
// reports the result on ch. deadline_ms is added to the current time to form
// an absolute deadline (epoch milliseconds) for the request.
func MakeRequest(url string, values []byte, ch chan<- string,
	deadline_ms float64) {
	// values := map[string]byte{"data": username}

	start := time.Now()
	current_time := unixMilli(start)
	current_time += deadline_ms
	current_time_str := strconv.FormatFloat(current_time, 'E', -1, 64)
	// values["absolute_slo_ms"] = current_time_str
	// jsonValue, _ := json.Marshal(values)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(values))
	if err != nil {
		ch <- fmt.Sprintf("request to %s failed: %v", url, err)
		return
	}
	defer resp.Body.Close()
	secs := time.Since(start).Seconds()
	body, _ := ioutil.ReadAll(resp.Body)
	ch <- fmt.Sprintf("%.2f elapsed with response length: %s %s deadline_ms %s", secs, body, url, current_time_str)
}
func main() {
ch := make(chan string)
	deadline_ms, err := strconv.ParseFloat(os.Args[1], 64)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
img_path := os.Args[2]
arrival_curve := os.Args[3:]

imgFile, err := os.Open(img_path)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer imgFile.Close()
fInfo, _ := imgFile.Stat()
var size int64 = fInfo.Size()
buf := make([]byte, size)
	fReader := bufio.NewReader(imgFile)
	// Read the whole image into buf; a bare Read may return fewer bytes.
	if _, err := io.ReadFull(fReader, buf); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
// imgBase64Str := base64.StdEncoding.EncodeToString(buf)

// values := map[string]string{"image": imgBase64Str}
time.Sleep(10 * time.Millisecond)
start := time.Now()
for i := 0; i < len(arrival_curve); i++ {
// time.Sleep(12195 * time.Microsecond)
// time.Sleep(10 * time.Millisecond)
		time_ms, err_time := strconv.ParseFloat(arrival_curve[i], 64)
		if err_time != nil {
			fmt.Println(err_time)
			os.Exit(1)
		}
		// Sleep for the inter-arrival gap; keep fractional milliseconds.
		time.Sleep(time.Duration(time_ms * float64(time.Millisecond)))
// values := map[string]string{"data": imgBase64Str}
go MakeRequest("http://127.0.0.1:8000/resnet50", buf, ch, deadline_ms)
}
for i := 0; i < len(arrival_curve); i++ {
<-ch
}
fmt.Printf("%.2fs elapsed\n", time.Since(start).Seconds())
}
141 changes: 141 additions & 0 deletions python/ray/experimental/noasync_serve/driver.py
@@ -0,0 +1,141 @@
import click
import time
import torchvision.transforms as transforms

import ray
from utils import get_data
from queues import CentralizedQueuesActor
from worker import ImagePreprocWorker

@click.command()
@click.option("--num-replicas", type=int, default=1)
@click.option("--open-mean-qps", type=int, default=100)
def driver(num_replicas, open_mean_qps):
print(f"[config] # Replicas: {num_replicas}")
ray.init(
_system_config={
"enable_timeline": False,
"record_ref_creation_sites": False,
}
)

NUM_REPLICAS = num_replicas
NUM_REQUESTS = 100
worker_name = "ImagePreproc"
router_handle = CentralizedQueuesActor.remote()
ray.get(router_handle.link.remote(worker_name, worker_name))
#ray.get(router_handle.register_self_handle.remote(router_handle))

# start workers with given model and transform
min_img_size = 224
transform = transforms.Compose(
[
transforms.Resize(min_img_size),
transforms.ToTensor(),
transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225],
),
]
)
model_name = "resnet50"
is_cuda = True

for _ in range(NUM_REPLICAS):
worker_handle = ImagePreprocWorker.options(num_gpus=1).remote(
worker_name, router_handle, transform, model_name, is_cuda
)
ray.get(worker_handle.set_my_handle.remote(worker_handle))
worker_handle.start.remote()

    # image to pass through pipeline
    with open("elephant.jpg", "rb") as f:
        img = f.read()

mean_qps = 0.0
AVG_CALC = 1
mean_closed_loop = 0.0
CLOSED_LOOP_LATENCY_ITER = 500
lstart = time.perf_counter()
for _ in range(AVG_CALC):
# throughput measurement
WARMUP, NUM_REQUESTS = 200, 2000
future = [
router_handle.enqueue_request.remote(worker_name, img)
for _ in range(WARMUP)
]
        # enqueue_request returns an ObjectRef that itself resolves to the
        # result ObjectRef, so wait on the outer refs first, then the inner ones.
        ray.wait(future, num_returns=WARMUP)
        ray.wait([ray.get(oid) for oid in future], num_returns=WARMUP)
del future

futures = [
router_handle.enqueue_request.remote(worker_name, img)
for _ in range(NUM_REQUESTS)
]
start_time = time.perf_counter()
get_data(futures)
end_time = time.perf_counter()
duration = end_time - start_time
qps = NUM_REQUESTS / duration
mean_qps += qps

sum_closed_loop = 0.0
for _ in range(CLOSED_LOOP_LATENCY_ITER):
start = time.perf_counter()
ray.get(ray.get(router_handle.enqueue_request.remote(worker_name, img)))
end = time.perf_counter()
sum_closed_loop += end - start
mean_closed_loop += sum_closed_loop / CLOSED_LOOP_LATENCY_ITER
del futures

lend = time.perf_counter()
final_qps = mean_qps / AVG_CALC
mean_closed_loop = mean_closed_loop / AVG_CALC
print(f"Image Preprocessing Pipeline")
print(
f"Throughput QPS: {final_qps} ImageClassification Replicas: {NUM_REPLICAS} "
f"Mean Closed Loop Latency: {mean_closed_loop} "
)
print(ray.get(ray.get(router_handle.enqueue_request.remote(worker_name, img))))
print("took {}s".format(lend - lstart))

'''# open loop benchmarking
import subprocess
from server import HTTPProxyActor
import requests
from utils import get_latency_stats, generate_fixed_arrival_process

# Open Loop
cv = 0
arrival_curve = generate_fixed_arrival_process(
mean_qps=open_mean_qps, cv=cv, num_requests=2000,
).tolist()

http_actor = HTTPProxyActor.remote(host="127.0.0.1", port=8000)
ray.get(
http_actor.register_route.remote("/resnet50", router_handle, worker_name)
)
ray.get(http_actor.init_latency.remote())
ls_output = subprocess.Popen(
[
"go",
"run",
"client.go",
"60.0",
"elephant.jpg",
*[str(val) for val in arrival_curve],
]
)
ls_output.communicate()
latency_list = ray.get(http_actor.get_latency.remote())
ingest_mu, latency_ms, p95_ms, p99_ms = get_latency_stats(
collected_latency=latency_list
)
print(f"p95(ms): {p95_ms} p99(ms): {p99_ms}")

with open('./latency/ol-latency-{}-1-{}-{}'.format(open_mean_qps, num_replicas, cv), 'w') as f:
for item in latency_ms:
f.write("{}\n".format(str(item)))'''

ray.shutdown()

if __name__ == "__main__":
driver()
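
The open-loop benchmark in the commented block above drives client.go with an arrival curve produced by generate_fixed_arrival_process from utils, which is not part of this diff. Below is a minimal sketch of what such a generator might look like, assuming it returns per-request inter-arrival gaps in milliseconds for a given mean QPS and coefficient of variation (cv=0 for fixed-rate arrivals, cv=1 for a Poisson process); this is purely illustrative, not the actual utils implementation.

import numpy as np

def generate_fixed_arrival_process(mean_qps, cv, num_requests):
    # Mean gap between consecutive requests, in milliseconds.
    mean_gap_ms = 1000.0 / mean_qps
    if cv == 0:
        # Deterministic, fixed-rate arrivals.
        gaps = np.full(num_requests, mean_gap_ms)
    else:
        # Gamma-distributed gaps with the requested coefficient of variation;
        # cv == 1 reduces to exponential gaps, i.e. a Poisson process.
        shape = 1.0 / (cv ** 2)
        scale = mean_gap_ms / shape
        gaps = np.random.gamma(shape, scale, size=num_requests)
    return gaps

The driver then passes these gaps as command-line arguments to client.go, which sleeps for each gap before firing the next request.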
164 changes: 164 additions & 0 deletions python/ray/experimental/noasync_serve/queues.py
@@ -0,0 +1,164 @@
import click
import time
from blist import sortedlist
from collections import defaultdict, deque

import ray
from ray import ObjectRef

class Query:
    def __init__(self, request_body, slo, result_object_id=None):
self.request_body = request_body
self.slo = slo
if result_object_id is None:
self.result_object_id = ObjectRef.owned_plasma_objectref()
else:
self.result_object_id = result_object_id

    def __lt__(self, other):
        # Inverted on purpose: sortedlist.pop() returns the largest element,
        # which under this ordering is the query with the smallest (tightest) SLO.
        return self.slo > other.slo


class WorkIntent:
def __init__(self, work_object_id=None):
if work_object_id is None:
self.work_object_id = ObjectRef.owned_plasma_objectref()
else:
self.work_object_id = work_object_id


class CentralizedQueues:
"""A router that routes request to available workers.

    The router accepts each request via the `enqueue_request` method and
    enqueues it. It also accepts requests to work (called work_intention in
    the code) from workers via the `dequeue_request` method. The traffic
    policy is used to match requests with their corresponding workers.

Behavior:
    >>> # pseudo-code
>>> queue = CentralizedQueues()
>>> queue.enqueue_request('service-name', data)
# nothing happens, request is queued.
    # returns the result ObjectID, which will contain the final result
>>> queue.dequeue_request('backend-1')
    # nothing happens, the work intention is queued.
    # returns a work ObjectID, which will contain the future request payload
>>> queue.link('service-name', 'backend-1')
    # here the enqueue_requester is matched with a worker, the request
    # data is put into the work ObjectID, and the worker processes the request
    # and stores the result into the result ObjectID

Traffic policy splits the traffic among different workers
probabilistically:

1. When all backends are ready to receive traffic, we will randomly
choose a backend based on the weights assigned by the traffic policy
dictionary.

2. When more than 1 but not all backends are ready, we will normalize the
weights of the ready backends to 1 and choose a backend via sampling.

3. When there is only 1 backend ready, we will only use that backend.
"""

def __init__(self):
# service_name -> request queue
self.queues = defaultdict(sortedlist)
# service_name -> max. batch size
self.service_max_batch_size = {}
# service_name -> traffic_policy
self.traffic = defaultdict(dict)
# backend_name -> worker queue
self.worker_handles = defaultdict(deque)

def enqueue_request(self, service, request_data, slo=float(1e10)):
query = Query(request_data, slo)
self.queues[service].add(query)
self.flush()
return query.result_object_id

def set_max_batch(self, service, max_batch):
self.service_max_batch_size[service] = max_batch

def dequeue_request(self, backend, worker_handle):
#intention = WorkIntent()
#TODO: wrap all OIDs passed in lists
self.worker_handles[backend].append(worker_handle)
self.flush()
#return intention.work_object_id

def link(self, service, backend):
#logger.debug("Link %s with %s", service, backend)
self.traffic[service][backend] = 1.0
self.flush()

def set_traffic(self, service, traffic_dict):
#logger.debug("Setting traffic for service %s to %s", service, traffic_dict)
self.traffic[service] = traffic_dict
self.flush()
Owner comment: This flush should happen asynchronously, which means you need to do something like self._router_actor_handle.flush.remote().

Owner comment: You can register an actor's handle inside its own actor.


def flush(self):
"""In the default case, flush calls ._flush.

When this class is a Ray actor, .flush can be scheduled as a remote
method invocation.
"""
self._flush_single()

def _get_available_backends(self, service):
backends_in_policy = set(self.traffic[service].keys())
available_workers = {
backend
for backend, queues in self.worker_handles.items() if len(queues) > 0
}
return list(backends_in_policy.intersection(available_workers))

def _flush_batch(self):
for service, queue in self.queues.items():
ready_backends = self._get_available_backends(service)
# logger.info("Service %s having queue lengths %s ready backends %s", service,len(queue),len(ready_backends))
while len(queue) and len(ready_backends):
#batch_size = self.service_max_batch_size[service]
batch_size = 5
for backend in ready_backends:
if len(queue) == 0:
break
worker_handle = self.worker_handles[backend].popleft()
                    pop_len = min(batch_size, len(queue))
                    request = [queue.pop() for _ in range(pop_len)]
#print("putting request into work OID")
worker_handle.__call__.remote(request)
#print("called worker execute")

ready_backends = self._get_available_backends(service)

def _flush_single(self):
for service, queue in self.queues.items():
ready_backends = self._get_available_backends(service)
# logger.info("Service %s having queue lengths %s ready backends %s", service,len(queue),len(ready_backends))
while len(queue) and len(ready_backends):
for backend in ready_backends:
if len(queue) == 0:
break
worker_handle = self.worker_handles[backend].popleft()
request = [queue.pop()]
#print("put request in work OID")
worker_handle.__call__.remote(request)
#print("called worker execute")

ready_backends = self._get_available_backends(service)


@ray.remote
class CentralizedQueuesActor(CentralizedQueues):
self_handle = None

def register_self_handle(self, handle_to_this_actor):
self.self_handle = handle_to_this_actor
Owner comment: This is what I am talking about.


def flush(self):
if self.self_handle:
self.self_handle._flush_single.remote()
else:
self._flush_single()
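
The CentralizedQueues docstring describes splitting traffic across ready backends by normalized weight, but _flush_single and _flush_batch currently hand work to every ready backend in turn rather than sampling. Below is a minimal sketch of the weighted selection the docstring describes; this is a hypothetical helper, not part of this diff.

import random

def _choose_backend(self, service):
    # Normalize the traffic-policy weights of the currently ready backends
    # and sample one backend accordingly (docstring cases 1 and 2).
    ready_backends = self._get_available_backends(service)
    if not ready_backends:
        return None
    weights = [self.traffic[service][backend] for backend in ready_backends]
    total = sum(weights)
    return random.choices(
        ready_backends, weights=[w / total for w in weights], k=1
    )[0]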