Vert.x Redis

Vert.x Redis 是一个与 Vert.x 配合使用的 Redis 客户端。

此模块允许在 Redis 中保存、检索、搜索和删除数据。Redis 是一个开源的、高级的键值存储系统。它通常被称为数据结构服务器,因为键可以包含字符串、哈希、列表、集合和有序集合。要使用此模块,您的网络上必须运行一个 Redis 服务器实例。

Redis 具有丰富的 API,可以分为以下几组:

  • 集群 - 与集群管理相关的命令,请注意,使用其中大多数命令需要 Redis 服务器版本 >=3.0.0。

  • 连接 - 允许您切换数据库、连接、断开连接和验证到服务器的命令。

  • 哈希 - 允许对哈希进行操作的命令。

  • HyperLogLog - 用于近似计算多重集中不同元素数量的命令,即 HyperLogLog。

  • 键 - 处理键的命令。

  • 列表 - 处理列表的命令。

  • 发布/订阅 (Pub/Sub) - 创建队列和发布/订阅客户端的命令。

  • 脚本 - 在 Redis 中运行 Lua 脚本的命令。

  • 服务器 - 管理和获取服务器配置的命令。

  • 集合 - 处理无序集合的命令。

  • 有序集合 - 处理有序集合的命令。

  • 字符串 - 处理字符串的命令。

  • 事务 - 处理事务生命周期的命令。

  • 流 - 处理流的命令。

使用 Vert.x Redis

要使用 Vert.x Redis 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分。

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-redis-client</artifactId>
  <version>${vertx-redis.version}</version>
</dependency>
  • Gradle(在您的 build.gradle 中)

compile 'io.vertx:vertx-redis-client:${vertx-redis.version}'

连接到 Redis

Redis 客户端可以在 4 种不同模式下运行:

  • 独立客户端(可能是大多数用户所需的)。

  • Sentinel(在 Redis 高可用性模式下工作时)。

  • 集群(在 Redis 集群模式下工作时)。

  • 复制(单分片,一个节点写入,多个节点读取)。

连接模式通过 Redis 接口上的工厂方法选择。无论何种模式,客户端都可以使用 RedisOptions 数据对象进行配置。默认情况下,一些配置值初始化为以下值:

  • netClientOptions:默认值为 TcpKeepAlive: true, TcpNoDelay: true

  • endpoint:默认值为 redis://:6379

  • masterName:默认值为 mymaster

  • role:默认值为 MASTER

  • useReplicas:默认值为 NEVER

要获取连接,请使用以下代码:

Redis.createClient(vertx)
  .connect()
  .onSuccess(conn -> {
    // use the connection
  });

如果配置中包含 password 和/或 select 数据库,则一旦成功建立与服务器的连接,这两个命令将自动执行。

Redis.createClient(
    vertx,
    // The client handles REDIS URLs. The select database as per spec is the
    // numerical path of the URL and the password is the password field of
    // the URL authority
    "redis://:abracadabra@localhost:6379/1")
  .connect()
  .onSuccess(conn -> {
    // use the connection
  });

使用 TLS 连接

您可以通过配置客户端 TCP 选项来使用 TLS 连接到 Redis 服务器,请确保设置:

  • ssl 标志

  • 服务器证书或信任所有标志

  • 如果您想验证服务器身份,将主机名验证算法设置为 "HTTPS";否则设置为 ""

RedisOptions options = new RedisOptions();
options.setConnectionString("redis://:abracadabra@localhost:6379/1");
NetClientOptions tcpOptions = options.getNetClientOptions();
tcpOptions
  .setSsl(true)
  .setTrustOptions(new PemTrustOptions().addCertPath("/path/to/server.crt"))
  // Algo can be the empty string "" or "HTTPS" to verify the server hostname
  .setHostnameVerificationAlgorithm(algo);
Redis.createClient(
    vertx,
    options)
  .connect()
  .onSuccess(conn -> {
    // use the connection
  });
有关 TLS 客户端配置的更多详细信息,请参见此处

连接字符串

客户端将识别遵循此表达式的地址:

redis://[:password@]host[:port][/db-number]

unix://[:password@]/domain/docker.sock[?select=db-number]

