diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 17a2c0a..d0ebf12 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -8,7 +8,7 @@ import '../lib/instrument.js' import { createInflux } from '../lib/telemetry.js' import { rpcRequest } from '../lib/rpc-service/service.js' import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js' -import { lookUpPayloadCids } from '../lib/look-up-payload-cids.js' +import { countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js' import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js' import { getDealPayloadCid } from '../lib/piece-indexer-service.js' /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ @@ -124,7 +124,14 @@ export const lookUpPayloadCidsLoop = async (makeRpcRequest, getDealPayloadCid, p // Maximum number of deals to look up payload CIDs for in one loop iteration const maxDeals = 1000 try { - await lookUpPayloadCids(makeRpcRequest, getDealPayloadCid, pgPool, maxDeals) + const numOfPayloadCidsResolved = await lookUpPayloadCids(makeRpcRequest, getDealPayloadCid, pgPool, maxDeals) + const numOfUnresolvedPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool) + if (INFLUXDB_TOKEN) { + recordTelemetry('look_up_payload_cids_stats', point => { + point.intField('total_unresolved_payload_cids', numOfUnresolvedPayloadCids) + point.intField('payload_cids_resolved', numOfPayloadCidsResolved) + }) + } } catch (e) { console.error(e) Sentry.captureException(e) diff --git a/backend/lib/look-up-payload-cids.js b/backend/lib/look-up-payload-cids.js index 1175bf0..022d96f 100644 --- a/backend/lib/look-up-payload-cids.js +++ b/backend/lib/look-up-payload-cids.js @@ -12,17 +12,20 @@ import { getMinerPeerId } from './rpc-service/service.js' * @param {function} getDealPayloadCid * @param {Queryable} pgPool * @param {number} maxDeals - * @returns {Promise} + * @returns {Promise} */ export const lookUpPayloadCids = async (makeRpcRequest, getDealPayloadCid, pgPool, maxDeals) => { - for (const deal of await fetchDealsWithNoPayloadCid(pgPool, maxDeals)) { + let missingPayloadCidsResolved = 0 + for (const deal of await fetchDealsWithUnresolvedPayloadCid(pgPool, maxDeals)) { const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest) const payloadCid = await getDealPayloadCid(minerPeerId, deal.piece_cid) if (payloadCid) { deal.payload_cid = payloadCid - await updatePayloadInActiveDeal(pgPool, deal) + await updatePayloadCidInActiveDeal(pgPool, deal) + missingPayloadCidsResolved++ } } + return missingPayloadCidsResolved } /** @@ -30,17 +33,23 @@ export const lookUpPayloadCids = async (makeRpcRequest, getDealPayloadCid, pgPoo * @param {number} maxDeals * @returns {Promise>>} */ -async function fetchDealsWithNoPayloadCid (pgPool, maxDeals) { +async function fetchDealsWithUnresolvedPayloadCid (pgPool, maxDeals) { const query = 'SELECT * FROM active_deals WHERE payload_cid IS NULL ORDER BY activated_at_epoch ASC LIMIT $1' return await loadDeals(pgPool, query, [maxDeals]) } +export async function countStoredActiveDealsWithUnresolvedPayloadCid (pgPool) { + const query = 'SELECT COUNT(*) FROM active_deals WHERE payload_cid IS NULL' + const result = await pgPool.query(query) + return result.rows[0].count +} + /** * @param {Queryable} pgPool * @param {Static} deal * @returns { Promise} */ -async function updatePayloadInActiveDeal (pgPool, deal) { +async function updatePayloadCidInActiveDeal (pgPool, deal) { const updateQuery = ` UPDATE active_deals SET payload_cid = $1 diff --git a/backend/test/look-up-payload-cids.test.js b/backend/test/look-up-payload-cids.test.js index b32b3b3..caf7a51 100644 --- a/backend/test/look-up-payload-cids.test.js +++ b/backend/test/look-up-payload-cids.test.js @@ -7,7 +7,7 @@ import { fetchAndStoreActiveDeals } from '../lib/deal-observer.js' import assert from 'assert' import { minerPeerIds } from './test_data/minerInfo.js' import { payloadCIDs } from './test_data/payloadCIDs.js' -import { lookUpPayloadCids } from '../lib/look-up-payload-cids.js' +import { countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js' describe('deal-observer-backend look up payload CIDs', () => { const makeRpcRequest = async (method, params) => { @@ -63,4 +63,19 @@ describe('deal-observer-backend look up payload CIDs', () => { 85 // Not all deals have a payload CID in the test data ) }) + + it('piece indexer count number of missing payload CIDs', async () => { + let missingPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool) + assert.strictEqual(missingPayloadCids, 336n) + const getDealPayloadCidCalls = [] + const getDealPayloadCid = async (providerId, pieceCid) => { + getDealPayloadCidCalls.push({ providerId, pieceCid }) + const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) + return payloadCid?.payloadCid + } + + await lookUpPayloadCids(makeRpcRequest, getDealPayloadCid, pgPool, 10000) + missingPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool) + assert.strictEqual(missingPayloadCids, 85n) + }) })