Vert.x Kafka 客户端

此组件提供了一个 Kafka 客户端,用于从 Apache Kafka 集群读取和发送消息。

作为消费者,该 API 提供了订阅主题分区以异步接收消息或将其作为流读取的方法(甚至可以暂停/恢复流)。

作为生产者,该 API 提供了向主题分区发送消息的方法,类似于写入流。

使用 Vert.x Kafka 客户端

要使用此组件,请将以下依赖项添加到构建描述符的依赖项部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-kafka-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile "io.vertx:vertx-kafka-client:5.0.1"

创建 Kafka 客户端

创建消费者和生产者的方式与使用原生 Kafka 客户端库的方式非常相似。

它们需要根据 Apache Kafka 官方文档中描述的大量属性进行配置,包括消费者生产者的配置。

为此,可以配置一个包含此类属性的映射,并将其传递给 KafkaConsumerKafkaProducer 暴露的静态创建方法之一。

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// use consumer for interacting with Apache Kafka
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

在上述示例中,使用映射实例创建了一个 KafkaConsumer 实例,以指定要连接的 Kafka 节点列表(只有一个)以及用于从每个接收到的消息中获取键和值的反序列化器。

同样可以创建一个生产者

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");

// use producer for interacting with Apache Kafka
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
创建 KafkaConsumer 的事件循环将处理其消息。例如,如果您希望消息在 Verticle 的事件循环上处理,请在 Verticle 的 `start` 方法中创建 Kafka 消费者。

从主题接收消息并加入消费者组

为了开始从 Kafka 主题接收消息,消费者可以使用 subscribe 方法订阅属于消费者组(由创建时的属性指定)的一组主题。

也可以使用 subscribe 方法通过指定 Java 正则表达式来订阅更多主题。

您还需要使用 handler 注册一个处理器来处理传入消息。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to several topics with list
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);

// or using a Java regex
Pattern pattern = Pattern.compile("topic\\d");
consumer.subscribe(pattern);

// or just subscribe to a single topic
consumer.subscribe("a-single-topic");

可以在调用 subscribe() 之前或之后注册处理器;在两个方法都调用之前,消息不会被消费。这允许您先调用 subscribe(),然后是 seek(),最后是 handler(),以便仅从特定偏移量开始消费消息,例如。

在订阅期间也可以传递一个处理器,以了解订阅结果并在操作完成时收到通知。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to several topics
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer
  .subscribe(topics)
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

// or just subscribe to a single topic
consumer
  .subscribe("a-single-topic")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

使用消费者组方式,Kafka 集群会将分区分配给消费者,同时考虑同一消费者组中的其他连接消费者,从而使分区可以分散到它们之间。

当消费者离开组(已分配的分区可以自由分配给其他消费者)或新消费者加入组(它希望读取分区)时,Kafka 集群会处理分区再平衡。

