python學習筆記_week11_rabbitMQ_Redis

1、RabbitMQ

python的queue(生產者消費者模型)分爲線程queue(不能跨進程)和進程queue(父進程與子進程進行交互或者同屬於同一父進程下多個子進程進行交互),那兩個獨立的進程間的交互(不一樣程序,python-java,python-php)--(與json,pickle相似)怎麼辦呢----找個中間代理。好比QQ與Word之間的交互,①能夠創建socket通訊②經過disk(硬盤)交互---速度慢③找個中間代理broker(也是個獨立的程序)---能夠爲多個程序服務,維護方便(封裝好的socket)javascript

安裝:RabbitMQ是用erlang語言開發的,因此先安裝erlang http://www.erlang.org/downloads,再安裝RabbitMQ http://www.rabbitmq.com/install-standalone-mac.html,python中操做RabbitMQ用pika、Celery、Haigha模塊php

實現最簡單的隊列通訊html

producerjava

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3     'localhost'))
 4 channel = connection.channel() #聲明一個管道
 5 # 聲明queue
 6 channel.queue_declare(queue='hello3')
 7 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
 8 channel.basic_publish(exchange='',
 9                       routing_key='hello3',#queue名字
10                       body='Hello World!')#你的消息內容
11 print(" [x] Sent 'Hello World!'")
12 connection.close()
producer

consumerpython

 1 import pika,time
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3     'localhost'))
 4 channel = connection.channel()
 5 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 6 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 7 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 8 # practice to repeat declaring the queue in both programs.
 9 channel.queue_declare(queue='hello3')
10 def callback(ch, method, properties, body): #回調函數
11     print("--->",ch,method,properties) #ch,管道的內存對象地址,method包含你要發消息給誰的信息,通常不用管
12     time.sleep(30)
13     print(" [x] Received %r" % body)
14     ch.basic_ack(delivery_tag=method.delivery_tag)#須要手動確認
15 channel.basic_consume(    #消費消息
16                       callback,  #若是收到消息就調用callback函數來處理消息
17                       queue='hello3',
18                       #no_ack=True) #no acknowledgement 不確認,通常不用
19                       )
20 print(' [*] Waiting for messages. To exit press CTRL+C')
21 channel.start_consuming()
consumer

遠程鏈接rabbitmq server的話,須要配置權限 噢 ,首先在rabbitmq server上建立一個用戶 sudo rabbitmqctl  add_user alex alex3714 同時還要配置權限,容許從外面訪問sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"linux

set_permissions [-p vhost] {user} {conf} {write} {read}  ----vhost The name of the virtual host to which to grant the user access, defaulting to /.git

user The name of the user to grant access to the specified virtual host.github

conf A regular expression matching resource names for which the user is granted configure permissions.redis

write A regular expression matching resource names for which the user is granted write permissions.shell

read A regular expression matching resource names for which the user is granted read permissions.

客戶端鏈接的時候須要配置認證參數

1 credentials = pika.PlainCredentials('alex', 'alex3714')
2 connection = pika.BlockingConnection(pika.ConnectionParameters(
3     '10.211.55.5',5672,'/',credentials))
4 channel = connection.channel()
View Code

(一)Work Queues

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少。消息分發輪詢,誰先啓動,producer先發給誰。

此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上  

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

Using this code(沒有no_ack) we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered

(二)消息持久化

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable  use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable

1 channel.queue_declare(queue='hello', durable=True)

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:

1 channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

---durable只是讓queue持久化,消息並無,想讓消息持久化,得加上 properties=pika.BasicProperties(delivery_mode = 2, )

1 channel.basic_publish(exchange='', 2                       routing_key="task_queue", 3                       body=message, 4                       properties=pika.BasicProperties( 5                          delivery_mode = 2, # make message persistent
6                       ))

