响应式 PostgreSQL 客户端

响应式 PostgreSQL 客户端是一个用于 PostgreSQL 的客户端,具有直接的 API,专注于可伸缩性和低开销。

该客户端是响应式且非阻塞的,允许单线程处理许多数据库连接。

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预处理查询缓存

  • 使用 PostgreSQL NOTIFY/LISTEN 进行发布/订阅

  • 批处理和游标

  • 行流式传输

  • 命令管道化

  • RxJava API

  • 直接内存到对象,无需不必要的复制

  • Java 8 日期和时间

  • SSL/TLS

  • Unix 域套接字

  • HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 代理支持

  • 代理(4 层和 7 层)支持

用法

要使用响应式 PostgreSQL 客户端,请将以下依赖项添加到构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-pg-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-pg-client:5.0.1'
}

快速入门

以下是连接、查询和断开连接的最简单方法

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
SqlClient client = PgBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> result = ar.result();
      System.out.println("Got " + result.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }

    // Now close the pool
    client.close();
  });

连接到 PostgreSQL

大多数情况下,您将使用连接池连接到 PostgreSQL

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
SqlClient client = PgBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

池化客户端使用连接池,任何操作都将从池中借用一个连接来执行操作并将其释放回池中。

如果您正在运行 Vert.x,您可以将您的 Vertx 实例传递给它

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
SqlClient client = PgBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

当您不再需要客户端时,需要释放它

client.close();

当您需要在同一连接上执行多个操作时,需要从连接池中获取一个 connection

您可以轻松地从连接池中获取一个

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool pool = PgBuilder
  .pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Get a connection from the pool
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // All operations execute on the same connection
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

一旦您使用完连接,您必须关闭它以将其释放回连接池,以便可以重用它。

命令管道化

在某些用例中,命令管道化可以提高数据库访问性能。

您可以配置客户端以使用管道化

Pool pool = PgBuilder
  .pool()
  .connectingTo(connectOptions.setPipeliningLimit(16))
  .with(poolOptions)
  .using(vertx)
  .build();

默认的管道化限制为 256

您可以将此值设置为 1 以禁用管道化。

连接池与池化客户端

PgBuilder 允许您创建连接池或池化客户端

SqlClient client = PgBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();

// Connection pool
Pool pool = PgBuilder
  .pool()
  .connectingTo(connectOptions)
  .with(poolOptions)
  .using(vertx)
  .build();

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • 连接池操作不是流水线化的,只有从连接池获取的连接才是流水线化的

  • 池化客户端操作是流水线化的,您无法从池化客户端获取连接

连接池共享

您可以在多个 Verticle 或同一 Verticle 的多个实例之间共享一个连接池。这样的连接池应该在 Verticle 之外创建,否则当创建它的 Verticle 卸载时,它将被关闭

