diff --git a/packages/shared/lib/sdk/dataValidation.ts b/packages/shared/lib/sdk/dataValidation.ts index 518b911d000..a1a7941d255 100644 --- a/packages/shared/lib/sdk/dataValidation.ts +++ b/packages/shared/lib/sdk/dataValidation.ts @@ -17,7 +17,9 @@ export function clearValidationCache() { cache.clear(); } -export function validateData({ version, input, modelName, jsonSchema }: ValidateProps): true | (ErrorObject | Error)[] { +export type ValidateDataError = ErrorObject | Error; + +export function validateData({ version, input, modelName, jsonSchema }: ValidateProps): true | ValidateDataError[] { if (!jsonSchema) { // For legacy reason, not all scripts have a jsonSchema return true; diff --git a/packages/shared/lib/sdk/sync.ts b/packages/shared/lib/sdk/sync.ts index cde4ca1a113..d00b17dac45 100644 --- a/packages/shared/lib/sdk/sync.ts +++ b/packages/shared/lib/sdk/sync.ts @@ -19,6 +19,7 @@ import { } from '@nangohq/utils'; import type { SyncConfig } from '../models/Sync.js'; import type { RunnerFlags } from '../services/sync/run.utils.js'; +import type { ValidateDataError } from './dataValidation.js'; import { validateData } from './dataValidation.js'; import { NangoError } from '../utils/error.js'; import type { DBTeam, GetPublicIntegration, MessageRowInsert } from '@nangohq/types'; @@ -361,6 +362,7 @@ export interface EnvironmentVariable { } const MEMOIZED_CONNECTION_TTL = 60000; +const RECORDS_VALIDATION_SAMPLE = 5; export const defaultPersistApi = axios.create({ baseURL: getPersistAPIUrl(), @@ -930,6 +932,7 @@ export class NangoSync extends NangoAction { } // Validate records + const hasErrors: { data: any; validation: ValidateDataError[] }[] = []; for (const record of results) { const validation = validateData({ version: this.syncConfig?.version || '1', @@ -941,16 +944,31 @@ export class NangoSync extends NangoAction { continue; } + hasErrors.push({ data: record, validation }); metrics.increment(metrics.Types.RUNNER_INVALID_SYNCS_RECORDS); + if (this.runnerFlags?.validateSyncRecords) { + break; + } + } + if (hasErrors.length > 0) { if (this.dryRun) { await this.log('Invalid record payload. Use `--validation` option to see the details', { level: 'warn' }); - } else { - await this.log('Invalid record payload', { data: record, validation, model }, { level: 'warn' }); } if (this.runnerFlags?.validateSyncRecords) { - throw new NangoError(`invalid_sync_record`, { data: record, validation, model }); + throw new NangoError(`invalid_sync_record`, { ...hasErrors[0], model }); + } + + const sampled = hasErrors.length > RECORDS_VALIDATION_SAMPLE; + const sample = sampled ? hasErrors.slice(0, RECORDS_VALIDATION_SAMPLE) : hasErrors; + if (sampled) { + await this.log(`Invalid records: ${hasErrors.length} failed ${sampled ? `(sampled to ${RECORDS_VALIDATION_SAMPLE})` : ''}`, { level: 'warn' }); } + await Promise.all( + sample.map((log) => { + return this.log(`Invalid record payload`, { ...log, model }, { level: 'warn' }); + }) + ); } if (this.dryRun) {