Go組件學習——手寫鏈接池並無那麼簡單

一、背景

前段時間在看gorm,發現gorm是複用database/sql的鏈接池。git

因而翻了下database/sql的數據庫鏈接池的代碼實現,看完代碼,好像也不是很複雜,可是總以爲理解不夠深入,因而萌生了本身想寫個鏈接池的想法。(最後也驗證了,看源碼的理解確實不夠深入,一看就會,一作就跪)github

二、鏈接池的實現原理

什麼是鏈接池sql

  • 顧名思義是一個池子
  • 池子裏面存放有限數量即時可用的鏈接,減小建立鏈接和關閉鏈接的時間
  • 鏈接是有存活時間的

具體到數據庫鏈接池,我根據本身的理解畫了一張獲取鏈接的流程圖數據庫

從上圖咱們能夠看出,除了鏈接池的容量大小,咱們還有一個最大鏈接數的限制。池子裏的鏈接讓咱們不用頻繁的建立和關閉鏈接,同時應該也要有最大鏈接的限制,避免無限制的建立鏈接致使服務器資源耗盡,拖垮服務不可用。緩存

池子中的鏈接也有存活時間,若是超過存活時間則會銷燬鏈接。安全

三、實現鏈接池咱們須要考慮哪些問題

3.1 功能點

  • 獲取鏈接bash

  • 釋放鏈接服務器

  • Ping併發

  • 關閉鏈接池函數

  • 設置最大鏈接數和鏈接池容量(鏈接存活時間等等)

3.2 實現細節

  • 鏈接應該有哪些屬性,好比最大鏈接數、鏈接池容量、鏈接建立時間和存活時間
  • 如何模擬使用鏈接池以及超過最大鏈接數後等待其餘鏈接釋放
  • 如何保證在多協程操做下數據的一致性
  • 若是實現鏈接的超時監聽和通知

四、具體實現

這裏的鏈接池實現包括

  • 設置最大鏈接數和鏈接池容量
  • 獲取鏈接
  • 釋放鏈接

4.1 結構定義

定義Conn結構體,這裏包含了幾乎全部的有關鏈接須要的信息屬性

type Conn struct {
	maxConn       int                     // 最大鏈接數
	maxIdle       int                     // 最大可用鏈接數
	freeConn      int                     // 線程池空閒鏈接數
	connPool      []int                   // 鏈接池
	openCount     int                     // 已經打開的鏈接數
	waitConn      map[int]chan Permission // 排隊等待的鏈接隊列
	waitCount     int                     // 等待個數
	lock          sync.Mutex              // 鎖
	nextConnIndex NextConnIndex						// 下一個鏈接的ID標識(用於區分每一個ID)
	freeConns     map[int]Permission 			// 鏈接池的鏈接 
}
複製代碼

這裏並不會建立一個真正的數據庫鏈接,而是使用一個非空的Permission表示拿到了鏈接。拿到一個非空的Permission纔有資格執行後面相似增刪改查的操做。

Permission對應的結構體以下

type Permission struct {
	NextConnIndex								 // 對應Conn中的NextConnIndex
	Content     string					 // 通行證的具體內容,好比"PASSED"表示成功獲取
	CreatedAt   time.Time				 // 建立時間,即鏈接的建立時間
	MaxLifeTime time.Duration    // 鏈接的存活時間,本次沒有用到這個屬性,保留
}
複製代碼

NextConnIndex對應的結構體以下

type NextConnIndex struct {
	Index int
}
複製代碼

還有一個用來設置最大鏈接數以及鏈接池最大鏈接數的Config

type Config struct {
	MaxConn int
	MaxIdle int
}
複製代碼

4.2 初始化鏈接池參數

func Prepare(ctx context.Context, config *Config) (conn *Conn) {
	// go func() {
		//for {
		//conn.expiredCh = make(chan string, len(conn.freeConns))
		//for _, value := range conn.freeConns {
		// if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {
		// conn.expiredCh <- "CLOSE"
		// }
		//}
	// }()
	return &Conn{
		maxConn:   config.MaxConn,
		maxIdle:   config.MaxIdle,
		openCount: 0,
		connPool:  []int{},
		waitConn:  make(map[int]chan Permission),
		waitCount: 0,
		freeConns: make(map[int]Permission),
	}
}
複製代碼