Pool pool = Pool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new VerticleBase() {
  @Override
  public Future<?> start() throws Exception {
    // Use the pool
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

您也可以在每个 Verticle 中创建一个共享连接池

vertx.deployVerticle(() -> new VerticleBase() {
  Pool pool;
  @Override
  public Future<?> start() throws Exception {
    // Get or create a shared pool
    // this actually creates a lease to the pool
    // when the verticle is undeployed, the lease will be released automaticaly
    pool = PgBuilder.pool()
      .with(new PoolOptions()
        .setMaxSize(maxSize)
        .setShared(true)
        .setName("my-pool"))
      .connectingTo(database)
      .using(vertx)
      .build();
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

首次创建共享池时,它将为该池创建资源。后续调用将重用此池并创建对此池的租约。资源在所有租约关闭后处置。

默认情况下,连接池在需要创建 TCP 连接时会重用当前事件循环。因此,共享连接池将随机使用使用它的 verticle 的事件循环。

您可以为连接池分配一个事件循环数量,使其独立于使用它的上下文

Pool pool = PgBuilder.pool()
  .with(new PoolOptions()
    .setMaxSize(maxSize)
    .setShared(true)
    .setName("my-pool")
    .setEventLoopSize(4))
  .connectingTo(database)
  .using(vertx)
  .build();

Unix 域套接字

有时您希望通过 Unix 域套接字连接来提高性能,我们通过 Vert.x Native transports 实现这一点。

请确保您已在 classpath 中添加了所需的 netty-transport-native 依赖项并启用了 Unix 域套接字选项。

PgConnectOptions connectOptions = new PgConnectOptions()
  .setHost("/var/run/postgresql")
  .setPort(5432)
  .setDatabase("the-db");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = PgBuilder
  .pool()
  .connectingTo(connectOptions)
  .with(poolOptions)
  .build();

// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
Pool client2 = PgBuilder
  .pool()
  .connectingTo(connectOptions)
  .with(poolOptions)
  .using(vertx)
  .build();

更多信息请参见 Vert.x 文档

连接重试

您可以配置客户端在连接建立失败时进行重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

配置

您可以通过几种替代方案配置客户端。

数据对象

配置客户端的一个简单方法是指定一个 PgConnectOptions 数据对象。

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
Pool pool = PgBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

pool.getConnection()
  .onComplete(ar -> {
    // Handling your connection
  });

您还可以使用 setPropertiesaddProperty 方法配置通用属性。请注意,setProperties 将覆盖默认的客户端属性。

将此客户端与 CockroachDB DBaaS 一起使用时,需要使用 addProperty("options", "--cluster=<cluster-id>") 或在 URL 中使用 …​&options=--cluster%3D<cluster-id> 来包含 cluster 选项

例如,您可以通过添加 search_path 属性来为连接设置默认模式。

PgConnectOptions connectOptions = new PgConnectOptions();

// Set the default schema
Map<String, String> props = new HashMap<>();
props.put("search_path", "myschema");
connectOptions.setProperties(props);

有关可用属性的更多信息,请参见 PostgreSQL 手册

连接 URI

除了使用 PgConnectOptions 数据对象进行配置外,当您希望使用连接 URI 进行配置时,我们还提供了另一种连接方式

String connectionUri = "postgresql://dbuser:[email protected]:5432/mydb";

// Create the pool from the connection URI
Pool pool = PgBuilder.pool()
  .connectingTo(connectionUri)
  .using(vertx)
  .build();

// Create the connection from the connection URI
PgConnection
  .connect(vertx, connectionUri)
  .onComplete(res -> {
    // Handling your connection
  });

有关连接字符串格式的更多信息,请参见 PostgreSQL 手册

目前,客户端支持以下参数键

  • host

  • hostaddr

  • port

  • user

  • password

  • dbname

  • sslmode

  • 附加属性,包括

    • application_name

    • fallback_application_name

    • search_path

    • options

在连接 URI 中配置参数将覆盖默认属性。

环境变量

您还可以使用环境变量设置默认连接值,这在您希望避免硬编码数据库连接信息时很有用。有关更多详细信息,您可以参考官方文档。支持以下参数

  • PGHOST

  • PGHOSTADDR

  • PGPORT

  • PGDATABASE

  • PGUSER

  • PGPASSWORD

  • PGSSLMODE

如果您未指定数据对象或连接 URI 字符串进行连接,环境变量将优先于它们。

$ PGUSER=user \
  PGHOST=the-host \
  PGPASSWORD=secret \
  PGDATABASE=the-db \
  PGPORT=5432 \
  PGSSLMODE=DISABLE
Pool pool = PgBuilder.pool()
  .using(vertx)
  .build();

// Create the connection from the environment variables
PgConnection.connect(vertx)
  .onComplete(res -> {
    // Handling your connection
  });

SASL SCRAM-SHA-256 认证机制。

vertx-pg-client 支持 SCRAM-SHA-256SCRAM-SHA-256-PLUS SASL 认证。从 v5.0.0 版本开始,vertx-pg-client 附带 com.ongres.scram 库依赖项,当从旧版本迁移时,vertx-pg-client 用户可以从其 pom.xmlbuild.gradle 文件中删除 com.ongres.scram 库。

如果客户端仅使用其他认证方法,则可以使用 maven <exclusions> 或 gradle exclude 排除传递的 com.ongres.scram 依赖项,以减小软件大小。

运行查询

当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询;连接池将使用其一个连接来运行查询并将结果返回给您。

以下是运行简单查询的方法

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预处理查询

您也可以对预处理查询执行相同的操作。

SQL 字符串可以使用数据库语法 `$1`、`$2` 等按位置引用参数…

client
  .preparedQuery("SELECT * FROM users WHERE id=$1")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询方法提供一个异步 RowSet 实例,适用于 SELECT 查询

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

UPDATE/INSERT 查询

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
  .execute(Tuple.of("Julien", "Viet"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row 允许您通过索引访问数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));
列索引从 0 开始,而不是从 1 开始。

或者,可以通过名称检索数据

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在这里不会做任何神奇的事情,列名是根据表中的名称来识别的,无论您的 SQL 文本是什么。

您可以访问各种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存的预处理语句来执行一次性预处理查询

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = $1")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您可以创建一个 PreparedStatement 并自行管理生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = $1")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"))
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以执行预处理批处理

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
  .executeBatch(batch)
  .onComplete(res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

Returning 子句

您可以在查询中使用 'RETURNING' 子句获取生成的键

client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
  .execute(Tuple.of("white", "red", "blue"))
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("generated key: " + row.getInteger("color_id"));
    }
});

只要存在 RETURNING 子句,这适用于任何 SQL

client
  .query("DELETE FROM color RETURNING color_name")
  .execute()
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("deleted color: " + row.getString("color_name"));
    }
  });

