[Python] 10 - Concurrent: asyncio

Ref: HOWTO Fetch Internet Resources Using The urllib Packagehtml

Ref: Python High Performance - Second Edition【基於python3】html5

Ref: http://online.fliphtml5.com/odjuw/kcqs/#p=8【在線電子書】python

Ref: 廖雪峯的異步IO【仍是這個比較好一點】git

Ref: Efficient web-scraping with Python’s asynchronous programming【參考】github

Ref: A Web Crawler With asyncio Coroutines【參考】web

 

 

一些概念

並行:parallel 編程

併發:concurrentapi

協程:Coroutines服務器

一種比線程更加輕量級的存在。正如一個進程能夠擁有多個線程同樣,一個線程也能夠擁有多個協程。併發

協程不是被操做系統內核所管理,而徹底是由程序所控制(也就是在用戶態執行)。

這樣帶來的好處就是性能獲得了很大的提高,不會像線程切換那樣消耗資源。

 

Linux異步原理

參考一:boost coroutine with multi core

參考二:poll 和 select

poll 和 select 的實現基本上是一致的,只是傳遞參數有所不一樣,他們的基本流程以下:

1. 複製用戶數據到內核空間

2. 估計超時時間

3. 遍歷每一個文件並調用f_op->poll 取得文件當前就緒狀態, 若是前面遍歷的文件都沒有就緒,向文件插入wait_queue節點

4. 遍歷完成後檢查狀態:

        a). 若是已經有就緒的文件轉到5;

        b). 若是有信號產生,重啓poll或select(轉到 1或3);

        c). 不然掛起進程等待超時或喚醒,超時或被喚醒後再次遍歷全部文件取得每一個文件的就緒狀態

5. 將全部文件的就緒狀態複製到用戶空間

6. 清理申請的資源

 

 

 

 

寫在開始


requests.get 串行策略

import requests
import string
import random

# 生成url def generate_urls(base_url, num_urls):
    """
    We add random characters to the end of the URL to break any caching
    mechanisms in the requests library or the server
    """
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))

# 執行url def run_experiment(base_url, num_iter=500):
    response_size = 0
    for url in generate_urls(base_url, num_iter):
        print(url)
        response = requests.get(url)
        response_size += len(response.text)
    return response_size

if __name__ == "__main__": import time delay = 100 num_iter = 50 base_url = "http://www.baidu.com/add?name=serial&delay={}&".format(delay) start = time.time() result = run_experiment(base_url, num_iter) end = time.time() print("Result: {}, Time: {}".format(result, end - start))

 

 

Gevent 方案

【暫時放棄該方案,太複雜且代碼不可用】

如下是有變化部分的代碼:

from gevent import monkey
monkey.patch_socket()
#---------------------------------- import gevent
from gevent.coros import Semaphore
import urllib2
from contextlib import closing
import string import random
def download(url, semaphore): with semaphore, closing(urllib2.urlopen(url)) as data: return data.read() def chunked_requests(urls, chunk_size=100): semaphore = Semaphore(chunk_size)
requests
= [gevent.spawn(download, u, semaphore) for u in urls]
for response in gevent.iwait(requests): yield response def run_experiment(base_url, num_iter=500): urls = generate_urls(base_url, num_iter)
response_futures
= chunked_requests(urls, 100) response_size = sum(len(r.value) for r in response_futures)
return response_size

 

gevent.spawn()

Create a new Greenlet object and schedule it to run function(*args, **kwargs)

greenlet的源代碼,代碼很少,就2000行C語言的代碼,其中有一部分棧寄存器的修改的代碼是由彙編實現的。

一句話來講明greenlet的實現原理:經過棧的複製切換來實現不一樣協程之間的切換。

 

contextlib 的 closing

對於不支持使用 "with"語句 的 "相似文件」 的對象,使用 contextlib.closing():

import contextlib.closing
with closing(urllib.urlopen(
"http://www.python.org/")) as front_page: for line in front_page: print line

 

 

 

 

異步IO


1、簡單的模型

yield是有返回值的。

def consumer():
    r = ''
    while True:
        n = yield r
if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)  # <-- 啓動生成器
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

#-------------------------------------------------- c
= consumer() produce(c)    # 給消費者c喂消息

 

 

2、asyncio 的由來

傳統方式

Ref: https://www.liaoxuefeng.com/wiki/1016959663602400/1017970488768640

(1) 從asyncio模塊中直接獲取一個EventLoop的引用,

(2) 而後把須要執行的協程扔到EventLoop中執行,就實現了異步IO。

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)     # 當作是一個耗時的io操做 print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()               # (1) 獲取一個EventLoop引用
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))  # (2) 將攜程扔到EventLoop中去執行
loop.close()

 

異步wget網頁

writer.drain():這是一個與底層IO輸入緩衝區交互的流量控制方法。當緩衝區達到上限時,drain()阻塞,待到緩衝區回落到下限時,寫操做能夠被恢復。當不須要等待時,drain()會當即返回。

#%%
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)

# (1) 首先,得到socket雙向管道 connect
= asyncio.open_connection(host, 80) reader, writer = yield from connect

# (2) 發送request要網頁內容 header
= 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain()

# (3) 得到網頁內容
while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close()
loop
= asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()

 

 

總結下來就是主要作了兩件事:

(1) @asyncio.coroutine

(2) yield from:不但願堵塞的地方

 

換爲 async, await

換個寫法,看上去幹淨一些。

import threading
import asyncio

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    await asyncio.sleep(1)     # 當作是一個耗時的io操做
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()               # (1) 獲取一個EventLoop引用
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))  # (2) 將協程扔到EventLoop中去執行
loop.close()

 

 

3、aiohttp 助力

如今是把asyncio放在了服務器端!

asyncio能夠實現單線程併發IO操做。若是僅用在客戶端,發揮的威力不大。若是把asyncio用在服務器端,例如Web服務器,因爲HTTP鏈接就是IO操做,所以能夠用單線程+coroutine實現多用戶的高併發支持。

# server code

import
asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello)
srv
= await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv
loop
= asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()

 

 

 

 

異步百萬併發


Ref: python異步編程之asyncio(百萬併發)

文章不錯,詳見連接。

值得注意的一點是:最大併發限制的設置。

semaphore = asyncio.Semaphore(500) # 限制併發量爲500

  

End.

相關文章
相關標籤/搜索