当指定密码或数据库时,这些命令总是在连接开始时执行。

异步提供 RedisConnectOptions

Redis.createClient() 方法接受一个包含所有选项的 RedisOptions 对象。这是连接到 Redis 最常用的方式。

然而,也有一个选项可以同步提供 RedisOptions 和异步提供 RedisConnectOptions。有 4 个具有相同参数列表的方法允许这样做:

  • Redis.createStandaloneClient()

  • Redis.createReplicationClient()

  • Redis.createSentinelClient()

  • Redis.createClusterClient()

这些方法接受 Vertx 对象、RedisOptions 对象以及一个 Supplier<Future<RedisConnectOptions>>RedisOptions 对象主要提供静态的 NetClientOptionsPoolOptionsSupplier<Future<RedisConnectOptions>> 在需要创建连接时使用,并提供动态选项。该类型清楚地表明这些动态选项可以异步提供。

您可能需要此功能的主要示例是使用 IAM 身份验证的 Amazon ElastiCache。IAM 身份验证接受短期令牌(其生命周期仅为 15 分钟),因此需要频繁重新生成。

这是 Supplier<Future<RedisConnectOptions>> 的一个实现,它将 RedisConnectOptions 缓存 10 分钟:

public class RedisConnectOptionsSupplier<OPTS extends RedisConnectOptions> implements Supplier<Future<OPTS>> {
  private final Vertx vertx;
  private final RedisOptions options;
  private final Function<RedisOptions, OPTS> creator;
  private final Supplier<String> userName;
  private final Supplier<String> password;
  private final AtomicReference<Future<OPTS>> future;

  public RedisConnectOptionsSupplier(Vertx vertx, RedisOptions options, Function<RedisOptions, OPTS> creator,
    Supplier<String> userName, Supplier<String> password) {
    this.vertx = vertx;
    this.options = options;
    this.creator = creator;
    this.userName = userName;
    this.password = password;
    this.future = new AtomicReference<>();
  }

  @Override
  public Future<OPTS> get() {
    while (true) {
      Future<OPTS> currentFuture = this.future.get();
      if (currentFuture != null) {
        return currentFuture;
      }

      Promise<OPTS> promise = Promise.promise();
      Future<OPTS> future = promise.future();
      if (this.future.compareAndSet(null, future)) {
        OPTS result = creator.apply(options);
        result.setUser(userName.get());
        result.setPassword(password.get()); // TODO run on worker thread
        promise.complete(result);

        // clean up in 10 minutes, to force refresh
        vertx.setTimer(10 * 60 * 1000, ignored -> {
          this.future.set(null);
        });

        return future;
      }
    }
  }
}
package examples;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4FamilyHttpSigner;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;

import java.net.URI;
import java.time.Duration;

public class IamAuthToken {
  private static final String PROTOCOL = "http://";

  private final String userId;
  private final String replicationGroupId;
  private final String region;
  private final AwsCredentialsProvider credentials;

  public IamAuthToken(String userId, String replicationGroupId, String region, AwsCredentialsProvider credentials) {
    this.userId = userId;
    this.replicationGroupId = replicationGroupId;
    this.region = region;
    this.credentials = credentials;
  }

  public String getUserId() {
    return userId;
  }

  public String getToken() {
    URI uri = URI.create(PROTOCOL + replicationGroupId + "/");
    SdkHttpRequest request = SdkHttpRequest.builder()
      .method(SdkHttpMethod.GET)
      .uri(uri)
      .appendRawQueryParameter("Action", "connect")
      .appendRawQueryParameter("User", userId)
      .build();

    SdkHttpRequest signedRequest = sign(request, credentials.resolveCredentials());
    return signedRequest.getUri().toString().replace(PROTOCOL, "");
  }

  private SdkHttpRequest sign(SdkHttpRequest request, AwsCredentials credentials) {
    SignRequest<AwsCredentials> signRequest = SignRequest.builder(credentials)
      .request(request)
      .putProperty(AwsV4HttpSigner.REGION_NAME, region)
      .putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, "elasticache")
      .putProperty(AwsV4HttpSigner.EXPIRATION_DURATION, Duration.ofSeconds(900))
      .putProperty(AwsV4HttpSigner.AUTH_LOCATION, AwsV4FamilyHttpSigner.AuthLocation.QUERY_STRING)
      .build();
    return AwsV4HttpSigner.create().sign(signRequest).request();
  }
}

