Skip to content

Commit

Permalink
Merge pull request #14 from Cardinal-Cryptography/fix-column-names
Browse files Browse the repository at this point in the history
Split GraphQL queries into v1 and v2.
  • Loading branch information
deuszx authored May 16, 2024
2 parents 7b564d1 + 0b144aa commit fec69f3
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 34 deletions.
265 changes: 264 additions & 1 deletion package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"@types/node": "^20.11.25",
"@types/ws": "^8.5.10",
"prettier": "3.2.5",
"trace-unhandled": "^2.0.1",
"ts-node": "^10.9.2",
"type": "^2.7.2",
"typescript": "~5.2.0"
Expand Down
1 change: 0 additions & 1 deletion src/grapqhl/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export async function readWholeConnection<T>(
}
}
} catch (err) {
// TODO: throw error?
console.error(err);
break;
}
Expand Down
15 changes: 13 additions & 2 deletions src/grapqhl/pools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Client } from "graphql-ws";
import { RawElement, readWholeConnection } from ".";
import { PairSwapVolume, PoolV2 } from "../models/pool";
import { Observable, mergeMap } from "rxjs";
import { poolsV2ConnectionsQuery } from "./queries";
import { poolsV2ConnectionsQuery as poolReservesV2 } from "./v2/queries";
import { poolsV2ConnectionsQuery as poolReservesV1 } from "./v1/queries";

export function poolsV2$(
rawObservable: Observable<RawElement>,
Expand All @@ -13,7 +14,17 @@ export function poolsV2$(
}

export function loadInitPoolReserves(client: Client): Promise<PoolV2[]> {
return readWholeConnection<PoolV2>(client, poolsV2ConnectionsQuery);
const v1 = loadInitReservesV1(client);
const v2 = loadInitReservesV2(client);
return Promise.all([v1, v2]).then((values) => values.flat());
}

export function loadInitReservesV1(client: Client): Promise<PoolV2[]> {
return readWholeConnection<PoolV2>(client, poolReservesV1);
}

export function loadInitReservesV2(client: Client): Promise<PoolV2[]> {
return readWholeConnection<PoolV2>(client, poolReservesV2);
}

/// Query all pair swap volumes from the GraphQL server.
Expand Down
2 changes: 1 addition & 1 deletion src/grapqhl/psp22.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RawElement, readWholeConnection } from ".";
import { psp22TokenBalancesConnectionsQuery } from "./queries";
import { psp22TokenBalancesConnectionsQuery } from "./v2/queries";
import { TokenBalances, TokenBalance } from "../models/psp22";
import { Observable, map, reduce } from "rxjs";
import { Client } from "graphql-ws";
Expand Down
4 changes: 2 additions & 2 deletions src/grapqhl/queries.ts → src/grapqhl/v1/queries.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ConnectionQuery } from "./connection";
import { SubscriptionQuery } from "./subscription";
import { ConnectionQuery } from "../connection";
import { SubscriptionQuery } from "../subscription";

