Python twisted事件驅動網絡框架 源碼剖析

1、Twisted簡介

  Twisted是一個事件驅動的網絡框架,其中包含了諸多功能,例如:網絡協議、線程、數據庫管理、網絡操做、電子郵件等。html

  事件驅動簡而言之,事件驅動分爲二個部分:第一,註冊事件;第二,觸發事件。react

Protocols數據庫

  Protocols描述瞭如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含以下的方法:服務器

  makeConnection    在transport對象和服務器之間創建一條鏈接
  connectionMade     鏈接創建起來後調用
  dataReceived       接收數據時調用
  connectionLost      關閉鏈接時調用網絡

Transports框架

  Transports表明網絡中兩個通訊結點之間的鏈接。Transports負責描述鏈接的細節,好比鏈接是面向流式的仍是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可做爲transports的例子。它們被設計爲「知足最小功能單元,同時具備最大程度的可複用性」,並且從協議實現中分離出來,這讓許多協議能夠採用相同類型的傳輸。Transports實現了ITransports接口,它包含以下的方法:異步

  write                   以非阻塞的方式按順序依次將數據寫到物理鏈接上
  writeSequence  將一個字符串列表寫到物理鏈接上
  loseConnection   將全部掛起的數據寫入,而後關閉鏈接
  getPeer       取得鏈接中對端的地址信息
  getHost      取得鏈接中本端的地址信息socket

  將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。能夠經過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。函數

2、源碼分析

EchoServer:源碼分析

from twisted.internet import protocol
from twisted.internet import reactor        #reactor無限循環,寫好了事件,reactor自動檢測,相似於select
  
class Echo(protocol.Protocol):
    def dataReceived(self, data):       #只要twisted一收到數據,就會調用dataRecevied方法
        self.transport.write(data)      #把收到的數據返回給客戶端
  
def main():
    factory = protocol.ServerFactory()      #定義一個基類,相似於socketserver的handler上面一層的類
    factory.protocol = Echo     #相似於socketserver中的handler,必須定義此Echo,代表每個客戶端過來後都會調用Echo創建一個實例
  
    reactor.listenTCP(1234,factory)     #reactor相似於select,是一個觸發器,檢測1234端口,須要把定義的基礎類傳進來
    reactor.run()       #reactor執行
  
if __name__ == '__main__':
    main()

EchoClient:

from twisted.internet import reactor, protocol
  
  
# a client protocol
  
class EchoClient(protocol.Protocol):
    """Once connected, send a message, then print the result."""
  
    def connectionMade(self):       #只要鏈接一創建成功,就會自動調用此方法
        self.transport.write("hello!")      #給服務端發送hello 
  
    def dataReceived(self, data):       #當有數據收到時,就會調用這個方法,自動進行
        "As soon as any data is received, write it back."
        print "Server said:", data      #收到數據後打印數據
        self.transport.loseConnection()     #數據傳送完畢後,關閉鏈接,執行了下面的方法  |
#                                                                                     v
#                       ---------<--------------<-----------------<----------------<---
#                       |
#                       v
    def connectionLost(self, reason):       #client connection斷開了,會執行此方法,此爲本身定義的connectionLost方法
        print "connection lost"
  
class EchoFactory(protocol.ClientFactory):
    protocol = EchoClient       #在類中定義protocal,重寫這個類;EchoClient至關於socketserver中的handle方法
  
    def clientConnectionFailed(self, connector, reason):        #若是reactor鏈接不上服務端,自動調用這方法
        print "Connection failed - goodbye!"        #打印鏈接失敗信息
        reactor.stop()      #關閉鏈接
  
    def clientConnectionLost(self, connector, reason):      #若是client connection斷開了,會自動調用此方法,相似於socketserver的handle後面的finish方法,和上面的connectionLost方法不一樣。
        print "Connection lost - goodbye!"      #打印鏈接斷開信息
        reactor.stop()      #關閉鏈接
  
  
# this connects the protocol to a server running on port 8000
def main():
    f = EchoFactory()       #建立一個客戶端的基類,與服務端的ServerFactory相似
    reactor.connectTCP("localhost", 1234, f)        #鏈接'localhost',端口號,把客戶端的基類傳入reactor
    reactor.run()       #運行reactor
  
# this only runs if the module was *not* imported
if __name__ == '__main__':
    main()      #程序入口,進入主程序

  運行服務器端腳本將啓動一個TCP服務器,監聽端口1234上的鏈接。服務器採用的是Echo協議,數據經TCP transport對象寫出。運行客戶端腳本將對服務器發起一個TCP鏈接,回顯服務器端的迴應而後終止鏈接並中止reactor事件循環。這裏的 Factory用來對鏈接的雙方生成protocol對象實例。兩端的通訊是異步的,connectTCP負責註冊回調函數到reactor事件循環中,當socket上有數據可讀時通知回調處理。

3、RPC客戶端向服務端發送命令源碼分析:

RPC server:

#Project interpreter: 2.7
import pika, os, time

def operate(body):
    sys_result=os.popen(body).read()
    print("%s client execute \033[1;31;0m%s\033[0m result:\n%s" % (time.strftime('%Y-%m-%d %H:%M:%S'),body,sys_result))
    return sys_result

def on_request(ch, method, props, body):
    response = operate(body)
    ch.basic_publish(exchange='',   #basic_publish指向管道內發送數據
                     routing_key=props.reply_to,    #指定向哪一個隊列發數據
                     properties=pika.BasicProperties(correlation_id = props.correlation_id),
                     body=str(response))    #body是發送的消息內容
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))   #這是阻塞的鏈接
        channel = connection.channel()  #生成一個管道
        channel.queue_declare(queue='rpc_queue')    #在管道中建立一個隊列,名字叫rpc_queue
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(on_request, queue='rpc_queue')
        print("Server is waiting RPC requests...")
        channel.start_consuming()   #開始接收數據,阻塞狀態
    except KeyboardInterrupt:
        print("Connection lost...")

RPC client:

#Project interpreter: 2.7
import pika, uuid

class OperateRpcClient(object):   #對類進行實例化
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將此queue刪除
        self.callback_queue = result.method.queue   #服務端執行完結果返回的queue名字
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)    #no_ack不須要確認,若是爲False,當客戶端消費完成後,會給服務端發送確認消息;queue參數指定了收取消息隊列的名稱

    def on_response(self, ch, method, props, body): #回調方法
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.input = raw_input("root@client>> ")
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=self.input)
        while self.response is None:
            self.connection.process_data_events()   #不斷的去Queue裏面接收數據,並且不是阻塞的
        return self.response

if __name__ == '__main__':
    print("This program use rabbitMQ send your OS command to server, your can use common command at here, enjoy it!\n\te.g.\n\t\t1.ls\n\t\t2.pwd\n\t\t3.free -m\n\t\t4.df -Th\n\t\t5.netstat -anplute")
    while True:
        try:
            operate_rpc = OperateRpcClient()
            response = operate_rpc.call()
            print(response)
        except KeyboardInterrupt:
            print("Connection lost...")
相關文章
相關標籤/搜索