day10-python-協程\異步IO\RabbitMQ隊列\redis緩存

1、協程javascript

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程html

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:java

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。python

協程的好處:git

  • 無需線程上下文切換的開銷
  • 無需原子操做鎖定及同步的開銷
    •   "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:程序員

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子github

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import queue

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

看樓上的例子,我問你這算不算作是協程呢?你說,我他媽哪知道呀,你前面說了一堆廢話,可是並沒告訴我協程的標準形態呀,我腚眼一想,以爲你說也對,那好,咱們先給協程一個標準定義,即符合什麼條件就能稱之爲協程:正則表達式

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 一個協程遇到IO操做自動切換到其它協程

基於上面這4點定義,咱們剛纔用yield實現的程並不能算是合格的線程,由於它有一點功能沒實現,哪一點呢?redis

 

Greenletsql

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)   #啓動一個協程
gr2 = greenlet(test2)
gr1.switch()

感受確實用着比generator還簡單了呢,但好像尚未解決一個問題,就是遇到IO操做,自動切換,對不對?

 

Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import gevent


def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit精確的 context內容 to bar')
    gevent.sleep(1)
    print('Implicit context switch back to bar')

def func3():
    print("running func3")
    gevent.sleep(0)
    print("running func3 again")

gevent.joinall([
    gevent.spawn(foo),  #生成
    gevent.spawn(bar),
    gevent.spawn(func3)
])

 

同步和異步的性能區別

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

 

遇到IO阻塞時會自動切換任務

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all()  #把當前程序的全部io操做給我單獨的作上標記

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/']
time_start = time.time()
for url in urls:
    f(url)
print("同步cost",time.time() - time_start)

async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("異步cost",time.time() - async_time_start)

經過gevent實現單線程下的多socket併發

server side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

client side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    #
    print('Received', repr(data))
s.close()

併發100個sock鏈接

import socket
import threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",8001))
    count = 0
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))

        data = client.recv(1024)

        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果
        count +=1
    client.close()


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()

2、事件驅動和異步IO

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨 死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數 網絡服務器採用的方式
 

看圖說話講事件驅動模型

在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。

方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;

 

 

 

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

 

在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  1. 程序中有許多任務,並且…
  2. 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
  3. 在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,只到io處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?哈哈,下面咱們就來一塊兒揭開這神祕的面紗。。。。

3、select、poll、epoll三者的區別

 http://www.javashuo.com/article/p-qajblppq-do.html

4、IO多路複用

https://www.cnblogs.com/alex3714/articles/5876749.html

5、select/selectors模塊

select 多併發socket 例子

select socket server

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏

    for s in readable: #每一個s就是一個socket

        if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
            #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
            #新鏈接進來了,接受這個鏈接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
            #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
            #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的

            message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送

        else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
            #客戶端的數據過來了,在這接收
            data = s.recv(1024)
            if data:
                print("收到來自[%s]的數據:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
                if s not  in outputs:
                    outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端


            else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
                print("客戶端斷開了",s)

                if s in outputs:
                    outputs.remove(s) #清理已斷開的鏈接

                inputs.remove(s) #清理已斷開的鏈接

                del message_queues[s] ##清理已斷開的鏈接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

select socket client

import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )

 

selectors模塊

這個模塊容許高級和高效的I/O多路複用,創建在選擇模塊原語的基礎上。咱們鼓勵用戶使用這個模塊,除非他們但願對所使用的os級原語進行精確控制。

import selectors
import socket
 
sel = selectors.DefaultSelector()
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

6、RabbitMQ隊列

 

實現最簡單的隊列通訊

 

 


send端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #聲明一個管道

#聲明queue
channel.queue_declare(queue='hello2')

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!'
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

receive端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2')

def callback(ch, method, properties, body): #回調函數
    print("-->",ch,method,properties)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(#消費信息
                      'hello2',
                      callback,) #若是收到消息,就調用CALLBACK 函數來處理消息


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

遠程鏈接rabbitmq server的話,須要配置權限 噢 

首先在rabbitmq server上建立一個用戶

sudo rabbitmqctl  add_user hhh hhh1234

