响应式 MySQL 客户端

响应式 MySQL 客户端是用于 MySQL 的客户端,其 API 简单直接,专注于可伸缩性和低开销。

该客户端是响应式且非阻塞的,允许单线程处理许多数据库连接。

特性

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预处理查询缓存

  • 游标支持

  • 行流式传输

  • RxJava API

  • 直接内存到对象,无需不必要的复制

  • 完整数据类型支持

  • 存储过程支持

  • TLS/SSL 支持

  • 查询管道化

  • MySQL 工具命令支持

  • 使用 MySQL 和 MariaDB

  • 丰富的排序规则和字符集支持

  • Unix 域套接字

用法

要使用响应式 MySQL 客户端,请将以下依赖项添加到您的构建描述符的 dependencies 部分

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-mysql-client</artifactId>
  <version>5.0.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.vertx:vertx-mysql-client:5.0.1'
}

快速入门

以下是连接、查询和断开连接的最简单方法

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
SqlClient client = MySQLBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> result = ar.result();
      System.out.println("Got " + result.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }

    // Now close the pool
    client.close();
  });

连接到 MySQL

大多数情况下,您将使用连接池连接到 MySQL

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

池化客户端使用连接池,任何操作都将从池中借用一个连接来执行操作并将其释放回池中。

如果您正在运行 Vert.x,您可以将您的 Vertx 实例传递给它

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

当您不再需要连接池时,您需要将其释放

pool.close();

当您需要在同一连接上执行多个操作时,您需要使用客户端 connection

您可以轻松地从连接池中获取一个

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Get a connection from the pool
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // All operations execute on the same connection
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

一旦您使用完连接,您必须关闭它以将其释放回连接池,以便可以重用它。

命令管道化

在某些使用场景中,命令管道化可以提高数据库访问性能。

您可以配置客户端以使用管道化

Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions.setPipeliningLimit(16))
  .using(vertx)
  .build();

默认的管道化限制为 1,这会禁用管道化。

如果您使用的代理不支持管道化,请不要启用它。否则,代理可能会突然关闭客户端连接。

连接池与池化客户端

MySQLBuilder 允许您创建连接池或池化客户端

connectOptions.setPipeliningLimit(64);
SqlClient client = MySQLBuilder.client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();

// Connection pool
Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • 连接池操作不是流水线化的,只有从连接池获取的连接才是流水线化的

  • 池化客户端操作是流水线化的,您无法从池化客户端获取连接

连接池共享

您可以在多个 Verticle 或同一 Verticle 的多个实例之间共享一个连接池。这样的连接池应该在 Verticle 之外创建,否则当创建它的 Verticle 卸载时,它将被关闭

Pool pool = MySQLBuilder.pool()
  .with(new PoolOptions().setMaxSize(maxSize))
  .connectingTo(database)
  .using(vertx)
  .build();
