事件总线桥接

Vert.x Camel 桥接

Apache Camel (http://camel.apache.org) 是一个开源的 Java 框架,致力于使集成对开发者来说更简单、更易于访问。这个桥接允许 Vert.x 应用程序与 Camel 端点进行交互

  • 应用程序可以将消息发送到 Camel。

  • 应用程序可以从 Camel 接收消息。

该桥接依赖于 Vert.x 事件总线,并将事件总线地址与 Camel 端点关联起来。

该组件不是多语言的,因为它需要一些只能在 Java 中使用的 Camel 类。

使用 vertx-camel-bridge

要使用 Vert.x Camel 桥接,请将以下依赖项添加到构建描述符的dependencies部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-camel-bridge</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-camel-bridge:5.0.1'

桥接配置

在使用之前,需要对桥接进行配置和启动

CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();

桥接需要一个 CamelContext。它将从上下文中查找端点。桥接在使用前需要启动。请注意,start 方法是异步的。您可以使用返回的 Future 在桥接启动时获得通知。

入站映射

入站映射将 Camel 端点与事件总线地址关联起来。在此端点上接收到的消息会转换为事件总线消息。

Endpoint endpoint = camel.getEndpoint("direct:foo");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withoutHeadersCopy())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .usePublish())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withBodyType(String.class))
);

上述代码片段展示了配置入站映射的不同方式

  • 您可以使用 Endpoint 对象或其 URI 来配置 Camel 端点

  • 您可以禁用消息头复制(Camel 消息头会被复制到事件总线消息中)

  • 您可以使用 publish 而不是 send 将消息广播到所有事件总线消费者

  • 您可以配置事件总线消息体的类型。如果未设置,它将使用 Camel 消息负载。如果已设置,它将在 Camel 上下文中查找 Camel 消息负载与所需类型之间的转换器。

注意: org.fusesource.hawtbuf.Buffer 会自动转换为 Buffer

如果使用 send(而不是 publish),并且当 Camel 交换期望回复(In Out 交换)时,Vert.x 代码会期望对发送的消息进行回复。当回复到达时,它会传播到交换中

Endpoint endpoint = camel.getEndpoint("direct:stuff");

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));

vertx.eventBus().consumer("with-reply", message -> {
  message.reply("How are you ?");
});

camel.start();
bridge.start();

ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);

您还可以使用 setTimeout 配置回复 timeout

出站映射

出站映射将事件总线地址与 Camel 端点关联起来。在此事件总线地址上接收到的消息会转换为 Camel 消息并发送到该端点。

Endpoint endpoint = camel.getEndpoint("stream:out");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
            .withoutHeadersCopy())
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);

上述代码片段展示了配置出站映射的不同方式。

您可以将出站映射连接到 Camel 路由

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:start")
        .transform(constant("OK"));
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));

camel.start();
bridge.start();


vertx.eventBus().request("test", "hello").onComplete(reply -> {
  // Reply from the route (here it's "OK")
});

如果您在事件总线上发送消息时注册了回复处理器,它会配置 Camel 交换以期望响应(它使用了 EIP 的请求-回复模式)。响应会作为回复体传递。如果路由失败,您会收到回复失败(收件人失败),并附带消息作为原因

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .to("https://:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

如果您应用的处理器是阻塞的,您必须blocking设置为 true。这可以避免在事件循环线程上执行处理。

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
      .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
          // Do something blocking...
        }
      })
      .to("https://:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
  .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));

camel.start();
bridge.start();

vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

默认情况下,它使用默认的工作线程池,这可以通过 setWorkerExecutor 方法进行自定义。

停止桥接

不要忘记使用 stop 方法停止桥接。stop 方法是异步的。您可以使用返回的 Future 在桥接停止时获得通知。

交换自定义对象

如果您想发送和接收自定义对象,您需要在事件总线上注册一个编解码器

vertx.eventBus().registerDefaultCodec(Person.class, codec);