day9-進程、線程

  1. 線程
  2. 進程
  3. 協程
  4. IO多路複用

1. 線程

線程是最小的工做單位,一個應用程序至少有一個進程,一個進程至少有一個線程。
應用場景:
IO密集型:使用線程
計算密集型:使用進程
GIL: 全局解釋器鎖,保證同一進程中只有一個線程同時被調度。python

線程的基本使用:git

def task(arg):
    time.sleep(arg)
    print(arg)

for i in range(5):
    t = threading.Thread(target=task,args=[i,])
    # t.setDaemon(True) # 主線程終止,不等待子線程
    # t.setDaemon(False)
    t.start()
    # t.join()  # 一直等
    # t.join(1) # 等待最大時間

鎖:
1.只有一我的使用鎖:github

import threading
import time
lock = threading.Lock()
v =10
def task(arg):
    time.sleep(1)
    lock.acquire()
    global v
    v -= 1
    print(v)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

2.多人使用鎖:併發

lock = threading.BoundedSemaphore(3)
v =10
def task(arg):

    lock.acquire()
    time.sleep(1)
    global v
    v -= 1
    print(v)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

3.事件鎖(全部人解脫鎖的限制)app

lock = threading.Event()

def task(arg):

    time.sleep(1)
    lock.wait()
    print(arg)

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

while True:
    value = input(">>>")
    if value == '1':
        lock.set()

4.自定義解鎖個數(爲所欲爲鎖)socket

lock = threading.Condition()

def task(arg):
    time.sleep(1)
    lock.acquire()
    lock.wait()
    print("線程:%s" % arg)
    lock.release()

for i in range(10):
    t = threading.Thread(target=task,args=[i,])
    t.start()

while True:
    value = input(">>")
    lock.acquire()
    lock.notify(int(value))
    lock.release()

線程池:
模式一: 直接處理ui

def task(url):
    """
    任務執行兩個操做:下載;保存本地
    """
    # response中封裝了Http請求響應的全部數據
    # - response.url            請求的URL
    # - response.status_code    響應狀態碼
    # - response.text           響應內容(字符串格式)
    # - response.content        響應內容(字節格式)
    # 下載
    response = requests.get(url)

    # 下載內容保存至本地
    f = open('a.log','wb')
    f.write(response.content)
    f.close()

pool = ThreadPoolExecutor(2)
url_list = [
    'http://www.oldboyedu.com',
    'http://www.autohome.com.cn',
    'http://www.baidu.com',
]
for url in url_list:
    print('開始請求',url)
    # 去鏈接池中獲取連接
    pool.submit(task,url)

模式二:分步處理url

def save(future):
    """
    只作保存  # future中包含response
    """
    response = future.result()

    # 下載內容保存至本地
    f = open('a.log','wb')
    f.write(response.content)
    f.close()

def task(url):
    """
    只作下載 requests
    """
    # response中封裝了Http請求響應的全部數據
    # - response.url            請求的URL
    # - response.status_code    響應狀態碼
    # - response.text           響應內容(字符串格式)
    # - response.content        響應內容(字節格式)
    # 下載
    response = requests.get(url)
    return response

pool = ThreadPoolExecutor(2)
url_list = [
    'http://www.oldboyedu.com',
    'http://www.autohome.com.cn',
    'http://www.baidu.com',
]
for url in url_list:
    print('開始請求',url)
    # 去鏈接池中獲取連接
    # future中包含response
    future = pool.submit(task,url)
    # 下載成功後,自動調用save方法
    future.add_done_callback(save)

2. 進程

進程的基本使用:spa

from multiprocessing import Process
import time
def task(arg):
    time.sleep(arg)
    print(arg)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task,args=(i,))
        p.daemon = True
        # p.daemon = False
        p.start()
        p.join(1)

    print('主進程最後...')

進程之間的數據共享:經過Array(‘類型’,長度) 或者Manager().list() / Manager().dict()線程

from multiprocessing import Process,Array,Manager
from threading import Thread
'''
# 驗證進程之間數據不共享
def task(num,li):
    li.append(num)
    print(li)

if __name__ == "__main__":
    v = []
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()

'''

'''
# 進程間數據共享方式一:
def task(num,li):
    li[num] = 1
    print(list(li))

if __name__ == "__main__":
    v = Array('i',10)
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()
'''

# 進程間數據共享方式二:經過socket
def task(num,li):
    li.append(num)
    print(li)

if __name__ == '__main__':
    v = Manager().list()
    # v = Manager().dict()
    for i in range(10):
        p = Process(target=task,args=(i,v,))
        p.start()
        p.join()

進程池的使用:

from concurrent.futures import ProcessPoolExecutor

def call(arg):
    data = arg.result()
    print(data)

def task(arg):
    return arg + 100

if __name__ == "__main__":
    pool = ProcessPoolExecutor(10)
    for i in range(100):
        obj = pool.submit(task,i)
        obj.add_done_callback(call)

3.協程

協程永遠是一個進程在執行,是對線程的分片處理。
greenlet: python自帶的協程模塊

# 協程
from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

# 根據協程二次開發:協程+IO
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    response = requests.get(url)
    print(response.url,response.status_code)

gevent.joinall([
        gevent.spawn(f, 'http://www.oldboyedu.com/'),
        gevent.spawn(f, 'http://www.baidu.com/'),
        gevent.spawn(f, 'http://github.com/'),
])

4. IO多路複用

IO多路複用,用於監聽多個socket對象是否變化(可讀,可寫,發送錯誤)

import socket
import select

# IO多路複用:8002,8001
#
############### 基於select實現服務端的「僞」併發 ###############
sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)

sk2 = socket.socket()
sk2.bind(('127.0.0.1',8002,))
sk2.listen(5)
inputs = [sk1,sk2,]
w_inputs = []
while True:
    # IO多路複用,同時監聽多個socket對象
    #    - select,內部進行循環操做(1024)  主動查看
    #    - poll, 內部進行循環操做         主動查看
    #    - epoll,                        被動告知
    r,w,e = select.select(inputs,w_inputs,inputs,0.05)
    # r = [sk2,]
    # r = [sk1,]
    # r = [sk1,sk2]
    # r = []
    # r = [conn,]
    # r = [sk1,Wconn]
    #######?
    for obj in r:
        if obj in [sk1,sk2]:
            # 新鏈接撿來了...
            print('新鏈接來了:',obj)
            conn,addr = obj.accept()
            inputs.append(conn)
        else:
            # 有鏈接用戶發送消息來了..
            print('有用戶發送數據了:',obj)
            try:
                data = obj.recv(1024)
            except Exception as ex:
                data = ""
            if data:
                w_inputs.append(obj)
                # obj.sendall(data)
            else:
                obj.close()
                inputs.remove(obj)
                w_inputs.remove(obj)

    for obj in w:
        obj.sendall(b'ok')
        w_inputs.remove(obj)

模擬socketserver:

import socket
import select
import threading

def process_request(conn):
    while True:
        v = conn.recv(1024)
        conn.sendall(b'1111')

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)
inputs=[sk1,]
while True:
    # IO多路複用,同時監聽多個socket對象
    #    - select,內部進行循環操做(1024)  主動查看
    #    - poll, 內部進行循環操做         主動查看
    #    - epoll,                        被動告知
    r,w,e = select.select(inputs,[],inputs,0.05)

    for obj in r:
        if obj in sk1:
            # conn客戶端的socket
            conn,addr = obj.accept()
            t = threading.Thread(target=process_request,args=(conn,))
            t.start()
相關文章
相關標籤/搜索