1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > python websocket爬虫_python根据websocket抓取斗鱼弹幕和礼物消息

python websocket爬虫_python根据websocket抓取斗鱼弹幕和礼物消息

时间:2021-04-03 08:46:55

相关推荐

python websocket爬虫_python根据websocket抓取斗鱼弹幕和礼物消息

1.斗鱼弹幕协议

到斗鱼官方开放平台看斗鱼通讯协议,网址“/source/api/63”,登录后可查看

所以根据斗鱼协议做编码函数:def msg_encode(msg):

#消息以 \0 结尾,并以utf-8编码

msg = msg + '\0'

msg_bytes = msg.encode('utf-8')

#消息长度 + 头部长度8

length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')

#斗鱼客户端发送消息类型 689

type = 689

type_bytes = int.to_bytes(type, 2, byteorder='little')

# 加密字段与保留字段,默认 0 长度各 1

end_bytes = int.to_bytes(0, 1, byteorder='little')

#按顺序相加 消息长度 + 消息长度 + 消息类型 + 加密字段 + 保留字段

head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes

#消息头部拼接消息内容

data = head_bytes + msg_bytes

return data

然后根据斗鱼协议做解码函数:def msg_decode(msg_bytes):

#定义一个游标位置

cursor = 0

msg = []

while cursor < len(msg_bytes):

#根据斗鱼协议,报文 前四位与第二个四位,都是消息长度,取前四位,转化成整型

content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')

#报文长度不包含前4位,从第5位开始截取消息长度的字节流,并扣除前8位的协议头,取出正文,用utf-8编码成字符串

content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',

errors='ignore')

msg.append(content)

cursor = (cursor + 4) + content_length

# print(msg)

return msg

解码后的消息需要反序列化:def msg_format(msg_str):

try:

msg_dict = {}

msg_list = msg_str.split('/')[0:-1]

for msg in msg_list:

msg = msg.replace('@s', '/').replace('@A', '@')

msg_tmp = msg.split('@=')

msg_dict[msg_tmp[0]] = msg_tmp[1]

return msg_dict

except Exception as e:

print(str(e))

2.抓包分析

建议通过chrome开发者工具抓包,或者使用fiddler抓包,网页的东西,直接用chrome更方便

为什么不用斗鱼开放平台提供的api直接用呢,因为api是面向注册的开发者的,api都需要申请使用,普通用户只能抓包分析通用接口

分析:

中交互消息可以获得多个弹幕服务器地址,我们直接取一个就行,不需要在程序中实现实时分析

然后分析连接中详细交互信息,实现与服务器交互

匿名登录时弹幕中礼物消息可能默认被屏蔽,需在登录时提交开启指令,或登录后提交开启指令

弹幕消息类型与指令非常丰富,这里举例简单的消息和指令,详细可以继续分析

结果如下:

登录:type@=loginreq/roomid@=74751/dfl@=/username@=visitor8243989/uid@=1132317461/ver@=0610/aver@=218101901/ct@=0/

##匿名分配的username与uid可以从中交互消息获取,也可直接自定义提交就行

退出登录:type@=logout/

心跳消息:type@=mrkl/

加入组消息:type@=joingroup/rid@=74751/gid@=-9999/ #gid默认1,此处改成 - 9999 改成海量弹幕模式

屏蔽礼物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/

开启礼物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0@AS@S/

##开启与屏蔽礼物可以详细控制,可以逐项尝试分析

注:登录消息中有个字段/dfl@=,此处直接加入屏蔽礼物或开启礼物指令中相应dfl@=后面的内容,即可在登录时控制礼物屏蔽或开启

3.礼物消息处理

获取斗鱼礼物类型,需抓包分析来源网址

url1 = '/resource/common/gift/flash/gift_effect.json'

url2 = '/resource/common/prop_gift_list/prop_gift_config.json'

将获取到的礼物json处理一下,合并且去除不需要信息,做成字典:

def get_gift_dict():

gift_json = {}

gift_json1 = requests.get('/resource/common/gift/flash/gift_effect.json').text

gift_json2 = requests.get('/resource/common/prop_gift_list/prop_gift_config.json').text

gift_json1=gift_json1.replace('DYConfigCallback(','')[0:-2]

gift_json2=gift_json2.replace('DYConfigCallback(','')[0:-2]

gift_json1 = json.loads(gift_json1)['data']['flashConfig']

gift_json2= json.loads(gift_json2)['data']

for gift in gift_json1:

gift_json[gift] = gift_json1[gift]['name']

for gift in gift_json2:

gift_json[gift] = gift_json2[gift]['name']

return gift_json

4.websocket连接

python环境先安装websocket包 pip3 install websocket-client

引入websocket包后,使用websecket.WebSocketApp建立websecket连接客户端,创建时在on_open、on_error、on_message、on_close4个参数中传入相应事件发生时需要处理的方法

顾名思义,4个参数很好理解了

websocket.WebSocketApp(url, on_open=on_open, on_error=on_error,on_message=on_message, on_close=on_close)

5.效果与源码

实际效果展示:

源码(演示代码,异常判断,数据校验就不做了):

