pyspider + RabbitMQ 使用記 - 下

首先咱們須要安裝 RabbitMQ,而後經過服務啓動它,默認爲註冊到本機的5672端口。咱們的爬蟲和數據庫寫入腳本都須要鏈接到 RabbitMQ,一邊往隊列中寫入數據,另外一邊從隊列中取出數據,而後插入到數據。html

Python 中使用 RabbitMQ 能夠經過調用 pika 這個庫,安裝過程見官方文檔,對於 RabbitMQ 自己也有中文教程python

本項目用到的模型是一對一的,用 pika 寫很容易,代碼以下:數據庫

import pika
# 導入庫
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 設置一個新鏈接,鏈接到本地的 RabbitMQ 服務端。
channel = connection.channel()
# 註冊到 books 隊列
channel.queue_declare(queue='books')
channel.basic_publish(exchange='', routing_key='books', body='Whats up')
# 發送消息 body
connection.close()
#

在 basic_publish 這個函數中,咱們設置 exchange 爲空,而 routing-key 爲 books,此時 basic_publish 會默認把咱們的 body 信息根據 routing-key 的內容發送到 books 的隊列中。django

這裏 exchange 實際上是一個信息中轉站,以下圖,P 爲咱們要發送的信息,X 就是信息中轉站,咱們經過 exchange 字段來設置咱們的目標中轉站,而後由 exchange 來決定咱們的信息要往哪裏走。 exchangeapp

而 routing-key 的設置也頗有講究,能夠參考教程中 Routing 一節。scrapy

到此,咱們已經寫好生產者了,接下來咱們看消費者。ide

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='books')


def callback(ch, method, properties, body):
    print body

channel.basic_consume(callback, queue='books', no_ack=True)
# 註冊回調函數,當有消息取出時,程序調用 callback 函數,其中 body 就是取出的消息。
channel.start_consuming()

這裏咱們在回調函數中能夠直接打印 body 到控制檯。

最後貼一下代碼,爬蟲端:函數

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from pyspider.libs.base_handler import *
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='books')


class Handler(BaseHandler):
    crawl_config = {}

    def on_start(self):
        self.crawl('http://scrapy.org/', callback=self.index_page)
    
    def on_result(self, result):
        # 重寫 on_result 函數
        if not result:
            return
        assert self.task, "on_result can't outside a callback."
        result['callback'] = self.task['process']['callback']
        if self.is_debugger():
            pprint(result)
            channel.basic_publish(exchange='', routing_key='books', body=result)
        if self.__env__.get('result_queue'):
            channel.basic_publish(exchange='', routing_key='books', body=result)
            self.__env__['result_queue'].put((self.task, result))
        
    @config(priority=5)
    def index_page(self, response):
        url_list = []
        for each in response.doc('a[href^="http"]').items():
            url_list.append(each.attr.href)
        return url_list

消費者端:ui

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spider.settings')
import django
django.setup()
from django.core.exceptions import ObjectDoesNotExist
from importer.models import Books

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='books')


def callback(ch, method, properties, body):
    new_book = Books()
    new_book.url = body
    new_book.save()
    print body + ' saved!'

channel.basic_consume(callback, queue='books', no_ack=True)
channel.start_consuming()

本例中調用了 django 的 BOOKS 的數據庫模型,在隊列中取出消息後,存入 BOOKS 表中,字段爲url。url

相關文章
相關標籤/搜索