Python自學day-11 (RabbitMQ、持久化、Exchange、RPC、Redis)

1、RabbitMQ概述html

  RabbitMQ是一種消息隊列,是一個公共的消息中間件,用於不一樣進程之間的通信。python

  除了RabbitMQ之外,還有ZeroMQ、ActiveMQ等等。正則表達式

  前面學習了兩種隊列:redis

  1. 線程 QUEUE:只能用於線程間通訊,不能跨進程。
  2. 進程 QUEUE:只能用於父進程與子進程之間通訊,或同屬於同一父進程下的多個子進程之間。兩個徹底獨立的python程序是沒法經過這種隊列通信的。

  RabbitMQ是用erlang語言開發的,依賴於erlang。在Windows上安裝RabbitMQ須要安裝erlang環境。數據庫

  RabbitMQ支持多種語言接口,Python主要使用pika來對其進行操做,pika是一個純Python的AMQP客戶端。windows

  安裝RabbitMQ步驟:瀏覽器

  1. erlang下載地址:http://www.erlang.org/downloads  安裝
  2. RabbitMQ下載地址:http://www.rabbitmq.com/install-windows.html  安裝
  3. 在python中安裝pika(pycharm中)

2、RabbitMQ架構緩存

  RabbitMQ因爲須要同時爲多個應用服務,其中維護的隊列不止一個。服務器

  上圖是一個典型的生產者消費者模型,中間是RabbitMQ消息隊列,他由Exchange和Queue組成。消息隊列服務實體又叫broker(即中間方塊部分)。微信

  圖中,紅色部分就是多個隊列。綠色部分是Exchange。

  

  RabbitMQ默認端口:

       Client端通信口:5672

    後臺管理口:15672  http://localhost:15672

    server間內部通信口:25672

  

  新安裝好的RabbitMQ沒法登陸後臺管理界面:

  • 在cmd窗口下進入rabbitmq安裝目錄下的sbin目錄,使用rabbitmq-plugins.bat list查看已安裝的插件列表。

  

  • 使用rabbitmq-plugins.bat enable rabbitmq_management開啓網頁管理界面

  

  • 重啓rabbitmq。
  • 在瀏覽器中輸入http://127.0.0.1:15672/
  • 輸入用戶名和密碼(默認爲guest)

 

 

3、最簡單的使用

  生產者:

import pika

# 創建一個socket連接
conn = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
# 聲明一個管道
channel = conn.channel()
# 定義一個queue
channel.queue_declare(queue='hello')
# 經過管道發送數據到隊列hello中
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue的名字
                      body='Hello World!!') # 內容
print('[X] Sent "Hello World!!"')
conn.close()    # 關閉連接

  消費者:

import pika

conn = pika.BlockingConnection(
    # 若是消費者在其餘機器上,這裏的localhost須要修改成server的IP地址
    pika.ConnectionParameters('localhost')
)
channel = conn.channel()
# 爲何還要在這裏聲明queue,由於咱們不肯定在這以前是否已經有produce聲明瞭queue
# 若是能肯定已經存在queue,那就不用聲明瞭
channel.queue_declare(queue='hello')

# 參數ch是管道對象內存地址,method是消息要發給誰的信息
def callback(ch, method, properties, body):
        print('[x] Received %r' % body)

# 從hello隊列中中獲取消息
channel.basic_consume(
    callback,   # 若是收到消息就調用這個函數來處理消息
    queue='hello',  # 指定隊列名稱
    no_ack=True  # 是否向服務器確認消息處理完畢,見後述詳解no_ack
)
print('[*] Waiting for messages.To exit press CTRL+C')
# 開始接收,一直接收,沒有就卡住
channel.start_consuming()

   在這種模式下,若是一個生產者對應多個消費者(即啓動多個消費者),此時隊列中的消息是以輪訓的方式發送給每一個消費者的(相似負載均衡)。

  no_ack:

  消費者受到消息後,須要必定的時間來處理消息,使用no_ack=True表示無論處理完仍是沒有處理完,都不會給服務端發確認。這種狀況下,服務端不會管消費者是否處理完消息就直接將消息從隊列中刪除了。

  若是要讓消費者在處理完後給服務端發送確認消息後,再從隊列中刪除(若是未處理完就斷掉了,那麼就將消息給另一個活動的消費者處理),那麼就不要使用no_ack=True這個參數,而且在callback方法中添加下面代碼:

ch.basic_ack(delivery_tag=method.delivery_tag)

  以下代碼:

# 參數ch是管道對象內存地址,method是消息要發給誰的信息
def callback(ch, method, properties, body):
        print('[x] Received %r' % body)
        # 處理完畢後迴應服務端
        ch.basic_ack(delivery_tag=method.delivery_tag)

# 從hello隊列中中獲取消息
channel.basic_consume(
    callback,   # 若是收到消息就調用這個函數來處理消息
    queue='hello1',  # 指定隊列名稱
)

 

4、持久化

   在RabbitMQ服務重啓後,隊列和裏面的消息都會丟失。

   如何持久化隊列:

# 定義一個queue,並持久化
channel.queue_declare(queue='hello', durable=True)

  使用durable=Ture來持久化queue(客戶端和服務器端都得寫)。可是queue中的數據仍是會丟失!

  如何持久化隊列中的消息:

# 經過管道發送數據到隊列hello中,並持久化
channel.basic_publish(exchange='',
                      routing_key='hello1',  # queue的名字
                      body='Hello World!!',  # 內容
                      # 持久化隊列中的消息
                      properties=pika.BasicProperties(delivery_mode=2)
                      )

  持久化隊列中消息,如上述代碼中,添加properties=pika.BasicProperties(delivery_mode=2)。

  注意:消息持久化的delivery_mode必須搭配隊列持久化durable使用,不然隊列都不持久化,裏面的消息會丟失。 

 

  Windows命令行查看queue:

 

5、Exchange轉發器

  Exchange在定義時要指定類型,以決定哪些queue符合條件,能夠接受消息。能夠存在多個exchange。

  四種類型:

  fanout:全部bind到此exchange的queue均可以接受到消息。

  direct:經過routingKey和exchange決定的那個惟一的queue能夠接受消息。(徹底匹配)

  topic:全部符合routingKey(能夠是一個表達式)所bind的queue能夠接受消息。(模糊匹配)

  header:經過headers來決定把消息發給哪些queue。

 

  Fanout:廣播

  生產者:

import pika

conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = conn.channel()
# 只需定義exchange,無需定義queue了
channel.exchange_declare(
    exchange='logs1',   # 定義exchange名稱
    exchange_type='fanout',   # exchange類型,這裏爲廣播
    durable=True    # 持久化exchange
)
message = "Hello World!!"
channel.basic_publish(
    exchange='logs',    # exchange的名稱
    routing_key='',  # 之前這裏寫的是queue的名字,如今指定爲空
    body=message    # 發送的消息
)
print('[X] Sent %r' % message)
conn.close()

  消費者:

import pika

conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = conn.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout',
                         durable=True   # 和生產者一致,須要持久化
                         )
res = channel.queue_declare(exclusive=True)  # 自動生成一個隊列名稱(惟一的名稱),並在此queue的消費者斷開後,自動銷燬queue
queue_name = res.method.queue   # 獲取自動生成的隊列名例如amq.gen-55TO5iqNph0I5MmG5GlKuA  
# 將定義好的queue綁定到exchange上
channel.queue_bind(exchange='logs',
                   queue=queue_name
                   )
print('[X] Waiting for logs. To exit press CTRL+C')

# 回調函數
def callback(ch, methos, properties, body):
    print('[X] %r' % body)

# 接收消息
channel.basic_consume(callback,
                      queue=queue_name,  # 指定隊列名
                      no_ack=True   # 無響應,也可使用主動通知的形式
                      )
