Vert.x Consul 客户端

Consul 是一种用于在基础设施中发现和配置服务的工具。Vert.x 客户端允许应用程序通过阻塞和非阻塞 HTTP API 与 Consul 系统进行交互。

使用 Vert.x Consul 客户端

要使用此项目,请将以下依赖项添加到您的构建描述符的依赖项部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-consul-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-consul-client:5.0.1'

创建客户端

只需使用工厂方法

ConsulClient client = ConsulClient.create(vertx);

客户端也可以通过选项进行配置。

ConsulClientOptions options = new ConsulClientOptions()
  .setHost("consul.example.com");

ConsulClient client = ConsulClient.create(vertx, options);

Consul 客户端支持以下配置

host

Consul 主机。默认为 localhost

port

Consul HTTP API 端口。默认为 8500

timeout

设置超时时间(毫秒),在此期间如果请求未返回任何数据,则会向处理器传递失败信息,并且请求将被关闭。

aclToken

ACL 令牌。如果提供,客户端将在向 Consul 发送请求时通过提供“?token”查询参数来使用此令牌。如果未提供,则使用空令牌,该令牌映射到“匿名” ACL 策略。

dc

数据中心名称。如果提供,客户端将在向 Consul 发送请求时通过提供“?dc”查询参数来使用它。如果未提供,则查询 Consul 代理的数据中心。

ConsulClient 选项继承自 vertx-web-client 模块的 WebClientOptions,因此有很多设置可用。请参阅文档。

使用 API

客户端 API 由 ConsulClient 表示。该 API 与 Consul API 文档中描述的 Consul HTTP API 非常相似。

阻塞查询

某些端点支持“阻塞查询”功能。阻塞查询用于使用长轮询等待潜在的更改。任何支持阻塞的端点都提供一个唯一标识符(索引),表示请求资源的当前状态。以下配置用于执行阻塞查询

index

值,表示客户端希望等待该索引之后的任何更改。

wait

参数,指定阻塞请求的最大持续时间。此限制为 10 分钟。

BlockingQueryOptions opts = new BlockingQueryOptions()
  .setIndex(lastIndex)
  .setWait("1m");

一个重要的注意事项是,阻塞请求的返回**不保证**有更改。可能是达到了超时,或者存在不影响查询结果的幂等写入。

键/值存储

KV 端点用于访问 Consul 的简单键/值存储,可用于存储服务配置或其他元数据。支持以下端点

  • 用于管理单个键的更新、单个键或键前缀的删除,以及单个键或键前缀的获取

  • 用于在单个原子事务中管理多个键的更新或获取

从存储中获取键值对

Consul 客户端可以返回某个键的值

consulClient.getValue("key").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("retrieved value: " + res.result().getValue());
    System.out.println("modify index: " + res.result().getModifyIndex());
  } else {
    res.cause().printStackTrace();
  }
});

…​或者它可以返回所有具有给定前缀的键值对

consulClient.getValues("prefix").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("modify index: " + res.result().getIndex());
    for (KeyValue kv : res.result().getList()) {
      System.out.println("retrieved value: " + kv.getValue());
    }
  } else {
    res.cause().printStackTrace();
  }
});

返回的键值对象包含以下字段(请参阅文档

createIndex

表示条目创建时间的内部索引值。

modifyIndex

最后修改此键的索引

lockIndex

此键在锁中成功获取的次数

key

flags

附加到此条目的标志。客户端可以根据其应用程序的需要选择使用此标志

value

session

拥有锁的会话

修改索引可用于阻塞查询

BlockingQueryOptions opts = new BlockingQueryOptions()
  .setIndex(modifyIndex)
  .setWait("1m");

consulClient.getValueWithOptions("key", opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("retrieved value: " + res.result().getValue());
    System.out.println("new modify index: " + res.result().getModifyIndex());
  } else {
    res.cause().printStackTrace();
  }
});

将键值对放入存储

consulClient.putValue("key", "value").onComplete(res -> {
  if (res.succeeded()) {
    String opResult = res.result() ? "success" : "fail";
    System.out.println("result of the operation: " + opResult);
  } else {
    res.cause().printStackTrace();
  }
});

也接受带选项的 Put 请求

KeyValueOptions opts = new KeyValueOptions()
  .setFlags(42)
  .setCasIndex(modifyIndex)
  .setAcquireSession("acquireSessionID")
  .setReleaseSession("releaseSessionID");

