本文由 Jilen 發表在 ScalaCool 團隊博客。mysql
上一篇文章分析了 Mysql 異步驅動的一些缺點,大部分已經在咱們內部版本中修復了。git
其中分區設計的連接池在實際使用過程當中會產生一些很是嚴重的問題。github
前文中曾經提到 SingleThreadedAsyncObjectPool
這個單線程的鏈接池實現並非徹底非阻塞的,再多個線程請求連接狀況下仍舊會產生鎖阻塞。
同時文章中也提到 Play!Framework 這樣的框架主線程數能夠很是少,因此不用過度擔心。sql
事實證實這是錯誤的,由於 PartitionedAsyncObjectPool
默認使用了 Executors.newCachedThreadPool
, 這就致使不論主線程數多少,高併發狀況下會建立大量線程同時去獲取連接。
而 SingleThreadedAsyncObjectPool
使用了 Executors.newFixedThreadPool
,顯然這意味着每次入隊都會產生一個鎖阻塞,在系統併發很是高的狀況下,這會極大加重鎖競爭,一旦得到鎖線程被中斷,則全部的線程都會處於併發
驅動中默認狀況下,存在多個 ExecutionContext
,憑空增長了內存消耗和上下文切換框架
在實際使用過程當中,咱們經歷了運行一段時間後 JVM 瘋狂 FGC 的狀況。
經分析發現存在連接泄漏,鏈接池存在大量未被回收的 MySQLConnection
對象,而且很是詭異的是咱們沒法定位究竟是誰持有了這些未釋放的 Connection。異步
考慮到上述問題,我開始着手設計一個全新的連接池,名字就叫 NewPool
async
這種全新鏈接池實現主要依賴如下設計高併發
ConcurrentLinkedQueue
保存等待列表和空閒連接,全程不存在鎖Semaphore
保證鏈接數和隊列長度不超過限制主要代碼以下(部分)post
val conns: ConcurrentLinkedQueue[Future[Connection]] = ...
val queue: ConcurrentLinkedQueue[Promise[Connection]] = ...
val createSemaphore: Semaphore = ...
val queueSemaphore: Semaphore = ...
def withConnection[A](f: Connection => Future[A]): Future[A] = {
val c = acquire()
c.flatMap { cc =>
f(cc).andThen { //此處可能須要 try catch 處理不按套路拋出異常的狀況
case _ => release(c)
}
}
}
private def acquire(): Future[Connection] = {
val conn = conns.poll()
if (conn != null) { //有空餘連接,則返回這個連接
reconnectIfDead(conn)
} else if (createSemaphore.tryAcquire()) { //鏈接數少於最大連接數,建立一個
createNew()
} else if (queueSemaphore.tryAcquire()) { //隊列未滿,入隊
val p = Promise[Connection]
enqueueTask(p)
p.future
} else { //返回隊列已滿
Future.failed(QueueIsFull)
}
}
private def release(c: Future[Connection]) = {
val wait = queue.poll()
if (wait == null) {
conns.offer(c)
} else {
wait.completeWith(c)
queueSemaphore.release()
}
}複製代碼
Semaphore
的 trypAcquire 操做和 ConcurrentLinkedQueue
都不會產生鎖,確實作到了 Lock-Free。
爲了驗證上述猜想,我基於 scalameter 作了簡單的性能測試。結果以下
新的方案(圖中藍色線條)對很是簡單的查詢,仍舊有 100% 左右的性能提高
執行 SQL 以下
for {
u <- c.sendQuery(s"SELECT * FROM user WHERE id = ${id}")
r <- c.sendQuery(s"UPDATE user SET remain = remain + 100 WHERE id = ${id}")
} yield r複製代碼
能夠看到新方案(圖中綠色線條)有很是大幅度提高