vertx.deployVerticle(() -> new VerticleBase() {
  @Override
  public Future<?> start() throws Exception {
    // Use the pool
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

您也可以在每个 Verticle 中创建一个共享连接池

vertx.deployVerticle(() -> new VerticleBase() {
  Pool pool;
  @Override
  public Future<?> start() throws Exception {
    // Get or create a shared pool
    // this actually creates a lease to the pool
    // when the verticle is undeployed, the lease will be released automaticaly
    pool = MySQLBuilder.pool()
      .with(new PoolOptions()
        .setMaxSize(maxSize)
        .setShared(true)
        .setName("my-pool"))
      .using(vertx)
      .build();
    return super.start();
  }
}, new DeploymentOptions().setInstances(4));

首次创建共享池时,它将为该池创建资源。后续调用将重用此池并创建对此池的租约。资源在所有租约关闭后处置。

默认情况下,连接池在需要创建 TCP 连接时会重用当前事件循环。因此,共享连接池将随机使用使用它的 verticle 的事件循环。

您可以为连接池分配一个事件循环数量,使其独立于使用它的上下文

Pool pool = MySQLBuilder.pool()
  .with(new PoolOptions()
    .setMaxSize(maxSize)
    .setShared(true)
    .setName("my-pool")
    .setEventLoopSize(4))
  .using(vertx)
  .build();

Unix 域套接字

有时出于简单性、安全性或性能原因,需要通过 Unix 域套接字进行连接。

由于 JVM 不支持域套接字,首先您必须将原生传输扩展添加到您的项目。

  • Maven(在您的 pom.xml 中)

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <version>${netty.version}</version>
  <classifier>linux-x86_64</classifier>
</dependency>
  • Gradle(在您的 build.gradle 文件中)

dependencies {
  compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
ARM64 的原生 epoll 支持也可以通过分类器 linux-aarch64 添加。
如果您的团队中有 Mac 用户,请添加带有分类器 osx-x86_64netty-transport-native-kqueue

然后通过 MySQLConnectOptions#setHost 设置域套接字的路径

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setHost("/var/run/mysqld/mysqld.sock")
  .setDatabase("the-db");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
// vertxOptions.setPreferNativeTransport(true);
Pool client2 = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

有关原生传输的更多信息,请参阅 Vert.x 文档

配置

您可以通过几种替代方案配置客户端。

数据对象

配置客户端的一种简单方法是指定一个 MySQLConnectOptions 数据对象。

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
Pool pool = MySQLBuilder
  .pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

pool.getConnection()
  .onComplete(ar -> {
    // Handling your connection
  });

排序规则和字符集

响应式 MySQL 客户端支持配置排序规则或字符集,并将其映射到相应的 java.nio.charset.Charset。例如,您可以像这样为连接指定字符集

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// set connection character set to utf8 instead of the default charset utf8mb4
connectOptions.setCharset("utf8");

响应式 MySQL 客户端将把 utf8mb4 作为默认字符集。密码和错误消息等字符串值始终以 UTF-8 字符集解码。

characterEncoding 选项用于确定将使用哪个 Java 字符集来编码字符串值,例如查询字符串和参数值,默认字符集是 UTF-8,如果将其设置为 null,则客户端将使用默认的 Java 字符集。

您也可以像这样为连接指定排序规则

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// set connection collation to utf8_general_ci instead of the default collation utf8mb4_general_ci
// setting a collation will override the charset option
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");

请注意,在数据对象上设置排序规则将覆盖 **charset** 和 **characterEncoding** 选项。

您可以执行 SQL SHOW COLLATION;SHOW CHARACTER SET; 来获取服务器支持的排序规则和字符集。

有关 MySQL 字符集和排序规则的更多信息,请参阅 MySQL 参考手册

连接属性

您还可以使用 setPropertiesaddProperty 方法配置连接属性。请注意,setProperties 将覆盖默认客户端属性。

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// Add a connection attribute
connectOptions.addProperty("_java_version", "1.8.0_212");

// Override the attributes
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);

有关客户端连接属性的更多信息,请参阅 MySQL 参考手册

useAffectedRows

您可以配置 useAffectedRows 选项来决定连接到服务器时是否设置 CLIENT_FOUND_ROWS 标志。如果指定了 CLIENT_FOUND_ROWS 标志,则受影响的行数是找到的行数的数值,而不是受影响的行数。

有关此内容的更多信息,请参阅 MySQL 参考手册

连接 URI

除了使用 MySQLConnectOptions 数据对象进行配置外,当您想使用连接 URI 进行配置时,我们还为您提供了另一种连接方式

String connectionUri = "mysql://dbuser:[email protected]:3306/mydb";

// Create the pool from the connection URI
Pool pool = MySQLBuilder
  .pool()
  .connectingTo(connectionUri)
  .using(vertx)
  .build();

// Create the connection from the connection URI
MySQLConnection.connect(vertx, connectionUri)
  .onComplete(res -> {
    // Handling your connection
  });

有关连接字符串格式的更多信息,请参阅 MySQL 参考手册

目前,客户端支持以下参数键(不区分大小写)

  • host

  • port

  • user

  • password

  • schema

  • socket

  • useAffectedRows

在连接 URI 中配置参数将覆盖默认属性。

连接重试

您可以配置客户端在连接建立失败时进行重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

运行查询

当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询;连接池将使用其一个连接来运行查询并将结果返回给您。

以下是运行简单查询的方法

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预处理查询

您也可以对预处理查询执行相同的操作。

SQL 字符串可以通过位置引用参数,使用数据库语法 ?

client
  .preparedQuery("SELECT * FROM users WHERE id=?")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询方法提供一个异步 RowSet 实例,适用于 SELECT 查询

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

UPDATE/INSERT 查询

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
  .execute(Tuple.of("Julien", "Viet"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row 允许您通过索引访问数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));
列索引从 0 开始,而不是从 1 开始。

或者,可以通过名称检索数据

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在这里不会做任何神奇的事情,列名是根据表中的名称来识别的,无论您的 SQL 文本是什么。

您可以访问各种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存的预处理语句来执行一次性预处理查询

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您可以创建一个 PreparedStatement 并自行管理生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = ?")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"))
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以执行预处理批处理

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
  .executeBatch(batch)
  .onComplete(res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

MySQL LAST_INSERT_ID

如果您向表中插入记录,可以获取自动递增的值。

client
  .query("INSERT INTO test(val) VALUES ('v1')")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
      System.out.println("Last inserted id is: " + lastInsertId);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

更多信息请参阅 如何获取最后插入行的唯一 ID

使用连接

获取连接

当您需要执行顺序查询(不带事务)时,您可以创建新连接或从连接池中借用一个。请记住,在从连接池获取连接并将其返回到连接池之间,您应该注意连接,因为它可能因空闲超时等原因被服务器关闭。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(() -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

可以创建预处理查询

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(() -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化连接 API

当您使用连接池时,可以调用 withConnection,将一个在连接中执行的函数传递给它。

它从连接池中借用一个连接,并使用该连接调用函数。

该函数必须返回一个任意结果的 Future。

Future 完成后,连接将返回到连接池,并提供总结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接事务

您可以使用 SQL 的 BEGIN/COMMIT/ROLLBACK 来执行事务,如果您这样做,则必须使用 SqlConnection 并自行管理它。

或者您可以使用 SqlConnection 的事务 API

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
    // Begin the transaction
    conn.begin()
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(() -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

当数据库服务器报告当前事务失败时(例如臭名昭著的*当前事务已中止,命令将被忽略直到事务块结束*),事务将被回滚,并且 completion future 将以 TransactionRollbackException 失败。

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

简化的事务 API

当您使用连接池时,您可以调用 withTransaction,将一个在事务中执行的函数传递给它。

它从连接池中借用一个连接,开始事务,并使用一个在事务范围内执行所有操作的客户端调用该函数。

该函数必须返回一个任意结果的 Future

  • 当 Future 成功时,客户端将提交事务

  • 当 Future 失败时,客户端将回滚事务

事务完成后,连接将返回到连接池,并提供总结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流式传输

在撰写本文时,如果您通过 ProxySQL 连接到数据库,则无法使用以下功能。如果尝试,您将看到此错误消息

RECEIVED AN UNKNOWN COMMAND: 28 -- PLEASE REPORT A BUG

这是因为代理不处理从游标中获取行所需的某种命令(COM_STMT_FETCH)。

默认情况下,预处理查询执行会获取所有行,您可以使用 游标 来控制您想要读取的行数

connection
  .prepare("SELECT * FROM users WHERE age > ?")
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // Create a cursor
    Cursor cursor = pq.cursor(Tuple.of(18));

    // Read 50 rows
    cursor
      .read(50)
      .onComplete(ar2 -> {
      if (ar2.succeeded()) {
        RowSet<Row> rows = ar2.result();

        // Check for more ?
        if (cursor.hasMore()) {
          // Repeat the process...
        } else {
          // No more rows - close the cursor
          cursor.close();
        }
      }
    });
  }
});

游标在提前释放时应关闭

cursor
  .read(50)
  .onComplete(ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

游标也提供了流 API,这可能更方便,尤其是在 Rxified 版本中。

connection
  .prepare("SELECT * FROM users WHERE age > ?")
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // Fetch 50 rows at a time
    RowStream<Row> stream = pq.createStream(50, Tuple.of(18));

    // Use the stream
    stream.exceptionHandler(err -> {
      System.out.println("Error: " + err.getMessage());
    });
    stream.endHandler(v -> {
      System.out.println("End of stream");
    });
    stream.handler(row -> {
      System.out.println("User: " + row.getString("last_name"));
    });
  }
});

