Vert.x 熔断器

Vert.x 熔断器是 Vert.x 中熔断器模式的实现。它跟踪最近的故障次数,并在达到阈值时阻止进一步的执行。可选地,会执行一个备用方案。

支持的故障类型有

  • 您的代码在 Future 中报告的故障

  • 您的代码抛出的异常

  • 未完成的 Future (超时)

由熔断器保护的操作旨在是非阻塞和异步的,以便从 Vert.x 执行模型中受益。

使用 Vert.x 熔断器

要使用 Vert.x 熔断器,请将以下依赖项添加到您的构建描述符的依赖项部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-circuit-breaker</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'io.vertx:vertx-circuit-breaker:5.0.1'

使用熔断器

要使用熔断器,您需要

  1. 创建一个熔断器,包含您想要的配置(超时、故障阈值)

  2. 使用熔断器执行一些代码

重要提示:不要在每次调用时重新创建熔断器。熔断器是一个有状态实体。建议将熔断器实例存储在字段中。

这是一个示例

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions()
    .setMaxFailures(5) // number of failures before opening the circuit breaker
    .setTimeout(2000) // considered a failure if the operation does not succeed in time
    .setFallbackOnFailure(true) // call the fallback on failure
    .setResetTimeout(10000) // time spent in open state before attempting to retry
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.execute(promise -> {
  // some code executing with the circuit breaker
  // the code reports failures or success on the given promise
  // if this promise is marked as failed, the circuit breaker
  // increases the number of failures
}).onComplete(ar -> {
  // Get the operation result.
});

执行块接收一个 Promise 对象作为参数,以表示操作的成功或失败以及结果。在以下示例中,结果是 REST 端点调用的输出

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      }))
    .onComplete(promise);
}).onComplete(ar -> {
  // Do something with the result
});

操作的结果通过以下方式提供

  • 调用 execute 方法时返回的 Future

  • 调用 executeAndReport 方法时提供的 Promise

可选地,您可以提供一个备用方案,当熔断器处于打开状态时执行

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.executeWithFallback(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      }))
    .onComplete(promise);
}, v -> {
  // Executed when the circuit breaker is open
  return "Hello";
}).onComplete(ar -> {
  // Do something with the result
});

当熔断器处于打开状态,或当 isFallbackOnFailure 启用时,将调用备用方案。当设置了备用方案时,整体结果通过调用备用函数获得。备用函数接收一个 Throwable 对象作为参数,并返回预期类型的对象。

备用方案也可以直接在 CircuitBreaker 对象上设置

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
  // Executed when the circuit breaker is open.
  return "hello";
});

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      }))
    .onComplete(promise);
});

报告的异常

备用方案接收

重试

您还可以通过 setMaxRetries 指定熔断器在失败前应执行您的代码的次数。如果您将其设置为大于 0 的值,您的代码将在最后一次执行失败之前被执行多次。如果代码在其中一次重试中成功,您的处理器将收到通知,并且不再进行重试。只有当熔断器处于关闭状态时才支持重试。

如果您将 maxRetries 设置为 2,您的操作可能会被调用 3 次:初始尝试和 2 次重试。

默认情况下,重试之间的延迟设置为 0,这意味着重试将立即一个接一个地执行。然而,这会导致被调用服务的负载增加,并可能延迟其恢复。为了缓解这个问题,建议延迟执行重试。

retryPolicy 方法可用于指定重试策略。重试策略是一个函数,它接收操作故障和重试次数作为参数,并返回重试前应延迟的毫秒数。

它允许实现复杂的策略,例如,使用不可用服务发送的 Retry-After 头的值。一些常用策略是开箱即用的:RetryPolicy.constantDelayRetryPolicy.linearDelayRetryPolicy.exponentialDelayWithJitter

下面是一个带有抖动的指数延迟示例

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500));

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      }))
    .onComplete(promise);
});

失败策略

默认情况下,熔断器的失败策略是如果命令未能成功完成,则报告失败。另外,您可以使用 failurePolicy 配置熔断器的失败策略。这将允许您指定熔断器将 AsyncResult 视为失败的标准。如果您决定覆盖失败策略,请注意这可能会允许在诸如 executeAndReport 等函数中提供失败的结果。

下面是一个使用自定义失败策略的示例。

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
breaker.<HttpClientResponse>failurePolicy(ar -> {
  // A failure will be either a failed operation or a response with a status code other than 200
  if (ar.failed()) {
    return true;
  }
  HttpClientResponse resp = ar.result();
  return resp.statusCode() != 200;
});

Future<HttpClientResponse> future = breaker.execute(promise -> {
  vertx.createHttpClient()
    .request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(request -> request.send()
      // Complete when the body is fully received
      .compose(response -> response.body().map(response)))
    .onComplete(promise);
});

回调

您还可以配置当熔断器打开或关闭时调用的回调

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit breaker opened");
}).closeHandler(v -> {
  System.out.println("Circuit breaker closed");
});

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      }))
    .onComplete(promise);
});

当熔断器进入半开状态(尝试重置)时,您也可以收到通知。您可以使用 halfOpenHandler 注册此类回调。

事件总线通知

每当熔断器状态改变时,都可以在事件总线上发布一个事件。

要启用此功能,请将 通知地址 设置为非 null 的值

options.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);

该事件包含熔断器指标。计算这些指标需要将以下依赖项添加到您的构建描述符的依赖项部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>org.hdrhistogram</groupId>
  <artifactId>HdrHistogram</artifactId>
  <version>2.1.12</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

compile 'org.hdrhistogram:HdrHistogram:2.1.12'

启用后,通知默认仅传递给本地消费者。如果通知必须发送到集群中的所有消费者,您可以使用 setNotificationLocalOnly 更改此行为。

每个事件都包含一个 Json 对象,其中包含

  • state:新的熔断器状态(OPENCLOSEDHALF_OPEN

  • name:熔断器的名称

  • failures:故障次数

  • node:节点的标识符(如果 Vert.x 未以集群模式运行,则为 local

  • 指标

半开状态

当熔断器处于 open 状态时,对熔断器的调用会立即失败,而不尝试执行实际操作。在经过适当的时间(由 setResetTimeout 配置)后,熔断器判断操作有成功的机会,因此它进入 half-open 状态。在此状态下,允许对熔断器的下一次调用执行受保护的操作。如果调用成功,熔断器将重置并返回 closed 状态,准备进行更多常规操作。然而,如果这次试探性调用失败,熔断器将返回 open 状态,直到另一个超时过去。

使用 Resilience4j

Resilience4j 是一个流行的库,它实现了常见的容错策略

  • bulkhead (并发限制器)

  • 熔断器

  • 限流器

  • 重试

  • 时间限制器 (超时)

已发布一篇操作指南,演示了 Resilience4j 与 Vert.x 的用法。该操作指南的存储库包含上述所有容错策略的 Vert.x 适配器。这些适配器将 Resilience4j API 与 Vert.x 的 Future 连接起来。

Resilience4j 2.0 需要 Java 17。