這裏主要是初始化上面的Conn結構體參數。

註釋的部分,主要想經過啓動一個監聽協程,用於監聽已通過期的鏈接,並經過channel發送。(這塊還有一些細節沒有想清楚,先擱置)

4.3 設置MaxConn和MaxIdle

在main.go中添加代碼

ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
複製代碼

這裏意味鏈接池只能緩存一個鏈接,最大新建鏈接數爲2,超過則要加入等待隊列。

4.4 獲取鏈接

// 建立鏈接
func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {
	/** 一、若是當前鏈接池已滿,即len(freeConns)=0 二、斷定openConn是否大於maxConn,若是大於,則丟棄獲取加入隊列進行等待 三、若是小於,則考慮建立新鏈接 */
	conn.lock.Lock()

	select {
	default:
	case <-ctx.Done():	// context取消或超時,則退出
		conn.lock.Unlock()

		return Permission{}, errors.New("new conn failed, context cancelled!")
	}

  // 鏈接池不爲空,從鏈接池獲取鏈接
	if len(conn.freeConns) > 0 {
		var (
			popPermission Permission
			popReqKey     int
		)
    
    // 獲取其中一個鏈接
		for popReqKey, popPermission = range conn.freeConns {
			break
		}
    // 從鏈接池刪除
		delete(conn.freeConns, popReqKey)
		fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			conn.lock.Unlock()
		return popPermission, nil
	}

	if conn.openCount >= conn.maxConn { // 當前鏈接數大於上限,則加入等待隊列
		nextConnIndex := getNextConnIndex(conn)

		req := make(chan Permission, 1)
		conn.waitConn[nextConnIndex] = req
		conn.waitCount++
		conn.lock.Unlock()

		select {
      // 若是在等待指定超時時間後,仍然沒法獲取釋放鏈接,則放棄獲取鏈接,這裏若是不在超時時間後退出會一直阻塞
		case <-time.After(time.Second * time.Duration(3)):
			fmt.Println("超時,通知主線程退出")
			return
		case ret, ok := <-req: // 有放回的鏈接, 直接拿來用
			if !ok {
				return Permission{}, errors.New("new conn failed, no available conn release")
			}
			fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
			return ret, nil
		}
		return Permission{}, errors.New("new conn failed")
	}

	// 新建鏈接
	conn.openCount++
	conn.lock.Unlock()
	permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},
		Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
	fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)
	return permission, nil
}
複製代碼

這裏主要分爲三個部分

  • 若是鏈接池不爲空,則直接從池子裏面獲取鏈接使用便可

  • 若是鏈接池爲空,且當前的鏈接數已經超過最大鏈接數maxConn,則會將當前任務加入等待隊列,同時監聽是否有釋放的可用鏈接,若是有則拿來直接用,若是超過指定等待時間後仍然取不到鏈接則退出阻塞返回。

  • 若是鏈接池爲空,且還沒有達到最大鏈接數maxConn,則新建一個新鏈接。

getNextConnIndex函數

func getNextConnIndex(conn *Conn) int {
	currentIndex := conn.nextConnIndex.Index
	conn.nextConnIndex.Index = currentIndex + 1
	return conn.nextConnIndex.Index
}
複製代碼

4.5 釋放鏈接

// 釋放鏈接
func (conn *Conn) Release(ctx context.Context) (result bool, err error) {
	conn.lock.Lock()
  // 若是等待隊列有等待任務,則通知正在阻塞等待獲取鏈接的進程(即New方法中"<-req"邏輯)
  // 這裏沒有作指定鏈接的釋放,只是保證釋放的鏈接會被利用起來
	if len(conn.waitConn) > 0 {
		var req chan Permission
		var reqKey int
		for reqKey, req = range conn.waitConn {
			break
		}
		// 假定釋放的鏈接就是下面新建的鏈接
		permission := Permission{NextConnIndex: NextConnIndex{reqKey},
			Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
		req <- permission
		conn.waitCount--
		delete(conn.waitConn, reqKey)
		conn.lock.Unlock()
	} else {
		if conn.openCount > 0 {
			conn.openCount--
      
			if len(conn.freeConns) < conn.maxIdle {	// 確保鏈接池大小不會超過maxIdle
				nextConnIndex := getNextConnIndex(conn)
				permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},
					Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}
				conn.freeConns[nextConnIndex] = permission
			}
		}
		conn.lock.Unlock()
	}
	return
}
複製代碼