同時還要配置權限,容許從外面訪問

sudo rabbitmqctl set_permissions -p / hhh ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

要授予用戶訪問權的虛擬主機的名稱,默認爲/。

user

授予對指定虛擬主機的訪問權的用戶的名稱。

conf

一個正則表達式,它與授予用戶配置權限的資源名稱相匹配。

write

爲用戶授予寫權限的與資源名稱匹配的正則表達式。

 read

匹配資源名稱的正則表達式,爲用戶授予讀權限。

 

客戶端鏈接的時候須要配置認證參數

credentials = pika.PlainCredentials('hhh', 'hhh1234')
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.40',5672,'/',credentials))
channel = connection.channel()

 

Work queues

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少

消息提供者代碼

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #聲明一個管道

#聲明queue
channel.queue_declare(queue='hello2',durable=True)

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  #make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

消費者代碼

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2',durable=True)

def callback(ch, method, properties, body): #回調函數
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消費信息
                      'hello2',
                      callback,) #若是收到消息,就調用CALLBACK 函數來處理消息
                      #True  #不確認

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上


完成一項任務須要幾秒鐘的時間。您可能想知道,若是其中一個消費者啓動了一個很長的任務,但只完成了一部分就掛掉了,會發生什麼狀況。對於咱們當前的代碼,一旦RabbitMQ將消息傳遞給客戶,它就會當即將其從內存中刪除。在這種狀況下,若是您殺死了一個工做人員,咱們將丟失它正在處理的消息。咱們還將丟失全部發送給這個特定worker但還沒有處理的消息。

但咱們不想失去任何任務。若是一個工人死了,咱們但願把這個任務交給另外一個工人。

爲了確保消息永不丟失,RabbitMQ支持消息確認。從使用者發回一個ack(nowledgement),告訴RabbitMQ已經接收並處理了特定的消息,RabbitMQ能夠自由地刪除它。

若是使用者在不發送ack的狀況下死亡(其通道關閉、鏈接關閉或TCP鏈接丟失),RabbitMQ將理解消息未被徹底處理,並將從新對其排隊。若是同時有其餘的消費者在線,它會很快將其從新發送給另外一個消費者。這樣你能夠確保沒有信息丟失,即便工人偶爾死亡。

沒有任何消息超時;當使用者死亡時,RabbitMQ將從新傳遞消息。即便處理一條消息須要很長很長的時間,也不要緊。

消息確認在默認狀況下是打開的。在前面的示例中,咱們經過no_ack=True標誌顯式地關閉了它們。一旦咱們完成了一個任務,就應該移除這個標誌並從worker那裏發送一個適當的確認。

def callback(ch, method, properties, body): #回調函數
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消費信息
                      'hello2',
                      callback,) #若是收到消息,就調用CALLBACK 函數來處理消息

使用這段代碼,咱們能夠確保即便您在處理消息時使用CTRL+C殺死了一個工做人員,也不會丟失任何東西。在工做人員死後不久,全部未確認的消息將被從新發送

 

消息持久化

咱們已經學習瞭如何確保即便使用者死亡,任務也不會丟失(默認狀況下,若是想禁用use no_ack=True)。可是,若是RabbitMQ服務器中止,咱們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣作。確保消息不丟失須要作兩件事:咱們須要將隊列和消息都標記爲持久的。

首先,咱們須要確保RabbitMQ永遠不會丟失咱們的隊列。爲了作到這一點,咱們須要聲明它是耐用的:

channel.queue_declare(queue='hello2',durable=True)

雖然這個命令自己是正確的,可是它在咱們的設置中不能工做。那是由於咱們已經定義了一個名爲hello的隊列,它不是持久的。RabbitMQ不容許您使用不一樣的參數從新定義現有隊列,並將向任何試圖這樣作的程序返回一個錯誤。可是有一個快速的解決方法——讓咱們聲明一個不一樣名稱的隊列,例如:

channel.queue_declare(queue='task_queue', durable=True)

這個queue_declare更改須要同時應用到生產者代碼和消費者代碼。

