異步IO\數據庫\隊列\緩存

Select\Poll\Epoll異步IO 

select 多併發socket 例子

 1 import select
 2 import socket
 3 import sys
 4 import queue
 5 
 6 
 7 server = socket.socket()
 8 server.setblocking(0)
 9 
10 server_addr = ('localhost',10000)
11 
12 print('starting up on %s port %s' % server_addr)
13 server.bind(server_addr)
14 
15 server.listen(5)
16 
17 
18 inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
19 outputs = []
20 
21 message_queues = {}
22 
23 while True:
24     print("waiting for next event...")
25 
26     readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏
27 
28     for s in readable: #每一個s就是一個socket
29 
30         if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
31             #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
32             #新鏈接進來了,接受這個鏈接
33             conn, client_addr = s.accept()
34             print("new connection from",client_addr)
35             conn.setblocking(0)
36             inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
37             #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
38             #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的
39 
40             message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送
41 
42         else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
43             #客戶端的數據過來了,在這接收
44             data = s.recv(1024)
45             if data:
46                 print("收到來自[%s]的數據:" % s.getpeername()[0], data)
47                 message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
48                 if s not  in outputs:
49                     outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
50 
51 
52             else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
53                 print("客戶端斷開了",s)
54 
55                 if s in outputs:
56                     outputs.remove(s) #清理已斷開的鏈接
57 
58                 inputs.remove(s) #清理已斷開的鏈接
59 
60                 del message_queues[s] ##清理已斷開的鏈接
61 
62 
63     for s in writeable:
64         try :
65             next_msg = message_queues[s].get_nowait()
66 
67         except queue.Empty:
68             print("client [%s]" %s.getpeername()[0], "queue is empty..")
69             outputs.remove(s)
70 
71         else:
72             print("sending msg to [%s]"%s.getpeername()[0], next_msg)
73             s.send(next_msg.upper())
74 
75 
76     for s in exeptional:
77         print("handling exception for ",s.getpeername())
78         inputs.remove(s)
79         if s in outputs:
80             outputs.remove(s)
81         s.close()
82 
83         del message_queues[s]
server
 1 import socket
 2 import sys
 3 
 4 messages = [ b'This is the message. ',
 5              b'It will be sent ',
 6              b'in parts.',
 7              ]
 8 server_address = ('localhost', 10000)
 9 
10 # Create a TCP/IP socket
11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
12           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
13           ]
14 
15 # Connect the socket to the port where the server is listening
16 print('connecting to %s port %s' % server_address)
17 for s in socks:
18     s.connect(server_address)
19 
20 for message in messages:
21 
22     # Send messages on both sockets
23     for s in socks:
24         print('%s: sending "%s"' % (s.getsockname(), message) )
25         s.send(message)
26 
27     # Read responses on both sockets
28     for s in socks:
29         data = s.recv(1024)
30         print( '%s: received "%s"' % (s.getsockname(), data) )
31         if not data:
32             print(sys.stderr, 'closing socket', s.getsockname() )
client