流以 50 行的批次读取行并进行流式传输,当行传递给处理器后,新的 50 行批次将被读取,依此类推。

流可以暂停或恢复,加载的行将保留在内存中直到它们被传递,并且游标将停止迭代。

跟踪查询

当 Vert.x 启用了跟踪时,SQL 客户端可以跟踪查询执行。

客户端报告以下客户端跨度

  • Query 操作名称

  • 标签

  • db.system:数据库管理系统产品

  • db.user:数据库用户名

  • db.instance:数据库实例

  • db.statement:SQL 查询

  • db.typesql

默认的追踪策略是 PROPAGATE,客户端只会在活跃的追踪中创建跨度。

您可以通过 setTracingPolicy 更改客户端策略,例如,您可以将 ALWAYS 设置为始终报告跨度(span)

options.setTracingPolicy(TracingPolicy.ALWAYS);

MySQL 类型映射

目前客户端支持以下 MySQL 类型

  • BOOL,BOOLEAN (java.lang.Byte)

  • TINYINT (java.lang.Byte)

  • TINYINT UNSIGNED(java.lang.Short)

  • SMALLINT (java.lang.Short)

  • SMALLINT UNSIGNED(java.lang.Integer)

  • MEDIUMINT (java.lang.Integer)

  • MEDIUMINT UNSIGNED(java.lang.Integer)

  • INT,INTEGER (java.lang.Integer)

  • INTEGER UNSIGNED(java.lang.Long)

  • BIGINT (java.lang.Long)

  • BIGINT UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • FLOAT (java.lang.Float)

  • FLOAT UNSIGNED(java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DOUBLE UNSIGNED(java.lang.Double)

  • BIT (java.lang.Long)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • NUMERIC UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • DATE (java.time.LocalDate)

  • DATETIME (java.time.LocalDateTime)

  • TIME (java.time.Duration)

  • TIMESTAMP (java.time.LocalDateTime)

  • YEAR (java.lang.Short)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • BINARY (io.vertx.core.buffer.Buffer)

  • VARBINARY (io.vertx.core.buffer.Buffer)

  • TINYBLOB (io.vertx.core.buffer.Buffer)

  • TINYTEXT (java.lang.String)

  • BLOB (io.vertx.core.buffer.Buffer)

  • TEXT (java.lang.String)

  • MEDIUMBLOB (io.vertx.core.buffer.Buffer)

  • MEDIUMTEXT (java.lang.String)

  • LONGBLOB (io.vertx.core.buffer.Buffer)

  • LONGTEXT (java.lang.String)

  • ENUM (java.lang.String)

  • SET (java.lang.String)

  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • GEOMETRY(io.vertx.mysqlclient.data.spatial.*)

元组(Tuple)解码在存储值时使用上述类型

注意:在 Java 中没有特定的无符号数值表示,因此此客户端会将无符号值转换为相关的 Java 类型。

隐式类型转换

响应式 MySQL 客户端在执行预处理语句时支持隐式类型转换。假设您的表中有 TIME 列,以下两个示例都将在此处工作。

client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of(LocalTime.of(19, 10, 25)))
  .onComplete(ar -> {
    // handle the results
  });