(三)消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

 1 def callback(ch, method, properties, body): #回調函數
 2     print("--->",ch,method,properties) #ch,管道的內存對象地址,method包含你要發消息給誰的信息,通常不用管
 3     #time.sleep(30)
 4     print(" [x] Received %r" % body)
 5     ch.basic_ack(delivery_tag=method.delivery_tag)#須要手動確認
 6 channel.basic_qos(prefetch_count=1)
 7 channel.basic_consume(    #消費消息
 8                       callback,  #若是收到消息就調用callback函數來處理消息
 9                       queue='hello3',
10                       #no_ack=True) #no acknowledgement 不確認,通常不用
11                       )

(四)Publish\Subscribe(消息發佈\訂閱)

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播(收音機)的效果,這時候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息

fanout: 全部bind到此exchange的queue均可以接收消息  direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息 topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息  

表達式符號說明:#表明一個或多個字符,*表明任何字符  例:#.a會匹配a.a,aa.a,aaa.a等   *.a會匹配a.a,b.a,c.a等  注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 

headers: 經過headers 來決定把消息發給哪些queue

publisher

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange='logs',
 7                          exchange_type='fanout')
 8 # message = ' '.join(sys.argv[1:]) or "info: Hello World!"
 9 message ="info: Hello World!"
10 channel.basic_publish(exchange='logs',
11                       routing_key='',
12                       body=message)
13 print(" [x] Sent %r" % message)
14 connection.close()
publisher

subscriber

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3     host='localhost'))
 4 channel = connection.channel()
 5 channel.exchange_declare(exchange='logs',
 6                          exchange_type='fanout')
 7 result = channel.queue_declare(exclusive=True)  #exclusive(排他的,惟一的)不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
 8 queue_name = result.method.queue #隨機生成的queue名
 9 print("random queue_name:",queue_name)
10 channel.queue_bind(exchange='logs',
11                    queue=queue_name)
12 print(' [*] Waiting for logs. To exit press CTRL+C')
13 def callback(ch, method, properties, body):
14     print(" [x] %r" % body)
15 channel.basic_consume(callback,
16                       queue=queue_name,
17                       no_ack=True)
18 channel.start_consuming()
subscriber

 注意:廣播,是實時的,收不到就沒了,消息不會存下來,相似收音機。

(五)有選擇的接收消息(exchange type=direct) 

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

publisher

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange='direct_logs',
 7                          exchange_type='direct')
 8 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',
11                       routing_key=severity,
12                       body=message)
13 print(" [x] Sent %r:%r" % (severity, message))
14 connection.close()
publisher

subscriber

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange='direct_logs',
 7                          exchange_type='direct')
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 severities = sys.argv[1:]
11 if not severities:
12     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
13     sys.exit(1)
14 print(severities)
15 for severity in severities:
16     channel.queue_bind(exchange='direct_logs',
17                        queue=queue_name,
18                        routing_key=severity)
19 print(' [*] Waiting for logs. To exit press CTRL+C')
20 def callback(ch, method, properties, body):
21     print(" [x] %r:%r" % (method.routing_key, body))
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)
25 channel.start_consuming()
subscriber

(六)更細緻的消息過濾

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

publisher

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange='topic_logs',
 7                          exchange_type='topic')
 8 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' # .info的格式
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='topic_logs',
11                       routing_key=routing_key,
12                       body=message)
13 print(" [x] Sent %r:%r" % (routing_key, message))
14 connection.close()
publisher

subscriber

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange='topic_logs',
 7                          exchange_type='topic')
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 binding_keys = sys.argv[1:]
11 if not binding_keys:
12     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
13     sys.exit(1)
14 for binding_key in binding_keys:
15     channel.queue_bind(exchange='topic_logs',
16                        queue=queue_name,
17                        routing_key=binding_key)
18 
19 print(' [*] Waiting for logs. To exit press CTRL+C')
20 def callback(ch, method, properties, body):
21     print(" [x] %r:%r" % (method.routing_key, body))
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)
25 channel.start_consuming()
subscriber

To receive all the logs run:

python receive_logs_topic.py "#" 

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*" 

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical" 

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

(七)Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received.---snmp也是一種rpc

server

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='localhost'))
 5 channel = connection.channel()
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n == 0:
 9         return 0
