<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-cassandra-client</artifactId>
<version>5.0.1</version>
</dependency>
Vert.x 的 Cassandra 客户端
一个 Vert.x 客户端,允许应用程序与 Apache Cassandra 服务进行交互。
快速入门
要使用此模块,请将以下内容添加到您的 Maven POM 文件的 依赖项 部分
或者,如果您使用 Gradle
compile 'io.vertx:vertx-cassandra-client:5.0.1'
创建客户端
客户端选项
Cassandra 是一个分布式系统,它可以有许多节点。要连接到 Cassandra,您需要在创建 CassandraClientOptions
对象时指定一些集群节点的地址。
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("node1.address", 9142)
.addContactPoint("node2.address", 9142)
.addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);
默认情况下,Vert.x 的 Cassandra 客户端连接到本地机器的 9042
端口,并且不绑定到任何特定的键空间。但是您可以设置其中一个或两个选项。
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("localhost", 9142)
.setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);
出于微调目的,CassandraClientOptions 暴露了一个 com.datastax.driver.core.Cluster.Builder 实例。 |
共享客户端
如果您部署了多个 verticle 实例或有不同的 verticle 与同一个数据库交互,建议创建共享客户端。
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("node1.address", 9142)
.addContactPoint("node2.address", 9142)
.addContactPoint("node3.address", 9142)
.setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);
同名的共享客户端将使用单个底层 com.datastax.driver.core.Session
。
使用 API
客户端 API 由 CassandraClient
表示。
查询
您可以使用三种不同的方式获取查询结果。
流式处理
当您需要迭代地消费结果时,例如您想要处理每个项目时,流式 API 最为适用。这对于大量行来说效率非常高。
为了给您一些关于如何使用 API 的启发和想法,我们希望您考虑这个示例:
cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'")
.onComplete(queryStream -> {
if (queryStream.succeeded()) {
CassandraRowStream stream = queryStream.result();
// resume stream when queue is ready to accept buffers again
response.drainHandler(v -> stream.resume());
stream.handler(row -> {
String value = row.getString("my_string_col");
response.write(value);
// pause row stream when we buffer queue is full
if (response.writeQueueFull()) {
stream.pause();
}
});
// end request when we reached end of the stream
stream.endHandler(end -> response.end());
} else {
queryStream.cause().printStackTrace();
// response with internal server error if we are not able to execute given query
response
.setStatusCode(500)
.end("Unable to execute the query");
}
});
在此示例中,我们正在执行查询并通过 HTTP 流式传输结果。
批量获取
当您需要同时处理所有行时,应使用此 API。
cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
.onComplete(executeWithFullFetch -> {
if (executeWithFullFetch.succeeded()) {
List<Row> rows = executeWithFullFetch.result();
for (Row row : rows) {
// handle each row here
}
} else {
System.out.println("Unable to execute the query");
executeWithFullFetch.cause().printStackTrace();
}
});
仅当您能够将完整的结果集加载到内存中时,才使用批量获取。 |
收集器查询
您可以将 Java 收集器与查询 API 结合使用
cassandraClient.execute("SELECT * FROM users", listCollector)
.onComplete(ar -> {
if (ar.succeeded()) {
// Get the string created by the collector
String list = ar.result();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
低级获取
此 API 提供了对加载的更大控制,但代价是比流式和批量获取 API 的级别略低。
cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
.onComplete(execute -> {
if (execute.succeeded()) {
ResultSet resultSet = execute.result();
if (resultSet.remaining() != 0) {
Row row = resultSet.one();
System.out.println("One row successfully fetched");
} else if (!resultSet.hasMorePages()) {
System.out.println("No pages to fetch");
} else {
resultSet.fetchNextPage().onComplete(fetchMoreResults -> {
if (fetchMoreResults.succeeded()) {
int availableWithoutFetching = resultSet.remaining();
System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
} else {
System.out.println("Unable to fetch more results");
fetchMoreResults.cause().printStackTrace();
}
});
}
} else {
System.out.println("Unable to execute the query");
execute.cause().printStackTrace();
}
});
预处理查询
出于安全和效率原因,对于您使用多次的所有查询,最好使用预处理语句。
您可以准备一个查询
cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ")
.onComplete(preparedStatementResult -> {
if (preparedStatementResult.succeeded()) {
System.out.println("The query has successfully been prepared");
PreparedStatement preparedStatement = preparedStatementResult.result();
// now you can use this PreparedStatement object for the next queries
} else {
System.out.println("Unable to prepare the query");
preparedStatementResult.cause().printStackTrace();
}
});
然后将 PreparedStatement
用于所有后续查询。
cassandraClient.execute(preparedStatement.bind("my_value"))
.onComplete(done -> {
ResultSet results = done.result();
// handle results here
});
// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"))
.onComplete(done -> {
List<Row> results = done.result();
// handle results here
});
// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"))
.onComplete(done -> {
CassandraRowStream results = done.result();
// handle results here
});
批处理
如果您想一次执行多个查询,可以使用 BatchStatement
:
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED)
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Pavel')"))
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Thomas')"))
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Julien')"));
cassandraClient
.execute(batchStatement)
.onComplete(result -> {
if (result.succeeded()) {
System.out.println("The given batch executed successfully");
} else {
System.out.println("Unable to execute the batch");
result.cause().printStackTrace();
}
});
跟踪查询
当 Vert.x 启用了追踪时,Cassandra 客户端可以追踪查询执行。
客户端报告以下客户端跨度
-
Query
操作名称 -
标签
-
peer.address
: 驱动程序已知节点列表,格式为[127_0_0_1:9042,localhost:9042,myhost_mydomain:9042]
-
span.kind
:client
-
db.instance
: 键空间 -
db.statement
: CQL 查询 -
db.type
:cassandra
-
默认的追踪策略是 PROPAGATE
,客户端只会在活跃的追踪中创建跨度。
您可以使用 setTracingPolicy
更改客户端策略。例如,您可以将 ALWAYS
设置为始终报告跨度。
CassandraClientOptions options = new CassandraClientOptions()
.setTracingPolicy(TracingPolicy.ALWAYS);