consulClient.putValueWithOptions("key", "value", opts).onComplete(res -> {
  if (res.succeeded()) {
    String opResult = res.result() ? "success" : "fail";
    System.out.println("result of the operation: " + opResult);
  } else {
    res.cause().printStackTrace();
  }
});

可与 PUT 请求一起使用的查询选项列表

flags

这可用于指定介于 0264-1 之间的无符号值。客户端可以根据其应用程序的需要选择使用此值。

casIndex

此标志用于将 PUT 转换为 Check-And-Set 操作。这作为更复杂同步原语的构建块非常有用。如果索引为 0,Consul 将仅在键不存在时才放入该键。如果索引为非零,则仅当索引与该键的 ModifyIndex 匹配时才设置该键。

acquireSession

此标志用于将 PUT 转换为锁获取操作。这很有用,因为它允许在 Consul 之上构建领导者选举。如果未持有锁且会话有效,则此操作将递增 LockIndex 并设置键的 Session 值,此外还会更新键内容。键不需要存在即可被获取。如果锁已被给定会话持有,则 LockIndex 不会递增,但会更新键内容。这允许当前锁持有者更新键内容,而无需放弃锁并重新获取它。

releaseSession

此标志用于将 PUT 转换为锁释放操作。与 acquireSession 配对时这很有用,因为它允许客户端释放锁。这将使 LockIndex 保持不变,但会清除键关联的 Session。键必须由该会话持有才能被解锁。

事务

当连接到 Consul 0.7 及更高版本时,客户端允许在单个原子事务中管理多个键的更新或获取。KV 是唯一可用的操作类型,尽管 Consul 的未来版本中可能会添加其他类型的操作,与键/值操作混合使用(请参阅文档)。

TxnRequest request = new TxnRequest()
  .addOperation(new TxnKVOperation().setKey("key1").setValue("value1").setType(TxnKVVerb.SET))
  .addOperation(new TxnKVOperation().setKey("key2").setValue("value2").setType(TxnKVVerb.SET));

consulClient.transaction(request).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("succeeded results: " + res.result().getResults().size());
    System.out.println("errors: " + res.result().getErrors().size());
  } else {
    res.cause().printStackTrace();
  }
});

删除键值对

最后,Consul 客户端允许从存储中删除键值对

consulClient.deleteValue("key").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("complete");
  } else {
    res.cause().printStackTrace();
  }
});

…​或所有具有相应键前缀的键值对

consulClient.deleteValues("prefix").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("complete");
  } else {
    res.cause().printStackTrace();
  }
});

服务

服务发现的主要目标之一是提供可用服务的目录。为此,代理提供了一种简单的服务定义格式,用于声明服务的可用性并可能将其与健康检查关联。

服务注册

服务定义必须包含 name,并且可以选择提供 idtagsaddressportchecks

ServiceOptions opts = new ServiceOptions()
  .setName("serviceName")
  .setId("serviceId")
  .setTags(Arrays.asList("tag1", "tag2"))
  .setCheckOptions(new CheckOptions().setTtl("10s"))
  .setAddress("10.0.0.1")
  .setPort(8048);
name

服务名称

id

如果未提供 id,则将其设置为 name。要求每个节点的所有服务都具有唯一的 ID,因此如果名称可能冲突,则应提供唯一的 ID。

标签

对 Consul 不透明但可用于区分主节点或从节点、不同版本或任何其他服务级别标签的值列表。

address

用于指定服务特定的 IP 地址。默认情况下,使用代理的 IP 地址,因此不需要提供此项。

port

也用于使面向服务的架构配置更简单;通过这种方式,可以发现服务的地址和端口。

checks

关联的健康检查

这些选项用于在目录中注册服务

consulClient.registerService(opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Service successfully registered");
  } else {
    res.cause().printStackTrace();
  }

});

服务发现

Consul 客户端允许获取提供服务的节点的实际列表

consulClient.catalogServiceNodes("serviceName").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("found " + res.result().getList().size() + " services");
    System.out.println("consul state index: " + res.result().getIndex());
    for (Service service : res.result().getList()) {
      System.out.println("Service node: " + service.getNode());
      System.out.println("Service address: " + service.getAddress());
      System.out.println("Service port: " + service.getPort());
    }
  } else {
    res.cause().printStackTrace();
  }
});

可以获取此列表以及关联健康检查的状态。结果可以按检查状态进行筛选。

