From 33de677ae090fe46d4e5cbea3f792a3bb41fc363 Mon Sep 17 00:00:00 2001
From: Rafael Stahl
Date: Mon, 13 May 2019 22:25:58 +0200
Subject: [PATCH] Implement data reuse for layer output partitioning (LOP)

---
 .gitignore               |   3 +
 src/deepthings_edge.c    | 201 +++++++++++++++++++++++++++------------
 src/top.c                |   7 +-
 src/weight_partitioner.c |  21 ++--
 src/weight_partitioner.h |   1 +
 5 files changed, 156 insertions(+), 77 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8f23f61..be3de4b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,6 @@
 libdistriot.so
 obj/
 models/yolo.weights
+
+.vscode
+core
diff --git a/src/deepthings_edge.c b/src/deepthings_edge.c
index a5a873e..bdcc35e 100644
--- a/src/deepthings_edge.c
+++ b/src/deepthings_edge.c
@@ -147,8 +147,6 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
     // If fused, process the next layer.
     if (is_weight_part_fused_layer(model, layer_id))
     {
-        printf("processing fused layer\n");
-
         model->net->input = l->output;
 
         if (l->truth) {
@@ -178,31 +176,10 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
     blob *result = new_blob_and_copy_data(0, get_model_byte_size(model, layer_id), (uint8_t*)get_model_output(model, layer_id));
     copy_blob_meta(result, task);
     annotate_blob(result, get_this_client_id(ctxt), 0, task->id);
-    printf("processed blob on cli: %d\n", get_blob_cli_id(result));
     result->id = task->id;
     return result;
 }
 
-static void print_out(layer *l)
-{
-    // Print basics of layer for debugging.
-
-    printf("##########################\n");
-    printf("outputs: %d\n", l->outputs);
-    printf("first value: %f\n", l->output[0]);
-    printf("last value: %f\n", l->output[l->outputs - 1]);
-    printf("##########################\n");
-}
-
-void print_part_data(float *data, int n)
-{
-    printf("~~~~~~~~~\n");
-    printf("outputs: %d\n", n);
-    printf("first value: %f\n", data[0]);
-    printf("last value: %f\n", data[n - 1]);
-    printf("~~~~~~~~~\n");
-}
-
 void partition_frame_and_perform_inference_thread(void *arg){
     device_ctxt* ctxt = (device_ctxt*)arg;
     cnn_model* model = (cnn_model*)(ctxt->model);
@@ -210,6 +187,8 @@ void partition_frame_and_perform_inference_thread(void *arg){
     uint32_t frame_num;
     bool* reuse_data_is_required;
 
+    uint32_t time_start = sys_now();
+
     int32_t own_cli_id = get_this_client_id(ctxt);
 
     for(frame_num = 0; frame_num < FRAME_NUM; frame_num ++){
@@ -275,7 +254,8 @@ void partition_frame_and_perform_inference_thread(void *arg){
         network *net = model->net;
         for (int i = model->ftp_para->fused_layers; i < net->n; i++){
-            printf("===weight part: layer %d/%d\n", i, net->n - 1);
+            //printf("===weight part: layer %d/%d\n", i, net->n - 1);
+            printf("start layer: %u\n", sys_now() - time_start);
             layer *l = &net->layers[i];
             net->index = i;
             if (l->delta){
@@ -283,30 +263,56 @@ void partition_frame_and_perform_inference_thread(void *arg){
             }
 
             if (l->type == CONVOLUTIONAL){
-                printf("==weight part: conv layer!\n");
-
                 // Is a distributed layer.
 
                 // Partition the first layer: All nodes get same input.
                 //ctxt->weight_part_input = net->input;
                 blob *task_input = new_blob_and_move_data(i, l->inputs * sizeof(float), net->input);
+
+                // Create blobs for each partial result of the previous layer.
+                // A client does not need the data for its own partition if it computed that partition in the previous layer.
+                // See weight_partitioner.c:prune_filters()
+                int num_parts = ctxt->total_cli_num;
+                blob **task_inputs = malloc(num_parts * sizeof(blob*));
+                for (int c = 0; c < num_parts; c++){
+                    int partition_size = l->c / num_parts;
+                    int num_filters = partition_size;
+                    if (c == num_parts - 1){
+                        num_filters += l->c % num_parts;
+                    }
+
+                    int input_channel_size = l->w * l->h;
+                    int input_part_size = input_channel_size * num_filters;
+                    task_inputs[c] = new_blob_and_copy_data(i, input_part_size * sizeof(float), net->input + c * input_channel_size * partition_size);
+                }
+                blob *dummy = new_blob_and_alloc_data(i, 1);
 
                 // into task queue? no, need id specific
                 for (int target_cli_id = 0; target_cli_id < ctxt->total_cli_num; target_cli_id++){
-                    enqueue(ctxt->task_queue_weightpart[target_cli_id], task_input);
+                    if (target_cli_id == ctxt->this_cli_id){
+                        enqueue(ctxt->task_queue_weightpart[target_cli_id], task_input);
+                    } else {
+                        for (int c = 0; c < ctxt->total_cli_num; c++){
+                            if (!is_entire_weightpart_input_required(model, i) &&
+                                c == target_cli_id){
+                                // The target client can reuse its own partial result; queue only a dummy segment.
+                                enqueue(ctxt->task_queue_weightpart[target_cli_id], dummy);
+                            } else {
+                                enqueue(ctxt->task_queue_weightpart[target_cli_id], task_inputs[c]);
+                            }
+                        }
+                    }
                 }
+                printf("before processing: %u\n", sys_now() - time_start);
 
                 // Process local tasks.
                 while (1)
                 {
                     blob *task_wpart = try_dequeue(ctxt->task_queue_weightpart[own_cli_id]);
                     if (!task_wpart){
-                        printf("===weight part: no more local tasks for me\n");
+                        //printf("===weight part: no more local tasks for me\n");
                         break;
                     }
 
-                    printf("===weight part: processing local task %d\n", task_wpart->id);
+                    //printf("===weight part: processing local task %d\n", task_wpart->id);
                     blob *result = process_task_weightpart(ctxt, task_wpart);
                     free_blob(task_wpart);
@@ -315,18 +321,24 @@ void partition_frame_and_perform_inference_thread(void *arg){
                 }
                 // Wait for results.
-                printf("===weight part: waiting for other results\n");
+                //printf("===weight part: waiting for other results\n");
                 blob *ready = dequeue(ctxt->ready_pool_weightpart);
+                printf("results ready: %u\n", sys_now() - time_start);
 
                 // Only now we can free the input.
                 free_blob(task_input);
+                for (int c = 0; c < ctxt->total_cli_num; c++){
+                    free_blob(task_inputs[c]);
+                }
+                free(task_inputs);
+                free_blob(dummy);
 
                 // Merge outputs.
                 int num_partitions = ctxt->total_cli_num;
 
                 if (is_weight_part_fused_layer(model, i))
                 {
                     // More elaborate merging: add outputs and finalize layer.
-
+                    printf("merge start: %u\n", sys_now() - time_start);
                     // Advance one layer.
                     i++;
                     l = &net->layers[i];
@@ -335,10 +347,11 @@ void partition_frame_and_perform_inference_thread(void *arg){
                         fill_cpu(l->outputs * l->batch, 0, l->delta, 1);
                     }
 
-                    printf("##### processing output of merged layer %d + %d\n", i - 1, i);
+                    //printf("##### processing output of merged layer %d + %d\n", i - 1, i);
 
                     // Need to set to zero for fused layers, because results will be added on top.
                     memset(l->output, 0, l->outputs * sizeof(float));
+                    printf("output cleared: %u\n", sys_now() - time_start);
 
                     for (int j = 0; j < num_partitions; j++)
                     {
@@ -349,7 +362,7 @@ void partition_frame_and_perform_inference_thread(void *arg){
                             printf("ERROR: got unexpected layer id!\n");
                         }
 
-                        print_part_data(result->data, l->outputs);
+                        //print_part_data(result->data, l->outputs);
 
                         // Accumulate to the output.
                         // TODO: is there an NNPACK-accelerated routine for this?
@@ -358,8 +371,10 @@ void partition_frame_and_perform_inference_thread(void *arg){
                             l->output[k] += ((float*)result->data)[k];
                         }
                     }
+                    printf("output accumulated: %u\n", sys_now() - time_start);
 
                     finalize_weight_part_fused_output(l, net);
+                    printf("output finalized: %u\n", sys_now() - time_start);
                 } else {
                     // Simple concatenation of outputs.
 
@@ -374,7 +389,6 @@ void partition_frame_and_perform_inference_thread(void *arg){
                         int partition_id = get_blob_cli_id(result);
 
                         copy_weight_part_output(l, result->data, partition_id, num_partitions);
-                        printf("copied weights from %d in layer %d\n", partition_id, i);
                     }
                 }
             }else{
@@ -382,7 +396,7 @@ void partition_frame_and_perform_inference_thread(void *arg){
                 l->forward(*l, *net);
             }
 
-            print_out(l);
+            //print_out(l);
 
             net->input = l->output;
             if (l->truth){
@@ -402,6 +416,8 @@ void partition_frame_and_perform_inference_thread(void *arg){
 
         /*Unregister and prepare for next image*/
         cancel_client(ctxt);
     }
+
+    printf("Finished in %u ms\n", sys_now() - time_start);
 }
 
@@ -416,36 +432,81 @@ void steal_weightpart_thread(void *arg){
     steal_weightpart_args *args = (steal_weightpart_args*)arg;
     device_ctxt *ctxt = args->ctxt;
 
-    printf(">>> connecting to %d: %s\n", args->steal_from_cli_id, get_client_addr(args->steal_from_cli_id, ctxt));
-
     service_conn *conn = connect_service(TCP, get_client_addr(args->steal_from_cli_id, ctxt), WEIGHT_PART_PORT);
 
+    static int total_recv = 0;
+    static int total_send = 0;
+
     // Keep receiving data.
     while (1)
     {
-        // Wait for data.
-        printf(">>> waiting for data\n");
-        blob *task = recv_data(conn);
-        if (task->id == -1){
-            // No more tasks with this client. Close the connection.
-            printf(">>> closing the connection\n");
-            free_blob(task);
-            close_service_connection(conn);
-            *args->done = true;
-            break;
+        uint32_t time_start = sys_now();
+
+        int layer_id = 0;
+        blob **tasks = malloc(ctxt->total_cli_num * sizeof(blob*));
+        for (int c = 0; c < ctxt->total_cli_num; c++){
+            tasks[c] = recv_data(conn);
+            total_recv += tasks[c]->size;
+            if (tasks[c]->id == -1){
+                // No more tasks with this client. Close the connection.
+                free_blob(tasks[c]);
+                free(tasks);
+                close_service_connection(conn);
+                *args->done = true;
+                return;
+            }
+            layer_id = tasks[c]->id;
+        }
+
+        cnn_model *model = (cnn_model*)ctxt->model;
+        layer *l = &model->net->layers[layer_id];
+
+        bool can_reuse = !is_entire_weightpart_input_required(model, layer_id);
+
+        // Reassemble data.
+        blob *task = new_blob_and_alloc_data(layer_id, l->inputs * sizeof(float));
+        int num_parts = ctxt->total_cli_num;
+        for (int c = 0; c < num_parts; c++){
+            int partition_size = l->c / num_parts;
+            int num_filters = partition_size;
+            if (c == num_parts - 1){
+                num_filters += l->c % num_parts;
+            }
+            int input_channel_size = l->w * l->h;
+            int input_part_size = input_channel_size * num_filters;
+
+            uint8_t *data_to_copy;
+            if (can_reuse && c == ctxt->this_cli_id){
+                layer *prev_l = &model->net->layers[layer_id - 1];
+                data_to_copy = (uint8_t*)prev_l->output;
+            } else {
+                data_to_copy = tasks[c]->data;
+            }
+
+            memcpy(task->data + sizeof(float) * c * input_channel_size * partition_size, data_to_copy, input_part_size * sizeof(float));
         }
 
         // Process data.
-        printf(">>> processing data\n");
         blob *result = process_task_weightpart(ctxt, task);
         free_blob(task);
+        for (int c = 0; c < ctxt->total_cli_num; c++){
+            free_blob(tasks[c]);
+        }
+        free(tasks);
 
         /*enqueue(ctxt->result_queue_weightpart, result);
         free_blob(result);*/
         // Why not send it back directly?? Try that first...
-        printf(">>> sending back the results. cli: %d\n", get_blob_cli_id(result));
+        total_send += result->size;
         send_request("results_weight", 20, conn);
         send_data(result, conn);
         free_blob(result);
+
+        // Wait for a confirmation; otherwise send() may return as soon as the OS has buffered the data, which skews the timing measurements.
+        free_blob(recv_data(conn));
+        printf("done: %u\n", sys_now() - time_start);
+
+        printf("total recv: %d kB, total send: %d kB\n", total_recv / 1000, total_send / 1000);
     }
 }
 
@@ -476,11 +537,9 @@ void steal_partition_and_perform_inference_thread(void *arg){
 
         if (!thread_running)
         {
-            printf("spawning steal wpart service conn thread\n");
            thread_running = true;
            t = sys_thread_new("steal_weightpart_thread", steal_weightpart_thread, &args, 0, 0);
         } else if (thread_done) {
-            printf("stop steal wpart service conn thread\n");
            sys_thread_join(t);
            thread_running = false;
            thread_done = false;
@@ -710,20 +769,21 @@ int on_weight_part_push(void *svc_conn, void *arg){
     inet_ntop(conn->serv_addr_ptr->sin_family, &(conn->serv_addr_ptr->sin_addr), ip_addr, ADDRSTRLEN);
     int32_t cli_id = get_client_id(ip_addr, ctxt);
 
-    printf("waiting for data to push to %d\n", cli_id);
+    static int total_send = 0;
+    for (int c = 0; c < ctxt->total_cli_num; c++){
+        blob *task = dequeue(ctxt->task_queue_weightpart[cli_id]);
+        if (task->id == -1)
+        {
+            // Stop the connection.
+            send_data(task, conn);
+            free_blob(task);
+            return 1;
+        }
+        total_send += task->size;
 
-    blob *task = dequeue(ctxt->task_queue_weightpart[cli_id]);
-    if (task->id == -1)
-    {
-        // Stop the connection.
-        printf("closing connection with %d\n", cli_id);
         send_data(task, conn);
-        free_blob(task);
-        return 1;
     }
-
-    printf("sending task data to %d\n", cli_id);
-    send_data(task, conn);
+    printf("total send: %d kB\n", total_send / 1000);
 
     return 0;
 }
@@ -736,10 +796,10 @@ void store_weight_part_result(device_ctxt *ctxt, blob *result)
 
     uint32_t num_nodes = enqueue(ctxt->results_pool_weightpart, result);
 
-    printf("gathering result weightpart cli: %d, frame: %d, task: %d complete: %d/%d\n", cli_id, frame_seq, task_id, num_nodes, ctxt->total_cli_num);
+    //printf("gathering result weightpart cli: %d, frame: %d, task: %d complete: %d/%d\n", cli_id, frame_seq, task_id, num_nodes, ctxt->total_cli_num);
 
     if (num_nodes == ctxt->total_cli_num){
-        printf("Weight results ready!\n");
+        //printf("Weight results ready!\n");
         blob *ready = new_empty_blob(cli_id);
         annotate_blob(ready, cli_id, frame_seq, task_id);
         enqueue(ctxt->ready_pool_weightpart, ready);
@@ -752,8 +812,15 @@ void *on_weight_part_results(void *svc_conn, void *arg){
     device_ctxt *ctxt = (device_ctxt*)arg;
 
     blob *result = recv_data(conn);
+    static int total_recv = 0;
+    total_recv += result->size;
+    printf("total recv: %d kB\n", total_recv / 1000);
     store_weight_part_result(ctxt, result);
     free_blob(result);
+
+    blob *confirm = new_empty_blob(0);
+    send_data(confirm, conn);
+    free_blob(confirm);
 }
 
 void weight_part_service_thread(void *arg){
diff --git a/src/top.c b/src/top.c
index f3002d6..87240d2 100644
--- a/src/top.c
+++ b/src/top.c
@@ -26,12 +26,12 @@ static const char* addr_list[MAX_EDGE_NUM] = EDGE_ADDR_LIST;
 
 int main(int argc, char **argv){
 
-   uint32_t total_cli_num = 0;
   uint32_t this_cli_id = 0;
 
   uint32_t partitions_h = get_int_arg(argc, argv, "-n", 5);
   uint32_t partitions_w = get_int_arg(argc, argv, "-m", 5);
   uint32_t fused_layers = get_int_arg(argc, argv, "-l", 16);
+   uint32_t total_cli_num = get_int_arg(argc, argv, "-total_edge", 0);
 
   char network_file[30] = "models/yolo.cfg";
   char weight_file[30] = "models/yolo.weights";
@@ -42,18 +42,17 @@ int main(int argc, char **argv){
   }else if(0 == strcmp(get_string_arg(argc, argv, "-mode", "none"), "gateway")){
      printf("Gateway device\n");
      printf("We have %d edge devices now\n", get_int_arg(argc, argv, "-total_edge", 0));
-      total_cli_num = get_int_arg(argc, argv, "-total_edge", 0);
      deepthings_gateway(partitions_h, partitions_w, fused_layers, network_file, weight_file, total_cli_num, addr_list);
   }else if(0 == strcmp(get_string_arg(argc, argv, "-mode", "none"), "data_src")){
      printf("Data source edge device\n");
      printf("This client ID is %d\n", get_int_arg(argc, argv, "-edge_id", 0));
      this_cli_id = get_int_arg(argc, argv, "-edge_id", 0);
-      deepthings_victim_edge(partitions_h, partitions_w, fused_layers, network_file, weight_file, this_cli_id, MAX_EDGE_NUM, addr_list);
+      deepthings_victim_edge(partitions_h, partitions_w, fused_layers, network_file, weight_file, this_cli_id, total_cli_num, addr_list);
   }else if(0 == strcmp(get_string_arg(argc, argv, "-mode", "none"), "non_data_src")){
      printf("Idle edge device\n");
      printf("This client ID is %d\n", get_int_arg(argc, argv, "-edge_id", 0));
      this_cli_id = get_int_arg(argc, argv, "-edge_id", 0);
-      deepthings_stealer_edge(partitions_h, partitions_w, fused_layers, network_file, weight_file, this_cli_id, MAX_EDGE_NUM, addr_list);
+      deepthings_stealer_edge(partitions_h, partitions_w, fused_layers, network_file, weight_file, this_cli_id, total_cli_num, addr_list);
   }
   return 0;
 }
diff --git a/src/weight_partitioner.c b/src/weight_partitioner.c
index 8750177..dcaa307 100644
--- a/src/weight_partitioner.c
+++ b/src/weight_partitioner.c
@@ -99,6 +99,21 @@ bool is_weight_part_fused_layer(cnn_model *model, int layer_id)
     return false;
 }
 
+bool is_entire_weightpart_input_required(cnn_model *model, int layer_id)
+{
+    if (layer_id <= model->ftp_para->fused_layers)
+    {
+        return true;
+    }
+    network *net = model->net;
+    if (is_weight_part_fused_layer(model, layer_id - 1) || is_weight_part_fused_layer(model, layer_id - 2))
+    {
+        return true;
+    }
+    layer *prev_l = &net->layers[layer_id - 1];
+    return prev_l->type != CONVOLUTIONAL;
+}
+
 void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions)
 {
     // FIXME: this is just pruning weights that have already been loaded. it should only load the neccessary weights in
@@ -136,7 +151,6 @@ void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitio
         para->num_part_layers++;
 
         prune_filters(l, partition_id, num_partitions);
-        printf("layer %d: orignumfilt: %d, numfilt: %d\n", i, l->extra, l->n);
 
         // continue; /// SKIP FUSING
@@ -170,14 +184,9 @@ void copy_weight_part_output(layer *l, float *data, int partition_id, int num_pa
         num_filters += orig_num_filters % num_partitions;
     }
 
-    printf("restoring tiles: orignumfilt: %d, numfilt: %d, for this(%d): %d\n", orig_num_filters, l->n, partition_id,
-           num_filters);
-
     int out_offset = partition_id * l->w * l->h * partition_size;
     int out_size = l->w * l->h * num_filters;
 
-    printf("out offset: %d, out_sz: %d\n", out_offset, out_size);
-
     memcpy(l->output + out_offset, data, out_size * sizeof(float));
 }
 
diff --git a/src/weight_partitioner.h b/src/weight_partitioner.h
index b7496e8..de12edd 100644
--- a/src/weight_partitioner.h
+++ b/src/weight_partitioner.h
@@ -26,6 +26,7 @@ typedef struct
 
 bool is_weight_part_fused_layer(cnn_model *model, int layer_id);
+bool is_entire_weightpart_input_required(cnn_model *model, int layer_id);
 void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions);
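-- 
Note on the partition arithmetic: the producer (partition_frame_and_perform_inference_thread)
and the consumer (steal_weightpart_thread) must split a convolutional layer's input
identically, or the reassembled buffer is wrong. The split is over the l->c input
channels: every client gets l->c / total_cli_num channels, and the last client also
takes the remainder. A minimal standalone sketch of that arithmetic follows; the
values (w = h = 26, c = 259, 4 clients) and names are illustrative, not taken from
the patch:

    #include <stdio.h>

    int main(void) {
        int w = 26, h = 26, c = 259;          // layer geometry (illustrative)
        int num_parts = 4;                    // stands in for total_cli_num
        int channel_size = w * h;             // floats per input channel
        int partition_size = c / num_parts;   // channels per regular part: 64

        for (int part = 0; part < num_parts; part++) {
            int num_channels = partition_size;
            if (part == num_parts - 1)
                num_channels += c % num_parts; // last part takes the 3 leftover channels
            // Offset and size in floats, matching the net->input offsets and
            // memcpy sizes used in the patch above.
            int offset = part * channel_size * partition_size;
            int size = channel_size * num_channels;
            printf("part %d: offset %d, size %d floats\n", part, offset, size);
        }
        return 0;
    }

The reuse path then only substitutes the segment at index this_cli_id with the
locally kept prev_l->output, which is valid exactly when
is_entire_weightpart_input_required() returns false.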