export const psp22TokenBalancesConnectionsQuery: ConnectionQuery =
new ConnectionQuery(
Expand Down
35 changes: 35 additions & 0 deletions src/grapqhl/v2/queries.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { ConnectionQuery } from "../connection";
import { SubscriptionQuery } from "../subscription";

export const psp22TokenBalancesConnectionsQuery: ConnectionQuery =
new ConnectionQuery(
"psp22TokenBalances",
"account amount token lastUpdateBlockHeight lastUpdateTimestamp id",
);

export const pspTokenBalancesSubscriptionQuery: SubscriptionQuery =
new SubscriptionQuery(
"psp22TokenBalances",
"account amount token lastUpdateBlockHeight lastUpdateTimestamp",
);

export const nativeTransfersSubscriptionQuery: SubscriptionQuery =
new SubscriptionQuery(
"nativeTransfers",
"extrinsicHash sender recipient amount blockNumber timestamp",
50,
"timestamp_ASC",
);

export const poolsV2SubscriptionQuery: SubscriptionQuery =
new SubscriptionQuery(
"pools",
"id token0 token1 reserves0 reserves1 lastUpdateTimestamp",
50,
"lastUpdateTimestamp_ASC",
);

export const poolsV2ConnectionsQuery: ConnectionQuery = new ConnectionQuery(
"pools",
"id token0 token1 reserves0 reserves1 lastUpdateTimestamp",
);
68 changes: 45 additions & 23 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,31 @@ import http from "http";
import express from "express";

import { Config } from "./config";
import { tokenBalances$, loadInitBalances } from "./grapqhl/psp22";
import * as rest from "./servers/http";
import { graphqlSubscribe$ } from "./grapqhl";
import {
pspTokenBalancesSubscriptionQuery,
nativeTransfersSubscriptionQuery,
poolsV2SubscriptionQuery,
} from "./grapqhl/queries";
import { setupNativeTransfersOverWss } from "./servers/ws/nativeTransfers";
import { graphqlSubscribe$, RawElement } from "./grapqhl";
import { poolsV2SubscriptionQuery as v2PoolSubscriptionQuery } from "./grapqhl/v2/queries";
import { poolsV2SubscriptionQuery as v1PoolSubscriptionQuery } from "./grapqhl/v1/queries";
import { setupPoolsV2OverWs } from "./servers/ws/amm";
import { nativeTransfers$ } from "./grapqhl/nativeTransfers";
import { UsdPriceCache } from "./services/usdPriceCache";
import { loadInitPoolReserves, poolsV2$ } from "./grapqhl/pools";
import { poolDataSample$ } from "./mocks/pools";
import { Pools } from "./models/pool";
import { share } from "rxjs";
import { merge, Observable, share } from "rxjs";
import { isV2GraphQLReservesError, isV1GraphQLReservesError } from "./utils";

async function main(): Promise<void> {
const app = express();
app.use(cors({
origin: [
/\.common\.fi$/, // mainnet, testnet
/\.azero\.dev$/, // devnet, branch previews
/\.d15umvvtx19run\.amplifyapp\.com$/, // AWS amplify previews
/^http:\/\/localhost:[0-9]*$/, // local development
/^http:\/\/127\.0\.0\.1:[0-9]*$/, // local development
],
}));
app.use(
cors({
origin: [
/\.common\.fi$/, // mainnet, testnet
/\.azero\.dev$/, // devnet, branch previews
/\.d15umvvtx19run\.amplifyapp\.com$/, // AWS amplify previews
/^http:\/\/localhost:[0-9]*$/, // local development
/^http:\/\/127\.0\.0\.1:[0-9]*$/, // local development
],
}),
);
const server = http.createServer(app);
const config = new Config();

Expand Down Expand Up @@ -113,14 +110,39 @@ async function main(): Promise<void> {
pools.updateBatch(initPools);
});

let graphqlPoolV2$ = graphqlSubscribe$(
// It's safe to run both subscriptions since one of them is bound to fail.
new Observable<RawElement>((_observer) => {});
new Observable<RawElement>((_observer) => {});

let v2graphqlPoolV2$ = graphqlSubscribe$(
graphqlClient,
poolsV2SubscriptionQuery,
v2PoolSubscriptionQuery,
);

let poolsV2Updates$ = poolsV2$(graphqlPoolV2$).pipe(share());
let v1graphqlPoolV1$ = graphqlSubscribe$(
graphqlClient,
v1PoolSubscriptionQuery,
);

poolsV2Updates$.forEach((pool) => pools.update(pool));
// Share the observable to enable multiple subscriptions (forEach and setupPoolsV2OverWs).
let v2poolsV2Updates$ = poolsV2$(v2graphqlPoolV2$).pipe(share());
let v1poolsV2Updates$ = poolsV2$(v1graphqlPoolV1$).pipe(share());

let poolsV2Updates$ = merge(v1poolsV2Updates$, v2poolsV2Updates$);

poolsV2Updates$
.forEach((pool) => pools.update(pool))
.catch((err) => {
// We expect certain types of errors to happen b/c of the migration.
// Anything else should be a fatal error.
if (
!isV2GraphQLReservesError(err) &&
!isV1GraphQLReservesError(err)
) {
console.error("Error updating pools", err);
Promise.reject(err);
}
});
setupPoolsV2OverWs(wsServer, poolsV2Updates$, pools);

// let initBalances = tokenBalancesFromArray(
Expand Down
8 changes: 4 additions & 4 deletions src/servers/ws/amm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ export function setupPoolsV2OverWs(
console.log("error sending known state", error);
}
});

const subscription = pools.subscribe((pool) => {
ws.send(JSON.stringify(pool), function () {
//
// Ignore errors.
//
ws.send(JSON.stringify(pool), function (err) {
console.error(`Error when sending data to the client over WS: ${err}`);
});
});

console.log("started feeding new client the pools events");

ws.on("error", console.error);
Expand Down
18 changes: 18 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,21 @@ export function runProgram(
onerror(e);
}
}

export function isV2GraphQLReservesError(err: any): boolean {
let v2QueryErrors = new Set([
`Value "blockTimestamp_ASC" does not exist in "PoolOrderByInput" enum.`,
`Cannot query field "blockTimestamp" on type "Pool".`,
]);
let errMsgs = Array.isArray(err) ? err : [err];
return errMsgs.every((errMsg) => v2QueryErrors.has(errMsg.message));
}

export function isV1GraphQLReservesError(err: any): boolean {
let v1QueryErrors = new Set([
`Cannot query field "lastUpdateTimestamp" on type "Pool".`,
`Value "lastUpdateTimestamp_ASC" does not exist in "PoolOrderByInput" enum. Did you mean the enum value "blockTimestamp_ASC"?`,
]);
let errMsgs = Array.isArray(err) ? err : [err];
return errMsgs.every((errMsg) => v1QueryErrors.has(errMsg.message));
}

0 comments on commit fec69f3

Please sign in to comment.