最近工做比較忙,此外花了時間看雜書,文章寫的比較少。html
本篇文章分享一個工做中遇到的小問題。python
要完成一個開發任務,讀取一個具備80w條數據的CSV文件,將其入庫,由於CSV文件中缺乏2個關鍵數據,須要經過調用內部Web API接口的形式去得到,2個參數分別請求兩個URL,每次請求參數不一樣。mysql
先不考慮Web API方面的內容,80w條數據,如何快速的入庫MySQL?sql
一個直觀的想法就是將多條INSERT語句合併成一條INSERT執行,合併成一條SQL後,會減小MySQL的日誌,從而下降日誌的數據量與使用磁盤的評率,從而提升效率,此外合併SQL語句後,能夠減小SQL語句的解析書以及減小網絡傳輸IO(MYSQL C/S模式)緩存
查閱資料,主要閱讀了「MySQL 批量 SQL 插入性能優化」這篇文章,文中給出了相應的測試效果,這裏分享一下。性能優化
多條插入數據服務器
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
複製代碼
合併成一條後網絡
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0), ('1', 'userid_1', 'content_1', 1);
複製代碼
文中提供一些測試對比數據,分別是進行單條數據的導入與轉化成一條 SQL 語句進行導入,分別測試 1 百、1 千、1 萬條數據記錄。session
此外文中提出,使用事務能夠提升數據的插入效率,其緣由是由於MySQL在進行INSERT操做時內部會創建一個事務,在事務內才進行真正的插入處理,在使用INSERT語句時,直接使用事務能夠減小屢次建立事務的消耗。併發
使用事務的修改以下。
START TRANSACTION;
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
...
COMMIT;
複製代碼
文中一樣給出了測試對比,分別是不使用事務與使用事務在記錄數爲 1 百、1 千、1 萬的狀況。
此外,還須要注意,插入數據時,其索引是有序的會比無序索引快那麼一些。
數據有序的插入是指插入記錄在主鍵上是有序排列。
例如 datetime 是記錄的主鍵。
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('2', 'userid_2', 'content_2',2);
複製代碼
從上面sql能夠看出,datetime是記錄主鍵,但倒是無序的。
將其修改爲。
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('0', 'userid_0', 'content_0', 0);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('1', 'userid_1', 'content_1', 1);
INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`)
VALUES ('2', 'userid_2', 'content_2',2);
複製代碼
MySQL在進行入庫操做時,須要維護索引數據,插入時,索引數據無序會增大維護索引的成本,由於MySQL索引使用的結構是B+樹,這個樹本文就不討論先,留點素材給將來的本身。
下面是隨機數據與順序數據的性能對比,分別是記錄爲 1 百、1 千、1 萬、10 萬、100 萬。
將上面提到的三種方式合起來使用,可讓INSERT語句執行效果大幅提升。
引用「MySQL 批量 SQL 插入性能優化」結論
從測試結果能夠看到,合併數據 + 事務的方法在較小數據量時,性能提升是很明顯的,數據量較大時(1 千萬以上),性能會急劇降低,這是因爲此時數據量超過了 innodb_buffer 的容量,每次定位索引涉及較多的磁盤讀寫操做,性能降低較快。而使用合併數據 + 事務 + 有序數據的方式在數據量達到千萬級以上表現依舊是良好,在數據量較大時,有序數據索引定位較爲方便,不須要頻繁對磁盤進行讀寫操做,因此能夠維持較高的性能。
由於我插入的數據其主鍵索引自己就是無序的,因此使用了「合併數據 + 事務」的方法,但在具體實踐時,仍是遇到了「(2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))")」這個問題。
閱讀MySQL相關文檔與其餘資料,出現這個問題有3個可能緣由。
1.max_allowed_packet值過小,在MySQL中max_allowed_packet默認爲4M,即插入數據其大小不能超過4M,我要作的就是將其設置成更大的值。
// 查看 max_allowed_packet
>show VARIABLES like '%max_allowed_packet%';
// 將修改爲 100M
>set global max_allowed_packet = 1024*1024*100;
複製代碼
這種修改只會臨時生效,MySQL重啓後,依舊會變爲4M,想要長期生效,須要修改「my.ini」。
2.wait_timeout過小,MySQL連接長時間沒有新請求,就被Server端關閉了,對於一些ORM庫而言,這個過程是透明的,此時還在使用被Server端關閉的連接來進行SQL操做,就會出現上述錯誤,而我要作的就是將其設置爲更大的值。
>show global variables like '%timeout';
// MySQL無操做28800秒後會被自動化關閉
> set global wait_time = 28800;
複製代碼
但這不是長久之策,由於長時間不操做,MySQL Server端依舊會將其關閉,這個問題依舊會出現,爲了不這個問題,你須要本身關閉連接,對於一些MySQL操做量不大的情景,建議使用短鏈接的形式,若是依舊須要用MySQL鏈接池,以長鏈接的方式來操做MySQL,就須要實現判斷當前連接是否存活的邏輯並在不存活的狀況下自動重連。
若是使用的是pymysql,那麼能夠經過ping()方法來進行重連,其源碼以下。
def ping(self, reconnect=True):
"""Check if the server is alive"""
if self._sock is None:
if reconnect:
self.connect()
reconnect = False
else:
raise err.Error("Already closed")
try:
self._execute_command(COMMAND.COM_PING, "")
return self._read_ok_packet()
except Exception:
if reconnect:
self.connect()
return self.ping(False)
else:
raise
複製代碼
其實就是先判斷當前連接是否存活,不存在就經過connect()方法再連接一次。
此外能夠利用「try...except...」,當使用當前MySQL連接執行SQL時,若是報錯,直接執行except中的邏輯,在except中只需將當前連接關閉,而後再獲取新連接,而後再執行SQL則可。
3.此外執行大量數據的INSERT或REPLACE也可能會致使此類錯誤(這纔是我遇到這個錯誤的緣由),要作的就是下降單詞INSERT的數據行數則可,我本來一次性插入10000條,將其改爲5000條後,這個問題就沒有出現了。
這個緣由能夠從其開發官方查閱到,地址爲:dev.mysql.com/doc/refman/…
支持,80w條數據高效插入的問題就解決了
MySQL入庫方面的問題解決後,接着就來考慮一下requests問題,要實現23條請求,併發是必須的,鍵盤啪啪啪兩三下,利用線程池(ThreadPoolExecutor)的併發請求邏輯就實現好了,一個經驗是,將參數處理後,其餘的事情就不用多想了,經過ThreadPoolExecutor.map()方法,輕鬆實現併發。
我覺得事情就這樣結束了,但程序正常運行一段時間後,出現了大量的「Cannot assign requested address」報錯。
這是由於客戶端短期內頻繁的請求服務器,每次請求連接都在很短的時間內結束,從而致使不少TIME_WAIT,操做系統的端口號等資源被迅速用光,新的請求連接沒辦法得到新的端口號等資源,就拋出Cannot assign requested address。
能夠經過netstat -an | grep TIME_WAIT
驗證一下。
理解了問題所在,解決起來就很簡單了,使用requests鏈接池就行了,實現端口號等資源的複用。
須要注意,這裏請求的Web API是相同的2個API,因此能夠經過創建長鏈接的方式複用資源,若是每次請求的host、IP等都不一樣,這種方式是沒有什麼效果的。
requests其實已經考慮到了鏈接池的狀況,簡單使用一下則可。
@staticmethod
def get_http_session(pool_connections=1, pool_maxsize=10, max_retries=3):
''' http鏈接池 pool_connections 要緩存的 urllib3 鏈接池的數量。 pool_maxsize 要保存在池中的最大鏈接數。 max_retries 每一個鏈接的最大重試次數 '''
session =requests.session()
# 建立適配器
adapter = requests.adapters.HTTPAdapter(pool_connections=pool_connections,
pool_maxsize=pool_maxsize, max_retries=max_retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
複製代碼
至此,這個東西就正常跑起來了。
你們在工做中有沒有遇到什麼有意思的問題?
若是本文對你有所啓發,記得點擊「在看」支持二兩。