Skip to content

Commit

Permalink
[server] move connection limit mechanism completely to C3 WASM
Browse files Browse the repository at this point in the history
  • Loading branch information
rexim committed Oct 10, 2024
1 parent 48d0f0d commit 22314a6
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 99 deletions.
2 changes: 1 addition & 1 deletion client.mts
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,9 @@ async function createGame(): Promise<Game> {
prevTimestamp = timestamp;

game.wasmClient.render_game(deltaTime, time);

displaySwapBackImageData(game.display, game.wasmClient);
renderDebugInfo(game.display.ctx, deltaTime, game);

window.requestAnimationFrame(frame);
}
window.requestAnimationFrame((timestamp) => {
Expand Down
2 changes: 2 additions & 0 deletions common.c3
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import std::math;
import std::io;
import std::collections::list;

def ShortString = char[64];

struct Asset {
String filename;
usz offset;
Expand Down
10 changes: 10 additions & 0 deletions common.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export const SERVER_PORT = 6970;
export const UINT32_SIZE = 4;
const SHORT_STRING_SIZE = 64;
export function makeWasmCommon(wasm) {
return {
wasm,
Expand All @@ -15,4 +16,13 @@ export function arrayBufferAsMessageInWasm(wasmCommon, buffer) {
new Uint8ClampedArray(wasmCommon.memory.buffer, wasmBufferPtr + UINT32_SIZE, wasmBufferSize - UINT32_SIZE).set(new Uint8ClampedArray(buffer));
return wasmBufferPtr;
}
export function stringAsShortStringInWasm(wasmCommon, s) {
const shortStringPtr = wasmCommon.allocate_temporary_buffer(SHORT_STRING_SIZE);
const bytes = new Uint8ClampedArray(wasmCommon.memory.buffer, shortStringPtr, SHORT_STRING_SIZE);
bytes.fill(0);
for (let i = 0; i < s.length && i < SHORT_STRING_SIZE - 1; ++i) {
bytes[i] = s.charCodeAt(i);
}
return shortStringPtr;
}
//# sourceMappingURL=common.mjs.map
11 changes: 11 additions & 0 deletions common.mts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export const SERVER_PORT = 6970;
export const UINT32_SIZE = 4;
const SHORT_STRING_SIZE = 64; // IMPORTANT! Must be synchronized with the capacity of the ShortString type in server.c3

export interface WasmCommon {
wasm: WebAssembly.WebAssemblyInstantiatedSource,
Expand All @@ -24,3 +25,13 @@ export function arrayBufferAsMessageInWasm(wasmCommon: WasmCommon, buffer: Array
new Uint8ClampedArray(wasmCommon.memory.buffer, wasmBufferPtr + UINT32_SIZE, wasmBufferSize - UINT32_SIZE).set(new Uint8ClampedArray(buffer));
return wasmBufferPtr;
}

export function stringAsShortStringInWasm(wasmCommon: WasmCommon, s: string): number {
const shortStringPtr = wasmCommon.allocate_temporary_buffer(SHORT_STRING_SIZE);
const bytes = new Uint8ClampedArray(wasmCommon.memory.buffer, shortStringPtr, SHORT_STRING_SIZE);
bytes.fill(0);
for (let i = 0; i < s.length && i < SHORT_STRING_SIZE-1; ++i) {
bytes[i] = s.charCodeAt(i);
}
return shortStringPtr;
}
75 changes: 61 additions & 14 deletions server.c3
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import std::io;
import std::math;
import std::collections::list;
import std::collections::map;
import std::hash::fnv32a;

const usz SERVER_TOTAL_LIMIT = 2000;
// const usz SERVER_SINGLE_IP_LIMIT = 10;
const usz SERVER_SINGLE_IP_LIMIT = 2;

extern fn int platform_now_secs();
extern fn uint platform_send_message(uint player_id, void *message);
extern fn uint platform_now_msecs();

const int SERVER_FPS = 60;
const int SERVER_FPS = 60; // IMPORTANT! Must be in sync with SERVER_FPS in server.mts

fn void send_message_and_update_stats(uint player_id, void* message) {
uint sent = platform_send_message(player_id, message);
Expand All @@ -19,6 +24,13 @@ fn void send_message_and_update_stats(uint player_id, void* message) {
}
}

/// Connection Limits //////////////////////////////

def ShortString = char[64]; // IMPORTANT! The capacity must be in sync with the SHORT_STRING_SIZE in common.mts
macro uint ShortString.hash(self) => fnv32a::encode(&self);
def ConnectionLimitEntry = Entry(<ShortString, uint>);
HashMap(<ShortString, uint>) connection_limits;

/// Items //////////////////////////////

List(<usz>) collected_items;
Expand Down Expand Up @@ -114,6 +126,7 @@ fn BombsExplodedBatchMessage* exploded_bombs_as_batch_message(Bombs* bombs) {
struct PlayerOnServer {
Player player;
char new_moving;
ShortString remote_address;
}

def PlayerOnServerEntry = Entry(<uint, PlayerOnServer>);
Expand All @@ -125,25 +138,63 @@ HashMap(<uint, bool>) left_ids;
def PingEntry = Entry(<uint, uint>);
HashMap(<uint, uint>) ping_ids;

fn void register_new_player(uint id) @extern("register_new_player") @wasm {
fn bool register_new_player(uint id, ShortString* remote_address) @extern("register_new_player") @wasm {
if (players.len() >= SERVER_TOTAL_LIMIT) {
stats::inc_counter(PLAYERS_REJECTED, 1);
return false;
}

assert(remote_address != null);
usz remote_address_len = ((ZString)&(*remote_address)[0]).char_len(); // WutFace
if (remote_address_len == 0) {
stats::inc_counter(PLAYERS_REJECTED, 1);
return false;
}

if (try count = connection_limits.get(*remote_address)) {
// TODO: we need to let the player know somehow that they were rejected due to the limit
if (count >= SERVER_SINGLE_IP_LIMIT) {
stats::inc_counter(PLAYERS_REJECTED, 1);
return false;
}
connection_limits.set(*remote_address, count + 1);
} else {
connection_limits.set(*remote_address, 1);
}

assert(!players.has_key(id));
joined_ids.set(id, true);
stats::inc_counter(PLAYERS_JOINED, 1);
stats::inc_counter(PLAYERS_CURRENTLY, 1);
players.set(id, {
.player = {
.id = id,
}
},
.remote_address = *remote_address,
});

stats::inc_counter(PLAYERS_JOINED, 1);
stats::inc_counter(PLAYERS_CURRENTLY, 1);

return true;
}

fn void unregister_player(uint id) @extern("unregister_player") @wasm {
if (catch joined_ids.remove(id)) {
left_ids.set(id, false);
// console.log(`Player ${id} disconnected`);
if (try player = players.get(id)) {
if (try count = connection_limits.get(player.remote_address)) {
if (count <= 1) {
connection_limits.remove(player.remote_address);
} else {
connection_limits.set(player.remote_address, count - 1);
}
}

if (catch joined_ids.remove(id)) {
left_ids.set(id, false);
}
stats::inc_counter(PLAYERS_LEFT, 1);
stats::inc_counter(PLAYERS_CURRENTLY, -1);
players.remove(id);
}
stats::inc_counter(PLAYERS_LEFT, 1);
stats::inc_counter(PLAYERS_CURRENTLY, -1);
players.remove(id);
}

fn PlayersJoinedBatchMessage *all_players_as_joined_batch_message() {
Expand Down Expand Up @@ -430,10 +481,6 @@ fn uint tick() @extern("tick") @wasm {
return tickTime;
}

fn void stats_inc_players_rejected_counter() @extern("stats_inc_players_rejected_counter") @wasm {
stats::inc_counter(PLAYERS_REJECTED, 1);
}

/// Entry point //////////////////////////////

fn void entry() @init(2048) @private {
Expand Down
45 changes: 11 additions & 34 deletions server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,19 @@ import { readFileSync } from 'fs';
import { WebSocketServer } from 'ws';
import * as common from './common.mjs';
const SERVER_FPS = 60;
const SERVER_TOTAL_LIMIT = 2000;
const SERVER_SINGLE_IP_LIMIT = 10;
const wasmServer = await instantiateWasmServer('server.wasm');
const connections = new Map();
const connectionLimits = new Map();
let idCounter = 0;
const wss = new WebSocketServer({ port: common.SERVER_PORT });
wss.on("connection", (ws, req) => {
ws.binaryType = 'arraybuffer';
if (connections.size >= SERVER_TOTAL_LIMIT) {
wasmServer.stats_inc_players_rejected_counter();
ws.close();
return;
}
if (req.socket.remoteAddress === undefined) {
wasmServer.stats_inc_players_rejected_counter();
const remoteAddressPtr = common.stringAsShortStringInWasm(wasmServer, req.socket.remoteAddress ?? "");
const id = idCounter++;
if (!wasmServer.register_new_player(id, remoteAddressPtr)) {
ws.close();
return;
}
const remoteAddress = req.socket.remoteAddress;
{
let count = connectionLimits.get(remoteAddress) || 0;
if (count >= SERVER_SINGLE_IP_LIMIT) {
wasmServer.stats_inc_players_rejected_counter();
ws.close();
return;
}
connectionLimits.set(remoteAddress, count + 1);
}
const id = idCounter++;
wasmServer.register_new_player(id);
connections.set(id, { ws, remoteAddress });
connections.set(id, ws);
ws.addEventListener("message", (event) => {
if (!(event.data instanceof ArrayBuffer)) {
throw new Error("binaryType of the client WebSocket must be 'arraybuffer'");
Expand All @@ -45,15 +26,6 @@ wss.on("connection", (ws, req) => {
}
});
ws.on("close", () => {
let count = connectionLimits.get(remoteAddress);
if (count !== undefined) {
if (count <= 1) {
connectionLimits.delete(remoteAddress);
}
else {
connectionLimits.set(remoteAddress, count - 1);
}
}
wasmServer.unregister_player(id);
connections.delete(id);
});
Expand All @@ -77,7 +49,7 @@ function platform_send_message(player_id, message) {
const size = new Uint32Array(wasmServer.memory.buffer, message, 1)[0];
if (size === 0)
return 0;
connection.ws.send(new Uint8Array(wasmServer.memory.buffer, message + common.UINT32_SIZE, size - common.UINT32_SIZE));
connection.send(new Uint8Array(wasmServer.memory.buffer, message + common.UINT32_SIZE, size - common.UINT32_SIZE));
return size;
}
async function instantiateWasmServer(path) {
Expand All @@ -88,13 +60,18 @@ async function instantiateWasmServer(path) {
platform_send_message,
platform_now_msecs: () => performance.now(),
fmodf: (x, y) => x % y,
memcmp: (ps1, ps2, n) => {
const mem = new Uint8ClampedArray(wasmServer.memory.buffer);
for (; n > 0 && mem[ps1] === mem[ps2]; n--, ps1++, ps2++)
;
return n > 0 ? mem[ps1] - mem[ps2] : 0;
},
},
});
const wasmCommon = common.makeWasmCommon(wasm);
wasmCommon._initialize();
return {
...wasmCommon,
stats_inc_players_rejected_counter: wasm.instance.exports.stats_inc_players_rejected_counter,
register_new_player: wasm.instance.exports.register_new_player,
unregister_player: wasm.instance.exports.unregister_player,
process_message_on_server: wasm.instance.exports.process_message_on_server,
Expand Down
65 changes: 15 additions & 50 deletions server.mts
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,35 @@ import {readFileSync} from 'fs';
import {WebSocketServer, WebSocket} from 'ws';
import * as common from './common.mjs'

const SERVER_FPS = 60;
const SERVER_TOTAL_LIMIT = 2000;
const SERVER_SINGLE_IP_LIMIT = 10;

interface PlayerConnection {
ws: WebSocket,
remoteAddress: string,
}
const SERVER_FPS = 60; // IMPORTANT! Must be in sync with SERVER_FPS in server.c3

const wasmServer = await instantiateWasmServer('server.wasm');
const connections = new Map<number, PlayerConnection>();
const connectionLimits = new Map<string, number>();
const connections = new Map<number, WebSocket>();
let idCounter = 0;
const wss = new WebSocketServer({port: common.SERVER_PORT})

wss.on("connection", (ws, req) => {
ws.binaryType = 'arraybuffer';

if (connections.size >= SERVER_TOTAL_LIMIT) {
wasmServer.stats_inc_players_rejected_counter();
ws.close();
return;
}
const remoteAddressPtr = common.stringAsShortStringInWasm(wasmServer, req.socket.remoteAddress ?? "");

if (req.socket.remoteAddress === undefined) {
// NOTE: something weird happened the client does not have a remote address
wasmServer.stats_inc_players_rejected_counter();
const id = idCounter++;
if (!wasmServer.register_new_player(id, remoteAddressPtr)) {
ws.close();
return;
}

const remoteAddress = req.socket.remoteAddress;

{
let count = connectionLimits.get(remoteAddress) || 0;
if (count >= SERVER_SINGLE_IP_LIMIT) {
wasmServer.stats_inc_players_rejected_counter();
ws.close();
return;
}
connectionLimits.set(remoteAddress, count + 1);
}

const id = idCounter++;
wasmServer.register_new_player(id);
connections.set(id, {ws, remoteAddress});
// console.log(`Player ${id} connected`);
connections.set(id, ws);
ws.addEventListener("message", (event) => {
if (!(event.data instanceof ArrayBuffer)) {
throw new Error("binaryType of the client WebSocket must be 'arraybuffer'");
}
const eventDataPtr = common.arrayBufferAsMessageInWasm(wasmServer, event.data);
// console.log(`Received message from player ${id}`, new Uint8ClampedArray(event.data));
if (!wasmServer.process_message_on_server(id, eventDataPtr)) {
ws.close();
return;
}
});
ws.on("close", () => {
// console.log(`Player ${id} disconnected`);
let count = connectionLimits.get(remoteAddress);
if (count !== undefined) {
if (count <= 1) {
connectionLimits.delete(remoteAddress);
} else {
connectionLimits.set(remoteAddress, count - 1);
}
}
wasmServer.unregister_player(id);
connections.delete(id);
})
Expand All @@ -81,8 +42,7 @@ function tick() {
}

interface WasmServer extends common.WasmCommon {
stats_inc_players_rejected_counter: () => void,
register_new_player: (id: number) => void,
register_new_player: (id: number, remote_address: number) => boolean,
unregister_player: (id: number) => void,
process_message_on_server: (id: number, message: number) => boolean,
tick: () => number,
Expand All @@ -105,7 +65,7 @@ function platform_send_message(player_id: number, message: number): number {
if (connection === undefined) return 0; // connection does not exist
const size = new Uint32Array(wasmServer.memory.buffer, message, 1)[0];
if (size === 0) return 0; // empty emssage
connection.ws.send(new Uint8Array(wasmServer.memory.buffer, message + common.UINT32_SIZE, size - common.UINT32_SIZE));
connection.send(new Uint8Array(wasmServer.memory.buffer, message + common.UINT32_SIZE, size - common.UINT32_SIZE));
return size;
}

Expand All @@ -117,14 +77,19 @@ async function instantiateWasmServer(path: string): Promise<WasmServer> {
platform_send_message,
platform_now_msecs: () => performance.now(),
fmodf: (x: number, y: number) => x%y,
memcmp: (ps1: number, ps2: number, n: number) => {
// NOTE: the idea is stolen from musl's memcmp
const mem = new Uint8ClampedArray(wasmServer.memory.buffer);
for (; n > 0 && mem[ps1] === mem[ps2]; n--, ps1++, ps2++);
return n > 0 ? mem[ps1] - mem[ps2] : 0;
},
},
});
const wasmCommon = common.makeWasmCommon(wasm);
wasmCommon._initialize();
return {
...wasmCommon,
stats_inc_players_rejected_counter: wasm.instance.exports.stats_inc_players_rejected_counter as () => void,
register_new_player: wasm.instance.exports.register_new_player as (id: number) => void,
register_new_player: wasm.instance.exports.register_new_player as (id: number, remote_address: number) => boolean,
unregister_player: wasm.instance.exports.unregister_player as (id: number) => void,
process_message_on_server: wasm.instance.exports.process_message_on_server as (id: number, message: number) => boolean,
tick: wasm.instance.exports.tick as () => number,
Expand Down
Binary file modified server.wasm
Binary file not shown.

0 comments on commit 22314a6

Please sign in to comment.