Python之線程與進程

1.程序python

程序指的是指令的集合;程序不能單獨的運行,必須將程序裝載在內存中,系統給它分配資源才能夠運行。git

程序是進程動態運行的靜態描述文本程序員

2.進程github

進程指的是程序在數據集中一次動態運行的過程;算法

優勢:同時利用多個cpu,可以同時進行多個操做
缺點:耗費資源(從新開闢內存空間)安全

3.線程多線程

線程進程的最小執行單位,真正在CPU運行的是線程併發

優勢:共享內存,IO操做的時候,創造併發操做
缺點:搶佔資源app

4.進程與線程的關係異步

一個線程只能在一個進程裏面,一個進程能夠包含多個線程; 進程是資源管理單位(容器)  線程是最小執行單位

5.並行與併發

並行:指的是同時處理多個任務(多個線程被不一樣的CPU執行)

併發:指的是交替處理多個任務(多個線程被一個CPU執行)

6.同步與異步

同步:請求數據的時候,若是被請求方沒有返回數據,那麼程序就會一直等下去,不會執行下面的操做,直到接收到返回的數據。(單線程)

異步:不會等下去,而是直接執行下面的操做,若是有數據返回,那麼系統會通知進程去處理(多線程)

 

這些知識概念的性的知識,如今開始瞭解下線程和進程吧!

線程:threading模塊

import time
import threading

def f1():
    print(11)

def f2(a1,a2):
    time.sleep(10)
    print(a1,a2)
    f1()

t=threading.Thread(target=f2,args=(22,33))
t.start()

t=threading.Thread(target=f2,args=(44,55))
t.start()

t=threading.Thread(target=f2,args=(66,77))
t.start()

上面代碼建立了3個前臺線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。

更多方法:

    • start            線程準備就緒,等待CPU調度
    • setName      爲線程設置名稱
    • getName      獲取線程名稱
    • setDaemon   設置爲後臺線程或前臺線程(默認爲False)
                         若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止(設置爲True)
                          若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
    • join              逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
    • run              線程被cpu調度後自動執行線程對象的run方法
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定義每一個線程要運行的函數
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
自定義線程類

 

線程鎖(Lock、RLock)

因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖 - 同一時刻容許一個線程執行操做。

import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'
未使用鎖
import threading
import time
   
gl_num = 0
   
lock = threading.RLock()
   
def Func():
    lock.acquire()
    global gl_num
    gl_num +=1
    time.sleep(1)
    print gl_num
    lock.release()
       
for i in range(10):
    t = threading.Thread(target=Func)
    t.start()

 

信號量(Semaphore)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading,time
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" %n)
    semaphore.release()
 
if __name__ == '__main__':
 
    num= 0
    semaphore  = threading.BoundedSemaphore(5) #最多容許5個線程同時運行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

 

事件(event)

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將「Flag」設置爲False
  • set:將「Flag」設置爲True
import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()

 

 
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

Timer

定時器,指定n秒後執行某操做

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

 進程

from multiprocessing import Process
import threading
import time
  
def foo(i):
    print 'say hi',i
  
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

注意:因爲進程之間的數據須要各自持有一份,因此建立進程須要的很是大的開銷。

 

進程數據共享

進程各自持有一份數據,默認沒法共享數據

#!/usr/bin/env python
#coding:utf-8
 
from multiprocessing import Process
from multiprocessing import Manager
 
import time
 
li = []
 
def foo(i):
    li.append(i)
    print 'say hi',li
  
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()
     
print 'ending',li
進程間默認沒法數據共享
#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])
 
def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print i,'----->',item
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
 
#方法二:manage.dict()共享數據
from multiprocessing import Process,Manager
 
manage = Manager()
dic = manage.dict()
 
def Foo(i):
    dic[i] = 100+i
    print dic.values()
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()
複製代碼

    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double

複製代碼
類型對應表
from multiprocessing import Process, Queue

def f(i,q):
    print(i,q.get())

if __name__ == '__main__':
    q = Queue()

    q.put("h1")
    q.put("h2")
    q.put("h3")

    for i in range(10):
        p = Process(target=f, args=(i,q,))
        p.start()
View Code

當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值。

from multiprocessing import Process, Array, RLock

def Foo(lock,temp,i):
    """
    將第0個數加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print i,'----->',item
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()
進程鎖實例

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • apply
  • apply_async
from  multiprocessing import Process,Pool
import time
  
def Foo(i):
    time.sleep(2)
    return i+100
  
def Bar(arg):
    print arg
  
pool = Pool(5)
#print pool.apply(Foo,(1,))
#print pool.apply_async(func =Foo, args=(1,)).get()
  
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)
  
print 'end'
pool.close()
pool.join()#進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
View Code

 

協程

線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。

協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;

greenlet

from greenlet import greenlet
 
 
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
 
 
def test2():
    print 56
    gr1.switch()
    print 78
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

 

gevent

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

遇到IO操做自動切換:

from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
    print('GET: %s' % url)
    resp = urllib2.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

 

queue模塊

Queue 就是隊列,它是線程安全的

 隊列的特性:先進先出

import queue

q = queue.Queue(maxsize=0)  #構造一個先進先出的隊列,maxsize指定隊列長度,爲0時,表示隊列長度無限。

q.join()  #等到隊列爲None的時候,再執行別的操做
q.qsize()  #返回隊列的大小(不可靠)
q.empty()  #當隊列爲空的時候,返回True 不然返回False(不可靠)
q.full()  #當隊列滿的時候,返回True,不然返回False(不可靠)


q.put(item, block=True, timeout=None)  #將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置
                                       #爲Flase時爲非阻塞,此時若是隊列已經滿,會引起queue.Full異常。能夠選參數timeout,表示會阻塞的時間,
                                       #若是隊列沒法給出放入item的位置,則引起queue.Full異常。
q.get(block=True, timeout=None)  #等  移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,
                                   #若此時隊列爲空,則引起 queue.Empty異常。可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。
q.put_nowait(item)  #等效put(item,block=False)
q.get_nowait()  #不等   等效於 get(item,block=False)

 

生產者,消費者

生產者將數據依次存入隊列,消費者依次從隊列中取出數據。

實例

import queue
import threading

message = queue.Queue(10)

def producer(i):
    while True:
        message.put(i)

def consumer(i):
    while True:
        msg = message.get()
        print(msg)

for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
相關文章
相關標籤/搜索