Skip to content

Commit

Permalink
Improve handling of clients in relation to data arrival timing
Browse files Browse the repository at this point in the history
  • Loading branch information
mubes committed Aug 22, 2023
1 parent 02dcdd9 commit 81376db
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
6 changes: 3 additions & 3 deletions Src/itmfifos.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static void *_runFifo( void *arg )
}
while ( ( written > 0 ) && ( !c->ending ) );

/* Falling out on writen fail means we can re-open the fifo if it overflowed */
/* Falling out on writen fail means we can re-open the fifo if it overflowed */
close( opfile );
}
while ( !c->ending );
Expand Down Expand Up @@ -231,7 +231,7 @@ static void *_runHWFifo( void *arg )
}
while ( ( writeDataLen > 0 ) && ( !c->ending ) );

/* Falling out on writeDataLen fail means we can re-open the fifo if it overflowed */
/* Falling out on writeDataLen fail means we can re-open the fifo if it overflowed */
close( opfile );
}
while ( !c->ending );
Expand Down Expand Up @@ -732,7 +732,7 @@ bool itmfifoCreate( struct itmfifosHandle *f )
f->c[t].params.permafile = f->permafile;
f->c[t].params.c = &f->c[t];

f->c[t].fifoName = ( char * )calloc( strlen( f->c[t].chanName ) + 2 + (f->chanPath ? strlen( f->chanPath ) : 0), 1 );
f->c[t].fifoName = ( char * )calloc( strlen( f->c[t].chanName ) + 2 + ( f->chanPath ? strlen( f->chanPath ) : 0 ), 1 );

if ( f->chanPath )
{
Expand Down
100 changes: 98 additions & 2 deletions Src/nwclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
/* Shared ring buffer for data */
#define SHARED_BUFFER_SIZE (8*TRANSFER_SIZE)

/* Tests for a hung client */
#define MAX_CLIENT_TESTS (10)
#define CLIENT_TEST_INTERVAL_US (100000)

/* Master structure for the set of nwclients */
struct nwclientsHandle

Expand All @@ -54,6 +58,7 @@ struct nwclientsHandle
int wp; /* Next write to shared buffer */
uint8_t sharedBuffer[SHARED_BUFFER_SIZE]; /* Data waiting to be sent to the clients */

int totalSent; /* Total sent to all clients */
int sockfd; /* The socket for the inferior */
pthread_t ipThread; /* The listening thread for n/w clients */
bool finish; /* Its time to leave */
Expand All @@ -73,6 +78,7 @@ struct nwClient
pthread_mutex_t dataAvailable_m; /* Mutex for counting data for clients */

/* Parameters used to run the client */
int totalSent; /* Total sent by this client */
int portNo; /* Port of connection */
int rp; /* Current read pointer in data stream */
};
Expand Down Expand Up @@ -152,6 +158,9 @@ static void *_client( void *args )
uint8_t *p;
ssize_t sent = 0;

/* Our starting total sent is the system total sent */
c->totalSent = c->parent->totalSent;

while ( !c->finish )
{
/* Spin until we're told there's something to send along */
Expand All @@ -166,9 +175,24 @@ static void *_client( void *args )

while ( ( readDataLen > 0 ) && ( sent >= 0 ) )
{
sent = send( c->portNo, ( const void * )p, readDataLen, MSG_NOSIGNAL );
for ( int numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ )
{
sent = send( c->portNo, ( const void * )p, readDataLen, MSG_DONTWAIT | MSG_NOSIGNAL );

if ( sent > 0 )
{
break;
}
else
{
/* We'll allow a few chances before we give up... */
usleep( CLIENT_TEST_INTERVAL_US );
}
}

p += sent;
readDataLen -= sent;
c->totalSent += sent;
}

if ( c->finish || readDataLen )
Expand Down Expand Up @@ -230,7 +254,7 @@ static void *_listenTask( void *arg )

if ( pthread_mutex_init( &client->dataAvailable_m, NULL ) != 0 )
{
genericsExit( -1, "Failed to establish mutex for condition variablee" EOL );
genericsExit( -1, "Failed to establish mutex for condition variable" EOL );
}

if ( pthread_cond_init( &client->dataAvailable, NULL ) != 0 )
Expand Down Expand Up @@ -271,6 +295,51 @@ static void *_listenTask( void *arg )
return NULL;
}
// ====================================================================================================
bool _clientsGood( struct nwclientsHandle *h, bool dumpClient )