此時,咱們確信即便RabbitMQ從新啓動,task_queue隊列也不會丟失。如今,咱們須要將消息標記爲persistent—經過提供一個值爲2的delivery_mode屬性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

 

消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)

帶消息持久化+公平分發的完整代碼

生產者端

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

消費者端

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

 

Publish\Subscribe(消息發佈\訂閱)

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,

交換是一件很是簡單的事情。一邊接收來自生產者的消息,另外一邊將消息推送到隊列。交換器必須確切地知道如何處理接收到的消息。它應該被附加到一個特定的隊列嗎?它應該被添加到許多隊列中嗎?或者它應該被丟棄。這些規則由exchange類型定義。

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息


fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 

headers: 經過headers 來決定把消息發給哪些queue

消息publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare('',exclusive=True)  #exclusive排他,惟一的,不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue
print("random queuename",queue_name)

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

 

有選擇的接收消息(exchange type=direct)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue

serverities = sys.argv[1:]
if not serverities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
print(serverities)
for serverity in serverities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=serverity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" %(method.routing_key, body))

channel.basic_consume(queue_name,
                      callback,
                      True)
channel.start_consuming()

 

更細緻的消息過濾

雖然使用直接交換改進了咱們的系統,但它仍然有侷限性——它不能基於多個標準進行路由。

在咱們的日誌系統中,咱們可能不只但願根據嚴重性訂閱日誌,還但願根據發出日誌的源訂閱日誌。您可能從syslog unix工具中瞭解了這個概念,該工具根據嚴重程度(info/warn/crit…)和設施(auth/cron/kern…)來路由日誌。

這將給咱們很大的靈活性——咱們可能但願只監聽來自「cron」的關鍵錯誤,但也要監聽來自「kern」的全部日誌。

 

 

 publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

接收全部運行的日誌:

python receive_logs_topic.py "#"

接收設施「kern」的日誌:

python receive_logs_topic.py "kern.*"

或者你只想接收「critical」的日誌:

python receive_logs_topic.py "*.critical"

你能夠建立多個綁定:

python receive_logs_topic.py "kern.*" "*.critical"

併發出一個帶有路由鍵「kern」的日誌「critical」類型:

python emit_log_topic.py "kern.critical" "A critical kernel error"

 

Remote producer call(RPC)

爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶機類。它將公開一個名爲call的方法,發送一個RPC請求並阻塞,直到收到答案:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

 

 

 RPC server

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=\
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume('rpc_queue',on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()

RPC client

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                  (host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare('',exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.callback_queue,
                                   self.on_response,    #只要一收到消息就調用on_response
                                   True)
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming()
            print("no msg")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)

 

7、Memcached

http://www.cnblogs.com/wupeiqi/articles/5132791.html 

 

8、redis

介紹

redis是業界主流的key-value nosql 數據庫之一。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。

Redis優勢

  • 異常快速 : Redis是很是快的,每秒能夠執行大約110000設置操做,81000個/每秒的讀取操做。

  • 支持豐富的數據類型 : Redis支持最大多數開發人員已經知道如列表,集合,可排序集合,哈希等數據類型。

    這使得在應用中很容易解決的各類問題,由於咱們知道哪些問題處理使用哪一種數據類型更好解決。
  • 操做都是原子的 : 全部 Redis 的操做都是原子,從而確保當兩個客戶同時訪問 Redis 服務器獲得的是更新後的值(最新值)。

  • MultiUtility工具:Redis是一個多功能實用工具,能夠在不少如:緩存,消息傳遞隊列中使用(Redis原生支持發佈/訂閱),在應用程序中,如:Web應用程序會話,網站頁面點擊數等任何短暫的數據;

安裝Redis環境

要在 Ubuntu 上安裝 Redis,打開終端,而後輸入如下命令:
$sudo apt-get update
$sudo apt-get install redis-server
這將在您的計算機上安裝Redis

啓動 Redis

$redis-server

查看 redis 是否還在運行

$redis-cli
這將打開一個 Redis 提示符,以下圖所示:
redis 127.0.0.1:6379>
在上面的提示信息中:127.0.0.1 是本機的IP地址,6379是 Redis 服務器運行的端口。如今輸入 PING 命令,以下圖所示:
redis 127.0.0.1:6379> ping
PONG
這說明如今你已經成功地在計算機上安裝了 Redis。
 
鏈接方式

一、操做模式

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis

r = redis.Redis(host='192.168.1.40',port=6379)
r.set('foo','Bar')

print(r.get('foo'))

二、鏈接池

redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)
r.set('foo','Bar')

