ในบทความนี้ผมจะมาพูดถึงการตั้งค่า Kafka ในโปรเจกต์ NestJS ของเรา โดยเราจะมีการใช้ kafkajs เพื่อเชื่อมต่อและส่งข้อความไปยัง Kafka broker ที่เราตั้งค่าไว้ ผ่านการสร้างโมดูลและบริการที่เรียกว่า KafkaModule และ KafkaService
บทความที่เกี่ยวข้อง
1. การตั้งค่า Kafka Configuration
ไฟล์ kafka.config.ts เป็นที่ที่เราจะเก็บการตั้งค่าของ Kafka โดยใช้ NestJS Config Module เพื่อจัดการกับค่าคอนฟิกที่เราต้องการ
import { registerAs } from '@nestjs/config';
export default registerAs('kafka', () => ({
kafkaBrokerUrl: process.env.KAFKA_HOST,
clientAppId: process.env.KAFKA_CLIENT_ID,
logTopicPrefix: process.env.LOG_KAFKA_TOPIC_PREFIX,
}));
export interface KafkaConfig {
kafkaBrokerUrl: string;
clientAppId: string;
logTopicPrefix: string;
}
ในที่นี้เราได้ใช้ registerAs เพื่อสร้างคอนฟิกที่เรียกว่า kafka ซึ่งจะให้ค่าของ kafkaBrokerUrl, clientAppId และ logTopicPrefix จาก environment variables
ไฟล์ .env
KAFKA_HOST=localhost:9935
KAFKA_CLIENT_ID=my-app
LOG_KAFKA_TOPIC_PREFIX=test
2. การสร้าง Kafka Constants
ไฟล์ kafka.constants.ts ใช้สำหรับกำหนดค่าคงที่ต่าง ๆ ที่เราจะใช้ในโมดูลและบริการของเรา โดยในกรณีนี้เราจะสร้างคีย์สำหรับ Producer:
export const KAFKA_PRODUCER_TOKEN = 'KAFKA_PRODUCER_TOKEN';
- KAFKA_PRODUCER_TOKEN จะช่วยให้เราสามารถอ้างอิงถึง Producer ในโมดูลและบริการได้ง่ายขึ้น
3. การสร้าง Kafka Module
ไฟล์ kafka.module.ts จะเป็นที่รวมกันของ provider ที่เราสร้างขึ้นมา รวมถึง producer ที่ใช้สำหรับส่งข้อความ
import { Global, Logger, Module, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ModuleRef } from '@nestjs/core';
import { Kafka, Partitioners, Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaService } from './kafka.service';
import { KafkaConfig } from './kafka.config';
@Global()
@Module({
providers: [
{
provide: KAFKA_PRODUCER_TOKEN,
useFactory: async (configService: ConfigService) => {
const logger = new Logger('Kafka');
const kafkaConfig = configService.get<KafkaConfig>('kafka');
const kafka = new Kafka({
brokers: [kafkaConfig.kafkaBrokerUrl],
clientId: kafkaConfig.clientAppId,
});
const kafkaProducer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
retry: {
retries: 0,
initialRetryTime: 300,
multiplier: 1.5,
},
maxInFlightRequests: 1,
metadataMaxAge: 300000,
});
const retryConnect = async () => {
try {
await kafkaProducer.connect();
logger.log('Kafka producer connected successfully.');
} catch (error) {
logger.error('Failed to connect to Kafka producer. Retrying...', error);
await new Promise((resolve) => setTimeout(resolve, 3000));
await retryConnect();
}
};
retryConnect();
return kafkaProducer;
},
inject: [ConfigService],
},
KafkaService,
],
exports: [KafkaService],
})
export class KafkaModule implements OnApplicationShutdown {
constructor(private readonly moduleRef: ModuleRef) {}
async onApplicationShutdown() {
const kafkaProducer: Producer = this.moduleRef.get(KAFKA_PRODUCER_TOKEN);
await kafkaProducer.disconnect();
}
}
- เราใช้ useFactory เพื่อสร้าง producer และเชื่อมต่อกับ Kafka broker โดยมีการ retry ในกรณีที่การเชื่อมต่อล้มเหลว
- OnApplicationShutdown จะช่วยให้เรา disconnect producer ได้เมื่อแอปพลิเคชันหยุดทำงาน
4. การสร้าง Kafka Service
ไฟล์ kafka.service.ts จะมีฟังก์ชันสำหรับการส่งข้อความไปยัง Kafka
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaConfig } from './kafka.config';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class KafkaService {
private readonly logger = new Logger(KafkaService.name);
constructor(
@Inject(KAFKA_PRODUCER_TOKEN)
private readonly kafkaProducer: Producer,
private readonly configService: ConfigService,
) {}
async publishLogMessage(logMessage: string, logLevel: string): Promise<void> {
try {
const kafkaConfig = this.configService.get<KafkaConfig>('kafka');
const environment = "local";
const topicName = `${kafkaConfig.logTopicPrefix}-${environment}-${kafkaConfig.clientAppId}-${logLevel}`;
await this.kafkaProducer.send({
topic: topicName,
messages: [{ value: logMessage }],
acks: 1,
});
this.logger.log(`Message sent to Kafka topic: ${topicName}`);
} catch (error) {
this.logger.error('Error sending message to Kafka topic.', error);
}
}
}
- ฟังก์ชัน sendMessage จะส่งข้อความไปยัง Kafka ตามที่กำหนดใน topic ที่สร้างขึ้นจากค่าต่าง ๆ
5. การใช้ Kafka Module ใน Application
ในไฟล์ app.module.ts เราต้อง import KafkaModule และตั้งค่า ConfigModule เพื่อให้โหลดคอนฟิกที่เราสร้างขึ้น
import { ConfigModule } from '@nestjs/config';
import { KafkaModule } from './shared/kafka/kafka.module';
import kafkaConfig from './shared/kafka/kafka.config';
@Module({
imports: [
ConfigModule.forRoot({
load: [kafkaConfig],
}),
KafkaModule,
],
})
export class AppModule {}
สรุป
ตั้งค่าเชื่อม Kafka กับ NestJS และส่งข้อความไปยัง Kafka ได้พร้อมระบบ retry อัตโนมัติถ้าเชื่อมต่อล้มเหลว