这个辅助类可以这样实例化:

AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder()
  .asyncCredentialUpdateEnabled(true)
  .build();
IamAuthToken token = new IamAuthToken("my-user", "my-redis", "us-east-1", credentialsProvider);

然后,Redis 客户端可以这样实例化:

Redis client = Redis.createStandaloneClient(vertx, redisOptions, new RedisConnectOptionsSupplier<>(vertx,
  redisOptions, RedisStandaloneConnectOptions::new, token::getUserId, token::getToken));

运行命令

鉴于 Redis 客户端已连接到服务器,现在可以使用此模块执行所有命令。该模块提供了一个简洁的 API 来执行命令,无需手动编写命令本身,例如,如果想获取某个键的值,可以这样做:

RedisAPI redis = RedisAPI.api(client);

redis
  .get("mykey")
  .onSuccess(value -> {
    // do something...
  });

响应对象是一个通用类型,允许将基本的 Redis 类型转换为您的语言类型。例如,如果您的响应类型为 INTEGER,那么您可以将该值作为任何数字基本类型(如 intlong 等)获取。

或者您可以执行更复杂的任务,例如将响应作为迭代器处理:

if (response.type() == ResponseType.MULTI) {
  for (Response item : response) {
    // do something with item...
  }
}

Sentinel 模式

要在 Sentinel 模式(也称为高可用模式)下工作,连接创建方式非常相似:

Redis.createClient(
    vertx,
    new RedisOptions()
      .setType(RedisClientType.SENTINEL)
      .addConnectionString("redis://127.0.0.1:5000")
      .addConnectionString("redis://127.0.0.1:5001")
      .addConnectionString("redis://127.0.0.1:5002")
      .setMasterName("sentinel7000")
      .setRole(RedisRole.MASTER))
  .connect()
  .onSuccess(conn -> {
    conn.send(Request.cmd(Command.INFO))
      .onSuccess(info -> {
        // do something...
      });
  });

这里的连接字符串指向 sentinel 节点,这些节点用于发现实际的主节点和副本节点。

需要注意的是,在此模式下,当选定角色为 MASTER(这是默认值)且启用自动故障转移 (RedisOptions.setAutoFailover(true)) 时,会有一个额外的连接到其中一个 sentinel 节点,用于监听故障转移事件。当 sentinel 通知有新的主节点被选出时,所有客户端将关闭与旧主节点的连接,并透明地重新连接到新主节点。

请注意,在旧主节点发生故障和新主节点被选举出来之间会有一段短暂的时间,在此期间,现有连接将暂时无法执行所有操作。新主节点选举成功后,连接将自动切换到新主节点并重新开始工作。

集群模式

要在集群模式下工作,连接创建方式非常相似:

final RedisOptions options = new RedisOptions()
  .setType(RedisClientType.CLUSTER)
  .addConnectionString("redis://127.0.0.1:7000")
  .addConnectionString("redis://127.0.0.1:7001")
  .addConnectionString("redis://127.0.0.1:7002")
  .addConnectionString("redis://127.0.0.1:7003")
  .addConnectionString("redis://127.0.0.1:7004")
  .addConnectionString("redis://127.0.0.1:7005");

在这种情况下,配置需要已知一个或多个集群成员。此列表将用于向集群查询当前配置,这意味着如果列出的任何成员不可用,它将被跳过。

在集群模式下,每个节点都会建立连接,执行命令时需要特别小心。建议阅读 Redis 手册以了解集群的工作原理。在此模式下运行的客户端将尽力识别执行命令使用的槽位,以便在正确的节点上执行。在某些情况下可能无法识别,此时,作为一种尽力而为的措施,命令将在随机节点上运行。

为了知道哪个 Redis 节点持有哪个槽位,集群 Redis 客户端会维护一个哈希槽分配的缓存。当缓存为空时,第一次尝试获取连接将执行 CLUSTER SLOTS。该缓存有一个可配置的 TTL(生存时间),默认为 1 秒。每当客户端执行的任何命令收到 MOVED 重定向时,缓存也会被清除。