selectors模塊

 1 import selectors
 2 import socket
 3  
 4 sel = selectors.DefaultSelector()
 5  
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()  # Should be ready
 8     print('accepted', conn, 'from', addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11  
12 def read(conn, mask):
13     data = conn.recv(1000)  # Should be ready
14     if data:
15         print('echoing', repr(data), 'to', conn)
16         conn.send(data)  # Hope it won't block
17     else:
18         print('closing', conn)
19         sel.unregister(conn)
20         conn.close()
21  
22 sock = socket.socket()
23 sock.bind(('localhost', 10000))
24 sock.listen(100)
25 sock.setblocking(False)
26 sel.register(sock, selectors.EVENT_READ, accept)
27  
28 while True:
29     events = sel.select()
30     for key, mask in events:
31         callback = key.data
32         callback(key.fileobj, mask)
selectors

堡壘機前戲

開發堡壘機之前,先來學習Python的paramiko模塊,該模塊基於SSH,用於連接遠程服務器並執行相關操作。

SSHClient

用於連接遠程服務器並執行基本命令

基於用戶名密碼連接:

 1 import paramiko
 2   
 3 # 建立SSH對象
 4 ssh = paramiko.SSHClient()
 5 # 容許鏈接不在know_hosts文件中的主機
 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 7 # 鏈接服務器
 8 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
 9   
10 # 執行命令
11 stdin, stdout, stderr = ssh.exec_command('df')
12 # 獲取命令結果
13 result = stdout.read()
14   
15 # 關閉鏈接
16 ssh.close()
 1 import paramiko
 2 
 3 transport = paramiko.Transport(('hostname', 22))
 4 transport.connect(username='wupeiqi', password='123')
 5 
 6 ssh = paramiko.SSHClient()
 7 ssh._transport = transport
 8 
 9 stdin, stdout, stderr = ssh.exec_command('df')
10 print stdout.read()
11 
12 transport.close()

基於公鑰密鑰連接:

 1 import paramiko
 2  
 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 4  
 5 # 建立SSH對象
 6 ssh = paramiko.SSHClient()
 7 # 容許鏈接不在know_hosts文件中的主機
 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 9 # 鏈接服務器
10 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
11  
12 # 執行命令
13 stdin, stdout, stderr = ssh.exec_command('df')
14 # 獲取命令結果
15 result = stdout.read()
16  
17 # 關閉鏈接
18 ssh.close()
 1 import paramiko
 2 
 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 4 
 5 transport = paramiko.Transport(('hostname', 22))
 6 transport.connect(username='wupeiqi', pkey=private_key)
 7 
 8 ssh = paramiko.SSHClient()
 9 ssh._transport = transport
10 
11 stdin, stdout, stderr = ssh.exec_command('df')
12 
13 transport.close()
SSHClient 封裝 Transport
 1 import paramiko
 2 from io import StringIO
 3 
 4 key_str = """-----BEGIN RSA PRIVATE KEY-----
 5 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
 6 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
 7 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
 8 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
 9 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
10 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
11 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
12 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
13 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
14 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
15 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
16 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
17 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
18 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
19 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
20 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
21 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
22 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
23 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
24 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
25 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
26 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
27 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
28 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
29 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
30 -----END RSA PRIVATE KEY-----"""
31 
32 private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
33 transport = paramiko.Transport(('10.0.1.40', 22))
34 transport.connect(username='wupeiqi', pkey=private_key)
35 
36 ssh = paramiko.SSHClient()
37 ssh._transport = transport
38 
39 stdin, stdout, stderr = ssh.exec_command('df')
40 result = stdout.read()
41 
42 transport.close()
43 
44 print(result)
基於私鑰字符串進行鏈接

SFTPClient

用於連接遠程服務器並執行上傳下載

基於用戶名密碼上傳下載

 1 import paramiko
 2  
 3 transport = paramiko.Transport(('hostname',22))
 4 transport.connect(username='wupeiqi',password='123')
 5  
 6 sftp = paramiko.SFTPClient.from_transport(transport)
 7 # 將location.py 上傳至服務器 /tmp/test.py
 8 sftp.put('/tmp/location.py', '/tmp/test.py')
 9 # 將remove_path 下載到本地 local_path
10 sftp.get('remove_path', 'local_path')
11  
12 transport.close()

基於公鑰密鑰上傳下載

 1 import paramiko
 2  
 3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 4  
 5 transport = paramiko.Transport(('hostname', 22))
 6 transport.connect(username='wupeiqi', pkey=private_key )
 7  
 8 sftp = paramiko.SFTPClient.from_transport(transport)
 9 # 將location.py 上傳至服務器 /tmp/test.py
10 sftp.put('/tmp/location.py', '/tmp/test.py')
11 # 將remove_path 下載到本地 local_path
12 sftp.get('remove_path', 'local_path')
13  
14 transport.close()

RabbitMQ隊列  

安裝 RabbitMQ

http://www.cnblogs.com/ericli-ericli/p/5902270.html

http://blog.csdn.net/zyz511919766/article/details/41946521

Work Queues

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3         host='localhost'))
 4 channel = connection.channel()
 5  
 6 channel.queue_declare(queue='hello')
 7  
 8 channel.basic_publish(exchange='',
 9                       routing_key='hello',
10                       body='Hello World!')
11 print(" [x] Sent 'Hello World!'")
12 connection.close()
生產者
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3         host='localhost'))
 4 channel = connection.channel()
 5  
 6 channel.queue_declare(queue='hello')
 7  
 8 def callback(ch, method, properties, body):
 9     print(" [x] Received %r" % body)
10  
11 channel.basic_consume(callback,
12                       queue='hello',
13                       no_ack=True)
14  
15 print(' [*] Waiting for messages. To exit press CTRL+C')
16 channel.start_consuming()
消費者

一、acknowledgment 消息不丟失

