Allow retransmission requests for singular segments (#22)
* initial sqlite tables

* exit app can now retransfer requested segments

* fix compilation errors

* gently nudging node towards performance breakdown

* scheduled more lenient transfer

* updating to retransfer uhttp

* fix tests

* adjust docker build to current version
esterlus authored Oct 16, 2024
1 parent 81785fd commit 672db98
Showing 9 changed files with 299 additions and 94 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -28,4 +28,6 @@ coverage/
node_modules/

# Created by github pipeline
gha-creds-*.json
gha-creds-*.json

*.sqlite3
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Specify platform to be amd because arm doesn't work
FROM --platform=linux/amd64 node:18-alpine as builder
FROM --platform=linux/amd64 node:20-alpine as builder

Check warning on line 2 in Dockerfile (GitHub Actions / Close release): FromAsCasing: 'as' and 'FROM' keywords' casing do not match. More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/

Check warning on line 2 in Dockerfile (GitHub Actions / Close release): FromPlatformFlagConstDisallowed: FROM --platform flag should not use constant value "linux/amd64". More info: https://docs.docker.com/go/dockerfile/rule/from-platform-flag-const-disallowed/

RUN apk upgrade --no-cache

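Both warnings refer to the FROM line kept by this commit. A variant that would satisfy both rules is sketched below; it is illustrative only, since the commit deliberately pins linux/amd64 because arm builds don't work:

# 'AS' now matches the casing of FROM; the platform is pinned at build
# time instead of in a constant flag, e.g.: docker build --platform=linux/amd64 .
FROM node:20-alpine AS builder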
2 changes: 1 addition & 1 deletion package.json
@@ -32,7 +32,7 @@
"typescript": "^5.4.5"
},
"dependencies": {
"@hoprnet/uhttp-lib": "^3.3.0",
"@hoprnet/uhttp-lib": "^3.7.2",
"debug": "^4.3.4",
"sqlite3": "^5.1.7",
"ws": "^8.18.0"
25 changes: 25 additions & 0 deletions src/db.ts
@@ -0,0 +1,25 @@
import sqlite3 from 'sqlite3';

export type DB = sqlite3.Database;

export function setup(dbFile: string): Promise<DB> {
return new Promise((res, rej) => {
const db: sqlite3.Database = new sqlite3.Database(dbFile, (err) => {
if (err) {
return rej(`Error creating db: ${err}`);
}
return res(db);
});
});
}

export function close(db: DB): Promise<void> {
return new Promise((res, rej) => {
db.close((err) => {
if (err) {
return rej(`Error closing db: ${err}`);
}
return res();
});
});
}
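The new src/db.ts wraps sqlite3's callback API in promises. A minimal usage sketch of the two exports (the file path and error handling here are illustrative, not part of the commit):

import * as DB from './db';

async function main() {
    // open (or create) the sqlite database file
    const db = await DB.setup('./uhttp-exit-app.sqlite3');
    try {
        // ... run queries through the stores that share this handle ...
    } finally {
        // always release the file handle
        await DB.close(db);
    }
}

main().catch((err) => console.error('db example failed: %o', err));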
181 changes: 131 additions & 50 deletions src/index.ts
@@ -13,9 +13,11 @@ import {
Utils,
} from '@hoprnet/uhttp-lib';

import log from './logger';
import * as DB from './db';
import * as RequestStore from './request-store';
import * as ResponseSegmentStore from './response-segment-store';
import Version from './version';
import log from './logger';

// WebSocket heartbeats
const HeartBeatInterval = 30e3; // 30 sec
@@ -28,7 +30,9 @@ const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min
// reconnect timeout for the websocket after closure
const SocketReconnectTimeout = 3e3; // 3 sec
// Time period in which counters for crypto are considered valid
const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour
const ValidCounterPeriod = 1e3 * 60 * 60; // 1 hour
// Time period to keep segments for potential retransfer
const ValidResponseSegmentPeriod = 1e3 * 60 * 10; // 10 min

type State = {
socket?: WebSocket;
@@ -37,7 +41,7 @@ type State = {
peerId: string;
cache: SegmentCache.Cache;
deleteTimer: Map<string, ReturnType<typeof setTimeout>>; // deletion timer of requests in segment cache
requestStore: RequestStore.RequestStore;
db: DB.DB;
relays: string[];
heartbeatInterval?: ReturnType<typeof setInterval>;
};
@@ -54,6 +58,7 @@ type Ops = {
accessToken: string;
discoveryPlatform?: OpsDP;
dbFile: string;
pinnedFetch: typeof globalThis.fetch;
};

type Msg = {
@@ -66,7 +71,8 @@ async function start(ops: Ops) {
try {
const state = await setup(ops);
setupSocket(state, ops);
removeExpired(state);
removeExpiredRequests(state);
removeExpiredSegmentResponses(state);
setupRelays(state, ops);
} catch (err) {
log.error(
@@ -79,13 +85,16 @@
}

async function setup(ops: Ops): Promise<State> {
const requestStore = await RequestStore.setup(ops.dbFile).catch((err) => {
log.error('error setting up request store: %o', err);
const db = await DB.setup(ops.dbFile).catch((err) => {
log.error('error setting up database: %o', err);
});
if (!requestStore) {
throw new Error('No request store');
if (!db) {
throw new Error('No database');
}

await RequestStore.setup(db);
await ResponseSegmentStore.setup(db);

log.verbose('set up DB at', ops.dbFile);

const resPeerId = await NodeApi.accountAddresses(ops).catch((err: Error) => {
@@ -96,7 +105,8 @@
}

const { hopr: peerId } = resPeerId;
const cache = SegmentCache.init();
// we don't care for missing segments reminder in our cache
const cache = SegmentCache.init(function () {});
const deleteTimer = new Map();

const logOpts: Record<string, string> = {
@@ -114,7 +124,7 @@
privateKey: Utils.hexStringToBytes(ops.privateKey),
publicKey: Utils.hexStringToBytes(ops.publicKey),
peerId,
requestStore,
db,
relays: [],
};
}
@@ -156,27 +166,48 @@ function setupSocket(state: State, ops: Ops) {
state.socket = socket;
}

function removeExpired(state: State) {
RequestStore.removeExpired(state.requestStore, ValidCounterPeriod)
function removeExpiredSegmentResponses(state: State) {
ResponseSegmentStore.removeExpired(state.db, Date.now() - ValidResponseSegmentPeriod)
.then(() => {
log.info('successfully removed expired response segments from db');
})
.catch((err) => {
log.error('error during removeExpiredSegmentResponses: %o', err);
})
.finally(() => {
scheduleRemoveExpiredSegmentResponses(state);
});
}

function removeExpiredRequests(state: State) {
RequestStore.removeExpired(state.db, Date.now() - ValidCounterPeriod)
.then(() => {
log.info('successfully removed expired requests from store');
log.info('successfully removed expired requests from db');
})
.catch((err) => {
log.error('error during removeExpired: %o', err);
log.error('error during removeExpiredRequests: %o', err);
})
.finally(() => {
scheduleRemoveExpired(state);
scheduleRemoveExpiredRequests(state);
});
}

function scheduleRemoveExpired(state: State) {
function scheduleRemoveExpiredSegmentResponses(state: State) {
const logM = ValidResponseSegmentPeriod / 1000 / 60;
log.info('scheduling next remove expired response segments in %dm', logM);
setTimeout(() => {
removeExpiredSegmentResponses(state);
}, ValidResponseSegmentPeriod);
}

function scheduleRemoveExpiredRequests(state: State) {
// schedule next run somewhere between 1h and 1h 10m
const next = ValidCounterPeriod + Math.floor(Math.random() * 10 * 60e3);
const logH = Math.floor(next / 1000 / 60 / 60);
const logM = Math.round(next / 1000 / 60) - logH * 60;

log.info('scheduling next remove expired requests in %dh%dm', logH, logM);
setTimeout(() => removeExpired(state), next);
setTimeout(() => removeExpiredRequests(state), next);
}

async function setupRelays(state: State, ops: Ops) {
@@ -243,6 +274,11 @@ function onMessage(state: State, ops: Ops) {
return onInfoReq(state, ops, msg);
}

// determine if segment retransfer
if (msg.body.startsWith('resg;')) {
return onRetransferSegmentsReq(state, ops, msg);
}

// determine if valid segment
const segRes = Segment.fromMessage(msg.body);
if (Res.isErr(segRes)) {
@@ -306,7 +342,7 @@ function onPingReq(state: State, ops: Ops, msg: Msg) {

function onInfoReq(state: State, ops: Ops, msg: Msg) {
log.info('received info req:', msg.body);
// info-originPeerId-hops
// info-originPeerId-hops-manualRelay
const [, recipient, hopsStr, reqRel] = msg.body.split('-');
const hops = parseInt(hopsStr, 10);
const conn = { ...ops, hops };
Expand All @@ -333,6 +369,40 @@ function onInfoReq(state: State, ops: Ops, msg: Msg) {
});
}

function onRetransferSegmentsReq(state: State, ops: Ops, msg: Msg) {
log.info('received retransfer segments req:', msg.body);
// resg;originPeerId;hops;requestId;segmentNrs
const [, recipient, hopsStr, requestId, rawSegNrs] = msg.body.split(';');
const hops = parseInt(hopsStr, 10);
const conn = { ...ops, hops };
const segNrs = rawSegNrs.split(',').map((s) => parseInt(s));

log.verbose('reading segment nrs for %s from db: %o', requestId, segNrs);
ResponseSegmentStore.all(state.db, requestId, segNrs)
.then((segments) => {
segments.forEach((seg, idx) => {
setTimeout(() => {
const segLog = Segment.prettyPrint(seg);
log.verbose('retransferring %s to %s', segLog, recipient);
NodeApi.sendMessage(conn, {
recipient,
tag: msg.tag,
message: Segment.toMessage(seg),
})
.then((r) => {
log.verbose('retransfer response %s: %o', segLog, r);
})
.catch((err: Error) => {
log.error('error retransferring %s: %o', Segment.prettyPrint(seg), err);
});
}, idx * 10);
});
})
.catch((err) => {
log.error('error reading response segments: %o', err);
});
}
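For reference, the handler above parses the resg; layout documented in its comment. A hypothetical helper on the requesting side could build such a message like this (the function name and example values are assumptions; only the wire format comes from the diff):

// builds: resg;originPeerId;hops;requestId;segmentNrs
// e.g. buildRetransferRequest('16Uiu2HA...', 1, 'req-123', [2, 5, 9])
//      -> 'resg;16Uiu2HA...;1;req-123;2,5,9'
function buildRetransferRequest(
    originPeerId: string,
    hops: number,
    requestId: string,
    missingSegmentNrs: number[],
): string {
    return ['resg', originPeerId, String(hops), requestId, missingSegmentNrs.join(',')].join(';');
}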

async function completeSegmentsEntry(
state: State,
ops: Ops,
@@ -382,7 +452,7 @@ async function completeSegmentsEntry(
}

// check uuid
const res = await RequestStore.addIfAbsent(state.requestStore, requestId, counter);
const res = await RequestStore.addIfAbsent(state.db, requestId, counter);
if (res === RequestStore.AddRes.Duplicate) {
log.info('duplicate request id:', requestId);
// duplicate fail resp
Expand All @@ -391,22 +461,19 @@ async function completeSegmentsEntry(

// do actual endpoint request
const fetchStartedAt = performance.now();
const resFetch = await EndpointApi.fetchUrl(reqPayload.endpoint, reqPayload).catch(
(err: Error) => {
log.error(
'error doing RPC req on %s with %o: %o',
reqPayload.endpoint,
reqPayload,
err,
);
// HTTP critical fail response
const resp: Payload.RespPayload = {
type: Payload.RespType.Error,
reason: err.toString(),
};
return sendResponse(sendParams, resp);
},
);
const resFetch = await EndpointApi.fetchUrl(
ops.pinnedFetch,
reqPayload.endpoint,
reqPayload,
).catch((err: Error) => {
log.error('error doing RPC req on %s with %o: %o', reqPayload.endpoint, reqPayload, err);
// HTTP critical fail response
const resp: Payload.RespPayload = {
type: Payload.RespType.Error,
reason: err.toString(),
};
return sendResponse(sendParams, resp);
});
if (!resFetch) {
return;
}
@@ -511,36 +578,47 @@ function sendResponse(
};

// queue segment sending for all of them
segments.forEach((seg: Segment.Segment) => {
NodeApi.sendMessage(conn, {
recipient: entryPeerId,
tag,
message: Segment.toMessage(seg),
}).catch((err: Error) => {
log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
// remove relay if it fails
state.relays = state.relays.filter((r) => r !== relay);
});
const ts = Date.now();
segments.forEach((seg: Segment.Segment, idx) => {
const segLog = Segment.prettyPrint(seg);
log.verbose('putting %s into db at %d ', segLog, ts);
ResponseSegmentStore.put(state.db, seg, ts);
setTimeout(() => {
log.verbose('sending %s to %s', segLog, entryPeerId);
NodeApi.sendMessage(conn, {
recipient: entryPeerId,
tag,
message: Segment.toMessage(seg),
})
.then((r) => {
log.verbose('response %s: %o', segLog, r);
})
.catch((err: Error) => {
log.error('error sending %s: %o', Segment.prettyPrint(seg), err);
// remove relay if it fails
state.relays = state.relays.filter((r) => r !== relay);
});
}, idx * 10);
});

if (ops.discoveryPlatform) {
reportToDiscoveryPlatform({
ops,
cacheEntry,
opsDP: ops.discoveryPlatform,
reqPayload,
segments,
});
}
}
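The response-segment-store module itself is among the changed files not shown in this view. A sketch consistent with the calls made above (setup, put, all, removeExpired), assuming a plain sqlite3 table; every name below except the four exports is an assumption:

import type { DB } from './db';

// assumed segment shape; the real code uses Segment from '@hoprnet/uhttp-lib'
type Seg = { requestId: string; nr: number; totalCount: number; data: string };

export function setup(db: DB): Promise<void> {
    return new Promise((res, rej) => {
        // table and column names are assumptions, not taken from the commit
        db.run(
            `CREATE TABLE IF NOT EXISTS response_segments (
                request_id TEXT NOT NULL,
                nr INTEGER NOT NULL,
                total_count INTEGER NOT NULL,
                data TEXT NOT NULL,
                inserted_at INTEGER NOT NULL,
                PRIMARY KEY (request_id, nr)
            )`,
            (err) => (err ? rej(err) : res()),
        );
    });
}

export function put(db: DB, seg: Seg, ts: number): Promise<void> {
    return new Promise((res, rej) => {
        db.run(
            'INSERT OR REPLACE INTO response_segments VALUES (?, ?, ?, ?, ?)',
            [seg.requestId, seg.nr, seg.totalCount, seg.data, ts],
            (err) => (err ? rej(err) : res()),
        );
    });
}

export function all(db: DB, requestId: string, segNrs: number[]): Promise<Seg[]> {
    const marks = segNrs.map(() => '?').join(',');
    return new Promise((res, rej) => {
        db.all(
            `SELECT * FROM response_segments WHERE request_id = ? AND nr IN (${marks})`,
            [requestId, ...segNrs],
            (err, rows: any[]) => {
                if (err) {
                    return rej(err);
                }
                return res(
                    rows.map((r) => ({
                        requestId: r.request_id,
                        nr: r.nr,
                        totalCount: r.total_count,
                        data: r.data,
                    })),
                );
            },
        );
    });
}

export function removeExpired(db: DB, olderThan: number): Promise<void> {
    return new Promise((res, rej) => {
        db.run('DELETE FROM response_segments WHERE inserted_at < ?', [olderThan], (err) =>
            err ? rej(err) : res(),
        );
    });
}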

async function reportToDiscoveryPlatform({
ops,
cacheEntry,
opsDP,
reqPayload,
segments,
}: {
cacheEntry: SegmentCache.Entry;
opsDP: OpsDP;
ops: Ops;
reqPayload: Payload.ReqPayload;
segments: Segment.Segment[];
}) {
@@ -570,9 +648,11 @@ async function reportToDiscoveryPlatform({
type: 'response',
};

const dp = ops.discoveryPlatform as OpsDP;
const conn = {
discoveryPlatformEndpoint: opsDP.endpoint,
nodeAccessToken: opsDP.nodeAccessToken,
discoveryPlatformEndpoint: dp.endpoint,
nodeAccessToken: dp.nodeAccessToken,
pinnedFetch: ops.pinnedFetch,
};
DpApi.postQuota(conn, quotaRequest).catch((err) => {
log.error('error recording request quota: %o', err);
@@ -644,5 +724,6 @@ if (require.main === module) {
accessToken: process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN,
discoveryPlatform,
dbFile: process.env.UHTTP_EA_DATABASE_FILE,
pinnedFetch: globalThis.fetch.bind(globalThis),
});
}
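The new pinnedFetch option threads a single fetch implementation through NodeApi, EndpointApi, and DpApi instead of resolving the global at call time; main binds it because a detached fetch can throw 'Illegal invocation' on runtimes where fetch is this-sensitive. A sketch of injecting a custom implementation, e.g. to enforce a request timeout (the wrapper is illustrative, not part of the commit):

// illustrative: a pinned fetch that aborts slow requests
function fetchWithTimeout(ms: number): typeof globalThis.fetch {
    return (input, init) => {
        const ctrl = new AbortController();
        const timer = setTimeout(() => ctrl.abort(), ms);
        return globalThis
            .fetch(input, { ...init, signal: ctrl.signal })
            .finally(() => clearTimeout(timer));
    };
}

// start({ ...ops, pinnedFetch: fetchWithTimeout(30_000) });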
(diffs for the remaining 4 changed files not shown)