集群工具

RedisCluster 类包含少量在 Redis 集群中有用的方法。要创建实例,请使用 Redis 对象或 RedisConnection 对象调用 create()。如果您使用非集群的 Redis / RedisConnection 调用 create(),则会抛出异常。

RedisCluster 提供的方法有:

  • onAllNodes(Request):针对集群中的所有节点运行请求。返回一个 Future,该 Future 在完成时包含一个响应列表(每个节点一个),或者在其中一个操作失败时返回失败。请注意,如果发生故障,无法保证请求在其他 Redis 集群节点上是否成功执行。也不保证结果顺序。

  • onAllMasterNodes(Request):针对集群中的所有 节点运行请求。返回一个 Future,该 Future 在完成时包含一个响应列表(每个主节点一个),或者在其中一个操作失败时返回失败。请注意,如果发生故障,无法保证请求在其他 Redis 集群主节点上是否成功执行。也不保证结果顺序。

  • groupByNodes(List<Request>):将请求分组到 RequestGrouping 中,其中包含:

    • 带键 请求:包含键的请求,因此可以确定应将其发送到哪个主节点;keyed 集合中每个内部列表中的所有请求都保证发送到同一个 节点;

    • 不带键 请求:不包含键的请求,因此 无法 确定应将其发送到哪个主节点。

    如果任何请求包含属于不同主节点的多个键,则生成的 Future 将失败。

    如果集群客户端是使用 RedisReplicas.SHARERedisReplicas.ALWAYS 创建的,并且命令是单独执行的(使用 RedisConnection.send(),而不是 RedisConnection.batch()),则命令可能会分散到同一主节点的不同副本上。

    请注意,此方法仅在 Redis 集群处于稳定状态时才可靠。在重新分片、故障转移或集群拓扑发生任何变化的情况下,无法保证结果的有效性。

复制模式

使用复制对客户端是透明的。获取连接是一个昂贵的操作。客户端将循环提供的端点直到找到主节点。一旦识别出主节点(所有写入命令将在此节点执行),将尽力连接所有副本节点(读取节点)。

掌握所有节点信息后,客户端现在将过滤对正确节点类型执行读写操作。请注意,useReplica 配置会影响此选择。就像集群一样,当配置声明使用副本节点为 ALWAYS 时,任何读取操作都将在副本节点上执行;SHARED 将在主节点和副本节点之间随机共享读取;而 NEVER 意味着永远不使用副本。

考虑到连接获取成本,此模式的推荐用法是只要应用程序可能需要连接就重用它。

Redis.createClient(
    vertx,
    new RedisOptions()
      .setType(RedisClientType.REPLICATION)
      .addConnectionString("redis://:7000")
      .setMaxPoolSize(4)
      .setMaxPoolWaiting(16))
  .connect()
  .onSuccess(conn -> {
    // this is a replication client,
    // write operations will end up on the master node
    conn.send(Request.cmd(Command.SET).arg("key").arg("value"));
    // and read operations may end up on the replica nodes
    // (depending on configuration and their availability)
    conn.send(Request.cmd(Command.GET).arg("key"));
  });

静态拓扑

复制模式允许静态配置多节点拓扑。在静态拓扑中,配置中的第一个节点被假定为 节点,而其余节点被假定为 副本 节点。节点未经验证;应用程序开发人员有责任确保静态配置正确。

为此:

  • 重复调用 RedisOptions.addConnectionString() 来配置静态拓扑(第一次调用配置主节点,后续调用配置副本节点),并且

  • 调用 RedisOptions.setTopology(RedisTopology.STATIC)

Redis.createClient(
    vertx,
    new RedisOptions()
      .setType(RedisClientType.REPLICATION)
      .setTopology(RedisTopology.STATIC)
      .addConnectionString("redis://:7000")
      .addConnectionString("redis://:7004")
      .setMaxPoolSize(4)
      .setMaxPoolWaiting(16))
  .connect()
  .onSuccess(conn -> {
    // this is a replication client,
    // write operations will end up on the master node
    conn.send(Request.cmd(Command.SET).arg("key").arg("value"));
    // and read operations may end up on the replica nodes
    // (depending on configuration and their availability)
    conn.send(Request.cmd(Command.GET).arg("key"));
  });

