本節內容html
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.python
queue.
Queue
(maxsize=0) #先入先出
queue.
LifoQueue
(maxsize=0) #last in fisrt out
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.mysql
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]
). A typical pattern for entries is a tuple in the form: (priority_number, data)
.linux
queue.
Empty
Exception raised when non-blocking get()
(or get_nowait()
) is called on a Queue
object which is empty.git
queue.
Full
Exception raised when non-blocking put()
(or put_nowait()
) is called on a Queue
object which is full.程序員
Queue.
qsize
()
Queue.
empty
() #return True if empty
Queue.
full
() # return True if full
Queue.
put
(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full
exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full
exception (timeout is ignored in that case).github
Queue.
put_nowait
(item)
Equivalent to put(item, False)
.sql
Queue.
get
(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty
exception (timeout is ignored in that case).數據庫
Queue.
get_nowait
()
Equivalent to get(False)
.編程
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.
task_done
()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get()
used to fetch a task, a subsequent call to task_done()
tells the queue that the processing on the task is complete.
If a join()
is currently blocking, it will resume when all items have been processed (meaning that a task_done()
call was received for every item that had been put()
into the queue).
Raises a ValueError
if called more times than there were items placed in the queue.
Queue.
join
() block直到queue被消費完畢
1 import time,random 2 import queue,threading 3 q = queue.Queue() 4 def Producer(name): 5 count = 0 6 while count <20: 7 time.sleep(random.randrange(3)) 8 q.put(count) 9 print('Producer %s has produced %s baozi..' %(name, count)) 10 count +=1 11 def Consumer(name): 12 count = 0 13 while count <20: 14 time.sleep(random.randrange(4)) 15 if not q.empty(): 16 data = q.get() 17 print(data) 18 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 19 else: 20 print("-----no baozi anymore----") 21 count +=1 22 p1 = threading.Thread(target=Producer, args=('A',)) 23 c1 = threading.Thread(target=Consumer, args=('B',)) 24 p1.start() 25 c1.start()
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程。
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
缺點:
使用yield實現協程操做例子
1 import time 2 import queue 3 def consumer(name): 4 print("--->starting eating baozi...") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s" % (name,new_baozi)) 8 #time.sleep(1) 9 10 def producer(): 11 12 r = con.__next__() 13 r = con2.__next__() 14 n = 0 15 while n < 5: 16 n +=1 17 con.send(n) 18 con2.send(n) 19 print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) 20 21 22 if __name__ == '__main__': 23 con = consumer("c1") 24 con2 = consumer("c2") 25 p = producer()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 5 from greenlet import greenlet 6 7 8 def test1(): 9 print 12 10 gr2.switch() 11 print 34 12 gr2.switch() 13 14 15 def test2(): 16 print 56 17 gr1.switch() 18 print 78 19 20 gr1 = greenlet(test1) 21 gr2 = greenlet(test2) 22 gr1.switch()
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
1 import gevent 2 3 def foo(): 4 print('Running in foo') 5 gevent.sleep(0) 6 print('Explicit context switch to foo again') 7 8 def bar(): 9 print('Explicit context to bar') 10 gevent.sleep(0) 11 print('Implicit context switch back to bar') 12 13 gevent.joinall([ 14 gevent.spawn(foo), 15 gevent.spawn(bar), 16 ])
輸出:
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
同步與異步的性能區別
1 import gevent 2 3 def task(pid): 4 """ 5 Some non-deterministic task 6 """ 7 gevent.sleep(0.5) 8 print('Task %s done' % pid) 9 10 def synchronous(): 11 for i in range(1,10): 12 task(i) 13 14 def asynchronous(): 15 threads = [gevent.spawn(task, i) for i in range(10)] 16 gevent.joinall(threads) 17 18 print('Synchronous:') 19 synchronous() 20 21 print('Asynchronous:') 22 asynchronous()
python 3.x: from urllib.request import urlopen
python 2.x: from urllib2 import urlopen
1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 #from urllib2 import urlopen 4 from urllib.request import urlopen 5 6 def f(url): 7 print('GET: %s' % url) 8 resp = urlopen(url) 9 data = resp.read() 10 print('%d bytes received from %s.' % (len(data), url)) 11 12 gevent.joinall([ 13 gevent.spawn(f, 'https://www.python.org/'), 14 gevent.spawn(f, 'https://www.yahoo.com/'), 15 gevent.spawn(f, 'https://github.com/'), 16 ])
在訪問第一個python頁面時,正常是等待接收返回的數據,可是此時會接着訪問yahoo的頁面,等待數據時,又會去返回github頁面,至關於併發操做
經過gevent實現單線程下的多socket併發
server side
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(s): try: while True: data = s.recv(1024) print("recv:", data) s.send(data) if not data: s.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: s.close() if __name__ == '__main__': server(8001)
client side
1 import socket 2 3 HOST = 'localhost' # The remote host 4 PORT = 8001 # The same port as used by the server 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 s.connect((HOST, PORT)) 7 while True: 8 msg = bytes(input(">>:"),encoding="utf8") 9 s.sendall(msg) 10 data = s.recv(1024) 11 #print(data) 12 13 print('Received', repr(data)) 14 s.close()
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
http://www.cnblogs.com/alex3714/p/4372426.html
selectors模塊
This module allows high-level and efficient I/O multiplexing, built upon the select
module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print('echoing', repr(data), 'to', conn) 16 conn.send(data) # Hope it won't block 17 else: 18 print('closing', conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind(('localhost', 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)
SSHClient
用於鏈接遠程服務器並執行基本命令
基於用戶名密碼鏈接:
1 import paramiko 2 3 # 建立SSH對象 4 ssh = paramiko.SSHClient() 5 # 容許鏈接不在know_hosts文件中的主機 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 鏈接服務器 8 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') 9 10 # 執行命令 11 stdin, stdout, stderr = ssh.exec_command('df') 12 # 獲取命令結果 13 result = stdout.read() 14 15 # 關閉鏈接 16 ssh.close()
基於公鑰密鑰鏈接:
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') 4 5 # 建立SSH對象 6 ssh = paramiko.SSHClient() 7 # 容許鏈接不在know_hosts文件中的主機 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 鏈接服務器 10 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key) 11 12 # 執行命令 13 stdin, stdout, stderr = ssh.exec_command('df') 14 # 獲取命令結果 15 result = stdout.read() 16 17 # 關閉鏈接 18 ssh.close()
SFTPClient
用於鏈接遠程服務器並執行上傳下載
基於用戶名密碼上傳下載
1 import paramiko 2 3 transport = paramiko.Transport(('hostname',22)) 4 transport.connect(username='wupeiqi',password='123') 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 將location.py 上傳至服務器 /tmp/test.py 8 sftp.put('/tmp/location.py', '/tmp/test.py') 9 # 將remove_path 下載到本地 local_path 10 sftp.get('remove_path', 'local_path') 11 12 transport.close()
基於公鑰密鑰上傳下載
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') 4 5 transport = paramiko.Transport(('hostname', 22)) 6 transport.connect(username='wupeiqi', pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 將location.py 上傳至服務器 /tmp/test.py 10 sftp.put('/tmp/location.py', '/tmp/test.py') 11 # 將remove_path 下載到本地 local_path 12 sftp.get('remove_path', 'local_path') 13 14 transport.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import paramiko 4 import uuid 5 6 class Haproxy(object): 7 8 def __init__(self): 9 self.host = '172.16.103.191' 10 self.port = 22 11 self.username = 'wupeiqi' 12 self.pwd = '123' 13 self.__k = None 14 15 def create_file(self): 16 file_name = str(uuid.uuid4()) 17 with open(file_name,'w') as f: 18 f.write('sb') 19 return file_name 20 21 def run(self): 22 self.connect() 23 self.upload() 24 self.rename() 25 self.close() 26 27 def connect(self): 28 transport = paramiko.Transport((self.host,self.port)) 29 transport.connect(username=self.username,password=self.pwd) 30 self.__transport = transport 31 32 def close(self): 33 34 self.__transport.close() 35 36 def upload(self): 37 # 鏈接,上傳 38 file_name = self.create_file() 39 40 sftp = paramiko.SFTPClient.from_transport(self.__transport) 41 # 將location.py 上傳至服務器 /tmp/test.py 42 sftp.put(file_name, '/home/wupeiqi/tttttttttttt.py') 43 44 def rename(self): 45 46 ssh = paramiko.SSHClient() 47 ssh._transport = self.__transport 48 # 執行命令 49 stdin, stdout, stderr = ssh.exec_command('mv /home/wupeiqi/tttttttttttt.py /home/wupeiqi/ooooooooo.py') 50 # 獲取命令結果 51 result = stdout.read() 52 53 54 ha = Haproxy() 55 ha.run()
Python 操做 Mysql 模塊的安裝
1 linux: 2 yum install MySQL-python 3 4 window: 5 http://files.cnblogs.com/files/wupeiqi/py-mysql-win.zip
SQL基本使用
一、數據庫操做
1 show databases; 2 use [databasename]; 3 create database [name];
二、數據表操做
1 show tables; 2 3 create table students 4 ( 5 id int not null auto_increment primary key, 6 name char(8) not null, 7 sex char(4) not null, 8 age tinyint unsigned not null, 9 tel char(13) null default "-" 10 );
CREATE TABLE `wb_blog` ( `id` smallint(8) unsigned NOT NULL, `catid` smallint(5) unsigned NOT NULL DEFAULT '0', `title` varchar(80) NOT NULL DEFAULT '', `content` text NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `catename` (`catid`) ) ;
三、數據操做
1 insert into students(name,sex,age,tel) values('alex','man',18,'151515151') 2 3 delete from students where id =2; 4 5 update students set name = 'sb' where id =1; 6 7 select * from students
四、其餘
1 主鍵 2 外鍵 3 左右鏈接
Python MySQL API
1、插入數據
1 import MySQLdb 2 3 conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') 4 5 cur = conn.cursor() 6 7 reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa')) 8 # reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'}) 9 10 conn.commit() 11 12 cur.close() 13 conn.close() 14 15 print reCount
1 import MySQLdb 2 3 conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') 4 5 cur = conn.cursor() 6 7 li =[ 8 ('alex','usa'), 9 ('sb','usa'), 10 ] 11 reCount = cur.executemany('insert into UserInfo(Name,Address) values(%s,%s)',li) 12 13 conn.commit() 14 cur.close() 15 conn.close() 16 17 print reCount
注意:cur.lastrowid
2、刪除數據
1 import MySQLdb 2 3 conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') 4 5 cur = conn.cursor() 6 7 reCount = cur.execute('delete from UserInfo') 8 9 conn.commit() 10 11 cur.close() 12 conn.close() 13 14 print reCount
3、修改數據
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') cur = conn.cursor() reCount = cur.execute('update UserInfo set Name = %s',('alin',)) conn.commit() cur.close() conn.close() print reCount
4、查數據
1 # ############################## fetchone/fetchmany(num) ############################## 2 3 import MySQLdb 4 5 conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') 6 cur = conn.cursor() 7 8 reCount = cur.execute('select * from UserInfo') 9 10 print cur.fetchone() 11 print cur.fetchone() 12 cur.scroll(-1,mode='relative') 13 print cur.fetchone() 14 print cur.fetchone() 15 cur.scroll(0,mode='absolute') 16 print cur.fetchone() 17 print cur.fetchone() 18 19 cur.close() 20 conn.close() 21 22 print reCount 23 24 25 26 # ############################## fetchall ############################## 27 28 import MySQLdb 29 30 conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb') 31 #cur = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor) 32 cur = conn.cursor() 33 34 reCount = cur.execute('select Name,Address from UserInfo') 35 36 nRet = cur.fetchall() 37 38 cur.close() 39 conn.close() 40 41 print reCount 42 print nRet 43 for i in nRet: 44 print i[0],i[1]
安裝 http://www.rabbitmq.com/install-standalone-mac.html
安裝python rabbitMQ module