【Python第九篇】異步IO\數據庫\隊列\緩存

本節內容javascript

  1. Select\Poll\Epoll異步IO與事件驅動
  2. Paramiko SSH
  3. RabbitMQ隊列
  4. Redis緩存
  5. pymsql操做
  6. SQLAlchemy

Select\Poll\Epoll異步IO

參考:http://www.cnblogs.com/alex3714/p/4372426.html html

番外篇 http://www.cnblogs.com/alex3714/articles/5876749.htmljava

select 多併發socket 例子python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#  Author fuyf

import select,socket,sys,queue

server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',9999)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏

    for s in readable: #每一個s就是一個socket

        if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
            #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
            #新鏈接進來了,接受這個鏈接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
            #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
            #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的

            message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送

        else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
            #客戶端的數據過來了,在這接收
            data = s.recv(1024)
            if data:
                print("收到來自[%s]的數據:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
                if s not  in outputs:
                    outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端


            else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
                print("客戶端斷開了",s)

                if s in outputs:
                    outputs.remove(s) #清理已斷開的鏈接

                inputs.remove(s) #清理已斷開的鏈接

                del message_queues[s] ##清理已斷開的鏈接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]
select socket server
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#  Author fuyf

import socket,sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)

# Create a TCP/IP socket
socks = [ socket.socket(),
          socket.socket(),]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
select socket client.py

selectors模塊mysql

import selectors
import socket
 
sel = selectors.DefaultSelector()
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
View Code

Paramiko模塊

Python的paramiko模塊,該模塊機遇SSH用於鏈接遠程服務器並執行相關操做linux

SSHClientgit

用於鏈接遠程服務器並執行基本命令github

基於用戶名密碼鏈接:web

import paramiko

# 建立SSH對象
ssh = paramiko.SSHClient()
# 容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname='192.168.69.105', port=22, username='root', password='servyou')

# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read().decode()

print(result)
# 關閉鏈接
ssh.close()
import paramiko

transport = paramiko.Transport(('192.168.69.105', 22))
transport.connect(username='root', password='servyou')

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read().decode())

transport.close()
SSHClient 封裝 Transport

基於公鑰密鑰鏈接:redis

import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('id_rsa.txt')
 
# 建立SSH對象
ssh = paramiko.SSHClient()
# 容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname='192.168.69.105', port=22, username='root', key=private_key)
 
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')

stdin, stdout, stderr = ssh.exec_command('df;ifconfig') # 多個命令用;隔開
# 獲取命令結果
res,err = stdout.read().decode(),stderr.read().decode()
result = res if res else err  # 判斷res和err哪一個有輸出
 
# 關閉鏈接
ssh.close()
import paramiko

private_key = paramiko.RSAKey.from_private_key_file('id_rsa.txt')

transport = paramiko.Transport(('192.168.69.105', 22))
transport.connect(username='root', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read().decode()
transport.close()
SSHClient 封裝 Transport

SFTPClient

用於鏈接遠程服務器並執行上傳下載

基於用戶名密碼上傳下載

import paramiko
 
transport = paramiko.Transport(('192.168.69.105',22))
transport.connect(username='root',password='servyou')

sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
 
transport.close()

基於公鑰密鑰上傳下載

import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('id_rsa.txt')
 
transport = paramiko.Transport(('192.168.69.105', 22))
transport.connect(username='root', pkey=private_key )
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')

transport.close()
View Code

RabbitMQ隊列

    RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
    MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。

1、安裝RabbitMQ

windows:

先安裝Erlang,在安裝RabbitMQ,安裝完成後會在生成系統服務,並確保在啓動狀態

若是啓動報錯:產生pika.exceptions.ConnectionClosed錯誤,是由於沒有開啓rabbitmq服務。

Linux:

# RabbitMQ服務端
rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm  # 安裝配置epel源
yum -y install erlang                     # 安裝erlang
yum -y install rabbitmq-server        # 安裝RabbitMQ

# RabbitMQ的API
pip install pika
or
easy_install pika
or
源碼
https://pypi.python.org/pypi/pika

2、RabbitMQ應用場景

    RabbitMQ是一個消息代理,從「生產者」接收消息並傳遞消息至「消費者」,期間可根據規則路由、緩存、持久化消息。「生產者」也即message發送者如下簡稱P,相對應的「消費者」乃message接收者如下簡稱C,message經過queue由P到C,queue存在於RabbitMQ,可存儲儘量多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message。

應用場景1-「Hello Word」

一個P向queue發送一個message,一個C從該queue接收message並打印。

producer,鏈接至RabbitMQ Server,聲明隊列,發送message,關閉鏈接,退出。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#  Author fuyf

import pika

# 與RabbitMQ Server創建鏈接
# 鏈接到的broker在本機-localhost上
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 聲明隊列以向其發送消息消息
# 向不存在的位置發送消息時RabbitMQ將消息丟棄
# queue='hello'指定隊列名字
channel.queue_declare(queue='hello')

# message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange
# 使用默認exchange時容許經過routing_key明確指定message將被髮送給哪一個queue
# body參數指定了要發送的message內容
channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')

print(" [x] Sent 'Hello World!'")
# 關閉與RabbitMq Server間的鏈接
connection.close()
send.py

consumer,鏈接至RabbitMQ Server,聲明隊列,接收消息並進行處理這裏爲打印出消息,退出。

import pika

# 創建到達RabbitMQ Server的connection
# 此處RabbitMQ Server位於本機-localhost
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 聲明queue,確認要從中接收message的queue
# queue_declare函數是冪等的,可運行屢次,但只會建立一次
# 若能夠確信queue是已存在的,則此處可省略該聲明,如producer已經生成了該queue
# 但在producer和consumer中重複聲明queue是一個好的習慣
channel.queue_declare(queue='hello')
# 定義回調函數
# 一旦從queue中接收到一個message回調函數將被調用
# ch:channel
# method:
# properties:
# body:message
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 從queue接收message的參數設置
# 包括從哪一個queue接收message,用於處理message的callback,是否要確認message
# 默認狀況下是要對消息進行確認的,以防止消息丟失。
# 此處將no_ack明確指明爲True,不對消息進行確認。
channel.basic_consume(callback,queue='hello',no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 開始循環從queue中接收message並使用callback進行處理
channel.start_consuming()
receive.py

應用場景2-work queues

將耗時的消息處理經過隊列分配給多個consumer來處理,咱們稱此處的consumer爲worker,咱們將此處的queue稱爲Task Queue,其目的是爲了不資源密集型的task的同步處理,也即當即處理task並等待完成。相反,調度task使其稍後被處理。也即把task封裝進message併發送到task queue,worker進程在後臺運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。


創建鏈接,聲明隊列,發送能夠模擬耗時任務的message,斷開鏈接、退出。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 僅僅對message進行確認不能保證message不丟失,好比RabbitMQ崩潰了queue就會丟失
# 所以還需使用durable=True聲明queue是持久化的,這樣即使Rabb崩潰了重啓後queue仍然存在
channel.queue_declare(queue='task_queue', durable=True)

# 從命令行構造將要發送的message
message = ' '.join(sys.argv[1:]) or "Hello World!"

# 除了要聲明queue是持久化的外,還需聲明message是持久化的
# basic_publish的properties參數指定message的屬性
# 此處pika.BasicProperties中的delivery_mode=2指明message爲持久的
# 這樣一來RabbitMQ崩潰重啓後queue仍然存在其中的message也仍然存在
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 確保消息持久化
                      ))
