環境web
pip install ws4py
from ws4py.client.threadedclient import WebSocketClient
1、websocket協議json
2、工具層 utils.py websocket
import threading


class Timer():
    """Repeating timer that calls ``fun`` every ``delay`` seconds.

    Each tick runs on a fresh ``threading.Timer`` thread; the next tick is
    scheduled only after the callback returns. The first tick is scheduled
    as soon as the object is constructed.

    Args:
        delay: Seconds between the end of one callback and the next tick.
        fun: Zero-argument callable to invoke; a falsy value means "no
            callback" (the timer still ticks).
    """

    def __init__(self, delay, fun):
        self.delay, self.f = delay, fun
        # Guards the cancelled flag and the hand-over of ``self.t`` so that
        # cancel() and a tick running on another thread cannot interleave.
        self._lock = threading.Lock()
        self._cancelled = False
        self.t = threading.Timer(self.delay, self.fun)
        self.t.start()

    def fun(self):
        """Run one tick: invoke the callback, then schedule the next tick."""
        if self._cancelled:
            return
        if self.f:
            self.f()
        with self._lock:
            # cancel() may have been called while the callback was running;
            # without this check a new timer would be armed after the
            # cancellation and the timer would keep firing forever.
            if self._cancelled:
                return
            self.t = threading.Timer(self.delay, self.fun)
            self.t.start()

    def cancel(self):
        """Stop the timer; no further ticks are scheduled after this call."""
        with self._lock:
            self._cancelled = True
            self.t.cancel()
        print("threading cancel")


class Event():
    """Minimal publish/subscribe registry keyed by event name.

    ``keys`` holds the registered event names and ``map`` holds, at the same
    position, a dict ``{"key": name, "funs": [handlers...]}``.
    """

    def __init__(self):
        self.map = []
        self.keys = []

    def index(self, k):
        """Return the position of event ``k`` in ``keys``, or -1 if unknown."""
        try:
            return self.keys.index(k)
        except ValueError:
            return -1

    def on(self, key, fun):
        """Register ``fun`` as a handler for event ``key``."""
        i = self.index(key)
        if i == -1:
            self.map.append({"key": key, "funs": [fun]})
            self.keys.append(key)
        else:
            self.map[i]["funs"].append(fun)

    def emit(self, key, data=None):
        """Call every handler of ``key`` with ``data``.

        Prints a notice and returns if the event was never registered.
        """
        i = self.index(key)
        if i == -1:
            print("no regist event:" + str(key))
            return
        # Iterate over a snapshot so handlers may call on()/rm() for the
        # same event without disturbing the dispatch in progress.
        for f in list(self.map[i]["funs"]):
            f(data)

    def rm(self, key, fun):
        """Remove every registration of ``fun`` for event ``key``.

        Prints a notice and returns if the event was never registered.
        """
        i = self.index(key)
        if i == -1:
            print("no regist event:" + str(key))
            return
        # Build a new list instead of writing None into the live one, which
        # would make a concurrent/in-progress emit() call None(data).
        self.map[i]["funs"] = [f for f in self.map[i]["funs"] if f != fun]
3、服務層 DanmuWS.py cookie
import threading
import json
import struct

from ws4py.client.threadedclient import WebSocketClient

from utils import Event, Timer

# Module-wide event bus shared by every DanmuWebSocket instance.
event = Event()