channel.start_consuming()   # 開始監聽消息

  注意:channel.queue_declare(exclusive=True)中的exclusive=True會自動生成一個惟一的隊列名,並在該隊列的消費者斷開後,自動銷燬該queue。這裏爲何要用這種形式,是由於咱們這裏的隊列只用於接收這個fanout廣播消息,接收完不須要這個queue了就能夠刪除了,若是還須要接收其餘exchange的消息,那可能就須要本身去指定名稱了。

   注意注意:fanout模式,在exchange接收到生產者的消息後,只會發給當前已綁定的queues,分發給全部的queue後,消息就刪除了。因此在生產者發送消息後再綁定到exchange的消費者隊列是沒法接受到這條消息的。

 

  direct:

 

  生產者:

import pika

conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = conn.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct'  # 轉發類型爲direct(徹底匹配)
                         )
severity = 'error'  # 發送級別爲error,用於(徹底)匹配
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message
                      )
print('[X] Sent %r:%r' % (severity, message))
conn.close()

  消費者:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct'
                         )
res = channel.queue_declare(exclusive=True)  # 隨機生成一個queue_name(自動銷燬)
queue_name = res.method.queue   # 獲取隨機名
severities = ['info', 'warning', 'error']   # 接收三個級別的匹配字符
# 利用for循環,爲同一個queue綁定3個級別的消息(即3個匹配字符)
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity
                       )
print('[*] Waiting for logs.To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[X] %r:%r' % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True
                      )
channel.start_consuming()

  direct模式,就是使用routing_key來區分將消息發給哪一個隊列,須要接收的隊列必須使用routing_key來綁定到exchange上。發消息必須制定routing_key,從而exchange才能根據這個值來進行轉發。direct是徹底匹配的模式,一個字符都不能差。

 

  topic:

  topic與direct的使用方法很類似,在生產者中沒什麼不一樣,而在消費者中,topic是將一個匹配的表達式(相似正則表達式)和queue綁定到一個exchange中,這樣,只要生產者產生的消息可以匹配這個表達式,exchange就會把這個消息發送給匹配上的queue。

  生產者:

import pika

conn = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = conn.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic'  # 轉發類型爲direct(徹底匹配)
                         )
severity = 'Leo.warning'  # 發送一個應用+級別的消息
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=severity,
                      body=message
                      )
print('[X] Sent %r:%r' % (severity, message))
conn.close()

  消費者:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic'
                         )
res = channel.queue_declare(exclusive=True)  # 隨機生成一個queue_name(自動銷燬)
queue_name = res.method.queue   # 獲取隨機名
severity = '*.warning'  # 匹配一個級別的任何應用發來的消息,是一個相似於正則的表達式
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key=severity
                   )
print('[*] Waiting for logs.To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[X] %r:%r' % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True
                      )
channel.start_consuming()

  topic的匹配表達式:

  "#":匹配0-N個單詞(單詞由字母和數字組成)。

  "*":匹配0-1個單詞。

  中間只能用「.」來間隔。例如  「 *.info.* 」, " #.warning.* "

  注意:在消費者的代碼中queue_bind函數的參數中routing_key中"*"和"."和"#"都表明着匹配字符。而在生產者的basic_publish函數的參數中routing_key中字符串若是出現"*"、"."和"#",都表明一個普通字符。

 

6、RabbitMQ實現RPC(簡單版,無exchange部分)

   RPC:remote procedure call 遠程過程調用,即客戶端發送一條命令,服務端執行方法調用,並返回結果。

  RabbitMQ中實現RPC思路:

    生產者--->rpc_Queue--->消費者

    消費者<---res_Queue<---生產者

    經過兩個隊列,來互相傳遞命令和執行結果。

  客戶端代碼:(發送調用命令)