带有 RETURNING 子句的批处理查询会创建一个 RowSet,其中包含批处理中每个元素的单个条目

client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
  .executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")))
  .onSuccess(res -> {
    for (RowSet<Row> rows = res; rows != null; rows = rows.next()) {
      Integer colorId = rows.iterator().next().getInteger("color_id");
      System.out.println("generated key: " + colorId);
    }
  });

使用连接

获取连接

当您需要执行顺序查询(不带事务)时,可以创建一个新连接或从连接池中借用一个。请记住,在从连接池获取连接并将其返回到连接池之间,您应该注意连接,因为它可能会因空闲超时等原因被服务器关闭。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(() -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

可以创建预处理查询

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(() -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化连接 API

当您使用连接池时,可以调用 withConnection,将一个在连接中执行的函数传递给它。

它从连接池中借用一个连接,并使用该连接调用函数。

该函数必须返回一个任意结果的 Future。

Future 完成后,连接将返回到连接池,并提供总结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接事务

您可以使用 SQL 的 BEGIN/COMMIT/ROLLBACK 来执行事务,如果您这样做,则必须使用 SqlConnection 并自行管理它。

或者您可以使用 SqlConnection 的事务 API

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
  // Begin the transaction
  conn.begin()
    .compose(tx -> conn
      // Various statements
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
      .execute()
      .compose(res2 -> conn
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
        .execute())
      // Commit the transaction
      .compose(res3 -> tx.commit()))
    // Return the connection to the pool
    .eventually(() -> conn.close())
    .onSuccess(v -> System.out.println("Transaction succeeded"))
    .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});

当数据库服务器报告当前事务失败时(例如臭名昭著的 current transaction is aborted, commands ignored until end of transaction block),事务将被回滚,并且 completion future 将以 TransactionRollbackException 失败

tx.completion()
  .onFailure(err -> {
  System.out.println("Transaction failed => rolled back");
});

简化的事务 API

当您使用连接池时,您可以调用 withTransaction,将一个在事务中执行的函数传递给它。

它从连接池中借用一个连接,开始事务,并使用一个在事务范围内执行所有操作的客户端调用该函数。

该函数必须返回一个任意结果的 Future

  • 当 Future 成功时,客户端将提交事务

  • 当 Future 失败时,客户端将回滚事务

事务完成后,连接将返回到连接池,并提供总结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流式传输

默认情况下,预处理查询执行会获取所有行,您可以使用 游标 来控制您想要读取的行数

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .onComplete(ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Cursors require to run within a transaction
    connection
      .begin()
      .onComplete(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));

        // Read 50 rows
        cursor.read(50).onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();

            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();
            }
          }
        });
      }
    });
  }
});

游标在提前释放时应关闭