consulClient.healthServiceNodes("serviceName", passingOnly).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("found " + res.result().getList().size() + " services");
    System.out.println("consul state index: " + res.result().getIndex());
    for (ServiceEntry entry : res.result().getList()) {
      System.out.println("Service node: " + entry.getNode());
      System.out.println("Service address: " + entry.getService().getAddress());
      System.out.println("Service port: " + entry.getService().getPort());
    }
  } else {
    res.cause().printStackTrace();
  }
});

服务查询还有其他参数

ServiceQueryOptions queryOpts = new ServiceQueryOptions()
  .setTag("tag1")
  .setNear("_agent")
  .setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
tag

默认情况下,返回所有匹配服务的节点。可以使用 tag 查询参数按标签筛选列表

near

添加可选的 near 参数(带节点名称)将根据从该节点估算的往返时间按升序对节点列表进行排序。传递 near=_agent 将使用代理节点进行排序。

blockingOptions

阻塞查询选项

那么请求应该看起来像

consulClient.healthServiceNodesWithOptions("serviceName", passingOnly, queryOpts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("found " + res.result().getList().size() + " services");
  } else {
    res.cause().printStackTrace();
  }

});

注销服务

服务可以通过其 ID 注销

consulClient.deregisterService("serviceId").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Service successfully deregistered");
  } else {
    res.cause().printStackTrace();
  }
});

健康检查

代理的主要职责之一是管理系统级和应用程序级健康检查。如果健康检查与某个服务关联,则被认为是应用程序级检查。如果未与服务关联,则该检查监视整个节点的健康状况。

CheckOptions opts = new CheckOptions()
  .setTcp("localhost:4848")
  .setInterval("1s");

Consul 客户端支持的检查选项列表是

id

检查 ID

name

检查名称

script

检查脚本的本地路径。您还应该设置检查间隔

http

要检查的 HTTP 地址。您还应该设置检查间隔

ttl

检查的生存时间

tcp

要检查的 TCP 地址。您还应该设置检查间隔

interval

检查间隔,使用 Go 的时间格式,即一系列十进制数字,每个数字都有可选的小数和单位后缀,例如“300ms”、“-1.5h”或“2h45m”。有效的时间单位是“ns”、“us”(或“µs”)、“ms”、“s”、“m”、“h”

notes

检查备注

serviceId

将注册的检查与代理提供的现有服务关联的服务 ID。

deregisterAfter

注销超时。这是一个可选字段,其超时格式与 Interval 和 TTL 相同。如果检查与服务关联,并且其关键状态持续时间超过此配置值,则其关联服务(及其所有关联检查)将自动注销。最小超时时间为 1 分钟,并且清理关键服务的进程每 30 秒运行一次,因此触发注销可能比配置的超时时间稍长。通常,应将此超时配置为远长于给定服务任何预期可恢复中断的时间。

status

检查状态,用于指定健康检查的初始状态

Name 字段是强制性的,ScriptHTTPTCPTTL 之一也是强制性的。ScriptTCPHTTP 也要求设置 Interval。如果未提供 ID,则将其设置为 Name。每个代理不能有重复的 ID 条目,因此可能需要提供一个 ID。

consulClient.registerCheck(opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("check successfully registered");
  } else {
    res.cause().printStackTrace();
  }
});

事件

Consul 提供了一种机制,可以将自定义用户事件发送到整个数据中心。这些事件对 Consul 是不透明的,但它们可用于构建脚本基础设施,以执行自动化部署、重启服务或执行任何其他编排操作。

要发送用户事件,只需要其名称

consulClient.fireEvent("eventName").onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Event sent");
    System.out.println("id: " + res.result().getId());
  } else {
    res.cause().printStackTrace();
  }
});

还可以指定其他选项。

node

用于按节点名称筛选接收者的正则表达式

service

用于按服务筛选接收者的正则表达式

tag

用于按标签筛选接收者的正则表达式

payload

事件的可选主体。主体内容对 Consul 是不透明的,并成为事件的“负载”

EventOptions opts = new EventOptions()
  .setTag("tag")
  .setPayload("message");

consulClient.fireEventWithOptions("eventName", opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Event sent");
    System.out.println("id: " + res.result().getId());
  } else {
    res.cause().printStackTrace();
  }
});

Consul 客户端支持查询以获取代理已知的所有最新事件。事件使用 gossip 协议广播,因此它们没有全局顺序,也不保证传递。代理仅缓冲最新的条目。当前缓冲区大小为 256,但此值将来可能会更改。