__author__ = 'admin'

import websocket

import threading

import time

import requests

import json

class DyDanmu:

def __init__(self, roomid, url):

self.gift_dict = self.get_gift_dict()

self.gift_dict_keys = self.gift_dict.keys()

self.room_id = roomid

self.client = websocket.WebSocketApp(url, on_open=self.on_open, on_error=self.on_error,

on_message=self.on_message, on_close=self.on_close)

self.heartbeat_thread = threading.Thread(target=self.heartbeat)

def start(self):

self.client.run_forever()

def stop(self):

self.logout()

self.client.close()

def on_open(self):

self.login()

self.join_group()

self.heartbeat_thread.setDaemon(True)

self.heartbeat_thread.start()

def on_error(self, error):

print(error)

def on_close(self):

print('close')

def send_msg(self, msg):

msg_bytes = self.msg_encode(msg)

self.client.send(msg_bytes)

def on_message(self, msg):

message = self.msg_decode(msg)

# print(message)

for msg_str in message:

msg_dict = self.msg_format(msg_str)

if msg_dict['type'] == 'chatmsg':

print(msg_dict['nn'] + ':' + msg_dict['txt'])

if msg_dict['type'] == 'dgb':

if msg_dict['gfid'] in self.gift_dict_keys:

print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t个\t' + self.gift_dict[msg_dict['gfid']])

else:

print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t个\t' + msg_dict['gfid'] + '\t未知礼物')

# print(msg_dict)

# 发送登录信息

def login(self):

login_msg = 'type@=loginreq/roomid@=%s/' \

'dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0/' \

'username@=%s/uid@=%s/ltkid@=/biz@=/stk@=/devid@=8d8c22ce6093e6a7264f99da00021501/ct@=0/pt@=2/cvr@=0/tvr@=7/apd@=/rt@=1605498503/vk@=0afb8a90c2cb545e8459d60c760dc08b/' \

'ver@=0610/aver@=218101901/dmbt@=chrome/dmbv@=78/' % (

self.room_id, 'visitor4444086', '1178849206'

)

self.send_msg(login_msg)

def logout(self):

logout_msg = 'type@=logout/'

self.send_msg(logout_msg)

# 发送入组消息

def join_group(self):

join_group_msg = 'type@=joingroup/rid@=%s/gid@=-9999/' % (self.room_id)

self.send_msg(join_group_msg)

# 关闭礼物信息推送

def close_gift(self):

close_gift_msg = 'type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/'

self.send_msg(close_gift_msg)

# 保持心跳线程

def heartbeat(self):

while True:

# 45秒发送一个心跳包

self.send_msg('type@=mrkl/')

print('发送心跳')

time.sleep(45)

def msg_encode(self, msg):

# 消息以 \0 结尾,并以utf-8编码

msg = msg + '\0'

msg_bytes = msg.encode('utf-8')

#消息长度 + 头部长度8

length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')

#斗鱼客户端发送消息类型 689

type = 689

type_bytes = int.to_bytes(type, 2, byteorder='little')

# 加密字段与保留字段,默认 0 长度各 1

end_bytes = int.to_bytes(0, 1, byteorder='little')

#按顺序相加 消息长度 + 消息长度 + 消息类型 + 加密字段 + 保留字段

head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes

#消息头部拼接消息内容

data = head_bytes + msg_bytes

return data

def msg_decode(self, msg_bytes):

# 定义一个游标位置

cursor = 0

msg = []

while cursor < len(msg_bytes):

#根据斗鱼协议,报文 前四位与第二个四位,都是消息长度,取前四位,转化成整型

content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')

#报文长度不包含前4位,从第5位开始截取消息长度的字节流,并扣除前8位的协议头,取出正文,用utf-8编码成字符串

content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',

errors='ignore')

msg.append(content)

cursor = (cursor + 4) + content_length

# print(msg)

return msg

def msg_format(self, msg_str):

try:

msg_dict = {}

msg_list = msg_str.split('/')[0:-1]

for msg in msg_list:

msg = msg.replace('@s', '/').replace('@A', '@')

msg_tmp = msg.split('@=')

msg_dict[msg_tmp[0]] = msg_tmp[1]

return msg_dict

except Exception as e:

print(str(e))

def get_gift_dict(self):

gift_json = {}

gift_json1 = requests.get('/resource/common/gift/flash/gift_effect.json').text

gift_json2 = requests.get(

'/resource/common/prop_gift_list/prop_gift_config.json').text

gift_json1 = gift_json1.replace('DYConfigCallback(', '')[0:-2]

gift_json2 = gift_json2.replace('DYConfigCallback(', '')[0:-2]

gift_json1 = json.loads(gift_json1)['data']['flashConfig']

gift_json2 = json.loads(gift_json2)['data']

for gift in gift_json1:

gift_json[gift] = gift_json1[gift]['name']

for gift in gift_json2:

gift_json[gift] = gift_json2[gift]['name']

return gift_json

if __name__ == '__main__':

roomid = '74751'

url = 'wss://:8506/'

dy = DyDanmu(roomid, url)

dy.start()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。