Skip to content

Commit

Permalink
fix: 'socket hang up' errors (again) (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
rentallect authored Jan 8, 2024
1 parent 7be009f commit b5bb462
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/channel/wasm-tls-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
async handshake() {

this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` );
let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL );
this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] conn[${this.wasmFD}] back from ssl_do_handshake() result[${result}] (now awaiting cb)`);
}

Expand Down
24 changes: 19 additions & 5 deletions src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ class ZitiContext extends EventEmitter {

this._connectMutexWithTimeout = withTimeout(new Mutex(), 30 * 1000);

this._tlsHandshakeLock = withTimeout(new Mutex(), 5 * 1000, new Error('timeout on _tlsHandshakeLock'));
this._tlsHandshakeLock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _tlsHandshakeLock'));

this._fetchSemaphore = new Semaphore( 8 );
// this._fetchSemaphore = new Semaphore( 1 );
this._fetchSemaphoreHTTP = new Semaphore( 8 );

//TEMP: we constrain HTTP requests that travel over nestedTLS to one-at-a-time for the moment.
// This will be removed as soon as I fix the TLS protocol collision issue that manifests
// when multiple HTTP requests are initiated simultaneously :(
this._fetchSemaphoreHTTPS = new Semaphore( 1 );

this._pkey = null;
this._privateKeyPEM = null;
Expand Down Expand Up @@ -580,7 +584,11 @@ class ZitiContext extends EventEmitter {
/**
*
*/
async ssl_do_handshake(wasmInstance, ssl) {
async ssl_do_handshake(useLock, fd, wasmInstance, ssl) {

if (useLock) {
await this.acquireTLSHandshakeLock(fd);
}

this.logger.trace('ZitiContext.ssl_do_handshake() entered');

Expand Down Expand Up @@ -2038,7 +2046,13 @@ class ZitiContext extends EventEmitter {

let self = this;

const [value, release] = await self._fetchSemaphore.acquire();
let value, release;

if (isEqual(opts.serviceScheme, 'https')) {
[value, release] = await self._fetchSemaphoreHTTPS.acquire();
} else {
[value, release] = await self._fetchSemaphoreHTTP.acquire();
}

let ret;

Expand Down
10 changes: 9 additions & 1 deletion src/http/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ function statusIsInformational(status) {
function parserOnIncomingClient(res, shouldKeepAlive) {
const socket = this.socket;
const req = socket._httpMessage;
// console.log(`parserOnIncomingClient() entered req.path[${req.path}] res: `, res);

// debug('AGENT incoming response!');

Expand Down Expand Up @@ -599,6 +598,15 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
if (req.aborted || !req.emit('response', res))
res._dump();

if (socket.innerTLSSocket) {
socket.innerTLSSocket._zitiContext.logger.trace(`parserOnIncomingClient() fd[${socket.innerTLSSocket.wasmFD}] req.path[${req.path}] _closeEventPending[${socket.innerTLSSocket._closeEventPending}]`);
if (socket.innerTLSSocket._closeEventPending) {
setTimeout((socket) => {
socket.innerTLSSocket.emit('close', undefined);
}, 10, socket)
}
}

if (method === 'HEAD')
return 1; // Skip body but don't treat as Upgrade.

Expand Down
36 changes: 20 additions & 16 deletions src/http/ziti-inner-tls-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class ZitiInnerTLSSocket extends EventEmitter {

this._tlsReadActive = false;

this._closeEventPending = false;

this.pendingWriteArray = new Uint8Array(0)

}
Expand Down Expand Up @@ -200,6 +202,8 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async create() {

await this._zitiContext.acquireTLSHandshakeLock(this.getWASMFD());

this._wasmInstance = await this._zitiContext.getWASMInstance();

this._sslContext = await this._zitiContext.ssl_CTX_new( this._wasmInstance );
Expand All @@ -220,7 +224,7 @@ class ZitiInnerTLSSocket extends EventEmitter {

this.handshake();

let success = await this.awaitTLSHandshakeComplete( 5000 ).catch((error) => {
let success = await this.awaitTLSHandshakeComplete( 30000 ).catch((error) => {

// Let any listeners know the attempt to complete a nestedTLS handshake has timed out,
// which is possibly a condition where the Service is misconfigured, and/or is not really
Expand All @@ -237,7 +241,10 @@ class ZitiInnerTLSSocket extends EventEmitter {
});

if (success) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause start`);
await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause end`);
this._zitiContext.releaseTLSHandshakeLock(this.getWASMFD());
}
}

Expand Down Expand Up @@ -277,7 +284,7 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async handshake() {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` );
let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL );
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] back from ssl_do_handshake(): result[${result}] (now awaiting cb)`);
}

Expand All @@ -286,7 +293,7 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async isConnected() {

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`);

await this._isConnectedMutex.runExclusive( async () => {

Expand All @@ -295,14 +302,14 @@ class ZitiInnerTLSSocket extends EventEmitter {
// Ask the SSL if its handshake has completed yet
let _connected = this._zitiContext.ssl_is_init_finished(this._wasmInstance, this._SSL);

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`);

// If SSL indicates handshake has completed, let's delay a smidge, and allow the WASM mTLS ciphersuite-exchange to complete,
// before we turn loose any writes to the connection
if (_connected) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`);
await this._zitiContext.delay(500);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`);
this._connected = true;
}
}
Expand Down Expand Up @@ -424,16 +431,13 @@ class ZitiInnerTLSSocket extends EventEmitter {

// If the TLS handshake has completed, and we get a zero-length buffer...
if (isConnected) {
// ...then emit the 'close' event (...after slight delay)

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${this.wasmFD}] pausing before emitting 'close' event`);
// ...then indicate we have a pending 'close' event. The 'close' event will be emitted by the ClientRequest
// when it runs and sees the need to emit the event. We need to defer/delay the emission of the 'close'
// event because it is possible that the data for this connection hasn't completed the decrypt flow yet,
// and thus the HTTP Response pqrsing logic hasn't run yet, and if we issue a 'close' before then, the
// HTTP Response parsing logic will interpret the close as a 'socket hang up' error.

setTimeout((self) => {

self._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${self.wasmFD}] emitting 'close' event after pause`);
self.emit('close', undefined);

}, 1000, this)
this._closeEventPending = true;

}

Expand Down

0 comments on commit b5bb462

Please sign in to comment.