python 併發編程

             線程                   

1. 應用程序/進程/線程的關係

     爲何要建立線程? 

    因爲線程是cpu工做的最小單元,建立線程能夠利用多核優點實現並行操做(Java/C#)。python

    注意:線程是爲了工做。react

    爲何要建立進程  

    進程和進程之間作數據隔離(Java/C#)。注意:進程是爲了提供環境讓線程工做。git

2.併發和並行 

  併發,僞,因爲執行速度特別塊,人感受不到停頓。程序員

  並行,真,建立10我的同時操做github

3.線程、進程  

  a.單進程、單線程的應用程序

    print("666")web

  b. 到底什麼是線程?什麼是進程?

    Python本身沒有這玩意,Python中調用的操做系統的線程和進程。安全

  c. 單進程、多線程的應用程序 

    一個應用程序(軟件),能夠有多個進程(默認只有一個),一個進程中能夠建立多個線程(默認一個)。多線程

import threading


def func(s,arg):
    print(arg)


t1 = threading.Thread(target=func,args=(5,))
t1.start()

t2 = threading.Thread(target=func,args=(88,))
t2.start()

print("666")

4. Python中線程和進程(GIL鎖)

  GIL鎖,全局解釋器鎖。用於限制一個進程中同一時刻只有一個線程被cpu調度。併發

  擴展:默認GIL鎖在執行100個cpu指令(過時時間)。app

  aPython中存在一個GIL鎖

     形成:多線程沒法利用多核優點

     解決:開多進程處理(浪費資源)

     總結:

      IO密集型:多線程

      計算密集型:多進程

5. 線程的建立 

  - Thread

# 多線程方式1(常見)
def func(arg):
    print(arg)


t1 = threading.Thread(target=func,args=(888,))
t1.start()

t2 = threading.Thread(target=func,args=(777,))
t2.start()
print("666")

 

  - MyThread 

# 多線程方式2 (面向對象)
class MyThread(threading.Thread):
    def run(self):
        print(self._args,self._kwargs)

t1 = MyThread(args=(666,555,444))
t1.start()
t2 = MyThread(args=(111,222,333))
t2.start()

print("hahaha")

  - join   

# 開發者能夠控制主線程等待子線程(最多等待時間)
def func(arg):
    time.sleep(10)
    print(arg)
t1 = threading.Thread(target=func,args=(5,))
t1.start()
# 無參數,讓主線程在這裏等着,等到子線程t1執行完畢,才能夠繼續往下走。
# 有參數,讓主線程在這裏最多等待n秒,不管是否執行完畢,會繼續往下走。
t1.join(1)
t2 = threading.Thread(target=func,args=(88,))
t2.start()
t1.join(1)
print("666")

  - setDeanon 

# 主線程再也不等,主線程終止則全部子線程終止
def func(arg):
    time.sleep(2)
    print(arg)


t1 = threading.Thread(target=func,args=(5,))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=func,args=(88,))
t2.setDaemon(True)
t2.start()

print("666")

  - setName 

  - threading.current_thread()

# 線程名稱
def func(arg):
    # 獲取當前執行該函數的線程的對象
    t = threading.current_thread()
    # 根據當前線程對象獲取當前線程名稱
    name = t.getName()
    print(t,name,arg)
    
    
t1 = threading.Thread(target=func,args=(5,))
t1.setName("qwer")        # 線程命名
t1.start()
t2
= threading.Thread(target=func,args=(88,)) t2.setName("asdf") # 線程命名 t2.start()
print("666")

6.鎖  

  -得到

    lock.acquire() # 加鎖,此區域的代碼同一時刻只能有一個線程執行

  -釋放

    lock.release() # 釋放鎖

 

  1. 鎖:Lock (1次放1個)

    線程安全,多線程操做時,內部會讓全部線程排隊處理。如:list/dict/Queue
    線程不安全 + 人 => 排隊處理。
    鎖一個代碼塊:

#Lock鎖     1次放1個
v = []
lock = threading.Lock()
def func(arg):
    lock.acquire()  # 加鎖
    v.append(arg)
    time.sleep(0.01)
    m= v[-1]
    print(arg,m)
    lock.release()   # 解鎖
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

  2. 鎖:RLock (1次放1個)

#RLock鎖    1次放1個
v = []
lock = threading.RLock()
def func(arg):
    lock.acquire()
    lock.acquire()

    v.append(arg)
    time.sleep(0.01)
    m = v[-1]
    print(arg,m)

    lock.release()
    lock.release()
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

  3. 鎖:BoundedSemaphore(1次放N個)信號量

#semaphore鎖    1次放n個
lock = threading.BoundedSemaphore(3)
def func(arg):
    lock.acquire()
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(20):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

  4. 鎖:Condition(1次方法x個)

#Condition鎖 (1次放x個)
lock = threading.Condition()
def func(arg):
    print("線程進來了")
    lock.acquire()
    lock.wait()  # 加鎖
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()
while True:
    inp = int(input(">>>"))
    lock.acquire()
    lock.notify(inp)
    lock.release()

 

  5. 鎖:Event(1次放全部)

#Event鎖  1次放全部
lock = threading.Event()
def func(arg):
    print("線程進來了")
    lock.wait()     # 加鎖,紅燈

    print(arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

input("1>>>")
lock.set()          # 綠燈

lock.clear()        # 再次變紅燈
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

input("2>>>")
lock.set()

 


  總結:
    線程安全,列表和字典線程安全;
  爲何要加鎖?
    非線程安全
    控制一段代碼

  6. threading.local
    做用:
      內部自動爲每一個線程維護一個空間(字典),用於當前存取屬於本身的值。保證線程之間的數據隔離。

 

import time
import threading
DATA_DICT = {}
# # threading.local 內部自動爲每一個線程維護一個空間(字典),用於當前存取屬於本身的值。保證線程之間的數據隔離。
def func(arg):
    ident = threading.get_ident()
    DATA_DICT[ident] = arg
    time.sleep(1)
    print(DATA_DICT[ident],arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

  內部原理

INFO = {}
class Local(object):
    def __getattr__(self, item):
        ident = threading.get_ident()
        return INFO[ident][item]

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in INFO:
            INFO[ident][key] = value
        else:
            INFO[ident] = {key:value}

obj = Local()

def func(arg):
    obj.phone = arg   # 調用__setattr__方法
    time.sleep(2)
    print(obj.phone,arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

 

            進程                   

  1. 進程    

    - 進程間數據不共享

data_list = []
def task(arg):
    data_list.append(arg)
    print(data_list)


def run():
    for i in range(1,10):

        p = multiprocessing.Process(target=task,args=(i,5))
        # p = threading.Thread(target=task,args=(i,4))
        p.start()
        p.join()

if __name__ == '__main__':
    run()

 

   2.經常使用功能:                           

       - join- deamon

      - name

      - multiprocessing.current_process()

      - multiprocessing.current_process().ident/pid

 

   3. 類繼承方式建立進程    

      

class MyProcess(multiprocessing.Process):

    def run(self):
        print('當前進程',multiprocessing.current_process())


def run():
    p1 = MyProcess()
    p1.start()

    p2 = MyProcess()
    p2.start()

if __name__ == '__main__':
    run()

  

  4.進程鎖                                            

  

import time
import threading
import multiprocessing


lock = multiprocessing.RLock()

def task(arg):
    print('鬼子來了')
    lock.acquire()
    time.sleep(2)
    print(arg)
    lock.release()

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(2,))
    p2.start()

 

  5.進程池                          

 

  

import time
from concurrent.futures import ProcessPoolExecutor
def task(arg):
    time.sleep(2)
    print(arg)


if __name__ == '__main__':

    pool = ProcessPoolExecutor(5)
    for i in range(10):
        pool.submit(task,i)

 

進程和線程的區別?
  第一:
    進程是cpu資源分配的最小單元。
    線程是cpu計算的最小單元。
  第二:
    一個進程中能夠有多個線程。
  第三:
    對於Python來講他的進程和線程和其餘語言有差別,是有GIL鎖。
    GIL鎖保證一個進程中同一時刻只有一個線程被cpu調度。

  注意:IO密集型操做可使用多線程;計算密集型可使用多進程

         協程                  

  概念:

    進程,操做系統中存在

    線程,操做系統中存在

    協程,是由程序員創造出來的一個不是真實存在的東西

  協程:

    是微線程,對一個線程進程分片,使得線程在代碼塊之間進行來回切換執行,而不是在原來逐行執行。

  協程 + 遇到 IO 就切換 >>>gevent

from gevent import monkey
monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 協程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 協程3
])

  >>>twisted

 

from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    reactor.stop()

def callback(contents):
    print(contents)

deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()

 

         IO多路複用             

  IO多路複用做用:

    檢測多個socket是否已經發生變化(是否已經鏈接成功/是否已經獲取數據)(可讀/可寫)

    操做系統檢測socket是否發生變化,有三種模式:

      select:最多1024個socket;循環去檢測。

      poll:不限制監聽socket個數;循環去檢測(水平觸發)。

      epoll:不限制監聽socket個數;回調方式(邊緣觸發)。

  基於IO多路複用+socket實現併發請求(一個線程100個請求)

    IO多路複用

      socket非阻塞

# by luffycity.com
import socket
import select



client1 = socket.socket()
client1.setblocking(False) # 百度建立鏈接: 非阻塞

try:
    client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
    pass


client2 = socket.socket()
client2.setblocking(False) # 百度建立鏈接: 非阻塞
try:
    client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
    pass


client3 = socket.socket()
client3.setblocking(False) # 百度建立鏈接: 非阻塞
try:
    client3.connect(('www.oldboyedu.com',80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3]

while True:
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已經鏈接成功的socket對象
    for sk in wlist:
        if sk == client1:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
        elif sk==client2:
            sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
        else:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n')
        conn_list.remove(sk)
    for sk in rlist:
        chunk_list = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b''.join(chunk_list)
        # print(body.decode('utf-8'))
        print('------------>',body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break

 

  基於事件循環實現的異步非阻塞框架:

    非阻塞:不等待

    異步:執行完某我的物後自動調用我給他的函數。

    1.什麼是異步非阻塞?

      - 非阻塞,不等待。好比建立socket對某個地址進行connect、獲取接收數據recv時默認都會等待(鏈接成功或接收到數據),才執行後續操做。

        若是設置setblocking(False),以上兩個過程就再也不等待,可是會報BlockingIOError的錯誤,只要捕獲便可。

      - 異步,通知,執行完成以後自動執行回調函數或自動執行某些操做(通知)。好比作爬蟲中向某個地址baidu.com發送請求,當請求執行完成以後自執行回調函數。

    2.什麼是同步阻塞?

      - 阻塞:等

       - 同步:按照順序逐步執行

相關文章
相關標籤/搜索