implement data reuse for LOP
Rafael Stahl committed May 13, 2019
1 parent bbcf488 commit 33de677
Showing 5 changed files with 155 additions and 77 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -4,3 +4,6 @@ libdistriot.so
obj/

models/yolo.weights

.vscode
core
200 changes: 133 additions & 67 deletions src/deepthings_edge.c
@@ -147,8 +147,6 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
// If fused, process the next layer.
if (is_weight_part_fused_layer(model, layer_id))
{
printf("processing fused layer\n");

model->net->input = l->output;
if (l->truth)
{
@@ -178,38 +176,19 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
blob *result = new_blob_and_copy_data(0, get_model_byte_size(model, layer_id), (uint8_t*)get_model_output(model, layer_id));
copy_blob_meta(result, task);
annotate_blob(result, get_this_client_id(ctxt), 0, task->id);
printf("processed blob on cli: %d\n", get_blob_cli_id(result));
result->id = task->id;
return result;
}

static void print_out(layer *l)
{
// Print basics of layer for debugging.

printf("##########################\n");
printf("outputs: %d\n", l->outputs);
printf("first value: %f\n", l->output[0]);
printf("last value: %f\n", l->output[l->outputs - 1]);
printf("##########################\n");
}

void print_part_data(float *data, int n)
{
printf("~~~~~~~~~\n");
printf("outputs: %d\n", n);
printf("first value: %f\n", data[0]);
printf("last value: %f\n", data[n - 1]);
printf("~~~~~~~~~\n");
}

void partition_frame_and_perform_inference_thread(void *arg){
device_ctxt* ctxt = (device_ctxt*)arg;
cnn_model* model = (cnn_model*)(ctxt->model);
blob* temp;
uint32_t frame_num;
bool* reuse_data_is_required;

uint32_t time_start = sys_now();

int32_t own_cli_id = get_this_client_id(ctxt);

for(frame_num = 0; frame_num < FRAME_NUM; frame_num ++){
@@ -275,38 +254,65 @@ void partition_frame_and_perform_inference_thread(void *arg){

network *net = model->net;
for (int i = model->ftp_para->fused_layers; i < net->n; i++){
printf("===weight part: layer %d/%d\n", i, net->n - 1);
//printf("===weight part: layer %d/%d\n", i, net->n - 1);
printf("start layer: %d\n", sys_now() - time_start);
layer *l = &net->layers[i];
net->index = i;
if (l->delta){
fill_cpu(l->outputs * l->batch, 0, l->delta, 1);
}

if (l->type == CONVOLUTIONAL){
printf("==weight part: conv layer!\n");

// Is a distributed layer.

// Partition the first layer: All nodes get same input.
//ctxt->weight_part_input = net->input;

blob *task_input = new_blob_and_move_data(i, l->inputs * sizeof(float), net->input);
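// task_input holds the full layer input; it is only queued for this node itself.
// Remote nodes instead receive the per-partition blobs built below.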

// Create blobs for each partial result of the previous layer.
// A client does not need the data for its own partition if it already computed it in the previous layer.
// See weight_partitioner.c:prune_filters()
int num_parts = ctxt->total_cli_num;
blob **task_inputs = malloc(num_parts * sizeof(blob*));
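// Example with hypothetical sizes: for l->c = 10 input channels and num_parts = 3,
// partitionSize = 3, so parts 0 and 1 cover 3 channels each and the last part covers
// 3 + (10 % 3) = 4, i.e. every input channel belongs to exactly one part.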
for (int c = 0; c < ctxt->total_cli_num; c++){
int partitionSize = l->c / num_parts;
int numFilters = partitionSize;
if (c == num_parts - 1){
numFilters += l->c % num_parts;
}

int inputChannelSize = l->w * l->h;
int inputPartSize = inputChannelSize * numFilters;
task_inputs[c] = new_blob_and_copy_data(i, inputPartSize * sizeof(float), net->input + c * inputChannelSize * partitionSize);
}
blob *dummy = new_blob_and_alloc_data(i, 1);

// into task queue? no, need id specific
for (int target_cli_id = 0; target_cli_id < ctxt->total_cli_num; target_cli_id++){
if (target_cli_id == ctxt->this_cli_id){
enqueue(ctxt->task_queue_weightpart[target_cli_id], task_input);
} else {
for (int c = 0; c < ctxt->total_cli_num; c++){
if (!is_entire_weightpart_input_required(model, i) &&
c == target_cli_id){
// Target cli can reuse data. Just queue a dummy segment.
enqueue(ctxt->task_queue_weightpart[target_cli_id], dummy);
} else {
enqueue(ctxt->task_queue_weightpart[target_cli_id], task_inputs[c]);
}
}
}
}
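// At this point the local queue holds one blob with the full input, while every
// remote client's queue holds one blob per partition, with a dummy standing in for
// the slice that client can reuse from its own previous-layer output.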
printf("before processing: %d\n", sys_now() - time_start);

// Process local tasks.
while (1)
{
blob *task_wpart = try_dequeue(ctxt->task_queue_weightpart[own_cli_id]);
if (!task_wpart){
printf("===weight part: no more local tasks for me\n");
//printf("===weight part: no more local tasks for me\n");
break;
}

printf("===weight part: processing local task %d\n", task_wpart->id);
//printf("===weight part: processing local task %d\n", task_wpart->id);

blob *result = process_task_weightpart(ctxt, task_wpart);
free_blob(task_wpart);
@@ -315,18 +321,24 @@ void partition_frame_and_perform_inference_thread(void *arg){
}

// Wait for results.
printf("===weight part: waiting for other results\n");
//printf("===weight part: waiting for other results\n");
blob *ready = dequeue(ctxt->ready_pool_weightpart);
printf("results ready: %d\n", sys_now() - time_start);

// Only now can we free the inputs.
free_blob(task_input);
for (int c = 0; c < ctxt->total_cli_num; c++){
free_blob(task_inputs[c]);
}
free(task_inputs);
free_blob(dummy);

// Merge outputs.
int num_partitions = ctxt->total_cli_num;
if (is_weight_part_fused_layer(model, i))
{
// More elaborate merging: add outputs and finalize layer.

printf("1: %d\n", sys_now() - time_start);
// Advance one layer.
i++;
l = &net->layers[i];
@@ -335,10 +347,11 @@ void partition_frame_and_perform_inference_thread(void *arg){
fill_cpu(l->outputs * l->batch, 0, l->delta, 1);
}

printf("##### processing output of merged layer %d + %d\n", i - 1, i);
//printf("##### processing output of merged layer %d + %d\n", i - 1, i);

// Need to set to zero for fused layers, because results will be added on top.
memset(l->output, 0, l->outputs * sizeof(float));
printf("3: %d\n", sys_now() - time_start);

for (int j = 0; j < num_partitions; j++)
{
@@ -349,7 +362,7 @@ void partition_frame_and_perform_inference_thread(void *arg){
printf("ERROR: got unexpected layer id!\n");
}

//print_part_data(result->data, l->outputs);

// Accumulate to the output.
// TODO is there an nnpack accelerator for this?
@@ -358,8 +371,10 @@ void partition_frame_and_perform_inference_thread(void *arg){
l->output[k] += ((float*)result->data)[k];
}
}
printf("4: %d\n", sys_now() - time_start);

finalize_weight_part_fused_output(l, net);
printf("5: %d\n", sys_now() - time_start);
} else {
// Simple concatenation of outputs.

@@ -374,15 +389,14 @@ void partition_frame_and_perform_inference_thread(void *arg){

int partition_id = get_blob_cli_id(result);
copy_weight_part_output(l, result->data, partition_id, num_partitions);
printf("copied weights from %d in layer %d\n", partition_id, i);
}
}
}else{
// Not convolutional. Execute locally.
l->forward(*l, *net);
}

//print_out(l);

net->input = l->output;
if (l->truth){
@@ -402,6 +416,8 @@ void partition_frame_and_perform_inference_thread(void *arg){
/*Unregister and prepare for next image*/
cancel_client(ctxt);
}

printf("Finished in %d ms\n", sys_now() - time_start);
}


@@ -416,36 +432,81 @@ void steal_weightpart_thread(void *arg){
steal_weightpart_args *args = (steal_weightpart_args*)arg;
device_ctxt *ctxt = args->ctxt;

printf(">>> connecting to %d: %s\n", args->steal_from_cli_id, get_client_addr(args->steal_from_cli_id, ctxt));

service_conn *conn = connect_service(TCP, get_client_addr(args->steal_from_cli_id, ctxt), WEIGHT_PART_PORT);

static int total_recv = 0;
static int total_send = 0;

// Keep receiving data.
while (1)
{
uint32_t time_start = sys_now();

int layer_id = 0;
blob **tasks = malloc(ctxt->total_cli_num * sizeof(blob*));
for (int c = 0; c < ctxt->total_cli_num; c++){
tasks[c] = recv_data(conn);
total_recv += tasks[c]->size;
if (tasks[c]->id == -1){
// No more tasks with this client. Close the connection.
free_blob(tasks[c]);
free(tasks);
close_service_connection(conn);
*args->done = true;
return;
}
layer_id = tasks[c]->id;
}

cnn_model *model = (cnn_model*)ctxt->model;
layer *l = &model->net->layers[layer_id];

bool can_reuse = !is_entire_weightpart_input_required(model, layer_id);
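// When reuse is possible, this client's own slice of the input is filled below from
// its local previous-layer output instead of from data received over the connection.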

// Reassemble data.
blob *task = new_blob_and_alloc_data(layer_id, l->inputs * sizeof(float));
int num_parts = ctxt->total_cli_num;
for (int c = 0; c < ctxt->total_cli_num; c++){
int partitionSize = l->c / ctxt->total_cli_num;
int numFilters = partitionSize;
if (c == num_parts - 1){
numFilters += l->c % num_parts;
}
int inputChannelSize = l->w * l->h;
int inputPartSize = inputChannelSize * numFilters;

uint8_t *data_to_copy;
if (can_reuse && c == ctxt->this_cli_id){
layer *prev_l = &model->net->layers[layer_id - 1];
data_to_copy = prev_l->output;
} else {
data_to_copy = tasks[c]->data;
}

memcpy(task->data + sizeof(float) * c * inputChannelSize * partitionSize, data_to_copy, inputPartSize * sizeof(float));
}
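// task->data now holds all l->c input channels contiguously, in the same channel
// order used by the sender when it split the input into parts.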

// Process data.
printf(">>> processing data\n");
blob *result = process_task_weightpart(ctxt, task);
free_blob(task);
for (int c = 0; c < ctxt->total_cli_num; c++){
free_blob(tasks[c]);
}
free(tasks);
/*enqueue(ctxt->result_queue_weightpart, result);
free_blob(result);*/

// Why not send it back directly?? Try that first...
printf(">>> sending back the results. cli: %d\n", get_blob_cli_id(result));
total_send += result->size;
send_request("results_weight", 20, conn);
send_data(result, conn);
free_blob(result);

// This is required because measurements become skewed if the OS sometimes returns from send() immediately.
blob *confirm = recv_data(conn);
printf("done: %d\n", sys_now() - time_start);

printf("total recv: %d kB, total send: %d kB\n", total_recv / 1000, total_send / 1000);
}
}

@@ -476,11 +537,9 @@ void steal_partition_and_perform_inference_thread(void *arg){

if (!thread_running)
{
printf("spawning steal wpart service conn thread\n");
thread_running = true;
t = sys_thread_new("steal_weightpart_thread", steal_weightpart_thread, &args, 0, 0);
} else if (thread_done) {
printf("stop steal wpart service conn thread\n");
sys_thread_join(t);
thread_running = false;
thread_done = false;
@@ -710,20 +769,21 @@ int on_weight_part_push(void *svc_conn, void *arg){
inet_ntop(conn->serv_addr_ptr->sin_family, &(conn->serv_addr_ptr->sin_addr), ip_addr, ADDRSTRLEN);
int32_t cli_id = get_client_id(ip_addr, ctxt);

printf("waiting for data to push to %d\n", cli_id);
static int total_send = 0;
for (int c = 0; c < ctxt->total_cli_num; c++){
    blob *task = dequeue(ctxt->task_queue_weightpart[cli_id]);
    if (task->id == -1)
    {
        // Stop the connection.
        send_data(task, conn);
        free_blob(task);
        return 1;
    }
    total_send += task->size;

    send_data(task, conn);
}
printf("total send: %d kB\n", total_send / 1000);

return 0;
}
@@ -736,10 +796,10 @@ void store_weight_part_result(device_ctxt *ctxt, blob *result)

uint32_t num_nodes = enqueue(ctxt->results_pool_weightpart, result);

printf("gathering result weightpart cli: %d, frame: %d, task: %d complete: %d/%d\n", cli_id, frame_seq, task_id, num_nodes, ctxt->total_cli_num);
//printf("gathering result weightpart cli: %d, frame: %d, task: %d complete: %d/%d\n", cli_id, frame_seq, task_id, num_nodes, ctxt->total_cli_num);

if (num_nodes == ctxt->total_cli_num){
printf("Weight results ready!\n");
//printf("Weight results ready!\n");
blob *ready = new_empty_blob(cli_id);
annotate_blob(ready, cli_id, frame_seq, task_id);
enqueue(ctxt->ready_pool_weightpart, ready);
@@ -752,8 +812,14 @@ void *on_weight_part_results(void *svc_conn, void *arg){
device_ctxt *ctxt = (device_ctxt*)arg;

blob *result = recv_data(conn);
static int total_recv = 0;
total_recv += result->size;
printf("total recv: %d kB\n", total_recv / 1000);
store_weight_part_result(ctxt, result);
free_blob(result);

blob *confirm = new_empty_blob(0);
send_data(confirm, conn);
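// The worker in steal_weightpart_thread waits for this confirmation after sending
// its result, so its timing measurements are not skewed by buffered sends.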
}

void weight_part_service_thread(void *arg){