{
const struct timespec ts = {.tv_sec = 1, .tv_nsec = 0};
bool result = true;

/* Check if network clients have transferred their data */
if ( _lock_with_timeout( &h->clientList, &ts ) < 0 )
{
genericsExit( -1, "Failed to acquire mutex" EOL );
}

/* Iterate over all clients to see if they managed to send all their data... */
volatile struct nwClient *n = h->firstClient;

while ( n )
{
if ( n->totalSent != h->totalSent )
{
if ( !dumpClient )
{
result = false;
break;
}
else
{
genericsReport( V_ERROR, "Unresponsive client dropped %d vs %d (%d)" EOL, n->totalSent, n->parent->totalSent, n->parent->totalSent - n->totalSent );
/* Get rid of the unresponsive client */
n->finish = true;
close( n->portNo );

/* This is safe 'cos the list is locked */
n = n->nextClient;
}
}
else
{
n = n->nextClient;
}
}

pthread_mutex_unlock( &h->clientList );
return result;
}
// ====================================================================================================
// ====================================================================================================
// ====================================================================================================
// Externally available routines
Expand All @@ -281,10 +350,37 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, uint8_t *ipbuffer )

{
const struct timespec ts = {.tv_sec = 1, .tv_nsec = 0};
int numTests;

assert( len < SHARED_BUFFER_SIZE );

int newWp = ( h->wp + len );
int toEnd = ( newWp > SHARED_BUFFER_SIZE ) ? SHARED_BUFFER_SIZE - h->wp : len;
int fromStart = len - toEnd;

if ( fromStart )
{
/* The buffer wrapped around. This is a good time to check that the clients are keeping up */
for ( numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ )
{
if ( _clientsGood( h, false ) )
{
break;
}

/* Wait before trying again */
usleep( CLIENT_TEST_INTERVAL_US );
}

/* If that didn't work then kill the one that isn't behaving */
if ( numTests == MAX_CLIENT_TESTS )
{
_clientsGood( h, true );
}
}

h->totalSent += len;

/* Copy the received data into the shared buffer */
memcpy( &h->sharedBuffer[h->wp], ipbuffer, toEnd );

Expand Down
9 changes: 8 additions & 1 deletion Src/orbuculum.c
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,10 @@ static void _TPIUpacketRxed( enum TPIUPumpEvent e, struct TPIUPacket *p, void *p
/* We must have found a match for this at some point, so add it to the queue */
h->strippedBlock->buffer[h->strippedBlock->fillLevel++] = p->packet[g].d;
}
else
{
genericsReport( V_WARN, "No handler for %d" EOL, p->packet[g].s );
}
}

break;
Expand Down Expand Up @@ -1264,6 +1268,9 @@ static int _fileFeeder( struct RunTime *r )

r->conn = true;

/* We will read from the file very quickly, so let's give a chance for clients to connect before we do */
usleep( INTERVAL_1S );

while ( !r->ending )
{
struct dataBlock *rxBlock = &r->rawBlock[r->wp];
Expand All @@ -1290,7 +1297,7 @@ static int _fileFeeder( struct RunTime *r )
/* Spin waiting for buffer space to become available */
while ( nwp == r->rp )
{
usleep( INTERVAL_100MS );
usleep( INTERVAL_1MS );
}

r->wp = nwp;
Expand Down

0 comments on commit 81376db

Please sign in to comment.