cursor
  .read(50)
  .onComplete(ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

游标也提供了流 API,这可能更方便,尤其是在 Rxified 版本中。

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .onComplete(ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Streams require to run within a transaction
    connection
      .begin()
      .onComplete(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));

        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          // Close the stream to release the resources in the database
          stream
            .close()
            .onComplete(closed -> {
            tx.commit()
              .onComplete(committed -> {
              System.out.println("End of stream");
            });
          });
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });
  }
});

流以 50 行的批次读取行并进行流式传输,当行传递给处理器后,新的 50 行批次将被读取,依此类推。

流可以暂停或恢复,加载的行将保留在内存中直到它们被传递,并且游标将停止迭代。

PostgreSQL 在事务结束时销毁游标,因此游标 API 应在事务中使用,否则您很可能会遇到 34000 PostgreSQL 错误。

跟踪查询

当 Vert.x 启用了跟踪时,SQL 客户端可以跟踪查询执行。

客户端报告以下客户端跨度

  • Query 操作名称

  • 标签

  • db.system:数据库管理系统产品

  • db.user:数据库用户名

  • db.instance:数据库实例

  • db.statement:SQL 查询

  • db.typesql

默认的追踪策略是 PROPAGATE,客户端只会在活跃的追踪中创建跨度。

您可以通过 setTracingPolicy 更改客户端策略,例如您可以将 ALWAYS 设置为始终报告一个 span

options.setTracingPolicy(TracingPolicy.ALWAYS);

PostgreSQL 类型映射

