Mybatis源码-datasource(数据源)总结

这个包主要功能的如何获取到数据源对象, 间接获取Connection(连接对象)来操作数据库

1、获取DataSource方式有两种

1.1. 通过jndi的(InitialContext上下文)获取,jndi的lookup方法,从某个地方获取配置生成一个DataSource
1.2. 通过java代码,传入datasource需要参数,比如用户名、密码、驱动类路径等等

2、这个包一个关系的简图

关系图

3、PooledConnection类解析(部分核心代码)

package org.apache.ibatis.datasource.pooled;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.ibatis.reflection.ExceptionUtil;

/**
 * 连接池
 * 1、连接池数据源、真实连接和代理连接对象
 * 2、还有时间相关,比如上一次使用连接对象的时间
 * 3、实现InvocationHandler的invoke方法,这个方法就是在调用真实方法之前调用该方法,在创建代理对象时候将代理对象与InvocationHandler
 * 进行关联,相当于类成员变量proxyConnection,代理对象关联本类的invoke方法,主要是为判断是否执行是close方法,执行
 * close方法需要进行额外的操作
 * @author Clinton Begin
 */
class PooledConnection implements InvocationHandler {

  /**
   * 关闭
   */
  private static final String CLOSE = "close";

  /**
   * 连接类,创建代理对象用改的
   */
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  /**
   * hashCode
   */
  private final int hashCode;

  /**
   * 连接池数据源
   */
  private final PooledDataSource dataSource;

  /**
   * 真实连接
   */
  private final Connection realConnection;
  /**
   * 代理连接
   */
  private final Connection proxyConnection;

  /**
   * 检出时间戳
   */
  private long checkoutTimestamp;
  /**
   * 创建的时间戳
   */
  private long createdTimestamp;
  /**
   * 最后使用的时间戳
   */
  private long lastUsedTimestamp;

  /**
   * 这个主要用于区分唯一性, 用户名+密码+url生成hashCode 确定唯一性
   */
  private int connectionTypeCode;

  /**
   * 是否有效 valid?
   */
  private boolean valid;

  /**
   * 使用连接对象和连接池数据源对象
   * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in.
   *
   * @param connection - the connection that is to be presented as a pooled connection
   * @param dataSource - the dataSource that the connection is from
   */
  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    //连接hashCode
    //连接对象
    //数据源
    //创建时间戳
    //最后使用时间戳
    //初始化有效
    //代理连接对象创建在执行proxyConnection的方法将会调用 当前invoke的方法
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

 /**
   * Required for InvocationHandler implementation.
   * 实现InvocationHandler的实现方法, 代理对象是为了,判断方法是不是close方法
   * @param proxy  - not used
   * @param method - the method to be executed
   * @param args   - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    //获取Connection调用的方法的名称是不是close方法
    //是需要关闭连接操作
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    }
    try {
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        checkConnection();
      }
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }

  }

}

3.1 总结

  1. 这里关键地方应该是它创建一个代理Connection对象,主要拦截close方法,将用完连接对象放回池中

4、PooledDataSource

/**
 * 这个是一个简单、同步,线程安全数据连接池
 * This is a simple, synchronous, thread-safe database connection pool.
 * 1、PooledDataSource和UnpooledDataSource的区别主要前面比后面多一个方法, 从线程中取连接对象,popConnection, 连接集合对象的状态
 * 2、修改配置信息需要强制清空激活连接数集合和空闲连接对象集合
 * 3、调用connection close的方法时候,它将用完的连接对象放回连接池,但是这个connection对象是不变,但是外面包PooledConnection变成最新的
 * 4、完成线程池的功能主要利用PoolState类的中两个集合,活动连接集合和空闲连接集合, 包括里面一些统计操作
 * @author Clinton Begin
 */
public class PooledDataSource implements DataSource {

  /**
   * 获取日志对象
   */
  private static final Log log = LogFactory.getLog(PooledDataSource.class);

  /**
   * 根据当前对象创建连接池状态
   */
  private final PoolState state = new PoolState(this);

  /**
   * 非连接池数据源
   */
  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  /**
   * 连接池活动连接对象默认是10
   */
  protected int poolMaximumActiveConnections = 10;
  /**
   * 空闲连接对象是5
   */
  protected int poolMaximumIdleConnections = 5;
  /**
   * 连接池的检出的时间是2s
   */
  protected int poolMaximumCheckoutTime = 20000;

  /**
   * 连接池超时时间为2s
   */
  protected int poolTimeToWait = 20000;

  /**
   * 可以容忍最大本地连接失败是3次
   */
  protected int poolMaximumLocalBadConnectionTolerance = 3;