请注意,自动发现拓扑通常是首选。静态配置应仅在必要时使用。一个这样的例子是 Amazon Elasticache for Redis (集群模式禁用),其中:

  • 主节点应设置为 主端点,并且

  • 一个副本节点应设置为 读取器端点

请注意,Elasticache for Redis(集群模式禁用)的读取器端点是一个域名,它解析为一个指向其中一个副本的 CNAME 记录。读取器端点解析到的 CNAME 记录会随时间变化。这种基于 DNS 的负载均衡形式与 DNS 解析缓存和连接池配合不佳。因此,一些副本可能会利用不足。Elasticache for Redis(集群模式启用)不会遇到此问题,因为它使用经典的轮询 DNS。

Redis 事务

Vert.x Redis 客户端支持 Redis 事务。您只需发出相应的命令:MULTIEXECDISCARDWATCHUNWATCH。请注意,Redis 中的事务 不是 SQL 数据库中的经典 ACID 事务;它们只是允许将多个命令排队以便稍后执行。

事务必须在单个连接上执行。尝试在无连接模式 (Redis.send()) 下执行事务命令将失败。可以在无连接批处理 (Redis.batch()) 中执行事务,但批处理必须包含整个事务;它不能拆分成多个批次。

建议始终获取一个连接 (Redis.connect()),并在该连接上执行事务的所有命令。

集群中的事务

默认情况下,Redis 集群中的事务是禁用的。尝试执行事务命令会导致失败。

可以在 Redis 集群中启用单节点事务,通过:

options.setClusterTransactions(RedisClusterTransactions.SINGLE_NODE);

在单节点事务中,第一个命令(如果是 WATCH)或第二个命令(如果第一个是 MULTI)决定事务应在哪个节点上执行。连接绑定到所选节点,所有后续命令都发送到该节点,无论哈希槽分配如何。当事务的最终命令(EXECDISCARD)执行时,连接将重置为默认模式,并且不再绑定到单个节点。

如果事务以 WATCH 开头,该命令带有键,因此决定目标节点。如果事务以 MULTI 开头,该命令不会直接发送到 Redis,而是排队直到下一个命令执行。是该命令决定目标节点(因此它应该有键,否则目标节点是随机的)。

请注意,所有这些仅适用于 RedisConnection.send()。命令批处理 (RedisConnection.batch()) 始终在集群中的单个节点上执行,因此没有对事务的特殊支持(它们甚至默认没有被禁用)。同样,批处理必须包含整个事务;它不能拆分成多个批次。

发布/订阅模式

Redis 支持队列和发布/订阅模式,在此模式下,一旦连接调用了订阅者模式,就不能再用于运行除退出该模式的命令之外的其他命令。

要启动订阅者,可以这样做:

Redis.createClient(vertx, new RedisOptions())
  .connect()
  .onSuccess(conn -> {
    conn.handler(message -> {
      // do whatever you need to do with your message
    });
  });

并从代码中的另一个地方将消息发布到队列:

redis.send(Request.cmd(Command.PUBLISH).arg("channel1").arg("Hello World!"))
  .onSuccess(res -> {
    // published!
  });
重要的是要记住,命令 SUBSCRIBEUNSUBSCRIBEPSUBSCRIBEPUNSUBSCRIBEvoid。这意味着成功时的结果是 null,而不是响应实例。所有消息随后通过客户端上的处理程序路由。

EventBusHandler

Vert.x Redis 客户端 4.x 版本会自动将消息转发到 Vert.x 事件总线,除非注册了 RedisConnection.handler()

在 Vert.x Redis 客户端 5.x 版本中,这种自动转发功能已移除。如果您仍然需要它,则必须手动创建 EventBusHandler 实例并使用 RedisConnection.handler() 注册它:

Redis redis = Redis.createClient(vertx);
redis.connect()
  .onSuccess(conn -> {
    conn.handler(EventBusHandler.create(vertx));
    conn.send(Request.cmd(Command.SUBSCRIBE).arg("news"));
  });

