<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-camel-bridge</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x Camel 桥接
Apache Camel (http://camel.apache.org) 是一个开源的 Java 框架,致力于使集成对开发者来说更简单、更易于访问。这个桥接允许 Vert.x 应用程序与 Camel 端点进行交互
-
应用程序可以将消息发送到 Camel。
-
应用程序可以从 Camel 接收消息。
该桥接依赖于 Vert.x 事件总线,并将事件总线地址与 Camel 端点关联起来。
该组件不是多语言的,因为它需要一些只能在 Java 中使用的 Camel 类。 |
使用 vertx-camel-bridge
要使用 Vert.x Camel 桥接,请将以下依赖项添加到构建描述符的dependencies部分
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
compile 'io.vertx:vertx-camel-bridge:5.0.1'
桥接配置
在使用之前,需要对桥接进行配置和启动
CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();
桥接需要一个 CamelContext
。它将从上下文中查找端点。桥接在使用前需要启动。请注意,start
方法是异步的。您可以使用返回的 Future 在桥接启动时获得通知。
入站映射
入站映射将 Camel 端点与事件总线地址关联起来。在此端点上接收到的消息会转换为事件总线消息。
Endpoint endpoint = camel.getEndpoint("direct:foo");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withoutHeadersCopy())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.usePublish())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withBodyType(String.class))
);
上述代码片段展示了配置入站映射的不同方式
-
您可以使用
Endpoint
对象或其 URI 来配置 Camel 端点 -
您可以禁用消息头复制(Camel 消息头会被复制到事件总线消息中)
-
您可以使用
publish
而不是send
将消息广播到所有事件总线消费者 -
您可以配置事件总线消息体的类型。如果未设置,它将使用 Camel 消息负载。如果已设置,它将在 Camel 上下文中查找 Camel 消息负载与所需类型之间的转换器。
注意: org.fusesource.hawtbuf.Buffer
会自动转换为 Buffer
。
如果使用 send
(而不是 publish
),并且当 Camel 交换期望回复(In Out 交换)时,Vert.x 代码会期望对发送的消息进行回复。当回复到达时,它会传播到交换中
Endpoint endpoint = camel.getEndpoint("direct:stuff");
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));
vertx.eventBus().consumer("with-reply", message -> {
message.reply("How are you ?");
});
camel.start();
bridge.start();
ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);
您还可以使用 setTimeout
配置回复 timeout
。
出站映射
出站映射将事件总线地址与 Camel 端点关联起来。在此事件总线地址上接收到的消息会转换为 Camel 消息并发送到该端点。
Endpoint endpoint = camel.getEndpoint("stream:out");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
.withoutHeadersCopy())
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);
上述代码片段展示了配置出站映射的不同方式。
您可以将出站映射连接到 Camel 路由
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.transform(constant("OK"));
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));
camel.start();
bridge.start();
vertx.eventBus().request("test", "hello").onComplete(reply -> {
// Reply from the route (here it's "OK")
});
如果您在事件总线上发送消息时注册了回复处理器,它会配置 Camel 交换以期望响应(它使用了 EIP 的请求-回复模式)。响应会作为回复体传递。如果路由失败,您会收到回复失败(收件人失败),并附带消息作为原因
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.to("https://:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));
camel.start();
bridge.start();
vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
如果您应用的处理器是阻塞的,您必须将blocking设置为 true
。这可以避免在事件循环线程上执行处理。
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// Do something blocking...
}
})
.to("https://:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));
camel.start();
bridge.start();
vertx.eventBus().request("camel-route", "hello").onComplete(reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
默认情况下,它使用默认的工作线程池,这可以通过 setWorkerExecutor
方法进行自定义。
停止桥接
不要忘记使用 stop
方法停止桥接。stop
方法是异步的。您可以使用返回的 Future 在桥接停止时获得通知。
交换自定义对象
如果您想发送和接收自定义对象,您需要在事件总线上注册一个编解码器
vertx.eventBus().registerDefaultCodec(Person.class, codec);