พาตั้งค่าใช้งาน Kafka กับ NestJS

Nov. 6, 2024 · boychawin

ในบทความนี้ผมจะมาพูดถึงการตั้งค่า Kafka ในโปรเจกต์ NestJS ของเรา โดยเราจะมีการใช้ kafkajs เพื่อเชื่อมต่อและส่งข้อความไปยัง Kafka broker ที่เราตั้งค่าไว้ ผ่านการสร้างโมดูลและบริการที่เรียกว่า KafkaModule และ KafkaService

บทความที่เกี่ยวข้อง

1. การตั้งค่า Kafka Configuration

ไฟล์ kafka.config.ts เป็นที่ที่เราจะเก็บการตั้งค่าของ Kafka โดยใช้ NestJS Config Module เพื่อจัดการกับค่าคอนฟิกที่เราต้องการ

import { registerAs } from '@nestjs/config';

export default registerAs('kafka', () => ({
  kafkaBrokerUrl: process.env.KAFKA_HOST,
  clientAppId: process.env.KAFKA_CLIENT_ID,
  logTopicPrefix: process.env.LOG_KAFKA_TOPIC_PREFIX,
}));

export interface KafkaConfig {
  kafkaBrokerUrl: string;
  clientAppId: string;
  logTopicPrefix: string;
}

ในที่นี้เราได้ใช้ registerAs เพื่อสร้างคอนฟิกที่เรียกว่า kafka ซึ่งจะให้ค่าของ kafkaBrokerUrl, clientAppId และ logTopicPrefix จาก environment variables

ไฟล์ .env

KAFKA_HOST=localhost:9935
KAFKA_CLIENT_ID=my-app
LOG_KAFKA_TOPIC_PREFIX=test

2. การสร้าง Kafka Constants

ไฟล์ kafka.constants.ts ใช้สำหรับกำหนดค่าคงที่ต่าง ๆ ที่เราจะใช้ในโมดูลและบริการของเรา โดยในกรณีนี้เราจะสร้างคีย์สำหรับ Producer:

export const KAFKA_PRODUCER_TOKEN = 'KAFKA_PRODUCER_TOKEN';
  • KAFKA_PRODUCER_TOKEN จะช่วยให้เราสามารถอ้างอิงถึง Producer ในโมดูลและบริการได้ง่ายขึ้น

3. การสร้าง Kafka Module

ไฟล์ kafka.module.ts จะเป็นที่รวมกันของ provider ที่เราสร้างขึ้นมา รวมถึง producer ที่ใช้สำหรับส่งข้อความ

import { Global, Logger, Module, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ModuleRef } from '@nestjs/core';
import { Kafka, Partitioners, Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaService } from './kafka.service';
import { KafkaConfig } from './kafka.config';

@Global()
@Module({
  providers: [
    {
      provide: KAFKA_PRODUCER_TOKEN,
      useFactory: async (configService: ConfigService) => {
        const logger = new Logger('Kafka');
        const kafkaConfig = configService.get<KafkaConfig>('kafka');

        const kafka = new Kafka({
          brokers: [kafkaConfig.kafkaBrokerUrl],
          clientId: kafkaConfig.clientAppId,
        });

        const kafkaProducer = kafka.producer({
          createPartitioner: Partitioners.DefaultPartitioner,
          retry: {
            retries: 0,
            initialRetryTime: 300,
            multiplier: 1.5,
          },
          maxInFlightRequests: 1,
          metadataMaxAge: 300000,
        });

        const retryConnect = async () => {
          try {
            await kafkaProducer.connect();
            logger.log('Kafka producer connected successfully.');
          } catch (error) {
            logger.error('Failed to connect to Kafka producer. Retrying...', error);
            await new Promise((resolve) => setTimeout(resolve, 3000));
            await retryConnect();
          }
        };
        retryConnect();

        return kafkaProducer;
      },
      inject: [ConfigService],
    },
    KafkaService,
  ],
  exports: [KafkaService],
})
export class KafkaModule implements OnApplicationShutdown {
  constructor(private readonly moduleRef: ModuleRef) {}

  async onApplicationShutdown() {
    const kafkaProducer: Producer = this.moduleRef.get(KAFKA_PRODUCER_TOKEN);
    await kafkaProducer.disconnect();
  }
}

  • เราใช้ useFactory เพื่อสร้าง producer และเชื่อมต่อกับ Kafka broker โดยมีการ retry ในกรณีที่การเชื่อมต่อล้มเหลว
  • OnApplicationShutdown จะช่วยให้เรา disconnect producer ได้เมื่อแอปพลิเคชันหยุดทำงาน

4. การสร้าง Kafka Service

ไฟล์ kafka.service.ts จะมีฟังก์ชันสำหรับการส่งข้อความไปยัง Kafka

import { Inject, Injectable, Logger } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaConfig } from './kafka.config';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class KafkaService {
  private readonly logger = new Logger(KafkaService.name);

  constructor(
    @Inject(KAFKA_PRODUCER_TOKEN)
    private readonly kafkaProducer: Producer,
    private readonly configService: ConfigService,
  ) {}

  async publishLogMessage(logMessage: string, logLevel: string): Promise<void> {
    try {
      const kafkaConfig = this.configService.get<KafkaConfig>('kafka');
      const environment = "local";
      const topicName = `${kafkaConfig.logTopicPrefix}-${environment}-${kafkaConfig.clientAppId}-${logLevel}`;

      await this.kafkaProducer.send({
        topic: topicName,
        messages: [{ value: logMessage }],
        acks: 1,
      });

      this.logger.log(`Message sent to Kafka topic: ${topicName}`);
    } catch (error) {
      this.logger.error('Error sending message to Kafka topic.', error);
    }
  }
}

  • ฟังก์ชัน sendMessage จะส่งข้อความไปยัง Kafka ตามที่กำหนดใน topic ที่สร้างขึ้นจากค่าต่าง ๆ

5. การใช้ Kafka Module ใน Application

ในไฟล์ app.module.ts เราต้อง import KafkaModule และตั้งค่า ConfigModule เพื่อให้โหลดคอนฟิกที่เราสร้างขึ้น


import { ConfigModule } from '@nestjs/config';
import { KafkaModule } from './shared/kafka/kafka.module';
import kafkaConfig from './shared/kafka/kafka.config';

@Module({
  imports: [
    ConfigModule.forRoot({
      load: [kafkaConfig],
    }),
    KafkaModule,
  ],
})
export class AppModule {}

สรุป

ตั้งค่าเชื่อม Kafka กับ NestJS และส่งข้อความไปยัง Kafka ได้พร้อมระบบ retry อัตโนมัติถ้าเชื่อมต่อล้มเหลว