Skip to content

Commit

Permalink
fix: updated for closing stream more safety (for pass tests)
Browse files Browse the repository at this point in the history
another try
  • Loading branch information
JS-AK committed Nov 8, 2024
1 parent fa877df commit 20839e9
Showing 1 changed file with 15 additions and 23 deletions.
38 changes: 15 additions & 23 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ class Transit {
this.pendingReqStreams.forEach(({ sender, stream }, id) => {
if (sender === nodeID) {
this.pendingReqStreams.delete(id);
this._destroyStreamIfPossible(stream, nodeID);
this._destroyStreamIfPossible(stream, `Stream closed by ${nodeID}`);
}
});

Expand Down Expand Up @@ -1068,7 +1068,7 @@ class Transit {
this.pendingResStreams.delete(id);

if (stream) {
this._closeStreamIfPossible(stream);
this._destroyStreamIfPossible(stream);
}
}

Expand All @@ -1085,7 +1085,7 @@ class Transit {
this.pendingResStreams.delete(id);

if (stream) {
this._destroyStreamIfPossible(stream, origin);
this._destroyStreamIfPossible(stream, `Stream closed by ${origin}`);
}
}

Expand All @@ -1102,7 +1102,7 @@ class Transit {
const pass = reqStream ? reqStream.stream : undefined;

if (pass) {
this._closeStreamIfPossible(pass);
this._destroyStreamIfPossible(pass);
}
}

Expand All @@ -1120,33 +1120,25 @@ class Transit {
const pass = reqStream ? reqStream.stream : undefined;

if (pass) {
this._destroyStreamIfPossible(pass, origin);
this._destroyStreamIfPossible(pass, `Stream closed by ${origin}`);
}
}

/**
* Internal method to destroy a stream (if not already destroyed) with error.
* Internal method to destroy a stream if it is not already destroyed.
*
* @param {Stream} stream The stream to destroy.
* @param {String} [origin] Optional origin of the request.
* @param {DuplexStream} stream - The stream to be destroyed.
* @param {String} [errorMessage] - The error message to be used when destroying.
* If not provided, the stream will be destroyed without an error.
* @memberof Transit
*/
_destroyStreamIfPossible(stream, origin) {
_destroyStreamIfPossible(stream, errorMessage) {
if (!stream.destroyed && stream.destroy) {
const message = origin ? `Stream closed by ${origin}` : "Stream internal error";
stream.destroy(new Error(message));
}
}

/**
* Internal method to close a stream (if not already closed) without error.
*
* @param {Stream} stream The stream to close.
* @memberof Transit
*/
_closeStreamIfPossible(stream) {
if (!stream.readableEnded && stream.push) {
stream.push(null);
if (errorMessage) {
stream.destroy(new Error(errorMessage));
} else {
stream.destroy();
}
}
}

Expand Down

0 comments on commit 20839e9

Please sign in to comment.