Redis實現簡單消息隊列

任務異步化

打開瀏覽器,輸入地址,按下回車,打開了頁面。因而一個HTTP請求(request)就由客戶端發送到服務器,服務器處理請求,返回響應(response)內容。html

咱們天天都在瀏覽網頁,發送大大小小的請求給服務器。有時候,服務器接到了請求,會發現他也須要給另外的服務器發送請求,或者服務器也須要作另一些事情,因而最初們發送的請求就被阻塞了,也就是要等待服務器完成其餘的事情。redis

更多的時候,服務器作的額外事情,並不須要客戶端等待,這時候就能夠把這些額外的事情異步去作。從事異步任務的工具備不少。主要原理仍是處理通知消息,針對通知消息一般採起是隊列結構。生產和消費消息進行通訊和業務實現。flask

生產消費與隊列

上述異步任務的實現,能夠抽象爲生產者消費模型。如同一個餐館,廚師在作飯,吃貨在吃飯。若是廚師作了不少,暫時賣不完,廚師就會休息;若是客戶不少,廚師快馬加鞭的忙碌,客戶則須要慢慢等待。實現生產者和消費者的方式用不少,下面使用Python標準庫Queue寫個小例子:後端

 

1瀏覽器

2服務器

3app

4dom

5異步

6工具

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import random

import time

from Queue import Queue

from threading import Thread

 

queue = Queue(10)

 

class Producer(Thread):

    def run(self):

        while True:

            elem = random.randrange(9)

            queue.put(elem)

            print "廚師 {} 作了 {} 飯 --- 還剩 {} 飯沒賣完".format(self.name, elem, queue.qsize())

            time.sleep(random.random())

 

class Consumer(Thread):

    def run(self):

        while True:

            elem = queue.get()

            print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯能夠吃".format(self.name, elem, queue.qsize())

            time.sleep(random.random())

 

def main():

    for i in range(3):

        p = Producer()

        p.start()

    for i in range(2):

        c = Consumer()

        c.start()

 

if __name__ == '__main__':

    main()

大概輸出以下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

廚師 Thread-1 作了 1 飯 --- 還剩 1 飯沒賣完

廚師 Thread-2 作了 8 飯 --- 還剩 2 飯沒賣完

廚師 Thread-3 作了 3 飯 --- 還剩 3 飯沒賣完

吃貨Thread-4 吃了 1 飯 --- 還有 2 飯能夠吃

吃貨Thread-5 吃了 8 飯 --- 還有 1 飯能夠吃

吃貨Thread-4 吃了 3 飯 --- 還有 0 飯能夠吃

廚師 Thread-1 作了 0 飯 --- 還剩 1 飯沒賣完

廚師 Thread-2 作了 0 飯 --- 還剩 2 飯沒賣完

廚師 Thread-1 作了 1 飯 --- 還剩 3 飯沒賣完

廚師 Thread-1 作了 1 飯 --- 還剩 4 飯沒賣完

吃貨Thread-4 吃了 0 飯 --- 還有 3 飯能夠吃

廚師 Thread-3 作了 3 飯 --- 還剩 4 飯沒賣完

吃貨Thread-5 吃了 0 飯 --- 還有 3 飯能夠吃

吃貨Thread-5 吃了 1 飯 --- 還有 2 飯能夠吃

廚師 Thread-2 作了 8 飯 --- 還剩 3 飯沒賣完

廚師 Thread-2 作了 8 飯 --- 還剩 4 飯沒賣完

 

Redis 隊列

Python內置了一個好用的隊列結構。咱們也能夠是用redis實現相似的操做。並作一個簡單的異步任務。

Redis提供了兩種方式來做消息隊列。一個是使用生產者消費模式模式,另一個方法就是發佈訂閱者模式。前者會讓一個或者多個客戶端監聽消息隊列,一旦消息到達,消費者立刻消費,誰先搶到算誰的,若是隊列裏沒有消息,則消費者繼續監聽。後者也是一個或多個客戶端訂閱消息頻道,只要發佈者發佈消息,全部訂閱者都能收到消息,訂閱者都是ping的。

生產消費模式

主要使用了redis提供的blpop獲取隊列數據,若是隊列沒有數據則阻塞等待,也就是監聽。

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import redis

 

class Task(object):

    def __init__(self):

        self.rcon = redis.StrictRedis(host='localhost', db=5)

        self.queue = 'task:prodcons:queue'

 

    def listen_task(self):

        while True:

            task = self.rcon.blpop(self.queue, 0)[1]

            print "Task get", task

 

if __name__ == '__main__':

    print 'listen task queue'

    Task().listen_task()

 

發佈訂閱模式

使用redis的pubsub功能,訂閱者訂閱頻道,發佈者發佈消息到頻道了,頻道就是一個消息隊列。

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

import redis

 

class Task(object):

 

    def __init__(self):

        self.rcon = redis.StrictRedis(host='localhost', db=5)

        self.ps = self.rcon.pubsub()

        self.ps.subscribe('task:pubsub:channel')

 

    def listen_task(self):

        for i in self.ps.listen():

            if i['type'] == 'message':

                print "Task get", i['data']

 

if __name__ == '__main__':

    print 'listen task channel'

    Task().listen_task()

 

Flask 入口

咱們分別實現了兩種異步任務的後端服務,直接啓動他們,就能監聽redis隊列或頻道的消息了。簡單的測試以下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

import redis

import random

import logging

from flask import Flask, redirect

 

app = Flask(__name__)

 

rcon = redis.StrictRedis(host='localhost', db=5)

prodcons_queue = 'task:prodcons:queue'

pubsub_channel = 'task:pubsub:channel'

 

@app.route('/')

def index():

 

    html = """

<br>

<center><h3>Redis Message Queue</h3>

<br>

<a href="/prodcons">生產消費者模式</a>

<br>

<br>

<a href="/pubsub">發佈訂閱者模式</a>

</center>

"""

    return html

 

@app.route('/prodcons')

def prodcons():

    elem = random.randrange(10)

    rcon.lpush(prodcons_queue, elem)

    logging.info("lpush {} -- {}".format(prodcons_queue, elem))

    return redirect('/')

 

@app.route('/pubsub')

def pubsub():

    ps = rcon.pubsub()

    ps.subscribe(pubsub_channel)

    elem = random.randrange(10)

    rcon.publish(pubsub_channel, elem)

    return redirect('/')

 

if __name__ == '__main__':

    app.run(debug=True)

啓動腳本,使用

 

1

2

siege -c10 -r 5 http://127.0.0.1:5000/prodcons

siege -c10 -r 5 http://127.0.0.1:5000/pubsub

能夠分別在監聽的腳本輸入中看到異步消息。在異步的任務中,能夠執行一些耗時間的操做,固然目前這些作法並不知道異步的執行結果,若是須要知道異步的執行結果,能夠考慮設計協程任務或者使用一些工具如RQ或者celery等。

相關文章
相關標籤/搜索