您可以在 KafkaConsumer 上注册处理器,以便使用 partitionsRevokedHandlerpartitionsAssignedHandler 接收 Kafka 集群分区撤销和分配的通知。

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler(topicPartitions -> {
  System.out.println("Partitions assigned");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

consumer.partitionsRevokedHandler(topicPartitions -> {
  System.out.println("Partitions revoked");
  for (TopicPartition topicPartition : topicPartitions) {
    System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
  }
});

// subscribes to the topic
consumer
  .subscribe("test")
  .onSuccess(v ->
    System.out.println("subscribed")
  ).onFailure(cause ->
    System.out.println("Could not subscribe " + cause.getMessage())
  );

在加入消费者组接收消息后,消费者可以决定离开消费者组,不再接收消息,使用 unsubscribe

consumer.unsubscribe();

您可以添加一个处理器来接收结果通知。

consumer
  .unsubscribe()
  .onSuccess(v ->
    System.out.println("Consumer unsubscribed")
  );

从主题接收消息并请求特定分区

除了作为消费者组的一部分从主题接收消息外,消费者还可以请求特定的主题分区。当消费者不属于消费者组时,整个应用程序不能依赖再平衡功能。

您可以使用 assign 来请求特定分区。

consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

//
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
  .setTopic("test")
  .setPartition(0));

// requesting to be assigned the specific partition
consumer
  .assign(topicPartitions)
  .onSuccess(v -> System.out.println("Partition assigned"))
  // After the assignment is completed, get the assigned partitions to this consumer
  .compose(v -> consumer.assignment())
  .onSuccess(partitions -> {
    for (TopicPartition topicPartition : partitions) {
      System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
    }
  });

subscribe() 类似,可以在调用 assign() 之前或之后注册处理器;在两个方法都调用之前,消息不会被消费。这允许您先调用 assign(),然后是 seek(),最后是 handler(),以便仅从特定偏移量开始消费消息,例如。

调用 assignment 提供当前分配的分区列表。

使用显式轮询接收消息

除了使用内部轮询机制从 Kafka 接收消息外,客户端还可以订阅主题,避免注册用于获取消息的处理器,然后使用 poll 方法。

通过这种方式,用户应用程序负责在需要时执行轮询以获取消息,例如在处理完先前的消息之后。

consumer
  .subscribe("test")
  .onSuccess(v -> {
    System.out.println("Consumer subscribed");

    // Let's poll every second
    vertx.setPeriodic(1000, timerId ->
      consumer
        .poll(Duration.ofMillis(100))
        .onSuccess(records -> {
          for (int i = 0; i < records.size(); i++) {
            KafkaConsumerRecord<String, String> record = records.recordAt(i);
            System.out.println("key=" + record.key() + ",value=" + record.value() +
              ",partition=" + record.partition() + ",offset=" + record.offset());
          }
        })
        .onFailure(cause -> {
          System.out.println("Something went wrong when polling " + cause.toString());
          cause.printStackTrace();

          // Stop polling if something went wrong
          vertx.cancelTimer(timerId);
        })
    );
});

成功订阅后,应用程序会启动一个周期性计时器,以便定期执行轮询并从 Kafka 获取消息。

更改订阅或分配

在开始消费消息后,您可以通过再次调用 subscribe()assign() 来更改已订阅的主题或已分配的分区。

请注意,由于内部消息缓冲,记录处理器可能在 subscribe()assign() 方法的完成处理器被调用之后继续观察来自旧订阅或分配的消息。批处理处理器观察到的消息则不是这种情况:一旦完成处理器被调用,它将只观察从新订阅或分配中读取的消息。

获取主题分区信息

您可以调用 partitionsFor 来获取指定主题的分区信息。

consumer
  .partitionsFor("test")
  .onSuccess(partitions -> {
    for (PartitionInfo partitionInfo : partitions) {
      System.out.println(partitionInfo);
    }
  });

此外,listTopics 提供所有可用主题及其相关分区。

consumer
  .listTopics()
  .onSuccess(partitionsTopicMap ->
    partitionsTopicMap.forEach((topic, partitions) -> {
      System.out.println("topic = " + topic);
      System.out.println("partitions = " + partitions);
    })
  );

手动偏移量提交

在 Apache Kafka 中,消费者负责处理最后读取消息的偏移量。

这是通过每次从主题分区读取一批消息时自动执行的提交操作来实现的。创建消费者时,配置参数 enable.auto.commit 必须设置为 true

手动偏移量提交可以通过 commit 实现。它可以用于实现至少一次交付,以确保在提交偏移量之前处理已读取的消息。

consumer.commit().onSuccess(v ->
  System.out.println("Last read message offset committed")
);

在主题分区中查找

Apache Kafka 可以长时间保留消息,消费者可以在主题分区内查找并获得对消息的任意访问。

您可以使用 seek 来更改在特定位置读取的偏移量。

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to a specific offset
consumer
  .seek(topicPartition, 10)
  .onSuccess(v -> System.out.println("Seeking done"));

当消费者需要从头开始重新读取流时,可以使用 seekToBeginning

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to the beginning of the partition
consumer
  .seekToBeginning(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

最后,seekToEnd 可用于返回分区的末尾。

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// seek to the end of the partition
consumer
  .seekToEnd(Collections.singleton(topicPartition))
  .onSuccess(v -> System.out.println("Seeking done"));

请注意,由于内部消息缓冲,记录处理器可能在 seek*() 方法的完成处理器被调用之后一段时间内继续观察从原始偏移量读取的消息。批处理处理器观察到的消息则不是这种情况:一旦 seek*() 完成处理器被调用,它将只观察从新偏移量读取的消息。

偏移量查找

您可以使用 Kafka 0.10.1.1 中引入的 beginningOffsets API 来获取给定分区的第一个偏移量。与 seekToBeginning 不同,它不会更改消费者的偏移量。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer
  .beginningOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "Beginning offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// Convenience method for single-partition lookup
consumer
  .beginningOffsets(topicPartition)
  .onSuccess(beginningOffset ->
    System.out.println(
      "Beginning offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", beginningOffset=" + beginningOffset
    )
  );

您可以使用 Kafka 0.10.1.1 中引入的 endOffsets API 来获取给定分区的最后一个偏移量。与 seekToEnd 不同,它不会更改消费者的偏移量。

Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);

consumer.endOffsets(topicPartitions)
  .onSuccess(results ->
    results.forEach((topic, beginningOffset) ->
      System.out.println(
        "End offset for topic=" + topic.getTopic() + ", partition=" +
          topic.getPartition() + ", beginningOffset=" + beginningOffset
      )
    )
  );

// Convenience method for single-partition lookup
consumer
  .endOffsets(topicPartition)
  .onSuccess(endOffset ->
    System.out.println(
      "End offset for topic=" + topicPartition.getTopic() + ", partition=" +
        topicPartition.getPartition() + ", endOffset=" + endOffset
    )
);

您可以使用 Kafka 0.10.1.1 中引入的 offsetsForTimes API 按时间戳查找偏移量,即搜索参数是纪元时间戳,调用将返回摄取时间戳大于或等于给定时间戳的最低偏移量。

Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);