print(" [x] Sent %r" % (message,))
connection.close()
new_task.py

創建鏈接,聲明隊列,不斷的接收message,處理任務,進行確認。

import pika
import time

# 默認狀況RabbitMQ將message以round-robin方式發送給下一個consumer
# 每一個consumer接收到的平均message量是同樣的
# 能夠同時運行兩個或三個該程序進行測試

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# 僅僅對message進行確認不能保證message不丟失,好比RabbitMQ崩潰了
# 還需使用durable=True聲明queue是持久化的,這樣即使Rabb崩潰了重啓後queue仍然存在其中的message不會丟失
# RabbitMQ中不容許使用不一樣的參數定義同名queue
channel.queue_declare(queue='task_queue', durable=True)

# 回調函數,函數體模擬耗時的任務處理
def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(20)
    print(" [x] Done")
    # 對message進行確認
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 若存在多個consumer每一個consumer的負載可能不一樣,有些處理的快有些處理的慢
# RabbitMQ並無論這些,只是簡單的以round-robin的方式分配message
# 這可能形成某些consumer積壓不少任務處理不完而一些consumer長期處於飢餓狀態
# 可使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer處理並確認了上一個message後才分配新的message給他
# 不然分給另外一個空閒的consumer
channel.basic_qos(prefetch_count=1)
# 這裏移除了no_ack=True這個參數,也即須要對message進行確認(默認行爲)
# 不然consumer在偶然down後其正在處理和分配到該consumer還未處理的message可能發生丟失
# 由於此時RabbitMQ在發送完message後當即從內存刪除該message
# 假如沒有設置no_ack=True則consumer在偶然down掉後其正在處理和分配至該consumer但還將來得及處理的message會從新分配到其餘consumer
# 沒有設置no_ack=True則consumer在收到message後會向RabbitMQ反饋已收到並處理了message告訴RabbitMQ能夠刪除該message
# RabbitMQ中沒有超時的概念,只有在consumer down掉後從新分發message
channel.basic_consume(callback,
                      queue='task_queue')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
worker.py

參數設置解釋:

  • durable=True:指定durable參數爲真,隊列將持久化;

  • properties=pika.BasicProperties(delivery_mode = 2,): #啓用消息持久化,能夠防止RabbitMQ Server 重啓或者crash引發的數據丟失。

  • ch.basic_ack(delivery_tag = method.delivery_tag):告訴rabbitmq消息已經正確處理。若是沒有這條代碼,Consumer退出時,Message會從新分發。而後RabbitMQ會佔用愈來愈多的內存,因爲RabbitMQ會長時間運行,所以這個「內存泄漏」是致命的。

  • no_ack=True:默認爲假。設置no-ack參數爲真,每次Consumer接到數據後,而無論是否處理完成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。

  • channel.basic_qos(prefetch_count=1)公平分發。這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。

應用場景3-Publish/Subscribe(消息發佈/訂閱)

在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。如今咱們設法將一個message傳遞給多個consumer。這種模式被稱爲publish/subscribe。此處以一個簡單的日誌系統爲例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息能夠被全部運行的log接收者接收。所以,咱們能夠運行一個log接收者直接在屏幕上顯示log,同時運行另外一個log接收者將log寫入磁盤文件。相似廣播的效果。

fanout: 全部bind到此exchange的queue均可以接收消息。

