首先咱們須要安裝 RabbitMQ,而後經過服務啓動它,默認爲註冊到本機的5672端口。咱們的爬蟲和數據庫寫入腳本都須要鏈接到 RabbitMQ,一邊往隊列中寫入數據,另外一邊從隊列中取出數據,而後插入到數據。html
Python 中使用 RabbitMQ 能夠經過調用 pika 這個庫,安裝過程見官方文檔,對於 RabbitMQ 自己也有中文教程。python
本項目用到的模型是一對一的,用 pika 寫很容易,代碼以下:數據庫
import pika # 導入庫 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 設置一個新鏈接,鏈接到本地的 RabbitMQ 服務端。 channel = connection.channel() # 註冊到 books 隊列 channel.queue_declare(queue='books') channel.basic_publish(exchange='', routing_key='books', body='Whats up') # 發送消息 body connection.close() #
在 basic_publish 這個函數中,咱們設置 exchange 爲空,而 routing-key 爲 books,此時 basic_publish 會默認把咱們的 body 信息根據 routing-key 的內容發送到 books 的隊列中。django
這裏 exchange 實際上是一個信息中轉站,以下圖,P 爲咱們要發送的信息,X 就是信息中轉站,咱們經過 exchange 字段來設置咱們的目標中轉站,而後由 exchange 來決定咱們的信息要往哪裏走。 app
而 routing-key 的設置也頗有講究,能夠參考教程中 Routing 一節。scrapy
到此,咱們已經寫好生產者了,接下來咱們看消費者。ide
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') def callback(ch, method, properties, body): print body channel.basic_consume(callback, queue='books', no_ack=True) # 註冊回調函數,當有消息取出時,程序調用 callback 函數,其中 body 就是取出的消息。 channel.start_consuming()
最後貼一下代碼,爬蟲端:函數
#!/usr/bin/env python # -*- encoding: utf-8 -*- from pyspider.libs.base_handler import * import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') class Handler(BaseHandler): crawl_config = {} def on_start(self): self.crawl('http://scrapy.org/', callback=self.index_page) def on_result(self, result): # 重寫 on_result 函數 if not result: return assert self.task, "on_result can't outside a callback." result['callback'] = self.task['process']['callback'] if self.is_debugger(): pprint(result) channel.basic_publish(exchange='', routing_key='books', body=result) if self.__env__.get('result_queue'): channel.basic_publish(exchange='', routing_key='books', body=result) self.__env__['result_queue'].put((self.task, result)) @config(priority=5) def index_page(self, response): url_list = [] for each in response.doc('a[href^="http"]').items(): url_list.append(each.attr.href) return url_list
消費者端:ui
import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spider.settings') import django django.setup() from django.core.exceptions import ObjectDoesNotExist from importer.models import Books import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') def callback(ch, method, properties, body): new_book = Books() new_book.url = body new_book.save() print body + ' saved!' channel.basic_consume(callback, queue='books', no_ack=True) channel.start_consuming()
本例中調用了 django 的 BOOKS 的數據庫模型,在隊列中取出消息後,存入 BOOKS 表中,字段爲url。url