網上流傳的都是Python2的websocket實現html
# coding=utf8 # !/usr/bin/python import struct, socket import hashlib import threading, random import time import struct from base64 import b64encode, b64decode connectionlist = {} g_code_length = 0 g_header_length = 0 def hex2dec(string_num): return str(int(string_num.upper(), 16)) def get_datalength(msg): global g_code_length global g_header_length print(len(msg)) g_code_length = ord(msg[1]) & 127 received_length = 0; if g_code_length == 126: # g_code_length = msg[2:4] # g_code_length = (ord(msg[2])<<8) + (ord(msg[3])) g_code_length = struct.unpack('>H', str(msg[2:4]))[0] g_header_length = 8 elif g_code_length == 127: # g_code_length = msg[2:10] g_code_length = struct.unpack('>Q', str(msg[2:10]))[0] g_header_length = 14 else: g_header_length = 6 g_code_length = int(g_code_length) return g_code_length def parse_data(msg): global g_code_length g_code_length = ord(msg[1]) & 127 received_length = 0; if g_code_length == 126: g_code_length = struct.unpack('>H', str(msg[2:4]))[0] masks = msg[4:8] data = msg[8:] elif g_code_length == 127: g_code_length = struct.unpack('>Q', str(msg[2:10]))[0] masks = msg[10:14] data = msg[14:] else: masks = msg[2:6] data = msg[6:] i = 0 raw_str = '' for d in data: raw_str += chr(ord(d) ^ ord(masks[i % 4])) i += 1 print(u"總長度是:%d" % int(g_code_length)) return raw_str def sendMessage(message): global connectionlist message_utf_8 = message.encode('utf-8') for connection in connectionlist.values(): back_str = [] back_str.append('\x81') data_length = len(message_utf_8) if data_length <= 125: back_str.append(chr(data_length)) elif data_length <= 65535: back_str.append(struct.pack('b', 126)) back_str.append(struct.pack('>h', data_length)) # back_str.append(chr(data_length >> 8)) # back_str.append(chr(data_length & 0xFF)) # a = struct.pack('>h', data_length) # b = chr(data_length >> 8) # c = chr(data_length & 0xFF) elif data_length <= (2 ^ 64 - 1): # back_str.append(chr(127)) back_str.append(struct.pack('b', 127)) back_str.append(struct.pack('>q', data_length)) # back_str.append(chr(data_length >> 8)) # back_str.append(chr(data_length & 0xFF)) else: print(u'太長了') msg = '' for c in back_str: msg += c back_str = str(msg) + message_utf_8 # .encode('utf-8') # connection.send(str.encode(str(u"\x00%s\xFF\n\n" % message))) #這個是舊版 # print (u'send message:' + message) if back_str != None and len(back_str) > 0: print(back_str, '$backstr') print(','.join([str(ord(i)) for i in back_str])) connection.send(back_str) def deleteconnection(item): global connectionlist del connectionlist['connection' + item] class WebSocket(threading.Thread): # 繼承Thread GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" def __init__(self, conn, index, name, remote, path="/"): threading.Thread.__init__(self) # 初始化父類Thread self.conn = conn self.index = index self.name = name self.remote = remote self.path = path self.buffer = "" self.buffer_utf8 = "" self.length_buffer = 0 def run(self): # 重載Thread的run print('Socket%s Start!' % self.index) headers = {} self.handshaken = False while True: if self.handshaken == False: print('Socket%s Start Handshaken with %s!' % (self.index, self.remote)) self.buffer += bytes.decode(self.conn.recv(1024)) if self.buffer.find('\r\n\r\n') != -1: header, data = self.buffer.split('\r\n\r\n', 1) for line in header.split("\r\n")[1:]: key, value = line.split(": ", 1) headers[key] = value headers["Location"] = ("ws://%s%s" % (headers["Host"], self.path)) key = headers['Sec-WebSocket-Key'] token = b64encode(hashlib.sha1(str.encode(str(key + self.GUID))).digest()) handshake = "HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade: websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: " + bytes.decode(token) + "\r\n" \ "WebSocket-Origin: " + str( headers["Origin"]) + "\r\n" \ "WebSocket-Location: " + str(headers["Location"]) + "\r\n\r\n" self.conn.send(str.encode(str(handshake))) self.handshaken = True print('Socket %s Handshaken with %s success!' % (self.index, self.remote)) sendMessage(u'Welcome, ' + self.name + ' !') self.buffer_utf8 = "" g_code_length = 0 else: global g_code_length global g_header_length mm = self.conn.recv(128) if len(mm) <= 0: continue if g_code_length == 0: get_datalength(mm) # 接受的長度 self.length_buffer = self.length_buffer + len(mm) self.buffer = self.buffer + mm if self.length_buffer - g_header_length < g_code_length: continue else: self.buffer_utf8 = parse_data(self.buffer) # utf8 msg_unicode = str(self.buffer_utf8).decode('utf-8', 'ignore') # unicode if msg_unicode == 'quit': print(u'Socket%s Logout!' % (self.index)) nowTime = time.strftime('%H:%M:%S', time.localtime(time.time())) sendMessage(u'%s %s say: %s' % (nowTime, self.remote, self.name + ' Logout')) deleteconnection(str(self.index)) self.conn.close() break # 退出線程 else: # print (u'Socket%s Got msg:%s from %s!' % (self.index, msg_unicode, self.remote)) nowTime = time.strftime(u'%H:%M:%S', time.localtime(time.time())) sendMessage(u'%s %s say: %s' % (nowTime, self.remote, msg_unicode)) # 重置buffer和bufferlength self.buffer_utf8 = "" self.buffer = "" g_code_length = 0 self.length_buffer = 0 self.buffer = "" class WebSocketServer(object): def __init__(self): self.socket = None def begin(self): print('WebSocketServer Start!') self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(("127.0.0.1", 12345)) self.socket.listen(50) global connectionlist i = 0 while True: connection, address = self.socket.accept() username = address[0] newSocket = WebSocket(connection, i, username, address) newSocket.start() # 開始線程,執行run函數 connectionlist['connection' + str(i)] = connection i = i + 1 if __name__ == "__main__": server = WebSocketServer() server.begin()
python2前端前端
<!DOCTYPE html> </html> <head> <meta charset="utf-8"> </head> <body> <h3>WebSocketTest</h3> <div id="login"> <div> <input id="serverIP" type="text" placeholder="服務器IP" value="127.0.0.1" autofocus="autofocus"/> <input id="serverPort" type="text" placeholder="服務器端口" value="5000"/> <input id="btnConnect" type="button" value="鏈接" onclick="connect()"/> </div> <div> <input id="sendText" type="text" placeholder="發送文本" value="I'm WebSocket Client!"/> <input id="btnSend" type="button" value="發送" onclick="send()"/> </div> <div> <div> 來自服務端的消息 </div> <textarea id="txtContent" cols="50" rows="10" readonly="readonly"></textarea> </div> </div> </body> <script> var socket; function connect() { var host = "ws://" + $("serverIP").value + ":" + $("serverPort").value + "/" socket = new WebSocket(host); try { socket.onopen = function (msg) { $("btnConnect").disabled = true; alert("鏈接成功!"); }; socket.onmessage = function (msg) { if (typeof msg.data == "string") { displayContent(msg.data); } else { alert("非文本消息"); } }; socket.onclose = function (msg) { alert("socket closed!") }; socket.onerror = function (msg) { alert("socket error" + msg) } } catch (ex) { log(ex); } } function send() { var msg = $("sendText").value socket.send(msg); } window.onbeforeunload = function () { try { socket.close(); socket = null; } catch (ex) { } }; function $(id) { return document.getElementById(id); } Date.prototype.Format = function (fmt) { //author: meizz var o = { "M+": this.getMonth() + 1, //月份 "d+": this.getDate(), //日 "h+": this.getHours(), //小時 "m+": this.getMinutes(), //分 "s+": this.getSeconds(), //秒 "q+": Math.floor((this.getMonth() + 3) / 3), //季度 "S": this.getMilliseconds() //毫秒 }; if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length)); for (var k in o) if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length))); return fmt; } function displayContent(msg) { $("txtContent").value += "\r\n" + new Date().Format("yyyy/MM/dd hh:mm:ss") + ": " + msg; } function onkey(event) { if (event.keyCode == 13) { send(); } } </script> </html>
Python3的實現服務器發送消息有點問題python
import base64 import hashlib import socket import threading import traceback """ 基於socket協議手寫Websocket協議 """ clients = {} # 通知客戶端,廣播,這個函數有點問題 def notify(message): if len(message) > 127: print('message too long') return print(len(clients), 'len(clients)') for connection in list(clients.values()): print('before') content = bytearray(message, 'utf8') bs = bytearray([0x81]) bs.extend(content) print(bytes(bs)) # connection.send(bytes(bs)) connection.send(bytes(b'\x81\x05hello')) # 客戶端處理線程,每一個客戶端都啓動一個線程 class websocket_thread(threading.Thread): def __init__(self, connection, username): super(websocket_thread, self).__init__() self.connection = connection self.username = username def run(self): print('new websocket client joined!') data = self.connection.recv(1024) # 解析用戶請求的頭部,獲取sec-websocket-key,並對其進行sha一、base64 headers = self.parse_headers(data) token = self.generate_token(headers['Sec-WebSocket-Key']) headers = "HTTP/1.1 101 WebSocket Protocol Hybi-10\r\n" \ "Upgrade: WebSocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n\r\n" % token self.connection.send(headers.encode('ascii')) print('loop start') while True: try: data = self.connection.recv(1024) except socket.error as e: traceback.print_exc(e) print("unexpected error: ", e) clients.pop(self.username) break if len(data) == 0: continue data = self.parse_data(data) message = self.username + ": " + data notify(message) print(message) # 解析用戶請求中的數據 def parse_data(self, msg): print(msg, type(msg), len(msg)) v = msg[1] & 0x7f if v == 0x7e: p = 4 elif v == 0x7f: p = 10 else: p = 2 mask = msg[p:p + 4] data = msg[p + 4:] ans = bytearray([v ^ mask[k % 4] for k, v in enumerate(data)]) ans = bytes(ans).decode('utf8') return ans def parse_headers(self, msg): headers = {} msg = msg.decode('ascii') header, data = msg.split('\r\n\r\n', 1) for line in header.split('\r\n')[1:]: key, value = line.split(': ', 1) headers[key] = value headers['data'] = data return headers def generate_token(self, msg): # 下面這個字符串是websocket的magicstring key = msg + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' ser_key = hashlib.sha1(key.encode('ascii')).digest() return base64.encodebytes(ser_key).decode('ascii') # 服務端 class websocket_server(threading.Thread): def __init__(self, port): super(websocket_server, self).__init__() self.port = port def run(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('127.0.0.1', self.port)) sock.listen(5) print('websocket server started!') while True: connection, address = sock.accept() try: username = "ID" + str(address[1]) thread = websocket_thread(connection, username) thread.start() clients[username] = connection except socket.timeout: print('websocket connection timeout!') if __name__ == '__main__': server = websocket_server(5000) server.start()
Flask關於websocket有兩個庫:web
這兩個庫的官方例子我都沒有運行成功。打開瀏覽器一直顯示正在加載,不知何故。flask
gevent-socket是實現websocket的另外一種方式。gevent-socket官方例子給的挺好。瀏覽器
但是,當我使用Nginx配置成wss以後,將ws請求轉發到gevent-socket,就沒法正常工做了。不知道爲何。tomcat
from geventwebsocket import WebSocketServer, WebSocketApplication, Resource class EchoApplication(WebSocketApplication): def on_open(self): print("Connection opened") def on_message(self, message): self.ws.send(message[::-1]) def on_close(self, reason): print(reason) WebSocketServer( ('', 5000), Resource({'/': EchoApplication}) ).serve_forever()
html服務器
<html> <head> <style> * { font-size: 20px; } </style> </head> <body> <div style="text-align: center"> <textarea style="width: 80%;height: 70%;" id="all" readonly></textarea> <br> <input id="me" type="text" style="height: 10%;width: 80%;"> </div> </body> <script> ws = new WebSocket("ws://weiyinfu.cn:5000/message/"); ws.onmessage = function (msg) { output('server: ' + msg.data) }; ws.onopen = function (msg) { output('welcome,websocket opened') } ws.onerror = function (msg) { output('bad ! websocket error') } function output(s) { var all = document.getElementById('all') all.value += s + '\n' all.scrollTop = all.scrollHeight } document.getElementById('me').onkeydown = function (e) { if (e.keyCode == 13) { var s = document.getElementById('me').value output('Me: ' + s ) ws.send(s) document.getElementById('me').value = '' e.preventDefault() } } </script> </html>
本文提到四種websocket實現方式websocket
我最終使用了tomcat的websocket。app