1. Dynamic module import
2. The sticky packet problem
3. paramiko (SSH connection module)
4. Multithreading
5. The GIL
6. Mutex locks
7. Recursive locks (RLock)
8. Semaphore
9. Events (thread flags)
10. Queues (queue)
11. The producer-consumer model
12. Multiprocessing
13. Inter-process communication
14. Sharing data between processes
15. Process pools
16. Coroutines
17. Event-driven programming
18. Blocking IO, non-blocking IO, synchronous IO, asynchronous IO
aa.py:

def test():
    print("ok")

class C:
    def __init__(self):
        self.name = 'abc'

__import__:

data = __import__('day5.aa')
a = data.aa
a.test()
b = data.aa.C()
print(b.name)
--------------------------------------------
import importlib

aa = importlib.import_module('day5.aa')
print(aa.C().name)
aa.test()
---------------------------------------------
# sticky packet fix: tell the client how long the result is before sending it
conn.send(str(len(cmd_res.encode())).encode("utf-8"))
-------------------------------------------------------------
# client: download a file with a "get <filename>" command
while True:
    cmd = input(">>:").strip()
    if len(cmd) == 0: continue
    if cmd.startswith("get"):
        client.send(cmd.encode())
        server_response = client.recv(1024)          # the server sends the file size first
        file_total_size = int(server_response.decode())
        received_size = 0
        filename = cmd.split()[1]
        f = open(filename, 'wb')
        m = hashlib.md5()
        while received_size < file_total_size:
            if file_total_size - received_size > 1024:
                size = 1024
            else:
                size = file_total_size - received_size   # the last packet
            data = client.recv(size)
            received_size += len(data)
            m.update(data)
            f.write(data)
        else:
            new_file_md5 = m.hexdigest()
--------------------------------------------
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):                                # the method must be named handle()
        while True:
            try:
                self.data = self.request.recv(1024).strip()
                print("{} wrote:".format(self.client_address[0]))
                print(self.data)
                self.request.send(self.data.upper())
            except ConnectionAbortedError as e:
                print("err", e)
                break

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999
    server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)
    server.serve_forever()
Installing the paramiko module (pip install paramiko)
http://blog.csdn.net/qwertyupoiuytr/article/details/54098029
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
import paramiko

# create an SSH object
ssh = paramiko.SSHClient()
# allow connecting to hosts that are not in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# connect to the server
ssh.connect(hostname='192.168.80.11', port=22, username='root', password='123.com')
# run a command
stdin, stdout, stderr = ssh.exec_command('df')
# get the result
res, err = stdout.read(), stderr.read()
result = res if res else err
print(result)
# close the connection
ssh.close()
-------------------------------------------------------
transport = paramiko.Transport(('192.168.80.11', 22))
transport.connect(username='root', password='123.com')
ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read())
transport.close()
transport = paramiko.Transport(('192.168.80.11', 22))
transport.connect(username='root', password='123.com')
sftp = paramiko.SFTPClient.from_transport(transport)
# upload a file
sftp.put('windows.txt', '/root/win.txt')
# download a file
# sftp.get('linux.txt', 'linux.txt')
transport.close()
On Linux, copy the public key to the server first: ssh-copy-id "root@192.168.80.11"

# connect with the specified private key
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname='192.168.80.11', port=22, username='root', pkey=private_key)
stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read()
print(result)
ssh.close()
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
transport = paramiko.Transport(('192.168.80.11', 22))
transport.connect(username='root', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put('windows.txt', '/root/windows.txt')
sftp.get('linux.txt', 'linux.txt')
transport.close()
IO operations do not occupy the CPU.
Computation occupies the CPU, e.g. 1 + 1.
Python multithreading is not suited to CPU-bound tasks; it is suited to IO-bound tasks.
import threading
import time

def run(n):
    print('task', n)
    time.sleep(2)

t1 = threading.Thread(target=run, args=('t1',))
t2 = threading.Thread(target=run, args=('t2',))
t1.start()
t2.start()
############
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    def run(self):
        print("running task", self.n)

t1 = MyThread("t1")
t1.start()
################
start_time = time.time()
t_objs = []

def run(n):
    print('task', n)
    time.sleep(2)

for i in range(50):
    t = threading.Thread(target=run, args=('t-%s' % i,))
    t.start()
    t_objs.append(t)

for t in t_objs:
    t.join()

print('--------------all-------------')
print("cost", time.time() - start_time)
print('-all--',threading.current_thread(),threading.active_count())
import threading
import time

start_time = time.time()
t_objs = []

def run(n):
    print('task', n)
    time.sleep(2)

for i in range(50):
    t = threading.Thread(target=run, args=('t-%s' % i,))
    t.setDaemon(True)   # make the child thread a daemon thread
                        # (the main thread exits as soon as it finishes and does not wait for daemon threads)
    t.start()
    t_objs.append(t)

# joining would make the main thread wait again and hide the daemon behaviour,
# so the join loop is left commented out here
# for t in t_objs:
#     t.join()

print('--------------all-------------', threading.current_thread(), threading.active_count())
print("cost", time.time() - start_time)
Because of the GIL, a Python process can only execute one thread at a time, so multithreaded work is really done through context switching. Python calls the operating system's native C thread interface and cannot control the order in which threads run; at any given moment only one thread is processing data.
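A minimal timing sketch (my addition, not from the original notes) that makes the GIL visible: a pure CPU-bound counting loop takes roughly as long with two threads as with one, because only one thread can hold the interpreter at a time. The count of 10000000 is just an arbitrary workload.

import threading, time

def count(n):
    # pure CPU work: the GIL lets only one thread run this at a time
    while n > 0:
        n -= 1

N = 10000000

start = time.time()
count(N); count(N)                    # the same work twice in a single thread
print("single thread:", time.time() - start)

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print("two threads:  ", time.time() - start)   # roughly the same, often slightly worse

Run the same comparison with time.sleep instead of counting and the two-thread version finishes in about half the time, which is why threads still pay off for IO-bound work.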
Mutex lock: prevents data from being overwritten when threads context-switch.

import threading
import time

def run(n):
    lock.acquire()        # acquire the lock
    global num
    num += 1
    time.sleep(1)
    lock.release()        # release the lock

lock = threading.Lock()   # create the lock
num = 0
t_objs = []
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_objs.append(t)

for t in t_objs:
    t.join()
print('num', num)
Recursive lock (RLock): lets the same thread acquire the lock again when locks are nested, so the lock order cannot deadlock.

import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 2
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print("----run1 run2-----")
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print(num, num2)
A mutex allows only one thread at a time to modify data, while a Semaphore allows a fixed number of threads to do so at once. For example, if a toilet has 3 stalls, at most 3 people can use it at the same time; everyone else has to wait until someone comes out.

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the threading:%s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)   # allow 5 threads to run at the same time
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:
        pass
    else:
        print('----all-------')
import threading, time

event = threading.Event()

def lighter():
    count = 0
    event.set()
    while True:
        if count > 5 and count < 10:
            event.clear()
            print("\033[41m---> red light\033[0m")
        elif count > 10:
            event.set()
            count = 0
        else:
            print("\033[42m---> green light\033[0m")
        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():
            print("[%s] running..." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light, waiting..." % name)
            event.wait()
            print("\033[34m[%s] green light is on, start going...\033[0m" % name)

light = threading.Thread(target=lighter, )
light.start()

car1 = threading.Thread(target=car, args=("Tesla",))
car1.start()
Queues decouple: they remove direct coupling between parts of a program and improve efficiency, so a change in one thread (or process) does not affect the others.
import queue

q = queue.Queue()
# q = queue.Queue(maxsize=3)    # limit the queue size
q.put(1)                        # put data in
q.put(2)
q.put(3)
print(q.qsize())                # check the queue size
print(q.get())                  # take data out
print(q.get())
# print(q.get_nowait())         # does not block when the queue is empty
# print(q.get(block=False))     # block=False: also does not block when the queue is empty
print(q.get(timeout=1))         # wait at most 1 second for data
import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(2)
print(q.get())
print(q.get())
print(q.get())
import queue

q = queue.PriorityQueue()
q.put((-1, "a"))
q.put((3, "b"))
q.put((6, "c"))
print(q.get())
print(q.get())
print(q.get())
import threading
import queue

q = queue.Queue()

def producer():
    for i in range(10):
        q.put("bone %s" % i)
    print("waiting for all the bones to be taken...")
    q.join()
    print("all the bones have been taken...")

def consumer(n):
    while q.qsize() > 0:
        print("%s took" % n, q.get())
        q.task_done()

p = threading.Thread(target=producer, )
p.start()

# b = threading.Thread(target=consumer, args=("abc",))
# b.start()
consumer("abc")
import multiprocessing
import time

def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run, args=("bob %s" % i,))
        p.start()
####################
import multiprocessing
import threading
import time

def thread_run():
    print(threading.get_ident())   # returns the current thread's identifier

def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, )
    t.start()

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run, args=("bob %s" % i,))
        p.start()
from multiprocessing import Process, Queue

def f(qq):
    qq.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()
####################
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name', __name__)
    print('parent process', os.getppid())   # print the parent process id
    print('process id:', os.getpid())       # print this process's id
    print("\n\n")

def f(name):
    info('\033[31mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
####################
# Pipes
# A Pipe sends and receives data through a pipe
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, "hello"])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)
    print(d)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        # print(d)
        # print(l)
----------------------------------------------
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
from multiprocessing import Pool, freeze_support   # on Windows, freeze_support is also needed
import time, os

def Foo(i):
    time.sleep(2)
    print('in process', os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg, os.getpid())

if __name__ == '__main__':
    pool = Pool(processes=3)   # allow 3 processes in the pool at the same time
    print("main process", os.getpid())
    for i in range(10):
        # asynchronous (parallel); callback: Bar runs in the main process after Foo finishes,
        # which avoids, for example, every worker opening its own long-lived connection
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # pool.apply(func=Foo, args=(i,))   # serial
    print('end')
    pool.close()
    pool.join()
import time
import queue

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    producer()
#########################
from greenlet import greenlet

def test1():
    print(12)        # 2
    gr2.switch()     # switch
    print(34)        # 4
    gr2.switch()     # switch

def test2():
    print(56)        # 3
    gr1.switch()     # switch
    print(78)        # 5

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()         # 1
How coroutine switching works: whenever an IO operation is hit, switch to another coroutine; the one with the shortest wait finishes first (gevent.sleep here stands in for the IO wait time).

import gevent

def foo():
    print('foo 1')       # 1
    gevent.sleep(2)
    print('foo 2')       # 6

def bar():
    print('bar 1')       # 2
    gevent.sleep(1)
    print('bar 2')       # 5

def func3():
    print("func 1")      # 3
    gevent.sleep(0)
    print('func 2')      # 4

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
    gevent.spawn(func3),
])
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
from greenlet import greenlet
from urllib import request
import gevent, time
from gevent import monkey

monkey.patch_all()   # patch all IO operations in the program (otherwise gevent cannot detect them)

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s' % (len(data), url))

urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/']

time_start = time.time()
for url in urls:
    f(url)
print("cost", time.time() - time_start)

async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("cost", time.time() - async_time_start)
server

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
import sys
import socket
import time
import gevent
from gevent import socket, monkey

monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('localhost', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print('recv:', data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as ex:
        print(ex)
    finally:
        conn.close()

if __name__ == '__main__':
    server(8001)

client

import socket

HOST = 'localhost'
PORT = 8001
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>"), encoding='utf8')
    s.sendall(msg)
    data = s.recv(1024)
    print('Received', data)
s.close()
Event-driven programming handles events: for example, a mouse click is put into an event list, a key press is put into an event list, and a single thread processes those lists.
http://www.cnblogs.com/alex3714/articles/5876749.html
When the service receives an event it puts it into the corresponding list; the file descriptor is the index into that list, and the index maps to a file handle (file object).
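As a rough illustration of that idea (my own sketch, not from the original), the loop below keeps a queue of incoming events and a table of callbacks keyed by event type; one thread takes events off the queue and dispatches them:

import queue
import threading
import time

events = queue.Queue()                 # incoming events
handlers = {}                          # event type -> callback

def register(event_type, callback):
    handlers[event_type] = callback

def dispatch_loop():
    # a single thread takes events off the queue and runs their handlers
    while True:
        event_type, payload = events.get()
        handlers[event_type](payload)

register("click", lambda pos: print("mouse clicked at", pos))
register("key", lambda ch: print("key pressed:", ch))

threading.Thread(target=dispatch_loop, daemon=True).start()
events.put(("click", (10, 20)))
events.put(("key", "a"))
time.sleep(1)                          # give the loop time to run before the script exits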
A program cannot call into the kernel directly; opening a file is done by asking the kernel. For example, when a file is copied, the data is first copied into the kernel buffer and then copied out to the IO device.
With blocking IO, one process is executing while the other just waits for it; that waiting is the blocking.
With non-blocking IO, when the user process issues a read and the data in the kernel is not ready yet, the kernel does not block the user process but immediately returns an error. From the user process's point of view, the read returns right away with a result; when the process sees that the result is an error, it knows the data is not ready and can issue the read again. Once the data in the kernel is ready and the kernel receives another system call from the user process, it copies the data into user memory and returns.
So the characteristic of non-blocking IO is that the user process has to keep actively polling the kernel to ask whether the data is ready.
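A small sketch of that polling pattern (my addition, not from the original), using a non-blocking socket: recv() raises BlockingIOError instead of blocking, so the caller keeps retrying until the kernel has data. The host www.python.org is just an example.

import socket
import time

s = socket.socket()
s.connect(('www.python.org', 80))
s.setblocking(False)                                 # non-blocking mode
s.send(b"GET / HTTP/1.0\r\nHost: www.python.org\r\n\r\n")

while True:
    try:
        data = s.recv(1024)                          # returns immediately
        break                                        # the data is ready
    except BlockingIOError:                          # the kernel says "not ready yet"
        print("no data yet, doing something else...")
        time.sleep(0.1)                              # poll again later

print(data[:100])
s.close()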
IO multiplexing is what we call select, poll and epoll; some places also call this style of IO event-driven IO. The advantage of select/epoll is that a single process can handle the IO of many network connections at the same time. The basic principle is that the select/poll/epoll function keeps polling all of the sockets it is responsible for, and when data arrives on one of them it notifies the user process.
When the user process calls select, the whole process is blocked, while the kernel "watches" all of the sockets select is responsible for; as soon as the data on any of them is ready, select returns. The user process then calls read to copy the data from the kernel into the user process.
So the characteristic of IO multiplexing is that, through a single mechanism, one process can wait on many file descriptors at once, and as soon as any of those descriptors (socket descriptors) becomes ready for reading, select() returns.
select, poll and epoll are the mechanisms IO multiplexing uses to watch for data.
select
For example, if 100 connections come in and the kernel detects that only two of them have data, the kernel does not tell select which two; select has to loop over all of them itself, which costs time.
Another drawback of select is that a single process can only watch a limited number of file descriptors, generally 1024 on Linux,
although the limit can be raised by changing a macro definition or even recompiling the kernel.
poll
poll appeared in 1986 in System V Release 3.
It is essentially not much different from select, but poll has no limit on the maximum number of file descriptors.
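For reference, a minimal select.poll() sketch (my addition; Unix only, and port 9002 is arbitrary): the listening socket is registered for readable events and poll() reports which descriptors are ready.

import select
import socket

server = socket.socket()
server.bind(('localhost', 9002))
server.listen(5)

p = select.poll()                               # poll object (Unix only)
p.register(server, select.POLLIN)               # watch for readable events

while True:
    for fd, event in p.poll(1000):              # timeout in milliseconds
        if fd == server.fileno() and event & select.POLLIN:
            conn, addr = server.accept()        # the listening socket is ready: accept
            print("new connection from", addr)
            conn.send(b"hello")
            conn.close()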
epoll
epoll has almost all of the advantages described above
and is generally regarded as the best-performing multiplexed IO readiness-notification method under Linux 2.6.
For example, if 100 connections come in and the kernel detects that only two of them have data,
the kernel tells epoll directly which two are ready, so epoll does not have to search for them itself.
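And a minimal select.epoll() echo-server sketch (my addition; Linux only, port 9003 is arbitrary): the kernel reports exactly which registered descriptors are ready, so the loop only touches those.

import select
import socket

server = socket.socket()
server.bind(('localhost', 9003))
server.listen(5)
server.setblocking(False)

ep = select.epoll()                              # epoll object (Linux only)
ep.register(server.fileno(), select.EPOLLIN)     # watch the listening socket

connections = {}
while True:
    for fd, event in ep.poll(1):                 # timeout in seconds
        if fd == server.fileno():                # a new connection is ready
            conn, addr = server.accept()
            conn.setblocking(False)
            ep.register(conn.fileno(), select.EPOLLIN)
            connections[conn.fileno()] = conn
        elif event & select.EPOLLIN:             # data is ready on a client socket
            conn = connections[fd]
            data = conn.recv(1024)
            if data:
                conn.send(data.upper())
            else:                                # the client closed the connection
                ep.unregister(fd)
                conn.close()
                del connections[fd]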
With asynchronous IO, after the user process issues the read it can immediately go and do other things.
From the kernel's side, when it receives an asynchronous read it returns right away, so it does not block the user process at all.
The kernel then waits for the data to be ready and copies it into user memory,
and when all of that is done the kernel sends the user process a signal telling it that the read has completed.
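For comparison, a short asyncio sketch (my addition; it needs Python 3.7+ for asyncio.run, and the hosts are just examples). Strictly speaking asyncio is built on IO multiplexing rather than kernel asynchronous IO, but it shows the same programming model: the coroutine hands the wait to the event loop and is resumed when the data is ready, instead of polling for it.

import asyncio

async def fetch(host):
    # open_connection waits for the socket without blocking the event loop
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n")
    await writer.drain()
    data = await reader.read(100)          # resumed when data has arrived
    print(host, "->", len(data), "bytes")
    writer.close()

async def main():
    # both requests are in flight at the same time
    await asyncio.gather(fetch("www.python.org"), fetch("github.com"))

asyncio.run(main())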
Blocking IO, non-blocking IO and IO multiplexing are all synchronous IO.
Asynchronous IO is the only one that is truly asynchronous.
IO multiplexing: the select model
server

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_

import socket
import queue
import select

msg_dic = {}

server = socket.socket()
server.bind(('localhost', 9000))
server.listen(1000)

# non-blocking
server.setblocking(False)

inputs = [server, ]
outputs = []

while True:   # new connections, connections queued for the next loop, exceptions
    # IO multiplexing, select model
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    for r in readable:
        if r is server:   # a new connection has arrived
            conn, addr = server.accept()
            print("new connection", addr)
            inputs.append(conn)   # the new connection has not sent data yet, so do not recv now;
                                  # add conn to inputs so select tells the server when this client sends data
            msg_dic[conn] = queue.Queue()   # a queue holding the data to send back to this client
        else:
            data = r.recv(1024)
            print('received data', data)
            msg_dic[r].put(data)     # queue the reply for this connection
            outputs.append(r)        # add it to outputs so it is handled on the next loop

    for w in writeable:              # connections that have replies to send back to the client
        data_to_client = msg_dic[w].get()   # take the reply off the queue
        w.send(data_to_client)              # send it
        outputs.remove(w)   # make sure the next loop's writeable list does not include already-handled connections

    for e in exceptional:
        if e in outputs:
            outputs.remove(e)        # remove it
        inputs.remove(e)             # remove it
        del msg_dic[e]               # delete its queue
import socket

HOST = 'localhost'
PORT = 9000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))

while True:
    msg = bytes(input(">>"), encoding='utf8')
    s.sendall(msg)
    data = s.recv(1024)
    print('Received', data)

s.close()
IO multiplexing: the epoll model (via the selectors module)
#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)                            # set non-blocking
sel.register(sock, selectors.EVENT_READ, accept)   # register the listening socket

while True:
    events = sel.select()     # blocks by default; returns the list of ready connections
    for key, mask in events:  # a connection is ready
        callback = key.data                # accept or read
        callback(key.fileobj, mask)        # run the callback; key.fileobj is the socket / conn
import socket
import sys

messages = ['This is the mess',
            'It will be sent',
            'in parts',
            ]

server_address = ('localhost', 9999)

socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)]

for s in socks:
    s.connect(server_address)

for message in messages:
    for s in socks:
        s.send(message.encode())
        print('send %s %s' % s.getsockname(), message)

for s in socks:
    data = s.recv(1024)
    print('recv %s %s' % s.getsockname(), data)
    if not data:
        print('not data %s %s' % s.getsockname(), data)
        s.close()