Kafka

Kafka 是一个开源的分布式流处理平台,具有三大核心能力:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。
  • 以容错且持久的方式存储记录流。
  • 在记录流产生时实时处理它们。

Kafka 项目旨在提供一个统一、高吞吐、低延迟的平台来处理实时数据流。它能与 Apache Storm 和 Spark 完美集成,实现实时流数据分析。

安装

要开始构建基于 Kafka 的微服务,首先需要安装所需软件包:

$ npm i --save kafkajs

概述

与其他 Nest 微服务传输层实现类似,您可以通过传递给 createMicroservice() 方法的选项对象中的 transport 属性来选择 Kafka 传输机制,同时还可使用可选的 options 属性,如下所示:

main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

提示 Transport 枚举是从 @nestjs/microservices 包中导入的。

选项

options 属性是特定于所选传输器的。Kafka 传输器暴露的属性如下所述。

选项 描述
client 客户端配置选项(了解更多此处
consumer 消费者配置选项(了解更多此处
run 运行配置选项(了解更多此处
subscribe 订阅配置选项(了解更多此处
producer 生产者配置选项(了解更多此处
send 发送配置选项(了解更多此处
producerOnlyMode 跳过消费者组注册并仅作为生产者运行的功能标志(boolean)
postfixId 修改 clientId 值的后缀(string)

客户端

Kafka 与其他微服务传输器有个小区别。我们不使用 ClientProxy 类,而是使用 ClientKafkaProxy 类。

与其他微服务传输器类似,创建 ClientKafkaProxy 实例有多种方式

一种创建实例的方法是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需先导入该模块,然后使用 register() 方法传入一个选项对象(包含与上文 createMicroservice() 方法相同的属性),以及用作注入令牌的 name 属性。更多关于 ClientsModule 的信息请参阅此处

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

也可以使用其他方式创建客户端(ClientProxyFactory@Client())。相关说明请查看此文档

按如下方式使用 @Client() 装饰器:

@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer'
    }
  }
})
client: ClientKafkaProxy;

消息模式

Kafka 微服务消息模式利用两个主题分别处理请求和回复通道。ClientKafkaProxy#send() 方法通过将返回地址关联 ID、回复主题和回复分区与请求消息关联来发送消息。这要求 ClientKafkaProxy 实例在发送消息前必须已订阅回复主题并分配到至少一个分区。

因此,您需要为每个运行的 Nest 应用程序至少配置一个回复主题分区。例如,如果运行 4 个 Nest 应用程序但回复主题只有 3 个分区,那么其中 1 个 Nest 应用程序在尝试发送消息时会报错。

当新的 ClientKafkaProxy 实例启动时,它们会加入消费者组并订阅各自的主题。此过程会触发对消费者组中各消费者分配的主题分区进行重新平衡。

通常情况下,主题分区会采用轮询分区器进行分配,该分区器将主题分区分配给一组按消费者名称排序的消费者,这些名称在应用启动时随机设置。然而,当新消费者加入消费者组时,新消费者可能被放置在该消费者集合中的任意位置。这会导致当原有消费者被定位在新消费者之后时,可能被分配到不同的分区。因此,那些被重新分配分区的消费者将会丢失在重新平衡前发送的请求响应消息。

为防止 ClientKafkaProxy 消费者丢失响应消息,系统采用了 Nest 专属的内置自定义分区器。该分区器会根据应用启动时设置的高精度时间戳(process.hrtime())对消费者集合进行排序来分配分区。

消息响应订阅

warning 注意 本节仅适用于使用请求-响应消息模式的情况(配合 @MessagePattern 装饰器和 ClientKafkaProxy#send 方法)。对于基于事件的通信方式(使用 @EventPattern 装饰器和 ClientKafkaProxy#emit 方法),则无需订阅响应主题。

ClientKafkaProxy 类提供了 subscribeToResponseOf() 方法。该方法以请求主题名称作为参数,并将派生的回复主题名称添加到回复主题集合中。在实现消息模式时必须调用此方法。

heroes.controller
onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
}

如果 ClientKafkaProxy 实例是异步创建的,则必须在调用 connect() 方法之前调用 subscribeToResponseOf() 方法。

heroes.controller
async onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
  await this.client.connect();
}

传入

Nest 接收传入的 Kafka 消息时,会将其作为一个包含 keyvalueheaders 属性的对象,这些属性的值类型为 Buffer。随后 Nest 会将这些缓冲值转换为字符串进行解析。如果字符串呈现"类对象"形式,Nest 会尝试将其作为 JSON 进行解析。最终 value 会被传递至其关联的处理器。

传出

Nest 在发布事件或发送消息时,会通过序列化过程发送传出的 Kafka 消息。该过程会对传入 ClientKafkaProxyemit()send() 方法的参数,或从 @MessagePattern 方法返回的值进行序列化。此序列化过程会通过 JSON.stringify() 或原型方法 toString() 将非字符串或缓冲区的对象"字符串化"。

heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId;
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];
    return items;
  }
}