10     elif n == 1:
11         return 1
12     else:
13         return fib(n - 1) + fib(n - 2)
14 def on_request(ch, method, props, body):
15     n = int(body)
16     print(" [.] fib(%s)" % n)
17     response = fib(n)
18     ch.basic_publish(exchange='',
19                      routing_key=props.reply_to,#props獲取客戶端的properties中的reply_to
20                      properties=pika.BasicProperties(correlation_id= \
21                                                          props.correlation_id),
22                      body=str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag) #確保消息消費了
24 channel.basic_consume(on_request, queue='rpc_queue')
25 print(" [x] Awaiting RPC requests")
26 channel.start_consuming()
server

client

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 6             host='localhost'))
 7         self.channel = self.connection.channel()
 8         result = self.channel.queue_declare(exclusive=True)
 9         self.callback_queue = result.method.queue
10         self.channel.basic_consume(self.on_response, #只要一收到消息就調用on_response
11                                    no_ack=True,
12                                    queue=self.callback_queue)
13     def on_response(self, ch, method, props, body):
14         if self.corr_id == props.correlation_id: #肯定我發給服務器的命令就是我想要的結果,防止發送多條命令後混亂
15             self.response = body  #收到消息後self.response就不爲none了
16     def call(self, n):
17         self.response = None
18         self.corr_id = str(uuid.uuid4()) #隨機生成一個肯定且惟一的uuid
19         self.channel.basic_publish(exchange='',
20                                    routing_key='rpc_queue',
21                                    properties=pika.BasicProperties(
22                                        reply_to=self.callback_queue, #讓服務器端的消息返回到callback_queue裏
23                                        correlation_id=self.corr_id,
24                                    ),
25                                    body=str(n)) #發的消息
26         while self.response is None:
27             self.connection.process_data_events() #非阻塞(每一個一段時間檢測一次)版的sart_consuming---收消息用的
28             print("no msg...")
29             time.sleep(0.5)
30         return int(self.response)
31 fibonacci_rpc = FibonacciRpcClient() #實例化
32 print(" [x] Requesting fib(6)")
33 response = fibonacci_rpc.call(6)
34 print(" [.] Got %r" % response)
client

2、redis

數據傳輸(一、文件(json,pickle)---效率低;二、socket)

緩存(MongoDB,redis(半持久化,默認存在內存裏,但也可改配置存在硬盤),memcached(只能存在內存裏),還能夠本身寫緩存)

    

---name和age是共享的數據,ex表明只存活2s

redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。

(一)Redis優勢

1.異常快速 : Redis是很是快的,每秒能夠執行大約110000設置操做,81000個/每秒的讀取操做。

2.支持豐富的數據類型 : Redis支持最大多數開發人員已經知道如列表,集合,可排序集合,哈希等數據類型。這使得在應用中很容易解決的各類問題,由於咱們知道哪些問題處理使用哪一種數據類型更好解決。

3.操做都是原子的 : 全部 Redis 的操做都是原子,從而確保當兩個客戶同時訪問 Redis 服務器獲得的是更新後的值(最新值)。

4.MultiUtility工具:Redis是一個多功能實用工具,能夠在不少如:緩存,消息傳遞隊列中使用(Redis原生支持發佈/訂閱),在應用程序中,如:Web應用程序會話,網站頁面點擊數等任何短暫的數據;

(二)安裝Redis環境

要在 Ubuntu 上安裝 Redis,打開終端,而後輸入如下命令:
$sudo apt-get update
$sudo apt-get install redis-server
這將在您的計算機上安裝Redis

啓動 Redis

$redis-server

查看 redis 是否還在運行

$redis-cli
這將打開一個 Redis 提示符,以下圖所示:
redis 127.0.0.1:6379>
在上面的提示信息中:127.0.0.1 是本機的IP地址,6379是 Redis 服務器運行的端口。如今輸入 PING 命令,以下圖所示:
redis 127.0.0.1:6379> ping
PONG
這說明如今你已經成功地在計算機上安裝了 Redis。
(三)Python操做Redis
安裝redis模塊

(四)在Ubuntu上安裝Redis桌面管理器

要在Ubuntu 上安裝 Redis桌面管理,能夠從 http://redisdesktop.com/download 下載包並安裝它。Redis 桌面管理器會給你用戶界面來管理 Redis 鍵和數據。