import pika
import time
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        # 連接RabbitMQ
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        # 隨機生成一個隊列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue   # 獲取隊列名稱
        # 定義一個消費者,回調函數是on_response(),消息隊列名稱是callback_queue
        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    # 消費者回調函數
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    # 發送消息(命令),並接受響應
    def call(self, n):
        # 定義self.response爲空
        self.response = None
        # 生成一個uuid,對應一個命令
        self.corr_id = str(uuid.uuid4())
        # 發送消息,隊列名爲rpc_queue,將響應隊列和uuid一併發送給服務端,隊列名爲
        # callback_queue,uuid爲corr_id,body爲消息體
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id
                                   ),
                                   body=str(n)
                                   )
        # 消息發送完畢後,循環調用非阻塞模式的start_consuming()
        # 即connection.process_data_events()
        # 當沒收到相應時,返回並繼續循環。
        # 當有響應時會調用on_response,在on_response中將self.response設置爲body,跳出循環
        while self.response is None:
            self.connection.process_data_events()
            # 每隔一秒檢查一下是否有響應到達
            time.sleep(1)
        # 返回響應結果
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
response = fibonacci_rpc.call(3)
print('[.] Got %r' % response)

  服務端代碼:(處理命令,返回調用結果)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def fibonacci(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fibonacci(n-1) + fibonacci(n-2)

# 定義一個回調函數,用來處理命令
def on_request(ch, method, props, body):
    n = int(body)
    print('[.] fib(%s)' % n)
    # 執行調用的函數,或去響應內容
    response = fibonacci(n)
    # 將響應內容發送回去,隊列爲客戶端發過來的props.reply_to
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response)
                     )
    # 通知客戶端,這個命令已經執行完畢
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 從rpc_queue中接收消息
channel.basic_consume(on_request, queue='rpc_queue')

print('[x] Awaiting RPC requests')
# 開始接收消息
channel.start_consuming()

  上述代碼主要完成了RPC調用的基本功能,其中函數一個客戶端(發起RPC調用)和一個服務端(處理RPC調用)。

  在客戶端代碼中,關鍵點在於在發送消息的同時,將響應返回須要使用的隊列也一併定義並將名稱發送給了服務端(還包含區分消息和相應批次的uuid)。在發送完消息後,進入循環獲取響應的階段。

  在服務端代碼中,關鍵點在於接收到消息後,在回調函數中調用處理函數fibonacci(),並將運行結果發送到客戶端告知的響應隊列中。

  

7、Redis概述

   Redis是一個緩存系統,即第三方實現數據共享,例如QQ和微信要共享一個數據name。QQ和微信都分別使用socket連接到redis,而後實現訪問該name數據。

   Redis是單線程的,底層應該是使用epoll實現異步處理(不肯定)。

 

   Redis安裝:

   在Linux下采用yum等工具安裝,或者源碼安裝。

  1.安裝redis前須要安裝epel源,yum install epel-release -y

  2.安裝redis,yum install redis -y

  3.啓動redis,systemctl start redis.service

  4.查看redis是否啓動,systemctl status redis.service  或者  ps -ef | grep redis

 

  Redis命令行使用:

  使用redis-cli運行redis命令行:

   存數據:

  讀數據:

 

  在Python中調用redis:

  首先須要在python中安裝redis庫,可使用pycharm安裝。或者使用pip install redis,easy_install redis等方法安裝。

  redis默認使用端口號:6379

import redis
# 連接遠程redis服務
r = redis.Redis(host='192.168.1.66', port=6379)
r.set('age', '25')  # 存入數據
print(r.get('age'))  # 讀出數據 

  使用上述代碼進行redis連接和操做,若是報錯說redis拒絕連接,解決方案以下:

  1.修改redis配置文件/etc/redis.conf,將其中的bind 127.0.0.1註釋掉。

  2.將protected-mode yes修改成protected-mode no。

  3.再次連接,便可成功。

 

  Redis鏈接池:

import redis