no_ack = False 時,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼 RabbitMQ 會重新將該任務添加到隊列中。

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host='10.211.55.4'))
 5 channel = connection.channel()
 6 
 7 channel.queue_declare(queue='hello')
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % body)
11     import time
12     time.sleep(10)
13     print 'ok'
14     ch.basic_ack(delivery_tag = method.delivery_tag)
15 
16 channel.basic_consume(callback,
17                       queue='hello',
18                       no_ack=False)
19 
20 print(' [*] Waiting for messages. To exit press CTRL+C')
21 channel.start_consuming()
消費者

二、durable   消息不丟失

import pika

# Producer: publish one message to the durable 'hello' queue through the
# default exchange, flagged with delivery_mode=2.
params = pika.ConnectionParameters(host='10.211.55.4')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the queue as durable.
channel.queue_declare(queue='hello', durable=True)

# delivery_mode=2 marks the message itself as persistent.
persistent = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
    properties=persistent,
)
print(" [x] Sent 'Hello World!'")
connection.close()
生產者
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
 4 channel = connection.channel()
 5 
 6 # make message persistent
 7 channel.queue_declare(queue='hello', durable=True)
 8 
 9 
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12     import time
13     time.sleep(10)
14     print 'ok'
15     ch.basic_ack(delivery_tag = method.delivery_tag)
16 
17 channel.basic_consume(callback,
18                       queue='hello',
19                       no_ack=False)
20 
21 print(' [*] Waiting for messages. To exit press CTRL+C')
22 channel.start_consuming()
消費者

三、消息獲取順序

默認情況下,消息隊列裏的數據按照順序依次分發給各個消費者;但是在消費者端配置 prefetch_count=1,就是告訴 RabbitMQ:在我這個消費者當前的消息還沒處理完(尚未 ack)之前,不要再給我發新消息。

import time

import pika

# Consumer that asks for at most one unacknowledged message at a time
# (prefetch_count=1) and acknowledges each delivery manually.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# Plain (non-durable) queue declaration.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Handle one delivery: simulate 10 s of work, then acknowledge it."""
    print(" [x] Received %r" % body)
    time.sleep(10)
    # print() call form so the listing also runs on Python 3.
    print('ok')
    # Acknowledge only after the work is done.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Must be set before consuming starts so the limit applies to this consumer.
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

發佈訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發佈和訂閱時,會爲每一個訂閱者創建一個隊列,而發佈者發佈消息時,會將消息放置在所有相關隊列中。

 exchange type = fanout

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='logs',
 9                          type='fanout')
10 
11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange='logs',
13                       routing_key='',
14                       body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
發佈者
import pika

# Subscriber for the 'logs' fanout exchange: binds a private,
# server-named queue to the exchange and prints every message received.
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# No queue name given: the broker's reply carries the generated name.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)


def callback(ch, method, properties, body):
    """Print the body of one received message."""
    print(" [x] %r" % body)


print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
訂閱者

有選擇的接收消息

 

 exchange type = direct

之前的示例中,發送消息時明確指定某個隊列並向其中發送消息。RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到exchange,exchange根據關鍵字判定應該將數據發送至哪個隊列。

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17     sys.exit(1)
18 
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23 
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32 
33 channel.start_consuming()
消費者
 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10 
11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity,
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()
生產者

模糊匹配

 

exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者將數據發送到exchange,exchange將傳入的「路由值」和「關鍵字」進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示能夠匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞
  • 1 發送者路由值              隊列中
    2 old.boy.python          old.*  -- 不匹配
    3 old.boy.python          old.#  -- 匹配
     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 binding_keys = sys.argv[1:]
    15 if not binding_keys:
    16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for binding_key in binding_keys:
    20     channel.queue_bind(exchange='topic_logs',
    21                        queue=queue_name,
    22                        routing_key=binding_key)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28 
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32 
    33 channel.start_consuming()
    消費者
    import pika
    import sys

    # Producer for the 'topic_logs' topic exchange. The first command-line
    # argument is the routing key, the remaining ones form the message.
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs', type='topic')

    # Fall back to defaults when the arguments are missing.
    args = sys.argv[1:]
    routing_key = args[0] if args else 'anonymous.info'
    message = ' '.join(args[1:]) or 'Hello World!'

    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    生產者

    memcached 

    http://www.cnblogs.com/wupeiqi/articles/5132791.html  

     

    redis 使用

    http://www.cnblogs.com/alex3714/articles/6217453.html  

相關文章
相關標籤/搜索