核心模块

Vert.x Core 手册

Vert.x 的核心是一组我们称之为 Vert.x Core 的 Java API

Vert.x Core 提供了以下功能:

  • 编写 TCP 客户端和服务器

  • 编写 HTTP 客户端和服务器,包括对 WebSockets 的支持

  • 事件总线

  • 共享数据 - 本地映射和集群分布式映射

  • 周期性和延迟操作

  • 部署和取消部署 Verticle

  • 数据报套接字

  • DNS 客户端

  • 文件系统访问

  • 虚拟线程

  • 高可用性

  • 原生传输

  • 集群

Core 中的功能级别相对较低——你不会在这里找到数据库访问、授权或高级 Web 功能之类的东西——这些功能你会在 Vert.x ext(扩展)中找到。

Vert.x Core 小巧轻量。你只需使用你想要的部分。它也可以完全嵌入到你现有的应用程序中——我们不会强迫你以特殊的方式构建应用程序,只为了让你能够使用 Vert.x。

你可以在 Vert.x 支持的任何其他语言中使用 Core。但这里有一个很棒的特性——我们不会强迫你直接从 JavaScript 或 Ruby 等语言使用 Java API——毕竟,不同的语言有不同的约定和惯用法,强迫 Ruby 开发者(例如)使用 Java 惯用法会很奇怪。相反,我们会自动为每种语言生成核心 Java API 的惯用等效版本。

从现在起,我们只使用词语 Core 来指代 Vert.x Core。

如果你使用 Maven 或 Gradle,请将以下依赖项添加到项目描述符的 dependencies 部分,以访问 Vert.x Core API

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-core</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-core:5.0.1'
}

让我们讨论 Core 中的不同概念和特性。

最初是 Vert.x

在 Vert.x 的世界里,除非你能与 Vertx 对象通信,否则你无法做太多事情!

它是 Vert.x 的控制中心,是你完成几乎所有事情的方式,包括创建客户端和服务器、获取事件总线的引用、设置定时器以及许多其他事情。

那么如何获取一个实例呢?

如果你正在嵌入 Vert.x,那么你只需按如下方式创建一个实例:

Vertx vertx = Vertx.vertx();
大多数应用程序只需要一个 Vert.x 实例,但如果你需要(例如)在事件总线或不同组的服务器和客户端之间进行隔离,则可以创建多个 Vert.x 实例。

创建 Vertx 对象时指定选项

创建 Vert.x 对象时,如果默认设置不适合你,你也可以指定选项。

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

VertxOptions 对象有许多设置,允许你配置集群、高可用性、池大小以及各种其他设置。

创建集群 Vert.x 对象

如果你正在创建集群 Vert.x(有关集群事件总线的更多信息,请参阅 事件总线 部分),那么通常会使用异步变体来创建 Vertx 对象。

这是因为集群中不同的 Vert.x 实例分组通常需要一些时间(可能几秒钟)。在此期间,我们不希望阻塞调用线程,因此我们会异步地将结果返回给你。

你熟悉流式 API 吗?

你可能已经注意到,在前面的示例中使用了流式(Fluent) API。

流式 API 允许多个方法调用链式地连接在一起。例如:

request.response().putHeader("Content-Type", "text/plain").end("some text");

这是 Vert.x API 中常见的模式,所以请习惯它。

像这样链式调用可以让你编写稍微简洁一点的代码。当然,如果你不喜欢流式方法,我们不会强迫你那样做,如果你愿意,可以欣然忽略它,并像这样编写你的代码:

HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.end("some text");

不要调用我们,我们会调用你。

Vert.x API 大部分是事件驱动的。这意味着当 Vert.x 中发生你感兴趣的事情时,Vert.x 会通过向你发送事件来调用你。

一些事件示例有:

  • 定时器已触发

  • 套接字上已接收到一些数据

  • 已从磁盘读取一些数据

  • 发生了一个异常

  • HTTP 服务器已收到请求

你通过向 Vert.x API 提供处理器(handler)来处理事件。例如,要每秒接收一个定时器事件,你可以这样做:

vertx.setPeriodic(1000, id -> {
  // This handler will get called every second
  System.out.println("timer fired!");
});

或者接收一个 HTTP 请求:

server.requestHandler(request -> {
  // This handler will be called every time an HTTP request is received at the server
  request.response().end("hello world!");
});

稍后,当 Vert.x 有事件要传递给你的处理器时,Vert.x 将异步调用它。

这引出了 Vert.x 中的一些重要概念。

不要阻塞我!

除了极少数例外(例如,一些以“Sync”结尾的文件系统操作),Vert.x 中的所有 API 都不会阻塞调用线程。

如果结果可以立即提供,它将立即返回;否则,你通常会提供一个处理器来稍后接收事件。

因为 Vert.x 的所有 API 都不阻塞线程,这意味着你可以使用 Vert.x,仅用少量线程来处理大量并发。

使用传统的阻塞 API,调用线程可能会在以下情况时阻塞:

  • 从套接字读取数据

  • 将数据写入磁盘

  • 向接收方发送消息并等待回复。

  • ……许多其他情况

在以上所有情况中,当你的线程等待结果时,它无法做任何其他事情——它实际上是无用的。

这意味着如果你想使用阻塞 API 实现大量并发,那么你需要大量线程来防止应用程序陷入停滞。

线程在所需内存(例如,其堆栈)和上下文切换方面都存在开销。

对于许多现代应用程序所需的并发级别,阻塞方法根本无法扩展。

Reactor 和 Multi-Reactor

我们之前提到 Vert.x API 是事件驱动的——当事件可用时,Vert.x 会将事件传递给处理器。

在大多数情况下,Vert.x 使用一个名为事件循环(event loop)的线程来调用你的处理器。

由于 Vert.x 或你的应用程序中没有任何东西会阻塞,事件循环可以愉快地持续运行,随着事件的到来依次将它们分发给不同的处理器。

因为没有任何阻塞,事件循环可以在短时间内处理大量事件。例如,单个事件循环可以非常快速地处理数千个 HTTP 请求。

我们称之为 Reactor 模式

你可能之前听说过这个——例如 Node.js 就实现了这种模式。

在标准的 Reactor 实现中,有一个单事件循环线程,它循环运行,将所有事件在到达时分发给所有处理器。

单线程的问题在于它在任何时候只能在一个核心上运行,因此如果你希望你的单线程 Reactor 应用程序(例如你的 Node.js 应用程序)在多核服务器上扩展,你必须启动并管理许多不同的进程。

Vert.x 在这方面有所不同。每个 Vertx 实例不是维护一个事件循环,而是维护多个事件循环。默认情况下,我们根据机器上可用核心的数量来选择数量,但这可以被覆盖。

这意味着单个 Vertx 进程可以跨服务器扩展,这与 Node.js 不同。

我们将这种模式称为多 Reactor 模式(Multi-Reactor Pattern),以区别于单线程 Reactor 模式。

即使 Vertx 实例维护多个事件循环,任何特定的处理器都不会并发执行,并且在大多数情况下(工作 verticle 除外)将始终使用完全相同的事件循环调用。

黄金法则 - 不要阻塞事件循环

我们已经知道 Vert.x API 是非阻塞的,不会阻塞事件循环,但如果你在处理器中自己阻塞事件循环,那也没多大帮助。

如果你这样做,那么在它被阻塞时,该事件循环将无法执行任何其他操作。如果你阻塞了 Vertx 实例中的所有事件循环,那么你的应用程序将完全停滞!

所以不要这样做!你已被警告

阻塞的例子包括:

  • Thread.sleep()

  • 等待锁

  • 等待互斥锁或监视器(例如 synchronized 块)

  • 执行长时间的数据库操作并等待结果

  • 执行耗时较长的复杂计算。

  • 循环中空转

如果上述任何操作使事件循环在相当长的时间内无法执行任何其他操作,那么你应该立即受到警告,并等待进一步的指示。

那么……什么是相当长的时间呢?

这就像“一根绳子有多长”一样。它真正取决于你的应用程序以及你所需的并发量。

如果你有一个单一事件循环,并且你想每秒处理 10000 个 HTTP 请求,那么很明显每个请求的处理时间不能超过 0.1 毫秒,所以你不能阻塞超过这个时间。

算术不难,留作读者的练习。

如果你的应用程序没有响应,这可能表明你在某个地方阻塞了事件循环。为了帮助你诊断此类问题,Vert.x 会在检测到事件循环长时间未返回时自动记录警告。如果你在日志中看到此类警告,则应进行调查。

Thread vertx-eventloop-thread-3 has been blocked for 20458 ms

Vert.x 还会提供堆栈跟踪,以精确指出阻塞发生的位置。

如果你想关闭这些警告或更改设置,可以在创建 Vertx 对象之前在 VertxOptions 对象中进行设置。

Future 结果

Vert.x 4 使用 future 来表示异步结果。

任何异步方法都会返回一个 Future 对象作为调用结果:一个成功或一个失败

你不能直接与 future 的结果交互,而是需要设置一个处理器,该处理器将在 future 完成且结果可用时被调用,就像任何其他类型的事件一样。

FileSystem fs = vertx.fileSystem();

Future<FileProps> future = fs.props("/my_file.txt");

future.onComplete((AsyncResult<FileProps> ar) -> {
  if (ar.succeeded()) {
    FileProps props = ar.result();
    System.out.println("File size = " + props.size());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

不要将 futurespromises 混淆。

如果 future 代表异步结果的“读端”,那么 promise 则是“写端”。它们允许你延迟提供结果的操作。

在大多数情况下,你不需要在 Vert.x 应用程序中自己创建 promise。Future 组合Future 协调为你提供了转换和合并异步结果的工具。

诸如 onSuccessonFailureonComplete 等终止操作对回调的调用顺序不提供任何保证。

考虑一个注册了 2 个回调的 future:

future.onComplete(ar -> {
  // Do something
});
future.onComplete(ar -> {
  // May be invoked first
});

第二个回调可能在第一个回调之前被调用。

如果你需要这样的保证,请考虑使用 andThen 进行 Future 组合

Future 组合

compose 可用于链式连接 futures:

  • 当当前 future 成功时,应用给定函数,该函数返回一个 future。当此返回的 future 完成时,组合成功。

  • 当当前 future 失败时,组合失败。

FileSystem fs = vertx.fileSystem();

Future<Void> future = fs
  .createFile("/foo")
  .compose(v -> {
    // When the file is created (fut1), execute this:
    return fs.writeFile("/foo", Buffer.buffer());
  })
  .compose(v -> {
    // When the file is written (fut2), execute this:
    return fs.move("/foo", "/bar");
  });

在此示例中,有 3 个操作链式连接在一起:

  1. 创建一个文件

  2. 将数据写入此文件

  3. 移动该文件

当这 3 个步骤都成功时,最终的 future(future)将成功。但是,如果其中一个步骤失败,最终的 future 将失败。

除此之外,Future 还提供了更多功能:maprecoverotherwiseandThen,甚至还有一个 flatMap,它是 compose 的别名。

Future 协调

Vert.x 的 futures 可以实现多个 future 的协调。它支持并发组合(并行运行多个异步操作)和顺序组合(链式异步操作)。

Future.all 接受多个 future 参数(最多 6 个),并返回一个 future,当所有 future 都成功时,该 future 成功;当至少一个 future 失败时,该 future 失败

Future<HttpServer> httpServerFuture = httpServer.listen();

Future<NetServer> netServerFuture = netServer.listen();

Future.all(httpServerFuture, netServerFuture).onComplete(ar -> {
  if (ar.succeeded()) {
    // All servers started
  } else {
    // At least one server failed
  }
});

这些操作并发运行,当组合完成时,附加到返回 future 的 Handler 会被调用。当其中一个操作失败(传入的 future 之一被标记为失败)时,结果 future 也被标记为失败。当所有操作成功时,结果 future 以成功完成。

成功时,resultAt 方法保证结果的顺序与调用 Future.all 时指定的顺序相同。在上面的例子中,无论哪个项首先完成,httpServer 的结果都可以通过 resultAt(0) 访问,netServer 的结果可以通过 resultAt(1) 访问。

或者,你可以传入一个 future 列表(可能为空):

Future.all(Arrays.asList(future1, future2, future3));

all 组合会等待直到所有 future 都成功(或一个失败),而 any 组合会等待第一个成功的 future。Future.any 接受多个 future 参数(最多 6 个),并返回一个 future,当其中一个 future 成功时,该 future 成功;当所有 future 都失败时,该 future 失败。

Future.any(future1, future2).onComplete(ar -> {
  if (ar.succeeded()) {
    // At least one is succeeded
  } else {
    // All failed
  }
});

也可以使用 future 列表:

Future.any(Arrays.asList(f1, f2, f3));

join 组合会等待直到所有 future 都完成,无论是成功还是失败。Future.join 接受多个 future 参数(最多 6 个),并返回一个 future,当所有 future 都成功时,该 future 成功;当所有 future 都完成且至少有一个失败时,该 future 失败。

Future.join(future1, future2, future3).onComplete(ar -> {
  if (ar.succeeded()) {
    // All succeeded
  } else {
    // All completed and at least one failed
  }
});

也可以使用 future 列表:

Future.join(Arrays.asList(future1, future2, future3));

CompletionStage 互操作性

Vert.x Future API 提供了与 CompletionStage双向兼容性,CompletionStage 是 JDK 中用于可组合异步操作的接口。

我们可以使用 toCompletionStage 方法将 Vert.x Future 转换为 CompletionStage,如下所示:

Future<String> future = vertx.createDnsClient().lookup("vertx.io");
future.toCompletionStage().whenComplete((ip, err) -> {
  if (err != null) {
    System.err.println("Could not resolve vertx.io");
    err.printStackTrace();
  } else {
    System.out.println("vertx.io => " + ip);
  }
});

相反,我们可以使用 Future.fromCompletionStageCompletionStage 转换为 Vert.x Future。有两种变体:

  1. 第一种变体只接受一个 CompletionStage,并从解析 CompletionStage 实例的线程中调用 Future 方法;

  2. 第二种变体接受一个额外的 Context 参数,以便在 Vert.x 上下文中调用 Future 方法。

在大多数情况下,你将希望使用带有 CompletionStageContext 的变体,以遵循 Vert.x 线程模型,因为 Vert.x Future 更可能与 Vert.x 代码、库和客户端一起使用。

以下是将 CompletionStage 转换为 Vert.x Future 并在上下文中分派的示例:

Future.fromCompletionStage(completionStage, vertx.getOrCreateContext())
  .flatMap(str -> {
    String key = UUID.randomUUID().toString();
    return storeInDb(key, str);
  })
  .onSuccess(str -> {
    System.out.println("We have a result: " + str);
  })
  .onFailure(err -> {
    System.err.println("We have a problem");
    err.printStackTrace();
  });

Verticle

Vert.x 开箱即用地提供了一个简单、可伸缩的、类似 Actor 的部署和并发模型,你可以使用它来节省自己编写的时间。

该模型是完全可选的,如果你不愿意,Vert.x 不会强迫你以这种方式创建应用程序。.

该模型并不声称是严格的 Actor 模型实现,但它确实在并发、扩展和部署方面具有相似之处。

要使用此模型,你需要将代码编写为一组 verticle

Verticle 是由 Vert.x 部署和运行的代码块。默认情况下,一个 Vert.x 实例维护 N 个事件循环线程(其中 N 默认是核心数*2)。Verticle 可以用 Vert.x 支持的任何语言编写,并且单个应用程序可以包含用多种语言编写的 verticle。

你可以将 verticle 看作 Actor 模型 中的一个 Actor。

一个应用程序通常由许多同时在同一个 Vert.x 实例中运行的 verticle 实例组成。不同的 verticle 实例通过在 事件总线 上发送消息来相互通信。

编写 Verticle

Verticle 类必须实现 Deployable 接口。

如果你愿意,它们可以直接实现该接口,但通常更简单的方法是扩展抽象类 VerticleBase

这里有一个 verticle 示例:

class MyVerticle extends VerticleBase {

  // Called when verticle is deployed
  public Future<?> start() throws Exception {
    return super.start();
  }

  // Optional - called when verticle is un-deployed
  public Future<?> stop() throws Exception {
    return super.stop();
  }
}

通常你会像上面的示例那样覆盖 start 方法。

当 Vert.x 部署 verticle 时,它会调用 start 方法,当该方法返回的 future 完成时,该 verticle 将被视为已启动。

你也可以选择性地覆盖 stop 方法。当 verticle 被取消部署时,Vert.x 会调用此方法,并且当该方法返回的 future 完成时,该 verticle 将被视为已停止。

这里有一个更详细的示例:

class MyVerticle extends VerticleBase {

  private HttpServer server;

  @Override
  public Future<?> start() {
    server = vertx.createHttpServer().requestHandler(req -> {
      req.response()
        .putHeader("content-type", "text/plain")
        .end("Hello from Vert.x!");
    });

    // Now bind the server:
    return server.listen(8080);
  }
}

你甚至可以编写一个单行 Verticle:

Deployable verticle = context -> vertx
  .createHttpServer()
  .requestHandler(req -> req.response()
    .putHeader("content-type", "text/plain")
    .end("Hello from Vert.x!"))
  .listen(8080);
你不需要在 verticle 的 stop 方法中手动停止由 verticle 启动的 HTTP 服务器。当 verticle 被取消部署时,Vert.x 会自动停止任何正在运行的服务器。

Vert.x 4 Verticle 和 AbstractVerticle 合同发生了什么变化?

VerticleAbstractVerticle 定义的契约不再适用于 Vert.x 5 基于 future 的模型。

class MyVerticle extends AbstractVerticle {
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    Future<String> future = bindService();

    // Requires to write
    future.onComplete(ar -> {
      if (ar.succeeded()) {
        startPromise.complete();
      } else {
        startPromise.fail(ar.cause());
      }
    });

    // Or
    future
      .<Void>mapEmpty()
      .onComplete(startPromise);
  }
}

尽管如此,VerticleAbstractVerticle 在 Vert.x 5 中并未弃用。使用它们没有问题,但它们不再是默认推荐的选择。

Verticle 类型

Verticle 有两种不同的类型:

标准 Verticle

这是最常见和最有用的类型——它们总是使用事件循环线程执行。我们将在下一节中详细讨论这一点。

工作 Verticle

它们使用工作线程池中的线程运行。一个实例绝不会被一个以上的线程并发执行。

标准 verticle

标准 verticle 在创建时会被分配一个事件循环线程,并且 start 方法会使用该事件循环进行调用。当你从事件循环中调用任何其他接受核心 API 处理器的 Vert.x 方法时,Vert.x 将保证这些处理器在被调用时会在相同的事件循环上执行。

这意味着我们可以保证你的 verticle 实例中的所有代码始终在同一个事件循环上执行(只要你不创建自己的线程并调用它!)。

这意味着你可以将应用程序中的所有代码编写为单线程,然后让 Vert.x 处理线程和扩展。不再需要担心 synchronized 和 volatile,并且还可以避免在手动编写“传统”多线程应用程序开发时普遍存在的许多其他竞争条件和死锁情况。

工作 verticle

工作 verticle 与标准 verticle 类似,但它使用的是 Vert.x 工作线程池中的线程执行,而不是使用事件循环。

工作 verticle 专为调用阻塞代码而设计,因为它们不会阻塞任何事件循环。

如果你不想使用工作 verticle 来运行阻塞代码,你也可以在事件循环上直接运行内联阻塞代码

如果你想将 verticle 部署为工作 verticle,可以使用 setThreadingModel

DeploymentOptions options = new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER);
vertx.deployVerticle(new MyOrderProcessorVerticle(), options);

工作 verticle 实例绝不会被 Vert.x 的多个线程并发执行,但可以在不同时间由不同线程执行。

虚拟线程 verticle

虚拟线程 verticle 类似于标准 verticle,但它使用虚拟线程执行,而不是使用事件循环。

虚拟线程 verticle 旨在与 Vert.x futures 配合使用 async/await 模型。

如果你想将 verticle 部署为虚拟线程 verticle,可以使用 setThreadingModel

DeploymentOptions options = new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD);
vertx.deployVerticle(new MyOrderProcessorVerticle(), options);
此功能需要 Java 21

以编程方式部署 verticle

你可以使用 deployVerticle 方法之一来部署 verticle,指定 verticle 名称,或者传入你已经创建的 verticle 实例。

VerticleBase myVerticle = new MyVerticle();
vertx.deployVerticle(myVerticle);

你也可以通过指定 verticle 名称来部署 verticle。

verticle 名称用于查找将实例化实际 verticle 实例的特定 VerticleFactory

这是一个使用类名部署 Java Verticle 的示例:

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");

将 verticle 名称映射到 verticle 工厂的规则

当使用名称部署 verticle 时,该名称用于选择将实例化 verticle 的实际 verticle 工厂。

Verticle 名称可以带前缀——这是一个字符串,后面跟着冒号。当存在前缀时,它将用于查找工厂,例如:

 groovy:com.mycompany.SomeGroovyCompiledVerticle // Use the Groovy verticle factory

如果没有前缀,Vert.x 将查找后缀并使用它来查找工厂,例如:

 SomeScript.groovy // Will use the Groovy verticle factory

如果没有前缀或后缀,Vert.x 将假定它是一个 Java 完全限定类名(FQCN),并尝试实例化它。

Verticle 工厂是如何定位的?

大多数 Verticle 工厂都是从 classpath 加载并在 Vert.x 启动时注册的。

如果你愿意,你也可以使用 registerVerticleFactoryunregisterVerticleFactory 以编程方式注册和注销 verticle 工厂。

等待部署完成

Verticle 部署是异步的,可能在调用 deploy 返回后一段时间才完成。

如果你想在部署完成后收到通知,可以在部署时指定一个完成处理器:

vertx
  .deployVerticle(new MyOrderProcessorVerticle())
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Deployment id is: " + res.result());
    } else {
      System.out.println("Deployment failed!");
    }
  });

如果部署成功,完成处理器将收到一个包含部署 ID 字符串的结果。

如果以后你想取消部署,可以使用此部署 ID。

取消部署 verticle

可以使用 undeploy 取消部署。

取消部署本身是异步的,因此如果你想在取消部署完成后收到通知,可以在部署时指定一个完成处理器:

vertx
  .undeploy(deploymentID)
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Undeployed ok");
    } else {
      System.out.println("Undeploy failed!");
    }
  });

指定 verticle 实例数量

当使用 verticle 部署 verticle 时,你可以指定要部署的 verticle 实例数量,你还需要传递一个 Callable<Deployable>,以便 Vert.x 可以实例化你的 verticle 实例。

DeploymentOptions options = new DeploymentOptions().setInstances(16);
vertx.deployVerticle(() -> new MyOrderProcessorVerticle(), options);

这对于在多个核心上轻松扩展非常有用。例如,你可能有一个要部署的 Web 服务器 verticle,并且你的机器有多个核心,因此你希望部署多个实例来利用所有核心。

向 verticle 传递配置

JSON 格式的配置可以在部署时传递给 verticle:

JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle(new MyOrderProcessorVerticle(), options);

然后,可以通过 Context 对象或直接使用 config 方法访问此配置。配置以 JSON 对象的形式返回,因此你可以按如下方式检索数据:

System.out.println("Configuration: " + config().getString("name"));

在 Verticle 中访问环境变量

环境变量和系统属性可以使用 Java API 访问。

System.getProperty("prop");
System.getenv("HOME");

使 Vert.x 退出

Vert.x 实例维护的线程不是守护线程,因此它们会阻止 JVM 退出。

如果你正在嵌入 Vert.x 并且已完成使用它,你可以调用 close 来关闭它。

这将关闭所有内部线程池并关闭其他资源,并允许 JVM 退出。

Context 对象

当 Vert.x 向处理器提供事件或调用 Verticle 的 start 或 stop 方法时,执行与一个 Context 相关联。通常,上下文是事件循环上下文,并绑定到特定的事件循环线程。因此,该上下文的执行始终发生在完全相同的事件循环线程上。对于工作 verticle 和运行内联阻塞代码的情况,将有一个工作上下文与执行相关联,该工作上下文将使用工作线程池中的线程。

要检索上下文,请使用 getOrCreateContext 方法:

Context context = vertx.getOrCreateContext();

如果当前线程已有关联的上下文,则重用该上下文对象。否则,将创建一个新的上下文实例。你可以测试你检索到的上下文的类型

Context context = vertx.getOrCreateContext();
if (context.isEventLoopContext()) {
  System.out.println("Context attached to Event Loop");
} else if (context.isWorkerContext()) {
  System.out.println("Context attached to Worker Thread");
} else if (! Context.isOnVertxThread()) {
  System.out.println("Context not attached to a thread managed by vert.x");
}

当你检索到上下文对象后,你可以在此上下文中异步运行代码。换句话说,你提交一个任务,该任务最终将在相同的上下文中运行,但会在稍后:

vertx.getOrCreateContext().runOnContext( (v) -> {
  System.out.println("This will be executed asynchronously in the same context");
});

当多个处理器在同一个上下文中运行时,它们可能希望共享数据。上下文对象提供了存储和检索上下文中共享数据的方法。例如,它允许你将数据传递给使用 runOnContext 运行的某些操作:

final Context context = vertx.getOrCreateContext();
context.put("data", "hello");
context.runOnContext((v) -> {
  String hello = context.get("data");
});

上下文对象还允许你使用 config 方法访问 verticle 配置。有关此配置的更多详细信息,请查看向 verticle 传递配置部分。

执行周期性操作和延迟操作

在 Vert.x 中,在延迟后或周期性地执行某个操作是非常常见的。

在标准 verticle 中,你不能仅仅让线程休眠来引入延迟,因为那会阻塞事件循环线程。

相反,你使用 Vert.x 定时器。定时器可以是一次性(one-shot)周期性(periodic)的。我们将讨论这两种类型:

一次性定时器

一次性定时器在特定延迟(以毫秒表示)后调用一个事件处理器。

要设置定时器触发一次,你可以使用 setTimer 方法,传入延迟时间和处理器:

long timerID = vertx.setTimer(1000, id -> {
  System.out.println("And one second later this is printed");
});

System.out.println("First this is printed");

返回值是一个唯一的定时器 ID,该 ID 稍后可用于取消定时器。处理器也会传入定时器 ID。

周期性定时器

你也可以使用 setPeriodic 来设置定时器周期性触发。

将有一个等于周期的初始延迟。

setPeriodic 的返回值是一个唯一的定时器 ID(long 类型)。如果需要取消定时器,稍后可以使用此 ID。

传入定时器事件处理器的参数也是唯一的定时器 ID。

请记住,定时器会周期性触发。如果你的周期性处理需要很长时间才能完成,你的定时器事件可能会持续运行,甚至更糟:堆积起来。

在这种情况下,你应该考虑使用 setTimer。一旦你的处理完成,你就可以设置下一个定时器。

long timerID = vertx.setPeriodic(1000, id -> {
  System.out.println("And every second this is printed");
});

System.out.println("First this is printed");

取消定时器

要取消周期性定时器,请调用 cancelTimer 并指定定时器 ID。例如:

vertx.cancelTimer(timerID);

作为 Future 的定时器

Timer 在单个 API 中结合了一次性定时器。

Future<String> timer = vertx
  .timer(10, TimeUnit.SECONDS)
  .map(v -> "Success");

timer.onSuccess(value -> {
  System.out.println("Timer fired: " + value);
});
timer.onFailure(cause -> {
  System.out.println("Timer cancelled: " + cause.getMessage());
});

当定时器触发时,future 成功;反之,取消定时器会使 future 失败。

verticle 中的自动清理

如果你从 verticle 内部创建定时器,当 verticle 被取消部署时,这些定时器将自动关闭。

Verticle 工作池

Verticle 使用 Vert.x 工作池来执行阻塞操作,即 executeBlocking 或工作 verticle。

可以在部署选项中指定不同的工作池:

vertx.deployVerticle(new MyOrderProcessorVerticle(), new DeploymentOptions().setWorkerPoolName("the-specific-pool"));

事件总线

事件总线是 Vert.x 的神经系统

每个 Vert.x 实例都有一个事件总线实例,通过 eventBus 方法获取。

事件总线允许你的应用程序的不同部分相互通信,无论它们是用何种语言编写的,也无论它们是在同一个 Vert.x 实例中,还是在不同的 Vert.x 实例中。

它甚至可以被桥接到,允许在浏览器中运行的客户端 JavaScript 在同一个事件总线上进行通信。

事件总线形成了一个跨越多个服务器节点和多个浏览器的分布式点对点消息系统。

事件总线支持发布/订阅、点对点和请求-响应消息。

事件总线 API 非常简单。它基本上包括注册处理器、注销处理器以及发送和发布消息。

首先是一些理论

理论

地址寻址

消息通过事件总线发送到一个地址

