一、什麼是裝飾器html
二、裝飾器的做用node
三、使用高階函數模仿裝飾器功能python
1.定義:把一個函數名當作實參傳給另外一個函數
2.返回值中包含函數名
3.下面使用高階函數雖然能夠實現裝飾器的一些功能,可是違反了裝飾器不能改變調用方式的原則,
之前使用bar()如今將調用方式改編成了test1(bar)就是將bar的函數名當作變量傳給了test1()mysql
#! /usr/bin/env python # -*- coding: utf-8 -*- import time def timer(func): start_time = time.time() func() print '函數執行時間爲', time.time() - start_time def test(): print '開始執行test' time.sleep(3) print 'test執行結束' timer(test) ''' 開始執行test test執行結束 函數執行時間爲 3.00332999229 '''
4.高階函數——不修改高階函數的調用方式增長新的功能(可是沒法傳參數)
注:bar = test2(bar) 等價於:@timer從新將函數名bar賦值,將原函數bar的內存地址當作實參傳遞該函數test2(),再將test2()賦值給barlinux
import time def bar(): time.sleep(3) print("in the bar") def test2(func): print(func) return func bar = test2(bar) bar()
5.嵌套函數
嵌套函數:在一個函數中嵌套另外一個函數,並在函數內部調用git
def foo(): print("in the foo") def bar(): print("in the bar") bar() foo()
四、可以適應90%的業務需求github
import time def timer(func): #timer(test1) func=test1 def deco(*args,**kwargs): start_time = time.time() func(*args,**kwargs) #run test1 stop_time = time.time() print("running time is %s"%(stop_time-start_time)) return deco @timer # test1=timer(test1) def test1(): time.sleep(3) print("in the test1") @timer def test2(name): print("in the test2",name) test1() test2("tom")
五、對特定網頁進行身份驗證redis
import time user,passwd = 'aaa','123' def auth(func): def wrapper(*args,**kwargs): username = input("Username:").strip() password = input("Password:").strip() if user == username and password == passwd: print("User has passed authentication") res = func(*args,**kwargs) #這裏執行func()至關於執行調用的函數如home() return res #爲了得到home()函數返回值,能夠將執行結果賦值給res而後返回print(home())結果是"from home"而不是"None"了 else: exit("Invalid username or password") return wrapper def index(): print("welcome to index page") @auth def home(): print("welcome to home page") return "from home" @auth def bbs(): print("welcome to bbs page") index() print(home()) #在這裏調用home()至關於調用wrapper() bbs()
六、實現對不一樣網頁不一樣方式的身份認證算法
import time user,passwd = 'aaa','123' def auth(auth_type): print("auth func:",auth_type) def outer_wrapper(func): def wrapper(*args, **kwargs): print("wrapper func args:", *args, **kwargs) if auth_type == "local": username = input("Username:").strip() password = input("Password:").strip() if user == username and passwd == password: print("\033[32;1mUser has passed authentication\033[0m") res = func(*args, **kwargs) # from home print("---after authenticaion ") return res else: exit("\033[31;1mInvalid username or password\033[0m") elif auth_type == "ldap": print("搞毛線ldap,不會。。。。") return wrapper return outer_wrapper def index(): print("welcome to index page") @auth(auth_type="local") # home = wrapper() def home(): print("welcome to home page") return "from home" @auth(auth_type="ldap") def bbs(): print("welcome to bbs page") index() print(home()) #wrapper() bbs()
#! /usr/bin/env python # -*- coding: utf-8 -*- import time def auth(auth_type): print("auth func:",auth_type) def outer_wrapper(func): def wrapper(*args, **kwargs): print("wrapper func args:", *args, **kwargs) print('運行前') func(*args, **kwargs) print('運行後') return wrapper return outer_wrapper @auth(auth_type="local") # home = wrapper() def home(): print("welcome to home page") return "from home" home()
七、使用閉包實現裝飾器功能sql
閉包概念:
#! /usr/bin/env python # -*- coding: utf-8 -*- import time def timer(func): #timer(test1) func=test1 def deco(*args,**kwargs): # # 函數嵌套 start_time = time.time() func(*args,**kwargs) # 跨域訪問,引用了外部變量func (func實質是函數內存地址) stop_time = time.time() print "running time is %s"%(stop_time-start_time) return deco # 內層函數做爲外層函數返回值 def test(name): print "in the test2",name time.sleep(2) test = timer(test) # 等價於 ==》 @timer語法糖 test("tom") ''' 運行結果: in the test2 tom running time is 2.00302696228 '''
一、什麼是生成器
二、定義
三、生成器哪些場景應用
import time t1 = time.time() g = (i for i in range(100000000)) t2 = time.time() lst = [i for i in range(100000000)] t3 = time.time() print('生成器時間:',t2 - t1) # 生成器時間: 0.0 print('列表時間:',t3 - t2) # 列表時間: 5.821957349777222
四、生成器的做用
print( [i*2 for i in range(10)] ) #列表生成式: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] print( (i*2 for i in range(10)) ) #生 成 器: <generator object <genexpr> at 0x005A3690>
g = (i*2 for i in range(10)) print( g.__next__() ) # 0 print( g.__next__() ) # 2
五、生成器工做原理
六、yield生成器運行機制
def fib(max_num): a,b = 1,1 while a < max_num: yield b a,b=b,a+b g = fib(10) #生成一個生成器:[1,2, 3, 5, 8, 13] print(g.__next__()) #第一次調用返回:1 print(list(g)) #把剩下元素變成列表:[2, 3, 5, 8, 13]
七、yield實現單線程下的併發效果
def consumer(name): print("%s 準備吃包子啦!" %name) while True: baozi = yield print("包子[%s]來了,被[%s]吃了!" %(baozi,name)) c = consumer("Tom") c.__next__() b1 = "韭菜餡包子" c.send(b1) # c.send(b1)做用: # c.send()的做用是給yied的傳遞一個值,而且每次調用c.send()的同時自動調用一次__next__ '''運行結果: Tom 準備吃包子啦! 包子[韭菜餡包子]來了,被[Tom]吃了! '''
import time def consumer(name): print("%s 準備吃包子啦!" %name) while True: baozi = yield print("包子[%s]來了,被[%s]吃了!" %(baozi,name)) def producer(name): c = consumer('A') c2 = consumer('B') c.__next__() c2.__next__() print("老子開始準備作包子啦!") for i in range(10): time.sleep(1) print("作了2個包子!") c.send(i) c2.send(i) producer("alex") '''運行結果: A 準備吃包子啦! B 準備吃包子啦! 老子開始準備作包子啦! 作了2個包子! 包子[0]來了,被[A]吃了! 包子[0]來了,被[B]吃了! 作了2個包子! 包子[1]來了,被[A]吃了! 包子[1]來了,被[B]吃了! 作了2個包子! 包子[2]來了,被[A]吃了! 包子[2]來了,被[B]吃了! 作了2個包子! 包子[3]來了,被[A]吃了! 包子[3]來了,被[B]吃了! 作了2個包子! 包子[4]來了,被[A]吃了! 包子[4]來了,被[B]吃了! 作了2個包子! 包子[5]來了,被[A]吃了! 包子[5]來了,被[B]吃了! '''
一、什麼是迭代器
二、定義:
三、迭代器和可迭代對象
for
循環的對象都是可迭代的(Iterable)類型;next()
函數的對象都是迭代器(Iterator)
類型,它們表示一個惰性計算的序列;list
、dict
、str
等是可迭代的但不是迭代器,不過能夠經過iter()
函數得到一個Iterator
對象。for
循環本質上就是經過不斷調用next()
函數實現的四、迭代器的兩個方法
a = iter([1,2,]) #生成一個迭代器 print(a.__next__()) print(a.__next__()) print(a.__next__()) #在這一步會引起 「StopIteration」 的異常
五、判斷是迭代器和可迭代對象
注:列表,元組,字典是可迭代的但不是迭代器
from collections import Iterable print(isinstance([],Iterable)) #True print(isinstance({},Iterable)) #True print(isinstance((),Iterable)) #True print(isinstance("aaa",Iterable)) #True print(isinstance((x for x in range(10)),Iterable)) #True
六、列表不是迭代器,只有生成器是迭代器
from collections import Iterator t = [1,2,3,4] print(isinstance(t,Iterator)) #False t1 = iter(t) print(isinstance(t1,Iterator)) #True
七、自定義迭代器
#! /usr/bin/env python # -*- coding: utf-8 -*- class MyRange(object): def __init__(self, n): self.idx = 0 self.n = n def __iter__(self): return self def next(self): if self.idx < self.n: val = self.idx self.idx += 1 return self.n[val] else: raise StopIteration() l = [4,5,6,7,8] obj = MyRange(l) print obj.next() # 4 print obj.next() # 5 print obj.next() # 6
八、迭代器與生成器
#! /usr/bin/env python # -*- coding: utf-8 -* l = [1,2,3,4,5] # 列表是一個可迭代對象,不是一個迭代器 print dir(l) # 因此 l 中有 __iter__() 方法,沒有 __next__()方法 iter_obj = l.__iter__() # __iter__()方法返回迭代器對象自己(這個迭代器對象就會有 next 方法了) print '###################################\n' print iter_obj.next() # 1 print iter_obj.next() # 2 print iter_obj.next() # 3
一、什麼是進程(process)?(進程是資源集合)
二、進程是資源分配的最小單位( 內存、cpu、網絡、io)
三、一個運行起來的程序就是一個進程
CPU分時
進程如何通訊
爲何須要進程池
二、定義:進程是資源分配最小單位
三、進程併發性:
四、線程併發性:
五、有了進程爲何還要線程?
1.進程優勢:
2. 進程的兩個重要缺點
六、什麼是線程(thread)(線程是操做系統最小的調度單位)
七、進程和線程的區別
八、進程和程序的區別
Python多線程編程中經常使用方法:
GIL全局解釋器鎖:
線程鎖(互斥鎖):
一、線程2種調用方式:直接調用, 繼承式調用
import threading import time def sayhi(num): # 定義每一個線程要運行的函數 print("running on number:%s" % num) time.sleep(3) #一、target=sayhi :sayhi是定義的一個函數的名字 #二、args=(1,) : 括號內寫的是函數的參數 t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一個線程實例 t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另外一個線程實例 t1.start() # 啓動線程 t2.start() # 啓動另外一個線程 print(t1.getName()) # 獲取線程名 print(t2.getName())
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定義每一個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
二、for循環同時啓動多個線程
import threading import time def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) for i in range(50): t = threading.Thread(target=sayhi,args=('t-%s'%i,)) t.start()
三、t.join(): 實現全部線程都執行結束後再執行主線程
import threading import time start_time = time.time() def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) t_objs = [] #將進程實例對象存儲在這個列表中 for i in range(50): t = threading.Thread(target=sayhi,args=('t-%s'%i,)) t.start() #啓動一個線程,程序不會阻塞 t_objs.append(t) print(threading.active_count()) #打印當前活躍進程數量 for t in t_objs: #利用for循環等待上面50個進程所有結束 t.join() #阻塞某個程序 print(threading.current_thread()) #打印執行這個命令進程 print("----------------all threads has finished.....") print(threading.active_count()) print('cost time:',time.time() - start_time)
四、setDaemon(): 守護線程,主線程退出時,須要子線程隨主線程退出
import threading import time start_time = time.time() def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) for i in range(50): t = threading.Thread(target=sayhi,args=('t-%s'%i,)) t.setDaemon(True) #把當前線程變成守護線程,必須在t.start()前設置 t.start() #啓動一個線程,程序不會阻塞 print('cost time:',time.time() - start_time)
五、GIL鎖和用戶鎖(Global Interpreter Lock 全局解釋器鎖)
import time import threading lock = threading.Lock() #1 生成全局鎖 def addNum(): global num #2 在每一個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(1) lock.acquire() #3 修改數據前加鎖 num -= 1 #4 對此公共變量進行-1操做 lock.release() #5 修改後釋放
在有GIL的狀況下執行 count = count + 1 會出錯緣由解析,用線程鎖解決方法
# 1)第一步:count = 0 count初始值爲0 # 2)第二步:線程1要執行對count加1的操做首先申請GIL全局解釋器鎖 # 3)第三步:調用操做系統原生線程在操做系統中執行 # 4)第四步:count加1還未執行完畢,時間到了被要求釋放GIL # 5)第五步:線程1釋放了GIL後線程2此時也要對count進行操做,此時線程1還未執行完,因此count仍是0 # 6)第六步:線程2此時拿到count = 0後也要對count進行加1操做,假如線程2執行很快,一次就完成了 # count加1的操做,那麼count此時就從0變成了1 # 7)第七步:線程2執行完加1後就賦值count=1並釋放GIL # 8)第八步:線程2執行完後cpu又交給了線程1,線程1根據上下文繼續執行count加1操做,先拿到GIL # 鎖,完成加1操做,因爲線程1先拿到的數據count=0,執行完加1後結果仍是1 # 9)第九步:線程1將count=1在次賦值給count並釋放GIL鎖,此時連個線程都對數據加1,可是值最終是1
1 >> lock = threading.Lock() #定義一把鎖 2 >> lock.acquire() #對數據操做前加鎖防止數據被另外一線程操做 3 >> lock.release() #對數據操做完成後釋放鎖
六、死鎖
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(2): t=MyThread() t.start() # 運行結果:輸出下面結果後程序卡死,再也不向下進行了 # Thread-1 拿到A鎖 # Thread-1 拿到B鎖 # Thread-1 拿到B鎖 # Thread-2 拿到A鎖
七、遞歸鎖:lock = threading.RLock() 解決死鎖問題
from threading import Thread,Lock,RLock import time mutexA=mutexB=RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到A鎖' %self.name) mutexB.acquire() print('%s 拿到B鎖' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到B鎖' % self.name) time.sleep(0.1) mutexA.acquire() print('%s 拿到A鎖' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(5): t=MyThread() t.start() # 下面是運行結果:不會產生死鎖 # Thread-1 拿到A鎖 # Thread-1 拿到B鎖 # Thread-1 拿到B鎖 # Thread-1 拿到A鎖 # Thread-2 拿到A鎖 # Thread-2 拿到B鎖 # Thread-2 拿到B鎖 # Thread-2 拿到A鎖 # Thread-4 拿到A鎖 # Thread-4 拿到B鎖 # Thread-4 拿到B鎖 # Thread-4 拿到A鎖 # Thread-3 拿到A鎖 # Thread-3 拿到B鎖 # Thread-3 拿到B鎖 # Thread-3 拿到A鎖 # Thread-5 拿到A鎖 # Thread-5 拿到B鎖 # Thread-5 拿到B鎖 # Thread-5 拿到A鎖
八、Semaphore(信號量)
# import threading,time # def run(n): # semaphore.acquire() # time.sleep(1) # print("run the thread: %s\n" %n) # semaphore.release() # # if __name__ == '__main__': # semaphore = threading.BoundedSemaphore(5) #最多容許5個線程同時運行 # for i in range(22): # t = threading.Thread(target=run,args=(i,)) # t.start() # # while threading.active_count() != 1: # pass #print threading.active_count() # else: # print('----all threads done---') # 代碼結果說明:這裏能夠清晰看到運行時0-4是同時運行的沒有順序,並且是前五個, # 表示再semaphore這個信號量的定義下程序同時僅能執行5個線程
九、events總共就只有四個方法
1. event.set() : # 設置標誌位 2. event.clear() : # 清除標誌位 3. event.wait() : # 等待標誌被設定 4. event.is_set() : # 判斷標誌位是否被設定
import time,threading event = threading.Event() #第一:寫一個紅綠燈的死循環 def lighter(): count = 0 event.set() #1先設置爲綠燈 while True: if count > 5 and count <10: #2改爲紅燈 event.clear() #3把標誌位清了 print("red light is on.....") elif count > 10: event.set() #4再設置標誌位,變綠燈 count = 0 else: print("green light is on.....") time.sleep(1) count += 1 #第二:寫一個車的死循環 def car(name): while True: if event.is_set(): #設置了標誌位表明綠燈 print("[%s] is running"%name) time.sleep(1) else: print('[%s] sees red light, waiting......'%name) event.wait() print('[%s] green light is on,start going.....'%name) light = threading.Thread(target=lighter,) light.start() car1 = threading.Thread(target=car,args=("Tesla",)) car1.start()
import multiprocessing,time,threading #3 被多線程調用的函數 def thread_run(): print(threading.get_ident()) #打印線程id號 time.sleep(2) #2 被多進程調用的函數,以及在這個函數中起一個進程 def run(name): time.sleep(2) print("hello",name) t = threading.Thread(target=thread_run,) #在進程調用的函數中啓用一個線程 t.start() #1 一次性啓動多個進程 if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=run,args=('bob %s'%i,)) #啓用一個多線程 p.start()
from multiprocessing import Process import queue import threading def f(): q.put([42, None, 'hello']) if __name__ == '__main__': q = queue.Queue() #1 在父進程中定義一個隊列實例q # p = threading.Thread(target=f,) #在線程程中就能夠相互訪問,線程中內存共享 p = Process(target=f,) #2 在父進程中起一個子進程 p,在子進程中使用父進程的q會報錯 p.start() print(q.get()) p.join()
from multiprocessing import Process, Queue def f(qq): # 將符進程中的q傳遞過來叫qq qq.put([42, None, 'hello']) # 此時子進程就可使用符進程中的q if __name__ == '__main__': q = Queue() # 使用Queue()在父進程中定義一個隊列實例q p = Process(target=f, args=(q,)) # 在父進程中起一個子進程 p,將父進程剛定義的q傳遞給子進程p p.start() print(q.get()) p.join() # 運行結果: [42, None, 'hello']
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) # 3 子進程發送數據,就像socket同樣 print("son process recv:", conn.recv()) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() # 1 生成一個管道實例,實例一輩子成就會生成兩個返回對象,一個是管道這頭,一個是管道那頭 p = Process(target=f, args=(child_conn,)) # 2 啓動一個子進程將管道其中一頭傳遞給子進程 p.start() print(parent_conn.recv()) # 4 父進程收消息 # prints "[42, None, 'hello']" parent_conn.send('i am parent process') p.join() # 運行結果: # [42, None, 'hello'] # son process recv: i am parent process
from multiprocessing import Process, Manager import os def f(d, l): d[1] = '1' # 是個進程對字典放入的是同一個值,因此看上去效果不明顯 l.append(os.getpid()) # 將這是個進程的進程id放入列表中 if __name__ == '__main__': with Manager() as manager: # 1 將Manager()賦值給manager d = manager.dict() # 2 定義一個能夠在多個進程間能夠共享的字典 l = manager.list(range(5)) # 3 定義一個能夠在多個進程間能夠共享的列表,默認寫五個數據 p_list = [] for i in range(10): # 生成是個進程 p = Process(target=f, args=(d, l)) # 將剛剛生成的可共享字典和列表傳遞給子進程 p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
from multiprocessing import Process, Lock def f(l, i): l.acquire() #一個進程要打印數據時先鎖定 print('hello world', i) l.release() #打印完畢後就釋放這把鎖 if __name__ == '__main__': lock = Lock() #先生成一把鎖 for num in range(5): Process(target=f, args=(lock, num)).start() # 運行結果: # hello world 4 # hello world 0 # hello world 2 # hello world 3 # hello world 1
from multiprocessing import Process,Pool import time,os def foo(i): time.sleep(2) print("in the process",os.getpid()) #打印子進程的pid return i+100 def call(arg): print('-->exec done:',arg,os.getpid()) if __name__ == '__main__': pool = Pool(3) #進程池最多容許5個進程放入進程池 print("主進程pid:",os.getpid()) #打印父進程的pid for i in range(10): #用法1 callback做用是指定只有當Foo運行結束後就執行callback調用的函數,父進程調用的callback函數 pool.apply_async(func=foo, args=(i,),callback=call) #用法2 串行 啓動進程不在用Process而是直接用pool.apply() # pool.apply(func=foo, args=(i,)) print('end') pool.close() #關閉pool pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
殭屍進程
#!/usr/bin/env python #coding=utf8 import os, sys, time #產生子進程 pid = os.fork() if pid == 0: #子進程退出 sys.exit(0) #父進程休息30秒 time.sleep(30) # 先產生一個子進程,子進程退出,父進程休息30秒,那就會產生一個殭屍進程
[root@linux-node4 ~]# ps -ef| grep defunct root 110401 96083 0 19:11 pts/2 00:00:00 python defunct.py root 110402 110401 0 19:11 pts/2 00:00:00 [python] <defunct> root 110406 96105 0 19:11 pts/3 00:00:00 grep --color=auto defunct
一、什麼是協程(進入上一次調用的狀態)
二、協程的好處
三、協程缺點
四、使用yield實現協程相同效果
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield # 只要遇到yield程序就返回,yield還能夠接收數據 print("[%s] is eating baozi %s" % (name, new_baozi)) time.sleep(1) def producer(): r = con.__next__() # 直接調用消費者的__next__方法 r = con2.__next__() # 函數裏面有yield第一次加括號調用會變成一個生成器函數不執行,運行next才執行 n = 0 while n < 5: n += 1 con.send(n) # send恢復生成器同時並傳遞一個值給yield con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
五、協程爲什麼能處理大併發1:Greenlet遇到I/O手動切換
from greenlet import greenlet def test1(): print(12) #4 gr1會調用test1()先打印12 gr2.switch() #5 而後gr2.switch()就會切換到gr2這個協程 print(34) #8 因爲在test2()切換到了gr1,因此gr1又從上次中止的位置開始執行 gr2.switch() #9 在這裏又切換到gr2,會再次切換到test2()中執行 def test2(): print(56) #6 啓動gr2後會調用test2()打印56 gr1.switch() #7 而後又切換到gr1 print(78) #10 切換到gr2後會接着上次執行,打印78 gr1 = greenlet(test1) #1 啓動一個協程gr1 gr2 = greenlet(test2) #2 啓動第二個協程gr2 gr1.switch() #3 首先gr1.switch() 就會去執行gr1這個協程
六、協程爲什麼能處理大併發2:Gevent遇到I/O自動切換
七、Gevent實現簡單的自動切換小例子
import gevent def func1(): print('\033[31;1m第一次打印\033[0m') gevent.sleep(2) # 爲何用gevent.sleep()而不是time.sleep()由於是爲了模仿I/O print('\033[31;1m第六次打印\033[0m') def func2(): print('\033[32;1m第二次打印\033[0m') gevent.sleep(1) print('\033[32;1m第四次打印\033[0m') def func3(): print('\033[32;1m第三次打印\033[0m') gevent.sleep(1) print('\033[32;1m第五次打印\033[0m') gevent.joinall([ # 將要啓動的多個協程放到event.joinall的列表中,便可實現自動切換 gevent.spawn(func1), # gevent.spawn(func1)啓動這個協程 gevent.spawn(func2), gevent.spawn(func3), ]) # 運行結果: # 第一次打印 # 第二次打印 # 第三次打印 # 第四次打印 # 第五次打印 # 第六次打印
八、使用Gevent實現併發下載網頁與串行下載網頁時間比較
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #把當前程序全部的I/O操做給我單獨作上標記 def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) #1 併發執行部分 time_binxing = time.time() gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ]) print("並行時間:",time.time()-time_binxing) #2 串行部分 time_chuanxing = time.time() urls = [ 'https://www.python.org/', 'https://www.yahoo.com/', 'https://github.com/', ] for url in urls: f(url) print("串行時間:",time.time()-time_chuanxing) # 注:爲何要在文件開通使用monkey.patch_all() # 1. 由於有不少模塊在使用I / O操做時Gevent是沒法捕獲的,因此爲了使Gevent可以識別出程序中的I / O操做。 # 2. 就必須使用Gevent模塊的monkey模塊,把當前程序全部的I / O操做給我單獨作上標記 # 3.使用monkey作標記僅用兩步便可: 第一步(導入monkey模塊): from gevent import monkey 第二步(聲明作標記) : monkey.patch_all()
九、經過gevent本身實現單線程下的多socket併發
import gevent from gevent import socket,monkey #下面使用的socket是Gevent的socket,實際測試monkey沒用 # monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0',port)) s.listen(5) while True: cli,addr = s.accept() gevent.spawn(handle_request,cli) def handle_request(conn): try: while True: data = conn.recv(1024) print('recv:',data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as e: print(e) finally: conn.close() if __name__=='__main__': server(8001)
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8").strip() if len(msg) == 0:continue s.sendall(msg) data = s.recv(1024) print('Received', repr(data)) s.close()
十、協程本質原理
十一、epoll處理 I/O 請求原理
十二、select處理協程
1三、select、epool、pool
Python進程池和線程池(ThreadPoolExecutor&ProcessPoolExecutor)
1. Executor
2. Future
from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 建立一個最大可容納2個task的線程池 future1 = pool.submit(return_future_result, ("hello")) # 往線程池裏面加入一個task future2 = pool.submit(return_future_result, ("world")) # 往線程池裏面加入一個task print(future1.done()) # 判斷task1是否結束 time.sleep(3) print(future2.done()) # 判斷task2是否結束 print(future1.result()) # 查看task1返回的結果 print(future2.result()) # 查看task2返回的結果 # 運行結果: # False # 這個False與下面的True會等待3秒 # True # 後面三個輸出都是一塊兒打出來的 # hello # world
import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL # future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} # 這一句至關於下面for循環獲取的字典 future_to_url = {} for url in URLS: future_to_url[executor.submit(load_url,url,60)] = url # {'future對象':'url'} future對象做爲key,url做爲value for future in concurrent.futures.as_completed(future_to_url): # as_completed返回已經有返回結果的future對象 url = future_to_url[future] # 經過future對象獲取對應的url try: data = future.result() # 獲取future對象的返回結果 except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
from concurrent.futures import ThreadPoolExecutor # 建立線程池 executor = ThreadPoolExecutor(10) def test_function(num1,num2): return "%s + %s = %s"%(num1,num2,num1+num2) result_iterators = executor.map(test_function,[1,2,3],[5,6,7]) for result in result_iterators: print(result) # 1 + 5 = 6 # 2 + 6 = 8 # 3 + 7 = 10
import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: future_dic = {} for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data))) future_dic[url] = data # {'url':'執行結果'} url做爲key,執行結果做爲value # 'http://httpbin.org' page is 13011 bytes # 'http://example.com/' page is 1270 bytes # 'https://api.github.com/' page is 2039 bytes
使用線程池、進程池、協程向多個url併發獲取頁面數據比較
import requests url_list = [ 'https://www.baidu.com', 'http://dig.chouti.com/', ] for url in url_list: result = requests.get(url) print(result.text)
import requests from concurrent.futures import ProcessPoolExecutor def fetch_request(url): result = requests.get(url) print(result.text) url_list = [ 'https://www.baidu.com', 'https://www.google.com/', #google頁面會卡住,知道頁面超時後這個進程才結束 'http://dig.chouti.com/', #chouti頁面內容會直接返回,不會等待Google頁面的返回 ] if __name__ == '__main__': pool = ProcessPoolExecutor(10) # 建立線程池 for url in url_list: pool.submit(fetch_request,url) # 去線程池中獲取一個進程,進程去執行fetch_request方法 pool.shutdown(False)
import requests from concurrent.futures import ThreadPoolExecutor def fetch_request(url): result = requests.get(url) print(result.text) url_list = [ 'https://www.baidu.com', 'https://www.google.com/', #google頁面會卡住,知道頁面超時後這個進程才結束 'http://dig.chouti.com/', #chouti頁面內容會直接返回,不會等待Google頁面的返回 ] pool = ThreadPoolExecutor(10) # 建立一個線程池,最多開10個線程 for url in url_list: pool.submit(fetch_request,url) # 去線程池中獲取一個線程,線程去執行fetch_request方法 pool.shutdown(True) # 主線程本身關閉,讓子線程本身拿任務執行
from concurrent.futures import ThreadPoolExecutor import requests def fetch_async(url): response = requests.get(url) return response.text def callback(future): print(future.result()) url_list = ['http://www.github.com', 'http://www.bing.com'] pool = ThreadPoolExecutor(5) for url in url_list: v = pool.submit(fetch_async, url) v.add_done_callback(callback) pool.shutdown(wait=True)
import gevent import requests from gevent import monkey monkey.patch_all() # 這些請求誰先回來就先處理誰 def fetch_async(method, url, req_kwargs): response = requests.request(method=method, url=url, **req_kwargs) print(response.url, response.content) # ##### 發送請求 ##### gevent.joinall([ gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://www.google.com/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}), ])