响应式 DB2 客户端

响应式 DB2 客户端是一个用于 DB2 的客户端,具有直观的 API,专注于可伸缩性和低开销。

该客户端是响应式且非阻塞的,允许单线程处理许多数据库连接。

特性

  • 支持 Linux、Unix 和 Windows 上的 DB2

  • 对 z/OS 上的 DB2 支持有限

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预处理查询缓存

  • 批处理和游标

  • 行流式传输

  • RxJava API

  • 直接内存到对象,无需不必要的复制

  • Java 8 日期和时间

  • SSL/TLS

  • 支持 HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 代理

当前限制

  • 不支持存储过程

  • 不支持某些列类型(例如 BLOB 和 CLOB)

用法

要使用响应式 DB2 客户端,请将以下依赖项添加到构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-db2-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-db2-client:5.0.1'
}

快速入门

以下是连接、查询和断开连接的最简单方法

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
Pool client = DB2Builder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // Now close the pool
  client.close();
});

连接到 DB2

大部分时间您将使用连接池连接到 DB2

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
SqlClient client = DB2Builder.client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

池化客户端使用连接池,任何操作都将从池中借用一个连接来执行操作并将其释放回池中。

如果您正在运行 Vert.x,您可以将您的 Vertx 实例传递给它

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// Create the pooled client
SqlClient client = DB2Builder.client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

当您不再需要客户端时,需要释放它

client.close();

当您需要在同一连接上执行多个操作时,需要从连接池中获取一个 连接

您可以轻松地从连接池中获取一个

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = DB2Builder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Get a connection from the pool
client.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // All operations execute on the same connection
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

一旦您使用完连接,您必须关闭它以将其释放回连接池,以便可以重用它。

连接池与池化客户端

DB2Builder 允许您创建连接池或池化客户端

SqlClient client = DB2Builder.client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();

// Connection pool
Pool pool = DB2Builder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • 连接池操作不是流水线化的,只有从连接池获取的连接才是流水线化的

  • 池化客户端操作是流水线化的,您无法从池化客户端获取连接

连接池共享

您可以在多个 Verticle 或同一 Verticle 的多个实例之间共享一个连接池。这样的连接池应该在 Verticle 之外创建,否则当创建它的 Verticle 卸载时,它将被关闭

Pool pool = DB2Builder.pool()
  .with(new PoolOptions().setMaxSize(maxSize))
  .connectingTo(database)
  .using(vertx)
  .build();
