executeBatch

Api

java.sql.Statement#addBatch
java.sql.PreparedStatement#addBatch
java.sql.Statement#executeBatch
java.sql.Statement#clearBatch

原理

  1. 减少了 JDBC 客户端和数据库服务器之间网络传输的开销:使用 batch 功能前,每提交一个SQL,都需要一次网络IO开销,且提交后需要等待服务端返回结果后,才能提交下一个SQL;而使用 batch 功能后,客户端的多个SQL是一起提交给服务器的,只涉及到一次网络IO开销

  2. 当 batch 底层使用的是静态SQL并参数化执行时(JAVA中一般是使用类 java.sql.PreparedStatement 来参数化执行静态SQL),数据库服务器可以只做一次解析:利用对参数化机制的支持,数据库服务器仅需要对 PreparedStatement 做一次解析(sql parse),即可传入不同参数执行该 batch 中所有的 SQL

批量插入:

  • 会改写批量中的一组sql为一条 “multi-values” 语句,并一次性提交给数据库服务器:batch Insert 会被改写为 insert into t(…) values (…), (…), (…) 并一次性提交;

  • 如果不能被改写为 "multi-values", 则会改写为多个;分割的sql语句并一次性提交:insert into t(...) values(...);insert into t(...) values(...)

批量更新:

batchUpdate 会被改写为 update t set… where id = 1; update t set… where id = 2; update t set… where id = 3… 并一次性提交;

批量删除:

batchDelete 会被改写为 delete from t where id = 1; delete from t where id = 2; delete from t where id = 3;….并一次性提交;

注意点:

  • batch 功能对 statement 和 PreparedStatement 都有效,但为了避免 SQL 注入的风险,不推荐使用动态SQL,而是推荐使用静态 SQL 和绑定变量(也就是使用 PreparedStatement 和 stored procedures);

  • batch 功能对所有SQL 都有效, 包括 SELECT/INSERT/UPDATE/DELETE,但由于使用 batch 功能后,返回值是 int[] 数组,不太方便获取 batch 底层每个sql的执行结果,所以大家一般不会对 SELECT 语句使用 batch 功能 (毕竟select查询的目的是获得每个select语句的结果resultSet),而只会在大量 INSERT/UPDATE/DELETE 的场景下,尤其是大批量插入的场景下,使用 batch 功能,所以大家提到 batch时,常说“批量更新“;

  • 另外需要注意的是,使用 batch 功能并不代表所有的 SQL 都在一个事务里:自动提交(autocommit=true)模式下,在创建了每个statement 后,数据库将确保结果正确存在,然后再转到下一个statement 。如果批处理的第n句引发约束异常,则不会回滚以前插入的所有行。

MySQL

设置:

&rewriteBatchedStatements=true
  • MySQL的 JDBC连接的url中要加 &rewriteBatchedStatements 参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入

  • MySQL JDBC 驱动在默认情况下会无视 executeBatch() 语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给 MySQL 数据库,批量插入实际上是单条插入,直接造成较低的性能。

  • 只有把 rewriteBatchedStatements 参数置为 true, 驱动才会帮你批量执行SQL

  • 这个选项对 INSERT/UPDATE/DELETE 都有效

executeBatch

HikariProxyPreparedStatement
    executeBatch()
    
ProxyStatement
    executeBatch()
        this.connection.markCommitStateDirty();
        return this.delegate.executeBatch();
​
PreparedStatementWrapper p6spy
    
StatementWrapper    p6spy
    executeBatch()
        this.delegate.executeBatch();
    
StatementImpl
    executeBatch()
        return Util.truncateAndConvertToInt(executeBatchInternal());
    
    executeBatchInternal()
        ...
        
ClientPreparedStatement
    executeBatchInternal()
        int batchTimeout = getTimeoutInMillis();
        // 如果加了批量连接参数 且是插入
        return executeBatchedInserts(batchTimeout);
        // 如果加了批量连接参数 不是插入
        return executePreparedBatchAsMultiStatement(batchTimeout);
        // 没加
        return executeBatchSerially(batchTimeout);
​
=============================
    
    executeBatchedInserts()
        // 数据条数
        int numBatchedArgs = this.query.getBatchedArgs().size();
        long[] updateCounts = new long[numBatchedArgs]
        long updateCountRunningTotal = 0;
        // 有一条数据插入失败直接是0,因为服务器实际执行时是一条完整sql,要么全部成功返回记录条数,要么有出错返回0
        updateCountRunningTotal += batchedStatement.executeLargeUpdate();
        如果 numBatchedArgs > 1
            // 要么是-2要么是0
            long updCount = updateCountRunningTotal > 0 ? java.sql.Statement.SUCCESS_NO_INFO : 0;
            for (int j = 0; j < numBatchedArgs; j++) {
                updateCounts[j] = updCount;
            }
        否则
            updateCounts[0] = updateCountRunningTotal;
        return updateCounts;    
​
    executeLargeUpdate()
        executeUpdateInternal(true, false)
        
    executeUpdateInternal(boolean,boolean)
        return executeUpdateInternal(((PreparedQuery) this.query).getQueryBindings(), isBatch);