日誌消息發送者:創建鏈接,聲明fanout類型的exchange,經過exchage向queue發送日誌消息,消息被廣播給全部接收者,關閉鏈接,退出。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# producer只能經過exchange將message發給queue
# exchange的類型決定將message路由至哪些queue
# 可用的exchange類型:direct\topic\headers\fanout
# 此處定義一個名稱爲'logs'的'fanout'類型的exchange,'fanout'類型的exchange簡單的將message廣播到它所知道的全部queue
channel.exchange_declare(exchange='logs',type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# 將message publish到名爲log的exchange中
# 由於是fanout類型的exchange,這裏無需指定routing_key
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

print(" [x] Sent %r" % (message,))

connection.close()
生產者

日誌消息接收者:創建鏈接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 做爲好的習慣,在producer和consumer中分別聲明一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',type='fanout')

# 在不一樣的producer和consumer間共享queue時指明queue的name是重要的
# 但某些時候,好比日誌系統,須要接收全部的log message而非一個子集
# 並且僅對當前的message 流感興趣,對於過期的message不感興趣,那麼
# 能夠申請一個臨時隊列這樣,每次鏈接到RabbitMQ時會以一個隨機的名字生成
# 一個新的空的queue,將exclusive置爲True,這樣在consumer從RabbitMQ斷開後會刪除該queue
result = channel.queue_declare(exclusive=True)
# 用於獲取臨時queue的name
queue_name = result.method.queue
# exchange與queue之間的關係成爲binding
# binding告訴exchange將message發送該哪些queue
channel.queue_bind(exchange='logs',queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))

# 從指定地queue中consume message且不確認
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
消費者

應用場景4-有選擇的接收消息(exchange type=direct) 

direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字斷定應該將數據發送至指定隊列。

應用場景3中構建了簡單的log系統,能夠將log message廣播至多個receiver。如今咱們將考慮只把指定的message類型發送給其subscriber,好比只把error message寫到log file而將全部log message顯示在控制檯。

log message發送者:創建鏈接,聲明direct類型的exchange,生成併發送log message到exchange,關閉鏈接,退出。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# 聲明一個名爲direct_logs的direct類型的exchange
# direct類型的exchange
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# 從命令行獲取basic_publish的配置參數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名爲direct_logs的exchage按照設置的routing_key發送message
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()
生產者

log message接收者:創建鏈接,聲明direct類型的exchange,聲明queue,使用提供的參數做爲routing_key將queue綁定到exchange,開始循環接收log message並打印。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# 聲明一個名爲direct_logs類型爲direct的exchange
# 同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 從命令行獲取參數:routing_key
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    # exchange和queue之間的binding可接受routing_key參數
    # 該參數的意義依賴於exchange的類型
    # fanout類型的exchange直接忽略該參數
    # direct類型的exchange精確匹配該關鍵字進行message路由
    # 對多個queue使用相同的binding_key是合法的
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
消費者

應用場景5-topic

topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不一樣的log message發送給不一樣的subscriber(也即分別經過不一樣的routing_key將queue綁定到exchange,這樣exchange即可將不一樣的message根據message內容路由至不一樣的queue)。但仍然存在限制,不能根據多個規則路由消息,好比接收者要麼只能收error類型的log message要麼只能收info類型的message。若是咱們不只想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。爲了達到此目的,須要topic類型的exchange。topic類型的exchange中routing_key中能夠包含兩個特殊字符:「*」用於替代正好一個詞,「#」用於0個或多個詞。

log message接收者:創建鏈接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消費者

log message發送者:創建鏈接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉鏈接,退出。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者

應用場景6-RPC

在應用場景2中描述瞭如何使用work queue將耗時的task分配到不一樣的worker中。可是,若是咱們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個徹底不一樣的故事。這一模式被稱爲遠程過程調用。如今,咱們將構建一個RPC系統,包含一個client和可擴展的RPC server,經過返回斐波那契數來模擬RPC service。

RPC server:創建鏈接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求後調用本身的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
rpc_server.py

RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的鏈接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應後的處理函數on_response根據響應關聯的correlation_id屬性做出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30爲參數發起遠程過程調用。

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(7)")
response = fibonacci_rpc.call(7)
print(" [.] Got %r" % response)
rpc_client.py

RabbitMQ web管理工具

默認安裝的Rabbit MQ 監聽端口是5672。

使用Rabbit MQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態:先定位到rabbitmq安裝目錄, 而後輸入命令rabbitmq-plugins enable rabbitmq_management。啓動後直接在瀏覽器地址輸入:http://localhost:15672/ 帳號密碼都是:guest 便可。

Redis

redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。

1、Redis安裝和基本使用

1
2
3
4
wget http: / / download.redis.io / releases / redis - 3.0 . 6.tar .gz
tar xzf redis - 3.0 . 6.tar .gz
cd redis - 3.0 . 6
make

啓動服務端

1
src / redis - server

啓動客戶端

1
2
3
4
5
src / redis - cli
redis>  set  foo bar
OK
redis> get foo
"bar"

2、Python操做Redis

1
2
3
4
5
6
7
sudo pip install redis
or
sudo easy_install redis
or
源碼安裝
詳見:https: / / github.com / WoLpH / redis - py

API使用

redis-py 的API的使用能夠分類爲:

  • 鏈接方式
  • 鏈接池
  • 操做
    • String 操做
    • Hash 操做
    • List 操做
    • Set 操做
    • Sort Set 操做
  • 管道
  • 發佈訂閱

一、鏈接方式

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。

import redis

r = redis.Redis(host='192.168.69.102', port=6379)
r.set('foo', 'Bar')
print(r.get('foo').decode())

