forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
David/noasync arbitrary objectref #14
Open
davidmkwon
wants to merge
2
commits into
alindkhare:arbitrary-objectref
Choose a base branch
from
davidmkwon:david/noasync-arbitrary-objectref
base: arbitrary-objectref
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package main | ||
|
||
import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
	"time"
)
|
||
func unixMilli(t time.Time) float64 { | ||
return float64(t.Round(time.Millisecond).UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))) | ||
} | ||
|
||
func MakeRequest(url string, values []byte, ch chan<- string, | ||
deadline_ms float64) { | ||
// values := map[string]byte{"data": username} | ||
|
||
start := time.Now() | ||
current_time := unixMilli(start) | ||
current_time += deadline_ms | ||
current_time_str := strconv.FormatFloat(current_time, 'E', -1, 64) | ||
// values["absolute_slo_ms"] = current_time_str | ||
// jsonValue, _ := json.Marshal(values) | ||
resp, _ := http.Post(url, "application/json", bytes.NewBuffer(values)) | ||
secs := time.Since(start).Seconds() | ||
body, _ := ioutil.ReadAll(resp.Body) | ||
ch <- fmt.Sprintf("%.2f elapsed with response length: %s %s deadline_ms %s", secs, body, url, current_time_str) | ||
} | ||
func main() { | ||
ch := make(chan string) | ||
deadline_ms, err := strconv.ParseFloat(os.Args[1], 64) | ||
img_path := os.Args[2] | ||
arrival_curve := os.Args[3:] | ||
|
||
imgFile, err := os.Open(img_path) | ||
if err != nil { | ||
fmt.Println(err) | ||
os.Exit(1) | ||
} | ||
defer imgFile.Close() | ||
fInfo, _ := imgFile.Stat() | ||
var size int64 = fInfo.Size() | ||
buf := make([]byte, size) | ||
fReader := bufio.NewReader(imgFile) | ||
fReader.Read(buf) | ||
// imgBase64Str := base64.StdEncoding.EncodeToString(buf) | ||
|
||
// values := map[string]string{"image": imgBase64Str} | ||
time.Sleep(10 * time.Millisecond) | ||
start := time.Now() | ||
for i := 0; i < len(arrival_curve); i++ { | ||
// time.Sleep(12195 * time.Microsecond) | ||
// time.Sleep(10 * time.Millisecond) | ||
time_ms, err_time := strconv.ParseFloat(arrival_curve[i], 64) | ||
if err_time != nil { | ||
fmt.Println(err) | ||
os.Exit(1) | ||
} | ||
time.Sleep(time.Duration(time_ms) * time.Millisecond) | ||
// values := map[string]string{"data": imgBase64Str} | ||
go MakeRequest("http://127.0.0.1:8000/resnet50", buf, ch, deadline_ms) | ||
} | ||
for i := 0; i < len(arrival_curve); i++ { | ||
<-ch | ||
} | ||
fmt.Printf("%.2fs elapsed\n", time.Since(start).Seconds()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
import click | ||
import time | ||
import torchvision.transforms as transforms | ||
|
||
import ray | ||
from utils import get_data | ||
from queues import CentralizedQueuesActor | ||
from worker import ImagePreprocWorker | ||
|
||
@click.command()
@click.option("--num-replicas", type=int, default=1)
@click.option("--open-mean-qps", type=int, default=100)
def driver(num_replicas, open_mean_qps):
    """Benchmark the image-preprocessing pipeline on Ray.

    Starts a centralized router actor plus `num_replicas` GPU workers, then
    measures (a) steady-state throughput over 2000 requests after a 200
    request warm-up and (b) mean closed-loop (one-at-a-time) latency over
    500 requests. `open_mean_qps` is only used by the commented-out
    open-loop benchmark at the bottom.
    """
    print(f"[config] # Replicas: {num_replicas}")
    ray.init(
        _system_config={
            "enable_timeline": False,
            "record_ref_creation_sites": False,
        }
    )

    NUM_REPLICAS = num_replicas
    worker_name = "ImagePreproc"
    router_handle = CentralizedQueuesActor.remote()
    ray.get(router_handle.link.remote(worker_name, worker_name))
    # ray.get(router_handle.register_self_handle.remote(router_handle))

    # Start workers with the given model and transform.
    min_img_size = 224
    transform = transforms.Compose(
        [
            transforms.Resize(min_img_size),
            transforms.ToTensor(),
            transforms.Normalize(
                # NOTE(review): ImageNet std is conventionally
                # [0.229, 0.224, 0.225]; the trailing 0.224 here looks like
                # a typo — confirm before changing (it affects accuracy).
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.224],
            ),
        ]
    )
    model_name = "resnet50"
    is_cuda = True

    for _ in range(NUM_REPLICAS):
        worker_handle = ImagePreprocWorker.options(num_gpus=1).remote(
            worker_name, router_handle, transform, model_name, is_cuda
        )
        ray.get(worker_handle.set_my_handle.remote(worker_handle))
        worker_handle.start.remote()

    # Image to pass through the pipeline. BUG FIX: the original used a bare
    # open(...).read() and leaked the file handle.
    with open("elephant.jpg", "rb") as img_file:
        img = img_file.read()

    mean_qps = 0.0
    AVG_CALC = 1
    mean_closed_loop = 0.0
    CLOSED_LOOP_LATENCY_ITER = 500
    lstart = time.perf_counter()
    for _ in range(AVG_CALC):
        # Throughput measurement: warm up, then time NUM_REQUESTS requests.
        # (A dead `NUM_REQUESTS = 100` that was immediately shadowed by this
        # assignment has been removed.)
        WARMUP, NUM_REQUESTS = 200, 2000
        future = [
            router_handle.enqueue_request.remote(worker_name, img)
            for _ in range(WARMUP)
        ]
        # Wait for the enqueue calls, then for the inner result refs they
        # return (two levels of indirection through the router).
        ray.wait(future, num_returns=WARMUP)
        ray.wait([ray.get(oid) for oid in future], num_returns=WARMUP)
        del future

        futures = [
            router_handle.enqueue_request.remote(worker_name, img)
            for _ in range(NUM_REQUESTS)
        ]
        start_time = time.perf_counter()
        get_data(futures)
        end_time = time.perf_counter()
        duration = end_time - start_time
        qps = NUM_REQUESTS / duration
        mean_qps += qps

        # Closed-loop latency: one request at a time, end to end.
        sum_closed_loop = 0.0
        for _ in range(CLOSED_LOOP_LATENCY_ITER):
            start = time.perf_counter()
            ray.get(ray.get(router_handle.enqueue_request.remote(worker_name, img)))
            end = time.perf_counter()
            sum_closed_loop += end - start
        mean_closed_loop += sum_closed_loop / CLOSED_LOOP_LATENCY_ITER
        del futures

    lend = time.perf_counter()
    final_qps = mean_qps / AVG_CALC
    mean_closed_loop = mean_closed_loop / AVG_CALC
    # BUG FIX: was an f-string with no placeholders.
    print("Image Preprocessing Pipeline")
    print(
        f"Throughput QPS: {final_qps} ImageClassification Replicas: {NUM_REPLICAS} "
        f"Mean Closed Loop Latency: {mean_closed_loop} "
    )
    print(ray.get(ray.get(router_handle.enqueue_request.remote(worker_name, img))))
    print("took {}s".format(lend - lstart))

    '''# open loop benchmarking
    import subprocess
    from server import HTTPProxyActor
    import requests
    from utils import get_latency_stats, generate_fixed_arrival_process

    # Open Loop
    cv = 0
    arrival_curve = generate_fixed_arrival_process(
        mean_qps=open_mean_qps, cv=cv, num_requests=2000,
    ).tolist()

    http_actor = HTTPProxyActor.remote(host="127.0.0.1", port=8000)
    ray.get(
        http_actor.register_route.remote("/resnet50", router_handle, worker_name)
    )
    ray.get(http_actor.init_latency.remote())
    ls_output = subprocess.Popen(
        [
            "go",
            "run",
            "client.go",
            "60.0",
            "elephant.jpg",
            *[str(val) for val in arrival_curve],
        ]
    )
    ls_output.communicate()
    latency_list = ray.get(http_actor.get_latency.remote())
    ingest_mu, latency_ms, p95_ms, p99_ms = get_latency_stats(
        collected_latency=latency_list
    )
    print(f"p95(ms): {p95_ms} p99(ms): {p99_ms}")

    with open('./latency/ol-latency-{}-1-{}-{}'.format(open_mean_qps, num_replicas, cv), 'w') as f:
        for item in latency_ms:
            f.write("{}\n".format(str(item)))'''

    ray.shutdown()
|
||
# Entry point: click parses the CLI options and invokes driver().
if __name__ == "__main__":
    driver()
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import click | ||
import time | ||
from blist import sortedlist | ||
from collections import defaultdict, deque | ||
|
||
import ray | ||
from ray import ObjectRef | ||
|
||
class Query:
    """A queued request plus the ObjectRef that will hold its result.

    `slo` is the request's latency budget; comparison is reversed so that
    sorted containers place tight-deadline queries at the tail.
    """

    def __init__(self, request_body, slo, result_object_id=None):
        self.request_body = request_body
        self.slo = slo
        # Reuse the supplied result ref if given; otherwise mint a fresh
        # plasma ObjectRef owned by this process.
        if result_object_id is not None:
            self.result_object_id = result_object_id
        else:
            self.result_object_id = ObjectRef.owned_plasma_objectref()

    def __lt__(self, other):
        # Reversed on purpose: a LARGER slo sorts as "less", so popping from
        # the tail of an ascending sorted list yields the tightest deadline.
        return other.slo < self.slo
|
||
|
||
class WorkIntent:
    """A worker's offer to take on work, identified by a work ObjectRef."""

    def __init__(self, work_object_id=None):
        # Reuse the caller's ref when provided; otherwise mint a new
        # plasma ObjectRef.
        self.work_object_id = (
            work_object_id
            if work_object_id is not None
            else ObjectRef.owned_plasma_objectref()
        )
|
||
|
||
class CentralizedQueues:
    """A router that routes requests to available workers.

    The router accepts each request via the `enqueue_request` method and
    queues it. It also accepts a worker's request for work (called
    work_intention in code) via the `dequeue_request` method. The traffic
    policy is used to match requests with their corresponding workers.

    Behavior:
    >>> # pseudo-code
    >>> queue = CentralizedQueues()
    >>> queue.enqueue_request('service-name', data)
    # Nothing happens; the request is queued.
    # Returns a result ObjectRef, which will contain the final result.
    >>> queue.dequeue_request('backend-1')
    # Nothing happens; the work intention is queued.
    # Returns a work ObjectRef, which will contain the future request payload.
    >>> queue.link('service-name', 'backend-1')
    # Here the enqueuer is matched with a worker: request data is put into
    # the work ObjectRef, and the worker processes the request and stores
    # the result into the result ObjectRef.

    Traffic policy splits the traffic among different workers
    probabilistically:

    1. When all backends are ready to receive traffic, we will randomly
       choose a backend based on the weights assigned by the traffic policy
       dictionary.

    2. When more than 1 but not all backends are ready, we will normalize the
       weights of the ready backends to 1 and choose a backend via sampling.

    3. When there is only 1 backend ready, we will only use that backend.
    """

    def __init__(self):
        # service_name -> sorted request queue (ordered by Query.__lt__, so
        # the tightest-deadline query sits at the tail).
        self.queues = defaultdict(sortedlist)
        # service_name -> max. batch size
        self.service_max_batch_size = {}
        # service_name -> traffic_policy (backend -> weight)
        self.traffic = defaultdict(dict)
        # backend_name -> queue of idle worker handles
        self.worker_handles = defaultdict(deque)

    def enqueue_request(self, service, request_data, slo=float(1e10)):
        """Queue a request for `service`; return its result ObjectRef."""
        query = Query(request_data, slo)
        self.queues[service].add(query)
        self.flush()
        return query.result_object_id

    def set_max_batch(self, service, max_batch):
        """Register the maximum batch size `_flush_batch` may send at once."""
        self.service_max_batch_size[service] = max_batch

    def dequeue_request(self, backend, worker_handle):
        """Register an idle worker handle for `backend` and try to flush."""
        # intention = WorkIntent()
        # TODO: wrap all OIDs passed in lists
        self.worker_handles[backend].append(worker_handle)
        self.flush()
        # return intention.work_object_id

    def link(self, service, backend):
        """Route all of `service`'s traffic to `backend` (weight 1.0)."""
        self.traffic[service][backend] = 1.0
        self.flush()

    def set_traffic(self, service, traffic_dict):
        """Replace the whole traffic policy for `service`."""
        self.traffic[service] = traffic_dict
        self.flush()

    def flush(self):
        """In the default case, flush calls ._flush_single.

        When this class is a Ray actor, .flush can be scheduled as a remote
        method invocation.
        """
        self._flush_single()

    def _get_available_backends(self, service):
        # Backends that are both named in the traffic policy and currently
        # have at least one idle worker handle.
        backends_in_policy = set(self.traffic[service].keys())
        available_workers = {
            backend
            for backend, queues in self.worker_handles.items()
            if len(queues) > 0
        }
        return list(backends_in_policy.intersection(available_workers))

    def _flush_batch(self):
        # Drain queues in batches, handing up to batch_size queries to each
        # idle worker.
        for service, queue in self.queues.items():
            ready_backends = self._get_available_backends(service)
            while len(queue) and len(ready_backends):
                # BUG FIX: batch_size was hardcoded to 5, which silently made
                # set_max_batch a no-op; honor the registered size and keep 5
                # as the fallback for unconfigured services.
                batch_size = self.service_max_batch_size.get(service, 5)
                for backend in ready_backends:
                    if len(queue) == 0:
                        break
                    worker_handle = self.worker_handles[backend].popleft()
                    pop_len = min(batch_size, len(queue))
                    # pop() takes from the tail: the tightest deadlines first.
                    request = [queue.pop() for _ in range(pop_len)]
                    worker_handle.__call__.remote(request)

                ready_backends = self._get_available_backends(service)

    def _flush_single(self):
        # Match queued queries with idle workers one request at a time.
        for service, queue in self.queues.items():
            ready_backends = self._get_available_backends(service)
            while len(queue) and len(ready_backends):
                for backend in ready_backends:
                    if len(queue) == 0:
                        break
                    worker_handle = self.worker_handles[backend].popleft()
                    request = [queue.pop()]
                    worker_handle.__call__.remote(request)

                ready_backends = self._get_available_backends(service)
|
||
|
||
@ray.remote
class CentralizedQueuesActor(CentralizedQueues):
    """Actor flavor of CentralizedQueues.

    After `register_self_handle` has stored this actor's own handle,
    `flush` dispatches the flushing work back onto the actor as a remote
    call instead of running it inline.
    """

    # Handle to this actor itself; None until register_self_handle is called.
    self_handle = None

    def register_self_handle(self, handle_to_this_actor):
        """Store this actor's own handle so flush() can self-invoke."""
        self.self_handle = handle_to_this_actor

    def flush(self):
        if self.self_handle is None:
            # No handle registered yet: flush synchronously in-process.
            self._flush_single()
        else:
            # Schedule the flush asynchronously on ourselves.
            self.self_handle._flush_single.remote()
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This flush should happen asynchronously. Which means you need to do something like
self._router_actor_handle.flush.remote()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can register the actor's handle inside its own actor.