MySQL 異步驅動淺析 (三):鏈接池改進方案

本文由 Jilen 發表在 ScalaCool 團隊博客。mysql

上一篇文章分析了 Mysql 異步驅動的一些缺點,大部分已經在咱們內部版本中修復了。git

其中分區設計的連接池在實際使用過程當中會產生一些很是嚴重的問題。github

鏈接池中的鎖阻塞

Mysql Async Pool
Mysql Async Pool

前文中曾經提到 SingleThreadedAsyncObjectPool 這個單線程的鏈接池實現並非徹底非阻塞的,再多個線程請求連接狀況下仍舊會產生鎖阻塞。
同時文章中也提到 Play!Framework 這樣的框架主線程數能夠很是少,因此不用過度擔心。sql

事實證實這是錯誤的,由於 PartitionedAsyncObjectPool 默認使用了 Executors.newCachedThreadPool, 這就致使不論主線程數多少,高併發狀況下會建立大量線程同時去獲取連接。
SingleThreadedAsyncObjectPool 使用了 Executors.newFixedThreadPool,顯然這意味着每次入隊都會產生一個鎖阻塞,在系統併發很是高的狀況下,這會極大加重鎖競爭,一旦得到鎖線程被中斷,則全部的線程都會處於併發

頻繁的線程切換

驅動中默認狀況下,存在多個 ExecutionContext,憑空增長了內存消耗和上下文切換框架

難以定位的內存泄漏

在實際使用過程當中,咱們經歷了運行一段時間後 JVM 瘋狂 FGC 的狀況。
經分析發現存在連接泄漏,鏈接池存在大量未被回收的 MySQLConnection 對象,而且很是詭異的是咱們沒法定位究竟是誰持有了這些未釋放的 Connection。異步

考慮到上述問題,我開始着手設計一個全新的連接池,名字就叫 NewPoolasync

設計一個徹底無鎖無阻塞的鏈接池

這種全新鏈接池實現主要依賴如下設計高併發

  • 使用兩個 ConcurrentLinkedQueue 保存等待列表和空閒連接,全程不存在鎖
  • 使用 Semaphore 保證鏈接數和隊列長度不超過限制

主要代碼以下(部分)post

val conns: ConcurrentLinkedQueue[Future[Connection]] = ...
val queue: ConcurrentLinkedQueue[Promise[Connection]] = ...
val createSemaphore: Semaphore = ...
val queueSemaphore: Semaphore = ...

def withConnection[A](f: Connection => Future[A]): Future[A] = {
    val c = acquire()
    c.flatMap { cc =>
      f(cc).andThen { //此處可能須要 try catch 處理不按套路拋出異常的狀況
        case _ => release(c)
      }
    }
  }

  private def acquire(): Future[Connection] = {
    val conn = conns.poll()
    if (conn != null) { //有空餘連接,則返回這個連接
      reconnectIfDead(conn)
    } else if (createSemaphore.tryAcquire()) { //鏈接數少於最大連接數,建立一個
      createNew()
    } else if (queueSemaphore.tryAcquire()) { //隊列未滿,入隊
      val p = Promise[Connection]
      enqueueTask(p)
      p.future
    } else { //返回隊列已滿
      Future.failed(QueueIsFull)
    }
  }


  private def release(c: Future[Connection]) = {
    val wait = queue.poll()
    if (wait == null) {
      conns.offer(c)
    } else {
      wait.completeWith(c)
      queueSemaphore.release()
    }
  }複製代碼

Semaphore 的 trypAcquire 操做和 ConcurrentLinkedQueue 都不會產生鎖,確實作到了 Lock-Free。

性能測試

爲了驗證上述猜想,我基於 scalameter 作了簡單的性能測試。結果以下

簡單查詢(SELECT 1)

新的方案(圖中藍色線條)對很是簡單的查詢,仍舊有 100% 左右的性能提高

select performance
select performance

簡單事務(SELECT + UPDATE)

執行 SQL 以下

for {
  u <- c.sendQuery(s"SELECT * FROM user WHERE id = ${id}")
  r <- c.sendQuery(s"UPDATE user SET remain = remain + 100 WHERE id = ${id}")
} yield r複製代碼

能夠看到新方案(圖中綠色線條)有很是大幅度提高

transaction performance
transaction performance
相關文章
相關標籤/搜索