diff --git a/package.json b/package.json index 97b0780..46b6dc6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "uhttp-exit-app", - "version": "3.4.0", + "version": "4.0.0-a.1", "description": "u(nlinked)HTTP exit application", "author": "HOPR Association", "main": "src/index.ts", @@ -31,7 +31,7 @@ "typescript": "^5.4.5" }, "dependencies": { - "@hoprnet/uhttp-lib": "^3.8.0", + "@hoprnet/uhttp-lib": "^3.9.0-beta.1", "debug": "^4.3.4", "sqlite3": "^5.1.7", "ws": "^8.18.0" diff --git a/src/index.ts b/src/index.ts index a6d50af..a261758 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,11 @@ +import Net from 'node:net'; import WebSocket from 'ws'; import { version as Version } from '../package.json'; import { DpApi, EndpointApi, ExitNode, + Frame, NodeApi, Payload, Request, @@ -30,6 +32,8 @@ const RequestPurgeTimeout = 60e3; // 60 sec const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min // reconnect timeout for the websocket after closure const SocketReconnectTimeout = 3e3; // 3 sec +// restart server after closing +const ServerRestartTimeout = 1e3; // 1 sec // Time period in which counters for crypto are considered valid const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour // Time period to keep segments for potential retransfer @@ -37,6 +41,7 @@ const ValidResponseSegmentPeriod = 1e3 * 60 * 10; // 10 min type State = { socket?: WebSocket; + server?: Net.Server; privateKey: Uint8Array; publicKey: Uint8Array; peerId: string; @@ -60,6 +65,7 @@ type Ops = { discoveryPlatform?: OpsDP; dbFile: string; pinnedFetch: typeof globalThis.fetch; + tcpPort: number; }; type Msg = { @@ -72,6 +78,7 @@ async function start(ops: Ops) { try { const state = await setup(ops); setupSocket(state, ops); + setupServer(state, ops); removeExpiredRequests(state); removeExpiredSegmentResponses(state); setupRelays(state, ops); @@ -167,6 +174,129 @@ function setupSocket(state: State, ops: Ops) { state.socket = socket; } +function setupServer(state: State, ops: Ops) { + const server = Net.createServer(onConnection(state, ops)); + + server.on('error', (err: Error) => { + log.error('error on server: %o', err); + }); + + server.on('close', () => { + log.warn('closing server - attempting reconnect'); + setTimeout(() => setupServer(state, ops), ServerRestartTimeout); + }); + + server.listen(ops.tcpPort, () => { + log.verbose('tcp server bound to %d', ops.tcpPort); + }); +} + +function onConnection(state: State, ops: Ops) { + return function (conn: Net.Socket) { + const recvAt = performance.now(); + let requestWrapper: Frame.RequestWrapper; + + conn.on('data', function (rawData: Uint8Array) { + const data = new Uint8Array(rawData); + if (requestWrapper) { + Frame.concatData(requestWrapper, data); + } else { + const resWrap = Frame.toRequestFrameWrapper(data); + if (Res.isErr(resWrap)) { + log.warn('discarding received %d bytes: %s', data.length, resWrap.error); + return; + } + requestWrapper = resWrap.res; + } + if (Frame.isComplete(requestWrapper)) { + // requrest complete + onIncomingRequest(state, ops, conn, requestWrapper, recvAt); + } + }); + + conn.on('end', () => { + log.error('unexpected end of incoming socket connection'); + }); + + conn.on('error', (err) => { + log.error('error on connection: %o', err); + }); + }; +} + +async function onIncomingRequest( + state: State, + ops: Ops, + conn: Net.Socket, + requestWrapper: Frame.RequestWrapper, + recvAt: number, +) { + const resReq = Request.messageToReq({ + requestId: requestWrapper.requestId, + message: requestWrapper.data, + exitPeerId: state.peerId, + exitPrivateKey: state.privateKey, + }); + + if (Res.isErr(resReq)) { + log.error('error unboxing request:', resReq.error); + return; + } + + const unboxRequest = resReq.res; + const { reqPayload, session: unboxSession } = unboxRequest; + const sendParams = { + conn, + entryPeerId: requestWrapper.entryId, + requestId: requestWrapper.requestId, + unboxRequest, + }; + + // check counter + const counter = Number(unboxSession.updatedTS); + const now = Date.now(); + const valid = now - ValidCounterPeriod; + if (counter < valid) { + log.info('counter %d outside valid period %d (now: %d)', counter, valid, now); + // counter fail resp + return respond(sendParams, { type: Payload.RespType.CounterFail, counter: now }); + } + + // check uuid + const res = await RequestStore.addIfAbsent(state.db, requestWrapper.requestId, counter); + if (res === RequestStore.AddRes.Duplicate) { + log.info('duplicate request id: %s', requestWrapper.requestId); + // duplicate fail resp + return respond(sendParams, { type: Payload.RespType.DuplicateFail }); + } + + // do actual endpoint request + const fetchStartedAt = performance.now(); + const resFetch = await EndpointApi.fetchUrl( + ops.pinnedFetch, + reqPayload.endpoint, + reqPayload, + ).catch((err: Error) => { + log.error('error during request to %s with %o: %o', reqPayload.endpoint, reqPayload, err); + // HTTP critical fail response + const resp: Payload.RespPayload = { + type: Payload.RespType.Error, + reason: err.toString(), + }; + return respond(sendParams, resp); + }); + if (!resFetch) { + return; + } + + const fetchDur = Math.round(performance.now() - fetchStartedAt); + const resp: Payload.RespPayload = { + type: Payload.RespType.Resp, + ...resFetch, + }; + return respond(sendParams, addLatencies(reqPayload, resp, { fetchDur, recvAt })); +} + function removeExpiredSegmentResponses(state: State) { ResponseSegmentStore.removeExpired(state.db, Date.now() - ValidResponseSegmentPeriod) .then(() => { @@ -527,6 +657,59 @@ function addLatencies( } } +function respond( + { + conn, + entryPeerId, + requestId, + unboxRequest: { session: unboxSession }, + }: { + conn: Net.Socket; + entryPeerId: string; + requestId: string; + unboxRequest: Request.UnboxRequest; + }, + respPayload: Payload.RespPayload, +) { + const resResp = Response.respToMessage({ + requestId, + entryPeerId, + respPayload, + unboxSession, + }); + if (Res.isErr(resResp)) { + log.error('error boxing response: %o', resResp.error); + return; + } + + const resFrames = Frame.toResponseFrames(resResp.res); + if (Res.isErr(resFrames)) { + log.error('error creating response frames: %o', resFrames.error); + return; + } + + const frames = resFrames.res; + log.verbose( + 'returning bytes to e%s, requestId: %s, frameCount: %d', + Utils.shortPeerId(entryPeerId), + requestId, + frames.length, + ); + + frames.forEach((f: Frame.Frame) => { + conn.write(f); + }); + + // if (ops.discoveryPlatform) { + // reportToDiscoveryPlatform({ + // ops, + // cacheEntry, + // reqPayload, + // segments, + // }); + // } +} + function sendResponse( { state, @@ -703,6 +886,9 @@ if (require.main === module) { if (!process.env.UHTTP_EA_DATABASE_FILE) { throw new Error("Missing 'UHTTP_EA_DATABASE_FILE' env var."); } + if (!process.env.UHTTP_EA_TCPPORT) { + throw new Error("Missing 'UHTTP_EA_TCPPORT' env var."); + } const dpEndpoint = process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT; let discoveryPlatform; @@ -726,5 +912,6 @@ if (require.main === module) { discoveryPlatform, dbFile: process.env.UHTTP_EA_DATABASE_FILE, pinnedFetch: globalThis.fetch.bind(globalThis), + tcpPort: parseInt(process.env.UHTTP_EA_TCPPORT), }); } diff --git a/yarn.lock b/yarn.lock index 429c040..557a810 100644 --- a/yarn.lock +++ b/yarn.lock @@ -355,10 +355,10 @@ "@noble/curves" "^1.3.0" "@noble/hashes" "^1.3.3" -"@hoprnet/uhttp-lib@^3.8.0": - version "3.8.0" - resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.8.0.tgz#5f738c28f12fd6796fc4e3c047425f750abbd3f3" - integrity sha512-VoqJhXd8xe0pcbssDl/FRuheTP5UR5S8BwRxjXWAKaWrBkPBMVWiwSQAq6J6OPd0zA3R5VqLMRPu/OmkFpcatg== +"@hoprnet/uhttp-lib@^3.9.0-beta.1": + version "3.9.0-beta.1" + resolved "https://registry.yarnpkg.com/@hoprnet/uhttp-lib/-/uhttp-lib-3.9.0-beta.1.tgz#0fa76a55e463638d78c180e04855070822fb2431" + integrity sha512-9pUX8hwGroOSx/F6AqHhoL0DsjBgWjX8h1ctpG7gE9uYS41toQnCh8m89ugPDg6axnJh5SyDAapn9UOk5dZqfQ== dependencies: "@hoprnet/uhttp-crypto" "^1.0.1" debug "^4.3.4"