<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>5.0.1</version>
</dependency>
响应式 PostgreSQL 客户端
响应式 PostgreSQL 客户端是一个用于 PostgreSQL 的客户端,具有直接的 API,专注于可伸缩性和低开销。
该客户端是响应式且非阻塞的,允许单线程处理许多数据库连接。
-
事件驱动
-
轻量级
-
内置连接池
-
预处理查询缓存
-
使用 PostgreSQL
NOTIFY/LISTEN
进行发布/订阅 -
批处理和游标
-
行流式传输
-
命令管道化
-
RxJava API
-
直接内存到对象,无需不必要的复制
-
Java 8 日期和时间
-
SSL/TLS
-
Unix 域套接字
-
HTTP/1.x CONNECT、SOCKS4a 或 SOCKS5 代理支持
-
代理(4 层和 7 层)支持
用法
要使用响应式 PostgreSQL 客户端,请将以下依赖项添加到构建描述符的 dependencies 部分
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
dependencies {
compile 'io.vertx:vertx-pg-client:5.0.1'
}
快速入门
以下是连接、查询和断开连接的最简单方法
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
SqlClient client = PgBuilder
.client()
.with(poolOptions)
.connectingTo(connectOptions)
.build();
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
连接到 PostgreSQL
大多数情况下,您将使用连接池连接到 PostgreSQL
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
SqlClient client = PgBuilder
.client()
.with(poolOptions)
.connectingTo(connectOptions)
.build();
池化客户端使用连接池,任何操作都将从池中借用一个连接来执行操作并将其释放回池中。
如果您正在运行 Vert.x,您可以将您的 Vertx 实例传递给它
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
SqlClient client = PgBuilder
.client()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
当您不再需要客户端时,需要释放它
client.close();
当您需要在同一连接上执行多个操作时,需要从连接池中获取一个 connection
。
您可以轻松地从连接池中获取一个
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
Pool pool = PgBuilder
.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
// Get a connection from the pool
pool.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
一旦您使用完连接,您必须关闭它以将其释放回连接池,以便可以重用它。
命令管道化
在某些用例中,命令管道化可以提高数据库访问性能。
您可以配置客户端以使用管道化
Pool pool = PgBuilder
.pool()
.connectingTo(connectOptions.setPipeliningLimit(16))
.with(poolOptions)
.using(vertx)
.build();
默认的管道化限制为 256
。
您可以将此值设置为 1
以禁用管道化。
连接池与池化客户端
PgBuilder
允许您创建连接池或池化客户端
SqlClient client = PgBuilder
.client()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();
// Connection pool
Pool pool = PgBuilder
.pool()
.connectingTo(connectOptions)
.with(poolOptions)
.using(vertx)
.build();
// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
-
连接池操作不是流水线化的,只有从连接池获取的连接才是流水线化的
-
池化客户端操作是流水线化的,您无法从池化客户端获取连接
连接池共享
您可以在多个 Verticle 或同一 Verticle 的多个实例之间共享一个连接池。这样的连接池应该在 Verticle 之外创建,否则当创建它的 Verticle 卸载时,它将被关闭
Pool pool = Pool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new VerticleBase() {
@Override
public Future<?> start() throws Exception {
// Use the pool
return super.start();
}
}, new DeploymentOptions().setInstances(4));
您也可以在每个 Verticle 中创建一个共享连接池
vertx.deployVerticle(() -> new VerticleBase() {
Pool pool;
@Override
public Future<?> start() throws Exception {
// Get or create a shared pool
// this actually creates a lease to the pool
// when the verticle is undeployed, the lease will be released automaticaly
pool = PgBuilder.pool()
.with(new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool"))
.connectingTo(database)
.using(vertx)
.build();
return super.start();
}
}, new DeploymentOptions().setInstances(4));
首次创建共享池时,它将为该池创建资源。后续调用将重用此池并创建对此池的租约。资源在所有租约关闭后处置。
默认情况下,连接池在需要创建 TCP 连接时会重用当前事件循环。因此,共享连接池将随机使用使用它的 verticle 的事件循环。
您可以为连接池分配一个事件循环数量,使其独立于使用它的上下文
Pool pool = PgBuilder.pool()
.with(new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool")
.setEventLoopSize(4))
.connectingTo(database)
.using(vertx)
.build();
Unix 域套接字
有时您希望通过 Unix 域套接字连接来提高性能,我们通过 Vert.x Native transports 实现这一点。
请确保您已在 classpath 中添加了所需的 netty-transport-native
依赖项并启用了 Unix 域套接字选项。
PgConnectOptions connectOptions = new PgConnectOptions()
.setHost("/var/run/postgresql")
.setPort(5432)
.setDatabase("the-db");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
Pool client = PgBuilder
.pool()
.connectingTo(connectOptions)
.with(poolOptions)
.build();
// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
Pool client2 = PgBuilder
.pool()
.connectingTo(connectOptions)
.with(poolOptions)
.using(vertx)
.build();
更多信息请参见 Vert.x 文档。
连接重试
您可以配置客户端在连接建立失败时进行重试。
options
.setReconnectAttempts(2)
.setReconnectInterval(1000);
配置
您可以通过几种替代方案配置客户端。
数据对象
配置客户端的一个简单方法是指定一个 PgConnectOptions
数据对象。
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
Pool pool = PgBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
pool.getConnection()
.onComplete(ar -> {
// Handling your connection
});
您还可以使用 setProperties
或 addProperty
方法配置通用属性。请注意,setProperties
将覆盖默认的客户端属性。
将此客户端与 CockroachDB DBaaS 一起使用时,需要使用 addProperty("options", "--cluster=<cluster-id>") 或在 URL 中使用 …&options=--cluster%3D<cluster-id> 来包含 cluster 选项 |
例如,您可以通过添加 search_path
属性来为连接设置默认模式。
PgConnectOptions connectOptions = new PgConnectOptions();
// Set the default schema
Map<String, String> props = new HashMap<>();
props.put("search_path", "myschema");
connectOptions.setProperties(props);
有关可用属性的更多信息,请参见 PostgreSQL 手册。
连接 URI
除了使用 PgConnectOptions
数据对象进行配置外,当您希望使用连接 URI 进行配置时,我们还提供了另一种连接方式
String connectionUri = "postgresql://dbuser:[email protected]:5432/mydb";
// Create the pool from the connection URI
Pool pool = PgBuilder.pool()
.connectingTo(connectionUri)
.using(vertx)
.build();
// Create the connection from the connection URI
PgConnection
.connect(vertx, connectionUri)
.onComplete(res -> {
// Handling your connection
});
有关连接字符串格式的更多信息,请参见 PostgreSQL 手册。
目前,客户端支持以下参数键
-
host
-
hostaddr
-
port
-
user
-
password
-
dbname
-
sslmode
-
附加属性,包括
-
application_name
-
fallback_application_name
-
search_path
-
options
-
在连接 URI 中配置参数将覆盖默认属性。 |
环境变量
您还可以使用环境变量设置默认连接值,这在您希望避免硬编码数据库连接信息时很有用。有关更多详细信息,您可以参考官方文档。支持以下参数
-
PGHOST
-
PGHOSTADDR
-
PGPORT
-
PGDATABASE
-
PGUSER
-
PGPASSWORD
-
PGSSLMODE
如果您未指定数据对象或连接 URI 字符串进行连接,环境变量将优先于它们。
$ PGUSER=user \
PGHOST=the-host \
PGPASSWORD=secret \
PGDATABASE=the-db \
PGPORT=5432 \
PGSSLMODE=DISABLE
Pool pool = PgBuilder.pool()
.using(vertx)
.build();
// Create the connection from the environment variables
PgConnection.connect(vertx)
.onComplete(res -> {
// Handling your connection
});
SASL SCRAM-SHA-256 认证机制。
vertx-pg-client
支持 SCRAM-SHA-256
和 SCRAM-SHA-256-PLUS
SASL 认证。从 v5.0.0 版本开始,vertx-pg-client
附带 com.ongres.scram
库依赖项,当从旧版本迁移时,vertx-pg-client
用户可以从其 pom.xml
和 build.gradle
文件中删除 com.ongres.scram
库。
如果客户端仅使用其他认证方法,则可以使用 maven <exclusions>
或 gradle exclude
排除传递的 com.ongres.scram
依赖项,以减小软件大小。
运行查询
当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询;连接池将使用其一个连接来运行查询并将结果返回给您。
以下是运行简单查询的方法
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预处理查询
您也可以对预处理查询执行相同的操作。
SQL 字符串可以使用数据库语法 `$1`、`$2` 等按位置引用参数…
client
.preparedQuery("SELECT * FROM users WHERE id=$1")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询方法提供一个异步 RowSet
实例,适用于 SELECT 查询
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或 UPDATE/INSERT 查询
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
.execute(Tuple.of("Julien", "Viet"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row
允许您通过索引访问数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
列索引从 0 开始,而不是从 1 开始。 |
或者,可以通过名称检索数据
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在这里不会做任何神奇的事情,列名是根据表中的名称来识别的,无论您的 SQL 文本是什么。
您可以访问各种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存的预处理语句来执行一次性预处理查询
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = $1")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您可以创建一个 PreparedStatement
并自行管理生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = $1")
.onComplete(ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"))
.onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以执行预处理批处理
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
.executeBatch(batch)
.onComplete(res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
Returning 子句
您可以在查询中使用 'RETURNING' 子句获取生成的键
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
.execute(Tuple.of("white", "red", "blue"))
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println("generated key: " + row.getInteger("color_id"));
}
});
只要存在 RETURNING
子句,这适用于任何 SQL
client
.query("DELETE FROM color RETURNING color_name")
.execute()
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println("deleted color: " + row.getString("color_name"));
}
});
带有 RETURNING
子句的批处理查询会创建一个 RowSet
,其中包含批处理中每个元素的单个条目
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
.executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")))
.onSuccess(res -> {
for (RowSet<Row> rows = res; rows != null; rows = rows.next()) {
Integer colorId = rows.iterator().next().getInteger("color_id");
System.out.println("generated key: " + colorId);
}
});
使用连接
获取连接
当您需要执行顺序查询(不带事务)时,可以创建一个新连接或从连接池中借用一个。请记住,在从连接池获取连接并将其返回到连接池之间,您应该注意连接,因为它可能会因空闲超时等原因被服务器关闭。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(() -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
可以创建预处理查询
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(() -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化连接 API
当您使用连接池时,可以调用 withConnection
,将一个在连接中执行的函数传递给它。
它从连接池中借用一个连接,并使用该连接调用函数。
该函数必须返回一个任意结果的 Future。
Future 完成后,连接将返回到连接池,并提供总结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接事务
您可以使用 SQL 的 BEGIN
/COMMIT
/ROLLBACK
来执行事务,如果您这样做,则必须使用 SqlConnection
并自行管理它。
或者您可以使用 SqlConnection
的事务 API
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(() -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务器报告当前事务失败时(例如臭名昭著的 current transaction is aborted, commands ignored until end of transaction block),事务将被回滚,并且 completion
future 将以 TransactionRollbackException
失败
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化的事务 API
当您使用连接池时,您可以调用 withTransaction
,将一个在事务中执行的函数传递给它。
它从连接池中借用一个连接,开始事务,并使用一个在事务范围内执行所有操作的客户端调用该函数。
该函数必须返回一个任意结果的 Future
-
当 Future 成功时,客户端将提交事务
-
当 Future 失败时,客户端将回滚事务
事务完成后,连接将返回到连接池,并提供总结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式传输
默认情况下,预处理查询执行会获取所有行,您可以使用 游标
来控制您想要读取的行数
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.onComplete(ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Cursors require to run within a transaction
connection
.begin()
.onComplete(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of("julien"));
// Read 50 rows
cursor.read(50).onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - commit the transaction
tx.commit();
}
}
});
}
});
}
});
游标在提前释放时应关闭
cursor
.read(50)
.onComplete(ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
游标也提供了流 API,这可能更方便,尤其是在 Rxified 版本中。
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.onComplete(ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Streams require to run within a transaction
connection
.begin()
.onComplete(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
// Close the stream to release the resources in the database
stream
.close()
.onComplete(closed -> {
tx.commit()
.onComplete(committed -> {
System.out.println("End of stream");
});
});
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
}
});
流以 50
行的批次读取行并进行流式传输,当行传递给处理器后,新的 50
行批次将被读取,依此类推。
流可以暂停或恢复,加载的行将保留在内存中直到它们被传递,并且游标将停止迭代。
PostgreSQL 在事务结束时销毁游标,因此游标 API 应在事务中使用,否则您很可能会遇到 34000 PostgreSQL 错误。 |
跟踪查询
当 Vert.x 启用了跟踪时,SQL 客户端可以跟踪查询执行。
客户端报告以下客户端跨度
-
Query
操作名称 -
标签
-
db.system
:数据库管理系统产品 -
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL 查询 -
db.type
:sql
默认的追踪策略是 PROPAGATE
,客户端只会在活跃的追踪中创建跨度。
您可以通过 setTracingPolicy
更改客户端策略,例如您可以将 ALWAYS
设置为始终报告一个 span
options.setTracingPolicy(TracingPolicy.ALWAYS);
PostgreSQL 类型映射
目前客户端支持以下 PostgreSQL 类型
-
BOOLEAN (
java.lang.Boolean
) -
INT2 (
java.lang.Short
) -
INT4 (
java.lang.Integer
) -
INT8 (
java.lang.Long
) -
FLOAT4 (
java.lang.Float
) -
FLOAT8 (
java.lang.Double
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
TEXT (
java.lang.String
) -
ENUM (
java.lang.String
) -
NAME (
java.lang.String
) -
SERIAL2 (
java.lang.Short
) -
SERIAL4 (
java.lang.Integer
) -
SERIAL8 (
java.lang.Long
) -
NUMERIC (
io.vertx.sqlclient.data.Numeric
) -
UUID (
java.util.UUID
) -
DATE (
java.time.LocalDate
) -
TIME (
java.time.LocalTime
) -
TIMETZ (
java.time.OffsetTime
) -
TIMESTAMP (
java.time.LocalDateTime
) -
TIMESTAMPTZ (
java.time.OffsetDateTime
) -
INTERVAL (
io.vertx.pgclient.data.Interval
) -
BYTEA (
io.vertx.core.buffer.Buffer
) -
JSON (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
JSONB (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
POINT (
io.vertx.pgclient.data.Point
) -
LINE (
io.vertx.pgclient.data.Line
) -
LSEG (
io.vertx.pgclient.data.LineSegment
) -
BOX (
io.vertx.pgclient.data.Box
) -
PATH (
io.vertx.pgclient.data.Path
) -
POLYGON (
io.vertx.pgclient.data.Polygon
) -
CIRCLE (
io.vertx.pgclient.data.Circle
) -
TSVECTOR (
java.lang.String
) -
TSQUERY (
java.lang.String
) -
INET (
io.vertx.pgclient.data.Inet
) -
MONEY (
io.vertx.pgclient.data.Money
)
元组解码在存储值时使用上述类型,如果可能,它还会在飞行中执行实际值的转换
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute()
.onComplete(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
元组编码使用上述类型映射进行编码,除非类型是数字,在这种情况下使用 java.lang.Number
代替
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute()
.onComplete(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
支持这些类型的数组。
处理 JSON
PostgreSQL JSON
和 JSONB
类型由以下 Java 类型表示
-
String
-
Number
-
Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
用于表示 JSON 空字面量
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // Expect JSON object
//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3
处理 NUMERIC
Numeric
Java 类型用于表示 PostgreSQL NUMERIC
类型。
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// Handle NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
处理日期/时间无穷大
PostgreSQL 定义了表示无穷大的特殊值。
相应类型的最大/最小值常量表示特殊值。
-
OffsetDateTime.MAX
/OffsetDateTime.MIN` -
LocalDateTime.MAX
/LocalDateTime.MIN` -
LocalDate.MAX
/LocalDate.MIN`
client
.query("SELECT 'infinity'::DATE \"LocalDate\"")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
Row row = ar.result().iterator().next();
System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX));
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
处理自定义类型
字符串用于表示自定义类型,无论是发送到还是从 Postgres 返回的。
您可以从 PostgreSQL 读取并将自定义类型作为字符串获取
client
.preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
.execute(Tuple.of(3))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以通过提供字符串写入 PostgreSQL
client
.preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
.execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
处理文本搜索
文本搜索使用 Java String
处理
client
.preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Match : " + row.getBoolean(0));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
tsvector
和 tsquery
可以使用 Java String
从数据库中获取
client
.preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
处理枚举类型
PostgreSQL 枚举类型被映射到 Java 字符串。
client
.preparedQuery("INSERT INTO colors VALUES ($2)")
.execute(Tuple.of("red"))
.onComplete(res -> {
// ...
});
使用 Java 枚举类型
您可以将 Java 枚举类型映射到这些列类型
-
字符串 (VARCHAR, TEXT)
-
PostgreSQL 枚举类型
-
数字 (INT2, INT4, INT8)
client
.preparedQuery("INSERT INTO colors VALUES ($1)")
.execute(Tuple.of(Color.red))
.flatMap(res ->
client
.preparedQuery("SELECT color FROM colors")
.execute()
).onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
字符串和 PostgreSQL 枚举类型与 Java 枚举的 name()
方法返回的名称匹配。
数字类型与 Java 枚举的 ordinal()
方法返回的序号匹配。
收集器查询
您可以将 Java 收集器与查询 API 结合使用
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
收集器处理不得保留对 Row
的引用,因为处理整个集合只使用一个行。
Java Collectors
提供了许多有趣的预定义收集器,例如您可以轻松地直接从行集创建字符串
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client
.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
发布/订阅
PostgreSQL 支持发布/订阅通信通道。
您可以设置一个 notificationHandler
来接收 PostgreSQL 通知
connection.notificationHandler(notification -> {
System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
});
connection
.query("LISTEN some-channel")
.execute()
.onComplete(ar -> {
System.out.println("Subscribed to channel");
});
PgSubscriber
是一个通道管理器,管理单个连接以提供每个通道的订阅
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber
.connect()
.onComplete(ar -> {
if (ar.succeeded()) {
// Or you can set the channel after connect
subscriber.channel("channel2").handler(payload -> {
System.out.println("Received " + payload);
});
}
});
传递给通道方法的通道名称将是 PostgreSQL 用于发送通知的通道的精确名称。请注意,这与 SQL 中通道名称的表示方式不同,并且在内部 PgSubscriber
会将提交的通道名称准备为带引号的标识符
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
subscriber
.connect()
.onComplete(ar -> {
if (ar.succeeded()) {
// Complex channel name - name in PostgreSQL requires a quoted ID
subscriber.channel("Complex.Channel.Name").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
subscriber.actualConnection()
.query("NOTIFY \"Complex.Channel.Name\", 'msg'")
.execute()
.onComplete(notified -> {
System.out.println("Notified \"Complex.Channel.Name\"");
});
});
// PostgreSQL simple ID's are forced lower-case
subscriber.channel("simple_channel").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
// The following simple channel identifier is forced to lower case
subscriber.actualConnection()
.query("NOTIFY Simple_CHANNEL, 'msg'")
.execute()
.onComplete(notified -> {
System.out.println("Notified simple_channel");
});
});
// The following channel name is longer than the current
// (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
.handler(payload -> {
System.out.println("Received " + payload);
});
}
});
您可以提供一个重连策略作为函数,该函数将重试次数 retries
作为参数,并返回一个 amountOfTime
值
-
当
amountOfTime < 0
时:订阅者关闭,不进行重试 -
当
amountOfTime = 0
时:订阅者立即重试连接 -
当
amountOfTime > 0
时:订阅者在amountOfTime
毫秒后重试
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy(retries -> {
if (retries < 10) {
return 100L;
} else {
return -1L;
}
});
默认策略是不重连。
通知消息
PostgreSQL 可以在连接的生命周期内发送通知消息。
默认情况下,此类消息在控制台上记录为警告。
您可以在连接上设置一个处理器来捕获它们并对其进行有用的处理。
connection.noticeHandler(notice -> {
System.out.println("Received notice " + notice.getSeverity() + "" + notice.getMessage());
});
取消请求
PostgreSQL 支持取消正在进行的请求。您可以使用 cancelRequest
取消正在进行的请求。取消请求会打开与服务器的新连接,取消请求,然后关闭连接。
connection
.query("SELECT pg_sleep(20)")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
// imagine this is a long query and is still running
System.out.println("Query success");
} else {
// the server will abort the current query after cancelling request
System.out.println("Failed to query due to " + ar.cause().getMessage());
}
});
connection
.cancelRequest()
.onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Cancelling request has been sent");
} else {
System.out.println("Failed to send cancelling request");
}
});
取消信号可能有效也可能无效——例如,如果它在后端完成查询处理后到达,则它将无效。如果取消有效,则会导致当前命令提前终止并显示错误消息。
更多信息请参见官方文档。
使用 SSL/TLS
要配置客户端使用 SSL 连接,您可以像配置 Vert.x NetClient
一样配置 PgConnectOptions
。支持所有 SSL 模式,并且您可以配置 sslmode
。客户端默认处于 DISABLE
SSL 模式。ssl
参数仅作为设置 sslmode
的快捷方式。setSsl(true)
等同于 setSslMode(VERIFY_CA)
,setSsl(false)
等同于 setSslMode(DISABLE)
。
PgConnectOptions options = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setSslOptions(new ClientSSLOptions().setTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem")));
PgConnection
.connect(vertx, options)
.onComplete(res -> {
if (res.succeeded()) {
// Connected with SSL
} else {
System.out.println("Could not connect " + res.cause());
}
});
更多信息可在 Vert.x 文档中找到。
使用 7 层代理
7 层代理可以在与实际数据库的多个连接上进行查询负载均衡。当发生这种情况时,客户端可能会因缺少会话亲和性而感到困惑,并可能出现不必要的错误,例如 *ERROR: unnamed prepared statement does not exist (26000)*。
支持的代理
-
配置了
_pool_mode=transaction
的 PgBouncer。
您可以配置客户端以与代理以不同方式交互
connectOptions.setUseLayer7Proxy(true);
这样做时,预处理语句无法被缓存,因此
-
必须禁用预处理语句缓存
-
显式预处理语句只能存在于事务范围内,这意味着您可以使用游标,但游标的预处理语句必须在事务范围内创建和销毁
高级连接池配置
服务器负载均衡
您可以将连接池配置为包含服务器列表而不是单个服务器。
Pool pool = PgBuilder.pool()
.with(options)
.connectingTo(Arrays.asList(server1, server2, server3))
.using(vertx)
.build();
连接池在创建连接时使用轮询负载均衡来选择不同的服务器。
这在创建连接时提供负载均衡,而不是在从连接池借用连接时。 |
连接池初始化
您可以使用 withConnectHandler
在连接创建后并将其插入连接池之前与连接进行交互。
builder.withConnectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// Release the connection to the pool, ready to be used by the application
conn.close();
});
});
一旦您完成连接,您应该简单地关闭它以向连接池发出信号以使用它。
动态连接配置
您可以使用 Java 供应商而不是 SqlConnectOptions
实例来配置连接池的连接详情。
由于供应商是异步的,它可以用于提供动态的连接池配置(例如密码轮换)。
Pool pool = PgBuilder.pool()
.with(poolOptions)
.connectingTo(() -> {
Future<SqlConnectOptions> connectOptions = retrieveOptions();
return connectOptions;
})
.using(vertx)
.build();
支持的数据类型
**响应式 Postgres 客户端**目前支持以下数据类型
_ | 值 | 数组 | ||
---|---|---|---|---|
Postgres | Java | 支持 | JAVA | 支持 |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✕ |
|
| ✔ |
| ✕ |
|
| ✔ |
| ✕ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
|
| ✔ |
| ✔ |
PostgreSQL JSON 和 JSONB 类型由以下 Java 类型表示
-
java.lang.String
-
java.lang.Number
-
java.lang.Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
用于表示 JSON 空字面量
以下类型
BIT、VARBIT、MACADDR、MACADDR8、XML、HSTORE、OID、VOID
尚未实现(欢迎提交 PR)。