Skip to content

Commit

Permalink
fix(sdk): sample record validation errors (NangoHQ#2794)
Browse files Browse the repository at this point in the history
## Describe your changes

Fixes
https://linear.app/nango/issue/NAN-1833/reduce-validation-logs-andor-process-in-parallel

- Sample record validation errors
Logging every invalid record was spamming the infra, making syncs slower than they should be.

- Send the sample in parallel
I don't remember why I didn't do it like this originally. It makes the code a
bit more complex, but it is faster; since we now sample, it's no longer a big
problem, but it can be useful if we increase the sample size or remove sampling.


> The wording is not crazy good, if you have suggestions...
<img width="1013" alt="Screenshot 2024-10-01 at 11 37 15"
src="https://github.com/user-attachments/assets/1a27a680-b847-4d07-b499-c74b64e5a760">

Co-authored-by: Khaliq <[email protected]>
  • Loading branch information
bodinsamuel and khaliqgant authored Oct 2, 2024
1 parent 560b80a commit b62def4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
4 changes: 3 additions & 1 deletion packages/shared/lib/sdk/dataValidation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ export function clearValidationCache() {
cache.clear();
}

export function validateData({ version, input, modelName, jsonSchema }: ValidateProps): true | (ErrorObject | Error)[] {
export type ValidateDataError = ErrorObject | Error;

export function validateData({ version, input, modelName, jsonSchema }: ValidateProps): true | ValidateDataError[] {
if (!jsonSchema) {
// For legacy reason, not all scripts have a jsonSchema
return true;
Expand Down
24 changes: 21 additions & 3 deletions packages/shared/lib/sdk/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from '@nangohq/utils';
import type { SyncConfig } from '../models/Sync.js';
import type { RunnerFlags } from '../services/sync/run.utils.js';
import type { ValidateDataError } from './dataValidation.js';
import { validateData } from './dataValidation.js';
import { NangoError } from '../utils/error.js';
import type { DBTeam, GetPublicIntegration, MessageRowInsert } from '@nangohq/types';
Expand Down Expand Up @@ -361,6 +362,7 @@ export interface EnvironmentVariable {
}

const MEMOIZED_CONNECTION_TTL = 60000;
const RECORDS_VALIDATION_SAMPLE = 5;

export const defaultPersistApi = axios.create({
baseURL: getPersistAPIUrl(),
Expand Down Expand Up @@ -930,6 +932,7 @@ export class NangoSync extends NangoAction {
}

// Validate records
const hasErrors: { data: any; validation: ValidateDataError[] }[] = [];
for (const record of results) {
const validation = validateData({
version: this.syncConfig?.version || '1',
Expand All @@ -941,16 +944,31 @@ export class NangoSync extends NangoAction {
continue;
}

hasErrors.push({ data: record, validation });
metrics.increment(metrics.Types.RUNNER_INVALID_SYNCS_RECORDS);

if (this.runnerFlags?.validateSyncRecords) {
break;
}
}
if (hasErrors.length > 0) {
if (this.dryRun) {
await this.log('Invalid record payload. Use `--validation` option to see the details', { level: 'warn' });
} else {
await this.log('Invalid record payload', { data: record, validation, model }, { level: 'warn' });
}
if (this.runnerFlags?.validateSyncRecords) {
throw new NangoError(`invalid_sync_record`, { data: record, validation, model });
throw new NangoError(`invalid_sync_record`, { ...hasErrors[0], model });
}

const sampled = hasErrors.length > RECORDS_VALIDATION_SAMPLE;
const sample = sampled ? hasErrors.slice(0, RECORDS_VALIDATION_SAMPLE) : hasErrors;
if (sampled) {
await this.log(`Invalid records: ${hasErrors.length} failed ${sampled ? `(sampled to ${RECORDS_VALIDATION_SAMPLE})` : ''}`, { level: 'warn' });
}
await Promise.all(
sample.map((log) => {
return this.log(`Invalid record payload`, { ...log, model }, { level: 'warn' });
})
);
}

if (this.dryRun) {
Expand Down

0 comments on commit b62def4

Please sign in to comment.