目前客户端支持以下 PostgreSQL 类型

  • BOOLEAN (java.lang.Boolean)

  • INT2 (java.lang.Short)

  • INT4 (java.lang.Integer)

  • INT8 (java.lang.Long)

  • FLOAT4 (java.lang.Float)

  • FLOAT8 (java.lang.Double)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • TEXT (java.lang.String)

  • ENUM (java.lang.String)

  • NAME (java.lang.String)

  • SERIAL2 (java.lang.Short)

  • SERIAL4 (java.lang.Integer)

  • SERIAL8 (java.lang.Long)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • UUID (java.util.UUID)

  • DATE (java.time.LocalDate)

  • TIME (java.time.LocalTime)

  • TIMETZ (java.time.OffsetTime)

  • TIMESTAMP (java.time.LocalDateTime)

  • TIMESTAMPTZ (java.time.OffsetDateTime)

  • INTERVAL (io.vertx.pgclient.data.Interval)

  • BYTEA (io.vertx.core.buffer.Buffer)

  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • JSONB (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • POINT (io.vertx.pgclient.data.Point)

  • LINE (io.vertx.pgclient.data.Line)

  • LSEG (io.vertx.pgclient.data.LineSegment)

  • BOX (io.vertx.pgclient.data.Box)

  • PATH (io.vertx.pgclient.data.Path)

  • POLYGON (io.vertx.pgclient.data.Polygon)

  • CIRCLE (io.vertx.pgclient.data.Circle)

  • TSVECTOR (java.lang.String)

  • TSQUERY (java.lang.String)

  • INET (io.vertx.pgclient.data.Inet)

  • MONEY (io.vertx.pgclient.data.Money)

元组解码在存储值时使用上述类型,如果可能,它还会在飞行中执行实际值的转换

pool
  .query("SELECT 1::BIGINT \"VAL\"")
  .execute()
  .onComplete(ar -> {
    RowSet<Row> rowSet = ar.result();
    Row row = rowSet.iterator().next();

    // Stored as java.lang.Long
    Object value = row.getValue(0);

    // Convert to java.lang.Integer
    Integer intValue = row.getInteger(0);
  });

元组编码使用上述类型映射进行编码,除非类型是数字,在这种情况下使用 java.lang.Number 代替

pool
  .query("SELECT 1::BIGINT \"VAL\"")
  .execute()
  .onComplete(ar -> {
    RowSet<Row> rowSet = ar.result();
    Row row = rowSet.iterator().next();

    // Stored as java.lang.Long
    Object value = row.getValue(0);

    // Convert to java.lang.Integer
    Integer intValue = row.getInteger(0);
  });

支持这些类型的数组。

处理 JSON

PostgreSQL JSONJSONB 类型由以下 Java 类型表示

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL 用于表示 JSON 空字面量

Tuple tuple = Tuple.of(
  Tuple.JSON_NULL,
  new JsonObject().put("foo", "bar"),
  3);

// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL

//
value = tuple.get(JsonObject.class, 1); // Expect JSON object

//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3

处理 NUMERIC

Numeric Java 类型用于表示 PostgreSQL NUMERIC 类型。

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // Handle NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

处理数组

数组在 TupleRow 上可用

Tuple tuple = Tuple.of(new String[]{ "a", "tuple", "with", "arrays" });

// Add a string array to the tuple
tuple.addArrayOfString(new String[]{"another", "array"});

// Get the first array of string
String[] array = tuple.getArrayOfStrings(0);

处理日期/时间无穷大

PostgreSQL 定义了表示无穷大的特殊值。

相应类型的最大/最小值常量表示特殊值

  • OffsetDateTime.MAX/OffsetDateTime.MIN`

  • LocalDateTime.MAX/LocalDateTime.MIN`

  • LocalDate.MAX/LocalDate.MIN`

client
  .query("SELECT 'infinity'::DATE \"LocalDate\"")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      Row row = ar.result().iterator().next();
      System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX));
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

处理自定义类型

字符串用于表示自定义类型,无论是发送到还是从 Postgres 返回的。

您可以从 PostgreSQL 读取并将自定义类型作为字符串获取

client
  .preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
  .execute(Tuple.of(3))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

您也可以通过提供字符串写入 PostgreSQL

client
  .preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
  .execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

文本搜索使用 Java String 处理

client
  .preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Match : " + row.getBoolean(0));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

tsvectortsquery 可以使用 Java String 从数据库中获取

client
  .preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

处理枚举类型

PostgreSQL 枚举类型被映射到 Java 字符串。

client
  .preparedQuery("INSERT INTO colors VALUES ($2)")
  .execute(Tuple.of("red"))
  .onComplete(res -> {
    // ...
  });

使用 Java 枚举类型

您可以将 Java 枚举类型映射到这些列类型

  • 字符串 (VARCHAR, TEXT)

  • PostgreSQL 枚举类型

  • 数字 (INT2, INT4, INT8)

client
  .preparedQuery("INSERT INTO colors VALUES ($1)")
  .execute(Tuple.of(Color.red))
  .flatMap(res ->
    client
      .preparedQuery("SELECT color FROM colors")
      .execute()
  ).onComplete(res -> {
    if (res.succeeded()) {
      RowSet<Row> rows = res.result();
      for (Row row : rows) {
        System.out.println(row.get(Color.class, "color"));
      }
    }
});

字符串和 PostgreSQL 枚举类型与 Java 枚举的 name() 方法返回的名称匹配。

数字类型与 Java 枚举的 ordinal() 方法返回的序号匹配。

收集器查询

您可以将 Java 收集器与查询 API 结合使用

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    SqlResult<Map<Long, String>> result = ar.result();

    // Get the map created by the collector
    Map<Long, String> map = result.value();
    System.out.println("Got " + map);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

收集器处理不得保留对 Row 的引用,因为处理整个集合只使用一个行。

Java Collectors 提供了许多有趣的预定义收集器,例如您可以轻松地直接从行集创建字符串

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client
  .query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

发布/订阅

PostgreSQL 支持发布/订阅通信通道。

您可以设置一个 notificationHandler 来接收 PostgreSQL 通知

connection.notificationHandler(notification -> {
  System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
});

connection
  .query("LISTEN some-channel")
  .execute()
  .onComplete(ar -> {
  System.out.println("Subscribed to channel");
});

PgSubscriber 是一个通道管理器,管理单个连接以提供每个通道的订阅

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
  System.out.println("Received " + payload);
});

subscriber
  .connect()
  .onComplete(ar -> {
  if (ar.succeeded()) {

    // Or you can set the channel after connect
    subscriber.channel("channel2").handler(payload -> {
      System.out.println("Received " + payload);
    });
  }
});

传递给通道方法的通道名称将是 PostgreSQL 用于发送通知的通道的精确名称。请注意,这与 SQL 中通道名称的表示方式不同,并且在内部 PgSubscriber 会将提交的通道名称准备为带引号的标识符

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

subscriber
  .connect()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Complex channel name - name in PostgreSQL requires a quoted ID
      subscriber.channel("Complex.Channel.Name").handler(payload -> {
        System.out.println("Received " + payload);
      });
      subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
        subscriber.actualConnection()
          .query("NOTIFY \"Complex.Channel.Name\", 'msg'")
          .execute()
          .onComplete(notified -> {
            System.out.println("Notified \"Complex.Channel.Name\"");
          });
      });

      // PostgreSQL simple ID's are forced lower-case
      subscriber.channel("simple_channel").handler(payload -> {
          System.out.println("Received " + payload);
      });
      subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
        // The following simple channel identifier is forced to lower case
        subscriber.actualConnection()
          .query("NOTIFY Simple_CHANNEL, 'msg'")
          .execute()
          .onComplete(notified -> {
            System.out.println("Notified simple_channel");
          });
      });

      // The following channel name is longer than the current
      // (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
      subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
        .handler(payload -> {
        System.out.println("Received " + payload);
      });
    }
  });

您可以提供一个重连策略作为函数,该函数将重试次数 retries 作为参数,并返回一个 amountOfTime

  • amountOfTime < 0 时:订阅者关闭,不进行重试

  • amountOfTime = 0 时:订阅者立即重试连接

  • amountOfTime > 0 时:订阅者在 amountOfTime 毫秒后重试

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy(retries -> {
  if (retries < 10) {
    return 100L;
  } else {
    return -1L;
  }
});

默认策略是不重连。

通知消息

PostgreSQL 可以在连接的生命周期内发送通知消息。

默认情况下,此类消息在控制台上记录为警告。

您可以在连接上设置一个处理器来捕获它们并对其进行有用的处理。

connection.noticeHandler(notice -> {
  System.out.println("Received notice " + notice.getSeverity() + "" + notice.getMessage());
});

取消请求

PostgreSQL 支持取消正在进行的请求。您可以使用 cancelRequest 取消正在进行的请求。取消请求会打开与服务器的新连接,取消请求,然后关闭连接。

connection
  .query("SELECT pg_sleep(20)")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    // imagine this is a long query and is still running
    System.out.println("Query success");
  } else {
    // the server will abort the current query after cancelling request
    System.out.println("Failed to query due to " + ar.cause().getMessage());
  }
});
connection
  .cancelRequest()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("Cancelling request has been sent");
  } else {
    System.out.println("Failed to send cancelling request");
  }
});

取消信号可能有效也可能无效——例如,如果它在后端完成查询处理后到达,则它将无效。如果取消有效,则会导致当前命令提前终止并显示错误消息。

更多信息请参见官方文档

使用 SSL/TLS

要配置客户端使用 SSL 连接,您可以像配置 Vert.x NetClient 一样配置 PgConnectOptions。支持所有 SSL 模式,并且您可以配置 sslmode。客户端默认处于 DISABLE SSL 模式。ssl 参数仅作为设置 sslmode 的快捷方式。setSsl(true) 等同于 setSslMode(VERIFY_CA)setSsl(false) 等同于 setSslMode(DISABLE)

PgConnectOptions options = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setSslOptions(new ClientSSLOptions().setTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem")));

PgConnection
  .connect(vertx, options)
  .onComplete(res -> {
  if (res.succeeded()) {
    // Connected with SSL
  } else {
    System.out.println("Could not connect " + res.cause());
  }
});

更多信息可在 Vert.x 文档中找到。

使用 4 层代理

您可以配置客户端以使用 HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 4 层代理。

更多信息请参见 Vert.x 文档

使用 7 层代理

7 层代理可以在与实际数据库的多个连接上进行查询负载均衡。当发生这种情况时,客户端可能会因缺少会话亲和性而感到困惑,并可能出现不必要的错误,例如 *ERROR: unnamed prepared statement does not exist (26000)*。

支持的代理

  • 配置了 _pool_mode=transactionPgBouncer

您可以配置客户端以与代理以不同方式交互

connectOptions.setUseLayer7Proxy(true);

这样做时,预处理语句无法被缓存,因此

  • 必须禁用预处理语句缓存

  • 显式预处理语句只能存在于事务范围内,这意味着您可以使用游标,但游标的预处理语句必须在事务范围内创建和销毁

高级连接池配置

服务器负载均衡

您可以将连接池配置为包含服务器列表而不是单个服务器。

Pool pool = PgBuilder.pool()
  .with(options)
  .connectingTo(Arrays.asList(server1, server2, server3))
  .using(vertx)
  .build();

连接池在创建连接时使用轮询负载均衡来选择不同的服务器。

这在创建连接时提供负载均衡,而不是在从连接池借用连接时。

连接池初始化

您可以使用 withConnectHandler 在连接创建后并将其插入连接池之前与连接进行交互。

builder.withConnectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    // Release the connection to the pool, ready to be used by the application
    conn.close();
  });
});

一旦您完成连接,您应该简单地关闭它以向连接池发出信号以使用它。

动态连接配置

您可以使用 Java 供应商而不是 SqlConnectOptions 实例来配置连接池的连接详情。

由于供应商是异步的,它可以用于提供动态的连接池配置(例如密码轮换)。

Pool pool = PgBuilder.pool()
  .with(poolOptions)
  .connectingTo(() -> {
    Future<SqlConnectOptions> connectOptions = retrieveOptions();
    return connectOptions;
  })
  .using(vertx)
  .build();

支持的数据类型

**响应式 Postgres 客户端**目前支持以下数据类型

_ 数组

Postgres

Java

支持

JAVA

支持

BOOLEAN

j.l.Boolean

j.l.Boolean[]

INT2

j.l.Short

j.l.Short[]

INT4

j.l.Integer

j.l.Integer[]

INT8

j.l.Long

j.l.Long[]

FLOAT4

j.l.Float

j.l.Float[]

FLOAT8

j.l.Double

j.l.Double[]

CHAR

j.l.Character

j.l.Character[]

VARCHAR

j.l.String

j.l.String[]

TEXT

j.l.String

j.l.String[]

ENUM

j.l.String

j.l.String[]

NAME

j.l.String

j.l.String[]

SERIAL2

j.l.Short

无效类型

SERIAL4

j.l.Integer

无效类型

SERIAL8

j.l.Long

无效类型

NUMERIC

i.r.p.data.Numeric

i.r.p.data.Numeric[]

UUID

j.u.UUID

j.u.UUID[]

DATE

j.t.LocalDate

j.t.LocalDate[]

TIME

j.t.LocalTime

j.t.LocalTime[]

TIMETZ

j.t.OffsetTime

j.t.OffsetTime[]

TIMESTAMP

j.t.LocalDateTime

j.t.LocalDateTime[]

TIMESTAMPTZ

j.t.OffsetDateTime

j.t.OffsetDateTime[]

INTERVAL

i.r.p.data.Interval

i.r.p.data.Interval[]

BYTEA

i.v.c.b.Buffer

i.v.c.b.Buffer[]

JSON

Object

Object[]

JSONB

Object

Object[]

POINT

i.r.p.data.Point

i.r.p.data.Point[]

LINE

i.r.p.data.Line

i.r.p.data.Line[]

LSEG

i.r.p.data.LineSegment

i.r.p.data.LineSegment[]

BOX

i.r.p.data.Box

i.r.p.data.Box[]

INET

io.vertx.pgclient.data.Inet

io.vertx.pgclient.data.Inet[]

MONEY

io.vertx.pgclient.data.Money

io.vertx.pgclient.data.Money[]

CIDR

io.vertx.pgclient.data.Cidr

io.vertx.pgclient.data.Cidr[]

PATH

i.r.p.data.Path

i.r.p.data.Path[]

POLYGON

i.r.p.data.Polygon

i.r.p.data.Polygon[]

CIRCLE

i.r.p.data.Circle

i.r.p.data.Circle[]

TSVECTOR

j.l.String

j.l.String[]

TSQUERY

j.l.String

j.l.String[]

未知

j.l.String

j.l.String[]

PostgreSQL JSON 和 JSONB 类型由以下 Java 类型表示

  • java.lang.String

  • java.lang.Number

  • java.lang.Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL 用于表示 JSON 空字面量

以下类型

BITVARBITMACADDRMACADDR8XMLHSTOREOIDVOID

尚未实现(欢迎提交 PR)。