refactor: break fraudAssessment into evaluations #442

Merged
merged 21 commits on Jan 27, 2025
Changes from 13 commits

Commits (21)
7b7c90c
refactor: break fraudAssessment into evaluations
bajtos Jan 9, 2025
9e59ed6
fixup! remove unused import
bajtos Jan 9, 2025
07be2cc
Merge branch 'main' into refactor-fraud-assessment-step2
bajtos Jan 9, 2025
1392633
fixup! add FIXME comments
bajtos Jan 10, 2025
11d896e
test: we don't reward measurements that are not in majority
bajtos Jan 10, 2025
8731b08
Merge branch 'main' into refactor-fraud-assessment-step2
bajtos Jan 10, 2025
01dc39f
test: counts only majority measurements as accepted
bajtos Jan 10, 2025
c08c10a
test: records histogram of "score per inet group"
bajtos Jan 10, 2025
aa08724
Update bin/dry-run.js
bajtos Jan 10, 2025
f8a0505
fix evaluate-measurements
bajtos Jan 10, 2025
16938d8
preserve behaviour in bin/evaluate-measurements.js
bajtos Jan 10, 2025
52390bd
Apply suggestions from code review
bajtos Jan 13, 2025
1aba267
fixup! removeu unnecessary check
bajtos Jan 13, 2025
a6a84bc
Merge branch 'main' into refactor-fraud-assessment-step2
bajtos Jan 15, 2025
7a5db2d
Rename to consensusEvaluation, MAJORITY_RESULT and commitee.evaluatio…
bajtos Jan 21, 2025
a88ffba
Merge remote-tracking branch 'origin/main' into refactor-fraud-assess…
bajtos Jan 21, 2025
85c882d
rename to ConsensusCheckError
bajtos Jan 21, 2025
c9d91eb
rename to ConsensusEvaluationDetails
bajtos Jan 21, 2025
3680486
rename to ConsensusNotFoundReason, CommitteeDecision, indexMajorityFo…
bajtos Jan 22, 2025
e141918
Merge branch 'main' into refactor-fraud-assessment-step2
bajtos Jan 22, 2025
eee1d77
Merge branch 'main' into refactor-fraud-assessment-step2
bajtos Jan 24, 2025
2 changes: 1 addition & 1 deletion bin/dry-run.js
@@ -148,7 +148,7 @@ if (ignoredErrors.length) {
}

if (DUMP) {
const props = ['cid', 'minerId', 'participantAddress', 'inet_group', 'retrievalResult', 'fraudAssessment']
const props = ['cid', 'minerId', 'participantAddress', 'inet_group', 'retrievalResult', 'taskingEvaluation', 'majorityEvaluation']
for (const k of Object.keys(round.measurements[0])) {
if (!props.includes(k)) props.push(k)
}
32 changes: 19 additions & 13 deletions bin/evaluate-measurements.js
Expand Up @@ -42,7 +42,7 @@ const EVALUATION_NDJSON_FILE = `${basename(measurementsPath, '.ndjson')}.evaluat
const evaluationTxtWriter = fs.createWriteStream(EVALUATION_TXT_FILE)
const evaluationNdjsonWriter = fs.createWriteStream(EVALUATION_NDJSON_FILE)

evaluationTxtWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')
evaluationTxtWriter.write(formatHeader({ includeEvaluation: keepRejected }) + '\n')

const resultCounts = {
total: 0
@@ -98,22 +98,26 @@ async function processRound (roundIndex, measurements, resultCounts) {
})

for (const m of round.measurements) {
if (m.fraudAssessment !== 'OK') continue
// FIXME: we should include non-majority measurements too
// See https://github.com/filecoin-station/spark-evaluate/pull/396
if (m.taskingEvaluation !== 'OK' && m.majorityEvaluation === 'OK') continue
resultCounts.total++
resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
}

if (!keepRejected) {
round.measurements = round.measurements
// Keep accepted measurements only
.filter(m => m.fraudAssessment === 'OK')
// Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
.map(m => ({ ...m, fraudAssessment: undefined }))
// FIXME: we should include non-majority measurements too
// See https://github.com/filecoin-station/spark-evaluate/pull/396
.filter(m => m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK')
// Remove the taskingEvaluation and majorityEvaluation fields as all accepted measurements have the same 'OK' value
.map(m => ({ ...m, taskingEvaluation: undefined, majorityEvaluation: undefined }))
}

evaluationTxtWriter.write(
round.measurements
.map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
.map(m => formatMeasurement(m, { includeEvaluation: keepRejected }) + '\n')
.join('')
)
evaluationNdjsonWriter.write(
@@ -144,17 +148,19 @@ function isFlagEnabled (envVarValue) {
/**
* @param {import('../lib/preprocess.js').Measurement} m
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
* @param {boolean} [options.includeEvaluation]
*/
function formatMeasurement (m, { includeFraudAssesment } = {}) {
function formatMeasurement (m, { includeEvaluation } = {}) {
const fields = [
new Date(m.finished_at).toISOString(),
(m.cid ?? '').padEnd(70),
(m.protocol ?? '').padEnd(10)
]

if (includeFraudAssesment) {
fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
if (includeEvaluation) {
// FIXME: we should distinguish tasking and majority evaluation
// See https://github.com/filecoin-station/spark-evaluate/pull/396
fields.push((m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK' ? '🫡 ' : '🙅 '))
}

fields.push((m.retrievalResult ?? ''))
@@ -164,16 +170,16 @@ function formatMeasurement (m, { includeFraudAssesment } = {}) {

/**
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
* @param {boolean} [options.includeEvaluation]
*/
function formatHeader ({ includeFraudAssesment } = {}) {
function formatHeader ({ includeEvaluation } = {}) {
const fields = [
'Timestamp'.padEnd(new Date().toISOString().length),
'CID'.padEnd(70),
'Protocol'.padEnd(10)
]

if (includeFraudAssesment) {
if (includeEvaluation) {
fields.push('🕵️ ')
}

11 changes: 7 additions & 4 deletions lib/committee.js
@@ -48,7 +48,7 @@ export class Committee {
addMeasurement (m) {
assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
assert.strictEqual(m.taskingEvaluation, 'OK', 'only measurements accepted by task evaluation can be added')
this.#measurements.push(m)
}

@@ -75,7 +75,7 @@
hasRetrievalMajority: false,
retrievalResult: 'COMMITTEE_TOO_SMALL'
}
for (const m of this.#measurements) m.fraudAssessment = 'COMMITTEE_TOO_SMALL'
for (const m of this.#measurements) m.majorityEvaluation = 'COMMITTEE_TOO_SMALL'
return
}

@@ -115,12 +115,15 @@
let retrievalResult
if (retrievalResultMajority) {
retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
for (const m of retrievalResultMajority.majorityMeasurements) {
m.majorityEvaluation = 'OK'
}
for (const m of retrievalResultMajority.minorityMeasurements) {
m.fraudAssessment = 'MINORITY_RESULT'
m.majorityEvaluation = 'MINORITY_RESULT'
}
} else {
retrievalResult = 'MAJORITY_NOT_FOUND'
for (const m of this.#measurements) m.fraudAssessment = 'MAJORITY_NOT_FOUND'
for (const m of this.#measurements) m.majorityEvaluation = 'MAJORITY_NOT_FOUND'
}

this.evaluation = {
46 changes: 24 additions & 22 deletions lib/evaluate.js
@@ -62,12 +62,14 @@ export const evaluate = async ({
requiredCommitteeSize,
logger
})
const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK')
const measurementsToReward = measurements.filter(
m => m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK'
)

// Calculate reward shares
const participants = {}
let sum = 0n
for (const measurement of honestMeasurements) {
for (const measurement of measurementsToReward) {
if (!participants[measurement.participantAddress]) {
participants[measurement.participantAddress] = 0n
}
Expand All @@ -76,7 +78,7 @@ export const evaluate = async ({
for (const [participantAddress, participantTotal] of Object.entries(participants)) {
const score = participantTotal *
MAX_SCORE /
BigInt(honestMeasurements.length)
BigInt(measurementsToReward.length)
participants[participantAddress] = score
sum += score
}
Expand All @@ -88,24 +90,24 @@ export const evaluate = async ({
logger.log('EVALUATE ROUND %s: added %s as rounding to MAX_SCORE', roundIndex, delta)
}

// Calculate aggregates per fraud detection outcome
// Calculate aggregates per evaluation outcome
// This is used for logging and telemetry
/** @type {Partial<Record<import('./typings.js').FraudAssesment, number>>} */
const fraudAssessments = {
/** @type {Partial<Record<import('./typings.js').TaskingEvaluation | import('./typings.js').CommitteeCheckError, number>>} */
const evaluationOutcomes = {
OK: 0,
TASK_NOT_IN_ROUND: 0,
DUP_INET_GROUP: 0,
TOO_MANY_TASKS: 0
}
for (const m of measurements) {
fraudAssessments[m.fraudAssessment] = (fraudAssessments[m.fraudAssessment] ?? 0) + 1
evaluationOutcomes[m.taskingEvaluation] = (evaluationOutcomes[m.taskingEvaluation] ?? 0) + 1
}
logger.log(
'EVALUATE ROUND %s: Evaluated %s measurements, found %s honest entries.\n%o',
'EVALUATE ROUND %s: Evaluated %s measurements, rewarding %s entries.\n%o',
roundIndex,
measurements.length,
honestMeasurements.length,
fraudAssessments
measurementsToReward.length,
evaluationOutcomes
)

const fraudDetectionDuration = Date.now() - started
Expand Down Expand Up @@ -138,11 +140,11 @@ export const evaluate = async ({
point.intField('total_participants', Object.keys(participants).length)
point.intField('total_measurements', measurements.length)
point.intField('total_nodes', countUniqueNodes(measurements))
point.intField('honest_measurements', honestMeasurements.length)
point.intField('honest_measurements', measurementsToReward.length)
point.intField('set_scores_duration_ms', setScoresDuration)
point.intField('fraud_detection_duration_ms', fraudDetectionDuration)

for (const [type, count] of Object.entries(fraudAssessments)) {
for (const [type, count] of Object.entries(evaluationOutcomes)) {
point.intField(`measurements_${type}`, count)
}
})
Expand All @@ -152,7 +154,9 @@ export const evaluate = async ({
try {
recordTelemetry('retrieval_stats_honest', (point) => {
point.intField('round_index', roundIndex)
buildRetrievalStats(honestMeasurements, point)
// FIXME: Include non-majority measurements in these stats
// See https://github.com/filecoin-station/spark-evaluate/issues/446
buildRetrievalStats(measurementsToReward, point)
})
} catch (err) {
console.error('Cannot record retrieval stats (honest).', err)
@@ -271,8 +275,6 @@ export const runFraudDetection = async ({
// or missing some of the required fields like `inet_group`
//
for (const m of measurements) {
if (m.fraudAssessment) continue

// sanity checks to get nicer errors if we forget to set required fields in unit tests
assert(typeof m.inet_group === 'string', 'missing inet_group')
assert(typeof m.finished_at === 'number', 'missing finished_at')
Expand All @@ -281,15 +283,15 @@ export const runFraudDetection = async ({
t => t.cid === m.cid && t.minerId === m.minerId
)
if (!isValidTask) {
m.fraudAssessment = 'TASK_NOT_IN_ROUND'
m.taskingEvaluation = 'TASK_NOT_IN_ROUND'
continue
}

const isValidTaskForNode = tasksAllowedForStations.get(m.stationId).some(
t => t.cid === m.cid && t.minerId === m.minerId
)
if (!isValidTaskForNode) {
m.fraudAssessment = 'TASK_WRONG_NODE'
m.taskingEvaluation = 'TASK_WRONG_NODE'
}
}

Expand All @@ -299,7 +301,7 @@ export const runFraudDetection = async ({
/** @type {Map<string, Measurement[]>} */
const inetGroups = new Map()
for (const m of measurements) {
if (m.fraudAssessment) continue
if (m.taskingEvaluation) continue

const key = m.inet_group
let group = inetGroups.get(key)
Expand Down Expand Up @@ -347,18 +349,18 @@ export const runFraudDetection = async ({

if (tasksSeen.has(taskId)) {
debug(' pa: %s h: %s task: %s - task was already rewarded', m.participantAddress, h, taskId)
m.fraudAssessment = 'DUP_INET_GROUP'
m.taskingEvaluation = 'DUP_INET_GROUP'
continue
}

if (tasksSeen.size >= sparkRoundDetails.maxTasksPerNode) {
debug(' pa: %s h: %s task: %s - already rewarded max tasks', m.participantAddress, h, taskId)
m.fraudAssessment = 'TOO_MANY_TASKS'
m.taskingEvaluation = 'TOO_MANY_TASKS'
continue
}

tasksSeen.add(taskId)
m.fraudAssessment = 'OK'
m.taskingEvaluation = 'OK'
debug(' pa: %s h: %s task: %s - REWARD', m.participantAddress, h, taskId)
}
}
Expand All @@ -372,7 +374,7 @@ export const runFraudDetection = async ({
// needs is to iterate over the accepted measurements once.
const iterateAcceptedMeasurements = function * () {
for (const m of measurements) {
if (m.fraudAssessment !== 'OK') continue
if (m.taskingEvaluation !== 'OK') continue
yield m
}
}
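To make the reward-share arithmetic in the hunks above concrete, here is a small worked example; the participant addresses, counts, and the exact MAX_SCORE value are assumptions for illustration, not values taken from this PR:

// Hypothetical: three rewarded measurements split between two participants.
const MAX_SCORE = 1_000_000_000_000_000n // assumed value; the real constant lives in lib/evaluate.js
const participants = { '0xA': 2n, '0xB': 1n } // measurement counts per participant
const totalRewarded = 3n // measurementsToReward.length

let sum = 0n
for (const [address, count] of Object.entries(participants)) {
  const score = count * MAX_SCORE / totalRewarded // BigInt division truncates
  participants[address] = score
  sum += score
}

// sum === 999_999_999_999_999n; the 1n shortfall is the rounding delta that
// evaluate() adds back to one participant and logs as "added %s as rounding to MAX_SCORE".
const delta = MAX_SCORE - sum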
2 changes: 1 addition & 1 deletion lib/platform-stats.js
@@ -67,7 +67,7 @@ export const updateStationsAndParticipants = async (
}

stationStats.total++
if (m.fraudAssessment === 'OK') stationStats.accepted++
if (m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK') stationStats.accepted++

let subnetsSet = subnets.get(participantId)
if (!subnetsSet) {
6 changes: 4 additions & 2 deletions lib/preprocess.js
@@ -22,8 +22,10 @@ export class Measurement {
// Note: providerId is recorded by spark-publish but we don't use it for evaluations yet
this.providerId = pointerize(m.provider_id)
this.spark_version = pointerize(m.spark_version)
/** @type {import('./typings.js').FraudAssesment} */
this.fraudAssessment = null
/** @type {import('./typings.js').TaskingEvaluation} */
Review comment (Contributor):

Since the initial value is set to null by default, we should probably make the property type optional, maybe something like:

/** @type {import('./typings.js').TaskingEvaluation?} */

Reply (bajtos, Member Author):

Agreed 👍

This pull request is not making this any worse, so let's leave such an improvement out of scope, please.

this.taskingEvaluation = null
/** @type {import('./typings.js').MajorityEvaluation} */
this.majorityEvaluation = null
this.inet_group = pointerize(m.inet_group)
this.finished_at = parseDateTime(m.finished_at)
this.provider_address = pointerize(m.provider_address)
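A minimal sketch of the optional-type annotation suggested in the review thread above — shown here for illustration only, since the PR deliberately leaves this change out of scope; the surrounding constructor is abbreviated:

  // Sketch only: JSDoc's postfix `?` marks the type as nullable, matching the
  // null initial value (equivalent to `TaskingEvaluation | null`).
  /** @type {import('./typings.js').TaskingEvaluation?} */
  this.taskingEvaluation = null
  /** @type {import('./typings.js').MajorityEvaluation?} */
  this.majorityEvaluation = null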
7 changes: 4 additions & 3 deletions lib/retrieval-stats.js
@@ -81,8 +81,9 @@ export const buildRetrievalStats = (measurements, telemetryPoint) => {
const endAt = m.end_at
const ttfb = startAt && firstByteAt && (firstByteAt - startAt)
const duration = startAt && endAt && (endAt - startAt)
const isAccepted = m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK'
Review comment (bajtos, Member Author):

TODO: check if we need to add a test

Reply (bajtos, Member Author):

Done in c08c10a

Review comment (Member):

To double-check: we will now be calling measurements that will be rewarded "accepted measurements", right?

Reply (bajtos, Member Author):

Hmm, this is a good call! 🤔

I want to call measurements that will be rewarded "measurementsToReward"; that's what I use inside the main evaluation loop.

Everywhere else, we need to decide whether we are interested in measurements that passed tasking evaluation but may be in the minority, or whether we are interested in measurements that will be rewarded (passed the tasking evaluation and are in the majority).

In that light, maybe it would be better to avoid using the term "accepted measurements" altogether, or to qualify it as "measurements accepted by tasking"?

I don't have a good answer 😞


debug('size=%s ttfb=%s duration=%s status=%s valid? %s', byteLength, ttfb, duration, m.status_code, m.fraudAssessment === 'OK')
debug('size=%s ttfb=%s duration=%s status=%s accepted? %s', byteLength, ttfb, duration, m.status_code, isAccepted)
if (byteLength !== undefined && m.status_code === 200) {
downloadBandwidth += byteLength
sizeValues.push(byteLength)
Expand All @@ -93,7 +94,7 @@ export const buildRetrievalStats = (measurements, telemetryPoint) => {
const node = `${m.inet_group}::${m.participantAddress}`
tasksPerNode.set(node, (tasksPerNode.get(node) ?? 0) + 1)

if (m.fraudAssessment === 'OK') {
if (isAccepted) {
acceptedMeasurementsPerInetGroup.set(m.inet_group, (acceptedMeasurementsPerInetGroup.get(m.inet_group) ?? 0) + 1)
}

@@ -222,7 +223,7 @@ export const recordCommitteeSizes = (committees, point) => {
data.nodes.add(`${m.inet_group}::${m.participantAddress}`)
data.measurements++

if (m.fraudAssessment === 'OK') {
if (m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK') {
data.majoritySize = (data.majoritySize ?? 0) + 1
}
}
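Following up on the naming discussion in the review thread above, here is a minimal sketch of the two populations involved; the helper names are hypothetical and not part of this PR:

// Measurements accepted by tasking evaluation — may still be in the minority
// (MINORITY_RESULT, MAJORITY_NOT_FOUND, COMMITTEE_TOO_SMALL).
const acceptedByTasking = measurements.filter(m => m.taskingEvaluation === 'OK')

// Measurements that will actually be rewarded — accepted by tasking AND in the majority.
// This matches the measurementsToReward filter used inside evaluate() in lib/evaluate.js.
const measurementsToReward = measurements.filter(
  m => m.taskingEvaluation === 'OK' && m.majorityEvaluation === 'OK'
)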
5 changes: 4 additions & 1 deletion lib/round.js
@@ -1,3 +1,5 @@
/** @import { Measurement } from './preprocess.js' */

export class RoundData {
/** @type {Map<string, string>} */
#knownStrings
Expand All @@ -7,8 +9,9 @@ export class RoundData {
*/
constructor (index) {
this.index = index
/** @type {import('./preprocess.js').Measurement[]} */
/** @type {string[]} */
this.measurementBatches = []
/** @type {Measurement[]} */
this.measurements = []
this.details = null
this.#knownStrings = new Map()
8 changes: 5 additions & 3 deletions lib/typings.d.ts
@@ -34,15 +34,17 @@ export type CommitteeCheckError =
| 'MAJORITY_NOT_FOUND'
| 'MINORITY_RESULT'

export type MajorityEvaluation =
| 'OK'
| CommitteeCheckError

// When adding a new enum value, remember to update the summary initializer inside `evaluate()`
export type FraudAssesment =
export type TaskingEvaluation =
| 'OK'
| 'TASK_NOT_IN_ROUND'
| 'TASK_WRONG_NODE'
| 'DUP_INET_GROUP'
| 'TOO_MANY_TASKS'
| CommitteeCheckError


// When adding a new enum value, remember to update the summary initializer inside `reportRetrievalStats()`
export type RetrievalResult =
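Taken together, these type changes split the old FraudAssesment union into two independent fields. A minimal sketch of how a single measurement now carries both outcomes — the values are illustrative only and not taken from the diff:

// Illustrative only; field names follow lib/preprocess.js.
const measurement = {
  // set by runFraudDetection() in lib/evaluate.js
  taskingEvaluation: 'OK', // 'OK' | 'TASK_NOT_IN_ROUND' | 'TASK_WRONG_NODE' | 'DUP_INET_GROUP' | 'TOO_MANY_TASKS'
  // set while the committee checks retrieval majorities in lib/committee.js
  majorityEvaluation: 'MINORITY_RESULT' // 'OK' | 'COMMITTEE_TOO_SMALL' | 'MAJORITY_NOT_FOUND' | 'MINORITY_RESULT'
}

// Previously both outcomes were folded into the single fraudAssessment field;
// a measurement is now rewarded only when both new fields are 'OK'.
const isRewarded = measurement.taskingEvaluation === 'OK' &&
  measurement.majorityEvaluation === 'OK'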