print(r.get('foo'))

 

操做

1. String操做

redis中的String在在內存中按照一個name對應一個value來存儲。如圖:

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中設置值,默認,不存在則建立,存在則修改
參數:
      ex,過時時間(秒)
      px,過時時間(毫秒)
      nx,若是設置爲True,則只有name不存在時,當前set操做才執行
      xx,若是設置爲True,則只有name存在時,崗前set操做才執行

setnx(name, value)

設置值,只有name不存在時,執行設置操做(添加)

setex(name, value, time)

# 設置值
# 參數:
     # time,過時時間(數字秒 或 timedelta對象)

psetex(name, time_ms, value)

# 設置值
# 參數:
     # time_ms,過時時間(數字毫秒 或 timedelta對象)

mset(*args, **kwargs)

批量設置值
如:
     mset(k1= 'v1' , k2= 'v2' )
    
     mget({ 'k1' 'v1' 'k2' 'v2' })

get(name)

獲取值

mget(keys, *args)

批量獲取
如:
     mget( 'ylr' 'wupeiqi' )
    
     r.mget([ 'ylr' 'wupeiqi' ])

getset(name, value)

設置新值並獲取原來的值

getrange(key, start, end)

# 獲取子序列(根據字節獲取,非字符)
# 參數:
     # name,Redis 的 name
     # start,起始位置(字節)
     # end,結束位置(字節)
# 如: "武沛齊" ,0-3表示 "武"

setrange(name, offset, value)

# 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
# 參數:
     # offset,字符串的索引,字節(一個漢字三個字節)
     # value,要設置的值

setbit(name, offset, value)

# 對name對應值的二進制表示的位進行操做
 
# 參數:
     # name,redis的name
     # offset,位的索引(將值變換成二進制後再進行索引)
     # value,值只能是 1 或 0
 
# 注:若是在Redis中有一個對應: n1 = "foo",
         那麼字符串foo的二進制表示爲: 01100110  01101111  01101111
     因此,若是執行 setbit( 'n1' 7 1 ),則就會將第 7 位設置爲 1
         那麼最終二進制則變成  01100111  01101111  01101111 ,即: "goo"
 
# 擴展,轉換二進制表示:
 
     # source = "武沛齊"
     source  =  "foo"
 
     for  in  source:
         num  =  ord (i)
         print  bin (num).replace( 'b' ,'')
 
     特別的,若是source是漢字  "武沛齊" 怎麼辦?
     答:對於utf - 8 ,每個漢字佔  3  個字節,那麼  "武沛齊"  則有  9 個字節
        對於漢字, for 循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制
         11100110  10101101  10100110  11100110  10110010  10011011  11101001  10111101  10010000
         - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                     武                         沛                           齊

*用途舉例,用最省空間的方式,存儲在線用戶數及分別是哪些用戶在線

getbit(name, offset)

# 獲取name對應的值的二進制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

# 獲取name對應的值的二進制表示中 1 的個數
# 參數:
     # key,Redis的name
     # start,位起始位置
     # end,位結束位置

strlen(name)

# 返回name對應值的字節長度(一個漢字3個字節)

incr(self, name, amount=1)

# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(必須是整數)
 
# 注:同incrby

incrbyfloat(self, name, amount=1.0)

# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(浮點型)

decr(self, name, amount=1)

# 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。
 
# 參數:
     # name,Redis的name
     # amount,自減數(整數)

append(key, value)

# 在redis name對應的值後面追加內容
 
# 參數:
     key, redis的name
     value, 要追加的字符串

  

2. Hash操做

hash表現形式上有些像pyhton中的dict,能夠存儲一組關聯性較強的數據 , redis中Hash在內存中的存儲格式以下圖:  

 

hset(name, key, value)

