Skip to content

Commit

Permalink
impl weight part work distribution; misc:
Browse files Browse the repository at this point in the history
- make enqueue return numnodes to have thread safe result
- add parallel push service as stand-in for an async service
  • Loading branch information
Rafael Stahl committed Sep 5, 2018
1 parent a906a15 commit c21498e
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 66 deletions.
6 changes: 6 additions & 0 deletions distriot/src/global_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ typedef struct dev_ctxt {
thread_safe_queue* result_queue;
uint32_t this_cli_id;

// Weight partitioning queues.
thread_safe_queue *results_pool_weightpart;
thread_safe_queue *ready_pool_weightpart;
thread_safe_queue **task_queue_weightpart;
thread_safe_queue *result_queue_weightpart;

uint32_t batch_size;/*Number of tasks to merge*/
void *model;/*pointers to execution model*/
uint32_t total_frames;/*max number of input frames*/
Expand Down
163 changes: 142 additions & 21 deletions distriot/src/network_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static inline void write_to_sock(int sock, ctrl_proto proto, uint8_t* buffer, ui
static inline service_conn* new_service_conn(int sockfd, ctrl_proto proto, const char *dest_ip, struct sockaddr_in* addr, int portno);
#elif IPV6_TASK/*IPV4_TASK*/
static inline service_conn* new_service_conn(int sockfd, ctrl_proto proto, const char *dest_ip, struct sockaddr_in6* addr, int portno);
#endif/*IPV4_TASK*/
#endif/*IPV4_TASK*/

int service_init(int portno, ctrl_proto proto){
int sockfd;
Expand Down Expand Up @@ -42,7 +42,7 @@ int service_init(int portno, ctrl_proto proto){
printf("ERROR on binding\n");
exit(EXIT_FAILURE);
}
if (proto == TCP) listen(sockfd, 10);/*back_log numbers*/
if (proto == TCP) listen(sockfd, 10);/*back_log numbers*/
return sockfd;
}

Expand Down Expand Up @@ -155,7 +155,7 @@ blob* recv_data(service_conn* conn){
void send_request(void* req, uint32_t req_size, service_conn* conn){
blob* temp = new_blob_and_copy_data(0, req_size, req);
send_data(temp, conn);
free_blob(temp);
free_blob(temp);
}

static uint8_t* recv_request(service_conn* conn){
Expand All @@ -164,8 +164,8 @@ static uint8_t* recv_request(service_conn* conn){
temp = recv_data(conn);
req = (uint8_t*)malloc(sizeof(uint8_t)*(temp->size));
memcpy(req, temp->data, temp->size);
free_blob(temp);
return req;
free_blob(temp);
return req;
}

static inline uint32_t look_up_handler_table(char* name, const char* handler_name[], uint32_t handler_num){
Expand Down Expand Up @@ -196,8 +196,8 @@ void start_service_for_n_times(int sockfd, ctrl_proto proto, const char* handler
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
}else if(proto == UDP){
newsockfd = sockfd;
}else{
printf("Protocol is not supported\n");
}else{
printf("Protocol is not supported\n");
return;
}
if (newsockfd < 0) {printf("ERROR on accept\n");return;}
Expand All @@ -206,8 +206,8 @@ void start_service_for_n_times(int sockfd, ctrl_proto proto, const char* handler

/*First recv the request and look up the handler table*/
req = recv_request(conn);
handler_id = look_up_handler_table((char*)req, handler_name, handler_num);
handler_id = look_up_handler_table((char*)req, handler_name, handler_num);

free(req);
if(handler_id == handler_num){printf("Operation is not supported!\n"); return;}
/*Recv meta control data and pick up the correct handler*/
Expand All @@ -218,7 +218,7 @@ void start_service_for_n_times(int sockfd, ctrl_proto proto, const char* handler

/*Close connection*/
if(proto == TCP){
close(newsockfd);
close(newsockfd);
}
/*Close connection*/
}
Expand All @@ -243,8 +243,8 @@ void start_service(int sockfd, ctrl_proto proto, const char* handler_name[], uin
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
}else if(proto == UDP){
newsockfd = sockfd;
}else{
printf("Protocol is not supported\n");
}else{
printf("Protocol is not supported\n");
return;
}
if (newsockfd < 0) {printf("ERROR on accept\n");return;}
Expand All @@ -253,8 +253,8 @@ void start_service(int sockfd, ctrl_proto proto, const char* handler_name[], uin

/*First recv the request and look up the handler table*/
req = recv_request(conn);
handler_id = look_up_handler_table((char*)req, handler_name, handler_num);
handler_id = look_up_handler_table((char*)req, handler_name, handler_num);

free(req);
if(handler_id == handler_num){printf("Operation is not supported!\n"); return;}
/*Recv meta control data and pick up the correct handler*/
Expand All @@ -265,7 +265,7 @@ void start_service(int sockfd, ctrl_proto proto, const char* handler_name[], uin

/*Close connection*/
if(proto == TCP){
close(newsockfd);
close(newsockfd);
}
/*Close connection*/
}
Expand Down Expand Up @@ -304,7 +304,7 @@ static inline void write_to_sock(int sock, ctrl_proto proto, uint8_t* buffer, ui
}else if(proto == UDP){
if((bytes_length - bytes_written) < UDP_TRANS_SIZE) { n = bytes_length - bytes_written; }
else { n = UDP_TRANS_SIZE; }
if(sendto(sock, buffer + bytes_written, n, 0, to, tolen)< 0)
if(sendto(sock, buffer + bytes_written, n, 0, to, tolen)< 0)
printf("ERROR writing socket\n");
}else{printf("Protocol is not supported\n"); return;}
bytes_written += n;
Expand All @@ -315,12 +315,18 @@ static inline void write_to_sock(int sock, ctrl_proto proto, uint8_t* buffer, ui
static inline service_conn* new_service_conn(int sockfd, ctrl_proto proto, const char *dest_ip, struct sockaddr_in* addr, int portno){
#elif IPV6_TASK/*IPV4_TASK*/
static inline service_conn* new_service_conn(int sockfd, ctrl_proto proto, const char *dest_ip, struct sockaddr_in6* addr, int portno){
#endif/*IPV4_TASK*/
service_conn* conn = (service_conn*)malloc(sizeof(service_conn));
#endif/*IPV4_TASK*/
service_conn* conn = (service_conn*)malloc(sizeof(service_conn));
conn->sockfd = sockfd;
conn->proto = proto;
if(addr!=NULL){
conn->serv_addr_ptr = addr;
#if IPV4_TASK
conn->serv_addr_ptr = (struct sockaddr_in*)malloc(sizeof(struct sockaddr_in));
memcpy(conn->serv_addr_ptr, addr, sizeof(struct sockaddr_in));
#elif IPV6_TASK/*IPV4_TASK*/
conn->serv_addr_ptr = (struct sockaddr_in6*)malloc(sizeof(struct sockaddr_in6));
memcpy(conn->serv_addr_ptr, addr, sizeof(struct sockaddr_in6));
#endif/*IPV4_TASK*/
}else{
#if IPV4_TASK
conn->serv_addr_ptr = (struct sockaddr_in*)malloc(sizeof(struct sockaddr_in));
Expand All @@ -332,7 +338,122 @@ static inline service_conn* new_service_conn(int sockfd, ctrl_proto proto, const
conn->serv_addr_ptr->sin6_family = AF_INET6;
conn->serv_addr_ptr->sin6_port = htons(portno);
inet_pton(AF_INET6, dest_ip, &(conn->serv_addr_ptr->sin6_addr));
#endif/*IPV4_TASK*/
#endif/*IPV4_TASK*/
}
return conn;
return conn;
}




// FIXME: its bad to start a thread for every new client. this is the most convenient solution for now, but a real one
// would be async handling with partial serialization.

#include "thread_util.h"


typedef struct
{
int sockfd;
service_conn *conn;
const char **handler_name;
uint32_t handler_num;
void *(**handlers)(void *, void *);
int (*cb_push)(void *, void *);
void *user_args;
} push_service_args;

static void push_service_worker(void *arg)
{
push_service_args *targs = (push_service_args *)arg;

char ip_addr[ADDRSTRLEN];
inet_ntop(targs->conn->serv_addr_ptr->sin_family, &(targs->conn->serv_addr_ptr->sin_addr), ip_addr, ADDRSTRLEN);
int32_t cli_id = get_client_id(ip_addr, (void*)targs->user_args);
printf(">>> new conn open with %d: %s\n", cli_id, ip_addr);

while (1)
{
printf(">>> pushing data...\n");
int stop = targs->cb_push(targs->conn, targs->user_args);
if (stop)
{
break;
}

printf(">>> pushed.. waiting for results\n");
uint8_t *req = recv_request(targs->conn);
uint32_t handler_id = look_up_handler_table((char *)req, targs->handler_name, targs->handler_num);
free(req);

if (handler_id > targs->handler_num)
{
printf("Operation is not supported!\n");
return;
}

targs->handlers[handler_id](targs->conn, targs->user_args);
}

close_service_connection(targs->conn);
free(arg);
}


void start_parallel_push_service(int sockfd, ctrl_proto proto, int (*cb_push)(void *, void *),
const char *handler_name[], uint32_t handler_num, void *(*handlers[])(void *, void *),
void *arg)
{
#if IPV4_TASK
struct sockaddr_in cli_addr;
#elif IPV6_TASK /*IPV4_TASK*/
struct sockaddr_in6 cli_addr;
#endif /*IPV4_TASK*/
socklen_t clilen = sizeof(cli_addr);
service_conn *conn;

while (1)
{
/*Accept incoming connection*/
int newsockfd;
if (proto == TCP)
{
newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
}
else if (proto == UDP)
{
newsockfd = sockfd;
}
else
{
printf("Protocol is not supported\n");
return;
}
if (newsockfd < 0)
{
printf("ERROR on accept\n");
return;
}
conn = new_service_conn(newsockfd, proto, NULL, &cli_addr, 0);

char ip_addr[ADDRSTRLEN];
inet_ntop(conn->serv_addr_ptr->sin_family, &(conn->serv_addr_ptr->sin_addr), ip_addr, ADDRSTRLEN);
int32_t cli_id = get_client_id(ip_addr, (void*)arg);
printf(">>> starting new thread for cli %d: %s\n", cli_id, ip_addr);

// Handle client communication in a new thread.
push_service_args *args = malloc(sizeof(*args));
args->sockfd = newsockfd;
args->conn = conn;
args->handler_name = handler_name;
args->handler_num = handler_num;
args->handlers = handlers;
args->cb_push = cb_push;
args->user_args = arg;
sys_thread_t t = sys_thread_new("push_service_worker", push_service_worker, args, 0, 0);
}
/*This should not be called, users must explicitly call void close_service_connection(conn)*/
close(sockfd);

// join all: sys_thread_join
}
5 changes: 5 additions & 0 deletions distriot/src/network_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define START_CTRL 11113 //Control the start and stop of a service
#define RESULT_COLLECT_PORT 11114 //Service for collecting results
#define WORK_STEAL_PORT 11115 //Service for providing steal data at edge nodes
#define WEIGHT_PART_PORT 11116 //Service for weight partitioning communication

#define IPV4_TASK 1
#define IPV6_TASK !(IPV4_TASK)
Expand Down Expand Up @@ -54,6 +55,10 @@ void start_service_for_n_times(int sockfd, ctrl_proto proto, const char* handler
void start_service(int sockfd, ctrl_proto proto, const char* handler_name[], uint32_t handler_num, void* (*handlers[])(void*, void*), void* arg);
void close_service(int sockfd);

void start_parallel_push_service(int sockfd, ctrl_proto proto, int (*cb_push)(void *, void *),
const char *handler_name[], uint32_t handler_num, void *(*handlers[])(void *, void *),
void *arg);

/*Data exchanging API on both sides*/
blob* recv_data(service_conn* conn);
void send_data(blob *temp, service_conn* conn);
Expand Down
23 changes: 12 additions & 11 deletions distriot/src/thread_safe_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static queue_node* new_node_and_copy_item(blob* item)
temp->item = new_blob_and_copy_data(item->id, item->size, item->data);
copy_blob_meta(temp->item, item);
temp->next = NULL;
return temp;
return temp;
}

thread_safe_queue *new_queue(uint32_t capacity)
Expand All @@ -22,8 +22,9 @@ thread_safe_queue *new_queue(uint32_t capacity)
return q;
}

void enqueue(thread_safe_queue *queue, blob* item)
uint32_t enqueue(thread_safe_queue *queue, blob* item)
{
uint32_t num_nodes;
uint8_t first;
queue_node *temp = new_node_and_copy_item(item);
sys_arch_sem_wait(&(queue->mutex), 0);
Expand All @@ -44,13 +45,15 @@ void enqueue(thread_safe_queue *queue, blob* item)
first = 0;
}
queue->number_of_node++;
num_nodes = queue->number_of_node;

if (first) {
sys_sem_signal(&queue->not_empty);
}

sys_sem_signal(&queue->mutex);

return num_nodes;
}

blob* dequeue(thread_safe_queue *queue)
Expand All @@ -61,13 +64,13 @@ blob* dequeue(thread_safe_queue *queue)
/* We block while waiting for a mail to arrive in the mailbox. */
sys_arch_sem_wait(&queue->not_empty, 0);
sys_arch_sem_wait(&queue->mutex, 0);
}
}

queue_node *temp = queue->head;
queue->head = queue->head->next;
if (queue->head == NULL)
queue->tail = NULL;
blob* item = temp->item;
blob* item = temp->item;
free(temp);
queue->number_of_node--;

Expand All @@ -94,7 +97,7 @@ void print_queue_by_id(thread_safe_queue *queue){
}
printf("%d, ", cur->item->id);
cur = cur->next;
}
}

sys_sem_signal(&queue->mutex);
return;
Expand All @@ -119,7 +122,7 @@ void remove_by_id(thread_safe_queue *queue, int32_t id){
sys_sem_signal(&queue->mutex);
return;
}

while (cur != NULL) {
if(cur->item->id == id){
prev->next = cur->next;
Expand All @@ -134,7 +137,7 @@ void remove_by_id(thread_safe_queue *queue, int32_t id){
}
prev = cur;
cur = cur->next;
}
}

sys_sem_signal(&queue->mutex);
return;
Expand All @@ -146,13 +149,13 @@ blob* try_dequeue(thread_safe_queue *queue)
while (queue->head == NULL) {
sys_sem_signal(&queue->mutex);
return NULL;
}
}

queue_node *temp = queue->head;
queue->head = queue->head->next;
if (queue->head == NULL)
queue->tail = NULL;
blob* item = temp->item;
blob* item = temp->item;
free(temp);
queue->number_of_node--;

Expand All @@ -176,5 +179,3 @@ void free_queue(thread_safe_queue *queue)
free(queue);
}
}


2 changes: 1 addition & 1 deletion distriot/src/thread_safe_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ typedef struct ts_queue {
} thread_safe_queue;

thread_safe_queue *new_queue(uint32_t capacity);
void enqueue(thread_safe_queue *q, blob* item);
uint32_t enqueue(thread_safe_queue *q, blob* item);
blob* dequeue(thread_safe_queue *q);
void remove_by_id(thread_safe_queue *q, int32_t id);
void print_queue_by_id(thread_safe_queue *queue);
Expand Down
Loading

0 comments on commit c21498e

Please sign in to comment.