// We are interested in the offset for data ingested 60 seconds ago
long timestamp = (System.currentTimeMillis() - 60000);

topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer
  .offsetsForTimes(topicPartitionsWithTimestamps)
  .onSuccess(results ->
    results.forEach((topic, offset) ->
      System.out.println(
        "Offset for topic=" + topic.getTopic() +
        ", partition=" + topic.getPartition() + "\n" +
        ", timestamp=" + timestamp + ", offset=" + offset.getOffset() +
        ", offsetTimestamp=" + offset.getTimestamp()
      )
    )
);

// Convenience method for single-partition lookup
consumer.offsetsForTimes(topicPartition, timestamp).onSuccess(offsetAndTimestamp ->
  System.out.println(
    "Offset for topic=" + topicPartition.getTopic() +
    ", partition=" + topicPartition.getPartition() + "\n" +
    ", timestamp=" + timestamp + ", offset=" + offsetAndTimestamp.getOffset() +
    ", offsetTimestamp=" + offsetAndTimestamp.getTimestamp()
  )
);

消息流控制

消费者可以控制传入消息流并暂停/恢复从主题读取的操作,例如,当它需要更多时间处理实际消息时可以暂停消息流,然后恢复以继续消息处理。

为此,您可以使用 pauseresume

对于特定分区的暂停和恢复,记录处理器可能在 pause() 方法的完成处理器被调用之后一段时间内继续观察来自已暂停分区的消息。批处理处理器观察到的消息则不是这种情况:一旦 pause() 完成处理器被调用,它将只观察来自未暂停分区的消息。

TopicPartition topicPartition = new TopicPartition()
  .setTopic("test")
  .setPartition(0);

