Skip to content

Commit

Permalink
Improve data transfer reliability with remote clients
Browse files Browse the repository at this point in the history
  • Loading branch information
mubes committed Aug 23, 2023
1 parent e7585dd commit d80b1c4
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 54 deletions.
4 changes: 4 additions & 0 deletions Src/itmfifos.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include "itmfifos.h"
#include "msgDecoder.h"

#ifndef O_BINARY
#define O_BINARY 0
#endif

#define MAX_STRING_LENGTH (100) /* Maximum length that will be output from a fifo for a single event */

struct runThreadParams /* Structure for parameters passed to a software task thread */
Expand Down
96 changes: 45 additions & 51 deletions Src/nwclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
#define SHARED_BUFFER_SIZE (8*TRANSFER_SIZE)

/* Tests for a hung client */
#define MAX_CLIENT_TESTS (10)
#define CLIENT_TEST_INTERVAL_US (100000)
#define MAX_CLIENT_TESTS (1000)
#define CLIENT_TEST_INTERVAL_US (1000)

/* Master structure for the set of nwclients */
struct nwclientsHandle
Expand All @@ -59,7 +59,6 @@ struct nwclientsHandle
int wp; /* Next write to shared buffer */
uint8_t sharedBuffer[SHARED_BUFFER_SIZE]; /* Data waiting to be sent to the clients */

int totalSent; /* Total sent to all clients */
int sockfd; /* The socket for the inferior */
pthread_t ipThread; /* The listening thread for n/w clients */
bool finish; /* Its time to leave */
Expand All @@ -79,7 +78,6 @@ struct nwClient
pthread_mutex_t dataAvailable_m; /* Mutex for counting data for clients */

/* Parameters used to run the client */
int totalSent; /* Total sent by this client */
int portNo; /* Port of connection */
int rp; /* Current read pointer in data stream */
};
Expand Down Expand Up @@ -159,9 +157,6 @@ static void *_client( void *args )
uint8_t *p;
ssize_t sent = 0;

/* Our starting total sent is the system total sent */
c->totalSent = c->parent->totalSent;

while ( !c->finish )
{
/* Spin until we're told there's something to send along */
Expand All @@ -172,7 +167,6 @@ static void *_client( void *args )
/* Data to send is either to the end of the ring buffer or to the wp, whichever is first */
readDataLen = ( c->rp < c->parent->wp ) ? c->parent->wp - c->rp : SHARED_BUFFER_SIZE - c->rp;
p = &( c->parent->sharedBuffer[c->rp] );
c->rp = ( c->rp + readDataLen ) % SHARED_BUFFER_SIZE;

while ( ( readDataLen > 0 ) && ( sent >= 0 ) )
{
Expand All @@ -191,9 +185,9 @@ static void *_client( void *args )
}
}

c->rp = ( c->rp + sent ) % SHARED_BUFFER_SIZE;
p += sent;
readDataLen -= sent;
c->totalSent += sent;
}

if ( c->finish || readDataLen )
Expand Down Expand Up @@ -254,11 +248,11 @@ static void *_listenTask( void *arg )
client->rp = h->wp;

#ifdef WIN32
/* Set port nonblocking since it can't be done per-call in Windows */
u_long iMode=1;
ioctlsocket(newsockfd,FIONBIO,&iMode);
/* Set port nonblocking since it can't be done per-call in Windows */
u_long iMode = 1;
ioctlsocket( newsockfd, FIONBIO, &iMode );
#endif

