Commit 9a238da: observer loop implemented

Nikolas Haimerl authored and committed on Jan 17, 2025
2 parents: fc88c78 + ee27e7c

Showing 7 changed files with 186 additions and 49 deletions.
50 changes: 34 additions & 16 deletions backend/bin/deal-observer-backend.js
@@ -4,40 +4,56 @@ import { ethers } from 'ethers'
import assert from 'node:assert/strict'
import timers from 'node:timers/promises'
import slug from 'slug'
import { GLIF_RPC, rpcHeaders } from '../lib/config.js'
import { RPC_URL, rpcHeaders } from '../lib/config.js'
import '../lib/instrument.js'
import {
DealObserver
} from '../lib/deal-observer.js'
import { createInflux } from '../lib/telemetry.js'

const { INFLUXDB_TOKEN } = process.env
assert(INFLUXDB_TOKEN, 'INFLUXDB_TOKEN required')
//const { INFLUXDB_TOKEN } = process.env
//assert(INFLUXDB_TOKEN, 'INFLUXDB_TOKEN required')

const pgPool = await createPgPool()
// Filecoin will need some epochs to reach finality.
// We do not want to fetch deals that are newer than the current chain head - 900 epochs.
const finalityEpochs = 900

const fetchRequest = new ethers.FetchRequest(GLIF_RPC)
const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
// const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)
// const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)

const loop = async (name, fn, interval) => {
const loop = async (dealObserver, name, interval) => {
while (true) {
const start = Date.now()
try {
await fn()
// If the store is empty we start fetching from the current finalized chain head
const currentChainHead = await dealObserver.getChainHead()
const currentFinalizedChainHead = currentChainHead.Height - finalityEpochs
let lastEpochStored = await dealObserver.getLastStoredHeight() ?? currentFinalizedChainHead - 1
if (lastEpochStored < currentFinalizedChainHead) {
// TODO: The free plan does not allow for fetching epochs older than 2000 blocks. We need to account for that.
// TODO: Since we call each epoch individually and the db write is not the bottleneck, we can parallelize this process.
for (const epoch of Array.from({ length: currentFinalizedChainHead - lastEpochStored }, (_, i) => i + lastEpochStored + 1)) {
await dealObserver.observeBuiltinActorEvents(epoch)
}
}
lastEpochStored = await dealObserver.getLastStoredHeight()
// The storage should now be up to date with the current finalized chain head
assert(lastEpochStored === currentFinalizedChainHead, 'Last stored height should equal the current finalized chain head')
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = Date.now() - start
console.log(`Loop "${name}" took ${dt}ms`)

recordTelemetry(`loop_${slug(name, '_')}`, point => {
point.intField('interval_ms', interval)
point.intField('duration_ms', dt)
})
// recordTelemetry(`loop_${slug(name, '_')}`, point => {
// point.intField('interval_ms', interval)
// point.intField('duration_ms', dt)
// })

if (dt < interval) {
await timers.setTimeout(interval - dt)
@@ -46,9 +62,11 @@ const loop = async (name, fn, interval) => {
}

await Promise.all([
loop(
'Built-in actor events',
() => new DealObserver(pgPool),
30_000
DealObserver.create(pgPool).then(async (dealObserver) =>
await loop(
dealObserver,
'Built-in actor events',
30_000
)
)
])
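
A note on the second TODO above: each epoch is fetched with an individual RPC call and the DB write is not the bottleneck, so the catch-up could fetch a window of epochs concurrently. A minimal sketch follows (hypothetical helper, not part of this commit; it assumes observeBuiltinActorEvents tolerates out-of-order completion within a window, which also means lastStoredHeight would need to be derived from the window bounds rather than from insertion order):

// Sketch only: fetch a fixed-size window of epochs concurrently, then advance.
const observeEpochRange = async (dealObserver, fromEpoch, toEpoch, concurrency = 10) => {
  for (let start = fromEpoch; start <= toEpoch; start += concurrency) {
    const end = Math.min(start + concurrency - 1, toEpoch)
    const epochs = Array.from({ length: end - start + 1 }, (_, i) => start + i)
    await Promise.all(epochs.map(epoch => dealObserver.observeBuiltinActorEvents(epoch)))
  }
}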
8 changes: 4 additions & 4 deletions backend/lib/config.js
@@ -4,15 +4,15 @@ const {
} = process.env

const rpcUrls = RPC_URLS.split(',')
const GLIF_RPC = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
console.log(`Selected JSON-RPC endpoint ${GLIF_RPC}`)
const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
console.log(`Selected JSON-RPC endpoint ${RPC_URL}`)

const rpcHeaders = {}
if (GLIF_RPC.includes('glif')) {
if (RPC_URL.includes('glif')) {
rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}`
}

export {
GLIF_RPC,
RPC_URL,
rpcHeaders
}
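
To make the rename concrete, here is the endpoint-selection behavior in isolation (a standalone sketch with placeholder values, not code from this repository):

// Standalone sketch. The second URL is a made-up placeholder; the first is
// Glif's public Filecoin JSON-RPC endpoint.
const RPC_URLS = 'https://api.node.glif.io/rpc/v1,https://my-own-lotus.example/rpc/v1'
const rpcUrls = RPC_URLS.split(',')
// One endpoint is picked at random per process start.
const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
// Attach the Authorization header only when the selected URL is a Glif
// endpoint, so the GLIF_TOKEN is never sent to other providers.
const rpcHeaders = RPC_URL.includes('glif') ? { Authorization: 'Bearer <GLIF_TOKEN>' } : {}
console.log(`Selected JSON-RPC endpoint ${RPC_URL}`, rpcHeaders)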
121 changes: 109 additions & 12 deletions backend/lib/deal-observer.js
@@ -1,5 +1,7 @@
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import {Provider} from 'ethers' */
import { parse } from '@ipld/dag-json'
import assert from 'node:assert'

import { ActorEventFilter, RpcApiClient } from './rpc-service/service.js'

@@ -12,28 +14,123 @@ class DealObserver {
// TODO: Store events in pgPool
this.#pgPool = pgPool
this.#cache = new Map()
this.#cache.set('chainHead', chainHead)
this.#cache.set('lastStoredHeight', null)
}

async build () {
this.#rpcApiClient = await (new RpcApiClient()).build()
if (!this.#cache.get('chainHead')) {
const chainHead = await this.#rpcApiClient.getChainHead()
this.#cache.set('chainHead', chainHead)
}
this.#rpcApiClient = await RpcApiClient.create()
return this
}

static async create (pgPool = null, chainHead = null) {
const observer = new DealObserver(pgPool, chainHead)
const observer = new DealObserver(pgPool)
return await observer.build()
}

async observeBuiltinActorEvents (blockHeight = this.#cache.get('chainHead').Height, eventType = 'claim') {
return this.#rpcApiClient.getActorEvents(new ActorEventFilter(blockHeight, eventType))
async getChainHead () {
return await this.#rpcApiClient.getChainHead()
}

async observeBuiltinActorEvents (blockHeight, eventType = 'claim') {
let activeDeals = await this.#rpcApiClient.getActorEvents(new ActorEventFilter(blockHeight, eventType))
assert(activeDeals != undefined, `No ${eventType} events found in block ${blockHeight}`)
await this.storeActiveDeals(activeDeals)
// Update the last stored height in the cache
this.#cache.set('lastStoredHeight', activeDeals.length > 0 ? activeDeals[activeDeals.length - 1].height : blockHeight)
}

async getLastStoredHeight () {
const cachedLastStoredHeight = this.#cache.get('lastStoredHeight');
if (!cachedLastStoredHeight) {
const latestDeal = await this.fetchDealWithHighestActivatedEpoch();
return latestDeal ? latestDeal.height : null;
}
return cachedLastStoredHeight
}
}

export {
DealObserver
async fetchDealWithHighestActivatedEpoch(){
const client = await this.#pgPool.connect();
const query = "SELECT * FROM active_deals ORDER BY activated_at_epoch DESC LIMIT 1";
const result = await client.query(query);
client.release();
return result.rows.length > 0 ? result.rows[0] : null;
}

async storeActiveDeals(activeDeals){
const client = await this.#pgPool.connect();
try {
// Start a transaction
await client.query('BEGIN');

// Insert deals in a batch
const insertQuery = `
INSERT INTO active_deals (
activated_at_epoch,
miner,
client,
piece_cid,
piece_size,
term_start_epoch,
term_min,
term_max,
sector,
payload_cid
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`;

// Loop through the deals array and execute the insert query for each deal
for (const deal of activeDeals) {
await client.query(insertQuery, [
deal.height,
deal.event.provider,
deal.event.client,
deal.event.pieceCid,
deal.event.pieceSize,
deal.event.termStart,
deal.event.termMin,
deal.event.termMax,
deal.event.sector,
null
]);
}

// Commit the transaction if all inserts are successful
await client.query('COMMIT');
console.log('All deals inserted successfully.');

} catch (error) {
// If any error occurs, roll back the transaction
await client.query('ROLLBACK');
console.error('Error inserting deals. Rolling back:', error.message);
} finally {
// Release the client back to the pool
client.release();
}
}
async fetchActiveDealsByKey(searchParams){
function buildQuery(baseQuery, conditions) {
const queryParts = [];

// Iterate through conditions and build the WHERE clauses dynamically
conditions.forEach(({key,value}) => {
// Add the condition to the queryParts array
queryParts.push(`${key} = ${value}`);
});

// Combine base query with dynamically built WHERE clause
const finalQuery = queryParts.length > 0 ? `${baseQuery} ${queryParts.join(' AND ')}` : baseQuery;

return finalQuery;
}
const client = await this.#pgPool.connect();
const baseQuery = "SELECT * FROM active_deals WHERE";
const query = buildQuery(baseQuery, searchParams);
let result = await client.query(query)
result = parse(result)
client.release();
return result
}
}

export { DealObserver }
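
Two caveats worth flagging in the new storage helpers. In storeActiveDeals, the ROLLBACK path logs the error without rethrowing, so observeBuiltinActorEvents still advances its lastStoredHeight cache even when the batch insert failed. In fetchActiveDealsByKey, values are interpolated directly into the SQL string (unquoted TEXT values will not even parse, and it invites injection), the base query ends in a dangling WHERE when no conditions are given, and dag-json's parse is applied to the pg result object rather than to encoded JSON. A parameterized sketch of the lookup, assuming the same searchParams shape (an array of { key, value } pairs), not part of this commit:

// Sketch only: same query shape, but with values passed as bind parameters.
// Column names in `key` should still come from an allow-list, since
// identifiers cannot be parameterized.
async function fetchActiveDealsByKey (pgPool, searchParams) {
  const conditions = searchParams.map(({ key }, i) => `${key} = $${i + 1}`)
  const values = searchParams.map(({ value }) => value)
  const query = conditions.length > 0
    ? `SELECT * FROM active_deals WHERE ${conditions.join(' AND ')}`
    : 'SELECT * FROM active_deals'
  const { rows } = await pgPool.query(query, values)
  return rows
}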
27 changes: 14 additions & 13 deletions backend/lib/rpc-service/service.js
@@ -1,4 +1,4 @@
import { GLIF_RPC } from '../config.js'
import { RPC_URL } from '../config.js'
import { base64pad } from 'multiformats/bases/base64'
import { encode as cborEncode } from '@ipld/dag-cbor'
import { decode as jsonDecode } from '@ipld/dag-json'
@@ -8,7 +8,7 @@ import { rawEventEntriesToEvent } from './utils.js'

const makeRpcRequest = async (method, params) => {
const reqBody = JSON.stringify({ method, params, id: 1, jsonrpc: '2.0' })
const response = await request(GLIF_RPC, {
const response = await request(RPC_URL, {
bodyTimeout: 1000 * 60,
headersTimeout: 1000 * 60,
method: 'POST',
@@ -25,16 +25,16 @@ class RpcApiClient {
#ipldSchema
#make_rpc_request

constructor (rpcRequest) {
constructor(rpcRequest) {
this.#make_rpc_request = rpcRequest
}

async build () {
async build() {
this.#ipldSchema = await (new IpldSchemaValidator()).build()
return this
}

static async create (rpcRequest = makeRpcRequest) {
static async create(rpcRequest = makeRpcRequest) {
const apiClient = new RpcApiClient(rpcRequest)
return apiClient.build()
}
@@ -44,8 +44,12 @@
* Returns actor events filtered by the given actorEventFilter
* @returns {Promise<object>}
*/
async getActorEvents (actorEventFilter) {
async getActorEvents(actorEventFilter) {
const rawEvents = (await this.#make_rpc_request('Filecoin.GetActorEventsRaw', [actorEventFilter]))
if (rawEvents && rawEvents.length === 0) {
console.log(`No actor events found in the height range ${actorEventFilter.fromHeight} - ${actorEventFilter.toHeight}.`)
return []
}
const typedRawEventEntries = rawEvents.map((rawEvent) => this.#ipldSchema.applyType(
'RawActorEvent', rawEvent
))
@@ -65,7 +69,7 @@
return emittedEvents
}

async getChainHead () {
async getChainHead() {
return await this.#make_rpc_request('Filecoin.ChainHead', [])
}
}
@@ -75,19 +79,16 @@ class ActorEventFilter {
* @param {number} blockHeight
* @param {string} eventTypeString
*/
constructor (blockHeight, eventTypeString) {
constructor(blockHeight, eventTypeString) {
// We only search for events in a single block
this.fromHeight = blockHeight
this.toHeight = blockHeight
this.fields = {
$type: // string must be encoded as CBOR and then presented as a base64 encoded string
// Codec 81 is CBOR and will only give us builtin-actor events, FEVM events are all RAW
{ Codec: 81, Value: base64pad.baseEncode(cborEncode(eventTypeString)) }
[{ Codec: 81, Value: base64pad.baseEncode(cborEncode(eventTypeString)) }]
}
}
}

export {
ActorEventFilter,
RpcApiClient
}
export { RpcApiClient, ActorEventFilter }
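
For a concrete picture of what ActorEventFilter serializes to: cborEncode('claim') yields the six bytes 65 63 6c 61 69 6d (a CBOR text string of length 5), and base64pad encodes those bytes as 'ZWNsYWlt'. So a claim filter for a single epoch looks roughly like this (the epoch is made up; worth double-checking the encoded value against a live request):

// Illustrative shape of new ActorEventFilter(4622000, 'claim')
const filter = {
  fromHeight: 4622000,
  toHeight: 4622000,
  fields: {
    // Codec 81 (0x51) selects CBOR-encoded values, i.e. built-in actor events
    $type: [{ Codec: 81, Value: 'ZWNsYWlt' }]
  }
}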
13 changes: 9 additions & 4 deletions backend/test/observer.test.js
@@ -1,7 +1,7 @@
import assert from 'node:assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { IpldSchemaValidator } from '../lib/rpc-service/ipld-schema-validator.js'
import assert from 'assert'
import { claimTestEvent } from './test_data/claimEvent.js'
import { ActorEventFilter, RpcApiClient } from '../lib/rpc-service/service.js'
import { chainHeadTestData } from './test_data/chainHead.js'
@@ -21,15 +21,20 @@ describe('deal-observer-backend', () => {
})

describe('observeBuiltinActorEvents', () => {
let providerMock
beforeEach(async () => {
// TODO: reset DB
// await pgPool.query('DELETE FROM daily_reward_transfers')
await pgPool.query('DELETE FROM active_deals')

providerMock = {
getBlockNumber: async () => 2000
}
})

// TODO - remove this placeholder and implement proper tests
it('adds new FIL+ deals from built-in actor events', async () => {
})
})
})

describe('IPLD Schema Validator', () => {
let claimEvent
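The placeholder test above could start by exercising the new storage path directly, bypassing the RPC layer. A rough sketch, assuming DealObserver is imported in this file and that DealObserver.create performs no network I/O at build time (all values below are made up):

it('stores and retrieves active deals', async () => {
  const observer = await DealObserver.create(pgPool)
  const deal = {
    height: 4622129,
    event: {
      provider: 1000,
      client: 2000,
      pieceCid: 'baga6ea4seaq...', // placeholder piece CID
      pieceSize: '34359738368',
      termStart: 4622129,
      termMin: 518400,
      termMax: 1555200,
      sector: 7
    }
  }
  await observer.storeActiveDeals([deal])
  const stored = await observer.fetchDealWithHighestActivatedEpoch()
  assert.strictEqual(stored.activated_at_epoch, 4622129)
})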
2 changes: 2 additions & 0 deletions db/index.js
@@ -7,6 +7,8 @@ import Postgrator from 'postgrator'
/** @typedef {import('./typings.js').Queryable} Queryable */
/** @typedef {import('./typings.js').PgPool} PgPool */

pg.types.setTypeParser(20, BigInt) // Type Id 20 = BIGINT | BIGSERIAL

const {
// DATABASE_URL points to `spark_deal_observer` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_deal_observer'
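The new type-parser line pairs with the BIGINT columns added by the migration below: without it, pg hands BIGINT values back as strings. Illustrative use, assuming a pool created by this package:

// Illustrative: with pg.types.setTypeParser(20, BigInt) registered,
// BIGINT columns such as active_deals.piece_size come back as native BigInt.
const { rows } = await pgPool.query('SELECT piece_size FROM active_deals LIMIT 1')
console.log(typeof rows[0]?.piece_size) // 'bigint' ('string' without the parser)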
14 changes: 14 additions & 0 deletions db/migrations/002.do.active-deals.sql
@@ -0,0 +1,14 @@
CREATE TABLE active_deals (
activated_at_epoch INT NOT NULL,
miner INT NOT NULL,
client INT NOT NULL,
piece_cid TEXT NOT NULL,
piece_size BIGINT NOT NULL,
term_start_epoch INT NOT NULL,
term_min INT NOT NULL,
term_max INT NOT NULL,
sector BIGINT NOT NULL,
payload_cid TEXT
);