info 注意 @Payload() 需从 @nestjs/microservices 包中导入。

传出消息也可以通过传递包含 keyvalue 属性的对象进行键控。消息键控对于满足共同分区要求非常重要。

heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  }
}

此外,以此格式传递的消息还可以包含设置在 headers 哈希属性中的自定义标头。标头哈希属性值必须是 string 类型或 Buffer 类型。

heroes.controller
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        kafka_nestRealm: realm
      },
      key: heroId,
      value: items
    }
  }
}

基于事件

虽然请求-响应方法非常适合服务间交换消息,但当您的消息风格是基于事件的(这反过来又非常适合 Kafka)时就不太适用——当您只想发布事件而不等待响应时。在这种情况下,您不希望请求-响应为维护两个主题带来的开销。

查看以下两个部分以了解更多信息: 概述:基于事件概述:发布事件

上下文

在更复杂的场景中,您可能需要访问有关传入请求的额外信息。使用 Kafka 传输器时,可以访问 KafkaContext 对象。

@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  console.log(`Topic: ${context.getTopic()}`);
}

info 提示@Payload()@Ctx()KafkaContext 都是从 @nestjs/microservices 包导入的。

要访问原始的 Kafka IncomingMessage 对象,请使用 KafkaContext 对象的 getMessage() 方法,如下所示:

@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

IncomingMessage 满足以下接口时:

interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

如果处理程序对每条接收到的消息处理时间较长,应考虑使用 heartbeat 回调。要获取 heartbeat 函数,请使用 KafkaContextgetHeartbeat() 方法,如下所示:

@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // Do some slow processing
  await doWorkPart1();

  // Send heartbeat to not exceed the sessionTimeout
  await heartbeat();

  // Do some slow processing again
  await doWorkPart2();
}

命名规范

Kafka 微服务组件会在 client.clientIdconsumer.groupId 选项后附加各自角色描述,以防止 Nest 微服务客户端与服务器组件之间发生冲突。默认情况下,ClientKafkaProxy 组件会附加 -client,而 ServerKafka 组件会附加 -server 到这两个选项中。请注意下方提供的值是如何按此方式转换的(如注释所示)。

main
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
});

对于客户端:

heroes.controller
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-client
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-client
    }
  }
})
client: ClientKafkaProxy;

info 提示 Kafka 客户端和消费者的命名规则可以通过在自定义提供程序中扩展 ClientKafkaProxyKafkaServer 并重写构造函数来自定义。

由于 Kafka 微服务消息模式使用两个主题分别处理请求和回复通道,回复模式应从请求主题派生。默认情况下,回复主题的名称由请求主题名称与附加的 .reply 组合而成。

heroes.controller
onModuleInit() {
  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}

info 提示 Kafka 回复主题的命名规则可以通过在自定义提供程序中扩展 ClientKafkaProxy 并重写 getResponsePatternName 方法来自定义。

可重试异常

与其他传输器类似,所有未处理的异常都会被自动包装成 RpcException 并转换为"用户友好"格式。但在某些边缘情况下,您可能希望绕过此机制,让异常由 kafkajs 驱动程序直接处理。在处理消息时抛出异常会指示 kafkajs 重试该消息(重新投递),这意味着即使消息(或事件)处理程序已被触发,偏移量也不会提交到 Kafka。

warning 注意 对于事件处理程序(基于事件的通信),默认情况下所有未处理异常都被视为可重试异常

