python之線程、進程

線程語法

class Thread(_Verbose):
    """A class that represents a thread of control.

    This class can be safely subclassed in a limited fashion.

    """
    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info
    # Keep sys.exc_clear too to clear the exception just before
    # allowing .join() to return.
    __exc_clear = _sys.exc_clear

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """This constructor should always be called with keyword arguments. Arguments are:

        *group* should be None; reserved for future extension when a ThreadGroup
        class is implemented.

        *target* is the callable object to be invoked by the run()
        method. Defaults to None, meaning nothing is called.

        *name* is the thread name. By default, a unique name is constructed of
        the form "Thread-N" where N is a small decimal number.

        *args* is the argument tuple for the target invocation. Defaults to ().

        *kwargs* is a dictionary of keyword arguments for the target
        invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke
        the base class constructor (Thread.__init__()) before doing anything
        else to the thread.

"""
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__ident = None
        self.__started = Event()
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr
……

 

線程

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務python

一、直接調用

 

二、繼承調用:先繼承原類,再重寫其中的run方法

 

 

三、線程並行執行:join阻塞,直到此進程執行完畢再往下繼續執行

 

四、守護線程

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__Author__ = 'KongZhaGen'
import threading
import time

# 定義用於線程執行的函數
def sayhi(num):
    print "runing on number :%s"%num
    time.sleep(3)
    print 'done',num

# main函數同時執行5個線程
def main():
    for i in range(5):
        t = threading.Thread(target=sayhi,args=(i,))
        t.start()

# 定義一個線程m,執行main函數
m = threading.Thread(target=main,args=())
# m線程定義爲守護線程
m.setDaemon(True)
m.start()
# 守護線程執行結束,其中的全部子線程所有被迫結束運行,接着繼續執行其後的代碼
m.join(timeout=2)
print "---------main thread done-----"

 

五、線程鎖

  無線程鎖的狀況

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?編程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

import time
import threading

def addNum():
    global num
    print '---getNum--',num
    time.sleep(1)
    num -= 1

num = 100
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)  # 同時開啓100個線程
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()

print "final num",num

  其結果是:一會爲0,一會爲其它值多線程

  加線程鎖後...

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

import time
import threading

def addNum():
    global num
    print '---getNum--',num
    time.sleep(1)
    lock.acquire()  # 數據修改前加鎖
    num -= 1
    lock.release()  # 數據修改後釋放鎖

num = 100
thread_list = []
lock = threading.Lock()  # 生成全局鎖

for i in range(100):
    t = threading.Thread(target=addNum)  # 同時開啓100個線程
    t.start()
    thread_list.append(t)

for t in thread_list:
    t.join()

print "final num",num

  其結果一直爲0,說明鎖起了做用。併發

 

Semaphore(信號量)app

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 dom

 

 

定時器async

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import threading

def hello():
    print "Hello Word.."

t = threading.Timer(20,hello)  # 20秒鐘以後執行hello函數
t.start()

  

 

隊列Queue

Queue.qsize() 返回隊列的大小 
Queue.empty() 若是隊列爲空,返回True,反之False 
Queue.full() 若是隊列滿了,返回True,反之False
Queue.full 與 maxsize 大小對應 
Queue.get([block[, timeout]])獲取隊列,timeout等待時間 
Queue.get_nowait() 至關Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間 
Queue.put_nowait(item) 至關Queue.put(item, False)
Queue.task_done() 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() block直到queue被消費完畢ide

先進先出 :Queue.Queue 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import Queue

q = Queue.Queue(maxsize=3)
q.put(1)
q.put(3)
q.put(4)
# q.put(7)  # 超出三個會阻塞

print q.get()  # 1
print q.get()  # 3
print q.get()  # 4

# 先進先出
q.put(7)
print q.get()

  

後進先出:Queue.LifoQueue

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import Queue

q = Queue.LifoQueue(maxsize=3)
q.put(1)
q.put(7)
q.put(4)

print q.get()  # 4
print q.get()  # 7
print q.get()  # 1
# 結果:後進先出

  

設置隊列優先級:Queue.PriorityQueue

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import Queue

q = Queue.PriorityQueue(maxsize=4)
print q.empty()  # 空隊列
print q.qsize()  # 隊列個數0
q.put((1,5))
q.put((5,1))
print q.qsize()  # 隊列個數2
print q.full()  # 隊列未滿
q.put((3,6))
print q.full()  # 隊列未滿
q.put((2,6))
print q.full()  # 隊列已滿
# q.put_nowait(10)  # 隊列滿後再填入不阻塞,直接報錯

