Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket session support #24

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "uhttp-exit-app",
"version": "3.4.0",
"version": "4.0.0-a.1",
"description": "u(nlinked)HTTP exit application",
"author": "HOPR Association",
"main": "src/index.ts",
Expand Down Expand Up @@ -31,7 +31,7 @@
"typescript": "^5.4.5"
},
"dependencies": {
"@hoprnet/uhttp-lib": "^3.8.0",
"@hoprnet/uhttp-lib": "^3.9.0-beta.1",
"debug": "^4.3.4",
"sqlite3": "^5.1.7",
"ws": "^8.18.0"
Expand Down
187 changes: 187 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import Net from 'node:net';
import WebSocket from 'ws';
import { version as Version } from '../package.json';
import {
DpApi,
EndpointApi,
ExitNode,
Frame,
NodeApi,
Payload,
Request,
Expand All @@ -30,13 +32,16 @@ const RequestPurgeTimeout = 60e3; // 60 sec
const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min
// reconnect timeout for the websocket after closure
const SocketReconnectTimeout = 3e3; // 3 sec
// restart server after closing
const ServerRestartTimeout = 1e3; // 1 sec
// Time period in which counters for crypto are considered valid
const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour
// Time period to keep segments for potential retransfer
const ValidResponseSegmentPeriod = 1e3 * 60 * 10; // 10 min

type State = {
socket?: WebSocket;
server?: Net.Server;
privateKey: Uint8Array;
publicKey: Uint8Array;
peerId: string;
Expand All @@ -60,6 +65,7 @@ type Ops = {
discoveryPlatform?: OpsDP;
dbFile: string;
pinnedFetch: typeof globalThis.fetch;
tcpPort: number;
};

type Msg = {
Expand All @@ -72,6 +78,7 @@ async function start(ops: Ops) {
try {
const state = await setup(ops);
setupSocket(state, ops);
setupServer(state, ops);
removeExpiredRequests(state);
removeExpiredSegmentResponses(state);
setupRelays(state, ops);
Expand Down Expand Up @@ -167,6 +174,129 @@ function setupSocket(state: State, ops: Ops) {
state.socket = socket;
}

function setupServer(state: State, ops: Ops) {
const server = Net.createServer(onConnection(state, ops));

server.on('error', (err: Error) => {
log.error('error on server: %o', err);
});

server.on('close', () => {
log.warn('closing server - attempting reconnect');
setTimeout(() => setupServer(state, ops), ServerRestartTimeout);
});

server.listen(ops.tcpPort, () => {
log.verbose('tcp server bound to %d', ops.tcpPort);
});
}

function onConnection(state: State, ops: Ops) {
return function (conn: Net.Socket) {
const recvAt = performance.now();
let requestWrapper: Frame.RequestWrapper;

conn.on('data', function (rawData: Uint8Array) {
const data = new Uint8Array(rawData);
if (requestWrapper) {
Frame.concatData(requestWrapper, data);
} else {
const resWrap = Frame.toRequestFrameWrapper(data);
if (Res.isErr(resWrap)) {
log.warn('discarding received %d bytes: %s', data.length, resWrap.error);
return;
}
requestWrapper = resWrap.res;
}
if (Frame.isComplete(requestWrapper)) {
// requrest complete
onIncomingRequest(state, ops, conn, requestWrapper, recvAt);
}
});

conn.on('end', () => {
log.error('unexpected end of incoming socket connection');
});

conn.on('error', (err) => {
log.error('error on connection: %o', err);
});
};
}

async function onIncomingRequest(
state: State,
ops: Ops,
conn: Net.Socket,
requestWrapper: Frame.RequestWrapper,
recvAt: number,
) {
const resReq = Request.messageToReq({
requestId: requestWrapper.requestId,
message: requestWrapper.data,
exitPeerId: state.peerId,
exitPrivateKey: state.privateKey,
});

if (Res.isErr(resReq)) {
log.error('error unboxing request:', resReq.error);
return;
}

const unboxRequest = resReq.res;
const { reqPayload, session: unboxSession } = unboxRequest;
const sendParams = {
conn,
entryPeerId: requestWrapper.entryId,
requestId: requestWrapper.requestId,
unboxRequest,
};

// check counter
const counter = Number(unboxSession.updatedTS);
const now = Date.now();
const valid = now - ValidCounterPeriod;
if (counter < valid) {
log.info('counter %d outside valid period %d (now: %d)', counter, valid, now);
// counter fail resp
return respond(sendParams, { type: Payload.RespType.CounterFail, counter: now });
}

// check uuid
const res = await RequestStore.addIfAbsent(state.db, requestWrapper.requestId, counter);
if (res === RequestStore.AddRes.Duplicate) {
log.info('duplicate request id: %s', requestWrapper.requestId);
// duplicate fail resp
return respond(sendParams, { type: Payload.RespType.DuplicateFail });
}

// do actual endpoint request
const fetchStartedAt = performance.now();
const resFetch = await EndpointApi.fetchUrl(
ops.pinnedFetch,
reqPayload.endpoint,
reqPayload,
).catch((err: Error) => {
log.error('error during request to %s with %o: %o', reqPayload.endpoint, reqPayload, err);
// HTTP critical fail response
const resp: Payload.RespPayload = {
type: Payload.RespType.Error,
reason: err.toString(),
};
return respond(sendParams, resp);
});
if (!resFetch) {
return;
}

const fetchDur = Math.round(performance.now() - fetchStartedAt);
const resp: Payload.RespPayload = {
type: Payload.RespType.Resp,
...resFetch,
};
return respond(sendParams, addLatencies(reqPayload, resp, { fetchDur, recvAt }));
}

function removeExpiredSegmentResponses(state: State) {
ResponseSegmentStore.removeExpired(state.db, Date.now() - ValidResponseSegmentPeriod)
.then(() => {
Expand Down Expand Up @@ -527,6 +657,59 @@ function addLatencies(
}
}

function respond(
{
conn,
entryPeerId,
requestId,
unboxRequest: { session: unboxSession },
}: {
conn: Net.Socket;
entryPeerId: string;
requestId: string;
unboxRequest: Request.UnboxRequest;
},
respPayload: Payload.RespPayload,
) {
const resResp = Response.respToMessage({
requestId,
entryPeerId,
respPayload,
unboxSession,
});
if (Res.isErr(resResp)) {
log.error('error boxing response: %o', resResp.error);
return;
}

const resFrames = Frame.toResponseFrames(resResp.res);
if (Res.isErr(resFrames)) {
log.error('error creating response frames: %o', resFrames.error);
return;
}

const frames = resFrames.res;
log.verbose(
'returning bytes to e%s, requestId: %s, frameCount: %d',
Utils.shortPeerId(entryPeerId),
requestId,
frames.length,
);

frames.forEach((f: Frame.Frame) => {
conn.write(f);
});

// if (ops.discoveryPlatform) {
// reportToDiscoveryPlatform({
// ops,
// cacheEntry,
// reqPayload,
// segments,
// });
// }
}

function sendResponse(
{
state,
Expand Down Expand Up @@ -703,6 +886,9 @@ if (require.main === module) {
if (!process.env.UHTTP_EA_DATABASE_FILE) {
throw new Error("Missing 'UHTTP_EA_DATABASE_FILE' env var.");
}
if (!process.env.UHTTP_EA_TCPPORT) {
throw new Error("Missing 'UHTTP_EA_TCPPORT' env var.");
}
Comment on lines +890 to +891
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate 'UHTTP_EA_TCPPORT' Environment Variable

When reading the UHTTP_EA_TCPPORT environment variable, ensure that it is not only present but also a valid integer within the acceptable TCP port range (1-65535). Invalid port numbers could cause the server to fail unexpectedly.

Apply this diff to add validation:

 if (!process.env.UHTTP_EA_TCPPORT) {
     throw new Error("Missing 'UHTTP_EA_TCPPORT' env var.");
 }
+const tcpPort = parseInt(process.env.UHTTP_EA_TCPPORT, 10);
+if (isNaN(tcpPort) || tcpPort < 1 || tcpPort > 65535) {
+    throw new Error("Invalid 'UHTTP_EA_TCPPORT' env var. It must be a valid port number (1-65535).");
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
throw new Error("Missing 'UHTTP_EA_TCPPORT' env var.");
}
if (!process.env.UHTTP_EA_TCPPORT) {
throw new Error("Missing 'UHTTP_EA_TCPPORT' env var.");
}
const tcpPort = parseInt(process.env.UHTTP_EA_TCPPORT, 10);
if (isNaN(tcpPort) || tcpPort < 1 || tcpPort > 65535) {
throw new Error("Invalid 'UHTTP_EA_TCPPORT' env var. It must be a valid port number (1-65535).");
}


const dpEndpoint = process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT;
let discoveryPlatform;
Expand All @@ -726,5 +912,6 @@ if (require.main === module) {
discoveryPlatform,
dbFile: process.env.UHTTP_EA_DATABASE_FILE,
pinnedFetch: globalThis.fetch.bind(globalThis),
tcpPort: parseInt(process.env.UHTTP_EA_TCPPORT),
});
}
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,10 @@
"@noble/curves" "^1.3.0"
"@noble/hashes" "^1.3.3"

"@hoprnet/uhttp-lib@^3.8.0":
version "3.8.0"
resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.8.0.tgz#5f738c28f12fd6796fc4e3c047425f750abbd3f3"
integrity sha512-VoqJhXd8xe0pcbssDl/FRuheTP5UR5S8BwRxjXWAKaWrBkPBMVWiwSQAq6J6OPd0zA3R5VqLMRPu/OmkFpcatg==
"@hoprnet/uhttp-lib@^3.9.0-beta.1":
version "3.9.0-beta.1"
resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.9.0-beta.1.tgz#0fa76a55e463638d78c180e04855070822fb2431"
integrity sha512-9pUX8hwGroOSx/F6AqHhoL0DsjBgWjX8h1ctpG7gE9uYS41toQnCh8m89ugPDg6axnJh5SyDAapn9UOk5dZqfQ==
dependencies:
"@hoprnet/uhttp-crypto" "^1.0.1"
debug "^4.3.4"
Expand Down
Loading