​
=============================
​
    executeBatchSerially(int batchTimeout)
        int nbrCommands = this.query.getBatchedArgs().size();
        updateCounts = new long[nbrCommands];
        循环填充为-3
        循环执行
            Object arg = this.query.getBatchedArgs().get(batchCommandIndex);
            QueryBindings queryBindings = (QueryBindings) arg;
            updateCounts[batchCommandIndex] = executeUpdateInternal(queryBindings, true);
​
    executeUpdateInternal(QueryBindings bindings, boolean isReallyBatch)
        Message sendPacket = ((PreparedQuery) this.query).fillSendPacket(bindings);
        ResultSetInternalMethods rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);
        this.updateCount = rs.getUpdateCount();
        if (containsOnDuplicateKeyUpdate() && this.compensateForOnDuplicateKeyUpdate) {
            if (this.updateCount == 2 || this.updateCount == 0) {
                this.updateCount = 1;
            }
        }
        return this.updateCount;
​
    executeInternal(...)
        JdbcConnection locallyScopedConnection = this.connection;
        ResultSetInternalMethods rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(...);
        return rs;
​
​
NativeSession
    execSQL(...)
          return packet == null? ((NativeProtocol) this.protocol).sendQueryString(...)
                    : ((NativeProtocol) this.protocol).sendQueryPacket(...);
​
NativeProtocol
    sendQueryPacket()
        NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
        rs = readAllResults(...);
        return rs;
    
    sendCommand(...)
        send(queryPacket, queryPacket.getPosition());
        NativePacketPayload returnPacket = checkErrorMessage(command);
        return returnPacket;
​
    checkErrorMessage(int command)
        NativePacketPayload resultPacket = readMessage(this.reusablePacket);
        checkErrorMessage(resultPacket);
​
    checkErrorMessage(NativePacketPayload resultPacket)
        byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);
        if (statusCode == (byte) 0xff)
            //从 Mysql报文解析错误码,封装异常
            // 抛出 CJException
    
    readAllResults()
        T topLevelResultSet = read(Resultset.class, ...)
        return topLevelResultSet;
    
    read()
        ProtocolEntityReader<T, NativePacketPayload> sr = isBinaryEncoded
                ? this.PROTOCOL_ENTITY_CLASS_TO_BINARY_READER.get(requiredClass)
                : this.PROTOCOL_ENTITY_CLASS_TO_TEXT_READER.get(requiredClass);
        // com.mysql.cj.protocol.a.TextResultsetReader
        return sr.read(...);
​
    readServerStatusForResultSets(...)
        OkPacket ok = OkPacket.parse(rowPacket,...);
        result = (T) ok;
        return result;
    
​
TextResultsetReader
    Resultset read()    
        // 封装 updateCount
        OkPacket ok = this.protocol.readServerStatusForResultSets(resultPacket, false); 
        // com.mysql.cj.jdbc.result.ResultSetFactory
        Resultset rs = resultSetFactory.createFromProtocolEntity(ok);
        // com.mysql.cj.jdbc.result.ResultSetImpl
        return rs;
​
ResultSetFactory
    ResultSetImpl createFromProtocolEntity(ProtocolEntity protocolEntity)
        // 是 OkPacket,updateCount 封装到 ResultSetImpl
        return new ResultSetImpl((OkPacket) protocolEntity, this.conn, this.stmt);
​
OkPacket
    OkPacket parse(NativePacketPayload buf, String errorMessageEncoding)
        OkPacket ok = new OkPacket();
        ok.setUpdateCount(buf.readInteger(IntegerDataType.INT_LENENC)); // affected_rows
        ok.setUpdateID(buf.readInteger(IntegerDataType.INT_LENENC)); // last_insert_id

PgSQL

设置:

&reWriteBatchedInserts=true

Oracle

方案

  1. 多个insert into t(...) values(...) 优化成 insert into t(…) values (…), (…), (…)

  2. 减少网络IO,客户端的多个SQL一起提交给服务器,只涉及一次网络IO开销

  3. 关闭自动提交事务,手动提交

  4. 设置分片:batchSize = 1000:解决 sql 过长问题,默认情况下 MySQL 可执行的最大 SQL 为 4M

  5. 采用并行流:

 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

MySQL 插入10w 条数据耗时(ms):

(1)Statement executeBatch()分片

(2)JdbcTemplate TransactionTemplate

(3)insertBatchSomeColumn()分片(mp)

(4)saveBatch() (mp)分片

(5)SqlSession Batch

(6)for循环插入(mybatis)

没加连接参数

18579

20577

6259

36425

97434

160247

加了连接参数

3672

4151

5838

6257

7403

加了连接参数、使用并行流

2570

2716

3438

4813

5594

总结

  1. 设置连接参数 &allowMultiQueries=true,允许客户端按照 sql1;sql2;...sqln 的格式发送到服务器,服务器会逐个执行。

  2. 设置连接参数 &reWriteBatchedInserts=true,客户端的多个SQL一起提交给服务器,服务器最终执行时会将多个insert into t(...) values(...) 优化成 insert into t(…) values (…), (…), (…)。需要配合 PreparedStatement 的 addBatch()、executeBatch() 。

  3. Mp 的 insertBatchSomeColumn 是将多个insert into t(...) values(...) 先优化成 insert into t(…) values (…), (…), (…),再发到服务器,所以加连接参数影响不是很大。

我也放荡不羁爱自由!