Scrapy is a fairly handy Python crawling framework: you only need to write a few components to scrape data from web pages. But when the number of pages to crawl grows very large, a single host can no longer keep up, whether in processing speed or in the number of concurrent network requests, and that is where a distributed crawler shows its advantages.
Scrapy-Redis is a Redis-based distributed component for Scrapy. It uses Redis to store and schedule the requests to be crawled, and to store the items produced by crawling for later processing. scrapy-redis rewrites some key parts of Scrapy, turning it into a distributed crawler that can run on several hosts at the same time.
The architecture of vanilla Scrapy looks like this:
With Scrapy-Redis added, the architecture becomes:
The official scrapy-redis documentation is rather terse and does not explain how it works, so to fully understand how the distributed crawler operates you still have to read the scrapy-redis source code. Fortunately there is not much of it, it is easy to follow, and you can get through it quickly.
The scrapy-redis project mostly builds on the redis and scrapy libraries and implements little of its own; it works like glue, binding the two together.
scrapy-redis implements two kinds of distribution: distributed crawling and distributed item processing, handled by the scheduler module and the pipelines module respectively.
connection.py
Instantiates the Redis connection according to the settings. It is called by dupefilter and scheduler; in short, anything that reads or writes Redis goes through this module.
import redis
import six

from scrapy.utils.misc import load_object


DEFAULT_REDIS_CLS = redis.StrictRedis

# Sane connection defaults.
DEFAULT_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
}

# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``DEFAULT_PARAMS`` global as defaults values for the parameters. You can
    override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = DEFAULT_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', DEFAULT_REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)
The connection module imports redis, the Python client library for Redis, which it uses to access the Redis database. Its main job is therefore to establish the connection (it returns a Redis or StrictRedis object from the redis library, either of which can be used directly for data operations), and these connection helpers are used throughout the other files. As with most databases, connecting to Redis requires an IP address, a port, an optional username/password, and an integer database number; in the Scrapy project's settings file you can also configure socket timeouts, connection timeouts and the like.
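To make the flow concrete, here is a minimal sketch of driving these helpers from a settings object. The Redis address and the demo key are placeholder values; the setting names mirror the code above.

from scrapy.settings import Settings
from scrapy_redis.connection import get_redis_from_settings

settings = Settings({
    'REDIS_HOST': 'localhost',                # placeholder address
    'REDIS_PORT': 6379,
    'REDIS_PARAMS': {'socket_timeout': 30},   # merged over DEFAULT_PARAMS
})

server = get_redis_from_settings(settings)    # a redis.StrictRedis instance
server.set('demo:key', 'value')               # ready for normal Redis operations
print(server.get('demo:key'))                 # b'value'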
dupefilter.py
Handles request de-duplication, implemented rather cleverly on top of a Redis set. Note, however, that the scheduler does not use the dupefilter key maintained in this module to schedule requests; scheduling uses the queues implemented in queue.py. When a request is not a duplicate it is stored in the queue, and it is popped out at scheduling time.
import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from .connection import get_redis_from_settings


DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.

        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
This file looks more involved: it reimplements the request de-duplication that Scrapy already ships with. When Scrapy runs on a single machine it only needs to check the in-memory request queue, or a persisted request queue (Scrapy's default persistence appears to be a JSON-formatted file rather than a database), to decide whether the request URL about to be issued has already been requested or is currently being scheduled; a local lookup is enough. In a distributed run, however, the scheduler on every host must consult the same request pool in the same database to decide whether the current request is a duplicate.
Here a Redis-based duplicate check is implemented by subclassing BaseDupeFilter and overriding its methods. Judging from the source, scrapy-redis reuses Scrapy's own fingerprint interface, request_fingerprint. This interface is quite interesting: according to the Scrapy documentation it hashes a request to decide whether two URLs are the same (identical URLs produce identical hashes), and it even produces the same hash when two URLs have the same address and the same GET parameters in a different order (which is rather neat). So scrapy-redis keeps using the request fingerprint to decide whether a request has been seen before. The class connects to Redis and inserts fingerprints into a set under a single key. That key is the same for every instance of a given spider: Redis is a key-value store, so the same key always resolves to the same value, and using the spider name plus the dupefilter suffix as the key means that crawler instances on different hosts, as long as they belong to the same spider, all hit the same set, which therefore acts as their shared URL de-duplication pool. If sadd returns 0, the fingerprint already exists in the set (sets hold no duplicates), so request_seen returns True and the request is treated as a duplicate; if sadd returns 1, a new fingerprint has just been added, the request is not a duplicate, and request_seen returns False, with the new fingerprint conveniently already stored in the database. The dupefilter is used by the scheduler class: every request is checked for duplicates before it enters scheduling, and duplicates are simply discarded rather than scheduled, so no resources are wasted on them.
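A quick sketch of the same idea outside the framework, assuming a Redis server on localhost (the key 'demo:dupefilter' is made up for illustration):

import redis
from scrapy.http import Request
from scrapy.utils.request import request_fingerprint

server = redis.StrictRedis(host='localhost', port=6379)  # assumed local Redis
key = 'demo:dupefilter'                                   # made-up key for the demo

# The same URL with its GET parameters in a different order yields the same fingerprint.
fp1 = request_fingerprint(Request('http://example.com/?a=1&b=2'))
fp2 = request_fingerprint(Request('http://example.com/?b=2&a=1'))
assert fp1 == fp2

def request_seen(fp):
    # SADD returns 1 when the member is new and 0 when it was already in the set,
    # which is exactly the check RFPDupeFilter.request_seen performs.
    return server.sadd(key, fp) == 0

print(request_seen(fp1))  # False: first time this fingerprint is stored
print(request_seen(fp2))  # True: a duplicate, which the scheduler would discard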
queue.py
Its role is as described under dupefilter.py, but three kinds of queue are implemented here: the FIFO SpiderQueue, the SpiderPriorityQueue, and the LIFO SpiderStack. The default is the second one, which is also the reason for the behaviour analysed in the earlier article (link).
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider queue/stack base class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters:
            server -- redis connection
            spider -- spider instance
            key -- key for this queue (e.g. "%(spider)s:queue")

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class SpiderQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class SpiderPriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class SpiderStack(Base):
    """Per-spider stack"""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
        if data:
            return self._decode_request(data)


__all__ = ['SpiderQueue', 'SpiderPriorityQueue', 'SpiderStack']
This file implements several container classes. You can see that these containers interact with Redis frequently, and they use the serializer defined in the picklecompat module (covered below). The containers are largely the same, except that one is a queue, one is a stack, and one is a priority queue. They are instantiated by the scheduler object to drive request scheduling: with SpiderQueue as the scheduling queue type, requests are dispatched first-in-first-out, while SpiderStack gives last-in-first-out.
Take a closer look at SpiderQueue. Its push method works like the others': the incoming request is first turned into a dict by Scrapy's request_to_dict helper (a Request object is fairly complex, with methods and attributes, so it is awkward to serialize directly), then serialized into a string by the picklecompat serializer, and finally stored in Redis under a specific key (the same key for every instance of a given spider). Calling pop simply reads the value stored under that key (a list) and takes the element that entered earliest, which is what makes it first-in-first-out.
All of these container classes serve as the scheduler's containers for requests. A scheduler is instantiated on every host, paired one-to-one with its spider, so a distributed run has multiple instances of one spider and multiple scheduler instances spread across different hosts. But because every scheduler uses the same kind of container, all of the containers connect to the same Redis server, and they all read and write data under the key built from the spider name plus the queue suffix, the crawler instances on different hosts share a single request scheduling pool, which is what gives the distributed crawlers their unified scheduling.
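As a rough illustration of that round trip, here is a sketch assuming a local Redis server; the key 'myspider:requests' is a placeholder following the '%(spider)s:requests' pattern, and the callback-free request lets us skip passing a spider to request_to_dict:

import pickle
import redis
from scrapy.http import Request
from scrapy.utils.reqser import request_to_dict, request_from_dict

server = redis.StrictRedis()          # assumed local Redis
key = 'myspider:requests'             # placeholder key

req = Request('http://example.com/page', priority=5)

# push: Request -> dict -> pickled bytes -> LPUSH onto the Redis list
server.lpush(key, pickle.dumps(request_to_dict(req), protocol=-1))

# FIFO pop: RPOP the oldest entry -> unpickle -> rebuild the Request
data = server.rpop(key)
restored = request_from_dict(pickle.loads(data))
print(restored.url, restored.priority)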
picklecompat.py
"""A pickle wrapper module with protocol=-1 by default."""

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)
This file implements loads and dumps, which is to say a serializer. Since Redis cannot store arbitrary objects (values can only be strings, lists of strings, sets of strings and hashes, and keys can only be strings), everything has to be serialized to text before it is stored. The serializer here is Python's pickle module, which works on both Python 2 and Python 3. It is mainly used by the scheduler to store Request objects. Why JSON is not used here I am not entirely sure (presumably because a serialized request contains values, such as raw bytes, that JSON cannot represent); the item pipeline does default to JSON serialization.
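A small illustration of that likely reason (my own example, not part of scrapy-redis): a dict shaped like the output of request_to_dict carries raw bytes in its body, which pickle accepts but the json module rejects.

import json
import pickle

# A dict shaped like the output of request_to_dict: the body is raw bytes.
request_dict = {'url': 'http://example.com', 'method': 'GET', 'body': b''}

pickle.dumps(request_dict, protocol=-1)   # fine: pickle handles arbitrary Python objects

try:
    json.dumps(request_dict)
except TypeError as exc:
    print("json cannot serialize it:", exc)  # bytes are not JSON serializable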
pipelines.py
This is the part that provides distributed item processing: it stores items in Redis so they can be handled in a distributed fashion. Note also that, although this is still just a pipeline, the implementation here differs from the case analysed in the earlier article: because the settings need to be read, the from_crawler() function is used.
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue"""

    def __init__(self, server,
                 key='%(spider)s:items',
                 serialize_func=default_serialize):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
The pipelines file implements an item pipeline class, the same kind of object as a regular Scrapy item pipeline. It takes the REDIS_ITEMS_KEY we configure in settings as the key, serializes each item, and stores it in the corresponding Redis value (which, as you can see, is a list; each item becomes one node of that list). By storing the extracted items, this pipeline makes it convenient for us to process the data later.
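For example, a separate post-processing script might drain that list roughly like this (a sketch; it assumes a local Redis server and the default '%(spider)s:items' key pattern for a spider named myspider):

import json
import redis

server = redis.StrictRedis()      # assumed local Redis
key = 'myspider:items'            # placeholder following '%(spider)s:items'

while True:
    # BLPOP blocks until an item arrives and returns a (key, value) pair,
    # or None once the timeout expires.
    result = server.blpop(key, timeout=5)
    if result is None:
        break
    _, data = result
    item = json.loads(data)       # RedisPipeline stored it with ScrapyJSONEncoder
    print(item)                   # hand it off to a database, message queue, etc.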
scheduler.py
This extension replaces Scrapy's built-in scheduler (it is specified via the SCHEDULER setting) and is exactly what makes distributed scheduling of the crawler possible. The data structures it relies on are the ones implemented in queue.py.
The two kinds of distribution that scrapy-redis provides, distributed crawling and distributed item processing, are implemented by the scheduler and pipelines modules; all of the other modules described above are supporting pieces for these two.
import importlib
import six

from scrapy.utils.misc import load_object

from . import connection


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler"""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key='%(spider)s:requests',
                 queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
                 dupefilter_key='%(spider)s:dupefilter',
                 dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)
|