为此,您可以使用名为 KafkaRetriableException 的专用类,如下所示:

throw new KafkaRetriableException('...');

info:KafkaRetriableException 类是从 @nestjs/microservices 包中导出的。

自定义异常处理

除了默认的错误处理机制外,您还可以为 Kafka 事件创建自定义的异常过滤器来管理重试逻辑。例如,以下示例展示了如何在可配置的重试次数后跳过问题事件:

import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { KafkaContext } from '../ctx-host';

@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);

  constructor(
    private readonly maxRetries: number,
    // Optional custom function executed when max retries are exceeded
    private readonly skipHandler?: (message: any) => Promise<void>
  ) {
    super();
  }

  async catch(exception: unknown, host: ArgumentsHost) {
    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
    const message = kafkaContext.getMessage();
    const currentRetryCount = this.getRetryCountFromContext(kafkaContext);

    if (currentRetryCount >= this.maxRetries) {
      this.logger.warn(
        `Max retries (${
          this.maxRetries
        }) exceeded for message: ${JSON.stringify(message)}`
      );

      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }

      try {
        await this.commitOffset(kafkaContext);
      } catch (commitError) {
        this.logger.error('Failed to commit offset:', commitError);
      }
      return; // Stop propagating the exception
    }

    // If retry count is below the maximum, proceed with the default Exception Filter logic
    super.catch(exception, host);
  }

  private getRetryCountFromContext(context: KafkaContext): number {
    const headers = context.getMessage().headers || {};
    const retryHeader = headers['retryCount'] || headers['retry-count'];
    return retryHeader ? Number(retryHeader) : 0;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer && context.getConsumer();
    if (!consumer) {
      throw new Error('Consumer instance is not available from KafkaContext.');
    }

    const topic = context.getTopic && context.getTopic();
    const partition = context.getPartition && context.getPartition();
    const message = context.getMessage();
    const offset = message.offset;

    if (!topic || partition === undefined || offset === undefined) {
      throw new Error(
        'Incomplete Kafka message context for committing offset.'
      );
    }

    await consumer.commitOffsets([
      {
        topic,
        partition,
        // When committing an offset, commit the next number (i.e., current offset + 1)
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}

该过滤器提供了将 Kafka 事件重试处理至可配置次数的功能。当达到最大重试次数时,会触发自定义的 skipHandler(如果提供)并提交偏移量,从而跳过问题事件。这使得后续事件能够不受干扰地继续处理。

您可以通过将此过滤器添加到事件处理器中进行集成:

@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
  @EventPattern('your-topic')
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // Your event processing logic...
  }
}

提交偏移量

在使用 Kafka 时,提交偏移量至关重要。默认情况下,消息会在特定时间后自动提交。更多信息请参阅 KafkaJS 文档KafkaContext 提供了一种访问活跃消费者以手动提交偏移量的方式。该消费者即为 KafkaJS 消费者,其工作方式与原生 KafkaJS 实现一致。

@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // business logic

const { offset } = context.getMessage(); const partition = context.getPartition(); const topic = context.getTopic(); const consumer = context.getConsumer(); await consumer.commitOffsets([{ topic, partition, offset }]) }

要禁用消息自动提交,请在 `run` 配置中设置 `autoCommit: false`,如下所示: ```typescript title="main" const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, { transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], }, run: { autoCommit: false } } });

实例状态更新

要获取连接及底层驱动实例状态的实时更新,您可以订阅 status 流。该流提供所选驱动特有的状态更新。对于 Kafka 驱动,status 流会发出 connected(已连接)、disconnected(已断开)、rebalancing(再平衡)、crashed(崩溃)和 stopped(停止)事件。

this.client.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

提示 KafkaStatus 类型是从 @nestjs/microservices 包导入的。

同样地,您可以订阅服务器的 status 流来接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

底层生产者与消费者

对于更高级的用例,您可能需要访问底层生产者和消费者实例。这在手动关闭连接或使用驱动程序特定方法等场景中会很有用。但请注意,在大多数情况下,您无需直接访问驱动程序。

为此,您可以使用 ClientKafkaProxy 实例公开的 producerconsumer 获取方法。

const producer = this.client.producer;
const consumer = this.client.consumer;