  /**
   * 连接池ping查询
   */
  protected String poolPingQuery = "NO PING QUERY SET";
  /**
   * 开始连接池ping查询
   */
  protected boolean poolPingEnabled;

  /**
   * 连接对象没有被使用之后,需要ping 数据库的时间间隔
   *
   */
  protected int poolPingConnectionsNotUsedFor;

  /**
   * 希望连接类型code,其实就是url+username+password 提取hashCode
   */
  private int expectedConnectionTypeCode;

  /**
   * 创建一个无参函数
   * PooledDataSource与UnpooledDataSource区别?
   *
   */
  public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

  /**
   * 有一个参数的构造函数,UnpooledDataSource
   * @param dataSource
   */
  public PooledDataSource(UnpooledDataSource dataSource) {
    this.dataSource = dataSource;
  }
   /**
   * 将用完的连接对象放回连接池
   * @param conn
   * @throws SQLException
   */
  protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      //从当前活动线程取出该连接对象
      //如果有效
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        //当前空闲连接对象的大小小于最大限制空闲连接数,且是当前用户名和密码登录连接池
        //统计累积检出时间
        //如果连接不是自动提交将手动调用rollback()方法
        //重新创建连接对象,并将它加入到空闲集合中
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          //设置当前时间戳为上一个连接的时间创建时间戳
          //最后使用的时间戳也是修改一下(其实应该说重新用PooledConnection 包一下老的Connection)
          //设置老connection无效
          //唤醒其他线程
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          state.notifyAll();
        } else {
          //空闲线程已经够了
          // 累积线程检出时间
          // 如果连接不是自动提交,就手动提交回滚
          //关闭连接对象
          //设置无效连接对象
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        //不是有效连接对象被遗弃
        //同时统计无效连接数据
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

  /**
   * 从数据库中拿出一个连接代理Connection对象
   * @param username 用户名
   * @param password 密码
   * @return
   * @throws SQLException
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {
    //计数等待
    //定义变量conn
    //本地失败连接对象计数
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
    //从连接池获取连接对象
    //同步state对象
    while (conn == null) {
      synchronized (state) {
        //说明有空闲连接池对象
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection, 移除第一个索引位置连接对象,返回
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // Pool does not have available connection 没有空闲对象
          // 判断当前活动线程是否大于最大活动线程,没有则可以创建
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // Cannot create new connection
            // 不能创建新的信息(已经是最大活动连接数)获取活动连接数激活时间最长的连接对象
            //获取检出数据时间
            // 当前时间大于最大检出时间,认为过期(也就是这个线程有问题)
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection
              // 声明连接过期连接数++
              // 累积检出过期连接对象
              // 累积检出时间
              // 移除当前老的对象
              // 不是自动提交模式,调用对象的回滚操作
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // 获取连接对象
              // 设置当前时间戳
              // 设置上次使用时间戳
              // 标记无效
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
               //连接池不够用,需要等待了
              // Must wait
              try {
                // 等待计数,需要等待poolTimeToWait(2s)时间
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          // 判断连接对象是否有效
          if (conn.isValid()) {
            //如果不是自动提交模式,调用回滚
            //
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            //设置连接hashcode
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            //设置检出时间
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            //设置上一次使用时间
            //添加到激活连对象中
            //请求数++
            //计数累积请求时间
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            // 没有获取连接对象, 异常++
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            //本次失败次数 > 最大空闲数量+最大单次失败次数,抛出异常(默认是9次
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }
    // 为啥conn会为null
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

  /**
   * 检查是否可用, ping操作
   * Method to check to see if a connection is still usable
   *
   * @param conn - the connection to check
   * @return True if the connection is still usable
   */
  protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;
    //判断connection本身是否关闭了
    try {
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }
    //未关闭操作
    //是否开启ping操作
    if (result) {
      if (poolPingEnabled) {
        // 这个离上一次使用时间到现在时间段已经大于ping设置间隔时间
        if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
          try {
            if (log.isDebugEnabled()) {
              log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            //获取真实对象
            //执行sql语句,然后关闭
            //如果抛出异常说明ping有问题
            Connection realConn = conn.getRealConnection();
            try (Statement statement = realConn.createStatement()) {
              statement.executeQuery(poolPingQuery).close();
            }
            if (!realConn.getAutoCommit()) {
              realConn.rollback();
            }
            result = true;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
          } catch (Exception e) {
            log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
            //关闭连接对象
            try {
              conn.getRealConnection().close();
            } catch (Exception e2) {
              //ignore
            }
            result = false;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            }
          }
        }
      }
    }
    return result;
  }
}

4.1、总结

  1. 这里关键方法就是popConnection,从连接池中取出连接对象,pushConnection将用完连接对象放回连接池,pingConnection判断连接对象是否有效

版权声明:本文为m0_37355951原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>