// this will also work with implicit type conversion
client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of("19:10:25"))
  .onComplete(ar -> {
    // handle the results
  });

用于编码的 MySQL 数据类型将从参数值中推断,类型映射如下

参数值类型 编码 MySQL 类型

null

MYSQL_TYPE_NULL

java.lang.Byte

MYSQL_TYPE_TINY

java.lang.Boolean

MYSQL_TYPE_TINY

java.lang.Short

MYSQL_TYPE_SHORT

java.lang.Integer

MYSQL_TYPE_LONG

java.lang.Long

MYSQL_TYPE_LONGLONG

java.lang.Double

MYSQL_TYPE_DOUBLE

java.lang.Float

MYSQL_TYPE_FLOAT

java.time.LocalDate

MYSQL_TYPE_DATE

java.time.Duration

MYSQL_TYPE_TIME

java.time.LocalTime

MYSQL_TYPE_TIME

io.vertx.core.buffer.Buffer

MYSQL_TYPE_BLOB

java.time.LocalDateTime

MYSQL_TYPE_DATETIME

io.vertx.mysqlclient.data.spatial.*

MYSQL_TYPE_BLOB

默认

MYSQL_TYPE_STRING

处理 BOOLEAN

在 MySQL 中,BOOLEANBOOL 数据类型是 TINYINT(1) 的同义词。值为零被视为假,非零值被视为真。BOOLEAN 数据类型值在 RowTuple 中存储为 java.lang.Byte 类型,您可以调用 Row#getValue 将其检索为 java.lang.Byte 值,或者调用 Row#getBoolean 将其检索为 java.lang.Boolean 值。