vertx.deployVerticle(() -> new VerticleBase() {
  @Override
  public Future<?> start() throws Exception {
    // Use the pool
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

您也可以在每个 Verticle 中创建一个共享连接池

vertx.deployVerticle(() -> new VerticleBase() {
  Pool pool;
  @Override
  public Future<?> start() throws Exception {
    // Get or create a shared pool
    // this actually creates a lease to the pool
    // when the verticle is undeployed, the lease will be released automaticaly
    pool = DB2Builder.pool()
      .with(new PoolOptions()
        .setMaxSize(maxSize)
        .setShared(true)
        .setName("my-pool"))
      .connectingTo(database)
      .using(vertx)
      .build();
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

首次创建共享池时,它将为该池创建资源。后续调用将重用此池并创建对此池的租约。资源在所有租约关闭后处置。

默认情况下,连接池在需要创建 TCP 连接时会重用当前事件循环。因此,共享连接池将随机使用使用它的 verticle 的事件循环。

您可以为连接池分配一个事件循环数量,使其独立于使用它的上下文

Pool pool = DB2Builder.pool()
  .with(new PoolOptions()
    .setMaxSize(maxSize)
    .setShared(true)
    .setName("my-pool")
    .setEventLoopSize(4))
  .connectingTo(database)
  .using(vertx)
  .build();

配置

您可以通过几种替代方案配置客户端。

数据对象

配置客户端的一种简单方法是指定一个 DB2ConnectOptions 数据对象。

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
Pool pool = DB2Builder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

pool.getConnection()
  .onComplete(ar -> {
    // Handling your connection
  });

您还可以使用 setPropertiesaddProperty 方法配置通用属性。请注意,setProperties 将覆盖默认客户端属性。

连接 URI

除了使用 DB2ConnectOptions 数据对象进行配置外,当您希望使用连接 URI 进行配置时,我们还提供了另一种连接方式

String connectionUri = "db2://dbuser:[email protected]:50000/mydb";

// Create the pool from the connection URI
Pool pool = DB2Builder.pool()
  .connectingTo(connectionUri)
  .using(vertx)
  .build();

// Create the connection from the connection URI
DB2Connection.connect(vertx, connectionUri)
  .onComplete(res -> {
    // Handling your connection
  });

连接字符串的 URI 格式为

db2://[user[:[password]]@]host[:port][/database][?<key1>=<value1>[&<key2>=<value2>]]

目前,客户端支持以下参数键

  • 主机

  • 端口

  • 用户

  • 密码

  • 数据库

在连接 URI 中配置参数将覆盖默认属性。

连接重试

您可以配置客户端在连接建立失败时进行重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

运行查询

当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询;连接池将使用其一个连接来运行查询并将结果返回给您。

以下是运行简单查询的方法

client
  .query("SELECT * FROM users WHERE id='andy'")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预处理查询

您也可以对预处理查询执行相同的操作。

SQL 字符串可以通过位置引用参数,使用数据库语法 ?

client
  .preparedQuery("SELECT * FROM users WHERE id=?")
  .execute(Tuple.of("andy"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询方法提供一个异步 RowSet 实例,适用于 SELECT 查询

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

UPDATE/INSERT 查询

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
  .execute(Tuple.of("Andy", "Guibert"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row 允许您通过索引访问数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));
列索引从 0 开始,而不是从 1 开始。

或者,可以通过名称检索数据

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在这里不会做任何神奇的事情,列名是根据表中的名称来识别的,无论您的 SQL 文本是什么。

您可以访问各种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存的预处理语句来执行一次性预处理查询

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您可以创建一个 PreparedStatement 并自行管理生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = ?")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"))
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以执行预处理批处理

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julient Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
batch.add(Tuple.of("andy", "Andy Guibert"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
  .executeBatch(batch)
  .onComplete(res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

您可以通过将查询包装在 SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> ) 中来获取生成的键,例如

client
  .preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )")
  .execute(Tuple.of("white", "red", "blue"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Inserted " + rows.rowCount() + " new rows.");
      for (Row row : rows) {
        System.out.println("generated key: " + row.getInteger("color_id"));
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

使用连接

获取连接

当您需要执行顺序查询(不带事务)时,可以创建一个新连接或从连接池中借用一个。请记住,在从连接池获取连接并将其返回到连接池之间,您应该注意连接,因为它可能因某些原因(例如空闲超时)而被服务器关闭。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Andy", "Guibert")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(() -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

可以创建预处理查询

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Andy"))
      .eventually(() -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化连接 API

当您使用连接池时,可以调用 withConnection,将一个在连接中执行的函数传递给它。

它从连接池中借用一个连接,并使用该连接调用函数。

该函数必须返回一个任意结果的 Future。

Future 完成后,连接将返回到连接池,并提供总结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Andy", "Guibert")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接事务

您可以使用 SQL 的 BEGIN/COMMIT/ROLLBACK 来执行事务,如果您这样做,则必须使用 SqlConnection 并自行管理它。

或者您可以使用 SqlConnection 的事务 API

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
    // Begin the transaction
    conn.begin()
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(() -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

当数据库服务器报告当前事务失败时(例如臭名昭著的 current transaction is aborted, commands ignored until end of transaction block),事务将回滚,并且 completion future 将以 TransactionRollbackException 失败。

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

简化的事务 API

当您使用连接池时,您可以调用 withTransaction,将一个在事务中执行的函数传递给它。

它从连接池中借用一个连接,开始事务,并使用一个在事务范围内执行所有操作的客户端调用该函数。

该函数必须返回一个任意结果的 Future

  • 当 Future 成功时,客户端将提交事务

  • 当 Future 失败时,客户端将回滚事务

事务完成后,连接将返回到连接池,并提供总结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流式传输

默认情况下,预处理查询执行会获取所有行,您可以使用 游标 来控制您想要读取的行数

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .onComplete(ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Cursors require to run within a transaction
    connection
      .begin()
      .onComplete(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));

        // Read 50 rows
        cursor
          .read(50)
          .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();

            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();
            }
          }
        });
      }
    });
  }
});

游标在提前释放时应关闭

cursor
  .read(50)
  .onComplete(ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

游标也提供了流 API,这可能更方便,尤其是在 Rxified 版本中。

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .onComplete(ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Streams require to run within a transaction
    connection
      .begin()
      .onComplete(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));

        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          tx.commit();
          System.out.println("End of stream");
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });
  }
});

流以 50 行的批次读取行并进行流式传输,当行传递给处理器后,新的 50 行批次将被读取,依此类推。

流可以暂停或恢复,加载的行将保留在内存中直到它们被传递,并且游标将停止迭代。

跟踪查询

当 Vert.x 启用了跟踪时,SQL 客户端可以跟踪查询执行。

客户端报告以下客户端跨度

  • Query 操作名称

  • 标签

  • db.system:数据库管理系统产品

  • db.user:数据库用户名

  • db.instance:数据库实例

  • db.statement:SQL 查询

  • db.typesql

默认的追踪策略是 PROPAGATE,客户端只会在活跃的追踪中创建跨度。

