Vert.x 的 Cassandra 客户端

一个 Vert.x 客户端,允许应用程序与 Apache Cassandra 服务进行交互。

快速入门

要使用此模块,请将以下内容添加到您的 Maven POM 文件的 依赖项 部分

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-cassandra-client</artifactId>
  <version>5.0.1</version>
</dependency>

或者,如果您使用 Gradle

compile 'io.vertx:vertx-cassandra-client:5.0.1'

创建客户端

客户端选项

Cassandra 是一个分布式系统,它可以有许多节点。要连接到 Cassandra,您需要在创建 CassandraClientOptions 对象时指定一些集群节点的地址。

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);

默认情况下,Vert.x 的 Cassandra 客户端连接到本地机器的 9042 端口,并且不绑定到任何特定的键空间。但是您可以设置其中一个或两个选项。

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("localhost", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);
出于微调目的,CassandraClientOptions 暴露了一个 com.datastax.driver.core.Cluster.Builder 实例。

共享客户端

如果您部署了多个 verticle 实例或有不同的 verticle 与同一个数据库交互,建议创建共享客户端。

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);

同名的共享客户端将使用单个底层 com.datastax.driver.core.Session

客户端生命周期

客户端创建后,直到执行第一个查询才连接。

如果同名的另一个客户端已经执行了查询,则共享客户端可以在创建后连接。

在 verticle 中创建的客户端在 verticle 卸载时会自动停止。换句话说,您不需要在 verticle 的 stop 方法中调用 close

在所有其他情况下,您必须手动关闭客户端。

当共享客户端关闭时,如果同名的其他客户端仍在运行,则驱动程序会话不会关闭。

使用 API

客户端 API 由 CassandraClient 表示。

查询

您可以使用三种不同的方式获取查询结果。

流式处理

当您需要迭代地消费结果时,例如您想要处理每个项目时,流式 API 最为适用。这对于大量行来说效率非常高。

为了给您一些关于如何使用 API 的启发和想法,我们希望您考虑这个示例:

cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(queryStream -> {
  if (queryStream.succeeded()) {
    CassandraRowStream stream = queryStream.result();

    // resume stream when queue is ready to accept buffers again
    response.drainHandler(v -> stream.resume());

    stream.handler(row -> {
      String value = row.getString("my_string_col");
      response.write(value);

      // pause row stream when we buffer queue is full
      if (response.writeQueueFull()) {
        stream.pause();
      }
    });

    // end request when we reached end of the stream
    stream.endHandler(end -> response.end());

  } else {
    queryStream.cause().printStackTrace();
    // response with internal server error if we are not able to execute given query
    response
      .setStatusCode(500)
      .end("Unable to execute the query");
  }
});

在此示例中,我们正在执行查询并通过 HTTP 流式传输结果。

批量获取

当您需要同时处理所有行时,应使用此 API。

cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(executeWithFullFetch -> {
  if (executeWithFullFetch.succeeded()) {
    List<Row> rows = executeWithFullFetch.result();
    for (Row row : rows) {
      // handle each row here
    }
  } else {
    System.out.println("Unable to execute the query");
    executeWithFullFetch.cause().printStackTrace();
  }
});
仅当您能够将完整的结果集加载到内存中时,才使用批量获取。

收集器查询

您可以将 Java 收集器与查询 API 结合使用

cassandraClient.execute("SELECT * FROM users", listCollector)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Get the string created by the collector
      String list = ar.result();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

低级获取

此 API 提供了对加载的更大控制,但代价是比流式和批量获取 API 的级别略低。

cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'")
  .onComplete(execute -> {
  if (execute.succeeded()) {
    ResultSet resultSet = execute.result();

    if (resultSet.remaining() != 0) {
      Row row = resultSet.one();
      System.out.println("One row successfully fetched");
    } else if (!resultSet.hasMorePages()) {
      System.out.println("No pages to fetch");
    } else {
      resultSet.fetchNextPage().onComplete(fetchMoreResults -> {
        if (fetchMoreResults.succeeded()) {
          int availableWithoutFetching = resultSet.remaining();
          System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
        } else {
          System.out.println("Unable to fetch more results");
          fetchMoreResults.cause().printStackTrace();
        }
      });
    }
  } else {
    System.out.println("Unable to execute the query");
    execute.cause().printStackTrace();
  }
});

预处理查询

出于安全和效率原因,对于您使用多次的所有查询,最好使用预处理语句。

您可以准备一个查询

cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ")
  .onComplete(preparedStatementResult -> {
    if (preparedStatementResult.succeeded()) {
      System.out.println("The query has successfully been prepared");
      PreparedStatement preparedStatement = preparedStatementResult.result();
      // now you can use this PreparedStatement object for the next queries
    } else {
      System.out.println("Unable to prepare the query");
      preparedStatementResult.cause().printStackTrace();
    }
  });

然后将 PreparedStatement 用于所有后续查询。

cassandraClient.execute(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    ResultSet results = done.result();
    // handle results here
  });

// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    List<Row> results = done.result();
    // handle results here
  });

// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"))
  .onComplete(done -> {
    CassandraRowStream results = done.result();
    // handle results here
  });

批处理

如果您想一次执行多个查询,可以使用 BatchStatement

BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED)
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Pavel')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Thomas')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Julien')"));

cassandraClient
  .execute(batchStatement)
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("The given batch executed successfully");
    } else {
      System.out.println("Unable to execute the batch");
      result.cause().printStackTrace();
    }
  });

跟踪查询

当 Vert.x 启用了追踪时,Cassandra 客户端可以追踪查询执行。

客户端报告以下客户端跨度

  • Query 操作名称

  • 标签

    • peer.address: 驱动程序已知节点列表,格式为 [127_0_0_1:9042,localhost:9042,myhost_mydomain:9042]

    • span.kind: client

    • db.instance: 键空间

    • db.statement: CQL 查询

    • db.type: cassandra

默认的追踪策略是 PROPAGATE,客户端只会在活跃的追踪中创建跨度。

您可以使用 setTracingPolicy 更改客户端策略。例如,您可以将 ALWAYS 设置为始终报告跨度。

CassandraClientOptions options = new CassandraClientOptions()
  .setTracingPolicy(TracingPolicy.ALWAYS);