(五)Redis API使用

redis-py 的API的使用能夠分類爲:1.鏈接方式 2.鏈接池 3.操做管道 ①String 操做 ②Hash 操做 ③List 操做 ④Set 操做 ⑤Sort Set 操做 4.管道 5.發佈訂閱

(六)鏈接方式

一、操做模式

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。

1 import redis
2 r = redis.Redis(host='10.211.55.4', port=6379)
3 r.set('foo', 'Bar')
4 print(r.get("foo")) #bytes類型,python3.x裏涉及到socket都是bytes類型

二、鏈接池redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

1 import redis
2 pool = redis.ConnectionPool(host='192.168.1.1', port=6379)
3 r = redis.Redis(connection_pool=pool)
4 r.set('foo', 'Bar')
5 print(r.get('foo'))

(七)操做

1.String操做

redis中的String在在內存中按照一個name對應一個value來存儲。

①set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中設置值,默認,不存在則建立,存在則修改。 參數:ex,過時時間(秒),px,過時時間(毫秒)nx,若是設置爲True,則只有name不存在時,當前set操做才執行xx,若是設置爲True,則只有name存在時,崗前set操做才執行

②setnx(name, value) 設置值,只有name不存在時,執行設置操做(添加)

③setex(name, value, time)  設置值。參數:time,過時時間(數字秒 或 timedelta對象)

④psetex(name, time_ms, value) 設置值。參數:time_ms,過時時間(數字毫秒 或 timedelta對象)

⑤mset(*args, **kwargs )批量設置值。如:mset(k1='v1',k2='v2')mget({'k1':'v1','k2':'v2'})

⑥get(name) 獲取值

⑦mget(keys, *args)批量獲取如:mget('ylr','wupeiqi')r.mget(['ylr','wupeiqi'])

⑧getset(name, value) 設置新值並獲取原來的值

⑨getrange(key, start, end)獲取子序列(根據字節獲取,非字符)參數:# name,Redis 的 name# start,起始位置(字節)# end,結束位置(字節)# 如: "武沛齊" ,0-3表示 "武",至關於切片

⑩setrange(name, offset, value)# 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)。# 參數:# offset,字符串的索引,字節(一個漢字三個字節)# value,要設置的值

⑪setbit(name, offset, value)

 1 # 對name對應值的二進制表示的位進行操做
 2 # 參數:
 3     # name,redis的name
 4     # offset,位的索引(將值變換成二進制後再進行索引)
 5     # value,值只能是 1 或 0
 6 # 注:若是在Redis中有一個對應: n1 = "foo",
 7         那麼字符串foo的二進制表示爲:01100110 01101111 01101111
 8     因此,若是執行 setbit('n1', 7, 1),則就會將第7位設置爲1,
 9         那麼最終二進制則變成 01100111 01101111 01101111,即:"goo"
10 # 擴展,轉換二進制表示:
11     # source = "武沛齊"
12     source = "foo"
13     for i in source:
14         num = ord(i) #ord(i)獲取在ascii碼錶的位置,bin(num)轉化爲二進制
15         print bin(num).replace('b','')
16     特別的,若是source是漢字 "武沛齊"怎麼辦?
17     答:對於utf-8,每個漢字佔 3 個字節,那麼 "武沛齊" 則有 9個字節
18        對於漢字,for循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制
19         11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
20         -------------------------- ----------------------------- -----------------------------
21                     武                         沛                           齊

用途舉例,用最省空間的方式,存儲在線用戶數及分別是哪些用戶在線。新浪微博,幾億用戶,MySQL裏select大於500萬就慢了且佔內存較大,因此效率不高。bitcount  顯示字符串裏有幾個1    每登陸一個用戶將其id(如60)   在字符串n5裏setbit 爲1(---setbit n5 60 1)   想知道有幾個用戶 bitcount n5  想知道哪些用戶在線循環這個字符串(n5)就能夠了 好比getbit n5 60---返回1,表明id爲60的這位用戶在線,0表明不在線,這樣用幾十兆(比用MySQL小多了)就能實現該功能。