print q.get()  # 1
print q.get()  # 2
print q.get()  # 3
print q.get()  # 5
# 結果:優先小的先出
print q.get_nowait()   # 隊列爲空後再取出不阻塞,直接報錯

  

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。函數

爲何要使用生產者和消費者模式ui

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

  最基本的例子(一個生產者,一個消費者)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import Queue
import threading
import time

# 生產者
def producer():
    for i in range(5):
        q.put(i)  # 往隊列放15個麪包
    print "等待取走麪包....\n"
    q.join()  # 阻塞直到隊列消費完畢
    print "麪包被所有取走...\n"

# 消費者
def consumer(n):
    # 隊列不爲空則一直消費
    while q.qsize() > 0:
        print "%s 取走麪包 %s"%(n, q.get())
        q.task_done()  # 告訴queue完成了一次消費,完成消費次數等於qsize,隊列消費完畢,join取消阻塞
        time.sleep(1)

q = Queue.Queue()

p = threading.Thread(target=producer,)
p.start()

c = threading.Thread(target=consumer, args=(['小鬼']))
c.start()

  結果:

等待取走麪包....

小鬼 取走麪包 0
小鬼 取走麪包 1
小鬼 取走麪包 2
小鬼 取走麪包 3
小鬼 取走麪包 4
麪包被所有取走...

  多個生產者與消費者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
import Queue
import threading
import time,random

q = Queue.Queue()
# 生產者
def producer(n):
    count = 0
    # 少於5個麪包就生產
    while count < 5:
        time.sleep(random.randrange(3))
        q.put(count)
        print "%s product 麪包"%n
        count += 1
    print "%s 已生產麪包%s" %(n, q.qsize())

def consumer(n):
    while True:
        time.sleep(random.randrange(4))
        # 有面包就消費
        if not q.empty():
            print "\033[31;1m%s 吃了 麪包 %s\033[0m"%(n, q.get())
        else:
            # 沒麪包就退出
            print "沒有面包了..."
            break

p1 = threading.Thread(target=producer, args=(['廚師1']))
p2 = threading.Thread(target=producer, args=(['廚師2']))
p1.start()
p2.start()

c1 = threading.Thread(target=consumer, args=(['客戶1']))
c2 = threading.Thread(target=consumer, args=(['客戶2']))
c1.start()
c2.start()

  

 進程間通迅

queue

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

from multiprocessing import Process,Queue
import time
q = Queue()
# 往隊列傳入三個值
def pro_put(q):
    q.put(1)
    q.put(2)
    q.put(3)

if __name__ == '__main__':
    # 開始一個新進程
    p = Process(target=pro_put, args=(q,))
    p.start()
    # 從隊列獲取原進程的數據
    print q.get()
    print q.get()
    print q.get()

  

pipe

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

from multiprocessing import Process,Pipe

def f(conn):
    # 往管道發送
    conn.send(1)
    conn.send(2)
    conn.send(3)
    conn.close()

if __name__ == '__main__':
    # Pipe 分兩端,任何一端send, 任何一端recv
    con_left, con_right = Pipe()
    p = Process(target=f, args=(con_right,))
    p.start()
    # 從管道接收原進程生成的數據
    print con_left.recv()
    print con_left.recv()
    print con_left.recv()
    p.join()

  

進程同步

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'

from multiprocessing import Process, Lock
import time

def f(l, i):
    # 當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突
    l.acquire() # Lock實現進程同步進行
    try:
        print "hello, ",i
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

  

進程池

非阻塞進程池

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
from multiprocessing import Pool
import time,os
print 'parent:',os.getpid()

def f(msg):
    print 'child:',os.getpid()
    print "start:",msg
    time.sleep(2)
    print "end  :",msg
    print "============"

if __name__ == '__main__':
    # 進程池最大數量2
    pool = Pool(2)
    for i in range(5):
        msg = "hello %s" % i
        # 同時啓動5個進程
        pool.apply_async(func=f, args=(msg,))

    print "---------------------"
    pool.close()
    pool.join()

  

  結果:

 

 

阻塞進程池

#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = 'kongZhaGen'
from multiprocessing import Pool
import time

def f(msg):
    print "start:",msg
    time.sleep(2)
    print "end  :",msg
    print "============"

if __name__ == '__main__':
    # 進程池最大數量2
    pool = Pool(2)
    for i in range(5):
        msg = "hello %s" % i
        # 同時啓動5個進程
        pool.apply(func=f, args=(msg,))

    print "---------------------"
    pool.close()
    pool.join()

  結果:

 

相關文章
相關標籤/搜索