Vert.x 不费心于任何花哨的地址方案。在 Vert.x 中,地址只是一个字符串。任何字符串都是有效的。然而,明智的做法是使用某种方案,例如使用句点来划分命名空间。

有效地址的一些例子是 europe.news.feed1、acme.games.pacman、sausages 和 X。

处理器

消息由处理器接收。你在一个地址上注册一个处理器。

许多不同的处理器可以注册到同一个地址。

一个处理器可以注册到许多不同的地址。

发布/订阅消息

事件总线支持发布消息。

消息发布到一个地址。发布意味着将消息传递给注册在该地址上的所有处理器。

这是大家熟悉的发布/订阅消息模式。

点对点和请求-响应消息

事件总线也支持点对点消息。

消息被发送到一个地址。Vert.x 然后将它们路由到注册在该地址的众多处理器中的一个。

如果存在多个处理器注册到同一地址,将使用非严格的轮询算法选择其中一个。

在点对点消息中,发送消息时可以指定一个可选的回复处理器。

当接收方收到并处理消息后,接收方可以选择回复该消息。如果他们这样做,则会调用回复处理器。

当发送方收到回复后,也可以再次回复。这可以无限期地重复,并允许在两个不同的 verticle 之间建立对话。

这是一种常见的消息模式,称为请求-响应模式。

尽力而为的交付

Vert.x 尽力传递消息,不会有意丢弃它们。这被称为尽力而为的交付。

然而,在事件总线全部或部分发生故障的情况下,消息有可能会丢失。

如果你的应用程序关心丢失的消息,你应该将你的处理器编写为幂等,并让你的发送者在恢复后进行重试。

消息类型

Vert.x 开箱即用地支持将任何原始/简单类型、String 或 缓冲区作为消息发送。

然而,在 Vert.x 中,将消息作为 JSON 发送是一种约定俗成的常见做法。

JSON 在 Vert.x 支持的所有语言中都非常容易创建、读取和解析,因此它已成为 Vert.x 的一种通用语

但是,如果你不想使用 JSON,我们不会强迫你。

事件总线非常灵活,还支持通过事件总线发送任意对象。你可以通过为你想要发送的对象定义一个 编解码器(codec)来实现这一点。

事件总线 API

让我们深入了解 API。

获取事件总线

你可以按如下方式获取事件总线的引用:

EventBus eb = vertx.eventBus();

每个 Vert.x 实例都有一个事件总线实例。

注册处理器

注册处理器最简单的方法是使用 consumer。以下是一个示例:

EventBus eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {
  System.out.println("I have received a message: " + message.body());
});

当消息到达你的处理器时,你的处理器将被调用,并传入 消息(message)

调用 consumer() 返回的对象是 MessageConsumer 的一个实例。

此对象随后可用于注销处理器,或将处理器用作流。

另外,你可以使用 consumer 返回一个未设置处理器的 MessageConsumer,然后在其上设置处理器。例如:

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());
});

当在集群事件总线上注册处理器时,注册传播到集群所有节点可能需要一些时间。

如果你想在此完成时收到通知,可以使用 MessageConsumer 对象上的 完成 future

consumer.completion().onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("The handler registration has reached all nodes");
  } else {
    System.out.println("Registration failed!");
  }
});

注销处理器

要注销处理器,请调用 unregister

如果你在集群事件总线上,注销可能需要一些时间才能在节点间传播。如果你想在此完成时收到通知,请使用 unregister 返回的 future。

consumer
  .unregister()
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("The handler un-registration has reached all nodes");
    } else {
      System.out.println("Un-registration failed!");
    }
  });

发布消息

发布消息很简单。只需使用 publish 并指定要发布的地址。

eventBus.publish("news.uk.sport", "Yay! Someone kicked a ball");

该消息将然后传递给所有注册到地址 news.uk.sport 的处理器。

发送消息

发送消息将导致注册在该地址上的唯一一个处理器接收到消息。这是点对点消息模式。处理器以非严格的轮询方式选择。

你可以使用 send 发送消息。

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

在消息上设置标头

通过事件总线发送的消息也可以包含标头。这可以通过在发送或发布时提供一个 DeliveryOptions 来指定:

DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);

消息排序

Vert.x 将以从任何特定发送方发送的相同顺序向任何特定处理器传递消息。

消息对象

你在消息处理器中接收到的对象是 Message

消息的 主体(body)对应于被发送或发布的对象。

消息的标头可以通过 headers 获取。

消息确认/发送回复

当使用 send 时,事件总线会尝试将消息传递给在事件总线上注册的 MessageConsumer

在某些情况下,发送方需要知道消费者何时收到消息并使用请求-响应模式“处理”了它,这很有用。

为了确认消息已处理,消费者可以通过调用 reply 来回复消息。

当发生这种情况时,会向发送方发送一个回复,并且回复处理器会以该回复被调用。

一个例子将阐明这一点:

接收方

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());
  message.reply("how interesting!");
});

发送方

eventBus
  .request("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Received reply: " + ar.result().body());
    }
  });

回复可以包含消息主体,其中可以包含有用的信息。

“处理”实际意味着什么是由应用程序定义的,完全取决于消息消费者做什么,而不是 Vert.x 事件总线本身知道或关心的事情。

一些例子:

  • 一个实现返回当天时间的服务的简单消息消费者,会以回复体中包含当天时间的消息进行确认。

  • 一个实现持久化队列的消息消费者,如果消息成功持久化到存储中,可能会用 true 进行确认;如果失败,则用 false

  • 一个处理订单的消息消费者,在订单成功处理后会以 true 进行确认,这样订单就可以从数据库中删除了。

带超时发送

当发送带回复处理器的消息时,你可以在 DeliveryOptions 中指定超时时间。

如果在该时间内未收到回复,则回复处理器将以失败状态被调用。

默认超时时间为 30 秒。

发送失败

消息发送可能由于其他原因失败,包括:

  • 没有可用的处理器可以发送消息

  • 接收方已使用 fail 显式地使消息失败。

在所有情况下,回复处理器都将以特定的失败被调用。

消息编解码器

如果你为你想要发送的对象定义并注册一个 消息编解码器(message codec),你就可以在事件总线上发送任何你喜欢的对象。

消息编解码器有一个名称,你在发送或发布消息时在 DeliveryOptions 中指定该名称:

eventBus.registerCodec(myCodec);

DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());

eventBus.send("orders", new MyPOJO(), options);

如果你总是希望为特定类型使用相同的编解码器,那么你可以为其注册一个默认编解码器,这样在每次发送时就不必在传输选项中指定编解码器了:

eventBus.registerDefaultCodec(MyPOJO.class, myCodec);

eventBus.send("orders", new MyPOJO());

你可以使用 unregisterCodec 注销消息编解码器。

消息编解码器不一定总是以相同的类型进行编码和解码。例如,你可以编写一个编解码器,允许发送 MyPOJO 类,但当该消息发送到处理器时,它会作为 MyOtherPOJO 类到达。

Vert.x 为某些数据类型提供了内置编解码器:

  • 基本类型(string、byte array、byte、int、long、double、boolean、short、char),或

  • 某些 Vert.x 数据类型(buffers、JSON array、JSON objects),或

  • 实现 ClusterSerializable 接口的类型,或

  • 实现 java.io.Serializable 接口的类型。

在集群模式下,出于安全原因,ClusterSerializablejava.io.Serializable 对象默认被拒绝。

你可以通过提供检查类名的函数来定义允许进行编码和解码的类:

集群事件总线

事件总线不仅仅存在于单个 Vert.x 实例中。通过在你的网络上将不同的 Vert.x 实例集群起来,它们可以形成一个单一的分布式事件总线。

如果你以编程方式创建 Vert.x 实例,可以通过将 Vert.x 实例配置为集群模式来获得集群事件总线:

VertxOptions options = new VertxOptions();
Vertx
  .clusteredVertx(options)
  .onComplete(res -> {
    if (res.succeeded()) {
      Vertx vertx = res.result();
      EventBus eventBus = vertx.eventBus();
      System.out.println("We now have a clustered event bus: " + eventBus);
    } else {
      System.out.println("Failed: " + res.cause());
    }
  });

你还应该确保你的 classpath 中有一个 ClusterManager 实现。

verticle 中的自动清理

如果你从 verticle 内部注册事件总线处理器,当 verticle 被取消部署时,这些处理器将自动注销。

配置事件总线

事件总线可以配置。当事件总线集群化时,这尤其有用。在底层,事件总线使用 TCP 连接来发送和接收消息,因此 EventBusOptions 允许你配置这些 TCP 连接的所有方面。由于事件总线既充当服务器又充当客户端,因此其配置与 NetClientOptionsNetServerOptions 相似。

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setSsl(true)
        .setKeyCertOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setTrustOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setClientAuth(ClientAuth.REQUIRED)
    );

Vertx
  .clusteredVertx(options)
  .onComplete(res -> {
    if (res.succeeded()) {
      Vertx vertx = res.result();
      EventBus eventBus = vertx.eventBus();
      System.out.println("We now have a clustered event bus: " + eventBus);
    } else {
      System.out.println("Failed: " + res.cause());
    }
  });

上面的代码片段描绘了你如何为事件总线使用 SSL 连接,而不是普通的 TCP 连接。

为了在集群模式下强制实施安全性,你必须配置集群管理器以使用加密或强制安全。有关更多详细信息,请参阅集群管理器的文档。

事件总线配置需要在所有集群节点中保持一致。

EventBusOptions 还允许你指定事件总线是否为集群模式、端口和主机。

在容器中使用时,你还可以配置公共主机和端口:

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setClusterPublicHost("whatever")
        .setClusterPublicPort(1234)
    );

Vertx
  .clusteredVertx(options)
  .onComplete(res -> {
    if (res.succeeded()) {
      Vertx vertx = res.result();
      EventBus eventBus = vertx.eventBus();
      System.out.println("We now have a clustered event bus: " + eventBus);
    } else {
      System.out.println("Failed: " + res.cause());
    }
  });

JSON

与其他一些语言不同,Java 没有对 JSON 的一流支持,因此我们提供了两个类,以便在你的 Vert.x 应用程序中处理 JSON 更加容易。

JSON 对象

JsonObject 类表示 JSON 对象。

JSON 对象基本上就是一个映射,它有字符串键,值可以是 JSON 支持的类型之一(字符串、数字、布尔值)。

JSON 对象也支持空值。

创建 JSON 对象

空的 JSON 对象可以使用默认构造函数创建。

你可以按如下方式从字符串 JSON 表示创建 JSON 对象:

String jsonString = "{\"foo\":\"bar\"}";
JsonObject object = new JsonObject(jsonString);

你可以按如下方式从映射创建 JSON 对象:

Map<String, Object> map = new HashMap<>();
map.put("foo", "bar");
map.put("xyz", 3);
JsonObject object = new JsonObject(map);

将条目放入 JSON 对象

使用 put 方法将值放入 JSON 对象。

由于流式 API,方法调用可以链式进行:

JsonObject object = new JsonObject();
object.put("foo", "bar").put("num", 123).put("mybool", true);

从 JSON 对象获取值

你可以使用 getXXX 方法从 JSON 对象获取值,例如:

String val = jsonObject.getString("some-key");
int intVal = jsonObject.getInteger("some-other-key");

JSON 对象和 Java 对象之间的映射

你可以按如下方式从 Java 对象的字段创建 JSON 对象:

你可以按如下方式实例化一个 Java 对象并从 JSON 对象填充其字段:

request.bodyHandler(buff -> {
  JsonObject jsonObject = buff.toJsonObject();
  User javaObject = jsonObject.mapTo(User.class);
});

请注意,上述两种映射方向都使用 Jackson 的 ObjectMapper#convertValue() 来执行映射。有关字段和构造函数可见性的影响、跨对象引用序列化和反序列化的注意事项等信息,请参阅 Jackson 文档。

但是,在最简单的情况下,如果 Java 类的所有字段都是 public(或具有 public getter/setter),并且存在一个 public 默认构造函数(或没有定义的构造函数),则 mapFrommapTo 都应该成功。

只要对象图是无环的,引用对象将以递归方式序列化/反序列化为/从嵌套的 JSON 对象。

将 JSON 对象编码为字符串

你使用 encode 将对象编码为字符串形式。

JSON 数组

JsonArray 类表示 JSON 数组。

JSON 数组是值的序列(字符串、数字、布尔值)。

JSON 数组也可以包含空值。

创建 JSON 数组

空的 JSON 数组可以使用默认构造函数创建。

你可以按如下方式从字符串 JSON 表示创建 JSON 数组:

String jsonString = "[\"foo\",\"bar\"]";
JsonArray array = new JsonArray(jsonString);

向 JSON 数组添加条目

你可以使用 add 方法向 JSON 数组添加条目。

JsonArray array = new JsonArray();
array.add("foo").add(123).add(false);

从 JSON 数组获取值

你可以使用 getXXX 方法从 JSON 数组获取值,例如:

String val = array.getString(0);
Integer intVal = array.getInteger(1);
Boolean boolVal = array.getBoolean(2);

将 JSON 数组编码为字符串

你使用 encode 将数组编码为字符串形式。

创建任意 JSON

创建 JSON 对象和数组假设你正在使用有效的字符串表示。

当你 F不确定字符串的有效性时,你应该改用 Json.decodeValue

Object object = Json.decodeValue(arbitraryJson);
if (object instanceof JsonObject) {
  // That's a valid json object
} else if (object instanceof JsonArray) {
  // That's a valid json array
} else if (object instanceof String) {
  // That's valid string
} else {
  // etc...
}

Jackson 配置

读取约束配置

自 Jackson 2.15 起,已添加上限约束以限制解析 JSON 输入时累积的字节数。

你可以使用以下系统属性覆盖 Vert.x 使用的底层解析器的默认配置:

  • vertx.jackson.defaultReadMaxNestingDepth: 最大嵌套深度

  • vertx.jackson.defaultReadMaxDocumentLength: 最大文档长度

  • vertx.jackson.defaultReadMaxNumberLength: 最大数字值长度

  • vertx.jackson.defaultReadMaxStringLength: 最大字符串值长度

  • vertx.jackson.defaultReadMaxNameLength: 最大属性名长度

  • vertx.jackson.defaultMaxTokenCount: 最大令牌数

你可以参考此处获取更多信息。

Json 指针

Vert.x 提供了 RFC6901 中的 Json 指针实现。你可以使用指针进行查询和写入。你可以使用字符串、URI 或手动附加路径来构建你的 JsonPointer

JsonPointer pointer1 = JsonPointer.from("/hello/world");
// Build a pointer manually
JsonPointer pointer2 = JsonPointer.create()
  .append("hello")
  .append("world");

实例化指针后,使用 queryJson 查询 JSON 值。你可以使用 writeJson 更新 JSON 值:

Object result1 = objectPointer.queryJson(jsonObject);
// Query a JsonArray
Object result2 = arrayPointer.queryJson(jsonArray);
// Write starting from a JsonObject
objectPointer.writeJson(jsonObject, "new element");
// Write starting from a JsonObject
arrayPointer.writeJson(jsonArray, "new element");

你可以通过提供 JsonPointerIterator 的自定义实现,将 Vert.x Json 指针与任何对象模型一起使用。

缓冲区

Vert.x 内部的大多数数据都使用缓冲区进行传输。

缓冲区是零个或多个字节的序列,可以从中读取或写入,并根据需要自动扩展以容纳写入的任何字节。你可以将缓冲区视为智能字节数组。

创建缓冲区

缓冲区可以通过使用静态 Buffer.buffer 方法之一来创建。

缓冲区可以从字符串或字节数组初始化,也可以创建空缓冲区。

这里是一些创建缓冲区的例子:

创建一个新的空缓冲区

Buffer buff = Buffer.buffer();

从字符串创建缓冲区。字符串将使用 UTF-8 编码到缓冲区中。

Buffer buff = Buffer.buffer("some string");

从字符串创建缓冲区:字符串将使用指定的编码进行编码,例如:

Buffer buff = Buffer.buffer("some string", "UTF-16");

从 byte[] 创建缓冲区

byte[] bytes = new byte[] {1, 3, 5};
Buffer buff = Buffer.buffer(bytes);

创建一个带初始大小提示的缓冲区。如果你知道你的缓冲区将写入一定量的数据,你可以创建缓冲区并指定这个大小。这使得缓冲区初始分配那么多内存,比缓冲区在数据写入时自动多次调整大小更有效率。

请注意,以这种方式创建的缓冲区是空的。它不会创建填充零到指定大小的缓冲区。

Buffer buff = Buffer.buffer(10000);

写入缓冲区

写入缓冲区有两种方式:追加和随机访问。无论哪种情况,缓冲区都会自动扩展以包含字节。使用缓冲区不可能获得 IndexOutOfBoundsException

追加到缓冲区

要将数据追加到缓冲区,请使用 appendXXX 方法。存在用于追加各种不同类型的 append 方法。

appendXXX 方法的返回值是缓冲区本身,因此这些方法可以链式调用:

Buffer buff = Buffer.buffer();

buff.appendInt(123).appendString("hello\n");

socket.write(buff);

随机访问缓冲区写入

你也可以使用 setXXX 方法在缓冲区的特定索引处写入。存在适用于各种不同数据类型的 set 方法。所有 set 方法都将一个索引作为第一个参数——这表示在缓冲区中开始写入数据的位置。

缓冲区将根据需要自动扩展以容纳数据。

Buffer buff = Buffer.buffer();

buff.setInt(1000, 123);
buff.setString(0, "hello");

从缓冲区读取

数据使用 getXXX 方法从缓冲区读取。存在适用于各种数据类型的 get 方法。这些方法的第一个参数是缓冲区中的索引,表示从何处获取数据。

Buffer buff = Buffer.buffer();
for (int i = 0; i < buff.length(); i += 4) {
  System.out.println("int value at " + i + " is " + buff.getInt(i));
}

使用无符号数

无符号数可以使用 getUnsignedXXXappendUnsignedXXXsetUnsignedXXX 方法从缓冲区读取或追加/设置到缓冲区。这在实现针对最小化带宽消耗而优化的网络协议编解码器时非常有用。

在以下示例中,值 200 在指定位置仅用一个字节设置:

Buffer buff = Buffer.buffer(128);
int pos = 15;
buff.setUnsignedByte(pos, (short) 200);
System.out.println(buff.getUnsignedByte(pos));

控制台显示 '200'。

缓冲区长度

使用 length 获取缓冲区的长度。缓冲区的长度是缓冲区中具有最大索引的字节的索引 + 1。

复制缓冲区

使用 copy 复制缓冲区:

缓冲区切片

切片缓冲区是一个新缓冲区,它依赖于原始缓冲区,即它不复制底层数据。使用 slice 创建切片缓冲区:

缓冲区重用

将缓冲区写入套接字或其他类似位置后,它们不能被重用。

编写 TCP 服务器和客户端

Vert.x 允许你轻松编写非阻塞 TCP 客户端和服务器。

创建 TCP 服务器

使用所有默认选项创建 TCP 服务器的最简单方法如下:

NetServer server = vertx.createNetServer();

配置 TCP 服务器

如果你不想使用默认设置,可以在创建服务器时传入一个 NetServerOptions 实例来配置服务器:

NetServerOptions options = new NetServerOptions().setPort(4321);
NetServer server = vertx.createNetServer(options);

启动服务器监听

要告诉服务器监听传入请求,你可以使用 listen 的其中一个替代方法。

告诉服务器监听选项中指定的主机和端口:

NetServer server = vertx.createNetServer();
server.listen();

或者在调用 listen 时指定主机和端口,忽略选项中配置的值:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost");

默认主机是 0.0.0.0,表示“监听所有可用地址”,默认端口是 0,这是一个特殊值,指示服务器查找一个随机未使用的本地端口并使用它。

实际的绑定是异步的,因此服务器可能在调用 listen 返回之后的一段时间内才真正开始监听。

如果你想在服务器实际开始监听时收到通知,可以为 listen 调用提供一个处理器。例如:

NetServer server = vertx.createNetServer();
server
  .listen(1234, "localhost")
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Server is now listening!");
    } else {
      System.out.println("Failed to bind!");
    }
  });

监听随机端口

如果使用 0 作为监听端口,服务器将找到一个未使用的随机端口进行监听。

要找出服务器实际监听的端口,你可以调用 actualPort

NetServer server = vertx.createNetServer();
server
  .listen(0, "localhost")
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Server is now listening on actual port: " + server.actualPort());
    } else {
      System.out.println("Failed to bind!");
    }
  });

监听 Unix 域套接字

当在 JDK 16+ 上运行,或者使用原生传输时,服务器可以监听 Unix 域套接字:

NetServer netServer = vertx.createNetServer();

// Only available when running on JDK16+, or using a native transport
SocketAddress address = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

netServer
  .connectHandler(so -> {
  // Handle application
  })
  .listen(address)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Bound to socket
    } else {
      // Handle failure
    }
  });

接收传入连接通知

要在建立连接时收到通知,你需要设置一个 connectHandler

NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
  // Handle the connection in here
});

建立连接时,处理器将被调用,并带有一个 NetSocket 实例。

这是一个类似套接字的接口,用于实际连接,允许你读写数据以及执行其他各种操作,例如关闭套接字。

从套接字读取数据

要从套接字读取数据,请在套接字上设置 handler

每当套接字接收到数据时,此处理器都将与一个 Buffer 实例一起被调用。

NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
  socket.handler(buffer -> {
    System.out.println("I received some bytes: " + buffer.length());
  });
});

将数据写入套接字

你 F可以使用 write 之一向套接字写入数据。

Buffer buffer = Buffer.buffer().appendFloat(12.34f).appendInt(123);
socket.write(buffer);

// Write a string in UTF-8 encoding
socket.write("some data");

// Write a string using the specified encoding
socket.write("some data", "UTF-16");

写入操作是异步的,可能在调用 write 返回后的一段时间内才发生。

关闭处理器

如果你想在套接字关闭时收到通知,可以在其上设置一个 closeHandler

socket.closeHandler(v -> {
  System.out.println("The socket has been closed");
});

异常处理

你可以设置一个 exceptionHandler 来接收套接字上发生的任何异常。

你可以设置一个 exceptionHandler 来接收在连接传递到 connectHandler 之前(例如在 TLS 握手期间)发生的任何异常。

事件总线写入处理器

每个套接字都可以在事件总线上注册一个处理器,当此处理器接收到任何缓冲区时,它会将其写入自身。这些是本地订阅,无法从其他集群节点访问。

这使你能够通过将缓冲区发送到该处理器的地址,向可能位于完全不同 verticle 中的套接字写入数据。

此功能默认禁用,但是你可以使用 setRegisterWriteHandlersetRegisterWriteHandler 来启用它。

处理器的地址由 writeHandlerID 提供。

本地和远程地址

NetSocket 的本地地址可以通过 localAddress 获取。

NetSocket 的远程地址(即连接另一端的地址)可以通过 remoteAddress 获取。

从 classpath 发送文件或资源

文件和 classpath 资源可以直接使用 sendFile 写入套接字。这是一种非常高效的文件发送方式,因为在操作系统支持的情况下,它可以由操作系统内核直接处理。

有关 从类路径提供文件 的限制或禁用,请参阅相关章节。

socket.sendFile("myfile.dat");

流套接字

NetSocket 的实例也是 ReadStreamWriteStream 实例,因此它们可以用于将数据传输到其他读写流或从其他读写流传输数据。

更多信息请参阅 章节。

将连接升级到 SSL/TLS

非 SSL/TLS 连接可以使用 upgradeToSsl 升级到 SSL/TLS。

服务器或客户端必须配置为 SSL/TLS 才能使其正常工作。更多信息请参阅 SSL/TLS 章节

TCP 优雅关机

您可以关闭 serverclient

调用 shutdown 会启动关机阶段,在此阶段服务器或客户端有机会执行清理操作并处理协议级别的关机。

server
  .shutdown()
  .onSuccess(res -> {
    System.out.println("Server is now closed");
  });

关机将等待所有套接字关闭或关机超时触发。当超时触发时,所有套接字将被强制关闭。

每个已打开的套接字都会收到一个关机事件通知,允许在实际套接字关闭之前执行协议级别的关闭。

socket.shutdownHandler(v -> {
  socket
    // Write close frame
    .write(closeFrame())
    // Wait until we receive the remote close frame
    .compose(success -> closeFrameHandler(socket))
    // Close the socket
    .eventually(() -> socket.close());
});

默认关机超时为 30 秒,您可以覆盖此时间量。

server
  .shutdown(60, TimeUnit.SECONDS)
  .onSuccess(res -> {
    System.out.println("Server is now closed");
  });

TCP 关闭

您可以关闭 serverclient 以立即关闭所有打开的连接并释放所有资源。与 shutdown 不同,这里没有宽限期。

关闭操作实际上是异步的,可能在调用返回后一段时间才能完成。您可以使用返回的 future 来在实际关闭完成时得到通知。

当关闭完全完成时,此 future 会完成。

server
  .close()
  .onSuccess(res -> {
    System.out.println("Server is now closed");
  });

verticle 中的自动清理

如果您是在 Verticle 内部创建 TCP 服务器和客户端,那么这些服务器和客户端将在 Verticle 卸载时自动关闭。

扩展 - 共享 TCP 服务器

任何 TCP 服务器的处理器总是运行在相同的事件循环线程上。

这意味着如果您在多核服务器上运行,并且只部署了这一个实例,那么您的服务器上最多只会利用一个核心。

为了利用服务器的更多核心,您需要部署更多的服务器实例。

您可以在代码中以编程方式实例化更多实例。

class MyVerticle extends VerticleBase {

  NetServer server;

  @Override
  public Future<?> start() {
    server = vertx.createNetServer();
    server.connectHandler(socket -> {
      socket.handler(buffer -> {
        // Just echo back the data
        socket.write(buffer);
      });
    });
    return server.listen(1234, "localhost");
  }
}

// Create a few instances so we can utilise cores
vertx.deployVerticle(MyVerticle.class, new DeploymentOptions().setInstances(10));

一旦您这样做,您会发现回显服务器的功能与之前完全相同,但您的服务器上的所有核心都可以被利用,并且可以处理更多的工作。

此时您可能会问自己:“怎么可能在同一个主机和端口上监听多个服务器?一旦尝试部署多个实例,肯定会遇到端口冲突!”

Vert.x 在这里做了一些巧妙的处理。

当您在与现有服务器相同的主机和端口上部署另一个服务器时,它实际上不会尝试创建监听相同主机/端口的新服务器。

相反,它在内部只维护一个服务器,并且,当传入连接到达时,它会以轮询方式将它们分发给任何连接处理器。

因此,Vert.x TCP 服务器可以在可用核心上进行扩展,而每个实例仍然是单线程的。

创建 TCP 客户端

使用所有默认选项创建 TCP 客户端的最简单方法如下:

NetClient client = vertx.createNetClient();

配置 TCP 客户端

如果您不想要默认配置,可以在创建客户端时传入一个 NetClientOptions 实例来配置它。

NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
NetClient client = vertx.createNetClient(options);

建立连接

要连接到服务器,您可以使用 connect,指定服务器的端口和主机,以及一个处理程序,当连接成功时,该处理程序将收到包含 NetSocket 的结果;如果连接失败,则收到包含失败信息的结果。

NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
NetClient client = vertx.createNetClient(options);
client
  .connect(4321, "localhost")
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Connected!");
      NetSocket socket = res.result();
    } else {
      System.out.println("Failed to connect: " + res.cause().getMessage());
    }
  });

连接到 Unix 域套接字

当运行在 JDK 16+ 或使用 原生传输 时,客户端可以连接到 Unix 域套接字。

NetClient netClient = vertx.createNetClient();

// Only available when running on JDK16+, or using a native transport
SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

// Connect to the server
netClient
  .connect(addr)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Connected
    } else {
      // Handle failure
    }
  });

配置连接尝试

客户端可以配置为在无法连接时自动重试连接到服务器。这可以通过 setReconnectIntervalsetReconnectAttempts 进行配置。

目前,如果连接失败,Vert.x 不会尝试重新连接;重新连接尝试和间隔仅适用于创建初始连接。
NetClientOptions options = new NetClientOptions().
  setReconnectAttempts(10).
  setReconnectInterval(500);

NetClient client = vertx.createNetClient(options);

默认情况下,多次连接尝试被禁用。

日志网络活动

出于调试目的,可以记录网络活动。

NetServerOptions options = new NetServerOptions().setLogActivity(true);

NetServer server = vertx.createNetServer(options);

以下是一个简单 HTTP 服务器的输出:

id: 0x359e3df6, L:/127.0.0.1:8080 - R:/127.0.0.1:65351] READ: 78B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:|
|00000020| 38 30 38 30 0d 0a 55 73 65 72 2d 41 67 65 6e 74 |8080..User-Agent|
|00000030| 3a 20 63 75 72 6c 2f 37 2e 36 34 2e 31 0d 0a 41 |: curl/7.64.1..A|
|00000040| 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 0d 0a       |ccept: */*....  |
+--------+-------------------------------------------------+----------------+
[id: 0x359e3df6, L:/127.0.0.1:8080 - R:/127.0.0.1:65351] WRITE: 50B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 31 31 0d 0a 0d 0a 48 65 6c 6c 6f 20 57 6f 72 | 11....Hello Wor|
|00000030| 6c 64                                           |ld              |
+--------+-------------------------------------------------+----------------+
[id: 0x359e3df6, L:/127.0.0.1:8080 - R:/127.0.0.1:65351] READ COMPLETE
[id: 0x359e3df6, L:/127.0.0.1:8080 - R:/127.0.0.1:65351] FLUSH

默认情况下,二进制数据以十六进制格式记录。

您可以通过设置日志数据格式,减少数据格式的详细程度,仅打印缓冲区长度而不是整个数据。

NetServerOptions options = new NetServerOptions()
  .setLogActivity(true)
  .setActivityLogDataFormat(ByteBufFormat.SIMPLE);

NetServer server = vertx.createNetServer(options);

以下是使用简单缓冲区格式的相同输出:

[id: 0xda8d41dc, L:/127.0.0.1:8080 - R:/127.0.0.1:65399] READ: 78B
[id: 0xda8d41dc, L:/127.0.0.1:8080 - R:/127.0.0.1:65399] WRITE: 50B
[id: 0xda8d41dc, L:/127.0.0.1:8080 - R:/127.0.0.1:65399] READ COMPLETE
[id: 0xda8d41dc, L:/127.0.0.1:8080 - R:/127.0.0.1:65399] FLUSH
[id: 0xda8d41dc, L:/127.0.0.1:8080 - R:/127.0.0.1:65399] READ COMPLETE
[id: 0xda8d41dc, L:/127.0.0.1:8080 ! R:/127.0.0.1:65399] INACTIVE
[id: 0xda8d41dc, L:/127.0.0.1:8080 ! R:/127.0.0.1:65399] UNREGISTERED

客户端也可以记录网络活动。

NetClientOptions options = new NetClientOptions().setLogActivity(true);

NetClient client = vertx.createNetClient(options);

网络活动由 Netty 以 DEBUG 级别和 io.netty.handler.logging.LoggingHandler 名称记录。使用网络活动日志记录时,需要注意以下几点:

  • 日志记录不是由 Vert.x 日志记录器执行,而是由 Netty 执行。

  • 这不是生产功能。

您应该阅读 Netty 日志记录 部分。

限制 TCP 连接的入站和出站带宽

TCP 服务器(Net/Http)可以通过流量整形选项配置,以启用带宽限制。入站和出站带宽都可以通过 TrafficShapingOptions 进行限制。对于 NetServer,流量整形选项可以通过 NetServerOptions 设置;对于 HttpServer,可以通过 HttpServerOptions 设置。

NetServerOptions options = new NetServerOptions()
  .setHost("localhost")
  .setPort(1234)
  .setTrafficShapingOptions(new TrafficShapingOptions()
    .setInboundGlobalBandwidth(64 * 1024)
    .setOutboundGlobalBandwidth(128 * 1024));

NetServer server = vertx.createNetServer(options);
HttpServerOptions options = new HttpServerOptions()
  .setHost("localhost")
  .setPort(1234)
  .setTrafficShapingOptions(new TrafficShapingOptions()
    .setInboundGlobalBandwidth(64 * 1024)
    .setOutboundGlobalBandwidth(128 * 1024));

HttpServer server = vertx.createHttpServer(options);

这些流量整形选项也可以在服务器启动后动态更新。

NetServerOptions options = new NetServerOptions()
                             .setHost("localhost")
                             .setPort(1234)
                             .setTrafficShapingOptions(new TrafficShapingOptions()
                                                         .setInboundGlobalBandwidth(64 * 1024)
                                                         .setOutboundGlobalBandwidth(128 * 1024));
NetServer server = vertx.createNetServer(options);
TrafficShapingOptions update = new TrafficShapingOptions()
                                 .setInboundGlobalBandwidth(2 * 64 * 1024) // twice
                                 .setOutboundGlobalBandwidth(128 * 1024); // unchanged
server
  .listen(1234, "localhost")
  // wait until traffic shaping handler is created for updates
  .onSuccess(v -> server.updateTrafficShapingOptions(update));
HttpServerOptions options = new HttpServerOptions()
                              .setHost("localhost")
                              .setPort(1234)
                              .setTrafficShapingOptions(new TrafficShapingOptions()
                                                          .setInboundGlobalBandwidth(64 * 1024)
                                                          .setOutboundGlobalBandwidth(128 * 1024));
HttpServer server = vertx.createHttpServer(options);
TrafficShapingOptions update = new TrafficShapingOptions()
                                 .setInboundGlobalBandwidth(2 * 64 * 1024) // twice
                                 .setOutboundGlobalBandwidth(128 * 1024); // unchanged
server
  .listen(1234, "localhost")
  // wait until traffic shaping handler is created for updates
  .onSuccess(v -> server.updateTrafficShapingOptions(update));

配置服务器和客户端以使用 SSL/TLS

TCP 客户端和服务器可以配置为使用 传输层安全 (TLS)——TLS 的早期版本被称为 SSL。

无论是否使用 SSL/TLS,服务器和客户端的 API 都相同,通过配置用于创建服务器或客户端的 NetClientOptionsNetServerOptions 实例即可启用。

在服务器上启用 SSL/TLS

SSL/TLS 通过 ssl 启用。

默认情况下,它是禁用的。

为服务器指定密钥/证书

SSL/TLS 服务器通常向客户端提供证书,以便向客户端验证其身份。

证书/密钥可以通过多种方式为服务器配置:

第一种方法是指定包含证书和私钥的 Java 密钥库的位置。

Java 密钥库可以使用 JDK 附带的 keytool 工具进行管理。

密钥库的密码也应提供。

NetServerOptions options = new NetServerOptions().setSsl(true).setKeyCertOptions(
  new JksOptions().
    setPath("/path/to/your/server-keystore.jks").
    setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);

或者,您可以将密钥库本身读取为缓冲区并直接提供。

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.jks");
JksOptions jksOptions = new JksOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(jksOptions);
NetServer server = vertx.createNetServer(options);

PKCS#12 格式的密钥/证书(http://en.wikipedia.org/wiki/PKCS_12),通常带有 .pfx.p12 扩展名,也可以像 JKS 密钥库一样加载。

NetServerOptions options = new NetServerOptions().setSsl(true).setKeyCertOptions(
  new PfxOptions().
    setPath("/path/to/your/server-keystore.pfx").
    setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);

也支持缓冲区配置。

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(pfxOptions);
NetServer server = vertx.createNetServer(options);

另一种提供服务器私钥和证书的方式是使用单独的 .pem 文件。

NetServerOptions options = new NetServerOptions().setSsl(true).setKeyCertOptions(
  new PemKeyCertOptions().
    setKeyPath("/path/to/your/server-key.pem").
    setCertPath("/path/to/your/server-cert.pem")
);
NetServer server = vertx.createNetServer(options);

也支持缓冲区配置。

Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
  setKeyValue(myKeyAsABuffer).
  setCertValue(myCertAsABuffer);
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(pemOptions);
NetServer server = vertx.createNetServer(options);

Vert.x 支持从 PKCS8 PEM 文件读取未加密的 RSA 和/或 ECC 私钥。RSA 私钥也可以从 PKCS1 PEM 文件读取。X.509 证书可以从包含 RFC 7468,第 5 节 中定义的证书文本编码的 PEM 文件中读取。

请记住,未加密的 PKCS8 或 PKCS1 PEM 文件中包含的密钥可以被任何能够读取该文件的人提取。因此,请确保对此类 PEM 文件施加适当的访问限制,以防止滥用。

最后,您还可以加载通用的 Java 密钥库,这对于使用其他 KeyStore 实现(如 Bouncy Castle)非常有用。

NetServerOptions options = new NetServerOptions().setSsl(true).setKeyCertOptions(
  new KeyStoreOptions().
    setType("BKS").
    setPath("/path/to/your/server-keystore.bks").
    setPassword("password-of-your-keystore")
);
NetServer server = vertx.createNetServer(options);

为服务器指定信任

SSL/TLS 服务器可以使用证书颁发机构来验证客户端的身份。

证书颁发机构可以通过多种方式为服务器配置:

Java 信任库可以使用 JDK 附带的 keytool 工具进行管理。

信任库的密码也应提供。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new JksOptions().
      setPath("/path/to/your/truststore.jks").
      setPassword("password-of-your-truststore")
  );
NetServer server = vertx.createNetServer(options);

或者,您可以将信任库本身读取为缓冲区并直接提供。

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new JksOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
  );
NetServer server = vertx.createNetServer(options);

PKCS#12 格式的证书颁发机构(http://en.wikipedia.org/wiki/PKCS_12),通常带有 .pfx.p12 扩展名,也可以像 JKS 信任库一样加载。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new PfxOptions().
      setPath("/path/to/your/truststore.pfx").
      setPassword("password-of-your-truststore")
  );
NetServer server = vertx.createNetServer(options);

也支持缓冲区配置。

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new PfxOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
  );
NetServer server = vertx.createNetServer(options);

另一种提供服务器证书颁发机构的方式是使用 .pem 文件列表。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new PemTrustOptions().
      addCertPath("/path/to/your/server-ca.pem")
  );
NetServer server = vertx.createNetServer(options);

也支持缓冲区配置。

Buffer myCaAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-ca.pfx");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustOptions(
    new PemTrustOptions().
      addCertValue(myCaAsABuffer)
  );
NetServer server = vertx.createNetServer(options);

在客户端启用 SSL/TLS

Net 客户端也可以轻松配置为使用 SSL。它们在使用 SSL 时与使用标准套接字时具有完全相同的 API。

要在 NetClient 上启用 SSL,需要调用函数 setSSL(true)。

客户端信任配置

如果客户端的 trustALl 设置为 true,则客户端将信任所有服务器证书。连接仍然会被加密,但此模式容易受到“中间人”攻击。即,您无法确定您正在连接的对象。请谨慎使用此功能。默认值为 false。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustAll(true);
NetClient client = vertx.createNetClient(options);

如果未设置 trustAll,则必须配置客户端信任库,并且该信任库应包含客户端信任的服务器证书。

默认情况下,客户端 **未** 配置主机验证。这会根据服务器主机名验证服务器证书的 CN 部分,以避免 中间人攻击

您必须在客户端上明确配置它。

  • ""(空字符串)禁用主机验证。

  • "HTTPS" 启用基于 TLS 的 HTTP 验证

  • LDAPS 启用 LDAP v3 扩展的 TLS 验证

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setHostnameVerificationAlgorithm(verificationAlgorithm);
NetClient client = vertx.createNetClient(options);
Vert.x HTTP 客户端使用 TCP 客户端,并配置 "HTTPS" 作为验证算法。

与服务器配置类似,客户端信任可以以多种方式配置:

第一种方法是指定包含证书颁发机构的 Java 信任库的位置。

它只是一个标准的 Java 密钥库,与服务器端的密钥库相同。客户端信任库位置通过在 jks options 上使用 path 函数进行设置。如果服务器在连接期间提供了一个不在客户端信任库中的证书,连接尝试将不会成功。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new JksOptions().
      setPath("/path/to/your/truststore.jks").
      setPassword("password-of-your-truststore")
  );
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new JksOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
  );
NetClient client = vertx.createNetClient(options);

PKCS#12 格式的证书颁发机构(http://en.wikipedia.org/wiki/PKCS_12),通常带有 .pfx.p12 扩展名,也可以像 JKS 信任库一样加载。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new PfxOptions().
      setPath("/path/to/your/truststore.pfx").
      setPassword("password-of-your-truststore")
  );
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new PfxOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
  );
NetClient client = vertx.createNetClient(options);

另一种提供服务器证书颁发机构的方式是使用 .pem 文件列表。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new PemTrustOptions().
      addCertPath("/path/to/your/ca-cert.pem")
  );
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/ca-cert.pem");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(
    new PemTrustOptions().
      addCertValue(myTrustStoreAsABuffer)
  );
NetClient client = vertx.createNetClient(options);

为客户端指定密钥/证书

如果服务器需要客户端认证,则客户端在连接时必须向服务器出示其自身的证书。客户端可以通过多种方式配置:

第一种方法是指定包含密钥和证书的 Java 密钥库的位置。同样,它只是一个普通的 Java 密钥库。客户端密钥库位置通过在 jks options 上使用 path 函数进行设置。

NetClientOptions options = new NetClientOptions().setSsl(true).setKeyCertOptions(
  new JksOptions().
    setPath("/path/to/your/client-keystore.jks").
    setPassword("password-of-your-keystore")
);
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.jks");
JksOptions jksOptions = new JksOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setKeyCertOptions(jksOptions);
NetClient client = vertx.createNetClient(options);

PKCS#12 格式的密钥/证书(http://en.wikipedia.org/wiki/PKCS_12),通常带有 .pfx.p12 扩展名,也可以像 JKS 密钥库一样加载。

NetClientOptions options = new NetClientOptions().setSsl(true).setKeyCertOptions(
  new PfxOptions().
    setPath("/path/to/your/client-keystore.pfx").
    setPassword("password-of-your-keystore")
);
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setKeyCertOptions(pfxOptions);
NetClient client = vertx.createNetClient(options);

另一种提供服务器私钥和证书的方式是使用单独的 .pem 文件。

NetClientOptions options = new NetClientOptions().setSsl(true).setKeyCertOptions(
  new PemKeyCertOptions().
    setKeyPath("/path/to/your/client-key.pem").
    setCertPath("/path/to/your/client-cert.pem")
);
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
  setKeyValue(myKeyAsABuffer).
  setCertValue(myCertAsABuffer);
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setKeyCertOptions(pemOptions);
NetClient client = vertx.createNetClient(options);

请记住,在 PEM 配置中,私钥未加密。

更新 SSL/TLS 配置

您可以使用 updateSSLOptions 方法更新 TCP 服务器或客户端上的密钥/证书或信任(例如,实现证书轮换)。

Future<Boolean> fut = server.updateSSLOptions(new ServerSSLOptions()
  .setKeyCertOptions(
    new JksOptions()
      .setPath("/path/to/your/server-keystore.jks").
      setPassword("password-of-your-keystore")));

当更新成功时,将使用新的 SSL 配置,否则保留之前的配置。

选项对象会与现有选项进行比较(使用 equals),以防止当对象相等时进行更新,因为加载选项可能代价高昂。当对象相等时,您可以使用 force 参数强制更新。

用于测试和开发目的的自签名证书

请勿在生产环境中使用此功能,并请注意生成的密钥非常不安全。

通常需要自签名证书,无论是用于单元/集成测试还是用于运行应用程序的开发版本。

SelfSignedCertificate 可用于提供自签名 PEM 证书助手,并提供 KeyCertOptionsTrustOptions 配置。

SelfSignedCertificate certificate = SelfSignedCertificate.create();

NetServerOptions serverOptions = new NetServerOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions());

vertx.createNetServer(serverOptions)
  .connectHandler(socket -> socket.end(Buffer.buffer("Hello!")))
  .listen(1234, "localhost");

NetClientOptions clientOptions = new NetClientOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions());

NetClient client = vertx.createNetClient(clientOptions);
client
  .connect(1234, "localhost")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      ar.result().handler(buffer -> System.out.println(buffer));
    } else {
      System.err.println("Woops: " + ar.cause().getMessage());
    }
  });

客户端也可以配置为信任所有证书。

NetClientOptions clientOptions = new NetClientOptions()
  .setSsl(true)
  .setTrustAll(true);

请注意,自签名证书也适用于其他 TCP 协议,例如 HTTPS。

SelfSignedCertificate certificate = SelfSignedCertificate.create();

vertx.createHttpServer(new HttpServerOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions()))
  .requestHandler(req -> req.response().end("Hello!"))
  .listen(8080);

撤销证书颁发机构

信任可以配置为使用证书吊销列表(CRL)来处理已吊销的、不应再受信任的证书。crlPath 配置要使用的 CRL 列表。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(trustOptions).
  addCrlPath("/path/to/your/crl.pem");
NetClient client = vertx.createNetClient(options);

也支持缓冲区配置。

Buffer myCrlAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/crl.pem");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustOptions(trustOptions).
  addCrlValue(myCrlAsABuffer);
NetClient client = vertx.createNetClient(options);

配置密码套件

默认情况下,TLS 配置将使用 SSL 引擎的密码套件列表。

此密码套件可以配置为一组启用的密码。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions).
  addEnabledCipherSuite("ECDHE-RSA-AES128-GCM-SHA256").
  addEnabledCipherSuite("ECDHE-ECDSA-AES128-GCM-SHA256").
  addEnabledCipherSuite("ECDHE-RSA-AES256-GCM-SHA384").
  addEnabledCipherSuite("CDHE-ECDSA-AES256-GCM-SHA384");
NetServer server = vertx.createNetServer(options);

当启用的密码套件已定义(即非空)时,它优先于 SSL 引擎的默认密码套件。

密码套件可以在 NetServerOptionsNetClientOptions 配置中指定。

配置 TLS 协议版本

默认情况下,默认的 TLS 配置启用以下协议:TLSv1.2 和 TLSv1.3。协议版本可以通过明确添加来启用。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions).
  addEnabledSecureTransportProtocol("TLSv1.1");
NetServer server = vertx.createNetServer(options);

它们也可以被移除。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions).
  removeEnabledSecureTransportProtocol("TLSv1.2");
NetServer server = vertx.createNetServer(options);

协议版本可以在 NetServerOptionsNetClientOptions 配置中指定。

TLS 1.0 (TLSv1) 和 TLS 1.1 (TLSv1.1) 已被广泛弃用,并自 Vert.x 4.4.0 起默认禁用。

SSL 引擎

引擎实现可以配置为使用 OpenSSL 而不是 JDK 实现。在 JDK 8 中开始使用硬件内在函数(CPU 指令)处理 AES 和在 JDK 9 中处理 RSA 之前,OpenSSL 提供了比 JDK 引擎更好的性能和 CPU 使用率。

要使用的引擎选项是:

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions);

// Use JDK SSL engine explicitly
options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions).
  setSslEngineOptions(new JdkSSLEngineOptions());

// Use OpenSSL engine
options = new NetServerOptions().
  setSsl(true).
  setKeyCertOptions(keyStoreOptions).
  setSslEngineOptions(new OpenSSLEngineOptions());

服务器名称指示 (SNI)

服务器名称指示(SNI)是一个 TLS 扩展,客户端通过它指定尝试连接的主机名:在 TLS 握手期间,客户端提供一个服务器名称,服务器可以使用它来响应针对该服务器名称的特定证书,而不是默认部署的证书。如果服务器需要客户端认证,服务器可以根据指示的服务器名称使用特定的受信任 CA 证书。

当 SNI 活跃时,服务器使用:

  • 证书 CN 或 SAN DNS(主题备用名称与 DNS)进行精确匹配,例如 www.example.com

  • 证书 CN 或 SAN DNS 证书以匹配通配符名称,例如 *.example.com

  • 否则,当客户端未提供服务器名称或提供的服务器名称无法匹配时,使用第一个证书。

当服务器额外需要客户端认证时:

  • JksOptionsset on trust options 上设置时,将与信任库别名进行精确匹配。

  • 否则,可用的 CA 证书将以与未启用 SNI 时相同的方式使用。

服务器绑定失败并报告别名不正确。

您可以通过将 setSni 设置为 true 并在服务器上配置多个密钥/证书对来启用 SNI。

Java KeyStore 文件或 PKCS12 文件本身就可以存储多个密钥/证书对。

JksOptions keyCertOptions = new JksOptions().setPath("keystore.jks").setPassword("wibble");

NetServer netServer = vertx.createNetServer(new NetServerOptions()
    .setKeyCertOptions(keyCertOptions)
    .setSsl(true)
    .setSni(true)
);

PemKeyCertOptions 可以配置为包含多个条目。

PemKeyCertOptions keyCertOptions = new PemKeyCertOptions()
    .setKeyPaths(Arrays.asList("default-key.pem", "host1-key.pem", "etc..."))
    .setCertPaths(Arrays.asList("default-cert.pem", "host2-key.pem", "etc...")
    );

NetServer netServer = vertx.createNetServer(new NetServerOptions()
    .setKeyCertOptions(keyCertOptions)
    .setSsl(true)
    .setSni(true)
);

客户端将连接主机隐式地作为 SNI 服务器名称发送给完全限定域名 (FQDN)。

您可以在连接套接字时提供一个显式服务器名称。

NetClient client = vertx.createNetClient(new NetClientOptions()
    .setTrustOptions(trustOptions)
    .setSsl(true)
);

// Connect to 'localhost' and present 'server.name' server name
client
  .connect(1234, "localhost", "server.name")
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Connected!");
      NetSocket socket = res.result();
    } else {
      System.out.println("Failed to connect: " + res.cause().getMessage());
    }
  });

它可用于不同的目的:

  • 显示与服务器主机不同的服务器名称。

  • 在连接到 IP 时显示服务器名称。

  • 在使用短名称时强制显示服务器名称。

应用层协议协商 (ALPN)

应用层协议协商(ALPN)是一个用于应用层协议协商的 TLS 扩展。它被 HTTP/2 使用:在 TLS 握手期间,客户端提供它接受的应用协议列表,服务器响应它支持的一个协议。

Java TLS 支持 ALPN(Java 8 最新版本)。

OpenSSL ALPN 支持

OpenSSL 也支持(原生)ALPN。

OpenSSL 需要配置 setSslEngineOptions 并在类路径中包含 netty-tcnative jar。使用 tcnative 可能需要根据 tcnative 实现,在您的操作系统上安装 OpenSSL。

为客户端连接使用代理

NetClient 支持 HTTP/1.x CONNECTSOCKS4aSOCKS5 代理。

代理可以在 NetClientOptions 中通过设置一个包含代理类型、主机名、端口以及可选的用户名和密码的 ProxyOptions 对象进行配置。

这是一个示例:

NetClientOptions options = new NetClientOptions()
  .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
    .setHost("localhost").setPort(1080)
    .setUsername("username").setPassword("secret"));
NetClient client = vertx.createNetClient(options);

DNS 解析总是在代理服务器上完成,要实现 SOCKS4 客户端的功能,需要在本地解析 DNS 地址。

您可以使用 setNonProxyHosts 配置一个绕过代理的主机列表。该列表接受 * 通配符用于匹配域名。

NetClientOptions options = new NetClientOptions()
  .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
    .setHost("localhost").setPort(1080)
    .setUsername("username").setPassword("secret"))
  .addNonProxyHost("*.foo.com")
  .addNonProxyHost("localhost");
NetClient client = vertx.createNetClient(options);

使用 HA PROXY 协议

HA PROXY 协议 提供了一种便捷的方式,可以安全地传输连接信息,例如客户端地址,跨多层 NAT 或 TCP 代理。

HA PROXY 协议可以通过设置选项 setUseProxyProtocol 并在类路径中添加以下依赖来启用:

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-codec-haproxy</artifactId>
  <!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>
NetServerOptions options = new NetServerOptions().setUseProxyProtocol(true);
NetServer server = vertx.createNetServer(options);
server.connectHandler(so -> {
  // Print the actual client address provided by the HA proxy protocol instead of the proxy address
  System.out.println(so.remoteAddress());

  // Print the address of the proxy
  System.out.println(so.localAddress());
});

编写 HTTP 服务器和客户端

Vert.x 允许您轻松编写非阻塞 HTTP 客户端和服务器。

Vert.x 支持 HTTP/1.0、HTTP/1.1 和 HTTP/2 协议。

HTTP 的基本 API 对于 HTTP/1.x 和 HTTP/2 来说是相同的,特定 API 功能可用于处理 HTTP/2 协议。

创建 HTTP 服务器

使用所有默认选项创建 HTTP 服务器的最简单方法如下:

HttpServer server = vertx.createHttpServer();

配置 HTTP 服务器

如果您不想要默认配置,可以在创建服务器时传入 HttpServerOptions 实例来配置它。

HttpServerOptions options = new HttpServerOptions().setMaxWebSocketFrameSize(1000000);

HttpServer server = vertx.createHttpServer(options);

配置 HTTP/2 服务器

Vert.x 支持基于 TLS 的 HTTP/2 (h2) 和基于 TCP 的 HTTP/2 (h2c)。

  • h2 标识通过 应用层协议协商 (ALPN) 协商的基于 TLS 使用的 HTTP/2 协议。

  • h2c 标识以明文形式通过 TCP 使用的 HTTP/2 协议,此类连接通过 HTTP/1.1 升级请求或直接建立。

要处理 h2 请求,必须启用 TLS 并设置 setUseAlpn

HttpServerOptions options = new HttpServerOptions()
    .setUseAlpn(true)
    .setSsl(true)
    .setKeyCertOptions(new JksOptions().setPath("/path/to/my/keystore"));

HttpServer server = vertx.createHttpServer(options);

ALPN 是一个 TLS 扩展,它在客户端和服务器开始交换数据之前协商协议。

不支持 ALPN 的客户端仍然能够进行“经典”SSL 握手。

ALPN 通常会协商 h2 协议,尽管如果服务器或客户端决定,也可以使用 http/1.1

要处理 h2c 请求,必须禁用 TLS,服务器将把任何想要升级到 HTTP/2 的 HTTP/1.1 请求升级到 HTTP/2。它还将接受以 PRI * HTTP/2.0\r\nSM\r\n 前言开始的直接 h2c 连接。

大多数浏览器不支持 h2c,因此对于提供网站服务,您应该使用 h2 而不是 h2c

当服务器接受 HTTP/2 连接时,它会向客户端发送其 初始设置。这些设置定义了客户端如何使用连接,服务器的默认初始设置为:

配置服务器支持的 HTTP 版本

默认支持的 HTTP 版本取决于服务器配置。

  • 当 TLS 被禁用时:

  • HTTP/1.1, HTTP/1.0

  • isHttp2ClearTextEnabledtrue 时为 HTTP/2。

  • 当 TLS 启用且 ALPN 禁用时:

  • HTTP/1.1 和 HTTP/1.0

  • 当 TLS 启用且 ALPN 启用时:

  • getAlpnVersions 定义的协议:默认是 HTTP/1.1 和 HTTP/2。

如果您想在服务器上禁用 HTTP/2——当 TLS 禁用时,将 setHttp2ClearTextEnabled 设置为 false——当 TLS 启用时——将 (isUseAlpn) 设置为 false——或者从 getAlpnVersions 列表中移除 HTTP/2。

记录网络服务器活动

出于调试目的,可以记录网络活动。

HttpServerOptions options = new HttpServerOptions().setLogActivity(true);

HttpServer server = vertx.createHttpServer(options);

详细说明请参阅 记录网络活动 章节。

启动服务器监听

要告诉服务器监听传入请求,您可以使用 listen 的其中一个替代方法。

告诉服务器监听选项中指定的主机和端口:

HttpServer server = vertx.createHttpServer();
server.listen();

或者在调用 listen 时指定主机和端口,忽略选项中配置的值:

HttpServer server = vertx.createHttpServer();
server.listen(8080, "myhost.com");

默认主机是 0.0.0.0,表示“监听所有可用地址”,默认端口是 80

实际绑定是异步的,因此服务器可能在调用 listen 返回后一段时间才真正开始监听。

如果你想在服务器实际开始监听时收到通知,可以为 listen 调用提供一个处理器。例如:

HttpServer server = vertx.createHttpServer();
server
  .listen(8080, "myhost.com")
  .onComplete(res -> {
    if (res.succeeded()) {
      System.out.println("Server is now listening!");
    } else {
      System.out.println("Failed to bind!");
    }
  });

监听 Unix 域套接字

当在 JDK 16+ 上运行,或者使用原生传输时,服务器可以监听 Unix 域套接字:

HttpServer httpServer = vertx.createHttpServer();

// Only available when running on JDK16+, or using a native transport
SocketAddress address = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

httpServer
  .requestHandler(req -> {
    // Handle application
  })
  .listen(address)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Bound to socket
    } else {
      // Handle failure
    }
  });

获取传入请求通知

要在请求到达时得到通知,您需要设置一个 requestHandler

HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
  // Handle the request in here
});

处理请求

当请求到达时,请求处理器被调用,并传入一个 HttpServerRequest 实例。此对象代表服务器端的 HTTP 请求。

当请求头已完全读取时,此处理程序将被调用。

如果请求包含正文,则该正文将在请求处理程序被调用后一段时间到达服务器。

服务器请求对象允许您检索 uripathparamsheaders 等信息。

每个服务器请求对象都关联一个服务器响应对象。您可以使用 response 获取 HttpServerResponse 对象的引用。

这是一个服务器处理请求并回复“hello world”的简单示例。

vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello world");
}).listen(8080);

请求版本

可以使用 version 检索请求中指定的 HTTP 版本。

请求方法

使用 method 检索请求的 HTTP 方法。(即是 GET、POST、PUT、DELETE、HEAD、OPTIONS 等)。

请求 URI

使用 uri 检索请求的 URI。

请注意,这是 HTTP 请求中实际传递的 URI,它几乎总是相对 URI。

URI 的定义参见 HTTP 规范第 5.1.2 节 - 请求 URI

请求路径

使用 path 返回 URI 的路径部分。

例如,如果请求 URI 是 `a/b/c/page.html?param1=abc&param2=xyz

那么路径将是 /a/b/c/page.html

请求查询

使用 query 返回 URI 的查询部分。

例如,如果请求 URI 是 a/b/c/page.html?param1=abc&param2=xyz

那么查询将是 param1=abc&param2=xyz

请求头

使用 headers 返回 HTTP 请求的头部。

这会返回一个 MultiMap 实例——它类似于普通的 Map 或 Hash,但允许同一个键有多个值——这是因为 HTTP 允许同一个键有多个头部值。

它还具有不区分大小写的键,这意味着您可以执行以下操作:

MultiMap headers = request.headers();

// Get the User-Agent:
System.out.println("User agent is " + headers.get("user-agent"));

// You can also do this and get the same result:
System.out.println("User agent is " + headers.get("User-Agent"));

请求权限

使用 authority 返回 HTTP 请求的权限。

对于 HTTP/1.x 请求,返回 host 头;对于 HTTP/1 请求,返回 :authority 伪头。

请求参数

使用 params 返回 HTTP 请求的参数。

就像 headers 一样,这也会返回一个 MultiMap 实例,因为可以有多个同名参数。

请求参数在请求 URI 上发送,位于路径之后。例如,如果 URI 是 /page.html?param1=abc&param2=xyz

那么参数将包含以下内容:

param1: 'abc'
param2: 'xyz

请注意,这些请求参数是从请求的 URL 中检索的。如果您有作为 multi-part/form-data 请求正文提交的 HTML 表单的一部分发送的表单属性,那么它们将不会出现在这里的参数中。

远程地址

可以使用 remoteAddress 检索请求发送者的地址。

绝对 URI

HTTP 请求中传递的 URI 通常是相对的。如果您希望检索与请求对应的绝对 URI,可以使用 absoluteURI 获取它。

结束处理程序

当整个请求(包括任何正文)已完全读取时,请求的 endHandler 将被调用。

从请求正文读取数据

通常 HTTP 请求包含我们想要读取的正文。如前所述,请求处理程序在请求头刚到达时被调用,因此此时请求对象没有正文。

这是因为正文可能非常大(例如文件上传),我们通常不希望在将整个正文交给您之前将其全部缓存在内存中,因为这可能导致服务器耗尽可用内存。

要接收正文,您可以使用请求上的 handler,每次请求正文的一部分到达时都会调用它。这是一个示例:

request.handler(buffer -> {
  System.out.println("I have received a chunk of the body of length " + buffer.length());
});

传递给处理程序的对象是 Buffer,并且根据正文的大小,处理程序可以随着数据从网络到达而被多次调用。

在某些情况下(例如,如果正文很小),您会希望将整个正文聚合到内存中,您可以自己进行聚合,如下所示:

Buffer totalBuffer = Buffer.buffer();

request.handler(buffer -> {
  System.out.println("I have received a chunk of the body of length " + buffer.length());
  totalBuffer.appendBuffer(buffer);
});

request.endHandler(v -> {
  System.out.println("Full body received, length = " + totalBuffer.length());
});

这是一种非常常见的情况,Vert.x 提供了一个 bodyHandler 来为您完成此操作。当所有正文都已接收到时,此 body handler 将被调用一次。

request.bodyHandler(totalBuffer -> {
  System.out.println("Full body received, length = " + totalBuffer.length());
});

流式请求

请求对象是一个 ReadStream,因此您可以将请求正文通过管道传输到任何 WriteStream 实例。

详细说明请参阅 章节。

处理 HTML 表单

HTML 表单可以通过 application/x-www-form-urlencodedmultipart/form-data 两种内容类型提交。

对于 URL 编码的表单,表单属性被编码在 URL 中,就像普通的查询参数一样。

对于多部分表单,它们被编码在请求正文中,因此在整个正文从网络读取之前是不可用的。

多部分表单也可以包含文件上传。

如果您想检索多部分表单的属性,您应该在读取任何正文 **之前**,通过调用 setExpectMultipart 并将其设置为 true 来告诉 Vert.x 您期望接收此类表单,然后,在整个正文读取完毕后,您应该使用 formAttributes 检索实际属性。

server.requestHandler(request -> {
  request.setExpectMultipart(true);
  request.endHandler(v -> {
    // The body has now been fully read, so retrieve the form attributes
    MultiMap formAttributes = request.formAttributes();
  });
});

表单属性的最大大小为 8192 字节。当客户端提交的表单属性大小大于此值时,文件上传会在 HttpServerRequest 异常处理程序上触发异常。您可以使用 setMaxFormAttributeSize 设置不同的最大大小。

处理表单文件上传

Vert.x 还可以处理编码在多部分请求正文中的文件上传。

要接收文件上传,您需要告诉 Vert.x 期望一个多部分表单,并在请求上设置一个 uploadHandler

每当服务器收到一个上传文件时,此处理程序将被调用一次。

传递给处理程序的对象是 HttpServerFileUpload 实例。

server.requestHandler(request -> {
  request.setExpectMultipart(true);
  request.uploadHandler(upload -> {
    System.out.println("Got a file upload " + upload.name());
  });
});

文件上传可能很大,我们不会在一个单独的缓冲区中提供整个上传,因为那可能导致内存耗尽;相反,上传数据以块的形式接收。

request.uploadHandler(upload -> {
  upload.handler(chunk -> {
    System.out.println("Received a chunk of the upload of length " + chunk.length());
  });
});

上传对象是一个 ReadStream,因此您可以将请求正文通过管道传输到任何 WriteStream 实例。详细说明请参阅 章节。

如果您只是想将文件上传到磁盘上的某个位置,可以使用 streamToFileSystem

request.uploadHandler(upload -> {
  upload.streamToFileSystem("myuploads_directory/" + upload.filename());
});
请确保在生产系统中检查文件名,以避免恶意客户端将文件上传到文件系统上的任意位置。更多信息请参阅 安全注意事项

处理 Cookie

您可以使用 getCookie 按名称检索 Cookie,或使用 cookies 检索所有 Cookie。

要删除 Cookie,请使用 removeCookie

要添加 Cookie,请使用 addCookie

当响应头被写入时,Cookie 集将自动写入响应中,以便浏览器可以存储它们。

Cookie 由 Cookie 实例描述。这允许您检索名称、值、域、路径和其他正常的 Cookie 属性。

Same Site Cookies 让服务器要求 Cookie 不应随跨站点请求(站点由可注册域定义)发送,这提供了一些针对跨站请求伪造攻击的保护。这类 Cookie 使用 Setter 启用:setSameSite

Same site cookies 可以有以下 3 个值之一:

  • None - 浏览器将同时发送跨站请求和同站请求的 Cookie。

  • Strict - 浏览器将仅为同站请求(源自设置 Cookie 的站点的请求)发送 Cookie。如果请求源自与当前位置 URL 不同的 URL,则任何带有 Strict 属性的 Cookie 都不会被包含。

  • Lax - 同站 Cookie 在跨站子请求(例如加载图像或框架的调用)中被保留,但在用户从外部站点导航到 URL 时(例如通过点击链接)将被发送。

以下是查询和添加 Cookie 的示例:

Cookie someCookie = request.getCookie("mycookie");
String cookieValue = someCookie.getValue();

// Do something with cookie...

// Add a cookie - this will get written back in the response automatically
request.response().addCookie(Cookie.cookie("othercookie", "somevalue"));

处理压缩正文

Vert.x 可以处理客户端使用 deflategzipsnappybrotli 算法编码的压缩正文负载。

要在创建服务器时启用解压缩,请在选项上设置 setDecompressionSupported

Snappy 无需外部依赖即可支持。

您需要将 Brotli4j 放在类路径中以解压 Brotli,以及 Zstd-jni 用于 Zstandard。

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>com.aayushatharva.brotli4j</groupId>
  <artifactId>brotli4j</artifactId>
  <version>${brotli4j.version}</version>
</dependency>
<dependency>
  <groupId>com.github.luben</groupId>
  <artifactId>zstd-jni</artifactId>
  <version>${zstd-jini.version}</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  implementation 'com.aayushatharva.brotli4j:brotli4j:${brotli4j.version}'
  runtimeOnly 'com.aayushatharva.brotli4j:native-$system-and-arch:${brotli4j.version}'
  implementation 'com.github.luben:zstd-jni:${zstd-jini.version}'
}

使用 Gradle 时,您需要根据您的操作系统和架构手动添加运行时原生库。更多详细信息请参阅 Brotli4j 的 Gradle 部分

默认情况下,解压缩是禁用的。

接收自定义 HTTP/2 帧

HTTP/2 是一个带帧协议,其中包含用于 HTTP 请求/响应模型的各种帧。该协议允许发送和接收其他类型的帧。

要接收自定义帧,您可以使用请求上的 customFrameHandler,每次自定义帧到达时都会调用它。这是一个示例:

request.customFrameHandler(frame -> {

  System.out.println("Received a frame type=" + frame.type() +
      " payload" + frame.payload().toString());
});

HTTP/2 帧不受流控制——无论请求是否暂停,自定义帧一旦接收到就会立即调用帧处理程序。

发送响应

服务器响应对象是 HttpServerResponse 的实例,通过 response 从请求中获取。

您使用响应对象将响应写回 HTTP 客户端。

设置状态码和消息

响应的默认 HTTP 状态码是 200,表示 OK

使用 setStatusCode 设置不同的代码。

您还可以使用 setStatusMessage 指定自定义状态消息。

如果您未指定状态消息,则将使用与状态码对应的默认消息。

对于 HTTP/2,响应中不会出现状态,因为协议不会将消息传输给客户端。

写入 HTTP 响应

要向 HTTP 响应写入数据,您可以使用其中一个 write 操作。

这些操作可以在响应结束之前多次调用。它们可以通过几种方式调用:

带单个缓冲区。

HttpServerResponse response = request.response();
response.write(buffer);

带字符串。在这种情况下,字符串将使用 UTF-8 编码,结果将写入网络。

HttpServerResponse response = request.response();
response.write("hello world!");

带字符串和编码。在这种情况下,字符串将使用指定的编码进行编码,结果将写入网络。

HttpServerResponse response = request.response();
response.write("hello world!", "UTF-16");

写入响应是异步的,并且在写入排队后总是立即返回。

如果您只是将单个字符串或缓冲区写入 HTTP 响应,您可以一次性写入并结束响应,调用 end

第一次调用 write 会导致响应头被写入响应。因此,如果您不使用 HTTP 分块传输,则必须在写入响应之前设置 Content-Length 头,否则就太迟了。如果您使用 HTTP 分块传输,则无需担心。

结束 HTTP 响应

完成 HTTP 响应后,您应该 end 它。

这可以通过几种方式完成:

不带参数时,响应简单地结束。

HttpServerResponse response = request.response();
response.write("hello world!");
response.end();

它也可以像调用 write 一样,带一个字符串或缓冲区参数。在这种情况下,它与调用带字符串或缓冲区的 write 后再调用不带参数的 end 完全相同。例如:

HttpServerResponse response = request.response();
response.end("hello world!");

关闭底层 TCP 连接

您可以使用 close 关闭底层 TCP 连接。

非保持活动连接将在响应结束时由 Vert.x 自动关闭。

默认情况下,Vert.x 不会自动关闭保持活动连接。如果您希望保持活动连接在空闲时间后关闭,则需要配置 setIdleTimeout

HTTP/2 连接在关闭响应之前会发送一个 GOAWAY 帧。

设置响应头

HTTP 响应头可以通过直接添加到 headers 来添加到响应中。

HttpServerResponse response = request.response();
MultiMap headers = response.headers();
headers.set("content-type", "text/html");
headers.set("other-header", "wibble");

或者您可以使用 putHeader

HttpServerResponse response = request.response();
response.putHeader("content-type", "text/html").putHeader("other-header", "wibble");

所有头部必须在写入响应正文的任何部分之前添加。

分块 HTTP 响应和尾部

Vert.x 支持 HTTP 分块传输编码

这允许 HTTP 响应正文以块的形式写入,通常用于将大型响应正文流式传输到客户端且预先不知道总大小时。

您可以通过以下方式将 HTTP 响应设置为分块模式:

HttpServerResponse response = request.response();
response.setChunked(true);

默认情况下是非分块模式。在分块模式下,每次调用 write 方法之一都会写入一个新的 HTTP 块。

在分块模式下,您还可以将 HTTP 响应尾部写入响应。这些实际上是在响应的最后一个块中写入的。

分块响应对 HTTP/2 流没有影响。

要将尾部添加到响应中,请直接将它们添加到 trailers

HttpServerResponse response = request.response();
response.setChunked(true);
MultiMap trailers = response.trailers();
trailers.set("X-wibble", "woobble").set("X-quux", "flooble");

或者使用 putTrailer

HttpServerResponse response = request.response();
response.setChunked(true);
response.putTrailer("X-wibble", "woobble").putTrailer("X-quux", "flooble");

直接从磁盘或类路径提供文件

如果您正在编写一个 Web 服务器,从磁盘提供文件的一种方法是将其作为 AsyncFile 打开并将其传输到 HTTP 响应中。

或者您可以使用 readFile 一次性加载它,并将其直接写入响应。

另外,Vert.x 提供了一种方法,允许您一次性从磁盘或文件系统向 HTTP 响应提供文件。如果底层操作系统支持,这可能导致操作系统直接将文件中的字节传输到套接字,而无需经过用户空间复制。

这是通过使用 sendFile 完成的,通常对于大文件效率更高,但对于小文件可能较慢。

这是一个非常简单的 Web 服务器,它使用 sendFile 从文件系统提供文件。

vertx.createHttpServer().requestHandler(request -> {
  String file = "";
  if (request.path().equals("/")) {
    file = "index.html";
  } else if (!request.path().contains("..")) {
    file = request.path();
  }
  request.response().sendFile("web/" + file);
}).listen(8080);

当文件扩展名被 MimeMapping 熟知时(查找不区分大小写),HTTP 响应会使用文件扩展名来设置 HTTP 响应内容类型头。

发送文件是异步的,可能在调用返回后一段时间才能完成。如果您希望在文件写入完成后得到通知,可以使用 sendFile

有关类路径解析的限制或禁用,请参阅 从类路径提供文件 章节。

如果您在使用 HTTPS 的同时使用 sendFile,数据将通过用户空间复制,因为如果内核直接将数据从磁盘复制到套接字,我们将没有机会应用任何加密。
如果您要直接使用 Vert.x 编写 Web 服务器,请注意用户不能利用路径访问您不想提供服务的目录之外的文件或类路径。使用 Vert.x Web 可能会更安全。

当需要仅提供文件的一部分时,例如从给定字节开始,您可以通过以下方式实现:

vertx.createHttpServer().requestHandler(request -> {
  long offset = 0;
  try {
    offset = Long.parseLong(request.getParam("start"));
  } catch (NumberFormatException e) {
    // error handling...
  }

  long end = Long.MAX_VALUE;
  try {
    end = Long.parseLong(request.getParam("end"));
  } catch (NumberFormatException e) {
    // error handling...
  }

  request.response().sendFile("web/mybigfile.txt", offset, end);
}).listen(8080);

如果您想从某个偏移量开始发送文件直到末尾,则无需提供长度,在这种情况下,您只需执行:

vertx.createHttpServer().requestHandler(request -> {
  long offset = 0;
  try {
    offset = Long.parseLong(request.getParam("start"));
  } catch (NumberFormatException e) {
    // error handling...
  }

  request.response().sendFile("web/mybigfile.txt", offset);
}).listen(8080);

管道响应

服务器响应是一个 WriteStream,因此您可以将其从任何 ReadStream(例如 AsyncFileNetSocketWebSocketHttpServerRequest)管道传输到它。

这是一个示例,它将任何 PUT 方法的请求正文回显到响应中。它使用管道处理正文,因此即使 HTTP 请求正文远大于内存一次所能容纳的大小,它也能正常工作。

vertx.createHttpServer().requestHandler(request -> {
  HttpServerResponse response = request.response();
  if (request.method() == HttpMethod.PUT) {
    response.setChunked(true);
    request.pipeTo(response);
  } else {
    response.setStatusCode(400).end();
  }
}).listen(8080);

您还可以使用 send 方法发送 ReadStream

发送流是一个管道操作,但是由于这是 HttpServerResponse 的一个方法,当未设置 content-length 时,它还会负责对响应进行分块。

vertx.createHttpServer().requestHandler(request -> {
  HttpServerResponse response = request.response();
  if (request.method() == HttpMethod.PUT) {
    response.send(request);
  } else {
    response.setStatusCode(400).end();
  }
}).listen(8080);

写入 HTTP/2 帧

HTTP/2 是一个带帧协议,其中包含用于 HTTP 请求/响应模型的各种帧。该协议允许发送和接收其他类型的帧。

要发送此类帧,您可以使用响应上的 writeCustomFrame。这是一个示例:

int frameType = 40;
int frameStatus = 10;
Buffer payload = Buffer.buffer("some data");

// Sending a frame to the client
response.writeCustomFrame(frameType, frameStatus, payload);

这些帧会立即发送,不受流量控制——当发送此类帧时,它可能在其他 DATA 帧之前完成。

流重置

HTTP/1.x 不允许干净地重置请求或响应流,例如当客户端上传一个服务器上已存在的资源时,服务器需要接受整个响应。

HTTP/2 支持在请求/响应过程中的任何时候重置流。

request.response().reset();

默认情况下,发送 NO_ERROR (0) 错误代码,也可以发送其他代码。

request.response().reset(8);

HTTP/2 规范定义了可以使用的 错误代码 列表。

请求处理程序通过 request handlerresponse handler 接收流重置事件通知。

request.response().exceptionHandler(err -> {
  if (err instanceof StreamResetException) {
    StreamResetException reset = (StreamResetException) err;
    System.out.println("Stream reset " + reset.getCode());
  }
});

服务器推送

服务器推送是 HTTP/2 的一项新功能,它允许针对单个客户端请求并行发送多个响应。

当服务器处理请求时,它可以向客户端推送请求/响应。

HttpServerResponse response = request.response();

// Push main.js to the client
response
  .push(HttpMethod.GET, "/main.js")
  .onComplete(ar -> {

    if (ar.succeeded()) {

      // The server is ready to push the response
      HttpServerResponse pushedResponse = ar.result();

      // Send main.js response
      pushedResponse.
        putHeader("content-type", "application/json").
        end("alert(\"Push response hello\")");
    } else {
      System.out.println("Could not push client resource " + ar.cause());
    }
  });

// Send the requested resource
response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");

当服务器准备好推送响应时,将调用推送响应处理程序,并且该处理程序可以发送响应。

推送响应处理程序可能会收到失败,例如,客户端可能取消推送,因为它已经在缓存中拥有 main.js 并且不再需要它。

push 方法必须在初始响应结束之前调用,但是推送的响应可以在之后写入。

异常处理

您可以设置 exceptionHandler 来接收在连接传递给 requestHandlerwebSocketHandler 之前发生的任何异常,例如在 TLS 握手期间。

处理无效请求

Vert.x 将处理无效的 HTTP 请求,并提供一个默认的处理程序,该处理程序将适当处理常见情况,例如,当请求头过长时,它会响应 REQUEST_HEADER_FIELDS_TOO_LARGE

您可以设置自己的 invalidRequestHandler 来处理无效请求。您的实现可以处理特定情况,并将其他情况委托给 HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER

HTTP 压缩

Vert.x 开箱即用地支持 HTTP 压缩。

这意味着您能够自动压缩响应正文,然后再将其发送回客户端。

如果客户端不支持 HTTP 压缩,则响应将不压缩正文直接发送回。

这允许同时处理支持 HTTP 压缩和不支持 HTTP 压缩的客户端。

要启用压缩,您可以使用 setCompressionSupported 进行配置。

默认情况下,压缩未启用。

当启用 HTTP 压缩时,服务器将检查客户端是否包含 Accept-Encoding 头,该头包含支持的压缩算法。常用的是 deflate 和 gzip。Vert.x 都支持。

如果找到这样的头部,服务器将自动使用其中一种支持的压缩算法压缩响应正文,并将其发送回客户端。

无论何时需要不压缩地发送响应,您都可以将头部 content-encoding 设置为 identity

request.response()
  .putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)
  .sendFile("/path/to/image.jpg");

请注意,压缩可能会减少网络流量,但会更消耗 CPU。

为了解决后者的问题,Vert.x 允许您调整 gzip/deflate 压缩算法的原生“压缩级别”参数,并设置压缩的最小响应内容大小阈值。

压缩级别允许在结果数据的压缩比和压缩/解压缩操作的计算成本方面配置 gzip/deflate 算法。

压缩级别是一个介于“1”到“9”之间的整数值,其中“1”表示较低的压缩比但最快的算法,“9”表示可用的最大压缩比但较慢的算法。

使用高于 1-2 的压缩级别通常只能节省少量字节——增益不是线性的,并且取决于要压缩的具体数据——但它会给服务器在生成压缩响应数据时带来不可忽视的 CPU 周期成本(请注意,目前 Vert.x 不支持任何形式的压缩响应数据缓存,即使是静态文件,因此压缩是在每次请求正文生成时即时完成的),同样,它也会影响客户端在解码(解压)接收到的响应时的操作,随着级别增加,操作的 CPU 密集度也会增加。

默认情况下——如果通过 setCompressionSupported 启用压缩——Vert.x 将使用“6”作为压缩级别,但参数可以通过 setCompressionLevel 进行配置,以适应任何情况。

在某些大小阈值以下,压缩响应可能没有意义,因为 CPU 和节省的网络字节之间的权衡不划算。压缩的最小响应内容大小阈值可以通过 setCompressionContentSizeThreshold 配置。例如,如果设置为“100”,则小于 100 字节的响应将不被压缩。默认情况下,它为“0”,这意味着所有内容都可以被压缩。

HTTP 压缩算法

Vert.x 开箱即用地支持 deflate 和 gzip。

也可以使用 Brotli、snappy 和 zstandard。

new HttpServerOptions()
  .addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.gzip())
  .addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.deflate())
  .addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.brotli())
  .addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.zstd());
使用 StandardCompressionOptions 静态方法创建 CompressionOptions

Brotli 和 zstandard 库需要添加到类路径中,snappy 默认提供。

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>com.aayushatharva.brotli4j</groupId>
  <artifactId>brotli4j</artifactId>
  <version>${brotli4j.version}</version>
</dependency>
<dependency>
  <groupId>com.github.luben</groupId>
  <artifactId>zstd-jni</artifactId>
  <version>${zstd-jini.version}</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  implementation 'com.aayushatharva.brotli4j:brotli4j:${brotli4j.version}'
  runtimeOnly 'com.aayushatharva.brotli4j:native-$system-and-arch:${brotli4j.version}'
  implementation 'com.github.luben:zstd-jni:${zstd-jini.version}'
}

使用 Gradle 时,您需要根据您的操作系统和架构手动添加运行时原生库。更多详细信息请参阅 Brotli4j 的 Gradle 部分

您可以根据需要配置压缩器。

GzipOptions gzip = StandardCompressionOptions.gzip(6, 15, 8);

创建 HTTP 客户端

您可以通过以下方式创建带默认选项的 HttpClient 实例:

HttpClientAgent client = vertx.createHttpClient();

如果您想为客户端配置选项,请按如下方式创建它:

HttpClientOptions options = new HttpClientOptions().setKeepAlive(false);
HttpClientAgent client = vertx.createHttpClient(options);

Vert.x 支持基于 TLS 的 HTTP/2 (h2) 和基于 TCP 的 HTTP/2 (h2c)。

默认情况下,HTTP 客户端执行 HTTP/1.1 请求;要执行 HTTP/2 请求,必须将 setProtocolVersion 设置为 HTTP_2

对于 h2 请求,必须启用 TLS 和 应用层协议协商

HttpClientOptions options = new HttpClientOptions().
    setProtocolVersion(HttpVersion.HTTP_2).
    setSsl(true).
    setUseAlpn(true).
    setTrustAll(true);

HttpClient client = vertx.createHttpClient(options);

对于 h2c 请求,必须禁用 TLS,客户端将执行 HTTP/1.1 请求并尝试升级到 HTTP/2。

HttpClientOptions options = new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2);

HttpClient client = vertx.createHttpClient(options);

h2c 连接也可以直接建立,即连接以预知方式启动,当 setHttp2ClearTextUpgrade 选项设置为 false 时:连接建立后,客户端将发送 HTTP/2 连接前言,并期望从服务器接收相同的前言。

HTTP 服务器可能不支持 HTTP/2,实际版本可以在响应到达时通过 version 进行检查。

当客户端连接到 HTTP/2 服务器时,它会向服务器发送其 初始设置。这些设置定义了服务器如何使用连接,客户端的默认初始设置是 HTTP/2 RFC 定义的默认值。

连接到 Unix 域套接字

当运行在 JDK 16+ 或使用 原生传输 时,客户端可以连接到 Unix 域套接字。

HttpClient httpClient = vertx.createHttpClient();

// Only available when running on JDK16+, or using a native transport
SocketAddress addr = SocketAddress.domainSocketAddress("/var/tmp/myservice.sock");

// Send request to the server
httpClient.request(new RequestOptions()
  .setServer(addr)
  .setHost("localhost")
  .setPort(8080)
  .setURI("/"))
  .compose(request -> request.send().compose(HttpClientResponse::body))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Process response
    } else {
      // Handle failure
    }
  });

池配置

出于性能考虑,客户端在与 HTTP/1.1 服务器交互时使用连接池。该池为每个服务器最多创建 5 个连接。您可以像这样覆盖池配置:

PoolOptions options = new PoolOptions().setHttp1MaxSize(10);
HttpClientAgent client = vertx.createHttpClient(options);

您可以配置各种池 options,如下所示:

  • options#setHttp1MaxSize 每个 HTTP/1.x 服务器的最大打开连接数(默认为 5)。

  • options#setHttp2MaxSize 每个 HTTP/2 服务器的最大打开连接数(默认为 1),您 **不应** 更改此值,因为单个 HTTP/2 连接能够提供与多个 HTTP/1.x 连接相同的性能水平。

  • options#setCleanerPeriod 池检查过期连接的周期(默认为 1 秒)。

  • options#setEventLoopSize 设置池使用的事件循环数量(默认为 0)。

  • 值为 0 配置池使用调用者的事件循环。

  • 正值配置池通过由该值确定的事件循环列表来负载均衡连接的创建。

  • options#setMaxWaitQueueSize 等待连接可用的 HTTP 请求的最大数量,当队列满时,请求将被拒绝。

记录网络客户端活动

出于调试目的,可以记录网络活动。

HttpClientOptions options = new HttpClientOptions().setLogActivity(true);
HttpClientAgent client = vertx.createHttpClient(options);

详细说明请参阅 记录网络活动 章节。

高级 HTTP 客户端创建

您可以将选项传递给 createHttpClient 方法来配置 HTTP 客户端。

或者,您可以使用构建器 API 构建客户端。

HttpClientAgent build = vertx
  .httpClientBuilder()
  .with(options)
  .build();

除了 HttpClientOptionsPoolOptions,您还可以设置:

  • 当客户端 连接 到服务器时通知的连接事件处理程序。

  • 用于实现替代 HTTP 重定向 行为的重定向处理程序。

发出请求

HTTP 客户端非常灵活,您可以通过多种方式发出请求。

发出请求的第一步是获取到远程服务器的 HTTP 连接。

client
  .request(HttpMethod.GET, 8080, "myserver.mycompany.com", "/some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      // Connected to the server
    }
  });

客户端将连接到远程服务器或重用客户端连接池中可用的连接。

默认主机和端口

通常您希望使用 HTTP 客户端向同一个主机/端口发出许多请求。为了避免每次发出请求都重复主机/端口,您可以为客户端配置默认主机/端口。

HttpClientOptions options = new HttpClientOptions().setDefaultHost("wibble.com");

// Can also set default port if you want...
HttpClientAgent client = vertx.createHttpClient(options);
client
  .request(HttpMethod.GET, "/some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      HttpClientRequest request = ar1.result();
      request
        .send()
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          }
        });
    }
  });

写入请求头

您可以使用 HttpHeaders 如下所示向请求写入头部:

HttpClientAgent client = vertx.createHttpClient();

// Write some headers using the headers multi-map
MultiMap headers = HttpHeaders.set("content-type", "application/json").set("other-header", "foo");

client
  .request(HttpMethod.GET, "some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      if (ar1.succeeded()) {
        HttpClientRequest request = ar1.result();
        request.headers().addAll(headers);
        request
          .send()
          .onComplete(ar2 -> {
            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          });
      }
    }
  });

头部是 MultiMap 的一个实例,它提供了添加、设置和删除条目的操作。HTTP 头部允许一个特定键有多个值。

您也可以使用 putHeader 写入头部。

request.putHeader("content-type", "application/json")
       .putHeader("other-header", "foo");

如果您希望向请求写入头部,则必须在写入请求正文的任何部分之前完成此操作。

写入请求和处理响应

HttpClientRequestrequest 方法连接到远程服务器或重用现有连接。获取的请求实例预填充了一些数据,例如主机或请求 URI,但您需要将此请求发送到服务器。

您可以调用 send 来发送请求,例如 HTTP GET,并处理异步的 HttpClientResponse

client
  .request(HttpMethod.GET, 8080, "myserver.mycompany.com", "/some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      HttpClientRequest request = ar1.result();

      // Send the request and process the response
      request
        .send()
        .onComplete(ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            System.out.println("Received response with status code " + response.statusCode());
          } else {
            System.out.println("Something went wrong " + ar.cause().getMessage());
          }
        });
    }
  });

您也可以发送带请求体的请求。

使用字符串调用 send 时,如果 Content-Length 头部之前未设置,将为您设置。

client
  .request(HttpMethod.GET, 8080, "myserver.mycompany.com", "/some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      HttpClientRequest request = ar1.result();

      // Send the request and process the response
      request
        .send("Hello World")
        .onComplete(ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            System.out.println("Received response with status code " + response.statusCode());
          } else {
            System.out.println("Something went wrong " + ar.cause().getMessage());
          }
        });
    }
  });

使用缓冲区调用 send 时,如果 Content-Length 头部之前未设置,将为您设置。

request
  .send(Buffer.buffer("Hello World"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      HttpClientResponse response = ar.result();
      System.out.println("Received response with status code " + response.statusCode());
    } else {
      System.out.println("Something went wrong " + ar.cause().getMessage());
    }
  });

使用流调用 send 时,如果 Content-Length 头部之前未设置,请求将以分块 Content-Encoding 方式发送。

request
  .putHeader(HttpHeaders.CONTENT_LENGTH, "1000")
  .send(stream)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      HttpClientResponse response = ar.result();
      System.out.println("Received response with status code " + response.statusCode());
    } else {
      System.out.println("Something went wrong " + ar.cause().getMessage());
    }
  });

流式请求体

send 方法一次性发送请求。

有时您可能希望对如何写入请求体进行底层控制。

HttpClientRequest 可用于写入请求体。

以下是一些写入带请求体的 POST 请求的示例。

HttpClientAgent client = vertx.createHttpClient();

client.request(HttpMethod.POST, "some-uri")
  .onSuccess(request -> {
    request.response().onSuccess(response -> {
      System.out.println("Received response with status code " + response.statusCode());
    });

    // Now do stuff with the request
    request.putHeader("content-length", "1000");
    request.putHeader("content-type", "text/plain");
    request.write(body);

    // Make sure the request is ended when you're done with it
    request.end();
});

存在以 UTF-8 编码写入字符串、以任何特定编码写入字符串以及写入缓冲区的方法。

request.write("some data");

// Write string encoded in specific encoding
request.write("some other data", "UTF-16");

// Write a buffer
Buffer buffer = Buffer.buffer();
buffer.appendInt(123).appendLong(245l);
request.write(buffer);

如果您只是向 HTTP 请求写入单个字符串或缓冲区,您可以在一次 end 函数调用中完成写入并结束请求。

request.end("some simple data");

// Write buffer and end the request (send it) in a single call
Buffer buffer = Buffer.buffer().appendDouble(12.34d).appendLong(432l);
request.end(buffer);

当您向请求写入时,首次调用 write 将导致请求头部被写入网络。

实际写入是异步的,并且可能在调用返回后的一段时间才发生。

不使用分块的 HTTP 请求(带有请求体)需要提供 Content-Length 头部。

因此,如果您不使用分块 HTTP,则必须在写入请求之前设置 Content-Length 头部,否则将为时已晚。

如果您调用接受字符串或缓冲区的 end 方法之一,Vert.x 将在写入请求体之前自动计算并设置 Content-Length 头部。

如果您使用 HTTP 分块,则不需要 Content-Length 头部,因此您无需预先计算大小。

结束流式 HTTP 请求

一旦您完成了 HTTP 请求,您必须使用 end 操作之一来结束它。

结束请求会导致所有头部被写入(如果它们尚未写入),并将请求标记为完成。

请求可以通过几种方式结束。不带参数时,请求将直接结束。

request.end();

或者可以在调用 end 时提供一个字符串或缓冲区。这就像在不带参数调用 end 之前,先用该字符串或缓冲区调用 write

request.end("some-data");

// End it with a buffer
Buffer buffer = Buffer.buffer().appendFloat(12.3f).appendInt(321);
request.end(buffer);

将请求用作流

HttpClientRequest 实例也是一个 WriteStream 实例。

您可以从任何 ReadStream 实例向其传输数据。

例如,您可以将磁盘上的文件传输到 HTTP 请求体,如下所示。

request.setChunked(true);
file.pipeTo(request);

分块 HTTP 请求

Vert.x 支持请求的 HTTP 分块传输编码

这允许 HTTP 请求体以分块方式写入,通常在向服务器流式传输大型请求体且其大小未知时使用。

您可以使用 setChunked 将 HTTP 请求设置为分块模式。

在分块模式下,每次调用 write 都会导致一个新的数据块被写入网络。在分块模式下,无需预先设置请求的 Content-Length

request.setChunked(true);

// Write some chunks
for (int i = 0; i < 10; i++) {
  request.write("this-is-chunk-" + i);
}

request.end();

表单提交

您可以使用 send 变体发送 HTTP 表单提交体。

ClientForm form = ClientForm.form();
form.attribute("firstName", "Dale");
form.attribute("lastName", "Cooper");

// Submit the form as a form URL encoded body
request
  .send(form)
  .onSuccess(res -> {
    // OK
  });

默认情况下,表单以 application/x-www-form-urlencoded 内容类型头部提交。您可以将其 content-type 头部设置为 multipart/form-data

ClientForm form = ClientForm.form();
form.attribute("firstName", "Dale");
form.attribute("lastName", "Cooper");

// Submit the form as a multipart form body
request
  .putHeader("content-type", "multipart/form-data")
  .send(form)
  .onSuccess(res -> {
    // OK
  });

如果您想上传文件并发送属性,您可以改为创建一个 ClientMultipartForm

ClientMultipartForm form = ClientMultipartForm.multipartForm()
  .attribute("imageDescription", "a very nice image")
  .binaryFileUpload(
    "imageFile",
    "image.jpg",
    "/path/to/image",
    "image/jpeg");

// Submit the form as a multipart form body
request
  .send(form)
  .onSuccess(res -> {
    // OK
  });

请求超时

您可以使用 setIdleTimeoutidleTimeout 设置空闲超时,以防止您的应用程序被无响应的服务器阻塞。当请求在超时期间内未返回任何数据时,将抛出异常导致结果失败,并且请求将被重置。

Future<Buffer> fut = client
  .request(new RequestOptions()
    .setHost(host)
    .setPort(port)
    .setURI(uri)
    .setIdleTimeout(timeoutMS))
  .compose(request -> request.send().compose(HttpClientResponse::body));
超时在 HttpClientRequest 可用时开始,这意味着已从连接池中获取了连接。

您可以设置连接超时,以防止您的应用程序被无响应的繁忙客户端连接池阻塞。如果在超时延迟之前未能获取连接,Future<HttpClientRequest> 将失败。

连接超时选项与 TCP setConnectTimeout 选项无关。当对池化的 HTTP 客户端发出请求时,该超时适用于从连接池获取连接以服务请求的持续时间。超时可能因为服务器未及时响应或连接池过于繁忙而触发。

您可以使用 setTimeout 配置两种超时。

Future<Buffer> fut = client
  .request(new RequestOptions()
    .setHost(host)
    .setPort(port)
    .setURI(uri)
    .setTimeout(timeoutMS))
  .compose(request -> request.send().compose(HttpClientResponse::body));

写入 HTTP/2 帧

HTTP/2 是一个带帧协议,其中包含用于 HTTP 请求/响应模型的各种帧。该协议允许发送和接收其他类型的帧。

要发送此类帧,您可以使用请求上的 write 方法。示例如下:

int frameType = 40;
int frameStatus = 10;
Buffer payload = Buffer.buffer("some data");

// Sending a frame to the server
request.writeCustomFrame(frameType, frameStatus, payload);

流重置

HTTP/1.x 不允许干净地重置请求或响应流,例如当客户端上传一个服务器上已存在的资源时,服务器需要接受整个响应。

HTTP/2 支持在请求/响应过程中的任何时候重置流。

request.reset();

默认情况下发送 NO_ERROR (0) 错误代码,也可以发送其他代码。

request.reset(8);

HTTP/2 规范定义了可以使用的 错误代码 列表。

请求处理器会收到流重置事件的通知,通过 请求处理器响应处理器

request.exceptionHandler(err -> {
  if (err instanceof StreamResetException) {
    StreamResetException reset = (StreamResetException) err;
    System.out.println("Stream reset " + reset.getCode());
  }
});

HTTP/2 RST 洪水保护

HTTP/2 服务器受到 RST 洪水 DDoS 攻击的保护 (CVE-2023-44487):服务器在时间窗口内可接收的 RST 帧数量存在上限。默认配置将上限设置为 200,持续时间为 30 秒。

您可以使用 setHttp2RstFloodMaxRstFramePerWindowsetHttp2RstFloodWindowDuration 来覆盖这些设置。

处理 HTTP 响应

您会收到一个 HttpClientResponse 实例,传入到您在请求方法中指定的处理器中,或者通过直接在 HttpClientRequest 对象上设置处理器。

您可以使用 statusCodestatusMessage 查询响应的状态码和状态消息。

request
  .send()
  .onComplete(ar2 -> {
    if (ar2.succeeded()) {

      HttpClientResponse response = ar2.result();

      // the status code - e.g. 200 or 404
      System.out.println("Status code is " + response.statusCode());

      // the status message e.g. "OK" or "Not Found".
      System.out.println("Status message is " + response.statusMessage());
    }
  });

将响应用作流

HttpClientResponse 实例也是一个 ReadStream,这意味着您可以将其传输到任何 WriteStream 实例。

响应头部和尾部

HTTP 响应可以包含头部。使用 headers 获取头部。

返回的对象是 MultiMap 类型,因为 HTTP 头部可以包含单个键的多个值。

String contentType = response.headers().get("content-type");
String contentLength = response.headers().get("content-lengh");

分块 HTTP 响应也可以包含尾部(trailers)——这些在响应体的最后一个块中发送。

您使用 trailers 获取尾部。尾部也是 MultiMap 类型。

读取响应体

当响应头部从网络中读取完毕时,将调用响应处理器。

如果响应有请求体,它可能在头部读取完毕后一段时间内分多部分到达。我们不会等待整个请求体到达后再调用响应处理器,因为响应可能非常大,我们可能会等待很长时间,或者对于大型响应会耗尽内存。

随着响应体部分的到达,将使用表示请求体片段的 Buffer 调用 handler

client
  .request(HttpMethod.GET, "some-uri")
  .onComplete(ar1 -> {

    if (ar1.succeeded()) {
      HttpClientRequest request = ar1.result();
      request
        .send()
        .onComplete(ar2 -> {
          HttpClientResponse response = ar2.result();
          response.handler(buffer -> {
            System.out.println("Received a part of the response body: " + buffer);
          });
        });
    }
  });

如果您知道响应体不是很大,并且希望在处理它之前将其全部聚合到内存中,您可以自己聚合它。

request
  .send()
  .onComplete(ar2 -> {

    if (ar2.succeeded()) {

      HttpClientResponse response = ar2.result();

      // Create an empty buffer
      Buffer totalBuffer = Buffer.buffer();

      response.handler(buffer -> {
        System.out.println("Received a part of the response body: " + buffer.length());

        totalBuffer.appendBuffer(buffer);
      });

      response.endHandler(v -> {
        // Now all the body has been read
        System.out.println("Total response body length is " + totalBuffer.length());
      });
    }
  });

或者您可以使用便捷的 body 方法,当响应完全读取后,该方法将使用整个响应体进行调用。

request
  .send()
  .onComplete(ar1 -> {

    if (ar1.succeeded()) {
      HttpClientResponse response = ar1.result();
      response
        .body()
        .onComplete(ar2 -> {

          if (ar2.succeeded()) {
            Buffer body = ar2.result();
            // Now all the body has been read
            System.out.println("Total response body length is " + body.length());
          }
        });
    }
  });

响应结束处理器

响应 endHandler 在整个响应体读取完毕后调用,或者在读取头部并调用响应处理器后立即调用(如果不存在请求体)。

请求和响应流组合

客户端接口非常简单,并遵循此模式:

  1. request 连接

  2. 向服务器 sendwrite/end 请求

  3. 处理 HttpClientResponse 的开始部分

  4. 处理响应事件

您可以使用 Vert.x 的 Future 组合方法来简化代码,然而,该 API 是事件驱动的,您需要理解它,否则可能会遇到数据竞争(即丢失事件导致数据损坏)。

如果此客户端对于您的用例来说级别太低,您可以考虑使用 Vert.x Web 客户端 作为更高级别的 API 替代方案(实际上它构建在此客户端之上)。

客户端 API 有意不返回 Future<HttpClientResponse>,因为在事件循环之外设置 Future 的完成处理器可能会导致竞争条件。

请求/响应组合中的潜在竞争条件
Future<HttpClientResponse> get = client.get("some-uri");

// Assuming we have a client that returns a future response
// assuming this is *not* on the event-loop
// introduce a potential data race for the sake of this example
Thread.sleep(100);

get.onSuccess(response -> {

  // Response events might have happen already
  response
    .body()
    .onComplete(ar -> {

    });
});

HttpClientRequest 的使用限制在 Verticle 内部是最简单的解决方案,因为 Verticle 将确保事件按顺序处理,从而避免竞争条件。

vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() {

    HttpClient client = vertx.createHttpClient();

    Future<HttpClientRequest> future = client.request(HttpMethod.GET, "some-uri");
  }
}, new DeploymentOptions());

当您可能在 Verticle 之外与客户端交互时,只要您不延迟响应事件(例如,直接在事件循环上处理响应),就可以安全地执行组合操作。

Future<JsonObject> future = client
  .request(HttpMethod.GET, "some-uri")
  .compose(request -> request
    .send()
    .compose(response -> {
      // Process the response on the event-loop which guarantees no races
      if (response.statusCode() == 200 &&
          response.getHeader(HttpHeaders.CONTENT_TYPE).equals("application/json")) {
        return response
          .body()
          .map(buffer -> buffer.toJsonObject());
      } else {
        return Future.failedFuture("Incorrect HTTP response");
      }
    }));

// Listen to the composed final json result
future.onSuccess(json -> {
  System.out.println("Received json result " + json);
}).onFailure(err -> {
  System.out.println("Something went wrong " + err.getMessage());
});

您还可以使用HTTP 响应预期来保护响应体。

Future<JsonObject> future = client
  .request(HttpMethod.GET, "some-uri")
  .compose(request -> request
    .send()
    .expecting(HttpResponseExpectation.SC_OK.and(HttpResponseExpectation.JSON))
    .compose(response -> response
      .body()
      .map(buffer -> buffer.toJsonObject())));
// Listen to the composed final json result
future.onSuccess(json -> {
  System.out.println("Received json result " + json);
}).onFailure(err -> {
  System.out.println("Something went wrong " + err.getMessage());
});

如果您需要延迟响应处理,则需要 pause 响应或使用 pipe,当涉及另一个异步操作时,这可能是必要的。

Future<Void> future = client
  .request(HttpMethod.GET, "some-uri")
  .compose(request -> request
    .send()
    .compose(response -> {
      // Process the response on the event-loop which guarantees no races
      if (response.statusCode() == 200) {

        // Create a pipe, this pauses the response
        Pipe<Buffer> pipe = response.pipe();

        // Write the file on the disk
        return fileSystem
          .open("/some/large/file", new OpenOptions().setWrite(true))
          .onFailure(err -> pipe.close())
          .compose(file -> pipe.to(file));
      } else {
        return Future.failedFuture("Incorrect HTTP response");
      }
    }));

响应预期

如上所述,您必须在收到响应后手动执行完整性检查。

您可以使用 *响应预期* 来换取清晰和简洁性,牺牲一定的灵活性。

Response expectations 可以在响应不符合某个条件时保护控制流。

HTTP 客户端提供了一组开箱即用的谓词供您使用。

Future<Buffer> fut = client
  .request(options)
  .compose(request -> request
    .send()
    .expecting(HttpResponseExpectation.SC_SUCCESS)
    .compose(response -> response.body()));

当现有谓词不满足您的需求时,您还可以创建自定义谓词。

HttpResponseExpectation methodsPredicate =
  resp -> {
    String methods = resp.getHeader("Access-Control-Allow-Methods");
    return methods != null && methods.contains("POST");
  };

// Send pre-flight CORS request
client
  .request(new RequestOptions()
    .setMethod(HttpMethod.OPTIONS)
    .setPort(8080)
    .setHost("myserver.mycompany.com")
    .setURI("/some-uri")
    .putHeader("Origin", "Server-b.com")
    .putHeader("Access-Control-Request-Method", "POST"))
  .compose(request -> request
    .send()
    .expecting(methodsPredicate))
  .onSuccess(res -> {
    // Process the POST request now
  })
  .onFailure(err ->
    System.out.println("Something went wrong " + err.getMessage()));

预定义预期

为方便起见,HTTP 客户端提供了一些常用谓词。

对于状态码,例如 HttpResponseExpectation.SC_SUCCESS 用于验证响应是否具有 2xx 代码,您也可以创建自定义状态码。

client
  .request(options)
  .compose(request -> request
    .send()
    .expecting(HttpResponseExpectation.status(200, 202)))
  .onSuccess(res -> {
    // ....
  });

对于内容类型,例如 HttpResponseExpectation.JSON 用于验证响应体是否包含 JSON 数据,您也可以创建自定义内容类型。

client
  .request(options)
  .compose(request -> request
    .send()
    .expecting(HttpResponseExpectation.contentType("some/content-type")))
  .onSuccess(res -> {
    // ....
  });

有关预定义预期的完整列表,请参阅 HttpResponseExpectation 文档。

创建自定义失败

默认情况下,预期(包括预定义的预期)会传递一个简单的错误消息。您可以通过更改错误转换器来自定义异常类。

Expectation<HttpResponseHead> expectation = HttpResponseExpectation.SC_SUCCESS
  .wrappingFailure((resp, err) -> new MyCustomException(resp.statusCode(), err.getMessage()));
在 Java 中创建异常在捕获堆栈跟踪时可能会产生性能开销,因此您可能希望创建不捕获堆栈跟踪的异常。默认情况下,异常将使用不捕获堆栈跟踪的异常进行报告。

从响应中读取 Cookie

您可以使用 cookies 从响应中检索 Cookie 列表。

或者,您也可以自行解析响应中的 Set-Cookie 头部。

30x 重定向处理

当客户端收到以下响应时,可以配置客户端遵循 Location 响应头部提供的 HTTP 重定向:

  • 301302307308 状态码以及 HTTP GET 或 HEAD 方法

  • 303 状态码,此外,重定向请求执行 HTTP GET 方法。

这是一个示例:

client
  .request(HttpMethod.GET, "some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {

      HttpClientRequest request = ar1.result();
      request.setFollowRedirects(true);
      request
        .send()
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {

            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          }
        });
    }
  });

最大重定向次数默认为 16,可以通 setMaxRedirects 进行更改。

HttpClientAgent client = vertx.createHttpClient(
    new HttpClientOptions()
        .setMaxRedirects(32));

client
  .request(HttpMethod.GET, "some-uri")
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {

      HttpClientRequest request = ar1.result();
      request.setFollowRedirects(true);
      request
        .send()
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {

            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          }
        });
    }
  });

一种尺寸不适合所有情况,默认重定向策略可能不适合您的需求。

默认重定向策略可以通过自定义实现进行更改。

HttpClientAgent client = vertx.httpClientBuilder()
  .withRedirectHandler(response -> {

    // Only follow 301 code
    if (response.statusCode() == 301 && response.getHeader("Location") != null) {

      // Compute the redirect URI
      String absoluteURI = resolveURI(response.request().absoluteURI(), response.getHeader("Location"));

      // Create a new ready to use request that the client will use
      return Future.succeededFuture(new RequestOptions().setAbsoluteURI(absoluteURI));
    }

    // We don't redirect
    return null;
  })
  .build();

该策略处理收到的原始 HttpClientResponse,并返回 nullFuture<HttpClientRequest>

  • 当返回 null 时,处理原始响应。

  • 当返回 Future 时,请求将在其成功完成后发送。

  • 当返回 Future 时,请求上设置的异常处理器在其失败时被调用。

返回的请求必须是未发送的,这样原始请求处理器可以被发送,并且客户端可以在之后发送它。

大部分原始请求设置将传播到新请求:

  • 请求头部,除非您已设置了一些头部

  • 请求体,除非返回的请求使用 GET 方法

  • 响应处理器

  • 请求异常处理器

  • 请求超时

100-Continue 处理

根据 HTTP 1.1 规范,客户端可以设置 Expect: 100-Continue 头部,并在发送请求体的其余部分之前发送请求头部。

服务器随后可以发送一个临时响应状态 Status: 100 (Continue),以告知客户端可以发送请求体的其余部分。

这里的想法是允许服务器在发送大量数据之前授权并接受/拒绝请求。如果请求可能不被接受,发送大量数据会浪费带宽并占用服务器读取最终将被丢弃的数据。

Vert.x 允许您在客户端请求对象上设置 continueHandler

如果服务器返回 Status: 100 (Continue) 响应以表明可以发送请求的其余部分,此处理器将被调用。

这与 sendHead 结合使用以发送请求头部。

这是一个示例:

client.request(HttpMethod.PUT, "some-uri")
  .onSuccess(request -> {
    request.response().onSuccess(response -> {
      System.out.println("Received response with status code " + response.statusCode());
    });

    request.putHeader("Expect", "100-Continue");

    request.continueHandler(v -> {
      // OK to send rest of body
      request.write("Some data");
      request.write("Some more data");
      request.end();
    });

    request.sendHead();
});

在服务器端,Vert.x HTTP 服务器可以配置为在收到 Expect: 100-Continue 头部时自动发送 100 Continue 临时响应。

这是通过设置 setHandle100ContinueAutomatically 选项来完成的。

如果您更喜欢手动决定是否发送继续响应,则应将此属性设置为 false(默认值),然后您可以检查头部并调用 writeContinue 让客户端继续发送请求体。

httpServer.requestHandler(request -> {
  if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {

    // Send a 100 continue response
    request.response().writeContinue();

    // The client should send the body when it receives the 100 response
    request.bodyHandler(body -> {
      // Do something with body
    });

    request.endHandler(v -> {
      request.response().end();
    });
  }
});

您也可以直接发送失败状态码来拒绝请求:在这种情况下,请求体应被忽略或连接应被关闭(100-Continue 是一个性能提示,不能作为逻辑协议约束)。

httpServer.requestHandler(request -> {
  if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {

    //
    boolean rejectAndClose = true;
    if (rejectAndClose) {

      // Reject with a failure code and close the connection
      // this is probably best with persistent connection
      request.response()
          .setStatusCode(405)
          .putHeader("Connection", "close")
          .end();
    } else {

      // Reject with a failure code and ignore the body
      // this may be appropriate if the body is small
      request.response()
          .setStatusCode(405)
          .end();
    }
  }
});

创建 HTTP 隧道

HTTP 隧道可以通过 connect 创建。

client.request(HttpMethod.CONNECT, "some-uri")
  .onSuccess(request -> {

    // Connect to the server
    request
      .connect()
      .onComplete(ar -> {
        if (ar.succeeded()) {
          HttpClientResponse response = ar.result();

          if (response.statusCode() != 200) {
            // Connect failed for some reason
          } else {
            // Tunnel created, raw buffers are transmitted on the wire
            NetSocket socket = response.netSocket();
          }
        }
      });
});

HTTP 响应头部收到后,将调用处理器,套接字将准备好进行隧道传输,并发送和接收缓冲区。

connect 的工作方式类似于 send,但它会重新配置传输以交换原始缓冲区。

客户端推送

服务器推送是 HTTP/2 的一项新功能,它允许针对单个客户端请求并行发送多个响应。

可以在请求上设置一个推送处理器,以接收服务器推送的请求/响应。

client.request(HttpMethod.GET, "/index.html")
  .onSuccess(request -> {

    request
      .response().onComplete(response -> {
        // Process index.html response
      });

    // Set a push handler to be aware of any resource pushed by the server
    request.pushHandler(pushedRequest -> {

      // A resource is pushed for this request
      System.out.println("Server pushed " + pushedRequest.path());

      // Set an handler for the response
      pushedRequest.response().onComplete(pushedResponse -> {
        System.out.println("The response for the pushed request");
      });
    });

    // End the request
    request.end();
});

如果客户端不想接收推送请求,它可以重置流。

request.pushHandler(pushedRequest -> {
  if (pushedRequest.path().equals("/main.js")) {
    pushedRequest.reset();
  } else {
    // Handle it
  }
});

当没有设置处理器时,任何被推送的流将由客户端自动取消,并带有一个流重置(错误代码 8)。

接收自定义 HTTP/2 帧

HTTP/2 是一个带帧协议,其中包含用于 HTTP 请求/响应模型的各种帧。该协议允许发送和接收其他类型的帧。

要接收自定义帧,您可以使用请求上的 customFrameHandler,每当自定义帧到达时,此处理器都会被调用。示例如下:

response.customFrameHandler(frame -> {

  System.out.println("Received a frame type=" + frame.type() +
      " payload" + frame.payload().toString());
});

在客户端启用压缩

HTTP 客户端开箱即用地支持 HTTP 压缩。

这意味着客户端可以告知远程 HTTP 服务器它支持压缩,并能够处理压缩的响应体。

HTTP 服务器可以自由选择使用受支持的压缩算法之一进行压缩,或者完全不压缩地发送响应体。因此,这只是一个 HTTP 服务器可以随意忽略的提示。

为了告知 HTTP 服务器客户端支持哪些压缩,它将在 Accept-Encoding 头部中包含支持的压缩算法作为值。支持多种压缩算法。在 Vert.x 中,这将导致添加以下头部:

Accept-Encoding: gzip, deflate

服务器将从这些算法中选择一个。您可以通过检查服务器返回响应中的 Content-Encoding 头部来检测服务器是否压缩了请求体。

如果响应体通过 gzip 压缩,例如,它将包含以下头部:

Content-Encoding: gzip

要启用压缩,请在创建客户端时使用的选项上设置 setDecompressionSupported

默认情况下,压缩是禁用的。

客户端负载均衡

默认情况下,当客户端将主机名解析为多个 IP 地址列表时,客户端使用返回的第一个 IP 地址。

HTTP 客户端可以配置为执行客户端负载均衡。

HttpClientAgent client = vertx
  .httpClientBuilder()
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

Vert.x 开箱即用地提供了几种可用的负载均衡策略:

大多数负载均衡策略都非常直观。

基于哈希的路由可以通过 LoadBalancer.CONSISTENT_HASHING 策略实现。

HttpClientAgent client = vertx
  .httpClientBuilder()
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

默认的一致性哈希策略为每个服务器使用 4 个虚拟节点,并在没有路由键的情况下使用随机策略。

您可以创建最符合您需求的策略配置。

LoadBalancer loadBalancer = LoadBalancer.consistentHashing(10, LoadBalancer.POWER_OF_TWO_CHOICES);

也可以使用自定义负载均衡策略。

LoadBalancer loadBalancer = endpoints -> {
  // Returns an endpoint selector for the given endpoints
  // a selector is a stateful view of the provided immutable list of endpoints
  return () -> indexOfEndpoint(endpoints);
};

HttpClientAgent client = vertx
  .httpClientBuilder()
  .withLoadBalancer(loadBalancer)
  .build();

HTTP/1.x 连接池和 Keep-Alive

HTTP Keep-Alive 允许 HTTP 连接用于多个请求。当您向同一服务器发出多个请求时,这可以更有效地利用连接。

对于 HTTP/1.x 版本,HTTP 客户端支持连接池,允许您在请求之间重用连接。

为了使连接池正常工作,在配置客户端时使用的选项上,必须通过 setKeepAlive 将 Keep-Alive 设置为 `true`。默认值为 `true`。

当 Keep-Alive 启用时,Vert.x 将为每个发送的 HTTP/1.0 请求添加 Connection: Keep-Alive 头部。当 Keep-Alive 禁用时,Vert.x 将为每个发送的 HTTP/1.1 请求添加 Connection: Close 头部,以表示连接将在响应完成后关闭。

**每个服务器**的最大连接池数量是使用 setHttp1MaxSize 配置的。

当启用连接池发出请求时,如果该服务器已创建的连接数小于最大连接数,Vert.x 将创建一个新连接,否则它会将请求添加到队列中。

Keep-Alive 连接将在超时后由客户端自动关闭。服务器可以使用 keep-alive 头部指定超时。

keep-alive: timeout=30

您可以使用 setKeepAliveTimeout 设置默认超时——在此超时时间内未使用的任何连接都将关闭。请注意,超时值以秒为单位,而不是毫秒。

HTTP/1.1 流水线

客户端还支持在同一连接上进行请求流水线。

流水线意味着在收到前一个请求的响应之前,在同一连接上发送另一个请求。流水线不适用于所有请求。

要启用流水线,必须使用 setPipelining 启用它。默认情况下,流水线是禁用的。

启用流水线时,请求将写入连接,而无需等待先前的响应返回。

单个连接上的流水线请求数量受 setPipeliningLimit 限制。此选项定义了发送到服务器并等待响应的最大 HTTP 请求数。此限制确保了客户端请求在连接到同一服务器的连接上的公平分布。

HTTP/2 多路复用

HTTP/2 提倡对服务器使用单个连接,默认情况下,HTTP 客户端为每个服务器使用单个连接,到同一服务器的所有流都通过同一连接进行多路复用。

当客户端需要使用多个连接并使用连接池时,应使用 setHttp2MaxSize

当需要限制每个连接的多路复用流数量并使用连接池而不是单个连接时,可以使用 setHttp2MultiplexingLimit

HttpClient client = vertx.createHttpClient(
  new HttpClientOptions().setHttp2MultiplexingLimit(10),
  new PoolOptions().setHttp2MaxSize(3)
);

连接的多路复用限制是客户端设置的一个配置,它限制了单个连接的流数量。如果服务器使用 SETTINGS_MAX_CONCURRENT_STREAMS 设置了较低的限制,则实际值可能更低。

HTTP/2 连接不会由客户端自动关闭。要关闭它们,您可以调用 close 或关闭客户端实例。

另外,您可以使用 setIdleTimeout 设置空闲超时——在此超时时间内未使用的任何连接都将关闭。请注意,空闲超时值以秒为单位,而不是毫秒。

未池化的客户端连接

大多数 HTTP 交互都使用 `HttpClientAgent` 请求/响应 API 执行:客户端从其连接池中获取连接以执行请求。

另外,您可以直接连接到服务器(绕过连接池)并获取 HTTP 客户端连接。

HttpConnectOptions connectOptions = new HttpConnectOptions()
  .setHost("example.com")
  .setPort(80);

Future<HttpClientConnection> fut = client.connect(connectOptions);
connection
  .request()
  .onSuccess(request -> {
    request.setMethod(HttpMethod.GET);
    request.setURI("/some-uri");
    Future<HttpClientResponse> response = request.send();
  });

客户端连接可以处理一定数量的并发请求。当达到最大连接数时,任何后续请求都将排队,直到有可用槽位。

HTTP 连接

HttpConnection 提供了用于处理 HTTP 连接事件、生命周期和设置的 API。

HTTP/2 完全实现了 HttpConnection API。

HTTP/1.x 部分实现了 HttpConnection API:只实现了关闭操作、关闭处理器和异常处理器。此协议未提供其他操作的语义。

服务器连接

connection 方法返回服务器上的请求连接。

HttpConnection connection = request.connection();

可以在服务器上设置连接处理器,以便在有任何传入连接时收到通知。

HttpServer server = vertx.createHttpServer(http2Options);

server.connectionHandler(connection -> {
  System.out.println("A client connected");
});

客户端连接

connection 方法返回客户端上的请求连接。

HttpConnection connection = request.connection();

可以在客户端构建器上设置连接处理器,以便在连接建立时收到通知。

vertx
  .httpClientBuilder()
  .with(options)
  .withConnectHandler(connection -> {
    System.out.println("Connected to the server");
  })
  .build();

连接设置

HTTP/2 的配置由 Http2Settings 数据对象配置。

每个端点都必须遵守连接另一端发送的设置。

建立连接时,客户端和服务器交换初始设置。初始设置通过客户端的 setInitialSettings 和服务器的 setInitialSettings 进行配置。

连接建立后,设置可以随时更改。

connection.updateSettings(new Http2Settings().setMaxConcurrentStreams(100));

由于远程端在收到设置更新时应发送确认,因此可以提供一个回调函数,以便在收到确认时得到通知。

connection
  .updateSettings(new Http2Settings().setMaxConcurrentStreams(100))
  .onSuccess(v -> System.out.println("The settings update has been acknowledged "));

相反,当收到新的远程设置时,remoteSettingsHandler 会收到通知。

connection.remoteSettingsHandler(settings -> {
  System.out.println("Received new settings");
});
这仅适用于 HTTP/2 协议。

连接 Ping

HTTP/2 连接 Ping 对于确定连接往返时间或检查连接有效性很有用:ping 向远程端点发送 PING 帧。

Buffer data = Buffer.buffer();
for (byte i = 0;i < 8;i++) {
  data.appendByte(i);
}
connection
  .ping(data)
  .onSuccess(pong -> System.out.println("Remote side replied"));

当收到 PING 帧时,Vert.x 会自动发送确认;可以设置一个处理器,以便在收到每个 Ping 时得到通知。

connection.pingHandler(ping -> {
  System.out.println("Got pinged by remote side");
});

处理器只是收到通知,无论如何都会发送确认。此功能旨在实现在 HTTP/2 之上的协议。

这仅适用于 HTTP/2 协议。

连接关闭和 Go Away

调用 shutdown 将向连接的远程端发送一个 GOAWAY 帧,要求其停止创建流:客户端将停止发出新请求,服务器将停止推送响应。发送 GOAWAY 帧后,连接将等待一段时间(默认 30 秒),直到所有当前流关闭并关闭连接。

connection.shutdown();

shutdownHandler 在所有流关闭时通知,连接尚未关闭。

可以只发送一个 GOAWAY 帧,与关闭的主要区别在于,它只会告诉连接的远程端停止创建新流,而不会安排连接关闭。

connection.goAway(0);

相反,当收到 GOAWAY 时也可以收到通知。

connection.goAwayHandler(goAway -> {
  System.out.println("Received a go away frame");
});

当所有当前流已关闭并且连接可以关闭时,shutdownHandler 将被调用。

connection.goAway(0);
connection.shutdownHandler(v -> {

  // All streams are closed, close the connection
  connection.close();
});

这也适用于收到 GOAWAY 的情况。

这仅适用于 HTTP/2 协议。

连接关闭

连接 close 关闭连接:

  • 它关闭 HTTP/1.x 的套接字。

  • HTTP/2 无延迟关闭,在连接关闭之前,GOAWAY 帧仍将发送。

closeHandler 在连接关闭时通知。

优雅关闭

HTTP 服务器和客户端支持优雅关闭。

您可以关闭 serverclient

调用 shutdown 启动关闭阶段,在此阶段,服务器或客户端有机会执行清理操作。

  • 独立 HTTP 服务器解绑。

  • 共享 HTTP 服务器将从接受服务器集合中移除。

  • HTTP 客户端拒绝发送任何新请求。

当所有连接中的正在进行中的请求处理完毕后,服务器或客户端随后关闭。

此外,HTTP/2 连接发送一个 GOAWAY 帧,以通知远程端点连接不能再使用。

server
  .shutdown()
  .onSuccess(res -> {
    System.out.println("Server is now closed");
  });

关机将等待所有套接字关闭或关机超时触发。当超时触发时,所有套接字将被强制关闭。

每个打开的 HTTP 连接都会收到一个关闭事件通知,允许在实际连接关闭之前执行清理。

server.connectionHandler(conn -> {
  conn.shutdownHandler(v -> {
    // Perform clean-up
  });
});

默认关闭超时为 30 秒,您可以覆盖该超时。

server
  .shutdown(60, TimeUnit.SECONDS)
  .onSuccess(res -> {
    System.out.println("Server is now closed");
  });

客户端共享

您可以在多个 Verticle 或同一 Verticle 的实例之间共享 HTTP 客户端。此类客户端应在 Verticle 外部创建,否则,当创建它的 Verticle 被卸载时,它将被关闭。

HttpClientAgent client = vertx.createHttpClient(new HttpClientOptions().setShared(true));
vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    // Use the client
  }
}, new DeploymentOptions().setInstances(4));

您也可以在每个 Verticle 中创建共享 HTTP 客户端。

vertx.deployVerticle(() -> new AbstractVerticle() {
  HttpClientAgent client;
  @Override
  public void start() {
    // Get or create a shared client
    // this actually creates a lease to the client
    // when the verticle is undeployed, the lease will be released automaticaly
    client = vertx.createHttpClient(new HttpClientOptions().setShared(true).setName("my-client"));
  }
}, new DeploymentOptions().setInstances(4));

首次创建共享客户端时,它将创建并返回一个客户端。随后的调用将重用此客户端并为此客户端创建一个租约。在所有租约都被释放后,客户端关闭。

默认情况下,当客户端需要创建 TCP 连接时,它会重用当前的事件循环。因此,HTTP 客户端将安全地随机使用使用它的 Verticle 的事件循环。

您可以为客户端分配它将独立于使用它的客户端使用的事件循环数量。

vertx.deployVerticle(() -> new AbstractVerticle() {
  HttpClientAgent client;
  @Override
  public void start() {
    // The client creates and use two event-loops for 4 instances
    client = vertx.createHttpClient(new HttpClientOptions().setShared(true).setName("my-client"), new PoolOptions().setEventLoopSize(2));
  }
}, new DeploymentOptions().setInstances(4));

服务器共享

当多个 HTTP 服务器监听同一端口时,Vert.x 使用轮询策略协调请求处理。

让我们以一个创建 HTTP 服务器的 Verticle 为例,例如:

io.vertx.examples.http.sharing.HttpServerVerticle
vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello from server " + this);
}).listen(8080);

该服务正在监听 8080 端口。

那么,当这个 Verticle 被多次实例化时,例如 deploymentOptions.setInstances(2),会发生什么呢?如果两个 Verticle 都绑定到同一个端口,您将收到套接字异常。幸运的是,Vert.x 会为您处理这种情况。当您在与现有服务器相同的主机和端口上部署另一个服务器时,它实际上并不会尝试创建新的服务器来监听相同的主机/端口。它只绑定到套接字一次。收到请求时,它会按照轮询策略调用服务器处理器。

现在让我们设想一个客户端,例如:

vertx.setPeriodic(100, (l) -> {
  vertx
    .createHttpClient()
    .request(HttpMethod.GET, 8080, "localhost", "/")
    .onComplete(ar1 -> {
      if (ar1.succeeded()) {
        HttpClientRequest request = ar1.result();
        request
          .send()
          .onComplete(ar2 -> {
            if (ar2.succeeded()) {
              HttpClientResponse resp = ar2.result();
              resp.bodyHandler(body -> {
                System.out.println(body.toString("ISO-8859-1"));
              });
            }
          });
      }
    });
});

Vert.x 依次将请求委托给其中一个服务器。

Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2
Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2
...

因此,服务器可以根据可用核心进行扩展,而每个 Vert.x Verticle 实例仍严格保持单线程,您无需进行任何特殊操作,例如编写负载均衡器,即可在多核机器上扩展服务器。

您可以使用负端口值绑定到共享随机端口,首次绑定将随机选择一个端口,随后绑定到相同端口值的操作将共享此随机端口。

io.vertx.examples.http.sharing.HttpServerVerticle
vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello from server " + this);
}).listen(-1);

在 Vert.x 中使用 HTTPS

Vert.x HTTP 服务器和客户端可以配置为使用 HTTPS,方式与网络服务器完全相同。

有关更多信息,请参阅配置网络服务器以使用 SSL

SSL 也可以通过 RequestOptions 按请求启用/禁用,或者在使用 setAbsoluteURI 方法指定方案时启用/禁用。

client
  .request(new RequestOptions()
    .setHost("localhost")
    .setPort(8080)
    .setURI("/")
    .setSsl(true))
  .onComplete(ar1 -> {
    if (ar1.succeeded()) {
      HttpClientRequest request = ar1.result();
      request
        .send()
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          }
        });
    }
  });

setSsl 设置作为默认客户端设置。

setSsl 覆盖默认客户端设置:

  • 将值设置为 false 将禁用 SSL/TLS,即使客户端配置为使用 SSL/TLS。

  • 将值设置为 true 将启用 SSL/TLS,即使客户端配置为不使用 SSL/TLS,实际的客户端 SSL/TLS(例如信任、密钥/证书、密码、ALPN 等)将被重用。

同样,setAbsoluteURI 方案也覆盖默认客户端设置。

服务器名称指示 (SNI)

Vert.x HTTP 服务器可以配置为使用 SNI,方式与 {@linkplain io.vertx.core.net 网络服务器} 完全相同。

Vert.x HTTP 客户端将在 TLS 握手期间将实际主机名作为 *服务器名称* 呈现。

WebSocket

WebSocket 是一种 Web 技术,它允许 HTTP 服务器和 HTTP 客户端(通常是浏览器)之间建立全双工的类套接字连接。

Vert.x 在客户端和服务器端都支持 WebSocket。

服务器端 WebSocket

服务器端处理 WebSocket 有两种方式。

WebSocket 处理器

第一种方式是在服务器实例上提供一个 webSocketHandler

当与服务器建立 WebSocket 连接时,将调用处理器,并传入 ServerWebSocket 实例。

server.webSocketHandler(webSocket -> {
  System.out.println("Connected!");
});
服务器 WebSocket 握手

默认情况下,服务器接受任何入站 WebSocket。

您可以设置一个 WebSocket 握手处理器来控制 WebSocket 握手的结果,即接受或拒绝传入的 WebSocket。

您可以通过调用 acceptreject 来选择拒绝 WebSocket。

server.webSocketHandshakeHandler(handshake -> {
  authenticate(handshake.headers(), ar -> {
    if (ar.succeeded()) {
      if (ar.result()) {
        // Terminate the handshake with the status code 101 (Switching Protocol)
        handshake.accept();
      } else {
        // Reject the handshake with 401 (Unauthorized)
        handshake.reject(401);
      }
    } else {
      // Will send a 500 error
      handshake.reject(500);
    }
  });
});
WebSocket 将在处理器调用后自动接受,除非已设置 WebSocket 的握手。
升级到 WebSocket

处理 WebSocket 的第二种方式是处理客户端发送的 HTTP Upgrade 请求,并在服务器请求上调用 toWebSocket

server.requestHandler(request -> {
  if (request.path().equals("/myapi")) {

    Future<ServerWebSocket> fut = request.toWebSocket();
    fut.onSuccess(ws -> {
      // Do something
    });

  } else {
    // Reject
    request.response().setStatusCode(400).end();
  }
});
服务器 WebSocket

ServerWebSocket 实例使您能够检索 WebSocket 握手的 HTTP 请求的 headerspathqueryURI

客户端 WebSocket

Vert.x WebSocketClient 支持 WebSocket。

You can connect a WebSocket to a server using one of the `link:../../apidocs/io/vertx/core/http/WebSocketClient.html#connect-int-java.lang.String-java.lang.String-[connect]` operations.
The returned future will be completed with an instance of `link:../../apidocs/io/vertx/core/http/WebSocket.html[WebSocket]` when the connection has been made:
WebSocketClient client = vertx.createWebSocketClient();

client
  .connect(80, "example.com", "/some-uri")
  .onComplete(res -> {
    if (res.succeeded()) {
      WebSocket ws = res.result();
      ws.textMessageHandler(msg -> {
        // Handle msg
      });
      System.out.println("Connected!");
    }
  });

当从非 Vert.x 线程连接时,您可以创建一个 ClientWebSocket,配置其处理器,然后连接到服务器。

[source,java]
----
WebSocketClient client = vertx.createWebSocketClient();

client .webSocket() .textMessageHandler(msg → { // Handle msg }) .connect(80, "example.com", "/some-uri") .onComplete(res → { if (res.succeeded()) { WebSocket ws = res.result(); } }); ----

默认情况下,客户端将 origin 头部设置为服务器主机,例如 http://www.example.com。某些服务器会拒绝此类请求,您可以配置客户端不设置此头部。

WebSocketConnectOptions options = new WebSocketConnectOptions()
  .setHost(host)
  .setPort(port)
  .setURI(requestUri)
  .setAllowOriginHeader(false);
client
  .connect(options)
  .onComplete(res -> {
    if (res.succeeded()) {
      WebSocket ws = res.result();
      System.out.println("Connected!");
    }
  });

您也可以设置不同的头部。

WebSocketConnectOptions options = new WebSocketConnectOptions()
  .setHost(host)
  .setPort(port)
  .setURI(requestUri)
  .addHeader(HttpHeaders.ORIGIN, origin);
client
  .connect(options)
  .onComplete(res -> {
    if (res.succeeded()) {
      WebSocket ws = res.result();
      System.out.println("Connected!");
    }
  });
WebSocket 协议的旧版本改用 sec-websocket-origin

向 WebSocket 写入消息

如果您希望向 WebSocket 写入单个 WebSocket 消息,您可以使用 writeBinaryMessagewriteTextMessage 来完成。

Buffer buffer = Buffer.buffer().appendInt(123).appendFloat(1.23f);
webSocket.writeBinaryMessage(buffer);

// Write a simple text message
String message = "hello";
webSocket.writeTextMessage(message);

如果 WebSocket 消息大于通过 setMaxFrameSize 配置的最大 WebSocket 帧大小,则 Vert.x 将在发送到网络之前将其拆分为多个 WebSocket 帧。

向 WebSocket 写入帧

一个 WebSocket 消息可以由多个帧组成。在这种情况下,第一个帧可以是 *二进制* 帧或 *文本* 帧,后跟零个或多个 *延续* 帧。

消息中的最后一个帧被标记为 *最终帧*。

要发送由多个帧组成的消息,您可以使用 WebSocketFrame.binaryFrameWebSocketFrame.textFrameWebSocketFrame.continuationFrame 创建帧,并使用 writeFrame 将它们写入 WebSocket。

以下是二进制帧的示例:

WebSocketFrame frame1 = WebSocketFrame.binaryFrame(buffer1, false);
webSocket.writeFrame(frame1);

WebSocketFrame frame2 = WebSocketFrame.continuationFrame(buffer2, false);
webSocket.writeFrame(frame2);

// Write the final frame
WebSocketFrame frame3 = WebSocketFrame.continuationFrame(buffer2, true);
webSocket.writeFrame(frame3);

在许多情况下,您只想发送一个由单个最终帧组成的 WebSocket 消息,因此我们提供了 writeFinalBinaryFramewriteFinalTextFrame 这两个快捷方法来完成此操作。

这是一个示例:

webSocket.writeFinalTextFrame("Geronimo!");

// Send a WebSocket message consisting of a single final binary frame:

Buffer buff = Buffer.buffer().appendInt(12).appendString("foo");

webSocket.writeFinalBinaryFrame(buff);

从 WebSocket 读取帧

要从 WebSocket 读取帧,您可以使用 frameHandler

当帧到达时,帧处理器将使用 WebSocketFrame 实例进行调用,例如:

webSocket.frameHandler(frame -> {
  System.out.println("Received a frame of size!");
});

关闭 WebSocket

完成 WebSocket 连接后,使用 close 关闭它。

管道式 WebSocket

WebSocket 实例也是一个 ReadStream 和一个 WriteStream,因此它可以与管道一起使用。

当将 WebSocket 用作写入流或读取流时,它只能用于使用未拆分为多个帧的二进制帧的 WebSocket 连接。

事件总线处理器

每个 WebSocket 都可以在事件总线上注册两个处理器,当这些处理器接收到任何数据时,它会将数据写入自身。这些是本地订阅,无法从其他集群节点访问。

这使您能够向 WebSocket 写入数据,该 WebSocket 可能位于完全不同的 Verticle 中,向该处理器的地址发送数据。

此功能默认禁用,但您可以使用 setRegisterWebSocketWriteHandlerssetRegisterWriteHandlers 启用它。

处理器的地址由 binaryHandlerIDtextHandlerID 提供。

为 HTTP/HTTPS 连接使用代理

HTTP 客户端支持通过 HTTP 代理(例如 Squid)或 *SOCKS4a* 或 *SOCKS5* 代理访问 HTTP/HTTPS URL。CONNECT 协议使用 HTTP/1.x,但可以连接到 HTTP/1.x 和 HTTP/2 服务器。

连接到 h2c(未加密的 HTTP/2 服务器)可能不受 HTTP 代理支持,因为它们通常只支持 HTTP/1.1。

可以在 HttpClientOptions 中配置代理,通过设置一个 ProxyOptions 对象,其中包含代理类型、主机名、端口以及可选的用户名和密码。

以下是使用 HTTP 代理的示例:

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP)
        .setHost("localhost").setPort(3128)
        .setUsername("username").setPassword("secret"));
HttpClientAgent client = vertx.createHttpClient(options);

当客户端连接到 HTTP URL 时,它会连接到代理服务器,并在 HTTP 请求中提供完整的 URL("GET http://www.somehost.com/path/file.html HTTP/1.1")。

当客户端连接到 HTTPS URL 时,它会要求代理使用 CONNECT 方法创建一个到远程主机的隧道。

对于 SOCKS5 代理:

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
        .setHost("localhost").setPort(1080)
        .setUsername("username").setPassword("secret"));
HttpClientAgent client = vertx.createHttpClient(options);

DNS 解析总是在代理服务器上完成,要实现 SOCKS4 客户端的功能,需要在本地解析 DNS 地址。

代理选项也可以按请求设置。

client.request(new RequestOptions()
  .setHost("example.com")
  .setProxyOptions(proxyOptions))
  .compose(request -> request
    .send()
    .compose(HttpClientResponse::body))
  .onSuccess(body -> {
    System.out.println("Received response");
  });
客户端连接池感知代理(包括身份验证),因此,通过不同代理向同一主机发出的两个请求不会共享同一个池化连接。

您可以使用 setNonProxyHosts 配置一个绕过代理的主机列表。该列表支持使用 * 通配符匹配域名。

HttpClientOptions options = new HttpClientOptions()
  .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
    .setHost("localhost").setPort(1080)
    .setUsername("username").setPassword("secret"))
  .addNonProxyHost("*.foo.com")
  .addNonProxyHost("localhost");
HttpClientAgent client = vertx.createHttpClient(options);

其他协议的处理

如果代理支持,HTTP 代理实现支持获取 `ftp://` URL。

当 HTTP 请求 URI 包含完整 URL 时,客户端将不会计算完整的 HTTP URL,而是使用请求 URI 中指定的完整 URL。

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP));
HttpClientAgent client = vertx.createHttpClient(options);
client
  .request(HttpMethod.GET, "ftp://ftp.gnu.org/gnu/")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      HttpClientRequest request = ar.result();
      request
        .send()
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            HttpClientResponse response = ar2.result();
            System.out.println("Received response with status code " + response.statusCode());
          }
        });
    }
  });

使用 HA PROXY 协议

HA PROXY 协议 提供了一种便捷的方式,可以安全地传输连接信息,例如客户端地址,跨多层 NAT 或 TCP 代理。

HA PROXY 协议可以通过设置 setUseProxyProtocol 选项并在您的类路径中添加以下依赖项来启用:

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-codec-haproxy</artifactId>
  <!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>
HttpServerOptions options = new HttpServerOptions()
  .setUseProxyProtocol(true);

HttpServer server = vertx.createHttpServer(options);
server.requestHandler(request -> {
  // Print the actual client address provided by the HA proxy protocol instead of the proxy address
  System.out.println(request.remoteAddress());

  // Print the address of the proxy
  System.out.println(request.localAddress());
});

verticle 中的自动清理

如果您在 Verticle 内部创建 HTTP 服务器和客户端,当 Verticle 卸载时,这些服务器和客户端将自动关闭。

使用 SharedData API

顾名思义,SharedData API 允许您安全地在以下之间共享数据:

  • 应用程序的不同部分之间,或者

  • 同一 Vert.x 实例中的不同应用程序之间,或者

  • Vert.x 实例集群中的不同应用程序之间。

实际上,它提供了:

  • 同步映射(仅本地)

  • 异步映射

  • 异步锁

  • 异步计数器

分布式数据结构的行为取决于您使用的集群管理器。备份(复制)以及在遇到网络分区时的行为由集群管理器及其配置定义。请参阅集群管理器文档以及底层框架手册。

本地映射

Local maps 允许您在同一 Vert.x 实例中的不同事件循环(例如不同 Verticle)之间安全地共享数据。

它们只允许某些数据类型用作键和值:

  • 不可变类型(例如字符串、布尔值等),或者

  • 实现 Shareable 接口的类型(缓冲区、JSON 数组、JSON 对象或您自己的可共享对象)。

在后一种情况下,键/值在放入映射之前将被复制。

这样我们可以确保您的 Vert.x 应用程序中不同线程之间没有对 *可变状态的共享访问*。您也无需担心通过同步访问来保护该状态。

为方便起见,实现 ClusterSerializablejava.io.Serializable 的对象也可以用作键和值。在这种情况下,键/值在放入映射之前将通过序列化/反序列化进行复制。因此,建议考虑实现 Shareable 以获得更好的性能。

以下是使用共享本地映射的示例:

SharedData sharedData = vertx.sharedData();

LocalMap<String, String> map1 = sharedData.getLocalMap("mymap1");

map1.put("foo", "bar"); // Strings are immutable so no need to copy

LocalMap<String, Buffer> map2 = sharedData.getLocalMap("mymap2");

map2.put("eek", Buffer.buffer().appendInt(123)); // This buffer will be copied before adding to map

// Then... in another part of your application:

map1 = sharedData.getLocalMap("mymap1");

String val = map1.get("foo");

map2 = sharedData.getLocalMap("mymap2");

Buffer buff = map2.get("eek");

异步共享映射

Asynchronous shared maps 允许将数据放入映射中,并在本地或从任何其他节点检索。

这使得它们对于诸如在托管 Vert.x Web 应用程序的服务器群中存储会话状态之类的功能非常有用。

它们只允许某些数据类型用作键和值:

  • 不可变类型(例如字符串、布尔值等),或者

  • 实现 ClusterSerializable 接口的类型(缓冲区、JSON 数组、JSON 对象或您自己的集群可序列化对象),或者

  • 实现 java.io.Serializable 接口的类型。

获取映射是异步的,结果将返回到您指定的处理器中。示例如下:

SharedData sharedData = vertx.sharedData();

sharedData.
  <String, String>getAsyncMap("mymap")
  .onComplete(res -> {
    if (res.succeeded()) {
      AsyncMap<String, String> map = res.result();
    } else {
      // Something went wrong!
    }
  });

当 Vert.x 处于集群模式时,您放入映射中的数据可以在本地以及任何其他集群成员上访问。

在集群模式下,异步共享映射依赖于集群管理器提供的分布式数据结构。请注意,异步共享映射操作在集群模式下的延迟可能远高于本地模式。

如果您的应用程序不需要将数据与每个其他节点共享,您可以检索仅本地的映射。

SharedData sharedData = vertx.sharedData();

sharedData.
  <String, String>getLocalAsyncMap("mymap")
  .onComplete(res -> {
    if (res.succeeded()) {
      // Local-only async map
      AsyncMap<String, String> map = res.result();
    } else {
      // Something went wrong!
    }
  });

将数据放入映射

您可以使用 put 将数据放入映射中。

实际的 `put` 操作是异步的,并且一旦完成,返回的 Future 就会收到通知。

map
  .put("foo", "bar")
  .onComplete(resPut -> {
    if (resPut.succeeded()) {
      // Successfully put the value
    } else {
      // Something went wrong!
    }
  });

从映射中获取数据

您可以使用 get 从映射中获取数据。

实际的 `get` 操作是异步的,并且返回的 Future 将在稍后收到结果通知。

map
  .get("foo")
  .onComplete(resGet -> {
    if (resGet.succeeded()) {
      // Successfully got the value
      Object val = resGet.result();
    } else {
      // Something went wrong!
    }
  });
其他映射操作

您还可以从异步映射中移除条目、清空它们并获取大小。

有关映射操作的详细列表,请参阅 API 文档

异步锁

Asynchronous locks 允许您在本地或跨集群获取排他锁。这在您希望在集群的任何一个时间点上只在一个节点上执行操作或访问资源时非常有用。

异步锁具有异步 API,这与大多数锁定 API 不同,后者会阻塞调用线程直到获得锁。

要获取锁,请使用 getLock。这不会阻塞,但当锁可用时,返回的 Future 将以 Lock 实例完成,表示您现在拥有该锁。

在您拥有锁期间,本地或集群中的其他任何调用者都无法获取该锁。

当您使用完锁后,您调用 release 来释放它,以便另一个调用者可以获取它。

SharedData sharedData = vertx.sharedData();

sharedData
  .getLock("mylock")
  .onComplete(res -> {
    if (res.succeeded()) {
      // Got the lock!
      Lock lock = res.result();

      // 5 seconds later we release the lock so someone else can get it

      vertx.setTimer(5000, tid -> lock.release());

    } else {
      // Something went wrong
    }
  });

您也可以获取带超时的锁。如果在超时时间内未能获取锁,处理器将以失败状态被调用。

SharedData sharedData = vertx.sharedData();

sharedData
  .getLockWithTimeout("mylock", 10000)
  .onComplete(res -> {
    if (res.succeeded()) {
      // Got the lock!
      Lock lock = res.result();

    } else {
      // Failed to get lock
    }
  });

有关锁操作的详细列表,请参阅 API 文档

在集群模式下,异步锁依赖于集群管理器提供的分布式数据结构。请注意,异步共享锁操作在集群模式下的延迟可能远高于本地模式。

如果您的应用程序不需要将锁与每个其他节点共享,您可以检索仅本地的锁。

SharedData sharedData = vertx.sharedData();

sharedData
  .getLocalLock("mylock")
  .onComplete(res -> {
    if (res.succeeded()) {
      // Local-only lock
      Lock lock = res.result();

      // 5 seconds later we release the lock so someone else can get it

      vertx.setTimer(5000, tid -> lock.release());

    } else {
      // Something went wrong
    }
  });

有时,您使用锁 API 获取异步结果,并在异步调用周围应用获取/释放模式。Vert.x 提供了一个简化的锁 API 来简化这种模式。

SharedData sharedData = vertx.sharedData();

Future<String> res = sharedData.withLock("mylock", () -> {
  // Obtained the lock!
  Future<String> future = getAsyncString();
  // It will be released upon completion of this future
  return future;
});

锁在调用供应商之前获取,并在供应商返回的 Future 完成时释放。

异步计数器

在本地或跨应用程序的不同节点维护原子计数器通常很有用。

您可以使用 Counter 来完成此操作。

您可以使用 getCounter 获取一个实例。

SharedData sharedData = vertx.sharedData();

sharedData
  .getCounter("mycounter")
  .onComplete(res -> {
    if (res.succeeded()) {
      Counter counter = res.result();
    } else {
      // Something went wrong!
    }
  });

一旦您获得了实例,您可以使用各种方法检索当前计数、原子地递增、递减并向其添加值。

有关计数器操作的详细列表,请参阅 API 文档

在集群模式下,异步计数器依赖于集群管理器提供的分布式数据结构。请注意,异步共享计数器操作在集群模式下的延迟可能远高于本地模式。

如果您的应用程序不需要将计数器与每个其他节点共享,您可以检索仅本地的计数器。

SharedData sharedData = vertx.sharedData();

sharedData
  .getLocalCounter("mycounter")
  .onComplete(res -> {
    if (res.succeeded()) {
      // Local-only counter
      Counter counter = res.result();
    } else {
      // Something went wrong!
    }
  });

在 Vert.x 中使用文件系统

Vert.x FileSystem 对象提供了许多操作来操纵文件系统。

每个 Vert.x 实例都有一个文件系统对象,您可以通过 fileSystem 获取它。

每种操作都提供了阻塞和非阻塞版本。非阻塞版本接受一个处理器,该处理器在操作完成或发生错误时被调用。

以下是文件异步复制的示例:

FileSystem fs = vertx.fileSystem();

// Copy file from foo.txt to bar.txt
fs.copy("foo.txt", "bar.txt")
  .onComplete(res -> {
    if (res.succeeded()) {
      // Copied ok!
    } else {
      // Something went wrong
    }
  });

阻塞版本命名为 xxxBlocking,并直接返回结果或抛出异常。在许多情况下,根据操作系统和文件系统,一些可能阻塞的操作可以快速返回,这就是我们提供它们的原因,但强烈建议您在从事件循环中使用它们之前,测试它们在您的特定应用程序中返回所需的时间,以免违反黄金法则。

以下是使用阻塞 API 进行复制的示例:

FileSystem fs = vertx.fileSystem();

// Copy file from foo.txt to bar.txt synchronously
fs.copyBlocking("foo.txt", "bar.txt");

存在许多操作,例如复制、移动、截断、chmod 以及许多其他文件操作。我们不会在此处列出所有这些操作,请查阅 API 文档 以获取完整列表。

让我们看几个使用异步方法的示例:

vertx.fileSystem()
  .readFile("target/classes/readme.txt")
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println(result.result());
    } else {
      System.err.println("Oh oh ..." + result.cause());
    }
  });

// Copy a file
vertx.fileSystem()
  .copy("target/classes/readme.txt", "target/classes/readme2.txt")
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("File copied");
    } else {
      System.err.println("Oh oh ..." + result.cause());
    }
  });

// Write a file
vertx.fileSystem()
  .writeFile("target/classes/hello.txt", Buffer.buffer("Hello"))
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("File written");
    } else {
      System.err.println("Oh oh ..." + result.cause());
    }
  });

// Check existence and delete
vertx.fileSystem()
  .exists("target/classes/junk.txt")
  .compose(exist -> {
    if (exist) {
      return vertx.fileSystem().delete("target/classes/junk.txt");
    } else {
      return Future.failedFuture("File does not exist");
    }
  }).onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("File deleted");
    } else {
      System.err.println("Oh oh ... - cannot delete the file: " + result.cause().getMessage());
    }
  });

异步文件

Vert.x 提供了一个异步文件抽象,允许您在文件系统上操作文件。

您可以如下打开 AsyncFile

OpenOptions options = new OpenOptions();
fileSystem
  .open("myfile.txt", options)
  .onComplete(res -> {
    if (res.succeeded()) {
      AsyncFile file = res.result();
    } else {
      // Something went wrong!
    }
  });

AsyncFile 实现了 ReadStreamWriteStream,因此您可以将文件与网络套接字、HTTP 请求和响应以及 WebSocket 等其他流对象进行 *管道传输*。

它们还允许您直接读写。

随机访问写入

要将 AsyncFile 用于随机访问写入,请使用 write 方法。

该方法的参数是:

  • buffer:要写入的缓冲区。

  • position:文件中写入缓冲区的整数位置。如果该位置大于或等于文件大小,文件将扩大以适应偏移量。

以下是随机访问写入的示例:

vertx.fileSystem()
  .open("target/classes/hello.txt", new OpenOptions())
  .onComplete(result -> {
    if (result.succeeded()) {
      AsyncFile file = result.result();
      Buffer buff = Buffer.buffer("foo");
      for (int i = 0; i < 5; i++) {
        file
          .write(buff, buff.length() * i)
          .onComplete(ar -> {
            if (ar.succeeded()) {
              System.out.println("Written ok!");
              // etc
            } else {
              System.err.println("Failed to write: " + ar.cause());
            }
          });
      }
    } else {
      System.err.println("Cannot open file " + result.cause());
    }
  });

随机访问读取

要将 AsyncFile 用于随机访问读取,请使用 read 方法。

该方法的参数是:

  • buffer:将数据读取到其中的缓冲区。

  • offset:缓冲区中放置读取数据的整数偏移量。

  • position:文件中读取数据的位置。

  • length:要读取的数据字节数。

  • handler:结果处理器。

以下是随机访问读取的示例:

vertx.fileSystem()
  .open("target/classes/les_miserables.txt", new OpenOptions())
  .onComplete(result -> {
    if (result.succeeded()) {
      AsyncFile file = result.result();
      Buffer buff = Buffer.buffer(1000);
      for (int i = 0; i < 10; i++) {
        file
          .read(buff, i * 100, i * 100, 100)
          .onComplete(ar -> {
            if (ar.succeeded()) {
              System.out.println("Read ok!");
            } else {
              System.err.println("Failed to write: " + ar.cause());
            }
          });
      }
    } else {
      System.err.println("Cannot open file " + result.cause());
    }
  });

打开选项

打开 AsyncFile 时,您会传入一个 OpenOptions 实例。这些选项描述了文件访问的行为。例如,您可以使用 setReadsetWritesetPerms 方法配置文件权限。

您还可以配置如果打开的文件已存在时的行为,使用 setCreateNewsetTruncateExisting

您还可以使用 setDeleteOnClose 将文件标记为在关闭时或 JVM 关闭时删除。

将数据刷新到底层存储。

OpenOptions 中,您可以使用 setDsync 启用/禁用每次写入时内容的自动同步。在这种情况下,您可以通过调用 flush 方法手动将 OS 缓存中的任何写入刷新。

此方法也可以带有一个处理器调用,该处理器将在刷新完成时被调用。

将 AsyncFile 用作 ReadStream 和 WriteStream

AsyncFile 实现了 ReadStreamWriteStream。然后您可以将它们与 *管道* 一起使用,以便将数据传输到其他读写流以及从其他读写流传输数据。例如,这会将内容复制到另一个 AsyncFile

final AsyncFile output = vertx.fileSystem().openBlocking("target/classes/plagiary.txt", new OpenOptions());

vertx.fileSystem()
  .open("target/classes/les_miserables.txt", new OpenOptions())
  .compose(file -> file
    .pipeTo(output)
    .eventually(() -> file.close()))
  .onComplete(result -> {
    if (result.succeeded()) {
      System.out.println("Copy done");
    } else {
      System.err.println("Cannot copy file " + result.cause().getMessage());
    }
  });

您还可以使用 *管道* 将文件内容写入 HTTP 响应,或者更一般地写入任何 WriteStream

从类路径访问文件

当 Vert.x 无法在文件系统上找到文件时,它会尝试从类路径解析文件。请注意,类路径资源路径永远不会以 `/` 开头。

由于 Java 不提供对类路径资源的异步访问,因此当首次访问类路径资源时,文件会被复制到文件系统中的一个工作线程中,并从那里异步提供服务。当第二次访问同一资源时,文件将直接从文件系统提供服务。即使类路径资源发生变化(例如在开发系统中),原始内容也会被提供。

此缓存行为可以在 setFileCachingEnabled 选项上设置。此选项的默认值为 true,除非定义了系统属性 vertx.disableFileCaching

默认情况下,文件缓存路径是 `/tmp/vertx-cache-UUID`,可以通过设置系统属性 vertx.cacheDirBase 进行自定义。使用此属性时,请注意它应指向进程可读/写位置的目录前缀,例如:-Dvertx.cacheDirBase=/tmp/my-vertx-cache(请注意,没有 UUID)。

每个 Vert.x 进程都会附加自己的 UUID,以便在同一机器上运行的不同应用程序之间保持缓存独立。

整个类路径解析功能可以在系统范围内禁用,通过将系统属性 vertx.disableFileCPResolving 设置为 true

这些系统属性在加载 io.vertx.core.file.FileSystemOptions 类时只评估一次,因此,这些属性应在加载此类之前或在启动时作为 JVM 系统属性进行设置。

如果您想为特定应用程序禁用类路径解析,但仍希望在系统范围内默认启用它,您可以通过 setClassPathResolvingEnabled 选项来实现。

关闭 AsyncFile

要关闭 AsyncFile,请调用 close 方法。关闭是异步的,如果您希望在关闭完成时收到通知,您可以指定一个处理器函数作为参数。

数据报套接字 (UDP)

在 Vert.x 中使用用户数据报协议 (UDP) 轻而易举。

UDP 是一种无连接传输协议,这基本上意味着您与远程对等方没有持久连接。

相反,您可以发送和接收数据包,并且每个数据包中都包含远程地址。

除此之外,UDP 的使用不如 TCP 安全,这意味着无法保证发送的数据报包能够完全到达其端点。

唯一能保证的是它要么完整接收,要么完全不接收。

此外,您通常不能发送大于网络接口 MTU 大小的数据,这是因为每个数据包将作为一个数据包发送。

但是,请注意,即使数据包大小小于 MTU,它仍然可能失败。

失败的大小取决于操作系统等。因此经验法则是尝试发送小数据包。

由于 UDP 的特性,它最适合允许丢弃数据包的应用程序(例如监控应用程序)。

优点是与 TCP 相比,它的开销要小得多,TCP 可以由 NetServer 和 NetClient 处理(见上文)。

创建数据报套接字

要使用 UDP,您首先需要创建一个 DatagramSocket。在这里,您是只想发送数据还是发送和接收数据并不重要。

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());

返回的 DatagramSocket 不会绑定到特定端口。如果您只想发送数据(像客户端一样),这不是问题,但更多内容将在下一节介绍。

发送数据报包

如前所述,用户数据报协议 (UDP) 以数据包形式向远程对等方发送数据,但不以持久方式连接到它们。

这意味着每个数据包可以发送到不同的远程对等方。

发送数据包就像这里显示的一样简单:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// Send a Buffer
socket
  .send(buffer, 1234, "10.0.0.1")
  .onComplete(asyncResult -> System.out.println("Send succeeded? " + asyncResult.succeeded()));
// Send a String
socket
  .send("A string used as content", 1234, "10.0.0.1")
  .onComplete(asyncResult -> System.out.println("Send succeeded? " + asyncResult.succeeded()));

接收数据报包

如果您想接收数据包,您需要通过在其上调用 listen(…​) 来绑定 DatagramSocket

这样您将能够接收发送到 DatagramSocket 监听的地址和端口的 DatagramPacket

除此之外,您还希望设置一个 Handler,它将在收到每个 DatagramPacket 时被调用。

DatagramPacket 具有以下方法:

  • sender:表示数据包发送方的 InetSocketAddress。

  • data:包含接收数据的缓冲区。

因此,要在特定地址和端口上监听,您可以像这里显示的那样操作:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket
  .handler(packet -> {
    // Do something with the packet
  })
  .listen(1234, "0.0.0.0")
  .onComplete(asyncResult -> System.out.println("Send succeeded? " + asyncResult.succeeded()));
;

请注意,即使 `{code AsyncResult}` 成功,这也只意味着它可能已写入网络堆栈,但不保证它是否已经或将会到达远程对等方。

如果您需要这样的保证,那么您需要使用 TCP 并在此之上构建一些握手逻辑。

多播

发送多播数据包

多播允许多个套接字接收相同的数据包。其工作原理是让套接字加入同一个多播组,然后您可以向该组发送数据包。

我们将在下一节介绍如何加入多播组并接收数据包。

发送多播数据包与发送普通数据报包没有区别。区别在于您将多播组地址传递给发送方法。

如下所示:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// Send a Buffer to a multicast address
socket
  .send(buffer, 1234, "230.0.0.1")
  .onComplete(asyncResult -> System.out.println("Send succeeded? " + asyncResult.succeeded()));

所有已加入多播组 230.0.0.1 的套接字都将接收到该数据包。

接收多播数据包

如果您想接收特定多播组的数据包,您需要通过在其上调用 listen(…​) 来绑定 DatagramSocket 以加入多播组。

这样您将接收到发送到 DatagramSocket 监听的地址和端口的数据报包,以及发送到多播组的数据报包。

除此之外,您还希望设置一个处理器,它将在收到每个数据报包时被调用。

DatagramPacket 具有以下方法:

  • sender():表示数据包发送方的 InetSocketAddress。

  • data():包含接收数据的缓冲区。

因此,要在特定地址和端口上监听,并接收多播组 230.0.0.1 的数据包,您可以像这里显示的那样操作:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket
  .handler(packet -> {
    // Do something with the packet
  })
  .listen(1234, "0.0.0.0")
  .compose(v -> socket.listenMulticastGroup("230.0.0.1")) // join the multicast group
  .onComplete(asyncResult -> System.out.println("Listen succeeded? " + asyncResult.succeeded()));
取消监听 / 离开多播组

有时您可能希望在有限时间内接收某个多播组的数据包。

在这种情况下,您可以首先开始监听它们,然后再取消监听。

如下所示:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket
  .handler(packet -> {
    // Do something with the packet
  })
  .listen(1234, "0.0.0.0")
  .compose(v -> socket.listenMulticastGroup("230.0.0.1")) // join the multicast group
  .onComplete(asyncResult -> {
    if (asyncResult.succeeded()) {
      // will now receive packets for group

      // do some work

      socket.unlistenMulticastGroup("230.0.0.1").onComplete(asyncResult2 -> {
        System.out.println("Unlisten succeeded? " + asyncResult2.succeeded());
      });
    } else {
      System.out.println("Listen failed" + asyncResult.cause());
    }
  });
阻塞多播

除了取消监听多播地址,还可以只针对特定发送方地址阻塞多播。

请注意,这仅在某些操作系统和内核版本上有效。因此,请检查操作系统文档以确认是否支持。

这是一项专家功能。

要从特定地址阻塞多播,您可以在 DatagramSocket 上调用 blockMulticastGroup(…​),如这里所示:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());

// Some code

// This would block packets which are send from 10.0.0.2
socket
  .blockMulticastGroup("230.0.0.1", "10.0.0.2")
  .onComplete(asyncResult -> System.out.println("block succeeded? " + asyncResult.succeeded()));

DatagramSocket 属性

创建 DatagramSocket 时,有多个属性可以使用 DatagramSocketOptions 对象进行设置,以更改其行为。这些属性在此处列出:

  • setSendBufferSize 设置发送缓冲区大小(字节)。

  • setReceiveBufferSize 设置 TCP 接收缓冲区大小(字节)。

  • setReuseAddress 如果为 true,则处于 TIME_WAIT 状态的地址在关闭后可以被重用。

  • setTrafficClass

  • setBroadcast 设置或清除 SO_BROADCAST 套接字选项。当此选项设置时,数据报(UDP)包可以发送到本地接口的广播地址。

  • setMulticastNetworkInterface 设置或清除 IP_MULTICAST_LOOP 套接字选项。当此选项设置时,多播包也将在本地接口上接收。

  • setMulticastTimeToLive 设置 IP_MULTICAST_TTL 套接字选项。TTL 代表“生存时间”,但在本上下文中,它指定了一个数据包允许通过的 IP 跳数,特别是对于多播流量。转发数据包的每个路由器或网关都会将 TTL 减 1。如果路由器将 TTL 减到 0,则数据包将不再转发。

DatagramSocket 本地地址

您可以通过调用 localAddress 查找套接字的本地地址(即 UDP 套接字这一端的地址)。只有在您之前使用 listen(…​) 绑定了 DatagramSocket 时,这才会返回 `InetSocketAddress`,否则它将返回 null。

关闭 DatagramSocket

您可以通过调用 close 方法来关闭套接字。这将关闭套接字并释放所有资源。

DNS 客户端

您经常会遇到需要异步获取 DNS 信息的情况。不幸的是,使用 Java 虚拟机自带的 API 无法实现这一点。因此,Vert.x 提供了自己的 DNS 解析 API,它是完全异步的。

要获取 DnsClient 实例,您将通过 Vertx 实例创建一个新实例。

DnsClient client = vertx.createDnsClient(53, "10.0.0.1");

您还可以使用选项创建客户端并配置查询超时。

DnsClient client = vertx.createDnsClient(new DnsClientOptions()
  .setPort(53)
  .setHost("10.0.0.1")
  .setQueryTimeout(10000)
);

创建不带参数或省略服务器地址的客户端将使用内部用于非阻塞地址解析的服务器地址。

DnsClient client1 = vertx.createDnsClient();

// Just the same but with a different query timeout
DnsClient client2 = vertx.createDnsClient(new DnsClientOptions().setQueryTimeout(10000));

客户端使用单个事件循环进行查询,它可以安全地从任何线程使用,包括非 Vert.x 线程。

查找

尝试查找给定名称的 A (ipv4) 或 AAAA (ipv6) 记录。返回的第一个将被使用,因此它的行为与您在操作系统上使用“nslookup”时类似。

要查找“vertx.io”的 A / AAAA 记录,您通常会这样使用它:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .lookup("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println(ar.result());
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

查找 IPv4

尝试查找给定名称的 A (ipv4) 记录。返回的第一个将被使用,因此它的行为与您在操作系统上使用“nslookup”时类似。

要查找“vertx.io”的 A 记录,您通常会这样使用它:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .lookup4("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println(ar.result());
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

查找 IPv6

尝试查找给定名称的 AAAA (ipv6) 记录。返回的第一个将被使用,因此它的行为与您在操作系统上使用“nslookup”时类似。

要查找“vertx.io”的 A 记录,您通常会这样使用它:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .lookup6("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println(ar.result());
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

解析 A 记录

尝试解析给定名称的所有 A (ipv4) 记录。这与在类 Unix 操作系统上使用“dig”非常相似。

要查找“vertx.io”的所有 A 记录,您通常会这样做:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveA("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<String> records = ar.result();
      for (String record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

解析 AAAA 记录

尝试解析给定名称的所有 AAAA (ipv6) 记录。这与在类 Unix 操作系统上使用“dig”非常相似。

要查找“vertx.io”的所有 AAAAA 记录,您通常会这样做:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveAAAA("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<String> records = ar.result();
      for (String record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

解析 CNAME 记录

尝试解析给定名称的所有 CNAME 记录。这与在 Unix 类操作系统上使用“dig”非常相似。

要查找“vertx.io”的所有 CNAME 记录,你通常会这样做

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveCNAME("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<String> records = ar.result();
      for (String record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

resolveMX

尝试解析给定名称的所有 MX 记录。MX 记录用于定义哪个邮件服务器接受给定域的电子邮件。

要查找“vertx.io”的所有 MX 记录,你通常会这样做

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveMX("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<MxRecord> records = ar.result();
      for (MxRecord record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

请注意,此列表将包含按优先级排序的MxRecord,这意味着优先级较小的 MX 记录会先出现在列表中。

MxRecord允许你通过以下方法访问 MX 记录的优先级和名称:

record.priority();
record.name();

resolveTXT

尝试解析给定名称的所有 TXT 记录。TXT 记录通常用于定义域的额外信息。

要解析“vertx.io”的所有 TXT 记录,你可以使用类似这样的代码

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveTXT("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<String> records = ar.result();
      for (String record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

resolveNS

尝试解析给定名称的所有 NS 记录。NS 记录指定哪个 DNS 服务器托管给定域的 DNS 信息。

要解析“vertx.io”的所有 NS 记录,你可以使用类似这样的代码

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveNS("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<String> records = ar.result();
      for (String record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

resolveSRV

尝试解析给定名称的所有 SRV 记录。SRV 记录用于定义服务的额外信息,例如端口和主机名。某些协议需要此额外信息。

要查找“vertx.io”的所有 SRV 记录,你通常会这样做

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolveSRV("vertx.io")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      List<SrvRecord> records = ar.result();
      for (SrvRecord record : records) {
        System.out.println(record);
      }
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

请注意,此列表将包含按优先级排序的 SrvRecord,这意味着优先级较小的 SrvRecord 会先出现在列表中。

SrvRecord允许你访问 SRV 记录本身包含的所有信息

record.priority();
record.name();
record.weight();
record.port();
record.protocol();
record.service();
record.target();

请参阅 API 文档了解具体细节。

resolvePTR

尝试解析给定名称的 PTR 记录。PTR 记录将 IP 地址映射到名称。

要解析 IP 地址 10.0.0.1 的 PTR 记录,你将使用“1.0.0.10.in-addr.arpa”的 PTR 概念

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .resolvePTR("1.0.0.10.in-addr.arpa")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      String record = ar.result();
      System.out.println(record);
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

reverseLookup

尝试对 IP 地址进行反向查找。这与解析 PTR 记录基本相同,但允许你只传入 IP 地址,而不是有效的 PTR 查询字符串。

要对 IP 地址 10.0.0.1 进行反向查找,请执行类似以下操作:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client
  .reverseLookup("10.0.0.1")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      String record = ar.result();
      System.out.println(record);
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());
    }
  });

错误处理

正如你在前面部分所看到的,DnsClient 允许你传入一个 Handler,一旦查询完成,该 Handler 将收到一个 AsyncResult 通知。如果发生错误,它将收到一个 DnsException 通知,该异常将包含一个DnsResponseCode,指示解析失败的原因。此 DnsResponseCode 可用于更详细地检查原因。

可能的 DnsResponseCodes 有

所有这些错误都由 DNS 服务器本身“生成”。

你可以像这样从 DnsException 中获取 DnsResponseCode:

DnsClient client = vertx.createDnsClient(53, "10.0.0.1");
client
  .lookup("nonexisting.vert.xio")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      String record = ar.result();
      System.out.println(record);
    } else {
      Throwable cause = ar.cause();
      if (cause instanceof DnsException) {
        DnsException exception = (DnsException) cause;
        DnsResponseCode code = exception.code();
        // ...
      } else {
        System.out.println("Failed to resolve entry" + ar.cause());
      }
    }
  });

Vert.x 虚拟线程

使用虚拟线程编写看起来同步的 Vert.x 代码。

你仍然编写处理事件的传统 Vert.x 代码,但你可以在复杂工作流中编写同步代码,并在此类工作流中使用线程本地变量。

简介

Vert.x 的非阻塞特性导致了异步 API。异步 API 可以采用各种形式,包括回调风格、Promise 和响应式扩展。

在某些情况下,使用异步 API 编程可能比使用直接同步风格更具挑战性,特别是当你需要顺序执行多个操作时。此外,在使用异步 API 时,错误传播通常更复杂。

虚拟线程支持允许你使用异步 API,但采用你已经熟悉的直接同步风格。

它是通过使用 Java 21 虚拟线程来实现的。虚拟线程是非常轻量级的线程,不对应底层内核线程。当它们被阻塞时,它们不会阻塞内核线程。

使用虚拟线程

你可以部署虚拟线程 verticle。

虚拟线程 verticle 能够等待 Vert.x future 并同步获取结果。

当 verticle 等待结果时,verticle 仍然可以像事件循环 verticle 一样处理事件。

AbstractVerticle verticle = new AbstractVerticle() {
  @Override
  public void start() {
    HttpClient client = vertx.createHttpClient();
    HttpClientRequest req = client.request(
      HttpMethod.GET,
      8080,
      "localhost",
      "/").await();
    HttpClientResponse resp = req.send().await();
    int status = resp.statusCode();
    Buffer body = resp.body().await();
  }
};

// Run the verticle a on virtual thread
vertx.deployVerticle(verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
使用虚拟线程需要 Java 21 或更高版本。

在虚拟线程 verticle 中阻塞

你可以使用await来暂停当前虚拟线程,直到可获得等待的结果。

虚拟线程实际上被阻塞了,但应用程序仍然可以处理事件。

当虚拟线程等待结果并且 verticle 需要处理任务时,会创建一个新的虚拟线程来处理此任务。

当结果可用时,虚拟线程的执行会恢复并在当前任务暂停或完成之后调度。

与任何 verticle 一样,最多同时执行一个任务。

你可以等待 Vert.x Future

Buffer body = response.body().await();

或等待 JDK CompletionStage

Buffer body = Future.fromCompletionStage(completionStage).await();

你还可以将 Vert.x ReadStream 转换为 Java 阻塞流

server.requestHandler(request -> {
  Stream<Buffer> blockingStream = request.blockingStream();
  HttpServerResponse response = request.response();
  response.setChunked(true);
  blockingStream
    .map(buff -> "" + buff.length())
    .forEach(size -> response.write(size));
  response.end();
});

字段可见性

虚拟线程 verticle 可以在调用await之前安全地与字段交互。但是,如果你在调用await之前读取字段并在调用之后重用该值,则应该记住该值可能已更改。

int value = counter;
value += getRemoteValue().await();
// the counter value might have changed
counter = value;

你应该在调用await之前读取/写入字段以避免这种情况。

counter += getRemoteValue().await();
这与事件循环 verticle 的行为相同

等待多个 future

当你需要等待多个 future 时,可以使用 Vert.x CompositeFuture

Future<String> f1 = getRemoteString();
Future<Integer> f2 = getRemoteValue();
CompositeFuture res = Future.all(f1, f2).await();
String v1 = res.resultAt(0);
Integer v2 = res.resultAt(1);

不使用 await 的阻塞

当你的应用程序在不使用await的情况下阻塞时,例如使用ReentrantLock#lock,Vert.x 调度器不会知道这一点,也无法在 verticle 上调度事件:它的行为就像一个 worker verticle,但仍使用虚拟线程。

此用例不被鼓励但也不禁止,但是 verticle 应该部署多个实例以提供所需的并发性,就像 worker verticle 一样。

线程本地支持

线程本地变量仅在上下文任务的执行范围内可靠。

ThreadLocal<String> local = new ThreadLocal();
local.set(userId);
HttpClientRequest req = client.request(HttpMethod.GET, 8080, "localhost", "/").await();
HttpClientResponse resp = req.send().await();

Vert.x 中有几个对象允许从中读取和写入项目。

在 Vert.x 中,写入调用会立即返回,并且写入会内部排队。

不难看出,如果你写入对象的速度快于它实际将数据写入其底层资源的速度,则写入队列可能会无限制地增长 - 最终导致内存耗尽。

为了解决这个问题,Vert.x API 中的一些对象提供了简单的流控制(背压)功能。

任何可以写入的流控制感知对象都实现WriteStream,而任何可以读取的流控制对象都说实现ReadStream

让我们举一个例子,我们想从ReadStream读取数据,然后将数据写入WriteStream

一个非常简单的例子是从NetSocket读取数据然后写回同一个NetSocket——因为NetSocket实现了ReadStreamWriteStream。请注意,这适用于任何符合ReadStreamWriteStream的对象,包括 HTTP 请求、HTTP 响应、异步文件 I/O、WebSocket 等。

一种简单的方法是直接获取已读取的数据并立即将其写入NetSocket

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    // Write the data straight back
    sock.write(buffer);
  });
}).listen();

上面这个例子有一个问题:如果从 socket 读取数据的速度快于写入 socket 的速度,数据就会在NetSocket的写入队列中积压,最终导致内存耗尽。例如,如果 socket 另一端的客户端读取速度不够快,从而对连接造成背压,就可能发生这种情况。

由于NetSocket实现了WriteStream,我们可以在写入之前检查WriteStream是否已满

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    if (!sock.writeQueueFull()) {
      sock.write(buffer);
    }
  });

}).listen();

这个例子不会耗尽内存,但如果写入队列已满,我们最终会丢失数据。我们真正想要做的是在写入队列已满时暂停NetSocket

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
    }
  });
}).listen();

我们几乎成功了,但还差一点。当文件满时,NetSocket现在会暂停,但我们还需要在写入队列处理完积压数据时取消暂停它

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
      sock.drainHandler(done -> {
        sock.resume();
      });
    }
  });
}).listen();

就这样。当写入队列准备好接受更多数据时,将调用drainHandler事件处理程序,这将恢复NetSocket,从而可以读取更多数据。

在编写 Vert.x 应用程序时,这种操作非常常见,因此我们添加了pipeTo方法,它为你完成了所有这些繁重的工作。你只需将WriteStream传递给它并使用它即可

NetServer server = vertx.createNetServer(
  new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
  sock.pipeTo(sock);
}).listen();

这与更详细的示例做的事情完全相同,此外它还处理流故障和终止:当管道成功完成或发生故障时,目标WriteStream会终止。

操作完成时你可以收到通知

server.connectHandler(sock -> {

  // Pipe the socket providing an handler to be notified of the result
  sock
    .pipeTo(sock)
    .onComplete(ar -> {
      if (ar.succeeded()) {
        System.out.println("Pipe succeeded");
      } else {
        System.out.println("Pipe failed");
      }
    });
}).listen();

当处理异步目标时,你可以创建一个Pipe实例,该实例会暂停源,并在源被连接到目标时恢复它

server.connectHandler(sock -> {

  // Create a pipe to use asynchronously
  Pipe<Buffer> pipe = sock.pipe();

  // Open a destination file
  fs.open("/path/to/file", new OpenOptions())
    .onComplete(ar -> {
      if (ar.succeeded()) {
        AsyncFile file = ar.result();

        // Pipe the socket to the file and close the file at the end
        pipe.to(file);
      } else {
        sock.close();
      }
  });
}).listen();

当需要中止传输时,你需要将其关闭

vertx.createHttpServer()
  .requestHandler(request -> {

    // Create a pipe that to use asynchronously
    Pipe<Buffer> pipe = request.pipe();

    // Open a destination file
    fs.open("/path/to/file", new OpenOptions())
      .onComplete(ar -> {
        if (ar.succeeded()) {
          AsyncFile file = ar.result();

          // Pipe the socket to the file and close the file at the end
          pipe.to(file);
        } else {
          // Close the pipe and resume the request, the body buffers will be discarded
          pipe.close();

          // Send an error response
          request.response().setStatusCode(500).end();
        }
      });
  }).listen(8080);

当管道关闭时,流处理程序将被取消设置,ReadStream将恢复。

如上所述,默认情况下,当流完成时,目标总是终止,你可以在管道对象上控制此行为

这是一个简短的例子

src.pipe()
  .endOnSuccess(false)
  .to(dst)
  .onComplete(rs -> {
    // Append some text and close the file
    dst.end(Buffer.buffer("done"));
  });

现在让我们更详细地查看ReadStreamWriteStream上的方法

ReadStream

  • handler:设置一个处理程序,该处理程序将从 ReadStream 接收项。

  • pause:暂停流。暂停时,处理程序将不会接收任何项。

  • fetch:从流中获取指定数量的项。如果任何项到达,将调用处理程序。获取是累积的。

  • resume:恢复流。如果任何项到达,将调用处理程序。恢复等同于获取Long.MAX_VALUE项。

  • exceptionHandler:当 ReadStream 上发生异常时调用。

  • endHandler:当流结束时调用。这可能是当 ReadStream 表示文件时到达 EOF,或者当它是 HTTP 请求时到达请求结束,或者当它是 TCP socket 时连接关闭。

读取流处于流动获取模式

  • 流最初处于<i>流动</i>模式

  • 当流处于流动模式时,元素会传递给处理程序

  • 当流处于获取模式时,只有所请求数量的元素会传递给处理程序

pause, resumefetch 更改模式

  • resume()设置流动模式

  • pause()设置获取模式并将需求重置为0

  • fetch(long)请求特定数量的元素并将其添加到实际需求中

WriteStream

功能

  • write:向 WriteStream 写入一个对象。此方法永远不会阻塞。写入操作在内部排队,并异步写入底层资源。

  • setWriteQueueMaxSize:设置写入队列被认为是“满”时的对象数量,此时方法writeQueueFull返回true。请注意,当写入队列被认为是满时,如果调用写入方法,数据仍将被接受并排队。实际数量取决于流的实现,对于Buffer,大小表示实际写入的字节数,而不是缓冲区数量。

  • writeQueueFull:如果写入队列被认为是满的,则返回true

  • exceptionHandler:当WriteStream上发生异常时将调用此方法。

  • drainHandler:当WriteStream被认为不再满时,将调用此处理程序。

流归约

Java 收集器可以异步地将ReadStream归约为结果,其方式类似于java.util.Stream

Future<Long> result = stream.collect(Collectors.counting());

result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements"));

请注意,collect会覆盖流上之前设置的任何处理程序。

记录解析器

记录解析器允许你轻松解析由字节序列或固定大小记录分隔的协议。它将输入缓冲区序列转换为按配置结构化的缓冲区序列(固定大小或分隔记录)。

例如,如果你有一个简单的 ASCII 文本协议,由“\n”分隔,输入如下:

buffer1:HELLO\nHOW ARE Y
buffer2:OU?\nI AM
buffer3: DOING OK
buffer4:\n

记录解析器将生成

buffer1:HELLO
buffer2:HOW ARE YOU?
buffer3:I AM DOING OK

让我们看看相关的代码

final RecordParser parser = RecordParser.newDelimited("\n", h -> {
  System.out.println(h.toString());
});

parser.handle(Buffer.buffer("HELLO\nHOW ARE Y"));
parser.handle(Buffer.buffer("OU?\nI AM"));
parser.handle(Buffer.buffer("DOING OK"));
parser.handle(Buffer.buffer("\n"));

你还可以生成固定大小的块,如下所示

RecordParser.newFixed(4, h -> {
  System.out.println(h.toString());
});

更多详细信息,请查看RecordParser类。

Json 解析器

你可以轻松解析 JSON 结构,但这需要一次性提供 JSON 内容,当你需要解析非常大的结构时可能不太方便。

非阻塞 JSON 解析器是一个事件驱动的解析器,能够处理非常大的结构。它将输入缓冲区序列转换为 JSON 解析事件序列。

JsonParser parser = JsonParser.newParser();

// Set handlers for various events
parser.handler(event -> {
  switch (event.type()) {
    case START_OBJECT:
      // Start an objet
      break;
    case END_OBJECT:
      // End an objet
      break;
    case START_ARRAY:
      // Start an array
      break;
    case END_ARRAY:
      // End an array
      break;
    case VALUE:
      // Handle a value
      String field = event.fieldName();
      if (field != null) {
        // In an object
      } else {
        // In an array or top level
        if (event.isString()) {

        } else {
          // ...
        }
      }
      break;
  }
});

解析器是非阻塞的,并且发出的事件由输入缓冲区驱动。

JsonParser parser = JsonParser.newParser();

// start array event
// start object event
// "firstName":"Bob" event
parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\","));

// "lastName":"Morane" event
// end object event
parser.handle(Buffer.buffer("\"lastName\":\"Morane\"},"));

// start object event
// "firstName":"Luke" event
// "lastName":"Lucky" event
// end object event
parser.handle(Buffer.buffer("{\"firstName\":\"Luke\",\"lastName\":\"Lucky\"}"));

// end array event
parser.handle(Buffer.buffer("]"));

// Always call end
parser.end();

事件驱动的解析提供了更多控制,但代价是需要处理细粒度事件,这有时可能不方便。JSON 解析器允许你在需要时将 JSON 结构作为值处理

JsonParser parser = JsonParser.newParser();

parser.objectValueMode();

parser.handler(event -> {
  switch (event.type()) {
    case START_ARRAY:
      // Start the array
      break;
    case END_ARRAY:
      // End the array
      break;
    case VALUE:
      // Handle each object
      break;
  }
});

parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...]"));
parser.end();

在解析过程中可以设置和取消设置值模式,允许你在细粒度事件或 JSON 对象值事件之间切换。

JsonParser parser = JsonParser.newParser();

parser.handler(event -> {
  // Start the object

  switch (event.type()) {
    case START_OBJECT:
      // Set object value mode to handle each entry, from now on the parser won't emit start object events
      parser.objectValueMode();
      break;
    case VALUE:
      // Handle each object
      // Get the field in which this object was parsed
      String id = event.fieldName();
      System.out.println("User with id " + id + " : " + event.value());
      break;
    case END_OBJECT:
      // Set the object event mode so the parser emits start/end object events again
      parser.objectEventMode();
      break;
  }
});

parser.handle(Buffer.buffer("{\"39877483847\":{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...}"));
parser.end();

你也可以对数组进行同样的操作

JsonParser parser = JsonParser.newParser();

parser.handler(event -> {
  // Start the object

  switch (event.type()) {
    case START_OBJECT:
      // Set array value mode to handle each entry, from now on the parser won't emit start array events
      parser.arrayValueMode();
      break;
    case VALUE:
      // Handle each array
      // Get the field in which this object was parsed
      System.out.println("Value : " + event.value());
      break;
    case END_OBJECT:
      // Set the array event mode so the parser emits start/end object events again
      parser.arrayEventMode();
      break;
  }
});

parser.handle(Buffer.buffer("[0,1,2,3,4,...]"));
parser.end();

你也可以解码 POJO

parser.handler(event -> {
  // Handle each object
  // Get the field in which this object was parsed
  String id = event.fieldName();
  User user = event.mapTo(User.class);
  System.out.println("User with id " + id + " : " + user.firstName + " " + user.lastName);
});

每当解析器无法处理缓冲区时,除非你设置了异常处理程序,否则将抛出异常

JsonParser parser = JsonParser.newParser();

parser.exceptionHandler(err -> {
  // Catch any parsing or decoding error
});

解析器还会解析 json 流

  • 连接的 json 流:{"temperature":30}{"temperature":50}

  • 行分隔的 json 流:{"an":"object"}\r\n3\r\n"a string"\r\nnull

更多详细信息,请查看JsonParser类。

线程安全

大多数 Vert.x 对象都可以安全地从不同线程访问。但是,当它们从创建它们的同一上下文中访问时,性能会得到优化。

例如,如果你部署了一个 verticle,它创建了一个NetServer并在其处理程序中提供了NetSocket实例,那么最好始终从 verticle 的事件循环中访问该 socket 实例。

如果你遵循标准的 Vert.x verticle 部署模型并避免在 verticle 之间共享对象,那么你无需考虑这一点即可实现。

运行阻塞代码

在一个完美的世界里,没有战争或饥饿,所有的 API 都将异步编写,兔子和羊羔将在阳光明媚的绿色草地上手牵手跳跃。

但是……现实世界并非如此。(你最近看新闻了吗?)

事实是,许多(如果不是大多数)库,尤其是在 JVM 生态系统中,都具有同步 API,并且许多方法可能会阻塞。JDBC API 就是一个很好的例子——它本质上是同步的,无论 Vert.x 如何努力,都无法对其施加魔法使其异步化。

我们不会在一夜之间将所有内容重写为异步,因此我们需要提供一种方法,让你可以在 Vert.x 应用程序中安全地使用“传统”阻塞 API。

如前所述,你不能直接从事件循环调用阻塞操作,因为那会阻止它执行任何其他有用的工作。那么你如何做到这一点呢?

这通过调用executeBlocking并传入要执行的阻塞代码来完成,返回的是一个 Future,其中包含阻塞代码执行的结果。

vertx.executeBlocking(() -> {
  // Call some blocking API that takes a significant amount of time to return
  return someAPI.blockingMethod("hello");
}).onComplete(res -> {
  System.out.println("The result is: " + res.result());
});
阻塞代码应该阻塞合理的时间(即不超过几秒)。长时间阻塞操作或轮询操作(即线程以阻塞方式循环轮询事件)是被禁止的。当阻塞操作持续时间超过 10 秒时,阻塞线程检查器将在控制台打印一条消息。长时间阻塞操作应使用应用程序管理的专用线程,该线程可以使用事件总线或runOnContext与 verticle 交互

默认情况下,如果从同一上下文(例如,同一个 verticle 实例)多次调用 executeBlocking,则不同的 executeBlocking 将串行执行(即,一个接一个地执行)。

如果你不关心顺序,你可以调用executeBlocking,将ordered参数设置为false。在这种情况下,任何 executeBlocking 都可以在工作池中并行执行。

运行阻塞代码的另一种方法是使用worker verticle

工作 verticle 总是使用工作池中的一个线程执行。

默认情况下,阻塞代码在 Vert.x 工作池中执行,该工作池通过setWorkerPoolSize进行配置。

可以为不同目的创建额外的池

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(() -> {
  // Call some blocking API that takes a significant amount of time to return
  return someAPI.blockingMethod("hello");
}).onComplete(res -> {
  System.out.println("The result is: " + res.result());
});

当不再需要时,worker 执行器必须关闭

executor.close();

当创建了几个同名 worker 时,它们将共享同一个池。当所有使用它的 worker executor 都关闭时,worker 池将被销毁。

当在 Verticle 中创建执行器时,Vert.x 将在 Verticle 卸载时自动为你关闭它。

Worker 执行器可以在创建时配置

int poolSize = 10;

// 2 minutes
long maxExecuteTime = 2;
TimeUnit maxExecuteTimeUnit = TimeUnit.MINUTES;

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime, maxExecuteTimeUnit);
配置是在工作池创建时设置的

Vert.x SPI

Vert.x 实例有几个扩展点,称为SPI(服务提供商接口)。

此类 SPI 通常使用 Java 的ServiceLoader机制从 classpath 加载。

度量和跟踪 SPI

默认情况下,Vert.x 不记录任何度量或进行任何跟踪。相反,它提供了一个 SPI 供其他实现者使用,可以将其添加到 classpath 中。度量 SPI 是一项功能,允许实现者从 Vert.x 捕获事件以收集和报告度量,同样跟踪 SPI 也执行相同的操作。

有关此内容的更多信息,请参阅https://vertx.com.cn/docs/#monitoring

集群管理器 SPI

在 Vert.x 中,集群管理器用于各种功能,包括

  • 集群中 Vert.x 节点的发现和组成员

  • 维护集群范围的主题订阅者列表(这样我们就知道哪些节点对哪些事件总线地址感兴趣)

  • 分布式映射支持

  • 分布式锁

  • 分布式计数器

集群管理器处理事件总线节点间传输,这直接由 Vert.x 通过 TCP 连接完成。

Vert.x 发行版中使用的默认集群管理器是使用Hazelcast的管理器,但这可以很容易地被不同的实现替换,因为 Vert.x 集群管理器是可插拔的。

集群管理器必须实现接口ClusterManager。Vert.x 在运行时通过使用 Java Service Loader功能在 classpath 中查找ClusterManager的实例来定位集群管理器。

如果你正在命令行使用 Vert.x 并且想要使用集群,你应该确保 Vert.x 安装的lib目录包含你的集群管理器 jar。

如果你通过 Maven 或 Gradle 项目使用 Vert.x,只需将集群管理器 jar 添加为项目的依赖项即可。

有关此内容的更多信息,请参阅https://vertx.com.cn/docs/#clustering

Vert.x 构建器

Vertx.vertxVertx.clusteredVertx静态方法是获取Vertx实例最简单的方法。

你也可以使用构建器模式来创建一个Vertx实例。构建器允许你以编程方式配置一些提供者(SPI),这些提供者通常使用VertxOptions和/或 Java Service Loader 插件加载。

  • 原生传输

  • 集群管理器

  • 追踪

  • 度量实例

Vertx vertx = Vertx.builder()
  .with(options)
  .withTracer(tracerFactory)
  .withMetrics(metricsFactory)
  .build();

也可以创建集群实例。

Future<Vertx> vertx = Vertx.builder()
  .with(options)
  .withClusterManager(clusterManager)
  .buildClustered();

日志

Vert.x 使用其内部日志 API 进行日志记录,并支持各种日志后端。

日志后端选择如下

  1. 如果存在vertx.logger-delegate-factory-class-name系统属性所指示的后端,或者

  2. 当 classpath 中存在vertx-default-jul-logging.properties文件时使用 JDK logging,或者

  3. classpath 中存在的后端,按以下优先顺序

    1. SLF4J

    2. Log4J2

否则 Vert.x 默认使用 JDK logging。

通过系统属性配置

vertx.logger-delegate-factory-class-name系统属性设置为

  • io.vertx.core.logging.SLF4JLogDelegateFactory用于 SLF4J,或者

  • io.vertx.core.logging.Log4j2LogDelegateFactory用于 Log4J2,或者

  • io.vertx.core.logging.JULLogDelegateFactory用于 JDK logging

自动配置

当没有设置vertx.logger-delegate-factory-class-name系统属性时,Vert.x 将尝试找到最合适的日志记录器

  • 当 classpath 中有实际实现时使用 SLF4J(即LoggerFactory.getILoggerFactory()不是NOPLoggerFactory的实例)

  • 否则,当 classpath 中有 Log4j2 时使用它

  • 否则使用 JUL

配置 JUL 日志

JUL 日志配置文件可以通过正常的 JUL 方式指定,即提供一个名为java.util.logging.config.file的系统属性,其值为你的配置文件。有关此内容以及 JUL 配置文件结构的更多信息,请查阅 JDK 日志文档。

Vert.x 还提供了一种更便捷的方式来指定配置文件,而无需设置系统属性。只需在你的 classpath 中(例如,在你的 fatjar 中)提供一个名为vertx-default-jul-logging.properties的 JUL 配置文件,Vert.x 就会使用它来配置 JUL。

Netty 日志

Netty 不依赖外部日志配置(例如系统属性)。相反,它实现了一种基于 Netty 类可见的日志库的日志配置

  • 如果SLF4J库可见,则使用它

  • 否则,如果Log4j可见,则使用它

  • 否则,如果Log4j2可见,则使用它

  • 否则,回退到java.util.logging

眼尖的你可能已经注意到 Vert.x 遵循相同的优先级顺序。

可以通过直接在io.netty.util.internal.logging.InternalLoggerFactory上设置 Netty 的内部日志实现来强制日志记录器使用特定的实现

// Force logging to Log4j 2
InternalLoggerFactory.setDefaultFactory(Log4J2LoggerFactory.INSTANCE);

故障排除

启动时的 SLF4J 警告

如果你在启动应用程序时看到以下消息

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

这意味着你的 classpath 中有 SLF4J-API,但没有实际绑定。使用 SLF4J 记录的消息将被丢弃。你应该在 classpath 中添加一个绑定。请查看https://www.slf4j.org/manual.html#swapping选择并配置一个绑定。

请注意,Netty 会查找 SLF4-API jar 并默认使用它。

连接被对端重置

如果你的日志显示大量

io.vertx.core.net.impl.ConnectionBase
SEVERE: java.io.IOException: Connection reset by peer

这意味着客户端正在重置 HTTP 连接而不是关闭它。此消息还表明你可能没有消费完整的负载(在你能够消费之前连接就被切断了)。

主机名解析

Vert.x 使用地址解析器将主机名解析为 IP 地址,而不是 JVM 内置的阻塞解析器。

主机名使用以下方式解析为 IP 地址

  • 操作系统的hosts文件

  • 否则,对服务器列表进行 DNS 查询

默认情况下,它将使用环境变量中的系统 DNS 服务器地址列表,如果无法检索该列表,它将使用 Google 的公共 DNS 服务器"8.8.8.8""8.8.4.4"

在创建Vertx实例时也可以配置 DNS 服务器

Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().
            addServer("192.168.0.1").
            addServer("192.168.0.2:40000"))
);

DNS 服务器的默认端口是53,当服务器使用不同端口时,可以使用冒号分隔符设置该端口:192.168.0.2:40000

有时可能需要使用 JVM 内置解析器,JVM 系统属性-Dvertx.disableDnsResolver=true会激活此行为

故障转移

当服务器未及时回复时,解析器将尝试列表中的下一个服务器,搜索限制为setMaxQueries(默认值为4次查询)。

当解析器在getQueryTimeout毫秒内(默认值为5秒)没有收到正确答案时,DNS 查询被视为失败。

服务器列表轮换

默认情况下,DNS 服务器选择使用第一个,其余服务器用于故障转移。

你可以将setRotateServers配置为true,让解析器执行轮询选择。它将查询负载分散到服务器之间,并避免所有查找都命中列表中的第一个服务器。

故障转移仍然适用,并将使用列表中的下一个服务器。

主机映射

操作系统的hosts文件用于执行 IP 地址的主机名查找。

可以使用替代的hosts文件

Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().
            setHostsPath("/path/to/hosts"))
);

搜索域

默认情况下,解析器将使用环境中的系统 DNS 搜索域。或者,可以提供明确的搜索域列表

Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().addSearchDomain("foo.com").addSearchDomain("bar.com"))
);

当使用搜索域列表时,点数的阈值为1或从 Linux 上的/etc/resolv.conf加载,可以通过setNdots配置为特定值。

MacOS 配置

MacOS 有一个特定的原生扩展,可以根据Apple 的开源 mDNSResponder获取系统的名称服务器配置。当此扩展不存在时,Netty 会记录以下警告。

[main] WARN io.netty.resolver.dns.DnsServerAddressStreamProviders - Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.

此扩展不是必需的,因为它的缺失不会阻止 Vert.x 执行,但建议使用。

你可以将其添加到你的 classpath 中以改善集成并消除警告。

基于 Intel 的 Mac
<profile>
  <id>mac-intel</id>
  <activation>
    <os>
      <family>mac</family>
      <arch>x86_64</arch>
    </os>
  </activation>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-resolver-dns-native-macos</artifactId>
      <classifier>osx-x86_64</classifier>
      <!--<version>Should align with netty version that Vert.x uses</version>-->
    </dependency>
  </dependencies>
</profile>
M1/M2 Mac
<profile>
  <id>mac-silicon</id>
  <activation>
    <os>
      <family>mac</family>
      <arch>aarch64</arch>
    </os>
  </activation>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-resolver-dns-native-macos</artifactId>
      <classifier>osx-aarch_64</classifier>
      <!--<version>Should align with netty version that Vert.x uses</version>-->
    </dependency>
  </dependencies>
</profile>

原生传输

Vert.x 可以在 BSD (OSX) 和 Linux 上与原生传输(如果可用)一起运行

Vertx vertx = Vertx.vertx(new VertxOptions().
  setPreferNativeTransport(true)
);

// True when native is available
boolean usingNative = vertx.isNativeTransportEnabled();
System.out.println("Running with native: " + usingNative);
优先使用原生传输不会阻止应用程序执行(例如,可能缺少原生依赖)。如果你的应用程序需要原生传输,你需要检查isNativeTransportEnabled

你还可以显式配置要使用的传输

Transport transport = Transport.nativeTransport();

// Or use a very specific transport
transport = Transport.EPOLL;

Vertx vertx = Vertx.builder()
  .withTransport(transport)
  .build();

原生 epoll

Linux 上的原生提供了额外的网络选项

  • SO_REUSEPORT

  • TCP_QUICKACK

  • TCP_CORK

  • TCP_FASTOPEN

  • TCP_USER_TIMEOUT

你需要在 classpath 中添加以下依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
  <!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>

原生 io_uring

你需要在 classpath 中添加以下依赖

<dependency>
  <groupId>io.netty</groupId>
  <classifier>linux-x86_64</classifier>
  <artifactId>netty-transport-native-io_uring</artifactId>
  <!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>

原生 kqueue

你需要在 classpath 中添加以下依赖

基于 Intel 的 Mac
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
  <!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>
M1/M2 Mac
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-aarch_64</classifier>
<!--<version>Should align with netty version that Vert.x uses</version>-->
</dependency>

支持 MacOS Sierra 及更高版本。

BSD 上的原生提供了额外的网络选项

  • SO_REUSEPORT

vertx.createHttpServer(new HttpServerOptions().setReusePort(reusePort));

安全注意事项

Vert.x 是一个工具包,而不是一个强制你以某种方式做事的框架。这赋予你作为开发人员巨大的力量,但同时也伴随着巨大的责任。

与任何工具包一样,编写不安全的应用程序是可能的,因此你在开发应用程序时应始终小心,特别是当它暴露于公共(例如,通过互联网)时。

Web 应用程序

如果编写 Web 应用程序,强烈建议你使用 Vert.x-Web 而不是直接使用 Vert.x core 来提供资源和处理文件上传。

Vert.x-Web 对请求中的路径进行规范化,以防止恶意客户端通过构造 URL 访问 Web 根目录之外的资源。

类似地,对于文件上传,Vert.x-Web 提供了将文件上传到磁盘上已知位置的功能,并且不依赖于客户端在上传中提供的文件名,因为文件名可能被伪造以上传到磁盘上的不同位置。

Vert.x core 本身不提供此类检查,因此作为开发人员,你需要自行实现它们。

集群事件总线流量

当在网络上的不同 Vert.x 节点之间集群事件总线时,流量以未加密的方式发送,因此如果你的 Vert.x 节点不在受信任的网络上且你需要发送机密数据,请勿使用此功能。

标准安全最佳实践

任何服务都可能存在潜在漏洞,无论是使用 Vert.x 还是其他工具包编写的,因此请始终遵循安全最佳实践,尤其是当你的服务面向公众时。

例如,你应该始终将它们运行在 DMZ 中,并使用具有有限权限的用户帐户,以限制服务被攻破时造成的损害范围。

配置 Vert.x 缓存

当 Vert.x 需要从 classpath 读取文件(嵌入在 fat jar 中,以 jar 形式存在于 classpath 中,或 classpath 上的文件)时,它会将其复制到缓存目录。原因很简单:从 jar 或输入流读取文件是阻塞的。因此,为了避免每次都付出代价,Vert.x 会将文件复制到其缓存目录,并从那里进行后续读取。此行为可以配置。

首先,默认情况下,Vert.x 使用$CWD/.vertx作为缓存目录。它在此目录内创建一个唯一的目录以避免冲突。可以通过使用vertx.cacheDirBase系统属性来配置此位置。例如,如果当前工作目录不可写(例如在不可变容器环境中),请使用以下命令启动应用程序

java -jar my-fat.jar vertx.cacheDirBase=/tmp/vertx-cache
该目录必须是可写的

当你编辑 HTML、CSS 或 JavaScript 等资源时,这种缓存机制可能会很烦人,因为它只提供文件的第一个版本(因此如果你重新加载页面,将看不到你的编辑)。为了避免这种行为,请使用-Dvertx.disableFileCaching=true启动应用程序。在此设置下,Vert.x 仍然使用缓存,但始终用原始源刷新缓存中存储的版本。因此,如果你编辑从 classpath 提供的文件并刷新浏览器,Vert.x 会从 classpath 读取它,将其复制到缓存目录并从那里提供。不要在生产环境中使用此设置,它可能会严重影响性能。

最后,你可以通过使用-Dvertx.disableFileCPResolving=true完全禁用缓存。此设置并非没有后果。Vert.x 将无法读取 classpath 中的任何文件(只能从文件系统读取)。使用此设置时请务必小心。