<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-oracle-client</artifactId>
<version>5.0.1</version>
</dependency>
响应式 Oracle 客户端
响应式 Oracle 客户端是一个 Oracle 客户端,其 API 简洁明了,专注于可伸缩性和低开销。
特性
-
事件驱动
-
内置连接池
-
Java 8 日期和时间
-
SSL/TLS
-
RxJava API
-
游标
-
行流式传输
暂不支持
-
预处理查询缓存
-
存储过程
用法
要使用响应式 Oracle 客户端,请将以下依赖项添加到构建描述符的 dependencies 部分中
-
Maven(在您的
pom.xml
中)
-
Gradle(在您的
build.gradle
文件中)
dependencies {
compile 'io.vertx:vertx-oracle-client:5.0.1'
}
快速入门
以下是连接、查询和断开连接的最简单方法
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
Pool client = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.build();
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
连接到 Oracle
大多数情况下,您会使用连接池连接到 Oracle
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
Pool client = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
池化客户端使用连接池,任何操作都将从池中借用一个连接来执行操作并将其释放回池中。
如果您正在运行 Vert.x,您可以将您的 Vertx 实例传递给它
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
Pool client = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
当您不再需要连接池时,您需要将其释放
pool.close();
当您需要在同一连接上执行多个操作时,您需要使用客户端 连接
。
您可以轻松地从连接池中获取一个
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
Pool client = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
// Get a connection from the pool
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
一旦您使用完连接,您必须关闭它以将其释放回连接池,以便可以重用它。
连接池共享
您可以在多个 Verticle 或同一 Verticle 的多个实例之间共享一个连接池。这样的连接池应该在 Verticle 之外创建,否则当创建它的 Verticle 卸载时,它将被关闭
您也可以在每个 Verticle 中创建一个共享连接池
首次创建共享池时,它将为该池创建资源。后续调用将重用此池并创建对此池的租约。资源在所有租约关闭后处置。
默认情况下,连接池在需要创建 TCP 连接时会重用当前事件循环。因此,共享连接池将随机使用使用它的 verticle 的事件循环。
您可以为连接池分配一个事件循环数量,使其独立于使用它的上下文
配置
您可以通过几种替代方案配置客户端。
数据对象
配置客户端的一种简单方法是指定一个 OracleConnectOptions
数据对象。
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
Pool pool = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
pool
.getConnection()
.onComplete(ar -> {
// Handling your connection
});
连接 URI
除了使用 OracleConnectOptions
数据对象进行配置外,当您想使用连接 URI 进行配置时,我们还为您提供了另一种连接方式。
使用 EZConnect 格式
String connectionUri = "oracle:thin:@mydbhost1:5521/mydbservice?connect_timeout=10sec";
// Connect options
OracleConnectOptions connectOptions = OracleConnectOptions.fromUri(connectionUri)
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the connection URI
Pool pool = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
或者,使用 TNS 别名格式
String connectionUri = "oracle:thin:@prod_db?TNS_ADMIN=/work/tns/";
// Connect options
OracleConnectOptions connectOptions = OracleConnectOptions.fromUri(connectionUri)
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the connection URI
Pool pool = OracleBuilder.pool()
.with(poolOptions)
.connectingTo(connectOptions)
.using(vertx)
.build();
连接重试
您可以配置客户端在连接建立失败时进行重试。
options
.setReconnectAttempts(2)
.setReconnectInterval(1000);
运行查询
当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询;连接池将使用其一个连接来运行查询并将结果返回给您。
以下是运行简单查询的方法
client
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预处理查询
您也可以对预处理查询执行相同的操作。
SQL 字符串可以通过位置引用参数,使用数据库语法 ?
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询方法提供一个异步 RowSet
实例,适用于 SELECT 查询
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或 UPDATE/INSERT 查询
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Julien", "Viet"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row
允许您通过索引访问数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
列索引从 0 开始,而不是从 1 开始。 |
或者,可以通过名称检索数据
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在这里不会做任何神奇的事情,列名是根据表中的名称来识别的,无论您的 SQL 文本是什么。
您可以访问各种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存的预处理语句来执行一次性预处理查询
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您可以创建一个 PreparedStatement
并自行管理生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?")
.onComplete(ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"))
.onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以执行预处理批处理
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch)
.onComplete(res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
检索生成的键值
执行 INSERT
查询时,您可以检索生成的键值。
这些值以 Row
实例的形式返回。通过使用 OracleClient.GENERATED_KEYS
属性类型调用 SqlResult.property(kind)
,可以访问此实例。
键值可以通过列名检索
String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";
// Retrieve generated key column value by name
OraclePrepareOptions options = new OraclePrepareOptions()
.setAutoGeneratedKeysIndexes(new JsonArray().add("ID"));
client.preparedQuery(sql, options)
.execute(Tuple.of("john", 3))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
Row generated = result.property(OracleClient.GENERATED_KEYS);
Long id = generated.getLong("ID");
}
});
或者,可以通过列索引检索
String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";
// Retrieve generated key column value by index
OraclePrepareOptions options = new OraclePrepareOptions()
.setAutoGeneratedKeysIndexes(new JsonArray().add("1"));
client.preparedQuery(sql, options)
.execute(Tuple.of("john", 3))
.onComplete(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
Row generated = result.property(OracleClient.GENERATED_KEYS);
Long id = generated.getLong("ID");
}
});
使用连接
获取连接
当您需要执行顺序查询(不带事务)时,可以创建新连接或从连接池中借用一个连接。请记住,在从连接池获取连接并将其返回到连接池之间,您应该注意连接,因为它可能由于某种原因(例如空闲超时)而被服务器关闭。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(() -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
可以创建预处理查询
connection
.prepare("SELECT * FROM users WHERE first_name LIKE ?")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(() -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化连接 API
当您使用连接池时,可以调用 withConnection
,将一个在连接中执行的函数传递给它。
它从连接池中借用一个连接,并使用该连接调用函数。
该函数必须返回一个任意结果的 Future。
Future 完成后,连接将返回到连接池,并提供总结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接事务
您可以使用 SQL 的 BEGIN
/COMMIT
/ROLLBACK
来执行事务,如果您这样做,则必须使用 SqlConnection
并自行管理它。
或者您可以使用 SqlConnection
的事务 API
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(() -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务器报告当前事务失败时(例如臭名昭著的*当前事务已中止,在事务块结束前忽略命令*),事务将回滚,并且 completion
future 将因 TransactionRollbackException
而失败。
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化的事务 API
当您使用连接池时,您可以调用 withTransaction
,将一个在事务中执行的函数传递给它。
它从连接池中借用一个连接,开始事务,并使用一个在事务范围内执行所有操作的客户端调用该函数。
该函数必须返回一个任意结果的 Future
-
当 Future 成功时,客户端将提交事务
-
当 Future 失败时,客户端将回滚事务
事务完成后,连接将返回到连接池,并提供总结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式传输
默认情况下,预处理查询执行会获取所有行,您可以使用 游标
来控制您想要读取的行数
connection
.prepare("SELECT * FROM users WHERE age > ?")
.onComplete(ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor
.read(50)
.onComplete(ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
游标在提前释放时应关闭
cursor
.read(50)
.onComplete(ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
游标也提供了流 API,这可能更方便,尤其是在 Rxified 版本中。
connection
.prepare("SELECT * FROM users WHERE age > ?")
.onComplete(ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
流以 50
行的批次读取行并进行流式传输,当行传递给处理器后,新的 50
行批次将被读取,依此类推。
流可以暂停或恢复,加载的行将保留在内存中直到它们被传递,并且游标将停止迭代。
支持的数据类型
目前,客户端支持以下 Oracle 数据类型
-
CHAR/VARCHAR2(
java.lang.String
) -
NCHAR/NVARCHAR2(
java.lang.String
) -
NUMBER(
BigDecimal
) -
FLOAT(
java.lang.Double
) -
DATE(
java.time.LocalDate
) -
TIMESTAMP(
java.time.LocalDateTime
) -
RAW(
io.vertx.core.buffer.Buffer
)
元组解码在存储值时使用上述类型。
client.preparedQuery("INSERT INTO images (name, data) VALUES (?, ?)")
// Use io.vertx.oracleclient.data.Blob when inserting
.execute(Tuple.of("beautiful-sunset.jpg", Blob.copy(imageBuffer)))
.onComplete(ar -> {
// Do something
});
client.preparedQuery("SELECT data FROM images WHERE id = ?")
.execute(Tuple.of(id))
.onComplete(ar -> {
if (ar.succeeded()) {
Row row = ar.result().iterator().next();
// Use io.vertx.core.buffer.Buffer when reading
Buffer data = row.getBuffer("data");
}
});
跟踪查询
当 Vert.x 启用了跟踪时,SQL 客户端可以跟踪查询执行。
客户端报告以下客户端跨度
-
Query
操作名称 -
标签
-
db.system
:数据库管理系统产品 -
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL 查询 -
db.type
:sql
默认的追踪策略是 PROPAGATE
,客户端只会在活跃的追踪中创建跨度。
您可以通过 setTracingPolicy
更改客户端策略,例如您可以将 ALWAYS
设置为始终报告跨度。
options.setTracingPolicy(TracingPolicy.ALWAYS);
收集器查询
您可以将 Java 收集器与查询 API 结合使用
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
收集器处理不得保留对 Row
的引用,因为处理整个集合只使用一个行。
Java Collectors
提供了许多有趣的预定义收集器,例如您可以轻松地直接从行集创建字符串
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute()
.onComplete(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
使用 SSL/TLS
要在客户端选项中启用加密,请使用 setSsl
方法。默认情况下,ssl
设置为 false
。
oracleConnectOptions.setSsl(true);
可以使用属性自定义加密。例如,设置信任库
oracleConnectOptions
.setSsl(true)
.addProperty("javax.net.ssl.trustStore", pathToTrustStore)
.addProperty("javax.net.ssl.trustStoreType", "JKS")
.addProperty("javax.net.ssl.trustStorePassword", trustStorePassword);