基於gevent和pymysql實現mysql讀寫的異步非堵塞方案

我們常常使用的mysql庫,MySQL-Python庫是用C寫的,很遺憾它是阻塞的,要實現異步的MySQL驅動必須用Python版本的MySQL驅動!
python

如今社區裏面有兩個純python實現的mysql驅動。一個是 myconnpy  另外一個是PyMysql ~ 這兩個mysql驅動文檔至關的少呀,好在他們的用法和MySQldb至關的像,否則就要頭疼的看代碼了。。。 實現的方式是用socket來交互,不像mysqldb封裝了libmysqlclient那樣 !mysql


myconnpy中國的tornado大牛推薦過,可是也評價過,貌似有些bug的樣子。
sql

我這邊就用Mozilla公司 也在用的pymysql ~數據庫


我們先建立數據庫和數據表apache

201006358.jpg

安裝python的mysql模塊~多線程

200829712.jpg



PySQL針對mysql操做demo還算簡單的~ 併發


import pymysql
db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db = 'rui')
cursor = db.cursor()
sql='select count(*) from kkk'
data = cursor.execute(sql)
print cursor
cursor.close()
db.close()


其實我們也能夠模擬apache那樣prefork模式,來派生任務執行對象。app

prefork採用預派生子進程方式,用單獨的子進程來處理 不一樣的請求,進程之間彼此獨立。我這邊測試是mysql堵塞方式,你們也能夠利用這種方案模擬多個任務執行。有點相似多進程的樣子,消耗比較大的~異步


#!/usr/bin/python
# -*- coding: utf-8 -*-
#xiaorui.cc
import MySQLdb, pymysql
import signal, os, sys
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
workers = {}
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
def run():
  con = MySQLdb.connect(host='localhost', db='rui', user='root', passwd='123123')
  signal.signal(signal.SIGTERM, lambda sig, status: sys.exit(0))
  cur = con.cursor()
  cur.execute("SELECT SLEEP(30)")
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
def killall(sig, status):
  for pid in workers.keys():
    os.kill(pid, signal.SIGTERM)
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
def waitall():
  for pid in workers.keys():
    try:
      os.waitpid(pid, 0)
    except:
      print "waitpid: interrupted exception"
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
def main():
  print os.getpid()
  signal.signal(signal.SIGTERM, killall)
  for i in range(3):
    pid = os.fork()
    if pid == 0:
      try:
        run()
      except:
        print "run: interrupted exception"
        sys.exit(0)
    else:
      workers[pid] = 1
  waitall()
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
if __name__ == '__main__':
  main()


看到這三個sleep30都在跑嗎? 看起來生效了,但三個進程仍是消耗了130秒。socket

230057221.jpg


225729957.jpg


好了說正題,今天怎麼主要說的就是gevent和python下的mysql驅動測試,分享個gevent和pymysql的在一塊兒使用的實例demo ~


def goodquery(sql):
    db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')
    cursor = db.cursor()
    data = cursor.execute(sql)
    cursor.close()
    db.close()
    return cursor
sqla='select count(*) from kkk'
sqlb="select * from kkk where name like '%888888%'"
jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery, (sqlb))]
#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
gevent.joinall(jobs, timeout=2)
what_you_want = [job.value for job in jobs]
print what_you_want
for i in what_you_want:
    for a in i:
        print a


221013629.jpg



哎,仍是有點堵塞。。。 跑了6個耗時3s的sql,共用了18秒的時間。。。。

231823673.jpg


通過一上午的折騰,得知gevent的版本沒有用對,只有gevent 1.0 才完美支持socket,而後須要在引入模塊的後面,打上別的補丁!

143638790.png


gevent.monkey.patch_socket()


real    0m8.993s
user    0m0.071s
sys     0m0.016s


在這裏我再測試下多線程的版本:

import pymysql
import threading
def goodquery(sql):
    db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')
    cursor = db.cursor()
    data = cursor.execute(sql)
    cursor.close()
    db.close()
    print cursor
    return cursor
sqla='select count(*) from kkk'
sqlb="select * from kkk where name like '%888888%'"
#jobs = [gevent.spawn(goodquery,(sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
#gevent.joinall(jobs, timeout=30)
#what_you_want = [job.value for job in jobs]
threads=[]
for i in range(5):                                                                                                                
    threads.append( threading.Thread( target=goodquery,args=(sqlb,) ) )
for t in threads:
    t.start()
for t in threads:
    t.join()


034003590.jpg

他的測試結果要比gevent慢點 ~但也是併發的執行,能夠在mysql進程裏面看到執行的記錄 ~

[root@101 ~]# time python t.py
real    0m11.122s
user    0m0.095s
sys     0m0.026s


總結下:

       gevent pymysql 或者是 threading pymysql 是靠譜的~  是能夠解決大數據下的mysql讀寫堵塞的問題的~    

       可是和sohu、騰訊的朋友討論了下個人這個方案,mysql堵塞是在與事務處理時發生的。 看來mysql的堵塞不是這麼搞解決的,之後有環境後,會繼續的追蹤這事 ~

相關文章
相關標籤/搜索