Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(infra): decommission old instance of redis cluster #4824

Merged
merged 2 commits into from
Nov 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions apps/worker/src/app/workflow/services/cold-start.service.ts
Original file line number Diff line number Diff line change
@@ -2,28 +2,15 @@ import { INestApplication } from '@nestjs/common';
import { INovuWorker, ReadinessService } from '@novu/application-generic';

import { StandardWorker } from './standard.worker';
import { WorkflowWorker } from './workflow.worker';
import { OldInstanceStandardWorker } from './old-instance-standard.worker';
import { OldInstanceWorkflowWorker } from './old-instance-workflow.worker';
import { SubscriberProcessWorker } from './subscriber-process.worker';
import { WorkflowWorker } from './workflow.worker';

/**
* TODO: Temporary engage OldInstanceWorkflowWorker while migrating to MemoryDB
*/
const getWorkers = (app: INestApplication): INovuWorker[] => {
const standardWorker = app.get(StandardWorker, { strict: false });
const workflowWorker = app.get(WorkflowWorker, { strict: false });
const oldInstanceStandardWorker = app.get(OldInstanceStandardWorker, { strict: false });
const oldInstanceWorkflowWorker = app.get(OldInstanceWorkflowWorker, { strict: false });
const subscriberProcessWorker = app.get(SubscriberProcessWorker, { strict: false });

const workers: INovuWorker[] = [
standardWorker,
workflowWorker,
oldInstanceStandardWorker,
oldInstanceWorkflowWorker,
subscriberProcessWorker,
];
const workers: INovuWorker[] = [standardWorker, workflowWorker, subscriberProcessWorker];

return workers;
};
2 changes: 0 additions & 2 deletions apps/worker/src/app/workflow/services/index.ts
Original file line number Diff line number Diff line change
@@ -2,5 +2,3 @@ export * from './active-jobs-metric.service';
export * from './completed-jobs-metric.service';
export * from './standard.worker';
export * from './workflow.worker';
export * from './old-instance-standard.worker';
export * from './old-instance-workflow.worker';
199 changes: 0 additions & 199 deletions apps/worker/src/app/workflow/services/old-instance-standard.worker.ts

This file was deleted.

This file was deleted.