EventBusHandler 允许自定义地址前缀,因此如果您想使用 com.example.<the channel>(而不是 io.vertx.redis.<the channel>)的地址,可以使用 EventBusHandler.create(vertx, "com.example")

发送到 Vert.x 事件总线的消息是一个 JsonObject,格式如下:

{
  "status": "OK",
  "type": "message|subscribe|unsubscribe|pmessage|psubscribe|punsubscribe",
  "value": {
    "channel": "<the channel>", (1)
    "message": "<the message>", (2)
    "pattern": "<the pattern>", (3)
    "current": <number of current subscriptions> (4)
  }
}
1 对于 [p]messagesubscribeunsubscribe
2 对于 [p]message
3 对于 pmessagepsubscribepunsubscribe
4 对于 [p]subscribe[p]unsubscribe

事件总线地址对于 messagesubscribeunsubscribe 消息是 <prefix>.<the channel>,对于 pmessagepsubscribepunsubscribe 消息是 <prefix>.<the pattern>

跟踪命令

当 Vert.x 启用了跟踪功能时,Redis 客户端可以跟踪命令执行。

客户端报告一个 客户端 Span,包含以下详细信息:

  • 操作名称:Command

  • 标签

    • db.user:数据库用户名,如果已设置

    • db.instance:数据库编号,如果已知(通常为 0

    • db.statement:Redis 命令,不带参数(例如 getset

    • db.typeredis

默认的追踪策略是 PROPAGATE,客户端只会在活跃的追踪中创建跨度。

您可以使用 setTracingPolicy 更改客户端策略,例如您可以设置为 ALWAYS 以始终报告一个 Span。

options.setTracingPolicy(TracingPolicy.ALWAYS);

域套接字

大多数示例显示连接到 TCP 套接字,但是也可以使用 Redis 连接到 UNIX 域套接字。

Redis.createClient(vertx, "unix:///tmp/redis.sock")
  .connect()
  .onSuccess(conn -> {
    // so something...
  });

请注意,高可用(HA)和集群模式下的服务器地址始终报告为 TCP 地址,而不是域套接字。因此,这种组合是不可能的。这不是因为此客户端,而是因为 Redis 的工作方式。

连接池

所有客户端变体都由连接池支持。默认情况下,配置将池大小设置为 1,这意味着它就像一个单一连接一样运行。池有 4 个可调参数:

  • maxPoolSize 池中最大连接数(默认 6

  • maxPoolWaiting 队列中获取连接的最大等待处理程序数(默认 24

  • poolCleanerInterval 连接清理的频率间隔(默认 30 秒

  • poolRecycleTimeout 在池中保留未使用连接的超时时间(默认 3 分钟

连接池非常有用,可以避免自定义连接管理,例如您可以直接使用:

Redis.createClient(vertx, "redis://:7006")
  .send(Request.cmd(Command.PING))
  .onSuccess(res -> {
    // Should have received a pong...
  });

重要的是要注意,没有连接被获取或返回,这一切都由连接池处理。然而,当超过 1 个并发请求尝试从连接池获取连接时,可能会出现一些可伸缩性问题;为了解决这个问题,我们需要调整连接池。常见的配置是将连接池的最大大小设置为可用 CPU 核心数,并允许请求从连接池排队获取连接:

Redis.createClient(
    vertx,
    new RedisOptions()
      .setConnectionString("redis://:7006")
      // allow at max 8 connections to redis
      .setMaxPoolSize(8)
      // allow 32 connection requests to queue waiting
      // for a connection to be available.
      .setMaxWaitingHandlers(32))
  .send(Request.cmd(Command.PING))
  .onSuccess(res -> {
    // Should have received a pong...
  });
连接池与 SUBSCRIBEUNSUBSCRIBEPSUBSCRIBEPUNSUBSCRIBE 不兼容,因为这些命令会改变连接的操作方式,导致连接无法重用。

错误时重新连接的实现

尽管连接池非常有用,但为了性能,连接不应自动管理,而应由您控制。在这种情况下,您需要处理连接恢复、错误处理和重新连接。

典型的场景是用户希望在发生错误时重新连接到服务器。自动重新连接不是 Redis 客户端的一部分,因为它会强制执行可能不符合用户预期的行为,例如:

  1. 当前正在进行的请求应该如何处理?

  2. 是否应该调用异常处理程序?

  3. 如果重试也失败了怎么办?

  4. 应该恢复之前的状态(数据库、身份验证、订阅)吗?

  5. 等等…​

为了给用户提供完全的灵活性,此决策不应由客户端执行。但是,可以按如下方式实现一个简单的带指数退避超时的重新连接:

class RedisVerticle extends VerticleBase {

  private static final int MAX_RECONNECT_RETRIES = 16;

  private final RedisOptions options = new RedisOptions();
  private Redis redis;
  private RedisConnection client;
  private final AtomicBoolean CONNECTING = new AtomicBoolean();

  @Override
  public Future<?> start() {
    return createRedisClient()
      .onSuccess(conn -> {
        // connected to redis!
      });
  }

  /**
   * Will create a redis client and setup a reconnect handler when there is
   * an exception in the connection.
   */
  private Future<RedisConnection> createRedisClient() {
    Promise<RedisConnection> promise = Promise.promise();

    // make sure to invalidate old connection if present
    if (redis != null) {
      redis.close();;
    }

    if (CONNECTING.compareAndSet(false, true)) {
      redis = Redis.createClient(vertx, options);
      redis
        .connect()
        .onSuccess(conn -> {
          client = conn;

          // make sure the client is reconnected on error
          // eg, the underlying TCP connection is closed but the client side doesn't know it yet
          //     the client tries to use the staled connection to talk to server. An exceptions will be raised
          conn.exceptionHandler(e -> {
            attemptReconnect(0);
          });

          // make sure the client is reconnected on connection close
          // eg, the underlying TCP connection is closed with normal 4-Way-Handshake
          //     this handler will be notified instantly
          conn.endHandler(placeHolder -> {
            attemptReconnect(0);
          });

          // allow further processing
          promise.complete(conn);
          CONNECTING.set(false);
        }).onFailure(t -> {
          promise.fail(t);
          CONNECTING.set(false);
        });
    } else {
      promise.complete();
    }

    return promise.future();
  }

  /**
   * Attempt to reconnect up to MAX_RECONNECT_RETRIES
   */
  private void attemptReconnect(int retry) {
    if (retry > MAX_RECONNECT_RETRIES) {
      // we should stop now, as there's nothing we can do.
      CONNECTING.set(false);
    } else {
      // retry with backoff up to 10240 ms
      long backoff = (long) (Math.pow(2, Math.min(retry, 10)) * 10);

      vertx.setTimer(backoff, timer -> {
        createRedisClient()
          .onFailure(t -> attemptReconnect(retry + 1));
      });
    }
  }
}

在此示例中,客户端对象将在重新连接时被替换,并且应用程序将以最大 1280ms 的退避时间重试最多 16 次。通过丢弃旧客户端,我们确保所有旧的正在进行的响应都将丢失,所有新的响应都将在新连接上。

重要的是要注意,重新连接将创建一个新的连接对象,因此这些对象引用不应被缓存,而应每次都进行评估。

协议解析器

此客户端支持 RESP2RESP3 协议。默认情况下,客户端在连接握手时尝试协商对 RESP3 的支持。

可以使用 setPreferredProtocolVersion 方法选择首选版本,RESP2RESP3

options.setPreferredProtocolVersion(ProtocolVersion.RESP2);

解析器内部从服务器接收到的所有数据块创建一个“无限”可读缓冲区,以避免在内存收集方面产生太多垃圾,可在 JVM 启动时配置一个可调的水印值。系统属性 io.vertx.redis.parser.watermark 定义了在丢弃之前此可读缓冲区中保留的数据量。默认情况下,此值为 16 KB。这意味着每个与服务器的连接将至少使用此内存量。由于客户端以管道模式工作,保持连接数较低可获得最佳结果,这意味着将使用 16 KB * nconn 的内存。如果应用程序需要大量连接,则建议将水印值减小到更小的值甚至完全禁用它。