nginx+flask+gevent+uwsgi實現websocket

Websocket簡介

WebSocket是HTML5開始提供的一種在單個 TCP 鏈接上進行全雙工通信的協議。在WebSocket API中,瀏覽器和服務器只須要作一個握手的動做,而後,瀏覽器和服務器之間就造成了一條快速通道。二者之間就直接能夠數據互相傳送。瀏覽器經過 JavaScript向服務器發出創建 WebSocket 鏈接的請求,鏈接創建之後,客戶端和服務器端就能夠經過 TCP 鏈接直接交換數據。python

Websocket自己是一個持久化的新協議,和http沒有很大的關係,但websocket是基於http協議的,借用http協議完成了部分握手。linux

咱們在某個linux服務器上部署了一個websocket,而後咱們發起鏈接請求,看下請求發送的內容,對比下http請求和websocket請求的不一樣:nginx

      image.png

能夠看到,主要的不一樣點,Connection: Upgrade;Upgrade: websocket,還有3個Sec開頭的參數。前兩個參數是用來告訴nginx服務,我發起的是websocket協議,而不是http,後三個參數是用來校驗websocket服務是不是websocket的,以及校驗websocket協議的版本。web

在消息發出後,websocket服務端返回一個response:flask

Connection:upgrade後端

Date:Thu, 26 Oct 2017 06:01:51 GMT瀏覽器

Sec-WebSocket-Accept:3P+uxAGozJzj3vcr89gxFzydOu8=服務器

Sec-WebSocket-Origin:http://10.43.35.31:2327websocket

Server:nginx/1.12.0架構

Upgrade:WebSocket

當收到服務端返回,證實協議已經轉換成功,後面合http再無關係了,而Sec開頭的是證實我是websocket。

爲何使用websocket

沒有websocket的時候,經常使用的實時同步數據的方式通常有兩種:一、輪詢,即每隔必定的時間客戶端就向服務端發起一次請求,客戶端根據返回數據更新界面,這須要服務器有很快的處理速度和資源。二、長連接,即客戶端向服務端發送查詢請求,若是服務端無數據,則一直阻塞,直到有數據返回,返回後客戶端再重複此過程,這種須要很高的併發。

這兩種方式都須要不斷的創建http鏈接,服務端不能主動推送數據給客戶端,比較被動,比較耗資源,好比屢次請求的HTTP頭部,TCP鏈接複用會致使的線頭阻塞(即因爲前面數據丟失致使的後續數據堆積)。

Websocket很好的解決了上面的問題:只須要在創建鏈接的時候發送一次http請求,後面的交互都不須要再發了;WebSocket的鏈接是雙向通訊的鏈接,在同一個TCP鏈接上,既能夠發送,也能夠接收;幾個不一樣的URI能夠複用同一個WebSocket鏈接。

Websocket基礎架構及實現

 本次websocket的搭建考慮性能問題,採起的是flask+python+uwsgi+gevent+nginx的基礎架構。採用uwsgi+gevent的模式啓動websocket進程,爲gevent指定併發爲100,web框架使用小而輕的Flask,使用virtualenv建立虛擬環境,保證websocket不影響主工程,且能夠單獨發佈部署,本進程的依賴爲Flask,gevent和uwsgi。

Nginx須要分別爲http和https增長配置

image.png

上面是nginx的配置,nginx是從1.3開始支持websocket的,upgrade將http鏈接升級到websocket鏈接,Upgrade機制使用了Upgrade協議頭和Connection協議頭,所以當代理服務器攔截到來自客戶端的Upgrade請求時,代理服務器須要將本身的Upgrade請求發送給後端服務器,包括適合的請求頭。Nginx經過在客戶端和後端服務器之間創建隧道來支持WebSockets通訊。爲了讓Nginx能夠未來自客戶端的Upgrade請求發送到後端服務器,Upgrade和Connection的頭信息必須被顯式的設置。

Webserver採用的是Flask,下面簡單介紹下本工程代碼實現,包含幾個部分:

一、客戶端部分,客戶端採用

ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/websocket');或

ws = new WebSocket('wss://' + document.domain + ':' + location.port + '/websocket');來創建鏈接,並監聽onopen,onmessage,onclose事件。

二、服務端部分

首先建立一個flask的app,而後傳入GeventWebSocket,獲得一個websocket的對象。

app = Flask(__name__)
ws = GeventWebSocket(app)

GeventWebSocket繼承了websocket,重寫了init_app,由於要使用gevent,重寫主要是爲了打上猴子補丁,還替換了websocket中間件,也是爲了適配gevent。

class GeventWebSocket(WebSocket):
    middleware = GeventWebSocketMiddleware

    def init_app(self, app):
        logger.info("init_app")
        aggressive = app.config.get('UWSGI_WEBSOCKET_AGGRESSIVE_PATCH', True)
        patch_all(aggressive=aggressive)
        super(GeventWebSocket, self).init_app(app)

Websocket模塊主要爲了按要求啓動websocket服務端,主要的3個函數:

route:主要是爲view添加url規則;