13 changes: 1 addition & 12 deletions apps/worker/src/app/workflow/workflow.module.ts
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ import {
GetSubscriberGlobalPreference,
GetSubscriberTemplatePreference,
ProcessTenant,
OldInstanceBullMqService,
QueuesModule,
SelectIntegration,
SendTestEmail,
@@ -33,14 +32,7 @@ import {
} from '@novu/application-generic';
import { JobRepository } from '@novu/dal';

import {
ActiveJobsMetricService,
CompletedJobsMetricService,
StandardWorker,
WorkflowWorker,
OldInstanceWorkflowWorker,
OldInstanceStandardWorker,
} from './services';
import { ActiveJobsMetricService, CompletedJobsMetricService, StandardWorker, WorkflowWorker } from './services';

import {
MessageMatcher,
@@ -123,9 +115,6 @@ const PROVIDERS: Provider[] = [
StandardWorker,
WorkflowWorker,
SubscriberProcessWorker,
OldInstanceBullMqService,
OldInstanceStandardWorker,
OldInstanceWorkflowWorker,
];

@Module({
2 changes: 0 additions & 2 deletions apps/ws/src/socket/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
export { OldInstanceWebSocketsWorker } from './old-instance-web-sockets.worker';
export { OldInstanceWebSocketsWorkerService } from './old-instance-web-sockets-worker.service';
export { WebSocketWorker } from './web-socket.worker';

This file was deleted.

71 changes: 0 additions & 71 deletions apps/ws/src/socket/services/old-instance-web-sockets.worker.ts

This file was deleted.

10 changes: 2 additions & 8 deletions apps/ws/src/socket/socket.module.ts
Original file line number Diff line number Diff line change
@@ -6,17 +6,11 @@ import { WSGateway } from './ws.gateway';
import { SharedModule } from '../shared/shared.module';
import { ExternalServicesRoute } from './usecases/external-services-route';

import { OldInstanceWebSocketsWorker, OldInstanceWebSocketsWorkerService, WebSocketWorker } from './services';
import { WebSocketWorker } from './services';

const USE_CASES: Provider[] = [ExternalServicesRoute];

const PROVIDERS: Provider[] = [
WSGateway,
OldInstanceWebSocketsWorker,
OldInstanceWebSocketsWorkerService,
WebSocketsWorkerService,
WebSocketWorker,
];
const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker];

@Module({
imports: [SharedModule],
12 changes: 0 additions & 12 deletions packages/application-generic/src/custom-providers/index.ts
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import {
DistributedLockService,
FeatureFlagsService,
ReadinessService,
OldInstanceBullMqService,
StandardQueueService,
SubscriberProcessQueueService,
WebSocketsQueueService,
@@ -69,17 +68,6 @@ export const bullMqService = {
},
};

export const oldInstanceBullMqService = {
provide: OldInstanceBullMqService,
useFactory: async (): Promise<OldInstanceBullMqService> => {
const service = new OldInstanceBullMqService();

await service.initialize();

return service;
},
};

export const cacheService = {
provide: CacheService,
useFactory: async (): Promise<CacheService> => {
7 changes: 1 addition & 6 deletions packages/application-generic/src/modules/queues.module.ts
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import {
WebSocketsQueueServiceHealthIndicator,
WorkflowQueueServiceHealthIndicator,
} from '../health';
import { OldInstanceBullMqService, ReadinessService } from '../services';
import { ReadinessService } from '../services';
import {
ActiveJobsMetricQueueService,
CompletedJobsMetricQueueService,
@@ -28,8 +28,6 @@ import {
SubscriberProcessWorkerService,
WebSocketsWorkerService,
WorkflowWorkerService,
OldInstanceStandardWorkerService,
OldInstanceWorkflowWorkerService,
} from '../services/workers';

const PROVIDERS: Provider[] = [
@@ -53,9 +51,6 @@ const PROVIDERS: Provider[] = [
WorkflowQueueService,
WorkflowQueueServiceHealthIndicator,
WorkflowWorkerService,
OldInstanceStandardWorkerService,
OldInstanceWorkflowWorkerService,
OldInstanceBullMqService,
SubscriberProcessQueueService,
SubscriberProcessWorkerService,
SubscriberProcessQueueHealthIndicator,
1 change: 0 additions & 1 deletion packages/application-generic/src/services/bull-mq/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './bull-mq.service';
export { OldInstanceBullMqService } from './old-instance-bull-mq.service';

This file was deleted.

Original file line number Diff line number Diff line change
@@ -55,11 +55,6 @@ export class InMemoryProviderService {
}

private buildClient(provider: InMemoryProviderEnum): InMemoryProviderClient {
// TODO: Temporary while migrating to MemoryDB
if (provider === InMemoryProviderEnum.OLD_INSTANCE_REDIS) {
return this.oldInstanceInMemoryProviderSetup();
}

return this.isCluster
? this.inMemoryClusterProviderSetup(provider)
: this.inMemoryProviderSetup();
@@ -115,10 +110,7 @@ export class InMemoryProviderService {

public getOptions(): RedisOptions | undefined {
if (this.inMemoryProviderClient) {
if (
this.provider === InMemoryProviderEnum.OLD_INSTANCE_REDIS ||
!this.isCluster
) {
if (!this.isCluster) {
const options: RedisOptions = this.inMemoryProviderClient.options;

return options;
@@ -291,95 +283,6 @@ export class InMemoryProviderService {
}
}

/**
* TODO: Temporary while we migrate to MemoryDB
*/
private oldInstanceInMemoryProviderSetup(): Redis | undefined {
Logger.verbose(
this.descriptiveLogMessage('In-memory old instance service set up'),
LOG_CONTEXT
);

const { getClient, getConfig, isClientReady } = getClientAndConfig();

this.isProviderClientReady = isClientReady;
this.inMemoryProviderConfig = getConfig();
const { host, port, ttl } = getConfig();

if (!host) {
Logger.warn(
this.descriptiveLogMessage(
'Missing host for in-memory provider old instance'
),
LOG_CONTEXT
);
}

const inMemoryProviderClient = getClient();
if (host && inMemoryProviderClient) {
Logger.log(
this.descriptiveLogMessage(
`Connecting to old instance to ${host}:${port}`
),
LOG_CONTEXT
);

inMemoryProviderClient.on('connect', () => {
Logger.log(
this.descriptiveLogMessage('REDIS CONNECTED to old instance'),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('reconnecting', () => {
Logger.verbose(
this.descriptiveLogMessage('Redis reconnecting to old instance'),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('close', () => {
Logger.verbose(
this.descriptiveLogMessage('Redis close old instance'),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('end', () => {
Logger.verbose(
this.descriptiveLogMessage('Redis end old instance'),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('error', (error) => {
Logger.error(
error,
this.descriptiveLogMessage(
'There has been an error in the InMemory provider client for the old instance'
),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('ready', () => {
Logger.log(
this.descriptiveLogMessage('Redis ready for old instance'),
LOG_CONTEXT
);
});

inMemoryProviderClient.on('wait', () => {
Logger.verbose(
this.descriptiveLogMessage('Redis wait for old instance'),
LOG_CONTEXT
);
});

return inMemoryProviderClient;
}
}

public inMemoryScan(pattern: string): ScanStream {
if (this.isCluster) {
const client = this.inMemoryProviderClient as Cluster;
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ export enum InMemoryProviderEnum {
MEMORY_DB = 'MemoryDB',
REDIS = 'Redis',
REDIS_CLUSTER = 'RedisCluster',
OLD_INSTANCE_REDIS = 'OldInstanceRedis',
}

export type Pipeline = ChainableCommander;
1 change: 0 additions & 1 deletion packages/application-generic/src/services/index.ts
Original file line number Diff line number Diff line change
@@ -22,6 +22,5 @@ export {
QueueOptions,
Worker,
WorkerOptions,
OldInstanceBullMqService,
} from './bull-mq';
export * from './auth';
8 changes: 2 additions & 6 deletions packages/application-generic/src/services/workers/index.ts
Original file line number Diff line number Diff line change
@@ -8,23 +8,19 @@ import { ActiveJobsMetricWorkerService } from './active-jobs-metric-worker.servi
import { CompletedJobsMetricWorkerService } from './completed-jobs-metric-worker.service';
import { InboundParseWorkerService } from './inbound-parse-worker.service';
import { StandardWorkerService } from './standard-worker.service';
import { SubscriberProcessWorkerService } from './subscriber-process-worker.service';
import { WebSocketsWorkerService } from './web-sockets-worker.service';
import { WorkflowWorkerService } from './workflow-worker.service';
import { OldInstanceStandardWorkerService } from './old-instance-standard-worker.service';
import { OldInstanceWorkflowWorkerService } from './old-instance-workflow-worker.service';
import { SubscriberProcessWorkerService } from './subscriber-process-worker.service';

export {
ActiveJobsMetricWorkerService,
CompletedJobsMetricWorkerService,
InboundParseWorkerService as InboundParseWorker,
StandardWorkerService,
SubscriberProcessWorkerService,
WebSocketsWorkerService,
WorkerBaseService,
WorkerOptions,
WorkerProcessor,
WorkflowWorkerService,
OldInstanceStandardWorkerService,
OldInstanceWorkflowWorkerService,
SubscriberProcessWorkerService,
};

This file was deleted.

This file was deleted.