พาตั้งค่าใช้งาน Kafka กับ NestJS

·

ในบทความนี้ผมจะมาพูดถึงการตั้งค่า Kafka ในโปรเจกต์ NestJS ของเรา โดยเราจะมีการใช้ kafkajs เพื่อเชื่อมต่อและส่งข้อความไปยัง Kafka broker ที่เราตั้งค่าไว้ ผ่านการสร้างโมดูลและบริการที่เรียกว่า KafkaModule และ KafkaService

บทความที่เกี่ยวข้อง

1. การตั้งค่า Kafka Configuration

ไฟล์ kafka.config.ts เป็นที่ที่เราจะเก็บการตั้งค่าของ Kafka โดยใช้ NestJS Config Module เพื่อจัดการกับค่าคอนฟิกที่เราต้องการ

import { registerAs } from '@nestjs/config';

export default registerAs('kafka', () => ({
  kafkaBrokerUrl: process.env.KAFKA_HOST,
  clientAppId: process.env.KAFKA_CLIENT_ID,
  logTopicPrefix: process.env.LOG_KAFKA_TOPIC_PREFIX,
}));

export interface KafkaConfig {
  kafkaBrokerUrl: string;
  clientAppId: string;
  logTopicPrefix: string;
}

ในที่นี้เราได้ใช้ registerAs เพื่อสร้างคอนฟิกที่เรียกว่า kafka ซึ่งจะให้ค่าของ kafkaBrokerUrl, clientAppId และ logTopicPrefix จาก environment variables

ไฟล์ .env

KAFKA_HOST=localhost:9935
KAFKA_CLIENT_ID=my-app
LOG_KAFKA_TOPIC_PREFIX=test

2. การสร้าง Kafka Constants

ไฟล์ kafka.constants.ts ใช้สำหรับกำหนดค่าคงที่ต่าง ๆ ที่เราจะใช้ในโมดูลและบริการของเรา โดยในกรณีนี้เราจะสร้างคีย์สำหรับ Producer:

export const KAFKA_PRODUCER_TOKEN = 'KAFKA_PRODUCER_TOKEN';
  • KAFKA_PRODUCER_TOKEN จะช่วยให้เราสามารถอ้างอิงถึง Producer ในโมดูลและบริการได้ง่ายขึ้น

3. การสร้าง Kafka Module

ไฟล์ kafka.module.ts จะเป็นที่รวมกันของ provider ที่เราสร้างขึ้นมา รวมถึง producer ที่ใช้สำหรับส่งข้อความ

import { Global, Logger, Module, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ModuleRef } from '@nestjs/core';
import { Kafka, Partitioners, Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaService } from './kafka.service';
import { KafkaConfig } from './kafka.config';

@Global()
@Module({
  providers: [
    {
      provide: KAFKA_PRODUCER_TOKEN,
      useFactory: async (configService: ConfigService) => {
        const logger = new Logger('Kafka');
        const kafkaConfig = configService.get<KafkaConfig>('kafka');

        const kafka = new Kafka({
          brokers: [kafkaConfig.kafkaBrokerUrl],
          clientId: kafkaConfig.clientAppId,
        });

        const kafkaProducer = kafka.producer({
          createPartitioner: Partitioners.DefaultPartitioner,
          retry: {
            retries: 0,
            initialRetryTime: 300,
            multiplier: 1.5,
          },
          maxInFlightRequests: 1,
          metadataMaxAge: 300000,
        });

        const retryConnect = async () => {
          try {
            await kafkaProducer.connect();
            logger.log('Kafka producer connected successfully.');
          } catch (error) {
            logger.error('Failed to connect to Kafka producer. Retrying...', error);
            await new Promise((resolve) => setTimeout(resolve, 3000));
            await retryConnect();
          }
        };
        retryConnect();

        return kafkaProducer;
      },
      inject: [ConfigService],
    },
    KafkaService,
  ],
  exports: [KafkaService],
})
export class KafkaModule implements OnApplicationShutdown {
  constructor(private readonly moduleRef: ModuleRef) {}

  async onApplicationShutdown() {
    const kafkaProducer: Producer = this.moduleRef.get(KAFKA_PRODUCER_TOKEN);
    await kafkaProducer.disconnect();
  }
}

  • เราใช้ useFactory เพื่อสร้าง producer และเชื่อมต่อกับ Kafka broker โดยมีการ retry ในกรณีที่การเชื่อมต่อล้มเหลว
  • OnApplicationShutdown จะช่วยให้เรา disconnect producer ได้เมื่อแอปพลิเคชันหยุดทำงาน

4. การสร้าง Kafka Service

ไฟล์ kafka.service.ts จะมีฟังก์ชันสำหรับการส่งข้อความไปยัง Kafka

import { Inject, Injectable, Logger } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { KAFKA_PRODUCER_TOKEN } from './kafka.constants';
import { KafkaConfig } from './kafka.config';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class KafkaService {
  private readonly logger = new Logger(KafkaService.name);

  constructor(
    @Inject(KAFKA_PRODUCER_TOKEN)
    private readonly kafkaProducer: Producer,
    private readonly configService: ConfigService,
  ) {}

  async publishLogMessage(logMessage: string, logLevel: string): Promise<void> {
    try {
      const kafkaConfig = this.configService.get<KafkaConfig>('kafka');
      const environment = "local";
      const topicName = `${kafkaConfig.logTopicPrefix}-${environment}-${kafkaConfig.clientAppId}-${logLevel}`;

      await this.kafkaProducer.send({
        topic: topicName,
        messages: [{ value: logMessage }],
        acks: 1,
      });

      this.logger.log(`Message sent to Kafka topic: ${topicName}`);
    } catch (error) {
      this.logger.error('Error sending message to Kafka topic.', error);
    }
  }
}

  • ฟังก์ชัน sendMessage จะส่งข้อความไปยัง Kafka ตามที่กำหนดใน topic ที่สร้างขึ้นจากค่าต่าง ๆ

5. การใช้ Kafka Module ใน Application

ในไฟล์ app.module.ts เราต้อง import KafkaModule และตั้งค่า ConfigModule เพื่อให้โหลดคอนฟิกที่เราสร้างขึ้น


import { ConfigModule } from '@nestjs/config';
import { KafkaModule } from './shared/kafka/kafka.module';
import kafkaConfig from './shared/kafka/kafka.config';

@Module({
  imports: [
    ConfigModule.forRoot({
      load: [kafkaConfig],
    }),
    KafkaModule,
  ],
})
export class AppModule {}

สรุป

ตั้งค่าเชื่อม Kafka กับ NestJS และส่งข้อความไปยัง Kafka ได้พร้อมระบบ retry อัตโนมัติถ้าเชื่อมต่อล้มเหลว

🚀 เรารับทำเว็บไซต์คุณภาพสูง

พร้อมบริการหลังการขายและรับประกันผลงาน

จ้างเราผ่าน Fastwork

🌐 ระบบสร้างเว็บไซต์อัตโนมัติจาก GenWebBlog

สร้างเว็บไซต์พร้อมใช้งานได้ในไม่กี่นาที พร้อม SEO และ Dark Mode

เยี่ยมชม GenWebBlog