if ( pthread_mutex_init( &client->dataAvailable_m, NULL ) != 0 )
{
genericsExit( -1, "Failed to establish mutex for condition variable" EOL );
Expand Down Expand Up @@ -319,7 +313,7 @@ bool _clientsGood( struct nwclientsHandle *h, bool dumpClient )

while ( n )
{
if ( n->totalSent != h->totalSent )
if ( n->rp != h->wp )
{
if ( !dumpClient )
{
Expand All @@ -328,10 +322,13 @@ bool _clientsGood( struct nwclientsHandle *h, bool dumpClient )
}
else
{
genericsReport( V_ERROR, "Unresponsive client dropped %d vs %d (%d)" EOL, n->totalSent, n->parent->totalSent, n->parent->totalSent - n->totalSent );
/* Get rid of the unresponsive client */
n->finish = true;
close( n->portNo );
if ( !n->finish )
{
genericsReport( V_ERROR, "Unresponsive client dropped" EOL );
/* Get rid of the unresponsive client */
n->finish = true;
close( n->portNo );
}

/* This is safe 'cos the list is locked */
n = n->nextClient;
Expand Down Expand Up @@ -359,44 +356,13 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, uint8_t *ipbuffer )
const struct timespec ts = {.tv_sec = 1, .tv_nsec = 0};
int numTests;

assert( len < SHARED_BUFFER_SIZE );

int newWp = ( h->wp + len );
int toEnd = ( newWp > SHARED_BUFFER_SIZE ) ? SHARED_BUFFER_SIZE - h->wp : len;
int fromStart = len - toEnd;

if ( fromStart )
{
/* The buffer wrapped around. This is a good time to check that the clients are keeping up */
for ( numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ )
{
if ( _clientsGood( h, false ) )
{
break;
}

/* Wait before trying again */
usleep( CLIENT_TEST_INTERVAL_US );
}

/* If that didn't work then kill the one that isn't behaving */
if ( numTests == MAX_CLIENT_TESTS )
{
_clientsGood( h, true );
}
}

h->totalSent += len;

/* Copy the received data into the shared buffer */
/* Copy the first (or next, if we're recursing) part of the received data into the shared buffer */
memcpy( &h->sharedBuffer[h->wp], ipbuffer, toEnd );

if ( fromStart )
{
memcpy( h->sharedBuffer, &ipbuffer[toEnd], fromStart );
}

h->wp = newWp % SHARED_BUFFER_SIZE;
h->wp = ( h->wp + toEnd ) % SHARED_BUFFER_SIZE;

if ( !h->finish )
{
Expand All @@ -416,7 +382,35 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, uint8_t *ipbuffer )

pthread_mutex_unlock( &h->clientList );
}

/* If the buffer wrapped around this would be a good time to check that the clients are keeping up */
if ( !h->wp )
{
for ( numTests = 0; numTests < MAX_CLIENT_TESTS; numTests++ )
{
if ( _clientsGood( h, false ) )
{
break;
}

/* Wait before trying again */
usleep( CLIENT_TEST_INTERVAL_US );
}

/* If that didn't work then kill the one that isn't behaving... _clientsGood will do that for us */
if ( MAX_CLIENT_TESTS == numTests )
{
_clientsGood( h, true );
}

/* If the buffer has wrapped arond then it's possible there are still some data to send */
if ( toEnd != len )
{
nwclientSend( h, len - toEnd, &ipbuffer[toEnd] );
}
}
}

// ====================================================================================================
struct nwclientsHandle *nwclientStart( int port )

Expand Down
10 changes: 7 additions & 3 deletions Src/orbuculum.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
#include "orbtraceIf.h"
#include "stream.h"

#ifndef O_BINARY
#define O_BINARY 0
#endif

/* How many transfer buffers from the source to allocate */
#define NUM_RAW_BLOCKS (32)

Expand Down Expand Up @@ -840,7 +844,7 @@ static void *_processBlocksQueue( void *params )
{
pthread_cond_wait( &r->dataForClients, &r->dataForClients_m );

while( r->rp != r->wp )
while ( r->rp != r->wp )
{
_processBlock( r, r->rawBlock[r->rp].fillLevel, r->rawBlock[r->rp].buffer );
r->rp = ( r->rp + 1 ) % NUM_RAW_BLOCKS;
Expand Down Expand Up @@ -1261,7 +1265,7 @@ static int _serialFeeder( struct RunTime *r )
static int _fileFeeder( struct RunTime *r )

{
if ( ( r->f = open( r->options->file, O_RDONLY | O_BINARY) ) < 0 )
if ( ( r->f = open( r->options->file, O_RDONLY | O_BINARY ) ) < 0 )
{
genericsExit( -4, "Can't open file %s" EOL, r->options->file );
}
Expand All @@ -1287,7 +1291,7 @@ static int _fileFeeder( struct RunTime *r )
}

r->wp = nwp;

if ( !rxBlock->fillLevel )
{
if ( r->options->fileTerminate )
Expand Down

0 comments on commit d80b1c4

Please sign in to comment.