# name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
 
# 參數:
     # name,redis的name
     # key,name對應的hash中的key
     # value,name對應的hash中的value
 
# 注:
     # hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

hmset(name, mapping)

# 在name對應的hash中批量設置鍵值對
 
# 參數:
     # name,redis的name
     # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
     # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

# 在name對應的hash中獲取根據key獲取value

hmget(name, keys, *args)

# 在name對應的hash中獲取多個key的值
 
# 參數:
     # name,reids對應的name
     # keys,要獲取key集合,如:['k1', 'k2', 'k3']
     # *args,要獲取的key,如:k1,k2,k3
 
# 如:
     # r.mget('xx', ['k1', 'k2'])
     # 或
     # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

獲取name對應 hash 的全部鍵值

hlen(name)

# 獲取name對應的hash中鍵值對的個數

hkeys(name)

# 獲取name對應的hash中全部的key的值

hvals(name)

# 獲取name對應的hash中全部的value的值

hexists(name, key)

# 檢查name對應的hash是否存在當前傳入的key

hdel(name,*keys)

# 將name對應的hash中指定key的鍵值對刪除

hincrby(name, key, amount=1)

# 自增name對應的hash中的指定key的值,不存在則建立key=amount
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(整數)

hincrbyfloat(name, key, amount=1.0)

# 自增name對應的hash中的指定key的值,不存在則建立key=amount
 
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(浮點數)
 
# 自增name對應的hash中的指定key的值,不存在則建立key=amount

hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

 

# 增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆
 
# 參數:
     # name,redis的name
     # cursor,遊標(基於遊標分批取獲取數據)
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 
# 如:
     # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
     # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
     # ...
     # 直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢

 

hscan_iter(name, match=None, count=None)

# 利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
  
# 參數:
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
  
# 如:
     # for item in r.hscan_iter('xx'):
     #     print item

 

3. list

List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:  

lpush(name,values)

# 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
 
# 如:
     # r.lpush('oo', 11,22,33)
     # 保存順序爲: 33,22,11
 
# 擴展:
     # rpush(name, values) 表示從右向左操做

lpushx(name,value)

# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
 
# 更多:
     # rpushx(name, value) 表示從右向左操做

llen(name)

# name對應的list元素的個數

linsert(name, where, refvalue, value))

# 在name對應的列表的某一個值前或後插入一個新值
 
# 參數:
     # name,redis的name
     # where,BEFORE或AFTER
     # refvalue,標杆值,即:在它先後插入數據
     # value,要插入的數據

r.lset(name, index, value)

# 對name對應的list中的某一個索引位置從新賦值
 
# 參數:
     # name,redis的name
     # index,list的索引位置
     # value,要設置的值

r.lrem(name, value, num)

# 在name對應的list中刪除指定的值
 
# 參數:
     # name,redis的name
     # value,要刪除的值
     # num,  num=0,刪除列表中全部的指定值;
            # num=2,從前到後,刪除2個;
            # num=-2,從後向前,刪除2個

lpop(name)

# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
 
# 更多:
     # rpop(name) 表示從右向左操做

lindex(name, index)

在name對應的列表中根據索引獲取列表元素

lrange(name, start, end)

# 在name對應的列表分片獲取數據
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

ltrim(name, start, end)

# 在name對應的列表中移除沒有在start-end索引之間的值
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

rpoplpush(src, dst)

# 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
# 參數:
     # src,要取數據的列表的name
     # dst,要添加數據的列表的name

blpop(keys, timeout)

# 將多個列表排列,按照從左到右去pop對應列表的元素
 
# 參數:
     # keys,redis的name的集合
     # timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
 
# 更多:
     # r.brpop(keys, timeout),從右向左獲取數據

brpoplpush(src, dst, timeout=0)

# 從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
 
# 參數:
     # src,取出並要移除元素的列表對應的name
     # dst,要插入元素的列表對應的name
     # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

 

4.set集合操做

Set操做,Set集合就是不容許重複的列表

sadd(name,values)

# name對應的集合中添加元素

scard(name)

獲取name對應的集合中元素個數

sdiff(keys, *args)