client
  .query("SELECT graduated FROM students WHERE id = 0")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rowSet = ar.result();
      for (Row row : rowSet) {
        int pos = row.getColumnIndex("graduated");
        Byte value = row.get(Byte.class, pos);
        Boolean graduated = row.getBoolean("graduated");
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

当您想要执行带有 BOOLEAN 值参数的预处理语句时,您只需将 java.lang.Boolean 值添加到参数列表中。

client
  .preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
  .execute(Tuple.of(true))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Updated with the boolean value");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

处理 JSON

MySQL JSON 数据类型由以下 Java 类型表示

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • 用于表示 JSON 空字面量的 io.vertx.sqlclient.Tuple#JSON_NULL

Tuple tuple = Tuple.of(
  Tuple.JSON_NULL,
  new JsonObject().put("foo", "bar"),
  3);

// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL

//
value = tuple.get(JsonObject.class, 1); // Expect JSON object

//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3

处理 BIT

BIT 数据类型映射到 java.lang.Long 类型,但 Java 没有无符号数值的概念,因此如果您想插入或更新具有 BIT(64) 最大值的记录,您可以通过将参数设置为 -1L 来实现。

处理 TIME

MySQL TIME 数据类型可用于表示一天中的时间或时间间隔,范围从 -838:59:59838:59:59。在响应式 MySQL 客户端中,TIME 数据类型原生映射到 java.time.Duration,但您也可以通过 Row#getLocalTime 访问器将其检索为 java.time.LocalTime

处理 NUMERIC

Numeric Java 类型用于表示 MySQL NUMERIC 类型。

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // Handle NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

处理 ENUM

MySQL 支持 ENUM 数据类型,客户端将这些类型检索为 String 数据类型。

您可以像这样将 Java 枚举编码为 String

client
  .preparedQuery("INSERT INTO colors VALUES (?)")
  .execute(Tuple.of(Color.red))
  .onComplete(res -> {
    // ...
  });

您可以像这样将 ENUM 列检索为 Java 枚举

client
  .preparedQuery("SELECT color FROM colors")
  .execute()
  .onComplete(res -> {
  if (res.succeeded()) {
    RowSet<Row> rows = res.result();
    for (Row row : rows) {
      System.out.println(row.get(Color.class, "color"));
    }
  }
});

处理 GEOMETRY

MySQL GEOMETRY 数据类型也受支持。这里有一些示例,展示了您可以以 Well-Known Text (WKT) 格式或 Well-Known Binary (WKB) 格式处理几何数据,数据被解码为 MySQL TEXT 或 BLOB 数据类型。有许多优秀的第三方库用于处理这种格式的数据。

您可以以 WKT 格式获取空间数据

client
  .query("SELECT ST_AsText(g) FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data in WKT format
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        String wktString = row.getString(0);
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

或者您可以以 WKB 格式获取空间数据

client
  .query("SELECT ST_AsBinary(g) FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data in WKB format
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        Buffer wkbValue = row.getBuffer(0);
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

我们还为您提供了一种在响应式 MySQL 客户端中处理几何数据类型的简单方法。

您可以将几何数据检索为 Vert.x 数据对象

client
  .query("SELECT g FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data as a Vert.x Data Object
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        Point point = row.get(Point.class, 0);
        System.out.println("Point x: " + point.getX());
        System.out.println("Point y: " + point.getY());
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您也可以将其作为预处理语句参数,以 WKB 表示。

Point point = new Point(0, 1.5, 1.5);
// Send as a WKB representation
client
  .preparedQuery("INSERT INTO geom VALUES (ST_GeomFromWKB(?))")
  .execute(Tuple.of(point))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Success");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

收集器查询

您可以将 Java 收集器与查询 API 结合使用

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // Get the map created by the collector
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

收集器处理不得保留对 Row 的引用,因为处理整个集合只使用一个行。

Java Collectors 提供了许多有趣的预定义收集器,例如您可以轻松地直接从行集创建字符串

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL 存储过程

您可以在查询中运行存储过程。结果将根据 MySQL 协议从服务器检索,没有任何额外的“魔法”操作。

client.query("CREATE PROCEDURE multi() BEGIN\n" +
  "  SELECT 1;\n" +
  "  SELECT 1;\n" +
  "  INSERT INTO ins VALUES (1);\n" +
  "  INSERT INTO ins VALUES (2);\n" +
  "END;")
  .execute()
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    // create stored procedure success
    client
      .query("CALL multi();")
      .execute()
      .onComplete(ar2 -> {
      if (ar2.succeeded()) {
        // handle the result
        RowSet<Row> result1 = ar2.result();
        Row row1 = result1.iterator().next();
        System.out.println("First result: " + row1.getInteger(0));

        RowSet<Row> result2 = result1.next();
        Row row2 = result2.iterator().next();
        System.out.println("Second result: " + row2.getInteger(0));

        RowSet<Row> result3 = result2.next();
        System.out.println("Affected rows: " + result3.rowCount());
      } else {
        System.out.println("Failure: " + ar2.cause().getMessage());
      }
    });
  } else {
    System.out.println("Failure: " + ar1.cause().getMessage());
  }
});

注意:目前不支持预处理语句绑定 OUT 参数。

MySQL LOCAL INFILE

此客户端支持处理 LOCAL INFILE 请求,如果您想将数据从本地文件加载到服务器,可以使用查询 LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;。更多信息请参阅 MySQL 参考手册

认证

默认身份验证插件

此客户端支持在连接开始时指定要使用的默认身份验证插件。目前支持以下插件

  • mysql_native_password

  • caching_sha2_password

  • mysql_clear_password

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // set the default authentication plugin

MySQL 8 中引入的新身份验证方法

MySQL 8.0 引入了一种名为 caching_sha2_password 的新身份验证方法,它是默认的身份验证方法。为了使用此新身份验证方法连接到服务器,您需要使用安全连接(即启用 TLS/SSL)或使用 RSA 密钥对交换加密密码,以避免密码泄露。RSA 密钥对在通信过程中自动交换,但服务器 RSA 公钥在此过程中可能会被破解,因为它是在不安全的连接上传输的。因此,如果您处于不安全的连接并想避免暴露服务器 RSA 公钥的风险,您可以像这样设置服务器 RSA 公钥

MySQLConnectOptions options1 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyPath("tls/files/public_key.pem"); // configure with path of the public key

MySQLConnectOptions options2 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
    "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
    "xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
    "HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
    "aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
    "0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
    "7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
    "5QIDAQAB\n" +
    "-----END PUBLIC KEY-----\n")); // configure with buffer of the public key