⑫getbit(name, offset) # 獲取name對應的值的二進制表示中的某位的值 (0或1)

⑬bitcount(key, start=None, end=None) # 獲取name對應的值的二進制表示中 1 的個數 # 參數:# key,Redis的name  # start,位起始位置  # end,位結束位置

⑭strlen(name) # 返回name對應值的字節長度(一個漢字3個字節)

⑮incr(self, name, amount=1)# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。# 參數:# name,Redis的name# amount,自增數(必須是整數)# 注:同incrby

-----可用來計數在線用戶 incr login_users --- 輸入一次增長1   離線decr login_users -----輸入一次減1

⑯incrbyfloat(self, name, amount=1.0)# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。# 參數:# name,Redis的name # amount,自增數(浮點型)

⑰decr(self, name, amount=1)# 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。# 參數:# name,Redis的name # amount,自減數(整數)

⑱append(key, value)# 在redis name對應的值後面追加內容# 參數:key, redis的name  value, 要追加的字符串

2.Hash操做,redis中Hash在內存中的存儲格式以下圖:

①hset(name, key, value) # name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)# 參數:# name,redis的name # key,name對應的hash中的key # value,name對應的hash中的value # 注:# hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

②hmset(name, mapping) # 在name對應的hash中批量設置鍵值對 # 參數: # name,redis的name # mapping,字典,如:{'k1':'v1', 'k2': 'v2'} # 如:# r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

③hget(name,key) # 在name對應的hash中獲取根據key獲取value

④hmget(name, keys, *args) # 在name對應的hash中獲取多個key的值# 參數:# name,reids對應的name # keys,要獲取key集合,如:['k1', 'k2', 'k3'] # *args,要獲取的key,如:k1,k2,k3 # 如:# r.mget('xx', ['k1', 'k2']) # 或# print r.hmget('xx', 'k1', 'k2')

⑤hgetall(name) #獲取name對應hash的全部鍵-值

⑥hkeys(name) # 獲取name對應的hash中全部的key的值

⑦hvals(name) # 獲取name對應的hash中全部的value的值

⑧hexists(name, key) # 檢查name對應的hash是否存在當前傳入的key

⑨hdel(name,*keys) # 將name對應的hash中指定key的鍵值對刪除

⑩hincrby(name, key, amount=1) # 自增name對應的hash中的指定key的值,不存在則建立key=amount # 參數:# name,redis中的name # key, hash對應的key # amount,自增數(整數)

⑪hincrbyfloat(name, key, amount=1.0) # 自增name對應的hash中的指定key的值,不存在則建立key=amount # 參數:# name,redis中的name # key, hash對應的key # amount,自增數(浮點數) # 自增name對應的hash中的指定key的值,不存在則建立key=amount

⑫hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

 1 # 增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆
 2 # 參數:
 3     # name,redis的name
 4     # cursor,遊標(基於遊標分批取獲取數據)
 5     # match,匹配指定key,默認None 表示全部的key
 6     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 7 # 如:
 8     # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
 9     # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
10     # ...
11     # 直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢

⑬hscan_iter(name, match=None, count=None)

1 # 利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
2 # 參數:
3     # match,匹配指定key,默認None 表示全部的key
4     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
5 # 如:
6     # for item in r.hscan_iter('xx'):
7     #     print item

3. list

List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

①lpush(name,values) # 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊 # 如:# r.lpush('oo', 11,22,33) # 保存順序爲: 33,22,11 # 擴展:# rpush(name, values) 表示從右向左操做

②lpushx(name,value) # 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊 # 更多:# rpushx(name, value) 表示從右向左操做

③llen(name) # name對應的list元素的個數

④linsert(name, where, refvalue, value)) # 在name對應的列表的某一個值前或後插入一個新值 # 參數:# name,redis的name # where,BEFORE或AFTER # refvalue,標杆值,即:在它先後插入數據 # value,要插入的數據

⑤r.lset(name, index, value) # 對name對應的list中的某一個索引位置從新賦值 # 參數:# name,redis的name # index,list的索引位置 # value,要設置的值