consulClient.listEvents().onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Consul index: " + res.result().getIndex());
    for(Event event: res.result().getList()) {
      System.out.println("Event id: " + event.getId());
      System.out.println("Event name: " + event.getName());
      System.out.println("Event payload: " + event.getPayload());
    }
  } else {
    res.cause().printStackTrace();
  }
});

Consul 索引可用于准备阻塞请求

EventListOptions opts = new EventListOptions()
  .setName("eventName")
  .setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));

consulClient.listEventsWithOptions(opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Consul index: " + res.result().getIndex());
    for(Event event: res.result().getList()) {
      System.out.println("Event id: " + event.getId());
    }
  } else {
    res.cause().printStackTrace();
  }
});

会话

Consul 提供了一个会话机制,可用于构建分布式锁。会话充当节点、健康检查和键/值数据之间的绑定层。构建会话时,可以提供节点名称、健康检查列表、行为、TTL 和锁延迟。

SessionOptions opts = new SessionOptions()
  .setNode("nodeId")
  .setBehavior(SessionBehavior.RELEASE);
lockDelay

可以指定为持续时间字符串,使用 's' 作为秒的后缀。默认值为 '15s'。

name

可用于为会话提供一个人类可读的名称。

node

如果指定,必须引用已注册的节点。默认情况下,使用代理自身的节点名称。

checks

用于提供关联健康检查的列表。强烈建议,如果您覆盖此列表,请包含默认的 serfHealth

behavior

可以设置为 releasedelete。这控制了会话失效时的行为。默认情况下,这是 release,导致释放任何持有的锁。将其更改为 delete 会导致删除任何持有的锁。delete 对于创建临时键/值条目很有用。

ttl

是一个持续时间字符串,与 LockDelay 类似,它可以使用 s 作为秒的后缀。如果指定,目前必须介于 10s 和 86400s 之间。如果提供,会话如果在 TTL 到期前未续订,则会失效。

有关完整信息,请参阅 Consul 会话内部

新构建的会话提供了一个命名 ID,可用于识别它。此 ID 可与 KV 存储一起使用以获取锁:互斥的建议机制。

consulClient.createSessionWithOptions(opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Session successfully created");
    System.out.println("id: " + res.result());
  } else {
    res.cause().printStackTrace();
  }
});

也可以销毁它

consulClient.destroySession(sessionId).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Session successfully destroyed");
  } else {
    res.cause().printStackTrace();
  }
});

列出属于某个节点的会话

consulClient.listNodeSessions("nodeId").onComplete(res -> {
  if (res.succeeded()) {
    for(Session session: res.result().getList()) {
      System.out.println("Session id: " + session.getId());
      System.out.println("Session node: " + session.getNode());
      System.out.println("Session create index: " + session.getCreateIndex());
    }
  } else {
    res.cause().printStackTrace();
  }
});

所有读取会话端点都支持阻塞查询和所有一致性模式。

BlockingQueryOptions blockingOpts = new BlockingQueryOptions()
  .setIndex(lastIndex);

consulClient.listSessionsWithOptions(blockingOpts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Found " + res.result().getList().size() + " sessions");
  } else {
    res.cause().printStackTrace();
  }
});

数据中心中的节点

consulClient.catalogNodes().onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("found " + res.result().getList().size() + " nodes");
    System.out.println("consul state index " + res.result().getIndex());
  } else {
    res.cause().printStackTrace();
  }
});

此端点支持阻塞查询和按与指定节点的距离排序

NodeQueryOptions opts = new NodeQueryOptions()
  .setNear("_agent")
  .setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));

consulClient.catalogNodesWithOptions(opts).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("found " + res.result().getList().size() + " nodes");
  } else {
    res.cause().printStackTrace();
  }
});

预准备查询

此端点创建、更新、销毁和执行预准备查询。预准备查询允许您注册一个复杂的服务查询,然后通过其 ID 或名称稍后执行它,以获取提供给定服务的健康节点集。这与 Consul 的 DNS 接口结合使用特别有用,因为它允许进行比 DNS 暴露的有限入口点更丰富的查询。

创建预准备查询有许多参数。有关完整详细信息,请参阅文档

dc

指定要查询的数据中心。这将默认为被查询代理的数据中心。这作为 URL 的一部分以查询参数的形式指定。

name

指定一个可选的友好名称,可用于执行查询而不是使用其 ID。

session

指定现有会话的 ID。这提供了一种在给定会话失效时自动删除预准备查询的方法。如果未给出,则在不再需要时必须手动删除预准备查询。

token