pool = redis.ConnectionPool(host='192.168.1.66', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('sex', 'male')
print(r.get('sex'))

  redis-py使用connection pool來管理一個redis server的全部連接,避免每次創建、釋放連接的資源開銷。默認,每一個redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個連接池。

  使用Redis幫助:

  在redis-cli中使用命令 help的形式能夠查看命令幫助。

  

8、Redis的String操做

  主要有如下一些經常使用方法:

  set(name, value, ex=None, px=None, nx=False, xx=False)

  在Redis的set函數中,默認不存在則建立,存在則修改。

  參數:

    ex,過時時間(秒)

    px,過時時間(毫秒)

    nx,若是設置爲True,則只有name不存在時,才執行set操做

    xx,若是設置爲True,則只有name存在時,才執行set操做

  setnx(key, value)  = set()+nx  通常不使用,直接用set

  setex(key, value, time) = set()+ex  通常不使用,直接用set

  psetex(key, value, time) = set()+px  通常不使用,直接用set

  mset(k1='v1', k2='v2')    批量設置鍵值  或者  mset({'k1':'v1', 'k2':'v2' })使用字段傳入鍵值對。

  mget(k1, k2)  批量獲取值   或者  mget([k1, k2]) 使用列表傳入keys

  getset(key, value)  獲取舊值,設置新值  

  getrange(key, start, end)  獲取某個值的其中幾個字符,例如值爲‘jack’,getrange(key, 0, 2)返回'jac'。

  setrange(key, offset, value)  將值從某個位置開始覆蓋字符爲value,例如值爲‘jack’,setrange(key, 1, 'AL'),返回'jALk'。

  setbit(key, offset, value) 將某個值的某個二進制位進行修改。例如n1對應的值爲foo,foo對應的二進制爲01100110 01101111 01101111第7位設置爲1,setbit('n1', 7, 1),返回的值變爲01100111 01101111 01101111,foo變爲goo。

    setbit一個使用的場景:例如,新浪微博有10億用戶,咱們想統計有哪些用戶在線,使用數據庫修改狀態的方式效率過低。咱們可使用每一個用戶對應一個編號index,編號對應的二進制位0表明不在線,1表明在線。用戶上線就使用setbit(key, index, 1)設置key對應值中的index位爲1,下線就setbit(key,index,0)。而後使用bitcount(key)就能夠統計值裏有多少位爲1,就能夠知道有多少用戶在線。 例如,設置第1億個用戶爲在線,setbit(user,100000000,1)。

  getbit(key, offset) 獲取某個位置的二進制值,例如1爲用戶在線,0爲不在線。

  strlen(key) 返回key對應value的字節長度(一個漢字三個字節)。

  incr(key, amount=1) 自增,運行一次,key對應的value+1,能夠用來處理用戶上線統計(incr login_user)

  

  decr(key, amount=1) 自減,運行一次,key對應的value-1,能夠用來處理用戶下線統計(decr login_user),注意會減爲負數。

   

  incrbyfloat(key, amount=1.2) 浮點數自加,相似incr。

  decrbyfloat(key, amount=1.2) 浮點數自減,相似decr。

  append(key, value) 在key對應的值後面添加value。例如name='alex',append(name,'leo'),返回alexleo。

  

9、Redis的hash操做

  hset(name, key, value)  至關於name中存在多個鍵值對,例如hset('n2', 'k9', 'v99'),hset('n2', 'kx', 'vx')。

  

  hgetall(name)  返回全部的鍵值對。

  

  hget(name, key)  獲取info中某個key的值,例如 hget('info', 'age')。

   

  hkeys(name) 獲取全部name中的key。

  hvals(name) 獲取全部name中的value。

  hmset(name,mapping)  批量設置,例如hmset('info', {'k1':'v1', 'k2':'v2'})

  hmget(name, key ...)  批量獲取,例如hmget('info', 'k1', 'k2')

  hlen(name)  獲取name中有幾個key-value。例如hlen('info'),返回(integer)2

  hexist(name, key) 判斷name中是否存在某個鍵值。

  hincrby(name, key, amount=1) 使name中key對應的value自增。

  hdecrby(name, key, amount=1) 使name中key對應的value自減。

  hincebyfloat(name, key, amount=1.2) 浮點自增。

  hscan(name, cursor=0, match=None, count=None) 因爲hset能夠保存200多億個key-value,因此使用hkeys來查詢很耗性能,因此可使用hscan進行過濾。例如hscan('info',0,'*e*'),過濾key中帶'e'的key-value。

  hscan_iter(name, match=None, count=None)  返回一個迭代器,而後使用循環獲取key-value。

  例如:

for item in r.hscan_iter('info'):
    print item

  

10、Redis的list操做

  lpush(names, val1, val2...) 往列表中插入數據,若是不存在則建立。lpush是從數據的最前面插入(最左邊)。

  lrange(names, start, end)  去列表中的一個範圍的數據,例如lrange(names, 0, -1)即取names中的全部數據。

  rpush(names, val1, val2..) 與lpush相反,rpush是從最後面插入(最右邊)。

  lpushx(names, value)  只有當names存在時才插入,不然無效。

  rpushx(names, value) 相似lpushx。 

  llen(names) 返回names對應list元素個數。

  linsert(names, where, refvalue, value)  在names列表中,refvalue值的前面仍是後面插入value。例如linsert(names, BEFORE, 'alex', 'leo'),在names列表的'alex'前插入'leo'。where填寫BEFORE和AFTER。

  lset(names, index, value) 在names列表中某個index處從新設置值。

  lrem(names, value, num) 刪除names列表中指定的值,num表示刪除的數量(正數爲從前到後,負數爲從後到前)。例如lrem(names, 'leo', 2)即從前到後刪除2個'leo'。

  lpop(names) 從names的最前面刪除一個元素,並返回。

  rpop(names) 從names的最後面刪除一個元素,並返回。

  lindex(names, index)  從names中根據index獲取相應位置的元素。

  ltrim(name, start, end) 移除names中start-end下標範圍的元素。

  rpoplpush(src, dst)  從一個列表的最右邊取元素,添加到另外一個列表的最左邊。

 

11、Redis的set(集合)操做

  集合就是不容許重複的列表。

  sadd(names, val1, val2...) 向names中添加元素,可是不能重複,若是與集合中某個元素重複,則不會插入該元素。

  smembers(names) 獲取names集合中的全部元素,這些元素都是不重複的。

  sdiff(names1, names2...) 獲取差集,names1-names2

  sdiffstore(res, names1, names2...) 獲取names1-names2的結果,並存放到res集合中。

  sinter(names1, names2...) 獲取多個集合之間的交集。

  sinterstore(res, names1, names2...) 獲取集合之間的交集,並存放到res集合中。

  sismember(names, value) 判斷value是不是names集合中的成員。

  smove(src, dst, value) 將某個成員從一個集合移動到另一個集合中。

  spop(names) 從集合的右側移除一個成員,並將其返回。

  srandmember(names, num) 從names集合中隨機獲取num個元素。

  srem(names, value...) 在names集合中刪除某些值。

  sunion(names1, names2...) 多個集合的並集。

  sunionstore(res, names1, names2...) 多個集合的並集,並存放res集合中。

  sscan(names, cursor=0, match=None, count=None) 和hscan很像,即過濾。

  sscan_iter(names, match=None, count=None) 和hscan_iter很像,即返回一個迭代器,而後使用循環獲取值。

 

  有序集合:

    在集合的基礎上,爲每一個元素排序。元素的排序須要根據另外一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

  zadd(names, s1, val1, s2, val2...) 向有序集合中插入數據,每一個數據都有一個分數,用於排序。例如zadd(names,10,'leo',20,'alex')。

  zrange(names, start, end, desc=False, withscores=False, score_cast_func=float) 查看有序集合的部分元素,例如zrange(names, 0, -1, withscores=True)查看所有元素(帶分數)。

  zcard(names) 獲取names對應有序集合元素的數量。

  zcount(names, min, max) 獲取有序集合中分數在[min, max]之間元素的個數。

  zincrby(names, value, amount) 自增names有序集合中對應value值元素的分數。

  zrank(names, value) 獲取有序集合中value對應元素的排行(從0開始,從小到大)。

  zrerang(name, value) 與zrank相反,從大到小。

  zrem(names, value) 刪除names中值爲value的元素。

  zremrangebyrank(names, min, max) 根據排行範圍(至關於index)刪除。

  zremrangebyscore(names, min, max) 根據分數範圍刪除。

  zscore(names, value) 獲取names有序集合中value對應元素的分數。

  zinterstore(res, keys, aggregate=None) 獲取兩個有序集合的交集,若是其中有值相同分數不一樣的元素,則對其分數進行SUM,MIN,MAX三種操做。

  zunionstore(res, keys, aggregate=None)  獲取兩個有序集合的並集,若是其中有值相同的元素,則對其分數進行SUM,MIN,MAX三種操做。

  zscan(name, cursor=0, match=None, count=None,score_cast_func=float) 

  zscan_iter(name, match=None, count=None,score_cast_func=float)

  同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做。

 

12、其餘經常使用操做

  delete(key) 任意刪除一個key對應的元素。

  exist(key) 查看是否存在key對應的元素。

  keys(pattern='*') 根據pattern匹配獲取redis中全部的key。

  expire(key, time) 爲某個元素設置超時時間(秒)。到時間自動刪除。

  rename(src, dst) 重命名。

  move(name, db) 將某個值移動到指定的db下。redis中默認有16個db(至關於表),編號爲0-15。每一個db中數據都是新的。使用select db_id來切換。

  type(key) 獲取key對應值的類型。

  scan(cursor=0, match=None, count=None)  同字符串操做

  scan_iter(match=None, count=None)  同字符串操做

   

十3、Redis管道(pipline)

  使用pipe能夠將多個操做黏在一塊兒,變成一個原子操做。

import redis

pool = redis.ConnectionPool(host='192.168.1.66', port=6379)
r = redis.Redis(host='192.168.1.66', port=6379)

pipe = r.pipeline(transaction=True)  # 定義一個pipe

pipe.set('haha1', 'good')   # 加入動做
pipe.set('haha2', 'UP')  # 加入動做

pipe.execute()  # 開始執行pipe,其中的兩個動做是黏在一塊兒的原子操做

 

十4、發佈訂閱

  使用Redis也能夠作到和RabbitMQ相似的廣播效果。

  首先封裝一個RedisHelper:

import redis

class RedisHelper(object):

    def __init__(self):
        self.__conn = redis.Redis(host='192.168.1.66', port=6379)
        self.channel_pub = 'fm104.5'  # 發佈的頻道
        self.channel_sub = 'fm104.5'  # 接收的頻道

    def publishing(self, msg):
        self.__conn.publish(self.channel_pub, msg)
        return True

    def subscribe(self):
        sub = self.__conn.pubsub()  # 至關於打開收音機
        sub.subscribe(self.channel_sub)  # 選擇頻道
        sub.parse_response()    # 準備接收(再次調用纔是正式接收)
        return sub

  建立一個訂閱者:

from redishelper import RedisHelper

rh = RedisHelper()
rh_sub = rh.subscribe()
print('打開收音機,調頻104.5,開始接收消息:')
while True:
    msg = rh_sub.parse_response()
    print(msg)

  建立一個發佈者:

from redishelper import RedisHelper

rh = RedisHelper()
rh.publishing('Hello World!')

  也能夠經過redis-cli來發布,使用PUBLISH fm104.5 'Hello World'便可。

相關文章
相關標籤/搜索