衆所周知, 計算機是由軟件和硬件組成. 硬件中的CPU主要用於解釋指令和處理數據, 軟件中的操做系統負責資源的管理和分配以及任務的調度. 而程序則是運行在操做系統上具備特定功能的軟件. 每當程序執行完成特定功能的時候, 爲了保證程序的獨立運行不受影響每每須要進程控制塊(專門管理和控制程序執行的數據結構)的做用.
說了以上這麼多基本理論知識, 接下來咱們談談進程. 進程本質上就是一個程序在一個數據集上的動態執行過程. 進程一般由程序, 數據集和進程控制塊三部分組成.python
而線程在如今的多處理器電子設備中是最小的處理單元. 一個進程能夠有多個線程, 這些線程之間彼此共享該進程的資源. 可是進程之間默認是相互獨立的, 若數據共享則須要另外特定的操做. 這裏作一個比喻. 如今有一個大型工廠, 該工廠負責生產汽車. 同時這個工廠又有多個車間, 每一個車間負責不一樣的功能, 有的生產輪胎, 有的生產方向盤等等. 每一個車間又有多個車間工人, 這些工人相互合做, 彼此共享資源來共同生產輪胎方向盤等等. 這裏的工廠就至關於一個應用程序, 而每一個車間至關於一個進程, 每一個車間工人就至關於線程.數據結構
普通多線程建立使用多線程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def showThreading(arg):
time.sleep(1)
print("current thread is : ",arg)
if __name__ == '__main__':
for tmp in range(10):
t=threading.Thread(target=showThreading,args=(tmp,))
t.start()
print('main thread has been stopped')複製代碼
執行結果以下:併發
t=threading.Thread(target=showThreading,args=(tmp,))
這一句建立一個線程, target=
代表線程所執行的函數, args=
代表函數的參數t.start()
線程準備完畢等待cpu調度處理, 當線程被cpu調度後會自動執行線程對象的run方法(自定義線程類時候可用)t.setName(string)
爲當前線程設置名字t.getName()
獲取當前線程的名字t.join()
該方法表示主線程必須在此位置等待子線程執行完畢後才能繼續執行主線程後面的代碼, 當且僅當setDaemon爲false時有效自定義線程類app
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
class MyThread(threading.Thread):
def __init__(self,target,arg=()):
super(MyThread, self).__init__()
self.target=target
self.arg=arg
def run(self):
self.target(self.arg)
def test(arg):
time.sleep(1)
print("current thread is : ",arg)
if __name__ == '__main__':
for tmp in range(10):
mt=MyThread(target=test,arg=(tmp,))
mt.start()
print("main thread has been stopped")複製代碼
class MyThread(threading.Thread):
自定義線程類須要繼承threading.Thread
類super(MyThread, self).__init__()
自定義線程類初始化時候需將當前對象傳遞給父類並執行父類的初始化方法run(self)
線程啓動以後會執行該方法因爲CPU對線程是隨機調度執行, 而且每每會在當前線程執行一小段代碼以後便直接換爲其餘線程執行, 如此往復循環直到全部的線程執行結束. 所以在一個共享資源和數據的進程中, 多個線程對同一資源操或者同一數據操做容易形成資源搶奪和產生髒數據. 此時咱們引入鎖的概念, 對這種資源和數據進行加鎖, 直到當前線程操做完畢再釋放鎖讓其餘線程操做.async
咱們先看看不加鎖時候對數據的操做狀況:函數
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
NUM = 0
def add():
global NUM
NUM += 1
name=t.getName()
time.sleep(1)
print('current thread is: ',name ,' current NUM is: ',NUM )
if __name__ == '__main__':
for tmp in range(10):
t=threading.Thread(target=add)
t.start()
print("main thread has been stopped !")複製代碼
time.sleep(1)
這一句讓線程阻塞的位置會影響線程的執行順序咱們再來看看加鎖的狀況:fetch
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
NUM = 0
def add():
global NUM
lock.acquire()
NUM += 1
name=t.getName()
lock.release()
time.sleep(1)
print('current thread is: ',name ,' current NUM is: ',NUM )
if __name__ == '__main__':
lock=threading.Lock()
for tmp in range(10):
t=threading.Thread(target=add)
t.start()
print("main thread has been stopped !")複製代碼
lock=threading.Lock()
實例化鎖對象lock.acquire()
從該句開始加鎖lock.release()
釋放鎖python中在threading模塊中定義了一下幾種鎖:ui
Semaphore 信號量鎖使用:this
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
def test():
semaphore.acquire()
print("current thread is: ", t.getName())
time.sleep(1)
semaphore.release()
if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5)
for tmp in range(20):
t = threading.Thread(target=test)
t.start()複製代碼
semaphore = threading.BoundedSemaphore(5)
得到信號量鎖對象semaphore.acquire()
加鎖semaphore.release()
釋放鎖event 事件機制鎖使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
def test():
print(t.getName())
event.wait()
if __name__ == '__main__':
event=threading.Event()
for tmp in range(10):
t=threading.Thread(target=test)
t.start()
print("zhe middle of main thread")
if input("input your flag: ")=='1':
event.set()
print("main thread has been stopped")複製代碼
event=threading.Event()
獲取事件鎖對象event.wait()
檢測標誌位flag, 爲true則放行該線程, 爲false則阻塞該線程event.set()
將標誌位flag設置爲trueevent.clear()
將標誌位flag設置爲falsecondition 條件鎖使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def condition():
inp = input("input your condition: ")
print(inp)
if inp == "yes":
return True
return False
def test():
cd.acquire()
# cd.wait(1)
cd.wait_for(condition)
# cd.notify(2)
print(t.getName())
cd.release()
if __name__ == '__main__':
cd = threading.Condition()
for tmp in range(10):
t = threading.Thread(target=test)
t.start()
t.join()
print("\nmain thread has been stopped")複製代碼
yes
則放行一個線程cd = threading.Condition()
獲取條件鎖對象cd.wait(1)
設置線程最多等待時間cd.wait_for(condition)
設置放行的條件, 該方法接受condition
函數的返回值在python的queue模塊中內置了一種特殊的數據結構, 即隊列. 這裏咱們能夠把隊列簡單的看做是規定順序執行的一組線程.
Queue 先進先出隊列的使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
q=queue.Queue(10)
for tmp in range(10):
q.put(tmp)
for tmp in range(10):
print(q.get(),q.qsize())複製代碼
q=queue.Queue(10)
生成隊列對象, 設置隊列最多存放的數據爲10個q.put(tmp)
往隊列中存入數據q.get()
獲取隊列數據q.qsize()
獲取當前隊列的大小利用Queue實現生產者消費者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time, threading, queue
def productor(i):
while True:
q.put(i)
time.sleep(1)
def consumer(i):
while True:
print("consumer-%s ate %s" % (i, q.get()))
if __name__ == '__main__':
q = queue.Queue(10)
for tmp in range(8):
t = threading.Thread(target=productor, args=(tmp,))
t.start()
for tmp in range(5):
t = threading.Thread(target=consumer, args=(tmp,))
t.start()
print("main has been stopped")複製代碼
不斷的建立和銷燬線程是很是消耗CPU的, 所以咱們會採起維護一個線程池來實現多線程. 可是python中並未提供線程池的模塊, 這裏就須要咱們本身來寫.
簡單版本的線程池實現:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading
class ThreadPool(object):
def __init__(self, max_num=5):
self.queue = queue.Queue(max_num)
for i in range(max_num):
self.queue.put(threading.Thread)
def get_thread(self):
return self.queue.get()
def add_thread(self):
self.queue.put(threading.Thread)
def test(pool, i):
tm = __import__("time")
tm.sleep(1)
print("current thread is: ", i)
pool.add_thread()
if __name__ == '__main__':
p = ThreadPool()
for tmp in range(20):
thread = p.get_thread()
t = thread(target=test, args=(p, tmp))
t.start()
print("main thread has been stopped")複製代碼
threading.Thread
類. 每當須要線程時候, 直接獲取該類並建立線程, 使用完畢則返回線程池中健壯版本的線程池:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading, contextlib
stopFlag = object()
class ThreadPool(object):
def __init__(self, max_num):
self.queue = queue.Queue()
self.max_num = max_num
self.terminal = False
self.queue_real_list_list = []
self.queue_free_list = []
def run(self, target, args, callback):
task_tuple = (target, args, callback)
self.queue.put(task_tuple)
if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
self.add_thread()
def add_thread(self):
t = threading.Thread(target=self.fetch)
t.start()
def fetch(self):
current_thread = threading.currentThread
self.queue_real_list_list.append(current_thread)
task_tuple = self.queue.get()
while task_tuple != stopFlag:
func, args, callback = task_tuple
result_status = True
try:
result = func(*args)
except Exception as e:
result_status = False
result = e
if callback is not None:
try:
callback(result_status, result)
except Exception as e:
pass
if not self.terminal:
# self.queue_free_list.append(current_thread)
# task_tuple = self.queue.get()
# self.queue_free_list.remove(current_thread)
with ThreadPool.queue_operate(self.queue_free_list,current_thread):
task_tuple = self.queue.get()
else:
task_tuple = stopFlag
else:
self.queue_real_list_list.remove(current_thread)
def close(self):
num = len(self.queue_real_list_list)
while num:
self.queue.put(stopFlag)
num -= 1
def terminate(self):
self.terminal = True
max_num = len(self.queue_real_list_list)
while max_num:
self.queue.put(stopFlag)
max_num -= 1
def terminate_clean_queue(self):
self.terminal = True
while self.queue_real_list_list:
self.queue.put(stopFlag)
self.queue.empty()
@staticmethod
@contextlib.contextmanager
def queue_operate(ls, ct):
ls.append(ct)
try:
yield
finally:
ls.remove(ct)
def callback_func(result_status, result):
print(result_status, result)
def test(i):
tm = __import__("time")
tm.sleep(1)
return "current thread is: {}".format(i)
if __name__ == '__main__':
pool = ThreadPool(5)
for tmp in range(20):
pool.run(target=test, args=(tmp,), callback=callback_func)
# pool.close()
pool.terminate()複製代碼
pool = ThreadPool(5)
生成線程池對象, 指定線程池最多線程數爲5
__init__(self, max_num)
被執行self.queue = queue.Queue()
任務隊列self.max_num = max_num
最多線程數self.terminal = False
是否當即終止標誌self.queue_real_list_list = []
當前已經建立的線程對象列表self.queue_free_list = []
空閒的線程對象列表pool.run(target=test, args=(tmp,), callback=callback_func)
運行線程池對象, target=test
線程運行的功能函數, args=(tmp,)
功能函數的參數, callback=callback_func
功能函數執行完畢以後調用的函數(即 回調函數)
task_tuple = (target, args, callback)
將線程要執行的功能函數和回調函數打包成任務元組self.queue.put(task_tuple)
將任務元組加入到隊列中if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
self.add_thread()複製代碼
判斷空閒列表是否爲空且真實的線程列表數目是否小於最大線程數目, 如果則執行add_thread()
函數添加線程add_thread(self)
添加並啓動線程, 並將線程要執行的功能交給fetch(self)
函數current_thread = threading.currentThread
獲取當前線程, self.queue_real_list_list.append(current_thread)
將當前線程加入到真實線程列表中task_tuple = self.queue.get()
從任務隊列中獲取任務元組while task_tuple != stopFlag
該循環語句內容表示任務元組對象不是stopFlag
結束標誌的時候執行其具體的功能和回調函數if not self.terminal
判斷是否當即終止當前線程(等待當前線程執行完任何當即結束)pool.close()
根據當前真實線程列表添加對應的stopFlag
終止符pool.terminate()
此爲不清空任務隊列的當即終止線程方法terminate_clean_queue(self)
清空任務隊列的當即終止線程方法在python中由multiprocess模塊提供的Process類來實現進程相關功能(process與Process是不一樣的)
Process的使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
def test(pro):
print("current process is: ",pro)
if __name__ == '__main__':
for tmp in range(10):
p = Process(target=test,args=(tmp,))
p.start()複製代碼
args=(tmp,)
這裏傳入的是元組, 不加逗號則表示整型數據p = Process(target=test,args=(tmp,))
建立進程對象普通的數據共享在進程中的實現:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
ls = []
def test(i):
ls.append(i)
print("current process is: ", i, " and list is: ", ls)
if __name__ == '__main__':
for tmp in range(10):
p = Process(target=test, args=(tmp,))
p.start()
p.join()
print("The final list is: ", ls)複製代碼
用Array共享數據
# -*- coding:utf-8 -*-
from multiprocessing import Process, Array
def test(i, ay):
ay[i] += 10
print('current process is: ', i)
for tmp in ay:
print(tmp)
if __name__ == '__main__':
ay = Array('i', [1, 2, 3, 4, 5, 6])
for tmp in range(5):
p = Process(target=test, args=(tmp, ay))
p.start()複製代碼
ay = Array('i', [1, 2, 3, 4, 5, 6])
建立整型的Array共享數據對象p = Process(target=test, args=(tmp, ay))
進程直接不能像線程之間共享數據, 故須要傳入ay
對象使用Manager共享數據:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Manager, Process
def test(i, dic):
dic[i] = i + 10
print('current process is: ', i)
for k, v in dic.items():
print(k, v)
if __name__ == '__main__':
mg = Manager()
dic = mg.dict()
for tmp in range(10):
p = Process(target=test, args=(tmp, dic))
p.start()
p.join()複製代碼
mg = Manager()
初始化Manager
對象dic = mg.dict()
生成共享字典數據類型p.join()
這裏須要保證每一個進程執行完畢以後才能進行接下來的操做, 不然會報錯使用queue共享數據:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,queues
import multiprocessing
def test(i,qu):
qu.put(i+10)
print("current process is: ",i," and zhe size of zhe queue is: ",qu.qsize())
if __name__ == '__main__':
qu=queues.Queue(10,ctx=multiprocessing)
for tmp in range(10):
p=Process(target=test,args=(tmp,qu))
p.start()複製代碼
在進程中共享數據也會出現髒數據的問題, 好比用multiprocessing
模塊中的queue
或者Queue
共享數據時候就會出現髒數據. 此時咱們每每須要設置進程鎖. 進程鎖的使用和線程鎖使用徹底相同(Rlock
, Lock
, Semaphore
, Event
, Condition
, 這些鎖均在multiprocess中
)
在實際開發中咱們並不會採起直接建立多進程來實現某些功能, 而是主動維護一個指定進程數的進程池來實現多進程. 由於不斷的建立進程和銷燬進程對CPU的開銷太大. python中內置了了進程池Pool
模塊
進程池Pool的使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time
def test(arg):
time.sleep(1)
return arg + 10
def call_end(arg):
print(arg)
if __name__ == '__main__':
p = Pool(5)
for tmp in range(10):
p.apply_async(func=test, args=(tmp,), callback=call_end)
p.close()
# p.terminate()
p.join()複製代碼
p.apply()
從進程池中取出一個進程執行其對應的功能p.apply_async(func=test, args=(tmp,), callback=call_end)
與p.apply()
做用相同, p.apply_async()
能夠調用回調函數. callback=call_end
代表call_end
是回調函數, 當test
執行完畢以後會將其返回值做爲參數傳遞給該回調函數p.close()
等到全部進程結束後關閉進程池p.join()
代表主進程必須等待全部子進程執行結束後方能結束(須要放在p.close()
或者p.terminate()
後面)協成是python中特有的一個概念, 它是人爲的利用單線程在操做某任務等待空閒的時間內, 經過yield
來保存當時狀態, 進而用該線程作其餘的操做. 由此實現的併發操做, 本質上跟IO多路複用相似.
基礎版本協成的使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import greenlet
def f1():
print('1111')
gr2.switch()
print('2222')
gr2.switch()
def f2():
print('3333')
gr1.switch()
print('4444')
if __name__ == '__main__':
gr1 = greenlet.greenlet(f1)
gr2 = greenlet.greenlet(f2)
gr1.switch()複製代碼
gr1 = greenlet.greenlet(f1)
建立f1
函數的協成對象gr1.switch()
由當前線程轉到到執行f1
函數封裝後的協成模塊使用:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import gevent
def f1():
print('this is f1 !!!')
gevent.sleep(0)
print('f1 after sleep')
def f2():
print("this is f2 !!!")
gevent.sleep(0)
print('f2 after sleep')
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2),
])複製代碼
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2),
])複製代碼
f1
和f2
執行完成再結束當前線程, 相似線程中的join()
方法gevent.sleep(0)
設置等待時間用協成訪問網頁簡單例子:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey
monkey.patch_all()
import gevent, requests
def fetch(url):
print('current url %s' % url)
rp = requests.get(url)
data = rp.text
print(url, len(data))
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(fetch, 'https://www.baidu.com'),
gevent.spawn(fetch, 'https://www.sogou.com/'),
gevent.spawn(fetch, 'http://www.jianshu.com'),
])複製代碼
print('current url %s' % url)
以後, 當前線程會處於等待請求狀態, 此時該線程會發送第二個url, 依次類推. 直到最後請求數據獲取後, 才返回到第一次執行的函數中執行後續操做