Python Learning Journey, Day 9

This section covers:

  1. Gevent coroutines
  2. Select/Poll/Epoll asynchronous I/O and event-driven programming
  3. Operating a MySQL database from Python
  4. RabbitMQ queues
  5. Redis/Memcached caching
  6. Paramiko SSH
  7. The Twisted network framework

The queue module

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0)          # first in, first out
class queue.LifoQueue(maxsize=0)      # last in, first out
class queue.PriorityQueue(maxsize=0)  # a queue whose items are retrieved in priority order

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
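A minimal sketch of this pattern: items are (priority_number, data) tuples, and get() always returns the entry with the lowest priority number first.

```python
import queue

q = queue.PriorityQueue()

# Entries follow the (priority_number, data) pattern; lower numbers come out first.
q.put((3, 'low'))
q.put((1, 'urgent'))
q.put((2, 'normal'))

while not q.empty():
    print(q.get())   # (1, 'urgent'), then (2, 'normal'), then (3, 'low')
```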

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty()  # return True if empty
Queue.full()   # return True if full
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join()  # block until every item put into the queue has been fetched and processed
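A minimal sketch of task_done() and join() working together, using a hypothetical worker thread that drains the queue (the None sentinel is just a convention for telling the worker to stop):

```python
import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:        # sentinel: no more work
            q.task_done()
            break
        print('processing', item)
        q.task_done()           # mark this item as fully processed

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)
q.put(None)

q.join()   # blocks until task_done() has been called once per put()
t.join()
```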

Producer-consumer model

import time, random
import queue, threading

q = queue.Queue()

def Producer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1

def Consumer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

Coroutines

Coroutines, also known as microthreads or fibers (English: coroutine). In one sentence: a coroutine is a lightweight, user-space thread.

A coroutine has its own register context and stack. When the scheduler switches away from a coroutine, the register context and stack are saved elsewhere; when it switches back, the previously saved register context and stack are restored. Therefore:

A coroutine preserves the state of its previous invocation (that is, a particular combination of all of its local state). Each re-entry effectively resumes that state; in other words, it continues from the exact point in its logical control flow where it last left off.

 

Advantages of coroutines:

  • No overhead from thread context switching
  • No overhead from atomic-operation locking and synchronization
  • Easy switching between control flows, simplifying the programming model
  • High concurrency + high scalability + low cost: a single CPU can easily support tens of thousands of coroutines, which makes them well suited to high-concurrency workloads.

Disadvantages:

  • They cannot exploit multiple cores: a coroutine is essentially single-threaded, so it cannot use multiple cores of a CPU at the same time. Coroutines must be combined with processes to run on multiple CPUs. Of course, most everyday applications have no need for this unless they are CPU-bound.
  • A blocking operation (such as I/O) blocks the entire program.

An example of coroutine behavior implemented with yield:

import time

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        #time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

Greenlet

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent 

Gevent is a third-party library that makes it easy to write concurrent synchronous or asynchronous programs. The main primitive it uses is the greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the OS process of the main program, but they are scheduled cooperatively.

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

Output:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

The performance difference between synchronous and asynchronous execution

import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

urllib

Python 3.x: from urllib.request import urlopen

Python 2.x: from urllib2 import urlopen

from gevent import monkey; monkey.patch_all()
import gevent
#from urllib2 import urlopen
from urllib.request import urlopen

def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

When fetching the first page (python.org), the program would normally sit waiting for the response; instead it moves on to request the Yahoo page, and while waiting for that data it moves on again to the GitHub page. The effect is a concurrent operation.

Implementing multi-socket concurrency in a single thread with gevent

 server side 

import gevent
from gevent import socket, monkey
monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)

def handle_request(s):
    try:
        while True:
            data = s.recv(1024)
            print("recv:", data)
            s.send(data)
            if not data:
                s.shutdown(socket.SHUT_WR)

    except Exception as ex:
        print(ex)
    finally:
        s.close()

if __name__ == '__main__':
    server(8001)

 client side  

import socket

HOST = 'localhost'    # The remote host
PORT = 8001           # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    #print(data)

    print('Received', repr(data))
s.close()

On event-driven programming and asynchronous I/O

Event-driven programming is a programming paradigm in which the flow of the program is determined by external events. It is characterized by an event loop and the use of callbacks to trigger the corresponding handlers when external events occur. The other two common paradigms are (single-threaded) synchronous programming and multithreaded programming.

Let's compare and contrast single-threaded, multithreaded, and event-driven programming with an example. The figure (not reproduced here) shows the work done by a program over time under each of the three models. The program has three tasks to complete, and each blocks itself while waiting on I/O; the time spent blocked on I/O is shown in grey boxes.

In the single-threaded synchronous model, tasks execute in sequence. If one task blocks on I/O, all the others must wait until it finishes before they can run in turn. This well-defined execution order and serialized behavior is easy to reason about, but if the tasks have no interdependencies yet must still wait on one another, the program is needlessly slowed down.

In the multithreaded version, the three tasks run in separate threads. The threads are managed by the operating system and may run in parallel on a multiprocessor system or be interleaved on a single processor. This lets other threads keep running while one thread blocks on a resource. It is more efficient than the equivalent synchronous program, but the programmer must write code to protect shared resources from concurrent access by multiple threads. Multithreaded programs are harder to reason about, because they must handle thread safety through synchronization mechanisms such as locks, reentrant functions, thread-local storage, and so on; implemented badly, these lead to subtle and agonizing bugs.

In the event-driven version of the program, the three tasks are interleaved but still run within a single thread of control. When performing I/O or another expensive operation, a callback is registered with the event loop, and execution continues when the I/O completes. The callback describes how to handle a given event. The event loop polls for events and dispatches them to the waiting callbacks as they arrive. This lets the program make as much progress as possible without needing extra threads. Event-driven programs are easier to reason about than multithreaded ones, because the programmer does not need to worry about thread safety.

The event-driven model is usually a good choice when facing the following conditions:

  1. There are many tasks in the program, and…
  2. the tasks are largely independent (so they do not need to communicate with, or wait on, each other), and…
  3. some tasks block while waiting for events.

It is also a good choice when an application needs to share mutable data between tasks, because no synchronization handling is required.

Network applications typically have exactly these properties, which makes them a great fit for the event-driven programming model.

Select/Poll/Epoll asynchronous I/O

 http://www.cnblogs.com/alex3714/p/4372426.html
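For reference, a minimal sketch of the raw select.select() call that the linked article discusses; socket.socketpair() is used here only to have two connected sockets to watch:

```python
import select
import socket

# A connected socket pair stands in for a real client connection.
a, b = socket.socketpair()
a.sendall(b'ping')

# select() blocks until at least one watched file object is readable
# (or the 1-second timeout expires), without blocking on recv() itself.
readable, writable, errored = select.select([b], [], [], 1.0)
if b in readable:
    print(b.recv(1024))   # b'ping'

a.close()
b.close()
```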

The selectors module

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

 

Database operations and the Paramiko module

SSHClient

Used to connect to a remote server and execute basic commands.

Connecting with a username and password:

import paramiko

# Create an SSH object
ssh = paramiko.SSHClient()
# Allow connecting to hosts not present in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')

# Run a command
stdin, stdout, stderr = ssh.exec_command('df')
# Fetch the command's output
result = stdout.read()

# Close the connection
ssh.close()

Connecting with a public/private key pair:

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

# Create an SSH object
ssh = paramiko.SSHClient()
# Allow connecting to hosts not present in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server (note: the parameter is pkey, not key)
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', pkey=private_key)

# Run a command
stdin, stdout, stderr = ssh.exec_command('df')
# Fetch the command's output
result = stdout.read()

# Close the connection
ssh.close()

SFTPClient

Used to connect to a remote server and upload/download files.

Uploading/downloading with a username and password

import paramiko

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='123')

sftp = paramiko.SFTPClient.from_transport(transport)
# Upload location.py to the server as /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# Download remote_path to the local local_path
sftp.get('remote_path', 'local_path')

transport.close()

Uploading/downloading with a public/private key pair

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

sftp = paramiko.SFTPClient.from_transport(transport)
# Upload location.py to the server as /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# Download remote_path to the local local_path
sftp.get('remote_path', 'local_path')

transport.close()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
import uuid

class Haproxy(object):

    def __init__(self):
        self.host = '172.16.103.191'
        self.port = 22
        self.username = 'wupeiqi'
        self.pwd = '123'
        self.__k = None

    def create_file(self):
        file_name = str(uuid.uuid4())
        with open(file_name, 'w') as f:
            f.write('sb')
        return file_name

    def run(self):
        self.connect()
        self.upload()
        self.rename()
        self.close()

    def connect(self):
        transport = paramiko.Transport((self.host, self.port))
        transport.connect(username=self.username, password=self.pwd)
        self.__transport = transport

    def close(self):
        self.__transport.close()

    def upload(self):
        # Connect and upload
        file_name = self.create_file()

        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        # Upload the generated file to the server
        sftp.put(file_name, '/home/wupeiqi/tttttttttttt.py')

    def rename(self):
        ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        # Run a command
        stdin, stdout, stderr = ssh.exec_command('mv /home/wupeiqi/tttttttttttt.py /home/wupeiqi/ooooooooo.py')
        # Fetch the command's output
        result = stdout.read()


ha = Haproxy()
ha.run()

Database operations

Installing Python's MySQL module

linux:
    yum install MySQL-python

windows:
    http://files.cnblogs.com/files/wupeiqi/py-mysql-win.zip

Basic SQL usage

1. Database operations

show databases;
use [databasename];
create database [name];

2. Table operations

show tables;

create table students
    (
        id int not null auto_increment primary key,
        name char(8) not null,
        sex char(4) not null,
        age tinyint unsigned not null,
        tel char(13) null default "-"
    );
CREATE TABLE `wb_blog` ( 
    `id` smallint(8) unsigned NOT NULL, 
    `catid` smallint(5) unsigned NOT NULL DEFAULT '0', 
    `title` varchar(80) NOT NULL DEFAULT '', 
    `content` text NOT NULL, 
    PRIMARY KEY (`id`), 
    UNIQUE KEY `catename` (`catid`) 
) ; 

3. Data operations

insert into students(name,sex,age,tel) values('alex','man',18,'151515151');

delete from students where id =2;

update students set name = 'sb' where id =1;

select * from students;

4. Miscellaneous

primary keys
foreign keys
left/right joins
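As a quick illustration of a join (sqlite3 from the standard library is used here only so the sketch is self-contained and runnable without a MySQL server; the SQL itself works the same way in MySQL, and the table names are made up):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('create table students (id integer primary key, name text)')
cur.execute('create table scores (student_id integer, score integer)')
cur.executemany('insert into students values (?, ?)', [(1, 'alex'), (2, 'sb')])
cur.execute('insert into scores values (1, 90)')

# A LEFT JOIN keeps every student, filling in NULL where no score exists.
cur.execute('select s.name, sc.score '
            'from students s left join scores sc on s.id = sc.student_id')
print(cur.fetchall())

conn.close()
```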

Python MySQL API

1. Inserting data

import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='1234', db='mydb')

cur = conn.cursor()

reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)', ('alex', 'usa'))
# reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)', {'id': 12345, 'name': 'wupeiqi'})

conn.commit()

cur.close()
conn.close()

print(reCount)

import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='1234', db='mydb')

cur = conn.cursor()

li = [
    ('alex', 'usa'),
    ('sb', 'usa'),
]
reCount = cur.executemany('insert into UserInfo(Name,Address) values(%s,%s)', li)

conn.commit()
cur.close()
conn.close()

print(reCount)

Bulk insert

Note: cur.lastrowid holds the auto-generated ID of the last inserted row.
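A minimal sketch of what lastrowid gives you. sqlite3 is used here only so the example runs without a MySQL server; MySQLdb cursors expose the same attribute.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute('create table UserInfo '
            '(id integer primary key autoincrement, Name text, Address text)')

cur.execute('insert into UserInfo (Name, Address) values (?, ?)', ('alex', 'usa'))
print(cur.lastrowid)   # 1 -- the auto-generated id of the row just inserted

cur.execute('insert into UserInfo (Name, Address) values (?, ?)', ('sb', 'usa'))
print(cur.lastrowid)   # 2

conn.close()
```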

2. Deleting data

import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='1234', db='mydb')

cur = conn.cursor()

reCount = cur.execute('delete from UserInfo')

conn.commit()

cur.close()
conn.close()

print(reCount)

3. Updating data

import MySQLdb
 
conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')
 
cur = conn.cursor()
 
reCount = cur.execute('update UserInfo set Name = %s',('alin',))
 
conn.commit()
cur.close()
conn.close()
 
print(reCount)

4. Querying data

# ############################## fetchone / fetchmany(num) ##############################

import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='1234', db='mydb')
cur = conn.cursor()

reCount = cur.execute('select * from UserInfo')

print(cur.fetchone())
print(cur.fetchone())
cur.scroll(-1, mode='relative')
print(cur.fetchone())
print(cur.fetchone())
cur.scroll(0, mode='absolute')
print(cur.fetchone())
print(cur.fetchone())

cur.close()
conn.close()

print(reCount)



# ############################## fetchall ##############################

import MySQLdb

conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='1234', db='mydb')
#cur = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
cur = conn.cursor()

reCount = cur.execute('select Name,Address from UserInfo')

nRet = cur.fetchall()

cur.close()
conn.close()

print(reCount)
print(nRet)
for i in nRet:
    print(i[0], i[1])

 

RabbitMQ queues

Installation: http://www.rabbitmq.com/install-standalone-mac.html

Install the Python RabbitMQ module
