1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 《HikariCP数据库连接池实战》学习笔记(二):获取连接原理

《HikariCP数据库连接池实战》学习笔记(二):获取连接原理

时间:2018-12-14 18:11:23

相关推荐

《HikariCP数据库连接池实战》学习笔记(二):获取连接原理

这篇文章是关于HikariCP源码的一些碎片分析记录。

ConcurrentBag(final IBagStateListener listener)

ConcurrentBag具有无锁设计、ThreadLocal缓存、队列窃取、直接切换优化四大特点。

CopyOnWriteArrayList:负责存放ConcurrentBag中全部用于出借的资源。

ThreadLocal:用于加速线程本地化资源访问。

SynchronousQueue:用于存在资源等待线程时第一手资源交接。这里采用的是公平模式。底层实现是TransferQueue。SynchronousQueue无存储空间,每一个put操作必须等待一个take操作或者poll操作,否则不能添加新的元素。

private final CopyOnWriteArrayList<T> sharedList;private final ThreadLocal<List<Object>> threadList;private final SynchronousQueue<T> handoffQueue;public ConcurrentBag(final IBagStateListener listener){this.listener = listener;this.weakThreadLocals = useWeakThreadLocals();this.handoffQueue = new SynchronousQueue<>(true);this.waiters = new AtomicInteger();this.sharedList = new CopyOnWriteArrayList<>();if (weakThreadLocals) {this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));}else {this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));}}

HikariPool(final HikariConfig config)

HikariPool的有参构造函数做了以下事情:

(1)实例化ConcurrentBag。

(2)初始化HouseKeeping线程池,该线程池用于销毁空闲连接。

(3)初始化添加连接线程池,关闭连接线程池。

public HikariPool(final HikariConfig config){super(config);this.connectionBag = new ConcurrentBag<>(this);this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();checkFailFast();if (config.getMetricsTrackerFactory() != null) {setMetricsTrackerFactory(config.getMetricsTrackerFactory());}else {setMetricRegistry(config.getMetricRegistry());}setHealthCheckRegistry(config.getHealthCheckRegistry());handleMBeans(this, true);ThreadFactory threadFactory = config.getThreadFactory();final int maxPoolSize = config.getMaximumPoolSize();LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));final long startTime = currentTime();while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {quietlySleep(MILLISECONDS.toMillis(100));}addConnectionExecutor.setCorePoolSize(1);addConnectionExecutor.setMaximumPoolSize(1);}}

HikariDataSource#getConnection

使用双层检查锁初始化数据库连接池。

@Overridepublic Connection getConnection() throws SQLException{if (isClosed()) {throw new SQLException("HikariDataSource " + this + " has been closed.");}if (fastPathPool != null) {return fastPathPool.getConnection();}// See /wiki/Double-checked_locking#Usage_in_JavaHikariPool result = pool;if (result == null) {synchronized (this) {result = pool;if (result == null) {validate();LOGGER.info("{} - Starting...", getPoolName());try {pool = result = new HikariPool(this);this.seal();}catch (PoolInitializationException pie) {if (pie.getCause() instanceof SQLException) {throw (SQLException) pie.getCause();}else {throw pie;}}LOGGER.info("{} - Start completed.", getPoolName());}}}return result.getConnection();}

HikariPool#getConnection()

调用ConnectionBag#connect方法获取一个数据库连接,对连接的有效性进行检查,通过PoolEntry是否被标记为清除状态、当前PoolEntry的存活时间是否超时和当前连接是否是活跃有效的进行判断,如果存在一项检查不通过,就关闭这个链接,再重新从资源池获取连接,直到获取到有效连接为止。

获取到有效连接后,会通过ProxyFactory.getProxyConnection返回代理过的连接。HikariCp利用Java字节码修改类库 Javassist实现动态代理。

HikariPool实现了HikariPoolMXBean接口,用于对外暴露HikariCP连接池相关的监控管理功能;

HikariConfig实现了HikariConfigMXBean接口,用于暴露配置相关的监控管理功能。

public Connection getConnection() throws SQLException{return getConnection(connectionTimeout);}public Connection getConnection(final long hardTimeout) throws SQLException{suspendResumeLock.acquire();final long startTime = currentTime();try {long timeout = hardTimeout;do {PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);if (poolEntry == null) {break; // We timed out... break and throw exception}final long now = currentTime();if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);timeout = hardTimeout - elapsedMillis(startTime);}else {metricsTracker.recordBorrowStats(poolEntry, startTime);return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);}} while (timeout > 0L);metricsTracker.recordBorrowTimeoutStats(startTime);throw createTimeoutException(startTime);}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new SQLException(poolName + " - Interrupted during connection acquisition", e);}finally {suspendResumeLock.release();}}

PoolEntry#createProxyConnection

Connection createProxyConnection(final ProxyLeakTask leakTask, final long now){return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);}

ProxyFactory#getProxyConnection

static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit){// Body is replaced (injected) by JavassistProxyFactorythrow new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");}

ConcurrentBag#borrow

ConcurentBag采用queue-stealing的机制获取元素:首先从ThreadLocal中获取属于当前线程的元素来避免锁竞争。如果没有可用的元素再从CopyOnWriteArrayList中获取。

private final ThreadLocal<List<Object>> threadList;private final boolean weakThreadLocals;private final AtomicInteger waiters;private final CopyOnWriteArrayList<T> sharedList;private final SynchronousQueue<T> handoffQueue;public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException{// Try the thread-local list firstfinal List<Object> list = threadList.get();for (int i = list.size() - 1; i >= 0; i--) {final Object entry = list.remove(i);@SuppressWarnings("unchecked")final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;if (bagEntry != null && pareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}}// Otherwise, scan the shared list ... then poll the handoff queuefinal int waiting = waiters.incrementAndGet();try {for (T bagEntry : sharedList) {if (pareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {// If we may have stolen another waiter's connection, request another bag add.if (waiting > 1) {listener.addBagItem(waiting - 1);}return bagEntry;}}listener.addBagItem(waiting);timeout = timeUnit.toNanos(timeout);do {final long start = currentTime();final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);if (bagEntry == null || pareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {return bagEntry;}timeout -= elapsedNanos(start);} while (timeout > 10_000);return null;}finally {waiters.decrementAndGet();}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。