有关 caching_sha2_password 身份验证方法的更多信息,请参阅 MySQL 参考手册

使用 SSL/TLS

要配置客户端使用 SSL 连接,您可以像配置 Vert.x NetClient 一样配置 MySQLConnectOptions。支持所有 SSL 模式,并且您可以配置 sslmode。客户端默认处于 DISABLED SSL 模式。ssl 参数仅作为设置 sslmode 的一个简写。setSsl(true) 等同于 setSslMode(VERIFY_CA)setSsl(false) 等同于 setSslMode(DISABLED)

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setSslOptions(new ClientSSLOptions().setTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem")));

MySQLConnection.connect(vertx, options)
  .onComplete(res -> {
    if (res.succeeded()) {
      // Connected with SSL
    } else {
      System.out.println("Could not connect " + res.cause());
    }
  });

更多信息可在 Vert.x 文档中找到。

MySQL 工具命令

有时您想使用 MySQL 工具命令,我们为此提供了支持。更多信息请参阅 MySQL 工具命令

COM_PING

您可以使用 COM_PING 命令检查服务器是否存活。如果服务器响应 PING,处理程序将收到通知,否则处理程序将永远不会被调用。

connection.ping().onComplete(ar -> {
  System.out.println("The server has responded to the PING");
});

COM_RESET_CONNECTION

您可以使用 COM_RESET_CONNECTION 命令重置会话状态,这将重置连接状态,例如: - 用户变量 - 临时表 - 预处理语句

connection
  .resetConnection()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Connection has been reset now");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_CHANGE_USER

您可以更改当前连接的用户,这将执行重新身份验证并重置连接状态,就像 COM_RESET_CONNECTION 一样。

MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
  .setUser("newuser")
  .setPassword("newpassword")
  .setDatabase("newdatabase");
connection
  .changeUser(authenticationOptions)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("User of current connection has been changed.");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_INIT_DB

您可以使用 COM_INIT_DB 命令更改连接的默认模式。

connection
  .specifySchema("newschema")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Default schema changed to newschema");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_STATISTICS

您可以使用 COM_STATISTICS 命令获取 MySQL 服务器中一些内部状态变量的可读字符串。

connection
  .getInternalStatistics()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Statistics: " + ar.result());
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_DEBUG

您可以使用 COM_DEBUG 命令将调试信息转储到 MySQL 服务器的标准输出 (STDOUT)。

connection
  .debug()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Debug info dumped to server's STDOUT");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_SET_OPTION

您可以使用 COM_SET_OPTION 命令为当前连接设置选项。目前只能设置 CLIENT_MULTI_STATEMENTS

例如,您可以使用此命令禁用 CLIENT_MULTI_STATEMENTS

connection
  .setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("CLIENT_MULTI_STATEMENTS is off now");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL 和 MariaDB 版本支持矩阵

MySQL MariaDB

版本

支持

版本

支持

5.5

10.1

5.6

10.2

5.7

10.3

8.0

10.4

已知问题

  • 连接重置工具命令在 MySQL 5.5、5.6 和 MariaDB 10.1 中不起作用

  • MariaDB 10.2 和 10.3 不支持更改用户工具命令

陷阱与最佳实践

以下是一些使用响应式 MySQL 客户端时避免常见陷阱的最佳实践。

预处理语句数量限制

有时您可能会遇到臭名昭著的错误 Can’t create more than max_prepared_stmt_count statements (current value: 16382),这是因为服务器已达到预处理语句总数的限制。

您可以调整服务器系统变量 max_prepared_stmt_count,但它有一个上限值,所以您无法通过这种方式完全消除错误。

缓解此问题的最佳方法是启用预处理语句缓存,这样具有相同 SQL 字符串的预处理语句可以被重用,客户端无需为每个请求创建全新的预处理语句。预处理语句在执行后将自动关闭。通过这种方式,达到限制的可能性可以大大降低,尽管不能完全消除。

您还可以通过 SqlConnection#prepare 接口手动创建 PreparedStatement 对象来管理预处理语句的生命周期,从而选择何时释放语句句柄,甚至使用 SQL 语法预处理语句

揭秘预处理批处理

有时您想批量插入数据到数据库中,您可以使用 PreparedQuery#executeBatch,它提供了一个简单的 API 来处理此操作。请记住,MySQL 不原生支持批处理协议,因此此 API 只是通过逐个执行预处理语句来实现的一种“语法糖”,这意味着与通过执行一个带有值列表的预处理语句插入多行相比,需要更多的网络往返。

棘手的 DATE 和 TIME 数据类型

处理 MySQL DATE 和 TIME 数据类型,尤其是涉及时区时,是棘手的,因此响应式 MySQL 客户端不会对这些值进行任何“魔法”转换。

  • MySQL DATETIME 数据类型不包含时区信息,因此无论当前会话中的时区是什么,您获取的值都与您设置的值相同。

  • MySQL TIMESTAMP 数据类型包含时区信息,因此当您设置或获取值时,它总是由服务器根据当前会话中设置的时区进行转换。

高级连接池配置

服务器负载均衡

您可以将连接池配置为包含服务器列表而不是单个服务器。

Pool pool = MySQLBuilder.pool()
  .with(options)
  .connectingTo(Arrays.asList(server1, server2, server3))
  .using(vertx)
  .build();

连接池在创建连接时使用轮询负载均衡来选择不同的服务器。

这在创建连接时提供负载均衡,而不是在从连接池借用连接时。

连接池初始化

您可以使用 withConnectHandler 在连接创建后并将其插入连接池之前与连接进行交互。

builder.withConnectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    // Release the connection to the pool, ready to be used by the application
    conn.close();
  });
});

一旦您完成连接,您应该简单地关闭它以向连接池发出信号以使用它。

动态连接配置

您可以使用 Java 供应商而不是 SqlConnectOptions 实例来配置连接池的连接详情。

由于供应商是异步的,它可以用于提供动态的连接池配置(例如密码轮换)。

Pool pool = MySQLBuilder.pool()
  .connectingTo(() -> {
    Future<SqlConnectOptions> connectOptions = retrieveOptions();
    return connectOptions;
  })
  .using(vertx)
  .build();