def route(self, rule, **options):
    def decorator(f):
        endpoint = options.pop('endpoint', None)
        self.add_url_rule(rule, endpoint, f, **options)
        return f
    return decorator

init_app:修改flask的app的屬性,使在啓動app時能以uwsgi啓動,並重寫wsgi_app,供uwsgi進行調用。

def init_app(self, app):
    self.app = app
    app.wsgi_app = self.middleware(app.wsgi_app, self)
    app.run = lambda **kwargs: self.run(**kwargs)

run:以uwsgi方式啓動app,並指定相關啓動函數。

def run(self, app=None, debug=False, host='127.0.0.1', port=5000, uwsgi_binary=None, **kwargs):
    if not app:
        app = self.app.name + ':app'

    if self.app.debug:
        debug = True
    run_uwsgi(app, debug, host, port, uwsgi_binary, **kwargs)

中間件替換爲了子類的中間件,主要是提供了wsgi_app供uwsgi進行調用。wsgi server工做流程以下:服務器建立socket,監聽端口,等待客戶端鏈接;當有請求來時,服務器解析客戶端信息放到環境變量environ中,並調用綁定的handler來處理請求;handler解析這個http請求,將請求信息例如method,path等放到environ中;wsgi handler再將一些服務器端信息也放到environ中,最後服務器信息,客戶端信息,本次請求信息所有都保存到了環境變量environ中;wsgi handler 調用註冊的wsgi app,並將environ和回調函數傳給wsgi app;wsgi app 將reponse header/status/body 回傳給wsgi handler;handler仍是經過socket將response信息塞回給客戶端。

Wsgi app先經過environ獲取到請求的url,而後定位到具體調用的函數,

urls = self.websocket.url_map.bind_to_environ(environ)
try:
    endpoint, args = urls.match()
    handler = self.websocket.view_functions[endpoint]
except HTTPException:
    handler = None

而後,根據environ獲取websocket_key,以後與客戶端完成握手,

uwsgi.websocket_handshake(environ['HTTP_SEC_WEBSOCKET_KEY'],
                          environ.get('HTTP_ORIGIN', ''))

而後建立基於gevent的發送事件、隊列和監聽事件、隊列;而後建立client來操控這兩個隊列完成發送和接收。

send_event = Event()
send_queue = Queue()
recv_event = Event()
recv_queue = Queue()

client = self.client(environ, uwsgi.connection_fd(), send_event,
                     send_queue, recv_event, recv_queue,
                     self.websocket.timeout)

而後啓動協程,建立發送消息的tcp鏈路,

handler = spawn(handler, client, **args)
def listener(client):
    select([client.fd], [], [], client.timeout)
    recv_event.set()
listening = spawn(listener, client)

進入等待,一旦監聽到發送或接收事件,則調用uwsgi方法對消息進行處理,若是handler完成或者被殺死,則終止監聽,

wait([handler, send_event, recv_event], None, 1)
# handle send events
if send_event.is_set():
    try:
        while True:
            uwsgi.websocket_send(send_queue.get_nowait())
    except Empty:
        send_event.clear()
    except IOError:
        client.connected = False

# handle receive events
elif recv_event.is_set():
    recv_event.clear()
    try:
        message = True
        while message:
            message = uwsgi.websocket_recv_nb()
            recv_queue.put(message)
        listening = spawn(listener, client)
    except IOError:
        client.connected = False

# handler done, we're outta here
elif handler.ready():
    listening.kill()
    return ''

這裏面使用的client實例主要就實現了對於隊列的操做和事件狀態的變動,

def send(self, msg, binary=True):
    if binary:
        return self.send_binary(msg)
    self.send_queue.put(msg)
    self.send_event.set()

 

def receive(self):
    return self.recv_queue.get()

 

def close(self):
    self.connected = False

而後就是app裏使用websocket的方法了,我在全局生命了一個client的list,用來存儲已鏈接的wensocket對象,每次創建鏈接,都會將新的鏈接對象存進去,

client = []

@ws.route('/websocket')
def start(channel):
    global client
    client.append(channel)
    channel.send('websocket is running')
    while True:
        channel.receive()

又起了一個view,定義爲post,接受咱們本身工程的消息上報,每次消息上報只上報正在鏈接的對象,已經不鏈接的從列表刪除,

@app.route('/send', methods=['POST'])
def send_msg():
    message = request.data
    send_all_client(message)
    delete_close_client()
    return 'SUCCESS'

全部須要上報的消息,統一走這個rest接口通知就能夠,app會將消息上報到每一個鏈接的客戶端,最後爲app指定gevent線程數和監聽端口,

if __name__ == '__main__':
    app.run(gevent=100, port=5000)

每次經過python app.py啓動工程便可,代碼會自動拼裝啓動命令,以下:

/home/ngomm/websocket/ENV/bin/uwsgi --http 127.0.0.1:5000 --http-websockets --virtualenv /home/ngomm/websocket/ENV --gevent 100 --master --wsgi app:app

由於但願能夠獨立部署,因此爲工程建立了虛擬環境,該工程依賴的包爲Flask (0.12.2),gevent (1.1.2),uWSGI (2.0.11.2)。

相關文章
相關標籤/搜索