diff --git a/Src/itmfifos.c b/Src/itmfifos.c index ea8ccc2f..9dada702 100644 --- a/Src/itmfifos.c +++ b/Src/itmfifos.c @@ -25,6 +25,10 @@ #include "itmfifos.h" #include "msgDecoder.h" +#ifndef O_BINARY + #define O_BINARY 0 +#endif + #define MAX_STRING_LENGTH (100) /* Maximum length that will be output from a fifo for a single event */ struct runThreadParams /* Structure for parameters passed to a software task thread */ diff --git a/Src/nwclient.c b/Src/nwclient.c index f5dbc265..ddf70554 100644 --- a/Src/nwclient.c +++ b/Src/nwclient.c @@ -46,8 +46,8 @@ #define SHARED_BUFFER_SIZE (8*TRANSFER_SIZE) /* Tests for a hung client */ -#define MAX_CLIENT_TESTS (10) -#define CLIENT_TEST_INTERVAL_US (100000) +#define MAX_CLIENT_TESTS (1000) +#define CLIENT_TEST_INTERVAL_US (1000) /* Master structure for the set of nwclients */ struct nwclientsHandle @@ -59,7 +59,6 @@ struct nwclientsHandle int wp; /* Next write to shared buffer */ uint8_t sharedBuffer[SHARED_BUFFER_SIZE]; /* Data waiting to be sent to the clients */ - int totalSent; /* Total sent to all clients */ int sockfd; /* The socket for the inferior */ pthread_t ipThread; /* The listening thread for n/w clients */ bool finish; /* Its time to leave */ @@ -79,7 +78,6 @@ struct nwClient pthread_mutex_t dataAvailable_m; /* Mutex for counting data for clients */ /* Parameters used to run the client */ - int totalSent; /* Total sent by this client */ int portNo; /* Port of connection */ int rp; /* Current read pointer in data stream */ }; @@ -159,9 +157,6 @@ static void *_client( void *args ) uint8_t *p; ssize_t sent = 0; - /* Our starting total sent is the system total sent */ - c->totalSent = c->parent->totalSent; - while ( !c->finish ) { /* Spin until we're told there's something to send along */ @@ -172,7 +167,6 @@ static void *_client( void *args ) /* Data to send is either to the end of the ring buffer or to the wp, whichever is first */ readDataLen = ( c->rp < c->parent->wp ) ? c->parent->wp - c->rp : SHARED_BUFFER_SIZE - c->rp; p = &( c->parent->sharedBuffer[c->rp] ); - c->rp = ( c->rp + readDataLen ) % SHARED_BUFFER_SIZE; while ( ( readDataLen > 0 ) && ( sent >= 0 ) ) { @@ -191,9 +185,9 @@ static void *_client( void *args ) } } + c->rp = ( c->rp + sent ) % SHARED_BUFFER_SIZE; p += sent; readDataLen -= sent; - c->totalSent += sent; } if ( c->finish || readDataLen ) @@ -254,11 +248,11 @@ static void *_listenTask( void *arg ) client->rp = h->wp; #ifdef WIN32 - /* Set port nonblocking since it can't be done per-call in Windows */ - u_long iMode=1; - ioctlsocket(newsockfd,FIONBIO,&iMode); + /* Set port nonblocking since it can't be done per-call in Windows */ + u_long iMode = 1; + ioctlsocket( newsockfd, FIONBIO, &iMode ); #endif - + if ( pthread_mutex_init( &client->dataAvailable_m, NULL ) != 0 ) { genericsExit( -1, "Failed to establish mutex for condition variable" EOL ); @@ -319,7 +313,7 @@ bool _clientsGood( struct nwclientsHandle *h, bool dumpClient ) while ( n ) { - if ( n->totalSent != h->totalSent ) + if ( n->rp != h->wp ) { if ( !dumpClient ) { @@ -328,10 +322,13 @@ bool _clientsGood( struct nwclientsHandle *h, bool dumpClient ) } else { - genericsReport( V_ERROR, "Unresponsive client dropped %d vs %d (%d)" EOL, n->totalSent, n->parent->totalSent, n->parent->totalSent - n->totalSent ); - /* Get rid of the unresponsive client */ - n->finish = true; - close( n->portNo ); + if ( !n->finish ) + { + genericsReport( V_ERROR, "Unresponsive client dropped" EOL ); + /* Get rid of the unresponsive client */ + n->finish = true; + close( n->portNo ); + } /* This is safe 'cos the list is locked */ n = n->nextClient; @@ -359,44 +356,13 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, uint8_t *ipbuffer ) const struct timespec ts = {.tv_sec = 1, .tv_nsec = 0}; int numTests; - assert( len < SHARED_BUFFER_SIZE ); int newWp = ( h->wp + len ); int toEnd = ( newWp > SHARED_BUFFER_SIZE ) ? SHARED_BUFFER_SIZE - h->wp : len; - int fromStart = len - toEnd; - - if ( fromStart ) - { - /* The buffer wrapped around. This is a good time to check that the clients are keeping up */ - for ( numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ ) - { - if ( _clientsGood( h, false ) ) - { - break; - } - /* Wait before trying again */ - usleep( CLIENT_TEST_INTERVAL_US ); - } - - /* If that didn't work then kill the one that isn't behaving */ - if ( numTests == MAX_CLIENT_TESTS ) - { - _clientsGood( h, true ); - } - } - - h->totalSent += len; - - /* Copy the received data into the shared buffer */ + /* Copy the first (or next, if we're recursing) part of the received data into the shared buffer */ memcpy( &h->sharedBuffer[h->wp], ipbuffer, toEnd ); - - if ( fromStart ) - { - memcpy( h->sharedBuffer, &ipbuffer[toEnd], fromStart ); - } - - h->wp = newWp % SHARED_BUFFER_SIZE; + h->wp = ( h->wp + toEnd ) % SHARED_BUFFER_SIZE; if ( !h->finish ) { @@ -416,7 +382,35 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, uint8_t *ipbuffer ) pthread_mutex_unlock( &h->clientList ); } + + /* If the buffer wrapped around this would be a good time to check that the clients are keeping up */ + if ( !h->wp ) + { + for ( numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ ) + { + if ( _clientsGood( h, false ) ) + { + break; + } + + /* Wait before trying again */ + usleep( CLIENT_TEST_INTERVAL_US ); + } + + /* If that didn't work then kill the one that isn't behaving... _clientsGood will do that for us */ + if ( MAX_CLIENT_TESTS == numTests ) + { + _clientsGood( h, true ); + } + + /* If the buffer has wrapped arond then it's possible there are still some data to send */ + if ( toEnd != len ) + { + nwclientSend( h, len - toEnd, &ipbuffer[toEnd] ); + } + } } + // ==================================================================================================== struct nwclientsHandle *nwclientStart( int port ) diff --git a/Src/orbuculum.c b/Src/orbuculum.c index fbac6044..5fd66e82 100644 --- a/Src/orbuculum.c +++ b/Src/orbuculum.c @@ -57,6 +57,10 @@ #include "orbtraceIf.h" #include "stream.h" +#ifndef O_BINARY + #define O_BINARY 0 +#endif + /* How many transfer buffers from the source to allocate */ #define NUM_RAW_BLOCKS (32) @@ -840,7 +844,7 @@ static void *_processBlocksQueue( void *params ) { pthread_cond_wait( &r->dataForClients, &r->dataForClients_m ); - while( r->rp != r->wp ) + while ( r->rp != r->wp ) { _processBlock( r, r->rawBlock[r->rp].fillLevel, r->rawBlock[r->rp].buffer ); r->rp = ( r->rp + 1 ) % NUM_RAW_BLOCKS; @@ -1261,7 +1265,7 @@ static int _serialFeeder( struct RunTime *r ) static int _fileFeeder( struct RunTime *r ) { - if ( ( r->f = open( r->options->file, O_RDONLY | O_BINARY) ) < 0 ) + if ( ( r->f = open( r->options->file, O_RDONLY | O_BINARY ) ) < 0 ) { genericsExit( -4, "Can't open file %s" EOL, r->options->file ); } @@ -1287,7 +1291,7 @@ static int _fileFeeder( struct RunTime *r ) } r->wp = nwp; - + if ( !rxBlock->fillLevel ) { if ( r->options->fileTerminate )