Installation: http://www.rabbitmq.com/install-standalone-mac.html
Install the Python RabbitMQ module:

pip install pika
or
easy_install pika
or
install from source: https://pypi.python.org/pypi/pika
The simplest possible queue communication.

send side:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive side:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again - we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists, for example if the send.py
# program was run before. But we're not yet sure which program to run first. In such cases it's a
# good practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
To connect to the rabbitmq server remotely, you need to configure permissions.

First create a user on the rabbitmq server:

sudo rabbitmqctl add_user alex alex3714

Then grant that user permissions so it can be reached from outside:

sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}
vhost: the name of the virtual host to which to grant the user access, defaulting to /.
user: the name of the user to grant access to the specified virtual host.
conf: a regular expression matching resource names for which the user is granted configure permissions.
write: a regular expression matching resource names for which the user is granted write permissions.
read: a regular expression matching resource names for which the user is granted read permissions.

The client then needs to pass these credentials when connecting:

credentials = pika.PlainCredentials('alex', 'alex3714')

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.211.55.5', 5672, '/', credentials))
channel = connection.channel()
In this mode, RabbitMQ by default hands the messages published by the producer (P) to the consumers (C) one after another, much like load balancing.

Producer code:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys

message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer code:

#_*_coding:utf-8_*_
import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# note: no_ack=True is left out here, because the callback acknowledges messages manually
channel.basic_consume(callback,
                      queue='task_queue')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Now start the producer first, then start 3 consumers, and publish a few messages from the producer: you will see the messages being handed to the consumers one by one, in turn.
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies, all unacknowledged messages will be redelivered.
We have learned how to make sure that even if the consumer dies, the task isn't lost (acknowledgments are on by default; pass no_ack=True if you want to disable them). But our tasks will still be lost if the RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:
channel.queue_declare(queue='task_queue', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
If RabbitMQ simply handed messages to the consumers in turn, without considering how loaded each consumer is, a consumer on a weak machine could easily pile up a backlog of messages it cannot work through, while a consumer on a powerful machine stays almost idle. To solve this, set prefetch_count=1 on each consumer: this tells RabbitMQ not to send that consumer a new message until it has finished processing the current one.

channel.basic_qos(prefetch_count=1)
Producer side:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer side:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
The previous examples were essentially one-to-one: a message could only be delivered to the one queue it was sent to. Sometimes, though, you want a message to be received by every queue - a broadcast - and that is where exchanges come in.
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
An exchange is declared with a type, which determines which queues qualify to receive a message:
fanout: every queue bound to this exchange receives the message
direct: the message is delivered to the queue(s) whose binding key exactly matches the message's routing key
topic: every queue whose binding key (which may be a pattern) matches the message's routing key receives the message
    Pattern syntax: # matches zero or more words, * matches exactly one word
    e.g. #.a matches a.a, aa.a, aaa.a, etc.
         *.a matches a.a, b.a, c.a, etc.
    Note: binding with a routing key of # on a topic exchange behaves the same as fanout
headers: the target queues are chosen by matching message headers
Message publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

Message subscriber:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

# Don't give the queue a name: rabbit assigns a random one. exclusive=True makes the queue
# be deleted automatically once the consumer using it disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
RabbitMQ also supports routing by keyword: a queue is bound to the exchange with a binding key, the sender publishes the message with a routing key, and the exchange uses that key to decide which queue(s) the message should be delivered to.

Publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

Subscriber:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.
publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
RPC server:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

RPC client:

import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
NoSQL (Not Only SQL) refers broadly to non-relational databases. With the rise of web 2.0 sites, traditional relational databases have struggled to cope, especially with very large scale, highly concurrent, purely dynamic SNS-style sites, exposing problems that are hard to work around, while non-relational databases have developed very quickly thanks to their own characteristics. NoSQL databases were created to handle the challenges of large data sets with many kinds of data, in particular big-data workloads.
redis is one of the mainstream key-value NoSQL databases. Like Memcached, it keeps data cached in memory for performance, but it supports a richer set of value types: string, list, set, zset (sorted set) and hash. These types support push/pop, add/remove, intersection, union and difference operations and more, and all of these operations are atomic. On top of that, redis supports several kinds of sorting. Unlike memcached, redis periodically writes updated data to disk (or appends modifications to a log file), and it implements master-slave replication on top of that.
Exceptionally fast: Redis is very fast, performing roughly 110,000 SET operations and 81,000 GET operations per second.
Rich data types: Redis supports the data types most developers already know, such as lists, sets, sorted sets and hashes.
Atomic operations: all Redis operations are atomic, which guarantees that two clients accessing the server concurrently always get the updated (latest) value.
Install on Ubuntu:

$ sudo apt-get update
$ sudo apt-get install redis-server

Start Redis:

$ redis-server

Check that redis is running:

$ redis-cli
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> ping
PONG
Install the Python redis module:

sudo pip install redis
or
sudo easy_install redis
or
install from source, see: https://github.com/WoLpH/redis-py
To install a Redis desktop manager on Ubuntu, download the package from http://redisdesktop.com/download and install it.
Usage of the redis-py API can be grouped into the following categories:
1. Operation modes
redis-py provides two classes, Redis and StrictRedis, that implement the Redis commands. StrictRedis implements most of the official commands using the official syntax and semantics, while Redis is a subclass of StrictRedis kept for backwards compatibility with older versions of redis-py.
import redis

r = redis.Redis(host='10.211.55.4', port=6379)
r.set('foo', 'Bar')
print(r.get('foo'))
2. Connection pool
redis-py uses a connection pool to manage all connections to a redis server, avoiding the overhead of establishing and releasing a connection for every request. By default, every Redis instance maintains its own connection pool. You can also create a connection pool yourself and pass it to Redis, so that several Redis instances share one pool.
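A minimal sketch of sharing one pool between several clients (the host and port are the same placeholders used in the surrounding examples):

import redis

# one pool, handed to every client that should share it
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)

r1 = redis.Redis(connection_pool=pool)
r2 = redis.Redis(connection_pool=pool)

r1.set('foo', 'Bar')
print(r2.get('foo'))  # both clients reach the same server through the same pool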
1. String操做
redis中的String在在內存中按照一個name對應一個value來存儲。如圖:
set(name, value, ex=None, px=None, nx=False, xx=False)

# Set a value in Redis. By default the key is created if it does not exist, and overwritten if it does.
# Parameters:
#     ex, expiry time in seconds
#     px, expiry time in milliseconds
#     nx, if True the SET is only performed when name does not exist yet
#     xx, if True the SET is only performed when name already exists
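A small illustrative sketch of those flags; the key name session:1001 and the values are made up for the example:

import redis

r = redis.Redis(host='localhost', port=6379)

r.set('session:1001', 'alex', ex=30)    # created with a 30 second expiry
r.set('session:1001', 'bob', nx=True)   # no effect: the key already exists
r.set('session:1001', 'bob', xx=True)   # succeeds: the key exists
print(r.get('session:1001'))            # b'bob'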
setnx(name, value)

# Set the value only if name does not exist yet (i.e. add it).

setex(name, value, time)

# Set a value with an expiry.
# Parameters:
#     time, expiry time (seconds as a number, or a timedelta object)

psetex(name, time_ms, value)

# Set a value with an expiry.
# Parameters:
#     time_ms, expiry time (milliseconds as a number, or a timedelta object)

mset(*args, **kwargs)

# Set several values at once, e.g.:
#     mset(k1='v1', k2='v2')
# or
#     mset({'k1': 'v1', 'k2': 'v2'})

get(name)

# Get a value.

mget(keys, *args)

# Get several values at once, e.g.:
#     mget('ylr', 'wupeiqi')
# or
#     r.mget(['ylr', 'wupeiqi'])

getset(name, value)

# Set a new value and return the old one.

getrange(key, start, end)

# Get a sub-sequence of the value (by bytes, not characters).
# Parameters:
#     name, the Redis name
#     start, start position (in bytes)
#     end, end position (in bytes), inclusive
# e.g. for "武沛齊" stored as UTF-8, bytes 0-2 are "武"

setrange(name, offset, value)

# Overwrite part of the string starting at the given index (if the new value runs past the end, the string is extended).
# Parameters:
#     offset, index into the string, in bytes (a Chinese character takes three bytes in UTF-8)
#     value, the value to write
setbit(name, offset, value)

# Operate on one bit of the binary representation of name's value.
# Parameters:
#     name, the redis name
#     offset, index of the bit (the value is treated as a bit string)
#     value, may only be 1 or 0
#
# Note: suppose Redis holds n1 = "foo".
# The binary representation of the string foo is:
#     01100110 01101111 01101111
# Executing setbit('n1', 7, 1) sets bit 7 to 1, so the binary becomes
#     01100111 01101111 01101111
# which is the string "goo".

# Extension - printing the binary representation yourself:
# source = "武沛齊"
source = "foo"

for i in source:
    num = ord(i)
    print(bin(num).replace('b', ''))

# What if source is the Chinese string "武沛齊"?
# In UTF-8 every Chinese character takes 3 bytes, so "武沛齊" occupies 9 bytes.
# Iterating over the bytes (in Python 2 a str iterates byte by byte; in Python 3 iterate over
# source.encode('utf-8')) and converting each byte to binary gives:
#     11100110 10101101 10100110 | 11100110 10110010 10011011 | 11101001 10111101 10010000
#              武                          沛                           齊
*Example use: the most space-efficient way to record how many users are online and which users they are.
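A minimal sketch of that idea, assuming user ids are small integers used directly as bit offsets; the key name online:2023-01-01 is invented for the example:

import redis

r = redis.Redis(host='localhost', port=6379)

key = 'online:2023-01-01'
for uid in (3, 15, 42):          # users 3, 15 and 42 come online
    r.setbit(key, uid, 1)

print(r.getbit(key, 15))         # 1 -> user 15 is online
print(r.getbit(key, 7))          # 0 -> user 7 is not
print(r.bitcount(key))           # 3 -> how many users are online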
getbit(name, offset)

# Get the value (0 or 1) of one bit of the binary representation of name's value.

bitcount(key, start=None, end=None)

# Count the number of 1 bits in the binary representation of name's value.
# Parameters:
#     key, the Redis name
#     start, start position (a byte index)
#     end, end position (a byte index)

strlen(name)

# Return the length in bytes of name's value (a Chinese character is 3 bytes in UTF-8).

incr(self, name, amount=1)

# Increment name's value; if name does not exist it is treated as 0, so it is created as name=amount.
# Parameters:
#     name, the Redis name
#     amount, the increment (must be an integer)
# Note: same as incrby.

incrbyfloat(self, name, amount=1.0)

# Increment name's value; if name does not exist it is treated as 0, so it is created as name=amount.
# Parameters:
#     name, the Redis name
#     amount, the increment (a float)

decr(self, name, amount=1)

# Decrement name's value; if name does not exist it is treated as 0 and then decremented.
# Parameters:
#     name, the Redis name
#     amount, the decrement (an integer)

append(key, value)

# Append a string to the value stored at the given redis name.
# Parameters:
#     key, the redis name
#     value, the string to append
2. Hash operations
A hash looks a bit like a Python dict and is well suited to storing a group of closely related fields; redis stores a Hash in memory as one name mapped to a set of key/value pairs.
hset(name, key, value)

# Set one key/value pair in the hash stored at name (created if missing, overwritten otherwise).
# Parameters:
#     name, the redis name
#     key, the key inside the hash at name
#     value, the value inside the hash at name
# Note:
#     hsetnx(name, key, value) only sets the pair when key does not exist yet in the hash (i.e. add only)

hmset(name, mapping)

# Set several key/value pairs in the hash stored at name.
# Parameters:
#     name, the redis name
#     mapping, a dict, e.g. {'k1': 'v1', 'k2': 'v2'}
# e.g.:
#     r.hmset('xx', {'k1': 'v1', 'k2': 'v2'})

hget(name, key)

# Get the value of one key from the hash stored at name.

hmget(name, keys, *args)

# Get the values of several keys from the hash stored at name.
# Parameters:
#     name, the redis name
#     keys, a collection of keys to fetch, e.g. ['k1', 'k2', 'k3']
#     *args, keys to fetch, e.g. k1, k2, k3
# e.g.:
#     r.hmget('xx', ['k1', 'k2'])
# or
#     print(r.hmget('xx', 'k1', 'k2'))

hgetall(name)

# Get all key/value pairs of the hash stored at name.

hlen(name)

# Get the number of key/value pairs in the hash stored at name.

hkeys(name)

# Get all keys of the hash stored at name.

hvals(name)

# Get all values of the hash stored at name.

hexists(name, key)

# Check whether the given key exists in the hash stored at name.

hdel(name, *keys)

# Delete the given keys (and their values) from the hash stored at name.

hincrby(name, key, amount=1)

# Increment the value of key in the hash stored at name; if it does not exist, create it as key=amount.
# Parameters:
#     name, the redis name
#     key, the key inside the hash
#     amount, the increment (an integer)

hincrbyfloat(name, key, amount=1.0)

# Increment the value of key in the hash stored at name; if it does not exist, create it as key=amount.
# Parameters:
#     name, the redis name
#     key, the key inside the hash
#     amount, the increment (a float)
hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:
HSCAN myhash 0
Start a hash scan with fields matching a pattern with:
HSCAN myhash 0 MATCH order_*
Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:
HSCAN myhash 0 MATCH order_* COUNT 1000

# Incremental, cursor-based fetching. Very useful for large hashes: hscan fetches the data in
# slices instead of pulling everything at once, so memory does not blow up.
# Parameters:
#     name, the redis name
#     cursor, the cursor (data is fetched batch by batch based on the cursor)
#     match, only return fields matching this pattern; the default None means all fields
#     count, minimum number of items to fetch per slice; the default None uses Redis' default
# e.g.:
#     first call:  cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
#     second call: cursor2, data2 = r.hscan('xx', cursor=cursor1, match=None, count=None)
#     ...
#     keep going until the returned cursor is 0, which means all data has been fetched
hscan_iter(name, match=None, count=None)

# A generator wrapped around hscan (using yield) that fetches data from redis in batches.
# Parameters:
#     match, only return fields matching this pattern; the default None means all fields
#     count, minimum number of items to fetch per slice; the default None uses Redis' default
# e.g.:
#     for item in r.hscan_iter('xx'):
#         print(item)
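A short runnable sketch of batched iteration over a larger hash; the key user:1000 and the field names are made up for the example:

import redis

r = redis.Redis(host='localhost', port=6379)

# fill a hash with a few hundred fields
r.hmset('user:1000', {'field_%d' % i: i for i in range(300)})

# walk it incrementally instead of pulling everything with hgetall()
for field, value in r.hscan_iter('user:1000', match='field_2*', count=50):
    print(field, value)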
3. List operations
A List in redis is stored in memory as one name mapped to one list.
lpush(name, values)

# Add elements to the list stored at name; each new element is added at the far left (head) of the list.
# e.g.:
#     r.lpush('oo', 11, 22, 33)
#     stored order: 33, 22, 11
# Extension:
#     rpush(name, values) works from the right-hand side instead

lpushx(name, value)

# Add an element at the far left of the list stored at name, but only if name already exists.
# More:
#     rpushx(name, value) works from the right-hand side instead

llen(name)

# Number of elements in the list stored at name.

linsert(name, where, refvalue, value)

# Insert a new value immediately before or after a given value in the list stored at name.
# Parameters:
#     name, the redis name
#     where, BEFORE or AFTER
#     refvalue, the pivot value to insert before/after
#     value, the data to insert

r.lset(name, index, value)

# Overwrite the element at a given index of the list stored at name.
# Parameters:
#     name, the redis name
#     index, index into the list
#     value, the value to set

r.lrem(name, value, num)

# Remove occurrences of a value from the list stored at name.
# Parameters:
#     name, the redis name
#     value, the value to remove
#     num, num=0 removes every occurrence;
#          num=2 removes 2 occurrences, scanning front to back;
#          num=-2 removes 2 occurrences, scanning back to front

lpop(name)

# Remove and return the first element from the left of the list stored at name.
# More:
#     rpop(name) works from the right-hand side instead

lindex(name, index)

# Get the element at the given index of the list stored at name.

lrange(name, start, end)

# Get a slice of the list stored at name.
# Parameters:
#     name, the redis name
#     start, start index
#     end, end index

ltrim(name, start, end)

# Remove every element of the list stored at name that lies outside the start-end index range.
# Parameters:
#     name, the redis name
#     start, start index
#     end, end index

rpoplpush(src, dst)

# Pop the rightmost element of one list and push it onto the far left of another list.
# Parameters:
#     src, the name of the list to take the element from
#     dst, the name of the list to add the element to

blpop(keys, timeout)

# Pop elements from several lists, checking them left to right; blocks when all of them are empty.
# Parameters:
#     keys, a collection of redis names
#     timeout, how long (in seconds) to block waiting for data once every list is empty; 0 blocks forever
# More:
#     r.brpop(keys, timeout) pops from the right-hand side instead

brpoplpush(src, dst, timeout=0)

# Pop an element from the right side of one list and push it onto the left side of another.
# Parameters:
#     src, the name of the list to pop (and remove) the element from
#     dst, the name of the list to insert the element into
#     timeout, how long (in seconds) to block when src is empty; 0 blocks forever
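As a small illustration of how these commands combine, here is a sketch of a tiny work queue built on a list; the key name tasks is invented, the producer pushes with lpush and the worker blocks on brpop:

import redis

r = redis.Redis(host='localhost', port=6379)

# producer side
for i in range(3):
    r.lpush('tasks', 'task-%d' % i)

# worker side: wait up to 5 seconds for the next task
while True:
    item = r.brpop('tasks', timeout=5)
    if item is None:                  # nothing arrived within the timeout
        break
    queue_name, payload = item        # brpop returns a (key, value) pair
    print('processing', payload)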
4. Set operations
A Set is simply a list that does not allow duplicates.

sadd(name, values)
# Add elements to the set stored at name.

scard(name)
# Get the number of elements in the set stored at name.

sdiff(keys, *args)
# The elements that are in the first set but in none of the other sets.

sdiffstore(dest, keys, *args)
# Compute the elements that are in the first set but in none of the other sets, and store the result in the set at dest.

sinter(keys, *args)
# Get the intersection of the given sets.

sinterstore(dest, keys, *args)
# Get the intersection of the given sets and store the result in the set at dest.

sismember(name, value)
# Check whether value is a member of the set stored at name.

smembers(name)
# Get all members of the set stored at name.

smove(src, dst, value)
# Move a member from one set into another.

spop(name)
# Remove and return a random member of the set stored at name.

srandmember(name, numbers)
# Return numbers random members of the set stored at name (without removing them).

srem(name, values)
# Remove the given values from the set stored at name.

sunion(keys, *args)
# Get the union of the given sets.

sunionstore(dest, keys, *args)
# Get the union of the given sets and store the result in the set at dest.

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
# Same as the string scan operations: incremental, batched iteration that avoids using too much memory.
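A short sketch of the set commands in action; the key names are invented for the example:

import redis

r = redis.Redis(host='localhost', port=6379)

r.sadd('skills:alex', 'python', 'redis', 'rabbitmq')
r.sadd('skills:bob', 'python', 'go')

print(r.sinter('skills:alex', 'skills:bob'))   # shared skills
print(r.sdiff('skills:alex', 'skills:bob'))    # skills only alex has
print(r.sismember('skills:bob', 'redis'))      # False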
Sorted sets build on sets by keeping the elements ordered; the ordering is based on a separate value, so every element of a sorted set has two parts, the value itself and a score, and the score is used purely for sorting.

zadd(name, *args, **kwargs)
# Add elements to the sorted set stored at name.
# e.g.:
#     zadd('zz', 'n1', 1, 'n2', 2)
# or
#     zadd('zz', n1=11, n2=22)

zcard(name)
# Get the number of elements in the sorted set stored at name.

zcount(name, min, max)
# Count the elements of the sorted set stored at name whose score lies in [min, max].

zincrby(name, value, amount)
# Increment the score of the given member of the sorted set stored at name.

r.zrange(name, start, end, desc=False, withscores=False, score_cast_func=float)
# Get elements of the sorted set stored at name by index range.
# Parameters:
#     name, the redis name
#     start, start index into the sorted set (an index, not a score)
#     end, end index into the sorted set (an index, not a score)
#     desc, sort order; by default scores are sorted from low to high
#     withscores, whether to return the scores too; by default only the values are returned
#     score_cast_func, a function used to convert the scores
# More:
#     sorted from high to low:
#         zrevrange(name, start, end, withscores=False, score_cast_func=float)
#     get elements by score range:
#         zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
#     and from high to low:
#         zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)
# Get the rank (starting from 0) of a value in the sorted set stored at name.
# More:
#     zrevrank(name, value) ranks from high to low instead
zrem(name, values)

# Remove the members with the given values from the sorted set stored at name.
# e.g. zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# Remove members by rank range.

zremrangebyscore(name, min, max)

# Remove members by score range.

zscore(name, value)

# Get the score of value in the sorted set stored at name.

zinterstore(dest, keys, aggregate=None)

# Store the intersection of two sorted sets in dest; when the same value appears with different scores, they are combined according to aggregate.
# aggregate may be: SUM, MIN, MAX

zunionstore(dest, keys, aggregate=None)

# Store the union of two sorted sets in dest; when the same value appears with different scores, they are combined according to aggregate.
# aggregate may be: SUM, MIN, MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None, score_cast_func=float)

# Same as the string scan operations, with the extra score_cast_func parameter for converting the scores.
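A brief sketch tying these together as a score board. The key scores and the player names are invented, and the zadd keyword form (member=score) and the zincrby(name, value, amount) argument order follow the older redis-py API used throughout this article; newer redis-py versions changed both signatures:

import redis

r = redis.Redis(host='localhost', port=6379)

r.zadd('scores', alex=100, bob=85, carol=92)   # member=score pairs
r.zincrby('scores', 'bob', 10)                 # bob gains 10 points

# top three, highest score first, scores included
print(r.zrevrange('scores', 0, 2, withscores=True))
print(r.zrank('scores', 'alex'))               # rank counted from the lowest score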
其餘經常使用操做
delete(*names)
1# 根據刪除redis中的任意數據類型
exists(name)
1# 檢測redis的name是否存在
keys(pattern='*')
1234567# 根據模型獲取redis的name
# 更多:
# KEYS * 匹配數據庫中全部 key 。
# KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
# KEYS h*llo 匹配 hllo 和 heeeeello 等。
# KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time)
1# 爲某個redis的某個name設置超時時間
rename(src, dst)
1# 對redis的name重命名爲
move(name, db))
1# 將redis的某個值移動到指定的db下
randomkey()
1# 隨機獲取一個redis的name(不刪除)
type(name)
1# 獲取name對應值的類型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
1# 同字符串操做,用於增量迭代獲取key
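A quick sketch exercising a few of these; the key name temp:token is arbitrary:

import redis

r = redis.Redis(host='localhost', port=6379)

r.set('temp:token', 'abc123')
r.expire('temp:token', 60)        # auto-delete after 60 seconds

print(r.exists('temp:token'))     # True / 1
print(r.type('temp:token'))       # b'string'

for key in r.scan_iter(match='temp:*'):
    print(key)

r.delete('temp:token')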
By default redis-py acquires a connection (from the connection pool) and releases it (back to the pool) for every single request. If you want to send several commands in one request, you can use a pipeline; by default a pipeline is executed as one atomic operation.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

pool = redis.ConnectionPool(host='10.211.55.4', port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)

pipe.set('name', 'alex')
pipe.set('role', 'sb')

pipe.execute()
Publisher: the server
Subscribers: the dashboard and the data-processing programs
Demo:

import redis

class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='10.211.55.4')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub
Subscriber:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from monitor.RedisHelper import RedisHelper

obj = RedisHelper()
redis_sub = obj.subscribe()

while True:
    msg = redis_sub.parse_response()
    print(msg)
Publisher:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from monitor.RedisHelper import RedisHelper

obj = RedisHelper()
obj.public('hello')
See also: https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/
Go for legacy relational databases (RDBMS) when:
Go for NoSQL databases when:
source page https://www.quora.com/When-should-you-use-NoSQL-vs-regular-RDBMS
Test environment:
Because no usable 1000M network device was available, the two laptops were connected directly with a straight-through cable to form a 1000M Ethernet link. (Yes, a straight-through cable: modern NICs auto-negotiate crossover vs. straight-through, the speed is not affected, and the machines ran fine this way for quite a while.)
Server: T420, i5-2520M (2.5G) / 8G, ubuntu 11.10
Client: Acer, i5-2430M (2.4G) / 4G, mint 11
redis version: 2.6.9
Benchmark command: ./redis-benchmark -h xx -p xx -t set -q -r 1000 -l -d 20
Value size | Ops/sec | Bandwidth (MByte/s) send+receive | CPU | CPU Detail |
20Byte | 170k | 24M+12M | 98.00% | Cpu0 : 21.0%us, 40.7%sy, 0.0%ni, 4.3%id, 0.0%wa, 0.0%hi, 34.0%si, 0.0%st |
100Byte | 170k | 37M+12M | 97.00% | Cpu0 : 20.3%us, 37.9%sy, 0.0%ni, 7.0%id, 0.0%wa, 0.0%hi, 34.9%si, 0.0%st |
512Byte | 120k | 76M+9M | 87.00% | Cpu0 : 20.9%us, 33.2%sy, 0.0%ni, 25.6%id, 0.0%wa, 0.0%hi, 20.3%si, 0.0%st |
1K | 90k | 94M+8M | 81.00% | Cpu0 : 19.9%us, 30.2%sy, 0.0%ni, 34.2%id, 0.0%wa, 0.0%hi, 15.6%si, 0.0%st |
2K | 50k | 105M+6M | 77.00% | Cpu0 : 18.0%us, 32.0%sy, 0.0%ni, 34.7%id, 0.0%wa, 0.0%hi, 15.3%si, 0.0%st |
5K | 22k | 119M+3.2M | 77.00% | Cpu0 : 22.5%us, 32.8%sy, 0.0%ni, 32.8%id, 0.0%wa, 0.0%hi, 11.9%si, 0.0%st |
10K | 11k | 119M+1.7M | 70.00% | Cpu0 : 18.2%us, 29.8%sy, 0.0%ni, 42.7%id, 0.0%wa, 0.0%hi, 9.3%si, 0.0%st |
20K | 5.7k | 120M+1M | 58.00% | Cpu0 : 17.8%us, 26.4%sy, 0.0%ni, 46.2%id, 0.0%wa, 0.0%hi, 9.6%si, 0.0%st |
With values of 1K and above, the 1000M NIC is easily saturated while redis-server does not even fill a single CPU core, which shows how efficient redis is: the redis service does not need a particularly powerful machine, the bottleneck is the NIC speed.
Overall, redis' user-space CPU (us) stays around 20%, so user-level code consumes only a small share of the resources.