Skip to content

Commit

Permalink
feat: 🎸 add kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
sweatpotato13 committed Jan 12, 2025
1 parent ba9c692 commit ececa55
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ REDIS_COMMANDER_PORT=8082
REDIS_COMMANDER_USER=admin
REDIS_COMMANDER_PASSWORD=admin

# Kafka
KAFKA_BROKER_ENDPOINT=localhost:9092
KAFKA_CLIENT_ID=client
KAFKA_GROUP_ID=group

# JsonWebToken
JWT_ALGORITHM=
JWT_PRIVATE_KEY=
Expand Down
29 changes: 27 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3"
services:
# postgres
postgres:
image: postgres:16
image: postgres:17
container_name: postgres
restart: always
environment:
Expand All @@ -25,6 +25,7 @@ services:
HASURA_GRAPHQL_DEV_MODE: "true"
HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey
HASURA_GRAPHQL_UNAUTHORIZED_ROLE: anonymous
# mongo
mongo:
image: mongo
container_name: mongo
Expand All @@ -46,6 +47,7 @@ services:
ME_CONFIG_MONGODB_ADMINPASSWORD: ${MONGODB_PW}
ports:
- ${MONGOEXPRESS_PORT}:8081
# redis
redis:
container_name: redis
image: redis
Expand All @@ -69,6 +71,7 @@ services:
- REDIS_PORT=${REDIS_PORT}
- HTTP_USER=${REDIS_COMMANDER_USER}
- HTTP_PASSWORD=${REDIS_COMMANDER_PASSWORD}
# elasticsearch
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6
container_name: es01
Expand All @@ -93,6 +96,28 @@ services:
environment:
ELASTICSEARCH_URL: http://es01:9200
ELASTICSEARCH_HOSTS: http://es01:9200
# kafka
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://localhost:19092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
nest:
container_name: nest
restart: always
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"helmet": "^8.0.0",
"ioredis": "^5.4.2",
"jsonwebtoken": "^9.0.2",
"kafkajs": "^2.2.4",
"moment": "^2.30.1",
"morgan": "^1.10.0",
"nest-winston": "^1.10.0",
Expand Down
18 changes: 14 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if (typeof process.send !== "function") {

import ElasticsearchConfig from "./modules/elasticsearch/elasticsearch";
import JwtModuleConfig from "./modules/jwt";
import KafkaConfg from "./modules/kafka/kafka";
import { loggerConfig } from "./modules/logger/logger";
import GoogleOauthConfig from "./modules/passport/google";
import RedisModuleConfig from "./modules/redis";
Expand All @@ -14,6 +15,7 @@ export {
ElasticsearchConfig,
GoogleOauthConfig,
JwtModuleConfig,
KafkaConfg,
loggerConfig,
RedisModuleConfig
};
Expand Down
33 changes: 33 additions & 0 deletions src/config/modules/kafka/kafka.config.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Inject, Injectable } from "@nestjs/common";
import { ConfigType } from "@nestjs/config";
import {
ClientProvider,
ClientsModuleOptionsFactory,
KafkaOptions,
Transport
} from "@nestjs/microservices";
import { KafkaConfg } from "@src/config";

@Injectable()
export class KafkaConfigService implements ClientsModuleOptionsFactory {
constructor(
@Inject(KafkaConfg.KEY)
private config: ConfigType<typeof KafkaConfg>
) {}

createClientOptions(): ClientProvider {
const option: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
clientId: this.config.clientId,
brokers: [this.config.brokerEndpoint]
},
consumer: {
groupId: this.config.groupId
}
}
};
return option;
}
}
7 changes: 7 additions & 0 deletions src/config/modules/kafka/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { registerAs } from "@nestjs/config";

export default registerAs("kafka", () => ({
brokerEndpoint: process.env.KAFKA_BROKER_ENDPOINT || "localhost:9092",
clientId: process.env.KAFKA_CLIENT_ID || "client",
groupId: process.env.KAFKA_GROUP_ID || "ura-consumer-1337"
}));
23 changes: 22 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-unused-expressions */
import { config } from "@config";
import { ValidationPipe } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { NestFactory } from "@nestjs/core";
import { NestExpressApplication } from "@nestjs/platform-express";
import { OpenAPIObject, SwaggerModule } from "@nestjs/swagger";
Expand All @@ -15,6 +16,7 @@ import { AppModule } from "./app.module";
import { BadRequestExceptionFilter } from "./common/filters/bad-request-exception.filter";
import { HttpExceptionFilter } from "./common/filters/http-exception.filter";
import { TimeoutInterceptor } from "./common/interceptors/timeout.interceptor";
import { KafkaConfigService } from "./config/modules/kafka/kafka.config.service";
import { errorStream, logger } from "./config/modules/winston";

async function bootstrap() {
Expand All @@ -32,6 +34,26 @@ async function bootstrap() {
app.use(rTracer.expressMiddleware());

app.use(json({ limit: "50mb" }));

const configService = app.get(ConfigService);
const kafkaConfig = configService.get("kafka");

// connect kafka only when kafkaConfig is set
if (
kafkaConfig?.brokerEndpoint &&
kafkaConfig?.clientId &&
kafkaConfig?.groupId
) {
app.connectMicroservice(
new KafkaConfigService(kafkaConfig).createClientOptions()
);
await app.startAllMicroservices();
logger.info(
`🚀 Kafka connected at ${kafkaConfig.brokerEndpoint}`,
{ context: "BootStrap" }
);
}

// Swagger
const swagger = JSON.parse(
fs.readFileSync(__dirname + "/../public/swagger.json", "utf8")
Expand Down Expand Up @@ -84,7 +106,6 @@ async function bootstrap() {
next();
});

await app.startAllMicroservices();
await app.listen(config.port, () => {
!config.isProduction
? logger.info(
Expand Down
3 changes: 2 additions & 1 deletion src/shared/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { JwtModule } from "./jwt/jwt.module";
import { KafkaModule } from "./kafka/kafka.module";
import { RedisModule } from "./redis/redis.module";

export { JwtModule, RedisModule };
export { JwtModule, KafkaModule, RedisModule };
20 changes: 20 additions & 0 deletions src/shared/modules/kafka/kafka.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { ClientsModule } from "@nestjs/microservices";
import { KafkaConfg } from "@src/config";
import { KafkaConfigService } from "@src/config/modules/kafka/kafka.config.service";

@Module({
imports: [
ClientsModule.registerAsync([
{
imports: [ConfigModule.forFeature(KafkaConfg)],
name: "KAFKA",
useClass: KafkaConfigService
}
])
],
providers: [],
controllers: []
})
export class KafkaModule {}

0 comments on commit ececa55

Please sign in to comment.