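Background: our service has to issue GET requests at high frequency, and we wrap Go's net/http library directly. Open-source alternatives such as req and gorequsts are themselves wrappers around net/http, so we stayed with the standard library (req will also trip you up if misused). Our scenario is simple: multiple goroutines take tasks from a channel, issue GET requests concurrently, set a timeout, set a proxy, and that is all. We know that net/http ships with a connection pool and reuses connections automatically, yet we saw the connection count blow up to 10,000.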
This article is still being updated and revised; for the latest version, see the original at http://dmwan.cc
The first version of our code was written in Python and did not have the connection blow-up problem. It wrapped requests as follows:
    import socket
    import traceback

    import requests

    # Method of our fetcher class; self.session is presumably a requests.Session.
    def fetch(self, url, body, method, proxies=None, header=None):
        res = None
        timeout = 4
        self.error = ''
        stream_flag = False
        if not header:
            header = {}
        if not proxies:
            proxies = {}
        try:
            self.set_extra(header)
            res = self.session.request(method, url, data=body, headers=header,
                                       timeout=timeout, proxies=proxies)
            # to do: self.error variable to logger
        except requests.exceptions.Timeout:
            self.error = "fetch failed !!! url:{0} except: connect timeout".format(url)
        except requests.exceptions.TooManyRedirects:
            self.error = "fetch failed !!! url:{0} except: redirect more than 3 times".format(url)
        except requests.exceptions.ConnectionError:
            self.error = "fetch failed !!! url:{0} except: connect error".format(url)
        except socket.timeout:
            self.error = "fetch failed !!! url:{0} except: recv timeout".format(url)
        except Exception:
            self.error = "fetch failed !!! url:{0} except: {1}".format(url, traceback.format_exc())
        if res is not None and self.error == "":
            self.logger.info("url: %s, body: %s, method: %s, header: %s, proxy: %s, request success!",
                             url, str(body)[:100], method, header, proxies)
            self.logger.info("url: %s, resp_header: %s, sock_ip: %s, response success!",
                             url, res.headers, self.get_sock_ip(res))
        else:
            self.logger.warning("url: %s, body: %s, method: %s, header: %s, proxy: %s, error: %s, request failed!",
                                url, str(body)[:100], method, header, proxies, self.error)
        return res
After switching to Go, we chose net/http. According to the net/http documentation, the most basic requests, such as GET and POST, can be made like this:
    resp, err := http.Get("http://example.com/")
    resp, err := http.Post("http://example.com/upload", "image/jpeg", &buf)
    resp, err := http.PostForm("http://example.com/form", url.Values{"key": {"Value"}, "id": {"123"}})
We need to add a timeout, a proxy, and request headers. The officially recommended way to do that is to use a Client, as follows:
    client := &http.Client{
        CheckRedirect: redirectPolicyFunc,
        Timeout:       time.Duration(10) * time.Second, // set the timeout
    }
    // set the proxy ip
    client.Transport = &http.Transport{
        Proxy:               http.ProxyURL(proxyUrl),
        MaxIdleConnsPerHost: 1000,
    }
    resp, err := client.Get("http://example.com")

    req, err := http.NewRequest("GET", "http://example.com", nil)
    // set a header
    req.Header.Add("If-None-Match", `W/"wyzzy"`)
    resp, err = client.Do(req)
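The official documentation points out that a Client only needs to be instantiated once, globally, and that it is safe for concurrent use by multiple goroutines. So sending requests from many goroutines through one shared client is a valid approach.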
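As a rough illustration of that pattern, here is a minimal sketch of several goroutines pulling URLs from a channel and sending them through one shared client. The names sharedClient, fetchAll and demo are made up for this example, and a local httptest server stands in for the real target.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// sharedClient is created once and reused by every goroutine.
var sharedClient = &http.Client{Timeout: 10 * time.Second}

// fetchAll starts `workers` goroutines that take URLs from a channel and
// fetch them with the shared client. It returns the number of 200 responses.
func fetchAll(urls []string, workers int) int {
	tasks := make(chan string)
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		ok int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for u := range tasks {
				resp, err := sharedClient.Get(u)
				if err != nil {
					continue
				}
				// Drain and close the body so the connection can be reused.
				io.Copy(io.Discard, resp.Body)
				resp.Body.Close()
				if resp.StatusCode == http.StatusOK {
					mu.Lock()
					ok++
					mu.Unlock()
				}
			}
		}()
	}
	for _, u := range urls {
		tasks <- u
	}
	close(tasks)
	wg.Wait()
	return ok
}

// demo runs 20 requests through 4 workers against a local test server
// (an assumption standing in for the real service).
func demo() int {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close()

	urls := make([]string, 20)
	for i := range urls {
		urls[i] = srv.URL
	}
	return fetchAll(urls, 4)
}

func main() {
	fmt.Println("successful requests:", demo())
}
```

Note that the workers only read sharedClient; nothing modifies the client after it has been created.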
Based on the official documentation and our business scenario, we wrote the following code:
    import (
        "net/http"
        "net/url"
        "time"
    )

    var client *http.Client

    // initialise the global client
    func init() {
        client = &http.Client{
            Timeout: time.Duration(10) * time.Second,
        }
    }

    type HttpClient struct{}

    // Fetch is called from multiple goroutines
    func (this *HttpClient) Fetch(dstUrl string, method string, proxyHost string, header map[string]string) (*http.Response, error) {
        // build the request
        req, err := http.NewRequest(method, dstUrl, nil)
        if err != nil {
            return nil, err
        }
        // add the headers
        for k, v := range header {
            req.Header.Add(k, v)
        }
        // set the proxy ip
        proxy := "http://" + proxyHost
        proxyUrl, err := url.Parse(proxy)
        if err != nil {
            return nil, err
        }
        client.Transport = &http.Transport{
            Proxy:               http.ProxyURL(proxyUrl),
            MaxIdleConnsPerHost: 1000,
        }
        resp, err := client.Do(req)
        return resp, err
    }
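When we ran a goroutine pool of 100 concurrent workers calling Fetch(), there should in theory have been about 100 established connections. Under load testing, however, I found close to 10,000 established connections. Was the net/http connection pool not working at all? More likely we were using it incorrectly.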
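The Python library handled concurrent requests without any trouble, so where does the problem come from? If you are familiar with how Go's net/http works, it is obvious: the problem is the Transport above. Each Transport maintains its own connection pool, and our code creates a new Transport on every call to Fetch, in every goroutine. A fresh Transport starts with an empty pool, so connections are never reused and new ones keep being opened.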
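The effect can be reproduced locally. The sketch below is only an illustration under assumptions: countConns and demo are hypothetical helpers, and an httptest server counts accepted TCP connections through the server's ConnState hook. It sends ten sequential requests, once through a single shared Transport and once with a new Transport per request.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// countConns sends n sequential GET requests to a local test server and
// returns how many TCP connections the server accepted. newTransport is
// called before every request to obtain the Transport to use.
func countConns(n int, newTransport func() *http.Transport) int64 {
	var opened int64
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	// The hook sees StateNew once for every accepted connection.
	srv.Config.ConnState = func(c net.Conn, s http.ConnState) {
		if s == http.StateNew {
			atomic.AddInt64(&opened, 1)
		}
	}
	srv.Start()
	defer srv.Close()

	client := &http.Client{}
	for i := 0; i < n; i++ {
		client.Transport = newTransport()
		resp, err := client.Get(srv.URL)
		if err != nil {
			continue
		}
		// Drain and close the body so the connection can go back to the pool.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return atomic.LoadInt64(&opened)
}

// demo returns the connection counts for a shared Transport and for a
// new Transport per request.
func demo() (reused, fresh int64) {
	shared := &http.Transport{MaxIdleConnsPerHost: 1000}
	reused = countConns(10, func() *http.Transport { return shared })
	fresh = countConns(10, func() *http.Transport {
		return &http.Transport{MaxIdleConnsPerHost: 1000}
	})
	return reused, fresh
}

func main() {
	reused, fresh := demo()
	fmt.Println("shared Transport, connections opened:", reused)
	fmt.Println("new Transport per request, connections opened:", fresh)
}
```

With the shared Transport the server should see a single connection for all ten requests; with a new Transport per request it should see ten, because each empty pool forces a new dial.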
Let's look at the Transport data structure:
    type Transport struct {
        idleMu     sync.Mutex
        wantIdle   bool // user has requested to close all idle conns
        idleConn   map[connectMethodKey][]*persistConn
        idleConnCh map[connectMethodKey]chan *persistConn

        reqMu       sync.Mutex
        reqCanceler map[*Request]func()

        altMu    sync.RWMutex
        altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper

        // Dial obtains a TCP connection, i.e. a net.Conn
        Dial func(network, addr string) (net.Conn, error)
    }
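The two maps idleConn and idleConnCh are keyed by connectMethodKey, that is, by protocol and host, and they hold the idle persistent connections for each destination. Clearly this struct is meant to be global, just like the client. So, to keep the connection pool from being defeated, you must not keep creating new Transports!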
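The only reason we kept creating Transports was to set the proxy. That approach is now off the table, so how do we reach the same goal? If you know how the proxy works, the fix is simple: send the request to the proxy's IP and carry the domain name in the Host header (this relies on the proxy routing by Host). The code is as follows: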
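    import (
        "net/http"
        "net/url"
        "time"
    )

    var client *http.Client

    func init() {
        client = &http.Client{}
        client.Transport = &http.Transport{
            MaxIdleConnsPerHost: 1000,
        }
    }

    type HttpClient struct{}

    func NewHttpClient() *HttpClient {
        return &HttpClient{}
    }

    // replaceUrl points srcUrl at the proxy ip and also returns the original
    // host, which has to travel in the Host header.
    func (this *HttpClient) replaceUrl(srcUrl string, ip string) (string, string, error) {
        parsedUrl, err := url.Parse(srcUrl)
        if err != nil {
            return "", "", err
        }
        host := parsedUrl.Host
        parsedUrl.Scheme = "http"
        parsedUrl.Host = ip // keeps the path and the query string
        return parsedUrl.String(), host, nil
    }

    func (this *HttpClient) Fetch(dstUrl string, method string, proxyHost string, header map[string]string, preload bool, timeout int64) (*http.Response, error) {
        // replace the host in the url with proxyHost
        newUrl, host, err := this.replaceUrl(dstUrl, proxyHost)
        if err != nil {
            return nil, err
        }
        req, err := http.NewRequest(method, newUrl, nil)
        if err != nil {
            return nil, err
        }
        for k, v := range header {
            req.Header.Add(k, v)
        }
        // net/http takes the Host header from req.Host; a "Host" entry in
        // req.Header is not sent.
        req.Host = host
        // NOTE: client is shared, so concurrent writes to client.Timeout race;
        // a per-request context deadline would avoid this.
        client.Timeout = time.Duration(timeout) * time.Second
        resp, err := client.Do(req)
        return resp, err // the caller closes the body
    }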
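One caveat about the Fetch above: it assigns client.Timeout on every call, and because the client is shared by all goroutines, those writes can race with each other. A possible alternative, sketched here under assumptions (fetchWithTimeout and demo are hypothetical names, and the helper reads the body itself so that it can cancel the context before returning), is to leave the shared client untouched and attach a deadline to each request through its context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// One client and one Transport for the whole process; neither is modified
// after initialisation.
var client = &http.Client{
	Transport: &http.Transport{MaxIdleConnsPerHost: 1000},
}

// fetchWithTimeout (hypothetical helper) sends a GET with a per-request
// deadline and returns the whole body. The deadline covers the body read too.
func fetchWithTimeout(url string, timeout time.Duration) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

// demo uses a local test server whose /slow path answers late. It reports
// whether the fast request succeeded and whether the slow one hit its deadline.
func demo() (fastOK bool, slowTimedOut bool) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/slow" {
			select {
			case <-time.After(500 * time.Millisecond):
			case <-r.Context().Done():
				return
			}
		}
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close()

	body, err := fetchWithTimeout(srv.URL+"/fast", 2*time.Second)
	fastOK = err == nil && string(body) == "ok"

	_, err = fetchWithTimeout(srv.URL+"/slow", 50*time.Millisecond)
	slowTimedOut = errors.Is(err, context.DeadlineExceeded)
	return fastOK, slowTimedOut
}

func main() {
	fastOK, slowTimedOut := demo()
	fmt.Println("fast request succeeded:", fastOK)
	fmt.Println("slow request hit its deadline:", slowTimedOut)
}
```

If the caller has to receive the *http.Response and close the body itself, as in the article's Fetch, the cancel function must be kept alive until the body has been read, so this sketch would need adapting.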
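After switching to carrying the domain in the Host header, the number of TCP connections immediately dropped to match the size of the goroutine pool, and the problem was solved.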
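One detail is worth checking when using this approach in Go: for outgoing requests, net/http takes the Host header from the Request.Host field, and a "Host" entry added to req.Header is not written. The sketch below is an illustration only; hostSeen is a hypothetical helper and a local httptest server, reached by IP, echoes back the Host it received.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// hostSeen sends a GET to a local test server (addressed by IP) and returns
// the Host header the server received. With useField the domain is set via
// req.Host; otherwise it is only added to req.Header.
func hostSeen(useField bool) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Host)
	}))
	defer srv.Close()

	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		return "", err
	}
	if useField {
		req.Host = "example.com"
	} else {
		req.Header.Set("Host", "example.com")
	}

	client := &http.Client{Transport: &http.Transport{}}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	viaField, err := hostSeen(true)
	fmt.Println("req.Host set, server saw:", viaField, err)

	viaHeader, err := hostSeen(false)
	fmt.Println("only req.Header set, server saw:", viaHeader, err)
}
```

With req.Host set, the server should see example.com; with only the header map set, it should see the IP and port from the URL instead.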