在第一個name對應的集合中且不在其餘name對應的集合的元素集合

sdiffstore(dest, keys, *args)

# 獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

sinter(keys, *args)

# 獲取多一個name對應集合的並集

sinterstore(dest, keys, *args)

# 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

sismember(name, value)

# 檢查value是不是name對應的集合的成員

smembers(name)

# 獲取name對應的集合的全部成員

smove(src, dst, value)

# 將某個成員從一個集合中移動到另一個集合

spop(name)

# 從集合的右側(尾部)移除一個成員,並將其返回

srandmember(name, numbers)

# 從name對應的集合中隨機獲取 numbers 個元素

srem(name, values)

# 在name對應的集合中刪除某些值

sunion(keys, *args)

# 獲取多一個name對應的集合的並集

sunionstore(dest,keys, *args)

# 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

# 同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大

 

有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

zadd(name, *args, **kwargs)

# 在name對應的有序集合中添加元素
# 如:
      # zadd('zz', 'n1', 1, 'n2', 2)
      # 或
      # zadd('zz', n1=11, n2=22)

zcard(name)

# 獲取name對應的有序集合元素的數量

zcount(name, min, max)

# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數

zincrby(name, value, amount)

# 自增name對應的有序集合的 name 對應的分數

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引範圍獲取name對應的有序集合的元素
 
# 參數:
     # name,redis的name
     # start,有序集合索引發始位置(非分數)
     # end,有序集合索引結束位置(非分數)
     # desc,排序規則,默認按照分數從小到大排序
     # withscores,是否獲取元素的分數,默認只獲取元素的值
     # score_cast_func,對分數進行數據轉換的函數
 
# 更多:
     # 從大到小排序
     # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
     # 按照分數範圍獲取name對應的有序集合的元素
     # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
     # 從大到小排序
     # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

# 獲取某個值在 name對應的有序集合中的排行(從 0 開始)
 
# 更多:
     # zrevrank(name, value),從大到小排序

 

zrem(name, values)

# 刪除name對應的有序集合中值是values的成員
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# 根據排行範圍刪除

zremrangebyscore(name, min, max)

# 根據分數範圍刪除

zscore(name, value)

# 獲取name對應有序集合中 value 對應的分數

zinterstore(dest, keys, aggregate=None)

# 獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

# 獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做

 

其餘經常使用操做

delete(*names)

# 根據刪除redis中的任意數據類型

exists(name)

# 檢測redis的name是否存在

keys(pattern='*')

# 根據模型獲取redis的name
 
# 更多:
     # KEYS * 匹配數據庫中全部 key 。
     # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
     # KEYS h*llo 匹配 hllo 和 heeeeello 等。
     # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

# 爲某個redis的某個name設置超時時間

rename(src, dst)

# 對redis的name重命名爲

move(name, db))

# 將redis的某個值移動到指定的db下

randomkey()

# 隨機獲取一個redis的name(不刪除)

type(name)

# 獲取name對應值的類型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

# 同字符串操做,用於增量迭代獲取key

 

管道

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis
import time

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)

pipe.set('name', 'alex')
# time.sleep(50)
pipe.set('role', 'sb')

pipe.execute()

 

發佈\訂閱

 

 

 

發佈者:服務器

訂閱者:Dashboad和數據處理

Demo以下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='192.168.1.40')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()  #打開收音機
        pub.subscribe(self.chan_sub)    #調頻道
        pub.parse_response()    #準備接收
        return pub

訂閱者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from redis_helper import RedisHelper

obj = RedisHelper() redis_sub = obj.subscribe() while True: msg = redis_sub.parse_response() print(msg)

發佈者

#!/usr/bin/env python
# -*- coding:utf-8 -*-



from redis_helper import RedisHelper

obj = RedisHelper()
obj.public('hello')

 

 

做業一:

題目:IO多路複用版FTP

需求:

  1. 實現文件上傳及下載功能
  2. 支持多鏈接併發傳文件
  3. 使用select or selectors

 

做業二:

題目:rpc命令端

需求:

  1. 能夠異步的執行多個命令
  2. 對多臺機器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>:

相關文章
相關標籤/搜索