您可以使用 setTracingPolicy 更改客户端策略,例如您可以将 ALWAYS 设置为始终报告跨度。

options.setTracingPolicy(TracingPolicy.ALWAYS);

DB2 类型映射

目前客户端支持以下 DB2 类型

  • BOOLEAN (java.lang.Boolean) (仅限 DB2 LUW)

  • SMALLINT (java.lang.Short)

  • INTEGER (java.lang.Integer)

  • BIGINT (java.lang.Long)

  • REAL (java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DECIMAL (io.vertx.sqlclient.data.Numeric)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • ENUM (java.lang.String)

  • DATE (java.time.LocalDate)

  • TIME (java.time.LocalTime)

  • TIMESTAMP (java.time.LocalDateTime)

  • BINARY (byte[])

  • VARBINARY (byte[])

  • ROWID (io.vertx.db2client.impl.drda.DB2RowIdjava.sql.RowId) (仅限 DB2 z/OS)

目前不支持的类型包括

  • XML

  • BLOB

  • CLOB

  • DBCLOB

  • GRAPHIC / VARGRAPHIC

有关 DB2 数据类型的更多文档,请参阅以下资源

元组解码在存储值时使用上述类型,并且在可能的情况下还会对实际值进行即时转换

pool
  .query("SELECT an_int_column FROM exampleTable")
  .execute().onSuccess(rowSet -> {
    Row row = rowSet.iterator().next();

    // Stored as INTEGER column type and represented as java.lang.Integer
    Object value = row.getValue(0);

    // Convert to java.lang.Long
    Long longValue = row.getLong(0);
  });

使用 Java 枚举类型

您可以将 Java 枚举类型 映射到这些列类型

  • 字符串 (VARCHAR, TEXT)

  • 数字 (SMALLINT, INTEGER, BIGINT)

client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )")
  .execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Inserted " + rows.rowCount() + " new rows");
      for (Row row : rows) {
        System.out.println("Day: " + row.get(Days.class, "day_name"));
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });
client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )")
  .execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Inserted " + rows.rowCount() + " new rows");
      for (Row row : rows) {
        System.out.println("Day: " + row.get(Days.class, "day_num"));
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

字符串类型与 name() 方法返回的 Java 枚举名称匹配。

数字类型与 ordinal() 方法返回的 Java 枚举的序数匹配,并且 row.get() 方法在检索到的整数值的序数位置返回相应枚举的 name() 值。

收集器查询

您可以将 Java 收集器与查询 API 结合使用

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // Get the map created by the collector
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

收集器处理不得保留对 Row 的引用,因为处理整个集合只使用一个行。

Java Collectors 提供了许多有趣的预定义收集器,例如您可以轻松地直接从行集创建字符串

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client
  .query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

使用 SSL/TLS

要配置客户端以使用 SSL 连接,您可以像配置 Vert.x NetClient 一样配置 DB2ConnectOptions

DB2ConnectOptions options = new DB2ConnectOptions()
  .setPort(50001)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSsl(true)
  .setSslOptions(new ClientSSLOptions().setTrustOptions(new JksOptions()
    .setPath("/path/to/keystore.p12")
    .setPassword("keystoreSecret")));

DB2Connection.connect(vertx, options)
  .onComplete(res -> {
    if (res.succeeded()) {
      // Connected with SSL
    } else {
      System.out.println("Could not connect " + res.cause());
    }
  });

更多信息可在 Vert.x 文档中找到。

使用代理

您还可以配置客户端以使用 HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 代理。

更多信息可在 Vert.x 文档中找到。

高级连接池配置

服务器负载均衡

您可以将连接池配置为包含服务器列表而不是单个服务器。

Pool pool = DB2Builder.pool()
  .with(options)
  .connectingTo(Arrays.asList(server1, server2, server3))
  .using(vertx)
  .build();

连接池在创建连接时使用轮询负载均衡来选择不同的服务器。

这在创建连接时提供负载均衡,而不是在从连接池借用连接时。

连接池初始化

您可以使用 withConnectHandler 在连接创建后并将其插入连接池之前与连接进行交互。

builder.withConnectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    // Release the connection to the pool, ready to be used by the application
    conn.close();
  });
});

一旦您完成连接,您应该简单地关闭它以向连接池发出信号以使用它。

动态连接配置

您可以使用 Java 供应商而不是 SqlConnectOptions 实例来配置连接池的连接详情。

由于供应商是异步的,它可以用于提供动态的连接池配置(例如密码轮换)。

Pool pool = DB2Builder.pool()
  .with(poolOptions)
  .connectingTo(() -> {
    Future<SqlConnectOptions> connectOptions = retrieveOptions();
    return connectOptions;
  })
  .using(vertx)
  .build();