這裏主要分爲兩部分

  • 若是釋放鏈接的時候發現等待隊列有任務在等待,則將釋放的鏈接經過channel發送,給正在等待鏈接釋放的阻塞任務使用,同時從等待隊列中刪除該任務。
  • 若是當前無等待任務,則將鏈接放入鏈接池

這裏的nowFunc

var nowFunc = time.Now
複製代碼

五、Case模擬

5.1 無釋放建立鏈接

即只有建立鏈接,拿到鏈接也不會釋放鏈接

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}
複製代碼

執行結果以下

注意上面代碼都是一直在獲取鏈接,在獲取鏈接後沒有釋放鏈接。

第一次獲取,鏈接池爲空,則新建鏈接

第二次獲取,鏈接池爲空,繼續新建鏈接

第三次獲取,鏈接池爲空,同時已有鏈接數>=maxConn,因此會阻塞等待釋放鏈接,可是由於沒有鏈接釋放,因此一直等待,直到3秒超時後退出。

因此第三次、第四次和第五次都是超時退出

5.2 釋放鏈接

若是咱們釋放鏈接會怎麼樣,咱們能夠經過新啓一個協程用於釋放一個鏈接以下

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
	if _, err := conn.New(ctx); err != nil {
		return
	}
}

複製代碼

執行結果以下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log received released conn!!!!! openCount:  2  freeConns:  map[]
超時,通知主線程退出
超時,通知主線程退出
複製代碼

前兩次和上面同樣,可是第三次獲取的時候,會收到一個釋放的鏈接,因此能夠直接複用釋放的鏈接返回。

可是第四次和第五次建立,由於沒有釋放的鏈接,因此都會由於等待超時後退出。

5.3 使用鏈接池

上面的兩個case是在MaxConn=2,MaxIdle=1的狀況下執行的。

下面咱們看看若是基於以上兩個參數設定,模擬出正好使用鏈接池的狀況。

package main

import (
	"context"
	custom_pool "go-demo/main/src/custom-pool"
)

func main() {

	ctx := context.Background()
	config := &custom_pool.Config{
		MaxConn: 2,
		MaxIdle: 1,
	}
	conn := custom_pool.Prepare(ctx, config)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
	go conn.Release(ctx)
	if _, err := conn.New(ctx); err != nil {
		return
	}
}
複製代碼

即除了第一次,後面都會有鏈接釋放。

執行結果可能狀況以下

log create conn!!!!! openCount:  1  freeConns:  map[]
log create conn!!!!! openCount:  2  freeConns:  map[]
log use free conn!!!!! openCount:  1  freeConns:  map[]
log use free conn!!!!! openCount:  0  freeConns:  map[]
log create conn!!!!! openCount:  1  freeConns:  map[]
複製代碼

從執行結果能夠看出,這裏有兩次使用了鏈接池中的鏈接。

注意:由於釋放是新啓協程執行,因此沒法保證執行順序,不一樣的執行順序,會有不一樣的執行結果。上面只是執行結果的一種。

以上完整代碼參見https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool

六、總結和展望

6.1 總結

  • 經過手寫鏈接池加深對於鏈接池實現的理解
  • 學會使用channel和協程
  • 學會如何在channel阻塞指定時間後退出(設立超時時間)
  • 學會對於共享資源加鎖,好比nextConnIndex的獲取和更新須要加鎖

6.2 展望

  • Close和Ping沒有寫(實現不難)
  • 鏈接池鏈接須要有存活時間,並在鏈接過時的時候從鏈接池刪除
  • 實現使用的是普通的map集合,能夠考慮併發安全的syncMap
  • 代碼實現比較簡陋不夠優雅,能夠繼續完善保證職責單一

我的公衆號JackieZheng,歡迎關注~

相關文章
相關標籤/搜索