class DanmuWebSocket(WebSocketClient):
    """ws4py client for the danmu (live chat) websocket endpoint.

    Every packet sent or parsed here starts with a 16-byte big-endian header
    packed as ``>IHHII``: total packet length, header length, protocol
    version, operation code, sequence number. Parsed packets are published
    on the module-level ``event`` bus as 'heartbeat', 'cmd' or 'login';
    a scheduled reconnect is published as 'reconnect'.

    Args:
        info: Mapping with the keys 'uid', 'roomid', 'protover', 'platform'
            and 'clientver', used to build the login packet.
        serveraddress: Websocket URL to connect to.
    """

    # Shared by all instances; kept as class attributes so they exist before
    # the first instance is created.
    event = event
    headerLength = 16

    def __init__(self, info, serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):
        self.serveraddress = serveraddress
        WebSocketClient.__init__(self, serveraddress)
        self.Info = info

    def opened(self):
        """Send the login packet and start the 20-second heartbeat timer."""
        self.sendLoginPacket(
            self.Info['uid'],
            self.Info['roomid'],
            self.Info['protover'],
            self.Info['platform'],
            self.Info['clientver'],
        )
        self.sendHeartBeatPacket()
        self.heartBeatHandler = Timer(20, self.sendHeartBeatPacket)
        print("opened")

    def delay_close(self):
        """Build a replacement client and publish it as a 'reconnect' event.

        The new client is only constructed here; it is not connected, so the
        'reconnect' listener is expected to call ``connect()`` on it.
        """
        dws = DanmuWebSocket(self.Info, self.serveraddress)
        event.emit('reconnect', dws)

    def closed(self, code, reason=None):
        """Stop the heartbeat and, unless closed with code 1000, retry in 5 s."""
        print("Closed", code, reason)
        heartbeat = getattr(self, "heartBeatHandler", None)
        if heartbeat is not None:
            heartbeat.cancel()
        if code == 1000:
            return
        threading.Timer(5, self.delay_close).start()

    def received_message(self, message):
        """Split one websocket frame into packets and publish each of them.

        A frame may carry several packets back to back. Operation 3 publishes
        'heartbeat' with the unsigned 32-bit integer that follows the header,
        operation 5 publishes 'cmd' with the JSON-decoded body, and any other
        operation publishes 'login' without data.
        """
        payload = message.data
        header_len = DanmuWebSocket.headerLength
        total = len(payload)
        position = 0
        # Stop as soon as fewer bytes than a full header remain.
        while position + header_len <= total:
            header_pack = struct.unpack_from(">IHHII", payload, position)
            length_pack = header_pack[0]
            operation = header_pack[3]
            # A declared length shorter than the header would never advance
            # `position` (endless loop); one longer than the frame would make
            # the body slice incomplete. Drop the rest of the frame instead.
            if length_pack < header_len or position + length_pack > total:
                print("malformed packet, length:" + str(length_pack))
                break
            if operation == 3:
                offset = position + header_pack[1]
                num = struct.unpack_from(">I", payload, offset)[0]
                event.emit('heartbeat', num)
            elif operation == 5:
                data = json.loads(payload[position + header_len:position + length_pack])
                event.emit('cmd', data)
            else:
                event.emit('login')
            position += length_pack

    def sendData(self, data, protover=1, operation=2, sequence=1):
        """Prefix ``data`` with the 16-byte header and send it.

        Args:
            data: ``dict`` (JSON-encoded), ``str`` (encoded to bytes) or
                bytes (sent as is).
            protover: Protocol version header field.
            operation: Operation code header field.
            sequence: Sequence number header field.
        """
        if isinstance(data, dict):
            data = json.dumps(data).encode()
        elif isinstance(data, str):
            data = data.encode()
        header = struct.pack(
            ">IHHII",
            DanmuWebSocket.headerLength + len(data),
            DanmuWebSocket.headerLength,
            protover,
            operation,
            sequence,
        )
        self.send(header + data)

    def sendLoginPacket(self, uid, roomid, protover=1, platform='web', clientver='1.4.6'):
        """Send the login packet (operation 7) for ``uid`` in ``roomid``.

        Wire layout: Uint(4 bytes) + 00 10 + 00 01 + 00 00 00 07
        + 00 00 00 01 + JSON data.
        """
        data = {
            'uid': int(uid),
            'roomid': int(roomid),
            'protover': protover,
            'platform': platform,
            'clientver': clientver,
        }
        print("sendLoginPacket")
        # Compact separators give whitespace-free JSON without touching
        # spaces that may occur inside the values themselves.
        body = json.dumps(data, separators=(',', ':'))
        self.sendData(body.encode(), 1, 7, 1)

    def sendHeartBeatPacket(self):
        """Send the heartbeat packet (operation 2).

        Wire layout: Uint(4 bytes) + 00 10 + 00 01 + 00 00 00 02
        + 00 00 00 01 + data.
        """
        self.sendData(b'[object Object]', 1, 2, 1)

    def bind(self, onreconnect=None, onlogin=None, onheartbeat=None, oncmd=None, onreceive=None):
        """Register the given callables on the shared event bus.

        Does nothing if a 'cmd' handler has already been registered, so a
        second call does not register the handlers twice. Arguments that are
        not callable are ignored.
        """
        if "cmd" in event.keys:
            return
        handlers = (
            ("reconnect", onreconnect),
            ("login", onlogin),
            ("heartbeat", onheartbeat),
            ("cmd", oncmd),
            ("receive", onreceive),
        )
        for name, handler in handlers:
            if callable(handler):
                event.on(name, handler)
4、測試代碼session
from server import Login
from DanmuWS import DanmuWebSocket

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0',
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Accept-Encoding': 'gzip, deflate, br',
    'Referer': 'https://live.bilibili.com/',
    'Origin': 'https://live.bilibili.com',
    'Connection': 'keep-alive'
}

# NOTE(review): `session` is not imported anywhere in the visible code;
# confirm which module provides it before running this script.
s = session(headers, 'cookie.txt')
login = Login(s)
# Repeat the verification-code flow until the login succeeds.
while not login.isLogin():
    login.get_vdcode()
    login.loop_vdcode()

info = {
    "uid": login.info['uid'],
    "roomid": 7603080,
    "protover": 1,
    "platform": "web",
    "clientver": "1.4.0"
}


def oncmd(data):
    """Print every command packet; the branches are hooks for per-cmd logic."""
    cmd = data["cmd"]
    if cmd == "SYS_MSG":
        print(data)
    elif cmd == "SPECIAL_GIFT":
        print(data)
    else:
        print(data)


def onlogin(data):
    """Report that the server answered the login packet."""
    print("login success")


def onreconnect(dws):
    """Adopt the replacement socket and connect it.

    DanmuWebSocket.delay_close only constructs the new client, so it has to
    be connected here for the reconnect to take effect.
    """
    global ws
    ws = dws
    try:
        ws.connect()
    except Exception as exc:
        print("reconnect failed:", exc)


def onheartbeat(num):
    """Print the number carried by a heartbeat reply."""
    print(num)


# Created outside the try block so `ws` is always bound in the handlers below.
ws = DanmuWebSocket(info, 'wss://broadcastlv.chat.bilibili.com/sub')
# Register handlers before connecting so no early event is missed.
ws.bind(onreconnect, onlogin, onheartbeat, oncmd)
try:
    ws.connect()
    ws.run_forever()
except KeyboardInterrupt:
    ws.close()
except Exception as exc:
    print("websocket error:", exc)
    ws.close()