// registering the handler for incoming messages
consumer.handler(record -> {
  System.out.println("key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());

  // i.e. pause/resume on partition 0, after reading message up to offset 5
  if ((record.partition() == 0) && (record.offset() == 5)) {

    // pause the read operations
    consumer.pause(topicPartition)
      .onSuccess(v -> System.out.println("Paused"))
      .onSuccess(v -> vertx.setTimer(5000, timeId ->
        // resume read operations
        consumer.resume(topicPartition)
      ));
  }
});

关闭消费者

调用 close 关闭消费者。关闭消费者将关闭所有打开的连接并释放所有消费者资源。

关闭操作实际上是异步的,可能在调用返回后一段时间才能完成。如果您希望在实际关闭完成时收到通知,则可以传入一个处理器。

当关闭完全完成时,将调用此处理器。

consumer
  .close()
  .onSuccess(v -> System.out.println("Consumer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

向主题发送消息

您可以使用 write 向主题发送消息(记录)。

发送消息最简单的方法是只指定目标主题和相关值,省略其键或分区,在这种情况下,消息将以轮询方式发送到主题的所有分区。

for (int i = 0; i < 5; i++) {

  // only topic and message value are specified, round robin on destination partitions
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.write(record);
}

您可以接收消息发送的元数据,例如其主题、目标分区及其分配的偏移量。

for (int i = 0; i < 5; i++) {

  // only topic and message value are specified, round robin on destination partitions
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message_" + i);

  producer.send(record).onSuccess(recordMetadata ->
    System.out.println(
      "Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
      ", partition=" + recordMetadata.getPartition() +
      ", offset=" + recordMetadata.getOffset()
    )
  );
}

当您需要为消息分配分区时,可以指定其分区标识符或其键。

for (int i = 0; i < 10; i++) {

  // a destination partition is specified
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", null, "message_" + i, 0);

  producer.write(record);
}

由于生产者使用键哈希来标识目标,您可以使用它来保证所有具有相同键的消息都发送到同一分区并保留顺序。

for (int i = 0; i < 10; i++) {

  // i.e. defining different keys for odd and even messages
  int key = i % 2;

  // a key is specified, all messages with same key will be sent to the same partition
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);

  producer.write(record);
}
共享生产者在第一次调用 createShared 时创建,其配置也在此时定义,共享生产者的使用必须使用相同的配置。

共享生产者

有时您希望在多个 verticle 或上下文中共享同一个生产者。

调用 KafkaProducer.createShared 返回一个可以安全共享的生产者。

KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);

// Sometimes later you can close it
producer1.close();

通过此方法返回的生产者将共享相同的资源(线程、连接)。

当您使用完生产者后,只需关闭它;当所有共享生产者都关闭时,资源将为您释放。

关闭生产者

调用 close 关闭生产者。关闭生产者将关闭所有打开的连接并释放所有生产者资源。

关闭操作实际上是异步的,可能在调用返回后一段时间才能完成。如果您希望在实际关闭完成时收到通知,则可以传入一个处理器。

当关闭完全完成时,将调用此处理器。

producer
  .close()
  .onSuccess(v -> System.out.println("Producer is now closed"))
  .onFailure(cause -> System.out.println("Close failed: " + cause));

获取主题分区信息

您可以调用 partitionsFor 来获取指定主题的分区信息。

producer
  .partitionsFor("test")
  .onSuccess(partitions ->
    partitions.forEach(System.out::println)
  );

错误处理

Kafka 客户端(消费者或生产者)与 Kafka 集群之间的错误处理(例如超时)通过使用 exceptionHandlerexceptionHandler 完成。

consumer.exceptionHandler(e -> {
  System.out.println("Error = " + e.getMessage());
});

verticle 中的自动清理

如果您在 verticle 内部创建消费者和生产者,那么当 verticle 被卸载时,这些消费者和生产者将自动关闭。

使用 Vert.x 序列化器/反序列化器

Vert.x Kafka 客户端开箱即用地支持缓冲区、JSON 对象和 JSON 数组的序列化器和反序列化器。

在消费者中可以使用缓冲区

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize to json object
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize to json array
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

或者在生产者中

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");

// Creating a producer able to serialize to json object
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");

// Creating a producer able to serialize to json array
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");

您也可以在创建时指定序列化器/反序列化器。

在消费者中

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");

// Creating a consumer able to deserialize buffers
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);

// Creating a consumer able to deserialize json objects
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);

// Creating a consumer able to deserialize json arrays
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);

或者在生产者中

Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");

// Creating a producer able to serialize to buffers
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);

// Creating a producer able to serialize to json objects
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);

// Creating a producer able to serialize to json arrays
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);

RxJava 3 API

Kafka 客户端提供了原始 API 的 Rx 化版本。

Observable<KafkaConsumerRecord<String, Long>> observable = consumer.toObservable();

observable
  .map(record -> record.value())
  .buffer(256)
  .map(
  list -> list.stream().mapToDouble(n -> n).average()
).subscribe(val -> {

  // Obtained an average

});

自动追踪传播

当 Vert.x 配置为启用追踪(参见 setTracingOptions)时,追踪将随 Kafka 消息自动传播。

Kafka 生产者在写入消息时会向追踪添加一个跨度(span),追踪上下文将作为 Kafka 头部传播,消费者在读取消息时也会创建一个跨度。

遵循 OpenTracing 语义约定,span 标签如下:

  • span.kind,可以是 consumerproducer

  • peer.address 可以从 setTracePeerAddress 配置。如果未设置,它将是配置的引导服务器。

  • peer.hostnamepeer.address 解析。

  • peer.portpeer.address 解析。

  • peer.service 始终是 kafka

  • message_bus.destination,设置为正在使用的主题。

Vert.x Kafka 管理客户端