指定每次执行查询时使用的 ACL 令牌。这允许权限较低甚至没有 ACL 令牌的客户端执行查询,因此应谨慎使用。令牌本身只能被具有管理令牌的客户端看到。如果 Token 字段留空或省略,则将使用客户端的 ACL 令牌来确定他们是否可以访问被查询的服务。如果客户端未提供 ACL 令牌,则将使用匿名令牌。

service

指定要查询的服务名称。这是必填字段。

failover

包含两个字段,两者都是可选的,用于确定在执行查询时如果本地数据中心没有健康节点可用时会发生什么。它允许以很少的配置使用其他数据中心中的节点。

nearestN

指定查询将根据其使用 WAN gossip 池的网络坐标估算的往返时间,转发到最多 NearestN 个其他数据中心。使用处理查询的服务器到远程数据中心服务器的平均往返时间来确定优先级。

datacenters

指定一个固定的远程数据中心列表,如果没有本地数据中心中的健康节点,则将查询转发到这些数据中心。数据中心按列表中给定的顺序进行查询。如果此选项与 NearestN 结合使用,则将首先执行 NearestN 查询,然后是 Datacenters 给出的列表。在故障转移期间,给定数据中心只会查询一次,即使它同时被 NearestN 和 Datacenters 选中。

onlyPassing

指定查询的健康检查筛选行为。如果设置为 false,结果将包括检查处于通过和警告状态的节点。如果设置为 true,则仅返回检查处于通过状态的节点。

标签

指定服务标签列表以筛选查询结果。要使服务通过标签筛选器,它必须具有所有必需标签,并且没有被排除的标签(以 ! 为前缀)。

nodeMeta

指定用户定义的键/值对列表,将用于筛选查询结果,以查找具有给定元数据值的节点。

dnsTtl

指定通过 DNS 提供查询结果时的 TTL 持续时间。如果指定此项,它将优先于任何 Consul 代理特定的配置。

templateType

是查询类型,必须为 name_prefix_match。这意味着模板将应用于任何名称前缀与模板的 Name 字段匹配的查询查找。在此示例中,对 geo-db 的任何查询都将匹配此查询。查询模板使用最长前缀匹配解析,因此可能存在为特定服务覆盖的高级模板。静态查询始终首先解析,因此它们也可以覆盖模板。

templateRegexp

是一个可选的正则表达式,一旦选择此模板,它将用于从整个名称中提取字段。在此示例中,正则表达式将“-”之后的第一个项作为数据库名称,其余所有内容作为标签。有关此正则表达式的语法,请参阅 RE2 参考。

PreparedQueryDefinition def = new PreparedQueryDefinition()
  .setName("Query name")
  .setService("service-${match(1)}-${match(2)}")
  .setDcs(Arrays.asList("dc1", "dc42"))
  .setTemplateType("name_prefix_match")
  .setTemplateRegexp("^find_(.+?)_(.+?)$");

如果查询成功创建,将提供其 ID

consulClient.createPreparedQuery(def).onComplete(res -> {
  if (res.succeeded()) {
    String queryId = res.result();
    System.out.println("Query created: " + queryId);
  } else {
    res.cause().printStackTrace();
  }
});

预准备查询可以通过其 ID 执行

consulClient.executePreparedQuery(id).onComplete(res -> {
  if (res.succeeded()) {
    PreparedQueryExecuteResponse response = res.result();
    System.out.println("Found " + response.getNodes().size() + " nodes");
  } else {
    res.cause().printStackTrace();
  }
});

或通过必须匹配模板正则表达式的查询字符串

consulClient.executePreparedQuery("find_1_2").onComplete(res -> {
  // matches template regexp "^find_(.+?)_(.+?)$"
  if (res.succeeded()) {
    PreparedQueryExecuteResponse response = res.result();
    System.out.println("Found " + response.getNodes().size() + " nodes");
  } else {
    res.cause().printStackTrace();
  }
});

最后,ConsulClient 允许您修改、获取或删除预准备查询

consulClient.deletePreparedQuery(query).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Query deleted");
  } else {
    res.cause().printStackTrace();
  }
});

观察

观察是一种指定数据视图(例如节点列表、KV 对、健康检查)的方式,该视图受到监控以获取更新。当检测到更新时,将调用带有 WatchResultHandler。例如,您可以观察健康检查的状态并在检查处于关键状态时发出通知。

Watch.key("foo/bar", vertx)
  .setHandler(res -> {
    if (res.succeeded()) {
      System.out.println("value: " + res.nextResult().getValue());
    } else {
      res.cause().printStackTrace();
    }
  })
  .start();