目錄python
1、隊列mysql
2、生產者消費者模型git
3、協程程序員
4、select\poll\epollgithub
5、paramikosql
6、mysql API調用數據庫
1、隊列(queue)編程
隊列分如下三種:數組
class queue.Queue(maxsize=0) #先入先出服務器
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列
代碼以下:
import queue class Foo(object): def __init__(self,n): self.n = n #先進先出 q = queue.Queue(maxsize=3) # q.get(timeout=3) #當隊列爲空時,get值會阻塞,加timeout參數後,會報錯 # q.get_nowait() #與上面同樣的效果 q.put([1,2,3]) q.put(Foo(1)) data = q.get_nowait() data2 = q.get_nowait() print(data,type(data)) print(data2,type(data2)) #輸出結果 # [1, 2, 3] <class 'list'> # <__main__.Foo object at 0x000001E060337278> <class '__main__.Foo'> print("-----------------") print(q.full()) #判斷隊列裏數據是否滿了,滿爲True,未滿爲False。 print(q.qsize())#查看如今隊列的大小(隊列中有多少個數據) print(q.empty())#判斷隊列是否爲空 print("---------------") #先入後出 q = queue.LifoQueue(maxsize=3) q.put(1) q.put(2) q.put(3) print(q.get_nowait()) print(q.get_nowait()) print(q.get_nowait()) #輸出結果 # 3 # 2 # 1 print("---------------") #可設施優先級的隊列 q = queue.PriorityQueue(maxsize=5) q.put((15,1)) q.put((5,2)) q.put((7,3)) q.put((1,[1,2,3])) print(q.get()) print(q.get()) print(q.get()) print(q.get()) #輸出結果 # (1, [1, 2, 3]) # (5, 2) # (7, 3) # (15, 1)
2、生產者消費者模型
import queue,threading import time,random q = queue.Queue() def producer(name): count = 0 while q.qsize() < 20: time.sleep(random.randrange(4)) q.put(count) print("producer %s has produced %s baozi.." %(name,q.qsize())) count += 1 def consumer(name): count = 0 while count <20: time.sleep(random.randrange(3)) if not q.empty(): data2 = q.get_nowait() print('\033[32;1mconsumer %s has eat %s baozi...\033[0m' %(name,count)) else: print("-----no baozi anymore-----") count += 1 p1 = threading.Thread(target=producer,args=("XiaoLi",)) p2 = threading.Thread(target=producer,args=("XiaoMin",)) c1 = threading.Thread(target=consumer,args=("PangZi",)) p1.start() p2.start() c1.start()
3、協程
線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
協程的好處:
缺點:
yield實現代碼:
import time,queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" %(name,new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n+=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s"%n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
Greenlet實現代碼:
from greenlet import greenlet def test1(): print("12") gr2.switch() print("34") gr2.switch() def test2(): print("56") gr1.switch() print("78") gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
Gevent實現代碼:
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
import gevent def foo(): print('\033[31;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[31;1mExplicit context switch to foo again\033[0m") def bar(): print('\033[32;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[32;1mExplicit context switch to foo again\033[0m") def ex(): print('\033[33;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[33;1mExplicit context switch to foo again\033[0m") gevent.joinall( [ gevent.spawn(foo), gevent.spawn(bar), gevent.spawn(ex), ] )
上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
遇到IO阻塞時會自動切換任務
from gevent import monkey; monkey.patch_all() import gevent from urllib.request import urlopen def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
經過gevent實現單線程下的多socket併發
server端
import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(s): try: while True: data = s.recv(1024) print("recv:", data) s.send(data) if not data: s.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: s.close() if __name__ == '__main__': server(8001)
client 端
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
4、select\selectors\poll\epoll
select、poll、epoll區別:
select:
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 # Create a TCP/IP socket 10 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #建立socket實例 11 server.setblocking(False) #設置套接字爲非阻塞 12 13 # Bind the socket to the port 14 server_address = ('localhost', 20000) #設置通訊地址 15 print(sys.stderr, 'starting up on %s port %s' % server_address) #若是有錯誤,打印錯誤 16 server.bind(server_address) #綁定通訊地址 17 18 # Listen for incoming connections 19 server.listen(5) #最大監聽5個連接 20 21 # Sockets from which we expect to read 22 #select()會監控三個通訊列表, 23 inputs = [ server ] #定義一個全部的輸入 24 25 # Sockets to which we expect to write 26 outputs = [ ] #定義一個全部的輸出 27 28 message_queues = {} 29 while inputs: #客戶端過來的全部實例,都放到inputs裏,要使inputs爲True,將server放入 30 31 # Wait for at least one of the sockets to be ready for processing 32 print( '\nwaiting for the next event') 33 readable, writable, exceptional = select.select(inputs, outputs, inputs,2) #將三個list傳給select後,select會返回三個新的list,分別爲可讀,可寫,異常。 34 # 35 36 37 # Handle inputs 38 for s in readable: #循環readable,裏面有當前全部連接和server,若是server返回到readable裏,表明着有新鏈接,server就緒。 39 40 if s is server: #new connection #若是server就緒,則有新連接 41 # A "readable" server socket is ready to accept a connection 42 connection, client_address = s.accept() #接收新連接 43 print('new connection from', client_address) #打印 44 connection.setblocking(False) #設置新連接爲非阻塞 45 inputs.append(connection) #將新鏈接存入inputs裏,當下次select時,檢測此連接,如有數據則會返回此連接,則接受 46 47 # Give the connection a queue for data we want to send 48 message_queues[connection] = queue.Queue() #生成一個此連接的隊列,放入message_queues字典 49 else: #若是不是server,表明有數據過來 50 data = s.recv(1024) #接受數據 51 if data: #若是data有數據 52 # A readable client socket has data 53 print(sys.stderr, 'received "%s" from %s' % (str(data,'utf8'), s.getpeername()) ) #打印data 54 message_queues[s].put(str(data,'utf8')) #把data放入字典中相應的連接列表 55 # Add output channel for response 56 if s not in outputs: #若是此連接再也不outputs裏 57 outputs.append(s) #把新的連接放入outputs 58 else: #若是data沒有數據,(出錯了) 59 # Interpret empty result as closed connection 60 print('closing', client_address, 'after reading no data') #打印 61 # Stop listening for input on the connection 62 if s in outputs: #若是連接在outpus裏 63 outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉 64 inputs.remove(s) #inputs中也刪除掉 65 s.close() #把這個鏈接關閉掉 66 67 # Remove message queue 68 del message_queues[s] #把此連接在字典中的列表也刪除 69 # Handle outputs 70 for s in writable: #循環發送隊列,查找這個連接 71 try: 72 next_msg = message_queues[s].get_nowait() #獲取發送的數據 73 except queue.Empty: #若是爲空 74 # No messages waiting so stop checking for writability. 75 print('output queue for', s.getpeername(), 'is empty') 76 outputs.remove(s) #將outputs中的這個連接刪除 77 else: #若是取到數據 78 print( 'sending "%s" to %s' % (next_msg, s.getpeername())) 79 s.send(bytes(next_msg,'utf8')) #發送數據給客戶端 80 # Handle "exceptional conditions" 81 for s in exceptional: #客戶端斷開後,句柄錯誤,返回到exceptional 82 print('handling exceptional condition for', s.getpeername() ) 83 # Stop listening for input on the connection 84 inputs.remove(s) #刪除inputs中的句柄 85 if s in outputs: #若是句柄在outputs中 86 outputs.remove(s) #將outputs中的句柄刪除 87 s.close() #關閉連接 88 89 # Remove message queue 90 del message_queues[s] #刪除此連接在message_queues中的列表
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket 4 import sys 5 #消息列表 6 messages = [ 'This is the message. ', 7 'It will be sent ', 8 'in parts.', 9 ] 10 server_address = ('localhost', 20000) #設置通訊地址 11 12 # Create a TCP/IP socket 13 #配置連接組 14 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 15 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 16 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 17 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 18 ] 19 20 # Connect the socket to the port where the server is listening 21 print(sys.stderr, 'connecting to %s port %s' % server_address) #打印連接信息 22 for s in socks: #循環連接配置組 23 s.connect(server_address) #連接服務器 24 25 for message in messages: #循環消息列表 26 27 # Send messages on both sockets 28 for s in socks: #循環連接組 29 print(sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)) #打印連接信息與消息 30 s.send(bytes(message,"utf8")) #向服務器端發送消息 31 32 # Read responses on both sockets 33 for s in socks: #循環連接組 34 data = s.recv(1024) #接受服務端發來的數據 35 print(sys.stderr, '%s: received "%s"' % (s.getsockname(), str(data,'utf8'))) #打印消息 36 if not data: #若是接受的數據爲空 37 print(sys.stderr, 'closing socket', s.getsockname()) #打印出錯信息 38 s.close() #關閉鏈接
selectors:
This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.
該模塊容許高層次和高效率的輸入/輸出複用,創建在選擇模塊原語。用戶被鼓勵使用這個模塊,除非他們想要精確控制操做系統級的原語。
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print('echoing', repr(data), 'to', conn) 16 conn.send(data) # Hope it won't block 17 else: 18 print('closing', conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind(('localhost', 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)
poll:
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。
poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
epoll:
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。
epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。
epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。
另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
5、paramiko
執行遠程命令:
import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect("某IP地址",22,"用戶名", "口令(密碼)") stdin, stdout, stderr = ssh.exec_command("你的命令") print stdout.readlines() ssh.close()
上傳文件到遠程:
import paramiko t = paramiko.Transport(("某IP地址",22)) t.connect(username = "用戶名", password = "口令") sftp = paramiko.SFTPClient.from_transport(t) remotepath='/tmp/test.txt' localpath='/tmp/test.txt' sftp.put(localpath,remotepath) t.close()
從遠端下載文件:
import paramiko t = paramiko.Transport(("某IP地址",22)) t.connect(username = "用戶名", password = "口令") sftp = paramiko.SFTPClient.from_transport(t) remotepath='/tmp/test.txt' localpath='/tmp/test.txt' sftp.get(remotepath, localpath) t.close()
6、MySQL操做
mysql基本操做
一、數據庫操做
show databases; use [databasename]; create database [name];
二、數據表操做
show tables; create table students ( id int not null auto_increment primary key, name char(8) not null, sex char(4) not null, age tinyint unsigned not null, tel char(13) null default "-" );
三、數據操做
insert into students(name,sex,age,tel) values('alex','man',18,'151515151') delete from students where id =2; update students set name = 'sb' where id =1; select * from students
四、其餘
主鍵 外鍵 左右鏈接
mysql API操做
插入數據
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() reCount = cur.execute('insert into day9(Name,Address) values(%s,%s)',('tom','usa')) conn.commit() cur.close() conn.close() print reCount
批量添加
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() li =[ ('tonny','cn'), ('xiaoming','cn'), ] reCount = cur.executemany('insert into day9(Name,Address) values(%s,%s)',li) conn.commit() cur.close() conn.close() print reCount
注意:cur.lastrowid
刪除數據
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() reCount = cur.execute('delete from day9') conn.commit() cur.close() conn.close() print reCount
修改數據
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() reCount = cur.execute('update day9 set Name = %s',('alin',)) conn.commit() cur.close() conn.close() print reCount
查詢數據
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() reCount = cur.execute('select * from day9') res = cur.fetchone() #查詢一條 #res = cur.fetchall() #查詢所有 #res = cur.fetchmany(3) #查詢幾條 print(res)
conn.commit() cur.close() conn.close() print reCount
回滾操做
import MySQLdb conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123',db='s12') cur = conn.cursor() reCount = cur.execute('insert into day9(Name,Address) values(%s,%s)',('tom','usa')) reCount = cur.execute('insert into day9(Name,Address) values(%s,%s)',('cat','hk'))
conn.rollback()#回滾操做 conn.commit() cur.close() conn.close() print reCount