此组件提供了 Kafka Admin Client API 的 Vert.x 封装。Kafka Admin Client 用于创建、修改和删除主题。它还提供了处理 ACL(访问控制列表)、消费者组等的方法。

创建 Kafka 管理客户端

创建管理客户端的方式与使用原生 Kafka 客户端库的方式非常相似。

它需要根据 Apache Kafka 官方文档中描述的大量属性进行配置,包括管理端的配置。

为此,可以配置一个包含此类属性的映射,并将其传递给 KafkaAdminClient 暴露的静态创建方法之一。

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, config);

列出主题

您可以调用 listTopics 来列出集群中的主题。唯一的参数是用于处理结果的常规回调,它提供主题列表。

adminClient.listTopics().onSuccess(topics ->
    System.out.println("Topics= " + topics)
);

描述主题

您可以调用 describeTopics 来描述集群中的主题。描述主题意味着获取所有相关元数据,如分区数量、副本、领导者、同步副本等。所需的参数是要描述的主题名称列表,以及用于处理结果的常规回调,该回调提供一个包含主题名称和相关 TopicDescription 的映射。

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});

创建主题

您可以调用 createTopics 在集群中创建主题。所需的参数是要创建的主题列表,以及用于处理结果的常规回调。要创建的主题通过 NewTopic 类定义,指定名称、分区数量和复制因子。也可以描述副本分配,将每个副本映射到代理 ID,而不是指定分区数量和复制因子(在这种情况下必须设置为 -1)。

adminClient.createTopics(Collections.singletonList(new NewTopic("testCreateTopic", 1, (short)1)))
  .onSuccess(v -> {
    // topics created successfully
  })
  .onFailure(cause -> {
    // something went wrong when creating the topics
  });

删除主题

您可以调用 deleteTopics 删除集群中的主题。所需的参数是要删除的主题列表,以及用于处理结果的常规回调。

adminClient.deleteTopics(Collections.singletonList("topicToDelete"))
  .onSuccess(v -> {
    // topics deleted successfully
  })
  .onFailure(cause -> {
    // something went wrong when removing the topics
  });

描述配置

您可以调用 describeConfigs 来描述资源配置。描述资源配置意味着获取集群资源(如主题或代理)的所有配置信息。所需的参数是您想要配置的资源列表,以及用于处理结果的常规回调。资源由 ConfigResource 集合描述,而结果将每个资源映射到一个对应的 Config,其中每个配置参数都有更多的 ConfigEntry

adminClient.describeConfigs(Collections.singletonList(
  new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic"))).onSuccess(configs -> {
  // check the configurations
});

更改配置

您可以调用 alterConfigs 来更改资源配置。更改资源配置意味着更新集群资源(如主题或代理)的配置信息。所需的参数是带有相关配置的资源列表(待更新),以及用于处理结果的常规回调。可以通过一次调用更改不同资源的配置。输入参数将每个 ConfigResource 映射到您要应用的相应 Config

ConfigResource resource = new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");
// create a entry for updating the retention.ms value on the topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "51000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singletonList(retentionEntry)));
adminClient.alterConfigs(updateConfig)
  .onSuccess(v -> {
    // configuration altered successfully
  })
  .onFailure(cause -> {
    // something went wrong when altering configs
  });

列出消费者组

您可以调用 listConsumerGroups 来列出集群中的消费者组。唯一的参数是用于处理结果的常规回调,它提供消费者组列表。

adminClient.listConsumerGroups().onSuccess(consumerGroups ->
  System.out.println("ConsumerGroups= " + consumerGroups)
);

描述消费者组

您可以调用 describeConsumerGroups 来描述集群中的消费者组。描述消费者组意味着获取所有相关信息,如成员、相关 ID、订阅的主题、分区分配等。所需的参数是要描述的消费者组名称列表,以及用于处理结果的常规回调,该回调提供一个包含消费者组名称和相关 MemberDescription 的映射。

adminClient.describeTopics(Collections.singletonList("my-topic")).onSuccess(topics -> {
  TopicDescription topicDescription = topics.get("first-topic");

  System.out.println("Topic name=" + topicDescription.getName() +
      " isInternal= " + topicDescription.isInternal() +
      " partitions= " + topicDescription.getPartitions().size());

  for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
    System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
      " leaderId= " + topicPartitionInfo.getLeader().getId() +
      " replicas= " + topicPartitionInfo.getReplicas() +
      " isr= " + topicPartitionInfo.getIsr());
  }
});