
Commit

add sequential layer mapping
Rafael Stahl committed Aug 26, 2020
1 parent ad1b146 commit 7c74c66
Showing 3 changed files with 141 additions and 9 deletions.
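This commit replaces the fused/LIP mapping of the YOLOv2 layers 12-30 with sequential groups: the first layer of each group is tagged LAYER_PART_TYPE_SEQ, the layers that follow it are tagged LAYER_PART_TYPE_FUSE2, and a single client executes the whole group locally. The sketch below is not part of the commit; it is a self-contained reconstruction of the group-to-client assignment used in partition_frame_and_perform_inference_thread, and the helper name seq_target_cli is made up for illustration.

#include <stdio.h>

/* Reconstruction, not from the commit: group 0 stays on the dispatching
 * client; later groups go to the remaining clients in order, skipping the
 * dispatcher's own id. */
static int seq_target_cli(int num_seq_starts, int this_cli_id)
{
    if (num_seq_starts == 0)
        return this_cli_id;
    int target = num_seq_starts;
    if (target <= this_cli_id)
        target--;
    return target;
}

int main(void)
{
    /* With this_cli_id == 1 and four sequential groups: clients 1, 0, 2, 3. */
    for (int g = 0; g < 4; g++)
        printf("group %d -> client %d\n", g, seq_target_cli(g, 1));
    return 0;
}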
113 changes: 106 additions & 7 deletions src/deepthings_edge.c
@@ -74,7 +74,7 @@ device_ctxt* deepthings_edge_init(uint32_t N, uint32_t M, uint32_t fused_layers,
case 32:
// YOLOv2
fused_layers = 12;
set_lt(lt, 12, LAYER_PART_TYPE_FUSE1, fused_layers);
/*set_lt(lt, 12, LAYER_PART_TYPE_FUSE1, fused_layers);
set_lt(lt, 13, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 14, LAYER_PART_TYPE_FUSE1, fused_layers);
set_lt(lt, 15, LAYER_PART_TYPE_FUSE2, fused_layers);
@@ -88,7 +88,29 @@ device_ctxt* deepthings_edge_init(uint32_t N, uint32_t M, uint32_t fused_layers,
set_lt(lt, 24, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 26, LAYER_PART_TYPE_LIP, fused_layers);
set_lt(lt, 29, LAYER_PART_TYPE_FUSE1, fused_layers);
set_lt(lt, 30, LAYER_PART_TYPE_FUSE2, fused_layers);*/

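// Sequential mapping: the first layer of each group below is SEQ and the
// directly following fused layers are FUSE2, so one client runs the whole group.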
set_lt(lt, 12, LAYER_PART_TYPE_SEQ, fused_layers);
set_lt(lt, 13, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 14, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 15, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 16, LAYER_PART_TYPE_FUSE2, fused_layers);

set_lt(lt, 18, LAYER_PART_TYPE_SEQ, fused_layers);
set_lt(lt, 19, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 20, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 21, LAYER_PART_TYPE_FUSE2, fused_layers);
set_lt(lt, 22, LAYER_PART_TYPE_FUSE2, fused_layers);

set_lt(lt, 23, LAYER_PART_TYPE_SEQ, fused_layers);

set_lt(lt, 24, LAYER_PART_TYPE_SEQ, fused_layers);

set_lt(lt, 26, LAYER_PART_TYPE_SEQ, fused_layers);

set_lt(lt, 29, LAYER_PART_TYPE_SEQ, fused_layers);
set_lt(lt, 30, LAYER_PART_TYPE_FUSE2, fused_layers);

break;
default:
printf("Unknown model! Not applying any OPFD layers\n");
@@ -256,11 +278,12 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
layer *l = &model->net->layers[layer_id];
bool is_lip = is_lip_layer(model, layer_id);
bool is_fused = is_weight_part_fused_layer(model, layer_id);
bool is_seq = is_seq_layer(model, layer_id);

if (!is_lip)
{
#ifdef NNPACK
forward_convolutional_layer_nnpack(*l, *model->net);
forward_convolutional_layer_nnpack(*l, *model->net);
#else
forward_convolutional_layer(*l, *model->net);
#endif
@@ -324,6 +347,32 @@ static blob *process_task_weightpart(device_ctxt *ctxt, blob *task){
#endif
}

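// Sequential group: this client keeps executing the directly following
// FUSE2 layers locally, so only the final output of the group is sent back.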
if (is_seq) {
while (is_weight_part_fused_layer2(model, layer_id+1)) {
model->net->input = l->output;
if (l->truth)
{
model->net->truth = l->output;
}

layer_id++;
l = &model->net->layers[layer_id];
model->net->index = layer_id;
if (l->delta) {
fill_cpu(l->outputs * l->batch, 0, l->delta, 1);
}
if (l->type == CONVOLUTIONAL) {
#ifdef NNPACK
forward_convolutional_layer_nnpack(*l, *model->net);
#else
forward_convolutional_layer(*l, *model->net);
#endif
} else {
l->forward(*l, *model->net);
}
}
}

blob *result = new_blob_and_copy_data(0, get_model_byte_size(model, layer_id), (uint8_t*)get_model_output(model, layer_id));
copy_blob_meta(result, task);
annotate_blob(result, get_this_client_id(ctxt), 0, task->id);
@@ -432,6 +481,7 @@ void partition_frame_and_perform_inference_thread(void *arg){
set_model_input(model, fused_output);
}

int numSeqStarts = 0;
int opfd_start = model->ftp_para->fused_layers > first_conv_layer ? model->ftp_para->fused_layers : first_conv_layer;
for (int i = opfd_start; i < net->n; i++){
//printf("===weight part: layer %d/%d\n", i, net->n - 1);
@@ -467,6 +517,28 @@ printf("start layer %i at %d\n", i, sys_now() - time_start);
enqueue(ctxt->task_queue_weightpart[target_cli_id], task_inputs[target_cli_id]);
}
}
else if (is_seq_layer(model, i)) {
// sequential layer mapping.
if (numSeqStarts == 0) {
enqueue(ctxt->task_queue_weightpart[ctxt->this_cli_id], task_input);
} else {
// distribute to the other clients in order
int target_cli_id = numSeqStarts;
if (target_cli_id <= ctxt->this_cli_id) {
target_cli_id--;
}
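// e.g. with this_cli_id == 1 and four groups, they land on clients 1, 0, 2, 3.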
enqueue(ctxt->task_queue_weightpart[target_cli_id], task_input);
}
numSeqStarts++;

// fill results queue with dummies such that the ready queue will trigger on one result.
//blob *ready = new_empty_blob(cli_id);
//annotate_blob(ready, cli_id, frame_seq, task_id);
for (int d = 0; d < ctxt->total_cli_num - 1; d++) {
enqueue(ctxt->results_pool_weightpart, dummy);
}
//free_blob(ready);
}
else
{
for (int target_cli_id = 0; target_cli_id < ctxt->total_cli_num; target_cli_id++){
@@ -525,8 +597,8 @@ printf("results ready: %d\n", sys_now() - time_start);
printf("1: %d\n", sys_now() - time_start);
if (is_weight_part_fused_layer(model, i))
{
// Advance one layer.
i++;
// Advance one layer.
i++;
//printf("##### processing output of merged layer %d + %d\n", i - 1, i);
}
l = &net->layers[i];
@@ -562,6 +634,33 @@ printf("4: %d\n", sys_now() - time_start);

finalize_weight_part_fused_output(l, net);
printf("5: %d\n", sys_now() - time_start);
} else if (is_seq_layer(model, i)) {
// Take result as is.

// dequeue dummies.
for (int d = 0; d < ctxt->total_cli_num - 1; d++) {
blob *trash = dequeue(ctxt->results_pool_weightpart);
free_blob(trash);
}

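// The single real result comes from the client that executed this group.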
blob *result = dequeue(ctxt->results_pool_weightpart);
int layer_id = result->id;
if (layer_id != i)
{
printf("ERROR: got unexpected layer id!\n");
}

// advance inference by number of sequential layers.
while (is_weight_part_fused_layer2(model, i+1)) {
i++;
}
l = &net->layers[i];
net->index = i;
if (l->delta){
fill_cpu(l->outputs * l->batch, 0, l->delta, 1);
}

memcpy(l->output, result->data, result->size);
} else {
// Simple concatenation of outputs.

@@ -686,7 +785,7 @@ uint32_t time_start = sys_now();
layer *l = &model->net->layers[layer_id];
blob *task;

if (is_lip_layer(model, layer_id))
if (is_lip_layer(model, layer_id) || is_seq_layer(model, layer_id))
{
task = new_blob_and_alloc_data(layer_id, tasks[0]->size);
memcpy(task->data, tasks[0]->data, tasks[0]->size);
Expand Down Expand Up @@ -726,7 +825,7 @@ uint32_t time_start = sys_now();
blob *result = process_task_weightpart(ctxt, task);
free_blob(task);
free_blob(tasks[0]);
if (!is_lip_layer(model, layer_id))
if (!is_lip_layer(model, layer_id) && !is_seq_layer(model, layer_id))
{
for (int c = 1; c < ctxt->total_cli_num; c++){
free_blob(tasks[c]);
@@ -1021,7 +1120,7 @@ static int total_send = 0;
total_send += task->size;
send_data(task, conn);
int layer_id = task->id;
if (!is_lip_layer(ctxt->model, layer_id))
if (!is_lip_layer(ctxt->model, layer_id) && !is_seq_layer(ctxt->model, layer_id))
{
for (int c = 1; c < ctxt->total_cli_num; c++)
{
32 changes: 31 additions & 1 deletion src/weight_partitioner.c
@@ -103,11 +103,21 @@ bool is_weight_part_fused_layer(cnn_model *model, int layer_id)
return model->weight_part_para.type[layer_id] == LAYER_PART_TYPE_FUSE1;
}

bool is_weight_part_fused_layer2(cnn_model *model, int layer_id)
{
return model->weight_part_para.type[layer_id] == LAYER_PART_TYPE_FUSE2;
}

bool is_lip_layer(cnn_model *model, int layer_id)
{
return model->weight_part_para.type[layer_id] == LAYER_PART_TYPE_LIP;
}

bool is_seq_layer(cnn_model *model, int layer_id)
{
return model->weight_part_para.type[layer_id] == LAYER_PART_TYPE_SEQ;
}

bool can_reuse_lop_output(cnn_model *model, int layer_id)
{
return model->weight_part_para.type[layer_id - 1] == LAYER_PART_TYPE_LOP &&
@@ -128,6 +138,7 @@ void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions)

// TODO: for now this simply gives each client a weight partition.
int partition_id = cli_id % num_partitions;
int numSeqStarts = 0;

for (int i = 0; i < net->n; i++)
{
@@ -144,7 +155,11 @@ void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions)
}
if (l->type != CONVOLUTIONAL)
{
printf("Invalid layer type: Partitioned must be convolutional\n");
if (para->type[i] == LAYER_PART_TYPE_FUSE2) {
// OK: FUSE2 layers inside a sequential group may be non-convolutional.
continue;
}
printf("Invalid layer type at %i: Partitioned must be convolutional\n", i);
exit(1);
}

@@ -162,7 +177,20 @@ void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions)
case LAYER_PART_TYPE_LOP:
prune_filters(l, partition_id, num_partitions);
break;
case LAYER_PART_TYPE_SEQ:
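// Only the client whose id matches this sequential group keeps the weights.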
if (numSeqStarts != cli_id) {
free(l->weights);
}
numSeqStarts++;
break;
case LAYER_PART_TYPE_FUSE2:
if (i - 1 >= 0 && (para->type[i-1] == LAYER_PART_TYPE_SEQ || para->type[i-1] == LAYER_PART_TYPE_FUSE2))
{
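// A FUSE2 layer preceded by SEQ/FUSE2 continues the same sequential group:
// the same owner keeps the full weights, every other client frees them.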
if (numSeqStarts-1 != cli_id) {
free(l->weights);
}
continue;
}
if (i - 1 < 0 || para->type[i-1] != LAYER_PART_TYPE_FUSE1)
{
printf("Invalid layer type: F2 must be preceded by F1\n");
@@ -257,6 +285,8 @@ const char *get_layer_type_name(enum layer_partition_type type)
return "FUSE1";
case LAYER_PART_TYPE_FUSE2:
return "FUSE2";
case LAYER_PART_TYPE_SEQ:
return "SEQUENTIAL";
default:
return "Unknown";
}
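Note (not part of the diff): with the new SEQ and SEQ-followed FUSE2 cases above, the g-th sequential group keeps its full weights only on the client whose cli_id equals g; every other client frees them after loading. A minimal, self-contained sketch of that bookkeeping, using illustrative type names in place of the real enum:

#include <stdio.h>

/* Sketch only: mirrors the numSeqStarts bookkeeping in load_partitioned_weights.
 * T_SEQ / T_FUSE2 stand in for LAYER_PART_TYPE_SEQ / LAYER_PART_TYPE_FUSE2. */
enum { T_OTHER, T_SEQ, T_FUSE2 };

int main(void)
{
    /* e.g. two sequential groups: a SEQ start followed by FUSE2 layers each. */
    int type[] = { T_SEQ, T_FUSE2, T_FUSE2, T_FUSE2, T_FUSE2,
                   T_OTHER, T_SEQ, T_FUSE2 };
    int n = (int)(sizeof(type) / sizeof(type[0]));
    int cli_id = 1;            /* pretend we are client 1 */
    int num_seq_starts = 0;

    for (int i = 0; i < n; i++) {
        int keep = 1;          /* non-sequential layers are not freed here */
        if (type[i] == T_SEQ) {
            keep = (num_seq_starts == cli_id);
            num_seq_starts++;
        } else if (type[i] == T_FUSE2) {
            keep = (num_seq_starts - 1 == cli_id);
        }
        printf("layer %d: %s weights\n", i, keep ? "keep" : "free");
    }
    return 0;
}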
5 changes: 4 additions & 1 deletion src/weight_partitioner.h
@@ -18,7 +18,8 @@ enum layer_partition_type
LAYER_PART_TYPE_LOP,
LAYER_PART_TYPE_LIP,
LAYER_PART_TYPE_FUSE1,
LAYER_PART_TYPE_FUSE2
LAYER_PART_TYPE_FUSE2,
LAYER_PART_TYPE_SEQ
};

typedef struct
@@ -28,7 +29,9 @@ typedef struct


bool is_weight_part_fused_layer(cnn_model *model, int layer_id);
bool is_weight_part_fused_layer2(cnn_model *model, int layer_id);
bool is_lip_layer(cnn_model *model, int layer_id);
bool is_seq_layer(cnn_model *model, int layer_id);
bool can_reuse_lop_output(cnn_model *model, int layer_id);

void load_partitioned_weights(cnn_model *model, int32_t cli_id, int num_partitions);