⑥r.lrem(name, value, num) # 在name對應的list中刪除指定的值# 參數:# name,redis的name # value,要刪除的值 # num,  num=0,刪除列表中全部的指定值;# num=2,從前到後,刪除2個;# num=-2,從後向前,刪除2個

⑦lpop(name) # 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素 # 更多:# rpop(name) 表示從右向左操做

⑧lindex(name, index) 在name對應的列表中根據索引獲取列表元素

⑨lrange(name, start, end) # 在name對應的列表分片獲取數據 # 參數:# name,redis的name # start,索引的起始位置 # end,索引結束位置

⑩ltrim(name, start, end) # 在name對應的列表中移除沒有在start-end索引之間的值 # 參數:# name,redis的name # start,索引的起始位置 # end,索引結束位置

⑪rpoplpush(src, dst) # 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊 # 參數:# src,要取數據的列表的name # dst,要添加數據的列表的name

⑫blpop(keys, timeout)  # 將多個列表排列,按照從左到右去pop對應列表的元素 # 參數:# keys,redis的name的集合 # timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞 # 更多:# r.brpop(keys, timeout),從右向左獲取數據

⑬brpoplpush(src, dst, timeout=0) # 從一個列表的右側移除一個元素並將其添加到另外一個列表的左側 # 參數:# src,取出並要移除元素的列表對應的name# dst,要插入元素的列表對應的name # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

⑭自定義增量迭代

 1 # 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要:
 2     # 一、獲取name對應的全部列表
 3     # 二、循環列表
 4 # 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能:
 5 def list_iter(name):
 6     """
 7     自定義redis列表增量迭代
 8     :param name: redis中的name,即:迭代name對應的列表
 9     :return: yield 返回 列表元素
10     """
11     list_count = r.llen(name)
12     for index in xrange(list_count):
13         yield r.lindex(name, index)
14 # 使用
15 for item in list_iter('pp'):
16     print item

4.set集合操做 ----Set操做,Set集合就是不容許重複的列表

①sadd(name,values) # name對應的集合中添加元素

②scard(name) 獲取name對應的集合中元素個數

③sdiff(keys, *args) 在第一個name對應的集合中且不在其餘name對應的集合的元素集合

④sdiffstore(dest, keys, *args) # 獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

⑤sinter(keys, *args) # 獲取多一個name對應集合的交集

⑥sinterstore(dest, keys, *args) # 獲取多一個name對應集合的交集,再講其加入到dest對應的集合中

⑦sismember(name, value) # 檢查value是不是name對應的集合的成員

⑧smembers(name) # 獲取name對應的集合的全部成員

⑨smove(src, dst, value) # 將某個成員從一個集合中移動到另一個集合

⑩spop(name) # 從集合的右側(尾部)移除一個成員,並將其返回

⑪srandmember(name, numbers) # 從name對應的集合中隨機獲取 numbers 個元素

⑫srem(name, values) # 在name對應的集合中刪除某些值

⑬sunion(keys, *args) # 獲取多一個name對應的集合的並集

⑭sunionstore(dest,keys, *args) # 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

⑮sscan(name, cursor=0, match=None, count=None)

⑯sscan_iter(name, match=None, count=None) # 同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大

5.有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

 ①zadd(name, *args, **kwargs) # 在name對應的有序集合中添加元素如:# zadd('zz', 'n1', 1, 'n2', 2)# 或 # zadd('zz', n1=11, n2=22),命令行中是先分數,後key

②zcard(name) # 獲取name對應的有序集合元素的數量

③zcount(name, min, max) # 獲取name對應的有序集合中分數 在 [min,max] 之間的個數

④zincrby(name, value, amount) # 自增name對應的有序集合的 name 對應的分數

⑤r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float) 

 1 # 按照索引範圍獲取name對應的有序集合的元素
 2 # 參數:
 3     # name,redis的name
 4     # start,有序集合索引發始位置(非分數)
 5     # end,有序集合索引結束位置(非分數)
 6     # desc,排序規則,默認按照分數從小到大排序
 7     # withscores,是否獲取元素的分數,默認只獲取元素的值
 8     # score_cast_func,對分數進行數據轉換的函數
 9 # 更多:
10     # 從大到小排序
11     # zrevrange(name, start, end, withscores=False, score_cast_func=float)
12     # 按照分數範圍獲取name對應的有序集合的元素
13     # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
14     # 從大到小排序
15     # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

⑥zrank(name, value) # 獲取某個值在 name對應的有序集合中的排行(從 0 開始)# 更多:# zrevrank(name, value),從大到小排序

⑦zrem(name, values)#  刪除name對應的有序集合中值是values的成員 # 如:zrem('zz', ['s1', 's2'])

⑧zremrangebyrank(name, min, max)  # 根據排行範圍刪除

⑨zremrangebyscore(name, min, max) # 根據分數範圍刪除

⑩zscore(name, value) # 獲取name對應有序集合中 value 對應的分數

⑪zinterstore(dest, keys, aggregate=None) # 獲取兩個有序集合的交集,按照aggregate進行操做 # aggregate的值爲:  SUM  MIN  MAX ---可應用於成績單不一樣學科分數及總分

⑫zunionstore(dest, keys, aggregate=None) # 獲取兩個有序集合的並集,按照aggregate進行操做 # aggregate的值爲:  SUM  MIN  MAX

⑬zscan(name, cursor=0, match=None, count=None, score_cast_func=float)

⑭zscan_iter(name, match=None, count=None,score_cast_func=float) # 同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做

6.其餘經常使用操做

①delete(*names) # 根據刪除redis中的任意數據類型

②exists(name) # 檢測redis的name是否存在

③keys(pattern='*') 

1 # 根據模型獲取redis的name
2 # 更多:
3     # KEYS * 匹配數據庫中全部 key 。
4     # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
5     # KEYS h*llo 匹配 hllo 和 heeeeello 等。
6     # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

④expire(name ,time) # 爲某個redis的某個name設置超時時間

⑤rename(src, dst) # 對redis的name重命名爲

⑥move(name, db)) # 將redis的某個值移動到指定的db下,該db有該值就不移動,無該值就移動

⑦randomkey() # 隨機獲取一個redis的name(不刪除)

⑧type(name) # 獲取name對應值的類型

⑨scan(cursor=0, match=None, count=None)

⑩scan_iter(match=None, count=None) # 同字符串操做,用於增量迭代獲取key

(八)管道

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

1 import redis
2 pool = redis.ConnectionPool(host='192.168.211.128', port=6379,db=2)
3 r = redis.Redis(connection_pool=pool)
4 # pipe = r.pipeline(transaction=False)
5 pipe = r.pipeline(transaction=True)
6 pipe.set('name', 'alex')
7 pipe.set('role', 'dawang')
8 pipe.execute()

(九)發佈訂閱

發佈者:服務器

訂閱者:Dashboad和數據處理

Demo以下:

 1 import redis
 2 class RedisHelper:
 3     def __init__(self):
 4         self.__conn = redis.Redis(host='192.168.211.128')
 5         self.chan_sub = 'fm104.5'
 6         self.chan_pub = 'fm104.5'
 7     def public(self, msg):
 8         self.__conn.publish(self.chan_pub, msg)
 9         return True
10     def subscribe(self):
11         pub = self.__conn.pubsub() #打開收音機
12         pub.subscribe(self.chan_sub) #調頻道
13         pub.parse_response() #準備接收,再調用一次才真正接收
14         return pub
redis_helper

訂閱者

1 from redis_helper import RedisHelper
2 obj = RedisHelper()
3 redis_sub = obj.subscribe()
4 while True:
5     msg = redis_sub.parse_response()
6     print(msg)
redis_sub

發佈者

1 from redis_helper import RedisHelper
2 obj = RedisHelper()
3 obj.public('hello')
redis_pub

更多參考 https://github.com/andymccurdy/redis-py/   http://doc.redisfans.com/

做業:

題目:rpc命令端

需求:

1.能夠異步的執行多個命令

2.對多臺機器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4  ---正常的ssh就開始等結果了,但這裏要求不阻塞返回task id
task id: 45334
>>: check_task 45334  ---有返回的就返回,要分清兩個機器的結果
>>:

相關文章
相關標籤/搜索