二、鏈接池

redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。 

import redis
pool = redis.ConnectionPool(host='192.168.69.102', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('name','chris')
print(r.get('name').decode())

三、操做

  • String操做,redis中的String在在內存中按照一個name對應一個value來存儲。

set(name, value, ex=None, px=None, nx=False, xx=False)

1
2
3
4
5
6
在Redis中設置值,默認,不存在則建立,存在則修改
參數:
      ex,過時時間(秒)
      px,過時時間(毫秒)
      nx,若是設置爲True,則只有name不存在時,當前set操做才執行
      xx,若是設置爲True,則只有name存在時,崗前set操做才執行

setnx(name, value)

1
設置值,只有name不存在時,執行設置操做(添加)

setex(name, value, time)

1
2
3
# 設置值
# 參數:
     # time,過時時間(數字秒 或 timedelta對象)

psetex(name, time_ms, value)

1
2
3
# 設置值
# 參數:
     # time_ms,過時時間(數字毫秒 或 timedelta對象)

mset(*args, **kwargs)

1
2
3
4
5
批量設置值
如:
     mset(k1= 'v1' , k2= 'v2' )
    
     mget({ 'k1' 'v1' 'k2' 'v2' })

get(name)

1
獲取值

mget(keys, *args)

1
2
3
4
5
批量獲取
如:
     mget( 'ylr' 'wupeiqi' )
    
     r.mget([ 'ylr' 'wupeiqi' ])

getset(name, value)

1
設置新值並獲取原來的值

getrange(key, start, end)

1
2
3
4
5
6
# 獲取子序列(根據字節獲取,非字符)
# 參數:
     # name,Redis 的 name
     # start,起始位置(字節)
     # end,結束位置(字節)
# 如: "武沛齊" ,0-3表示 "武"

setrange(name, offset, value)

1
2
3
4
# 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
# 參數:
     # offset,字符串的索引,字節(一個漢字三個字節)
     # value,要設置的值

setbit(name, offset, value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 對name對應值的二進制表示的位進行操做
 
# 參數:
     # name,redis的name
     # offset,位的索引(將值變換成二進制後再進行索引)
     # value,值只能是 1 或 0
 
# 注:若是在Redis中有一個對應: n1 = "foo",
         那麼字符串foo的二進制表示爲: 01100110  01101111  01101111
     因此,若是執行 setbit( 'n1' 7 1 ),則就會將第 7 位設置爲 1
         那麼最終二進制則變成  01100111  01101111  01101111 ,即: "goo"
 
# 擴展,轉換二進制表示:
 
     # source = "武沛齊"
     source  =  "foo"
 
     for  in  source:
         num  =  ord (i)
         print  bin (num).replace( 'b' ,'')
 
     特別的,若是source是漢字  "武沛齊" 怎麼辦?
     答:對於utf - 8 ,每個漢字佔  3  個字節,那麼  "武沛齊"  則有  9 個字節
        對於漢字, for 循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制
         11100110  10101101  10100110  11100110  10110010  10011011  11101001  10111101  10010000
         - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                     武                         沛                           齊

getbit(name, offset)

1
# 獲取name對應的值的二進制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

1
2
3
4
5
# 獲取name對應的值的二進制表示中 1 的個數
# 參數:
     # key,Redis的name
     # start,位起始位置
     # end,位結束位置

bitop(operation, dest, *keys)

1
2
3
4
5
6
7
8
9
10
# 獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值
 
# 參數:
     # operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
     # dest, 新的Redis的name
     # *keys,要查找的Redis的name
 
# 如:
     bitop( "AND" 'new_name' 'n1' 'n2' 'n3' )
     # 獲取Redis中n1,n2,n3對應的值,而後講全部的值作位運算(求並集),而後將結果保存 new_name 對應的值中

strlen(name)

1
# 返回name對應值的字節長度(一個漢字3個字節)

incr(self, name, amount=1)

1
2
3
4
5
6
7
# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(必須是整數)
 
# 注:同incrby

incrbyfloat(self, name, amount=1.0)

1
2
3
4
5
# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(浮點型)

decr(self, name, amount=1)

1
2
3
4
5
# 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。
 
# 參數:
     # name,Redis的name
     # amount,自減數(整數)

append(key, value)

1
2
3
4
5
# 在redis name對應的值後面追加內容
 
# 參數:
     key, redis的name
     value, 要追加的字符串
  • Hash操做,redis中Hash在內存中的存儲格式以下圖:

hset(name, key, value)

1
2
3
4
5
6
7
8
9
# name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
 
# 參數:
     # name,redis的name
     # key,name對應的hash中的key
     # value,name對應的hash中的value
 
# 注:
     # hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

hmset(name, mapping)

1
2
3
4
5
6
7
8
# 在name對應的hash中批量設置鍵值對
 
# 參數:
     # name,redis的name
     # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
     # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

1
# 在name對應的hash中獲取根據key獲取value

hmget(name, keys, *args)

1
2
3
4
5
6
7
8
9
10
11
# 在name對應的hash中獲取多個key的值
 
# 參數:
     # name,reids對應的name
     # keys,要獲取key集合,如:['k1', 'k2', 'k3']
     # *args,要獲取的key,如:k1,k2,k3
 
# 如:
     # r.mget('xx', ['k1', 'k2'])
     # 或
     # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

1
獲取name對應 hash 的全部鍵值

hlen(name)

1
# 獲取name對應的hash中鍵值對的個數

hkeys(name)

1
# 獲取name對應的hash中全部的key的值

hvals(name)

1
# 獲取name對應的hash中全部的value的值

hexists(name, key)

1
# 檢查name對應的hash是否存在當前傳入的key

hdel(name,*keys)

1
# 將name對應的hash中指定key的鍵值對刪除

hincrby(name, key, amount=1)

1
2
3
4
5
# 自增name對應的hash中的指定key的值,不存在則建立key=amount
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(整數)

hincrbyfloat(name, key, amount=1.0)

1
2
3
4
5
6
7
8
# 自增name對應的hash中的指定key的值,不存在則建立key=amount
 
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(浮點數)
 
# 自增name對應的hash中的指定key的值,不存在則建立key=amount

hscan(name, cursor=0, match=None, count=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
# 增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆
 
# 參數:
     # name,redis的name
     # cursor,遊標(基於遊標分批取獲取數據)
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 
# 如:
     # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
     # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
     # ...
     # 直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢

hscan_iter(name, match=None, count=None)

1
2
3
4
5
6
7
8
9
# 利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
 
# 參數:
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 
# 如:
     # for item in r.hscan_iter('xx'):
     #     print item
  • List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

lpush(name,values)

1
2
3
4
5
6
7
8
# 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
 
# 如:
     # r.lpush('oo', 11,22,33)
     # 保存順序爲: 33,22,11
 
# 擴展:
     # rpush(name, values) 表示從右向左操做

lpushx(name,value)

1
2
3
4
# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
 
# 更多:
     # rpushx(name, value) 表示從右向左操做

llen(name)

1
# name對應的list元素的個數

linsert(name, where, refvalue, value))

1
2
3
4
5
6
7
# 在name對應的列表的某一個值前或後插入一個新值
 
# 參數:
     # name,redis的name
     # where,BEFORE或AFTER
     # refvalue,標杆值,即:在它先後插入數據
     # value,要插入的數據

r.lset(name, index, value)

1
2
3
4
5
6
# 對name對應的list中的某一個索引位置從新賦值
 
# 參數:
     # name,redis的name
     # index,list的索引位置
     # value,要設置的值

r.lrem(name, value, num)

1
2
3
4
5
6
7
8
# 在name對應的list中刪除指定的值
 
# 參數:
     # name,redis的name
     # value,要刪除的值
     # num,  num=0,刪除列表中全部的指定值;
            # num=2,從前到後,刪除2個;
            # num=-2,從後向前,刪除2個

lpop(name)

1
2
3
4
# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
 
# 更多:
     # rpop(name) 表示從右向左操做

lindex(name, index)

1
在name對應的列表中根據索引獲取列表元素

lrange(name, start, end)

1
2
3
4
5
# 在name對應的列表分片獲取數據
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

ltrim(name, start, end)

1
2
3
4
5
# 在name對應的列表中移除沒有在start-end索引之間的值
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

rpoplpush(src, dst)

1
2
3
4
# 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
# 參數:
     # src,要取數據的列表的name
     # dst,要添加數據的列表的name

blpop(keys, timeout)

1
2
3
4
5
6
7
8
# 將多個列表排列,按照從左到右去pop對應列表的元素
 
# 參數:
     # keys,redis的name的集合
     # timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
 
# 更多:
     # r.brpop(keys, timeout),從右向左獲取數據

brpoplpush(src, dst, timeout=0)

1
2
3
4
5
6
# 從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
 
# 參數:
     # src,取出並要移除元素的列表對應的name
     # dst,要插入元素的列表對應的name
     # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

自定義增量迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要:
     # 一、獲取name對應的全部列表
     # 二、循環列表
# 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能:
 
def  list_iter(name):
     """
     自定義redis列表增量迭代
     :param name: redis中的name,即:迭代name對應的列表
     :return: yield 返回 列表元素
     """
     list_count  =  r.llen(name)
     for  index  in  xrange (list_count):
         yield  r.lindex(name, index)
 
# 使用
for  item  in  list_iter( 'pp' ):
     print  item
  • Set操做,Set集合就是不容許重複的列表

sadd(name,values)

1
# name對應的集合中添加元素

scard(name)

1
獲取name對應的集合中元素個數

sdiff(keys, *args)

1
在第一個name對應的集合中且不在其餘name對應的集合的元素集合

sdiffstore(dest, keys, *args)

1
# 獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

sinter(keys, *args)

1
# 獲取多一個name對應集合的並集

sinterstore(dest, keys, *args)

1
# 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

sismember(name, value)

1
# 檢查value是不是name對應的集合的成員

smembers(name)

1
# 獲取name對應的集合的全部成員

smove(src, dst, value)

1
# 將某個成員從一個集合中移動到另一個集合

spop(name)

1
# 從集合的右側(尾部)移除一個成員,並將其返回

srandmember(name, numbers)

1
# 從name對應的集合中隨機獲取 numbers 個元素

srem(name, values)

1
# 在name對應的集合中刪除某些值

sunion(keys, *args)

1
# 獲取多一個name對應的集合的並集

sunionstore(dest,keys, *args)

1
# 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

1
# 同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大
  •  有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

 

zadd(name, *args, **kwargs)

1
2
3
4
5
# 在name對應的有序集合中添加元素
# 如:
      # zadd('zz', 'n1', 1, 'n2', 2)
      # 或
      # zadd('zz', n1=11, n2=22)

zcard(name)

1
# 獲取name對應的有序集合元素的數量

zcount(name, min, max)

1
# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數

zincrby(name, value, amount)

1
# 自增name對應的有序集合的 name 對應的分數

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 按照索引範圍獲取name對應的有序集合的元素
 
# 參數:
     # name,redis的name
     # start,有序集合索引發始位置(非分數)
     # end,有序集合索引結束位置(非分數)
     # desc,排序規則,默認按照分數從小到大排序
     # withscores,是否獲取元素的分數,默認只獲取元素的值
     # score_cast_func,對分數進行數據轉換的函數
 
# 更多:
     # 從大到小排序
     # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
     # 按照分數範圍獲取name對應的有序集合的元素
     # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
     # 從大到小排序
     # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

1
2
3
4
# 獲取某個值在 name對應的有序集合中的排行(從 0 開始)
 
# 更多:
     # zrevrank(name, value),從大到小排序

zrangebylex(name, min, max, start=None, num=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員
# 對集合中的每一個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序後的集合成員。 若是兩個字符串有一部份內容是相同的話, 那麼命令會認爲較長的字符串比較短的字符串要大
 
# 參數:
     # name,redis的name
     # min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間
     # min,右區間(值)
     # start,對結果進行分片處理,索引位置
     # num,對結果進行分片處理,索引後面的num個元素
 
# 如:
     # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
     # r.zrangebylex('myzset', "-", "[ca") 結果爲:['aa', 'ba', 'ca']
 
# 更多:
     # 從大到小排序
     # zrevrangebylex(name, max, min, start=None, num=None)

zrem(name, values)

1
2
3
# 刪除name對應的有序集合中值是values的成員
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

1
# 根據排行範圍刪除

zremrangebyscore(name, min, max)

1
# 根據分數範圍刪除

zremrangebylex(name, min, max)

1
# 根據值返回刪除

zscore(name, value)

1
# 獲取name對應有序集合中 value 對應的分數

zinterstore(dest, keys, aggregate=None)

1
2
# 獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

1
2
# 獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

1
# 同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做
  • 其餘經常使用操做

delete(*names)

1
# 根據刪除redis中的任意數據類型

exists(name)

1
# 檢測redis的name是否存在

keys(pattern='*')

1
2
3
4
5
6
7
# 根據模型獲取redis的name
 
# 更多:
     # KEYS * 匹配數據庫中全部 key 。
     # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
     # KEYS h*llo 匹配 hllo 和 heeeeello 等。
     # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

1
# 爲某個redis的某個name設置超時時間

rename(src, dst)

1
# 對redis的name重命名爲

move(name, db))

1
# 將redis的某個值移動到指定的db下

randomkey()

1
# 隨機獲取一個redis的name(不刪除)

type(name)

1
# 獲取name對應值的類型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

1
# 同字符串操做,用於增量迭代獲取key

四、管道

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

import redis,time

pool = redis.ConnectionPool(host='192.168.69.102', port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)

pipe.set('name1', 'chris1')
time.sleep(20)
pipe.set('role1', 'student1')

pipe.execute()

五、發佈訂閱

pymsql操做

Python操做MySQL主要使用兩種方式:

  • 原生模塊 pymsql
  • ORM框架 SQLAchemy

下載安裝

1
pip3 install pymysql

使用操做

一、執行SQL

import pymysql

# 建立鏈接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='servyou', db='servyoudb')
# 建立遊標
cursor = conn.cursor()
# 執行SQL,並返回收影響行數,executemany一次插入多條數據,數據用[]括起來
# effect_row = cursor.executemany("insert into students(name,sex,age)values(%s,%s,%s)",[('chris1','M',22)])

# 執行SQL,並返回收影響行數,插入單條數據
# effect_row1 = cursor.execute("insert into students(name,sex,age)values(%s,%s,%s)",('chris2','M',22))

cursor.execute("select * from students")

# 獲取第一行數據
# result  = cursor.fetchone()
# 獲取前n行數據
# result  = cursor.fetchmany(3)
# 獲取全部數據
result  = cursor.fetchall()
print(result)
#  提交,否則沒法保存新建或者修改的數據
conn.commit()
# 關閉遊標
cursor.close()
# 關閉鏈接
conn.close()

SQLAlchemy ORM

ORM英文全稱object relational mapping,就是對象映射關係程序,簡單來講咱們相似python這種面向對象的程序來講一切皆對象,可是咱們使用的數據庫卻都是關係型的,爲了保證一致的使用習慣,經過orm將編程語言的對象模型和數據庫的關係模型創建映射關係,這樣咱們在使用編程語言對數據庫進行操做的時候能夠直接使用編程語言的對象模型進行操做就能夠了,而不用直接使用sql語言。

在Python中,最有名的ORM框架是SQLAlchemy。

1、安裝sqlalchemy

pip install SQLAlchemy
pip install pymysql    #用pymysql與sqlalchemy交互

2、sqlalchemy使用

filter和filter_by使用總結:
filter 能夠像寫 sql 的 where 條件那樣寫 > < 等條件
filter用類名.屬性名,比較用==,filter_by直接用屬性名,比較用=

filter_by的參數是**kwargs,直接支持組合查詢。
filter不支持組合查詢,只能連續調用filter來變相實現。
  • 建立表結構
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,INTEGER,String

engine = create_engine("mysql+pymysql://root:servyou@localhost/servyoudb",
                       encoding = 'utf-8',echo = True)
Base = declarative_base()#生成orm基類

class User(Base):
    __tablename__ = 'user' # 表名
    id = Column(INTEGER,primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

Base.metadata.create_all(engine)   #建立表結構
  •  插入數據
# 建立數據
session_class = sessionmaker(bind=engine) #建立與數據庫的會話session class ,注意,這裏返回給session的是個class,不是實例
session = session_class()

data1 = User(name='chris1',password='123') #生成要建立的數據對象
data2 = User(name='jack1',password='servyou')

session.add(data1)  # 把要建立的數據對象添加到這個session裏,一會統一建立
session.add(data2)

session.commit()  # 提交,建立數據
  •  查詢
# 獲取第一條數據
result = session.query(User).filter_by(name='chris').first()
print(result.id,result.name,result.password)

# 獲取全部數據
result2 = session.query(User.id,User.name).all()
print(result2)

#多條件查詢
result3 = session.query(User.id,User.name).filter(User.id>0).filter(User.id<7).all()
print(result3)

print(session.query(User).filter_by(id=1, name='chris_new').all())
  • 其餘
# 排序
ret = session.query(User).order_by(User.age.desc()).all()
print(ret)
ret2 = session.query(User).order_by(User.age.desc(), User.id.asc()).all()

# 通配符
# ret = session.query(User).filter(User.name.like('e%')).all()  
ret = session.query(User).filter(~User.name.like('e%')).all()  # 非e%的
print(ret)

#條件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
session.query(User).filter(User.name != 'chris')
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()

# 分組
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 連表
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
ret = session.query(Person).join(Favor).all()
ret = session.query(Person).join(Favor, isouter=True).all()

# 組合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
View Code
  • 修改
# 先查詢出來一條數據,在賦值便可修改
data = session.query(User).filter_by(name='chris').first()
print(data)
data.name = 'chris_new'
session.commit()

session.query(User).filter(User.age > 50).update({"name" : "099"})
session.query(User).filter(User.age > 50).update({User.name: User.name + "099"}, synchronize_session=False)
session.query(User).filter(User.age > 50).update({"age": User.age + 1}, synchronize_session="evaluate")
session.commit()
  • 刪除
session.query(Users).filter(Users.id > 2).delete()
session.commit()
  • 回滾
fake_user = User(name='Rain', password='12345')
session.add(fake_user)

session.query(User).filter(User.name.in_(['Jack', 'rain'])).all()  # 這時看session裏有你剛添加和修改的數據
session.rollback()  # 此時rollback一下
  • 統計和分組
#統計
print(session.query(User).filter(User.name.like("Ch%")).count())
#分組
from sqlalchemy import func
print(session.query(func.count(User.name),User.name).group_by(User.name).all())
打印原生sql:print(session.query(func.count(User.name),User.name).group_by(User.name))
至關於原生sql爲
SELECT count(user.name) AS count_1, user.name AS user_name
FROM user GROUP BY user.name
  • 外鍵關聯
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,INTEGER,String,DATE,ForeignKey
from sqlalchemy.orm import relationship

engine = create_engine("mysql+pymysql://root:servyou@localhost/servyoudb?charset=utf8")
Base = declarative_base()  # 生成orm基類

class Student(Base):
    __tablename__ = 'student' # 表名
    id = Column(INTEGER,primary_key=True)
    name = Column(String(32),nullable=False)
    date = Column(DATE,nullable=False)

    def __repr__(self):
        return '<id:%s name:%s>' %(self.id,self.name)

class  StudyRecord(Base):
    __tablename__ = 'study_record'
    id = Column(INTEGER, primary_key=True)
    day = Column(INTEGER, nullable=False)
    status = Column(String(32), nullable=False)
    stu_id = Column(INTEGER, ForeignKey('student.id'))

    student = relationship("Student", backref="my_record")  # 容許你在user表裏經過backref字段反向查出全部它在addresses表裏的關聯項

    def __repr__(self):
        return "<(%s day:%s status:%s)>" % (self.student.name,self.day,self.status)

Base.metadata.create_all(engine)   #建立表結構
orm_m2.py
from sqlalchemy.orm import sessionmaker
from day8 import orm_m2

session_class = sessionmaker(bind=orm_m2.engine)
session = session_class()

#經過student對象反查關聯study_record的記錄
ret = session.query(orm_m2.Student).filter(orm_m2.Student.name=='chris').first()
print(ret.my_record)

# s1 = orm_m2.Student(name='chris',date='2017-07-01')
# s2 = orm_m2.Student(name='eric',date='2016-06-01')
# s3 = orm_m2.Student(name='jack',date='2017-07-11')
#
# r1 = orm_m2.StudyRecord(day=1,status='yes',stu_id=1)
# r2 = orm_m2.StudyRecord(day=2,status='no',stu_id=1)
# r3 = orm_m2.StudyRecord(day=3,status='yes',stu_id=1)
# r4 = orm_m2.StudyRecord(day=1,status='yes',stu_id=2)
#
# session.add_all([s1,s2,s3,r1,r2,r3,r4])
session.commit()
orm_api.py
  • 多外鍵關聯
from sqlalchemy import Integer, ForeignKey, String, Column,create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship,sessionmaker

engine = create_engine("mysql+pymysql://root:servyou@localhost/servyoudb?charset=utf8")
Base = declarative_base()  # 生成orm基類

class Customer(Base):
    __tablename__ = 'customer'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    billing_address_id = Column(Integer, ForeignKey("address.id"))
    shipping_address_id = Column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

    def __repr__(self):
        return self.name,self.billing_address

class Address(Base):
    __tablename__ = 'address'
    id = Column(Integer, primary_key=True)
    street = Column(String(32))
    city = Column(String(32))
    state = Column(String(32))

    def __repr__(self):
        return self.street

Base.metadata.create_all(engine)   #建立表結構

session_class = sessionmaker(bind=engine)
session = session_class()

obj = session.query(Customer).filter(Customer.name=='克里斯').first()
print(obj.name,obj.billing_address,obj.shipping_address)

# addr1 = Address(street='浦沿',city='濱江',state='杭州')
# addr2 = Address(street='城站',city='江干',state='杭州')
# addr3 = Address(street='冰雪大世界',city='柯橋',state='紹興')
# session.add_all([addr1,addr2,addr3])
#
#
# c1 = Customer(name='克里斯',billing_address=addr1,shipping_address=addr2)
# c2 = Customer(name='艾瑞克',billing_address=addr3,shipping_address=addr3)
# session.add_all([c1,c2])

session.commit()
多外鍵
  • 多對多關係

如今來設計一個能描述「圖書」與「做者」的關係的表結構,需求是

  1. 一本書能夠有好幾個做者一塊兒出版
  2. 一個做者能夠寫好幾本書

表結構設計以下:

經過book_m2m_author表完成了book表和author表以前的多對多關聯; 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#  Author fuyf

from sqlalchemy import Table, Column, Integer,String,DATE, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

book_m2m_author = Table('book_m2m_author', Base.metadata,
                        Column('book_id',Integer,ForeignKey('books.id')),
                        Column('author_id',Integer,ForeignKey('authors.id')),
                        )

class Book(Base):
    __tablename__ = 'books'
    id = Column(Integer,primary_key=True)
    name = Column(String(64))
    pub_date = Column(DATE)
    authors = relationship('Author',secondary=book_m2m_author,backref='books')

    def __repr__(self):
        return self.name

class Author(Base):
    __tablename__ = 'authors'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    def __repr__(self):
        return self.name

engine = create_engine('mysql+pymysql://root:servyou@localhost/servyoudb?charset=utf8')
Base.metadata.create_all(engine)

session_class = sessionmaker(bind=engine)
session = session_class()

a1 = Author(name='克里斯')
a2 = Author(name='艾瑞克')
a3 = Author(name='傑克')

b1 = Book(name='跟chris學python')
b2 = Book(name='跟chris學linux')
b3 = Book(name='跟chris學吉他')

b1.authors = [a1,a2,a3]
b2.authors = [a1,a3]

session.add_all([a1,a2,a3,b1,b2,b3])
session.commit()

此時,手動連上mysql,分別查看這3張表,發現book_m2m_author中自動建立了多條紀錄用來鏈接book和author表

mysql> select * from book_m2m_author order by book_id,author_id;
+---------+-----------+
| book_id | author_id |
+---------+-----------+
|       1 |         1 |
|       1 |         2 |
|       1 |         3 |
|       2 |         1 |
|       2 |         3 |
+---------+-----------+
5 rows in set (0.00 sec)

mysql> select * from authors;
+----+-----------+
| id | name      |
+----+-----------+
|  1 | 克里斯    |
|  2 | 艾瑞克    |
|  3 | 傑克      |
+----+-----------+
3 rows in set (0.00 sec)

mysql> select * from books;
+----+-------------------+----------+
| id | name              | pub_date |
+----+-------------------+----------+
|  1 | 跟chris學python   | NULL     |
|  2 | 跟chris學linux    | NULL     |
|  3 | 跟chris學吉他     | NULL     |
+----+-------------------+----------+
3 rows in set (0.00 sec)

mysql> select * from book_m2m_author order by book_id;
+---------+-----------+
| book_id | author_id |
+---------+-----------+
|       1 |         2 |
|       1 |         1 |
|       1 |         3 |
|       2 |         1 |
|       2 |         3 |
+---------+-----------+
5 rows in set (0.00 sec)

 查詢:

#查詢
print('--------經過書表查關聯的做者---------')
book_obj = session.query(Book).filter(Book.name=='跟chris學python').first()
print(book_obj.name,book_obj.authors)

print('--------經過做者表查關聯的書---------')
author_obj = session.query(Author).filter(Author.name=='克里斯').all()
print(author_obj[0].name,author_obj[0].books)

#輸出
--------經過書表查關聯的做者---------
跟chris學python [克里斯, 艾瑞克, 傑克]
--------經過做者表查關聯的書---------
克里斯 [跟chris學python, 跟chris學linux]

多對多刪除

刪除數據時不用管boo_m2m_authors , sqlalchemy會自動幫你把對應的數據刪除

  •  經過書刪除做者
author_obj = session.query(Author).filter_by(name="傑克").first()

book_obj = session.query(Book).filter_by(name="跟chris學linux").first()

book_obj.authors.remove(author_obj)  # 從一本書裏刪除一個做者
session.commit()
  • 直接刪除做者 

刪除做者時,會把這個做者跟全部書的關聯數據也自動刪除

author_obj =session.query(Author).filter_by(name="克里斯").first()
# print(author_obj.name , author_obj.books)
session.delete(author_obj)
session.commit()

處理中文

sqlalchemy設置編碼字符集必定要在數據庫訪問的URL上增長charset=utf8,不然數據庫的鏈接就不是utf8的編碼格式

engine = create_engine('mysql+pymysql://root:servyou@localhost/servyoudb?charset=utf8',echo=True)

練習:學員管理系統

學員管理系統:
相關文章
相關標籤/搜索