refactor: simplified the code structure for better maintainability

This commit is contained in:
ihmily
2024-10-02 06:45:31 +08:00
parent 7b6ea5c1b0
commit 40559a3b35
17 changed files with 884 additions and 706 deletions

75
.github/ISSUE_TEMPLATE/bug.yml vendored Normal file
View File

@@ -0,0 +1,75 @@
name: 🐛 Bug report
description: 项目运行中遇到的Bug或问题。
labels: ["status: issue-bug"]
body:
- type: markdown
attributes:
value: |
📝 **请在上方的`title`中填写一个简洁明了的标题**,格式建议为:[Bug] 简短描述。
例如:[Bug] B站某些直播间无法录制。
- type: checkboxes
attributes:
label: ⚠️ 确认是否已存在类似问题
description: >
🔍 [点击这里搜索历史issue](https://github.com/ihmily/DouyinLiveRecorder/issues?q=is%3Aissue)
请确保你的问题没有被报告过。
options:
- label: 我已经搜索过issues没有找到类似问题
required: true
- type: dropdown
attributes:
label: 🔧 运行方式
description: 请选择你是如何运行程序的。
options:
- 直接运行的exe文件
- 使用源代码运行
validations:
required: true
- type: dropdown
attributes:
label: 🐍 如果是使用源代码运行请选择你的Python环境版本
description: 请选择你运行程序的Python版本。
options:
- Python 3.8
- Python 3.9
- Python 3.10
- Python 3.11
- Python 3.12
- Other (请在问题中说明)
validations:
required: false
- type: checkboxes
attributes:
label: ⚠️ 确认是否已经重试多次
description: >
有时可能是你的设备或者网络问题导致的。
options:
- label: 我已经尝试过多次,仍然出现问题
required: true
- type: textarea
attributes:
label: 🕹 复现步骤
description: |
**⚠️ 不能复现将会关闭issue.**
请按照以下格式填写:
1. 录制的直播间地址是...
2. 使用的录制格式是...
3. ...
placeholder: |
1. ...
2. ...
3. ...
validations:
required: true
- type: textarea
attributes:
label: 😯 问题描述
description: 详细描述出现的问题,或提供有关截图。
validations:
required: true
- type: textarea
attributes:
label: 📜 错误信息
description: 如果有,请贴出相关的日志错误信息或者截图。
validations:
required: false

42
.github/ISSUE_TEMPLATE/feature.yml vendored Normal file
View File

@@ -0,0 +1,42 @@
name: 🚀 Feature request
description: 提出你对项目的新想法或建议。
labels: ["status: issue-feature"]
body:
- type: markdown
attributes:
value: |
📝 **请在上方的`title`中填写一个简洁明了的标题**,格式建议为:[Feature] 简短描述。
例如:[Feature] 添加xx直播录制。
- type: checkboxes
attributes:
label: ⚠️ 搜索是否存在类似issue
description: >
🔍 请在 [历史issue](https://github.com/ihmily/DouyinLiveRecorder/issues) 中清空输入框使用关键词搜索确保没有重复的issue。
options:
- label: 我已经搜索过issues没有发现相似issue
required: true
- type: textarea
attributes:
label: 📜 功能描述
description: 请详细描述你希望添加的功能,包括它的工作方式和预期效果。
placeholder: |
功能描述:
- type: textarea
attributes:
label: 🌐 举例(可选)
description: 如果可能,请提供功能相关的示例、截图或相关网址。
placeholder: |
直播间示例地址:
`https://www.example.com/live/xxxx`
- type: textarea
attributes:
label: 💡 动机
description: 描述你提出该feature的动机以及没有这项feature对你的使用造成了怎样的影响。
placeholder: |
我需要这个功能是因为...
- type: textarea
attributes:
label: 📚 附加信息
description: 如果有其他相关信息,如屏幕截图、错误日志或相关文档,请在这里提供。

4
.gitignore vendored
View File

@@ -99,7 +99,7 @@ ipython_config.py
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
@@ -158,3 +158,5 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
backup_config/

View File

@@ -1,71 +0,0 @@
# -*- encoding: utf-8 -*-
"""
Author: Hmily
Github: https://github.com/ihmily
Date: 2023-08-04 17:37:00
Copyright (c) 2023 by Hmily, All Rights Reserved.
Function: 本代码用于自动获取 抖音直播间页面cookies ,可用于录制
请确保电脑有对应浏览器并以及其驱动文件
"""
"""
驱动下载地址:
https://registry.npmmirror.com/binary.html?path=chromedriver/ # 谷歌
https://registry.npmmirror.com/binary.html?path=geckodriver/ # 火狐
"""
import time
from selenium import webdriver
# 下面演示谷歌浏览器获取
from selenium.webdriver.chrome.options import Options
# 请下载自己浏览器对应版本的驱动exe并注意其路径或者配置环境变量
# 谷歌浏览器版本 108.0.5359.95
# chromedriver版本 108.0.5359.71
# 需要安装对应浏览器的驱动
driver_path = "path/to/your/chromedriver.exe"
def get_cookies(url):
chrome_options = Options()
chrome_options.add_argument('-headless') # Chrome的无头模式即不用显示打开浏览器的界面
chrome_options.add_argument('--disable-gpu')
# driver = webdriver.Chrome(options=chrome_options,executable_path=driver_path)
driver = webdriver.Chrome(options=chrome_options) # 这里我是直接将驱动配置了环境变量
driver.get(url)
# 先清除浏览器中的所有 Cookie
driver.delete_all_cookies()
driver.get(url) # 再次打开页面
time.sleep(10) # 等待页面加载
# 获取 Cookie
cookies = driver.get_cookies()
# print(cookies)
return cookies
# 将包含 Cookie 信息的列表转换为字典
def cookies_to_dict(cookies_list):
cookies_dict = {}
for cookie in cookies_list:
cookies_dict[cookie['name']] = cookie['value']
return cookies_dict
# 将字典转换为适用于 requests 请求头的字符串
def dict_to_cookie_str(cookies_dict):
cookie_str = '; '.join([f"{key}={value}" for key, value in cookies_dict.items()])
return cookie_str
if __name__ == '__main__':
url = 'https://live.douyin.com/745964462470' # 任意直播间页面地址
cookies_list = get_cookies(url)
cookies_dict = cookies_to_dict(cookies_list)
cookie_str = dict_to_cookie_str(cookies_dict)
print(cookie_str)
# headers = {
# 'Cookie': cookie_str,
# # 其他请求头
# }

160
demo.py Normal file
View File

@@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
from loguru import logger
from douyinliverecorder import spider
# 以下示例直播间链接不保证时效性,请自行查看链接是否能正常访问
LIVE_STREAM_CONFIG = {
"douyin": {
"url": "https://live.douyin.com/745964462470",
"func": spider.get_douyin_stream_data,
},
"tiktok": {
"url": "https://www.tiktok.com/@pearlgaga88/live",
"func": spider.get_tiktok_stream_data,
},
"kuaishou": {
"url": "https://live.kuaishou.com/u/yall1102",
"func": spider.get_kuaishou_stream_data2,
},
"huya": {
"url": "https://www.huya.com/116",
"func": spider.get_huya_stream_data,
},
"douyu": {
"url": "https://www.douyu.com/topic/wzDBLS6?rid=4921614&dyshid=",
"func": spider.get_douyu_info_data,
},
"yy": {
"url": "https://www.yy.com/22490906/22490906",
"func": spider.get_yy_stream_data,
},
"bilibili": {
"url": "https://live.bilibili.com/21593109",
"func": spider.get_bilibili_stream_data,
},
"xhs": {
"url": "http://xhslink.com/O9f9fM",
"func": spider.get_xhs_stream_url,
},
"bigo": {
"url": "https://www.bigo.tv/cn/716418802",
"func": spider.get_bigo_stream_url,
},
"blued": {
"url": "https://app.blued.cn/live?id=Mp6G2R",
"func": spider.get_blued_stream_url,
},
"afreecatv": {
"url": "https://play.afreecatv.com/sw7love",
"func": spider.get_afreecatv_stream_data,
},
"netease": {
"url": "https://cc.163.com/583946984",
"func": spider.get_netease_stream_data,
},
"qiandurebo": {
"url": "https://qiandurebo.com/web/video.php?roomnumber=33333",
"func": spider.get_qiandurebo_stream_data,
},
"pandatv": {
"url": "https://www.pandalive.co.kr/live/play/bara0109",
"func": spider.get_pandatv_stream_data,
},
"maoerfm": {
"url": "https://fm.missevan.com/live/868895007",
"func": spider.get_maoerfm_stream_url,
},
"winktv": {
"url": "https://www.winktv.co.kr/live/play/anjer1004",
"func": spider.get_winktv_stream_data,
},
"flextv": {
"url": "https://www.flextv.co.kr/channels/593127/live",
"func": spider.get_flextv_stream_data,
},
"looklive": {
"url": "https://look.163.com/live?id=65108820&position=3",
"func": spider.get_looklive_stream_url,
},
"popkontv": {
"url": "https://www.popkontv.com/live/view?castId=wjfal007&partnerCode=P-00117",
"func": spider.get_popkontv_stream_url,
},
"twitcasting": {
"url": "https://twitcasting.tv/c:uonq",
"func": spider.get_twitcasting_stream_url,
},
"baidu": {
"url": "https://live.baidu.com/m/media/pclive/pchome/live.html?room_id=9175031377&tab_category",
"func": spider.get_baidu_stream_data,
},
"weibo": {
"url": "https://weibo.com/u/7849520225",
"func": spider.get_weibo_stream_data,
},
"kugou": {
"url": "https://fanxing2.kugou.com/50428671?refer=2177&sourceFrom=",
"func": spider.get_kugou_stream_url,
},
"twitchtv": {
"url": "https://www.twitch.tv/gamerbee",
"func": spider.get_twitchtv_stream_data,
},
"liveme": {
"url": "https://www.liveme.com/zh/v/17141937295821012854/index.html",
"func": spider.get_liveme_stream_url,
},
"huajiao": {
"url": "https://www.huajiao.com/user/223184650",
"func": spider.get_huajiao_stream_url,
},
"liuxing": {
"url": "https://www.7u66.com/100960",
"func": spider.get_liuxing_stream_url,
},
"showroom": {
"url": "https://www.showroom-live.com/room/profile?room_id=511033",
"func": spider.get_showroom_stream_data,
},
"acfun": {
"url": "https://live.acfun.cn/live/17912421",
"func": spider.get_acfun_stream_data,
},
"shiguang": {
"url": "https://www.rengzu.com/180778",
"func": spider.get_shiguang_stream_url,
},
"yingke": {
"url": "https://www.inke.cn/liveroom/index.html?uid=710032101&id=1720857535354099",
"func": spider.get_yingke_stream_url,
},
"yinbo": {
"url": "https://live.ybw1666.com/800002949",
"func": spider.get_yinbo_stream_url,
},
"zhihu": {
"url": "https://www.zhihu.com/theater/114453",
"func": spider.get_zhihu_stream_url,
},
"chzzk": {
"url": "https://chzzk.naver.com/live/458f6ec20b034f49e0fc6d03921646d2",
"func": spider.get_chzzk_stream_data,
}
}
def test_live_stream(platform_name: str) -> None:
if platform_name in LIVE_STREAM_CONFIG:
config = LIVE_STREAM_CONFIG[platform_name]
try:
stream_data = config['func'](config['url'], proxy_addr='')
logger.debug(f"Stream data for {platform_name}: {stream_data}")
except Exception as e:
logger.error(f"Error fetching stream data for {platform_name}: {e}")
else:
logger.warning(f"No configuration found for platform: {platform_name}")
if __name__ == "__main__":
platform = "douyin"
test_live_stream(platform)

View File

@@ -10,7 +10,7 @@ Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
from typing import Dict, Any, Optional
import json
import urllib.request
from utils import trace_error_decorator
from .utils import trace_error_decorator
import smtplib
from email.header import Header
from email.mime.multipart import MIMEMultipart
@@ -43,9 +43,9 @@ def dingtalk(url: str, content: str, number: Optional[str] = '') -> Dict[str, An
@trace_error_decorator
def xizhi(url: str, content: str) -> Dict[str, Any]:
def xizhi(url: str, content: str, title: str = '直播间状态更新') -> Dict[str, Any]:
json_data = {
'title': '直播间状态更新',
'title': title,
'content': content
}
data = json.dumps(json_data).encode('utf-8')
@@ -66,13 +66,13 @@ def email_message(mail_host: str, mail_pass: str, from_email: str, to_email: str
if len(receivers) == 1:
message['To'] = receivers[0]
tApart = MIMEText(content, 'plain', 'utf-8')
message.attach(tApart)
t_apart = MIMEText(content, 'plain', 'utf-8')
message.attach(t_apart)
try:
smtpObj = smtplib.SMTP_SSL(mail_host, 465)
smtpObj.login(from_email, mail_pass)
smtpObj.sendmail(from_email, receivers, message.as_string())
smtp_obj = smtplib.SMTP_SSL(mail_host, 465)
smtp_obj.login(from_email, mail_pass)
smtp_obj.sendmail(from_email, receivers, message.as_string())
data = {'code': 200, 'msg': '邮件发送成功'}
return data
except smtplib.SMTPException as e:
@@ -96,18 +96,18 @@ def tg_bot(chat_id: int, token: str, content: str) -> Dict[str, Any]:
@trace_error_decorator
def bark(api: str, title: str = "message", content: str = 'test', level: str = "active",
badge: int = 1, autoCopy: int = 1, sound: str = "", icon: str = "", group: str = "",
isArchive: int = 1, url: str = "") -> Dict[str, Any]:
badge: int = 1, auto_copy: int = 1, sound: str = "", icon: str = "", group: str = "",
is_archive: int = 1, url: str = "") -> Dict[str, Any]:
json_data = {
"title": title,
"body": content,
"level": level,
"badge": badge,
"autoCopy": autoCopy,
"autoCopy": auto_copy,
"sound": sound,
"icon": icon,
"group": group,
"isArchive": isArchive,
"isArchive": is_archive,
"url": url
}

View File

@@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2024-09-30 02:09:12
Update: 2024-10-02 04:36:12
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
@@ -13,6 +13,7 @@ import hashlib
import random
import string
import time
from operator import itemgetter
import urllib.parse
import urllib.error
from urllib.request import Request
@@ -23,13 +24,12 @@ import re
import json
import execjs
import urllib.request
from utils import (
trace_error_decorator,
update_config, dict_to_cookie_str
)
from logger import script_path
import http.cookiejar
from web_rid import get_sec_user_id
from .utils import (
trace_error_decorator, dict_to_cookie_str
)
from .logger import script_path
from .web_rid import get_sec_user_id
no_proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(no_proxy_handler)
@@ -241,8 +241,8 @@ def get_douyin_stream_data(url: str, proxy_addr: Union[str, None] = None, cookie
match_json_str = re.search(r'(\{\\"common\\":.*?)]\\n"]\)</script><div hidden', html_str)
json_str = match_json_str.group(1)
cleaned_string = json_str.replace('\\', '').replace(r'u0026', r'&')
room_store = re.search('"roomStore":(.*?),"linkmicStore"', cleaned_string, re.S).group(1)
anchor_name = re.search('"nickname":"(.*?)","avatar_thumb', room_store, re.S).group(1)
room_store = re.search('"roomStore":(.*?),"linkmicStore"', cleaned_string, re.DOTALL).group(1)
anchor_name = re.search('"nickname":"(.*?)","avatar_thumb', room_store, re.DOTALL).group(1)
room_store = room_store.split(',"has_commerce_goods"')[0] + '}}}'
json_data = json.loads(room_store)['roomInfo']['room']
json_data['anchor_name'] = anchor_name
@@ -259,7 +259,7 @@ def get_douyin_stream_data(url: str, proxy_addr: Union[str, None] = None, cookie
else:
html_str = html_str.replace('\\', '').replace('u0026', '&')
match_json_str3 = re.search('"origin":\{"main":(.*?),"dash"', html_str, re.S)
match_json_str3 = re.search('"origin":\{"main":(.*?),"dash"', html_str, re.DOTALL)
if match_json_str3:
origin_url_list = json.loads(match_json_str3.group(1) + '}')
@@ -297,7 +297,7 @@ def get_tiktok_stream_data(url: str, proxy_addr: Union[str, None] = None, cookie
try:
json_str = re.findall(
'<script id="SIGI_STATE" type="application/json">(.*?)</script>',
html_str, re.S)[0]
html_str, re.DOTALL)[0]
except Exception:
raise ConnectionError("请检查你的网络是否可以正常访问TikTok网站")
json_data = json.loads(json_str)
@@ -594,7 +594,7 @@ def get_yy_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies: U
html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers)
anchor_name = re.search('nick: "(.*?)",\n\s+logo', html_str).group(1)
cid = re.search('sid : "(.*?)",\n\s+ssid', html_str, re.S).group(1)
cid = re.search('sid : "(.*?)",\n\s+ssid', html_str, re.DOTALL).group(1)
data = '{"head":{"seq":1701869217590,"appidstr":"0","bidstr":"121","cidstr":"' + cid + '","sidstr":"' + cid + '","uid64":0,"client_type":108,"client_ver":"5.17.0","stream_sys_ver":1,"app":"yylive_web","playersdk_ver":"5.17.0","thundersdk_ver":"0","streamsdk_ver":"5.17.0"},"client_attribute":{"client":"web","model":"web0","cpu":"","graphics_card":"","os":"chrome","osversion":"0","vsdk_version":"","app_identify":"","app_version":"","business":"","width":"1920","height":"1080","scale":"","client_type":8,"h265":0},"avp_parameter":{"version":1,"client_type":8,"service_type":0,"imsi":0,"send_time":1701869217,"line_seq":-1,"gear":4,"ssl":1,"stream_format":0}}'
data_bytes = data.encode('utf-8')
@@ -684,11 +684,11 @@ def get_bilibili_stream_data(url: str, qn: str = '10000', platform: str = 'web',
playurl_info = json_data['data']['playurl_info']
format_list = playurl_info['playurl']['stream'][0]['format']
stream_data_list = format_list[0]['codec']
sorted_stream_list = sorted(stream_data_list, key=lambda x: x["current_qn"], reverse=True)
sorted_stream_list = sorted(stream_data_list, key=itemgetter("current_qn"), reverse=True)
# qn: 30000=杜比 20000=4K 10000=原画 400=蓝光 250=超清 150=高清 80=流畅
video_quality_options = {'10000': 0, '400': 1, '250': 2, '150': 3, '80': 4}
qn_count = len(sorted_stream_list)
select_stream_index = video_quality_options[qn] if qn_count - 1 >= video_quality_options[qn] else qn_count - 1
select_stream_index = min(video_quality_options[qn], qn_count - 1)
stream_data: dict = sorted_stream_list[select_stream_index]
base_url = stream_data['base_url']
host = stream_data['url_info'][0]['host']
@@ -775,7 +775,7 @@ def get_bigo_stream_url(url: str, proxy_addr: Union[str, None] = None, cookies:
result['record_url'] = m3u8_url
elif result['anchor_name'] == '':
html_str = get_req(url=f'https://www.bigo.tv/cn/{room_id}', proxy_addr=proxy_addr, headers=headers)
result['anchor_name'] = re.search('<title>欢迎来到(.*?)的直播间</title>', html_str, re.S).group(1)
result['anchor_name'] = re.search('<title>欢迎来到(.*?)的直播间</title>', html_str, re.DOTALL).group(1)
return result
@@ -792,7 +792,7 @@ def get_blued_stream_url(url: str, proxy_addr: Union[str, None] = None, cookies:
headers['Cookie'] = cookies
html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers)
json_str = re.search('decodeURIComponent\(\"(.*?)\"\)\),window\.Promise', html_str, re.S).group(1)
json_str = re.search('decodeURIComponent\(\"(.*?)\"\)\),window\.Promise', html_str, re.DOTALL).group(1)
json_str = urllib.parse.unquote(json_str)
json_data = json.loads(json_str)
anchor_name = json_data['userInfo']['name']
@@ -999,6 +999,7 @@ def get_afreecatv_stream_data(
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['play_url_list'] = get_url_list(m3u8_url)
result['new_cookies'] = cookie
return result
if json_data['data']['code'] == -3001:
@@ -1010,7 +1011,6 @@ def get_afreecatv_stream_data(
print("正在尝试使用您的账号和密码登录AfreecaTV直播平台请确保已配置")
new_cookie = handle_login()
if new_cookie and len(new_cookie) > 0:
update_config('./config/config.ini', 'Cookie', 'afreecatv_cookie', new_cookie)
return fetch_data(new_cookie)
raise RuntimeError('AfreecaTV登录失败请检查账号和密码是否正确')
@@ -1031,6 +1031,7 @@ def get_afreecatv_stream_data(
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['play_url_list'] = get_url_list(m3u8_url)
result['new_cookies'] = None
return result
@@ -1049,7 +1050,7 @@ def get_netease_stream_data(url: str, proxy_addr: Union[str, None] = None, cooki
html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers)
json_str = re.search('<script id="__NEXT_DATA__" .* crossorigin="anonymous">(.*?)</script></body>',
html_str, re.S).group(1)
html_str, re.DOTALL).group(1)
json_data = json.loads(json_str)
room_data = json_data['props']['pageProps']['roomInfoInitData']
live_data = room_data['live']
@@ -1076,7 +1077,7 @@ def get_qiandurebo_stream_data(url: str, proxy_addr: Union[str, None] = None, co
headers['Cookie'] = cookies
html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers)
data = re.search('var user = (.*?)\r\n\s+user\.play_url', html_str, re.S).group(1)
data = re.search('var user = (.*?)\r\n\s+user\.play_url', html_str, re.DOTALL).group(1)
anchor_name = re.findall('"zb_nickname": "(.*?)",\r\n', data)
result = {"anchor_name": "", "is_live": False}
@@ -1306,7 +1307,7 @@ def login_flextv(username: str, password: str, proxy_addr: Union[str, None] = No
def get_flextv_stream_url(
url: str, proxy_addr: Union[str, None] = None, cookies: Union[str, None] = None,
username: Union[str, None] = None, password: Union[str, None] = None
) -> Union[str, Any]:
) -> Any:
def fetch_data(cookie):
headers = {
'accept': 'application/json, text/plain, */*',
@@ -1323,6 +1324,7 @@ def get_flextv_stream_url(
raise ConnectionError('获取FlexTV直播数据失败请切换代理重试')
return json.loads(json_str)
new_cookie = None
json_data = fetch_data(cookies)
if "message" in json_data and json_data["message"] == "로그인후 이용이 가능합니다.":
print("FlexTV直播获取失败[未登录]: 19+直播需要登录后是成人才可观看")
@@ -1334,13 +1336,12 @@ def get_flextv_stream_url(
if new_cookie:
print('FlexTV平台登录成功开始获取直播数据...')
json_data = fetch_data(new_cookie)
update_config('./config/config.ini', 'Cookie', 'flextv_cookie', new_cookie)
else:
raise RuntimeError('FlexTV登录失败')
if 'sources' in json_data and len(json_data['sources']) > 0:
play_url = json_data['sources'][0]['url']
return play_url
return play_url, new_cookie
@trace_error_decorator
@@ -1358,6 +1359,7 @@ def get_flextv_stream_data(
headers['Cookie'] = cookies
user_id = url.split('/live')[0].rsplit('/', maxsplit=1)[-1]
result = {"anchor_name": '', "is_live": False}
new_cookies = None
try:
url2 = f'https://www.flextv.co.kr/channels/{user_id}'
html_str = get_req(url2, proxy_addr=proxy_addr, headers=headers, abroad=True)
@@ -1370,7 +1372,7 @@ def get_flextv_stream_data(
result["anchor_name"] = anchor_name
if live_status:
result['is_live'] = True
play_url = get_flextv_stream_url(
play_url, new_cookies = get_flextv_stream_url(
url=url, proxy_addr=proxy_addr, cookies=cookies, username=username, password=password)
if play_url:
result['m3u8_url'] = play_url
@@ -1378,6 +1380,7 @@ def get_flextv_stream_data(
abroad=True)
except Exception as e:
print('FlexTV直播间数据获取失败', e)
result['new_cookies'] = new_cookies
return result
@@ -1604,7 +1607,7 @@ def get_popkontv_stream_url(
anchor_name, room_info = get_popkontv_stream_data(
url, proxy_addr=proxy_addr, code=partner_code, username=username)
result = {"anchor_name": anchor_name, "is_live": False}
new_token = None
if room_info:
cast_start_date_code, cast_partner_code, mc_sign_id, cast_type, is_private = room_info
result["is_live"] = True
@@ -1644,8 +1647,8 @@ def get_popkontv_stream_url(
if new_access_token and len(new_access_token) == 640:
print('popkontv平台登录成功开始获取直播数据...')
headers['Authorization'] = f'Bearer {new_access_token}'
new_token = f'Bearer {new_access_token}'
json_str = fetch_data(headers, new_partner_code)
update_config('./config/config.ini', 'Authorization', 'popkontv_token', new_access_token)
else:
raise RuntimeError('popkontv登录失败请检查账号和密码是否正确')
json_data = json.loads(json_str)
@@ -1667,6 +1670,7 @@ def get_popkontv_stream_url(
result["record_url"] = m3u8_url
else:
raise RuntimeError('获取直播源失败,', status_msg)
result['new_token'] = new_token
return result
@@ -1756,7 +1760,7 @@ def get_twitcasting_stream_url(
return f'{anchor.group(1).strip()}-{anchor.group(2)}-{movie_id.group(1)}', status.group(1)
result = {"anchor_name": '', "is_live": False}
new_cookie = None
try:
to_login = get_params(url, "login")
if to_login == 'true':
@@ -1767,7 +1771,6 @@ def get_twitcasting_stream_url(
raise RuntimeError('TwitCasting登录失败,请检查配置文件中的账号密码是否正确')
print('TwitCasting 登录成功!开始获取数据...')
headers['Cookie'] = new_cookie
update_config('./config/config.ini', 'Cookie', 'twitcasting_cookie', new_cookie)
anchor_name, live_status = get_data(headers)
except AttributeError:
print('获取TwitCasting数据失败正在尝试登录...')
@@ -1777,7 +1780,6 @@ def get_twitcasting_stream_url(
raise RuntimeError('TwitCasting登录失败,请检查配置文件中的账号密码是否正确')
print('TwitCasting 登录成功!开始获取数据...')
headers['Cookie'] = new_cookie
update_config('./config/config.ini', 'Cookie', 'twitcasting_cookie', new_cookie)
anchor_name, live_status = get_data(headers)
result["anchor_name"] = anchor_name
@@ -1786,6 +1788,7 @@ def get_twitcasting_stream_url(
result['m3u8_url'] = play_url
result['record_url'] = play_url
result['is_live'] = True
result['new_cookies'] = new_cookie
return result
@@ -2320,7 +2323,7 @@ def get_acfun_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies
json_data = json.loads(json_str)
videoPlayRes = json_data['data']['videoPlayRes']
play_url_list = json.loads(videoPlayRes)['liveAdaptiveManifest'][0]['adaptationSet']['representation']
play_url_list = sorted(play_url_list, key=lambda x: x['bitrate'], reverse=True)
play_url_list = sorted(play_url_list, key=itemgetter('bitrate'), reverse=True)
result['play_url_list'] = play_url_list
return result
@@ -2352,7 +2355,7 @@ def get_shiguang_stream_url(url: str, proxy_addr: Union[str, None] = None, cooki
def get_live_domain(page_url):
html_str = get_req(page_url, proxy_addr=proxy_addr, headers=headers)
config_json_str = re.findall("var config = (.*?)config.webskins",
html_str, re.S)[0].rsplit(";", maxsplit=1)[0].strip()
html_str, re.DOTALL)[0].rsplit(";", maxsplit=1)[0].strip()
config_json_data = json.loads(config_json_str)
stream_flv_domain = config_json_data['domainpullstream_flv']
stream_hls_domain = config_json_data['domainpullstream_hls']
@@ -2435,7 +2438,7 @@ def get_yinbo_stream_url(url: str, proxy_addr: Union[str, None] = None, cookies:
def get_live_domain(page_url):
html_str = get_req(page_url, proxy_addr=proxy_addr, headers=headers)
config_json_str = re.findall("var config = (.*?)config.webskins",
html_str, re.S)[0].rsplit(";", maxsplit=1)[0].strip()
html_str, re.DOTALL)[0].rsplit(";", maxsplit=1)[0].strip()
config_json_data = json.loads(config_json_str)
stream_flv_domain = config_json_data['domainpullstream_flv']
stream_hls_domain = config_json_data['domainpullstream_hls']
@@ -2512,89 +2515,4 @@ def get_chzzk_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies
prefix = m3u8_url.split('?')[0].rsplit('/', maxsplit=1)[0]
m3u8_url_list = [prefix + '/' + i for i in m3u8_url_list]
result["play_url_list"] = m3u8_url_list
return result
if __name__ == '__main__':
# 尽量用自己的cookie以避免默认的不可用导致无法获取数据
# 以下示例链接不保证时效性,请自行查看链接是否能正常访问
room_url = "https://live.douyin.com/745964462470" # 抖音直播
# room_url = "https://www.tiktok.com/@pearlgaga88/live" # Tiktok直播
# room_url = "https://live.kuaishou.com/u/yall1102" # 快手直播
# room_url = 'https://www.huya.com/116' # 虎牙直播
# room_url = 'https://www.douyu.com/topic/wzDBLS6?rid=4921614&dyshid=' # 斗鱼直播
# room_url = 'https://www.douyu.com/3637778?dyshid'
# room_url = 'https://www.yy.com/22490906/22490906' # YY直播
# room_url = 'https://live.bilibili.com/21593109' # b站直播
# room_url = 'https://live.bilibili.com/23448867' # b站直播
# room_url = 'http://xhslink.com/O9f9fM' # 小红书直播
# room_url = 'https://www.bigo.tv/cn/716418802' # bigo直播
# room_url = 'https://slink.bigovideo.tv/uPvCVq' # bigo直播
# room_url = 'https://app.blued.cn/live?id=Mp6G2R' # blued直播
# room_url = 'https://play.afreecatv.com/sw7love' # afreecatv直播
# room_url = 'https://m.afreecatv.com/#/player/hl6260' # afreecatv直播
# room_url = 'https://play.afreecatv.com/secretx' # afreecatv直播
# room_url = 'https://cc.163.com/583946984' # 网易cc直播
# room_url = 'https://qiandurebo.com/web/video.php?roomnumber=33333' # 千度热播
# room_url = 'https://www.pandalive.co.kr/live/play/bara0109' # PandaTV
# room_url = 'https://fm.missevan.com/live/868895007' # 猫耳FM直播
# room_url = 'https://www.winktv.co.kr/live/play/anjer1004' # WinkTV
# room_url = 'https://www.flextv.co.kr/channels/593127/live' # FlexTV
# room_url = 'https://look.163.com/live?id=65108820&position=3' # Look直播
# room_url = 'https://www.popkontv.com/live/view?castId=wjfal007&partnerCode=P-00117' # popkontv
# room_url = 'https://www.popkontv.com/channel/notices?mcid=wjfal007&mcPartnerCode=P-00117' # popkontv
# room_url = 'https://twitcasting.tv/c:uonq' # TwitCasting
# room_url = 'https://live.baidu.com/m/media/pclive/pchome/live.html?room_id=9175031377&tab_category' # 百度直播
# room_url = 'https://weibo.com/u/7849520225' # 微博直播
# room_url = 'https://fanxing2.kugou.com/50428671?refer=2177&sourceFrom=' # 酷狗直播
# room_url = 'https://www.twitch.tv/gamerbee' # TwitchTV
# room_url = 'https://www.liveme.com/zh/v/17141937295821012854/index.html' # LiveMe
# room_url = 'https://www.huajiao.com/user/223184650' # 花椒直播
# room_url = 'https://www.7u66.com/100960' # 流星直播
# room_url = 'https://www.showroom-live.com/room/profile?room_id=511033' # showroom
# room_url = 'https://www.showroom-live.com/r/TPS0728' # showroom
# room_url = 'https://live.acfun.cn/live/17912421' # Acfun
# room_url = 'https://www.rengzu.com/180778' # 时光直播
# room_url = 'https://www.inke.cn/liveroom/index.html?uid=710032101&id=1720857535354099' # 映客直播
# room_url = 'https://live.ybw1666.com/800002949' # 音播直播
# room_url = 'https://www.zhihu.com/theater/114453' # 知乎直播
# room_url = 'https://chzzk.naver.com/live/458f6ec20b034f49e0fc6d03921646d2' # CHZZK
print(get_douyin_stream_data(room_url, proxy_addr=''))
# print(get_douyin_app_stream_data(room_url, proxy_addr=''))
# print(get_tiktok_stream_data(room_url, proxy_addr=''))
# print(get_kuaishou_stream_data2(room_url, proxy_addr=''))
# print(get_huya_stream_data(room_url, proxy_addr=''))
# print(get_huya_app_stream_url(room_url, proxy_addr=''))
# print(get_douyu_info_data(room_url, proxy_addr=''))
# print(get_douyu_stream_data("4921614", proxy_addr=''))
# print(get_yy_stream_data(room_url, proxy_addr=''))
# print(get_bilibili_stream_data(room_url, proxy_addr=''))
# print(get_xhs_stream_url(room_url, proxy_addr=''))
# print(get_bigo_stream_url(room_url, proxy_addr=''))
# print(get_blued_stream_url(room_url, proxy_addr=''))
# print(get_afreecatv_stream_data(room_url, proxy_addr='', username='', password=''))
# print(get_netease_stream_data(room_url, proxy_addr=''))
# print(get_qiandurebo_stream_data(room_url, proxy_addr=''))
# print(get_pandatv_stream_data(room_url, proxy_addr=''))
# print(get_maoerfm_stream_url(room_url, proxy_addr=''))
# print(get_winktv_stream_data(room_url, proxy_addr=''))
# print(get_flextv_stream_data(room_url,proxy_addr='', username='', password=''))
# print(get_looklive_stream_url(room_url, proxy_addr=''))
# print(get_popkontv_stream_url(room_url, proxy_addr='', username='', password=''))
# print(get_twitcasting_stream_url(room_url, proxy_addr='', username='', password=''))
# print(get_baidu_stream_data(room_url, proxy_addr=''))
# print(get_weibo_stream_data(room_url, proxy_addr=''))
# print(get_kugou_stream_url(room_url, proxy_addr=''))
# print(get_twitchtv_stream_data(room_url, proxy_addr=''))
# print(get_liveme_stream_url(room_url, proxy_addr=''))
# print(get_huajiao_stream_url(room_url, proxy_addr=''))
# print(get_liuxing_stream_url(room_url, proxy_addr=''))
# print(get_showroom_stream_data(room_url, proxy_addr=''))
# print(get_acfun_stream_data(room_url, proxy_addr=''))
# print(get_shiguang_stream_url(room_url, proxy_addr=''))
# print(get_yingke_stream_url(room_url, proxy_addr=''))
# print(get_yinbo_stream_url(room_url, proxy_addr=''))
# print(get_zhihu_stream_url(room_url, proxy_addr=''))
# print(get_chzzk_stream_data(room_url, proxy_addr=''))
return result

View File

@@ -0,0 +1,355 @@
# -*- encoding: utf-8 -*-
"""
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2024-10-02 04:36:12
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
from typing import Dict, Any, Union
import base64
import hashlib
import json
import time
import random
import re
from operator import itemgetter
import urllib.parse
import urllib.request
from .utils import trace_error_decorator
from .spider import (
get_douyu_stream_data, get_bilibili_stream_data
)
@trace_error_decorator
def get_douyin_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
anchor_name = json_data.get('anchor_name', None)
result = {
"anchor_name": anchor_name,
"is_live": False,
}
status = json_data.get("status", 4) # 直播状态 2 是正在直播、4 是未开播
if status == 2:
stream_url = json_data['stream_url']
flv_url_dict = stream_url['flv_pull_url']
flv_url_list = list(flv_url_dict.values())
m3u8_url_dict = stream_url['hls_pull_url_map']
m3u8_url_list = list(m3u8_url_dict.values())
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, "流畅": 4}
quality_index = video_qualities.get(video_quality)
m3u8_url = m3u8_url_list[quality_index]
flv_url = flv_url_list[quality_index]
result['m3u8_url'] = m3u8_url
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = m3u8_url
return result
@trace_error_decorator
def get_tiktok_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if not json_data:
return {"anchor_name": None, "is_live": False}
def get_video_quality_url(stream, q_key) -> list[dict[str, int | Any]]:
play_list = []
for key in stream:
url_info = stream[key]['main']
play_url = url_info[q_key]
sdk_params = url_info['sdk_params']
sdk_params = json.loads(sdk_params)
vbitrate = int(sdk_params['vbitrate'])
resolution = sdk_params['resolution']
if vbitrate != 0 and resolution:
width, height = map(int, resolution.split('x'))
play_list.append({'url': play_url, 'vbitrate': vbitrate, 'resolution': (width, height)})
play_list.sort(key=itemgetter('vbitrate'), reverse=True)
play_list.sort(key=lambda x: (-x['vbitrate'], -x['resolution'][0], -x['resolution'][1]))
return play_list
live_room = json_data['LiveRoom']['liveRoomUserInfo']
user = live_room['user']
anchor_name = f"{user['nickname']}-{user['uniqueId']}"
status = user.get("status", 4)
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if status == 2:
stream_data = live_room['liveRoom']['streamData']['pull_data']['stream_data']
stream_data = json.loads(stream_data).get('data', {})
flv_url_list = get_video_quality_url(stream_data, 'flv')
m3u8_url_list = get_video_quality_url(stream_data, 'hls')
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
while len(m3u8_url_list) < 5:
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, '流畅': 4}
quality_index = video_qualities.get(video_quality)
result['flv_url'] = flv_url_list[quality_index]['url']
result['m3u8_url'] = m3u8_url_list[quality_index]['url']
result['is_live'] = True
result['record_url'] = flv_url_list[quality_index]['url'].replace("https://", "http://")
return result
@trace_error_decorator
def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if json_data['type'] == 1 and not json_data["is_live"]:
return json_data
live_status = json_data['is_live']
result = {
"type": 2,
"anchor_name": json_data['anchor_name'],
"is_live": live_status,
}
if live_status:
quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
if video_quality in quality_mapping:
quality_index = quality_mapping[video_quality]
if 'm3u8_url_list' in json_data:
m3u8_url_list = json_data['m3u8_url_list'][::-1]
while len(m3u8_url_list) < 5:
m3u8_url_list.append(m3u8_url_list[-1])
m3u8_url = m3u8_url_list[quality_index]['url']
result['m3u8_url'] = m3u8_url
if 'flv_url_list' in json_data:
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result['is_live'] = True
return result
@trace_error_decorator
def get_huya_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
game_live_info = json_data['data'][0]['gameLiveInfo']
stream_info_list = json_data['data'][0]['gameStreamInfoList']
anchor_name = game_live_info.get('nick', '')
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if stream_info_list:
select_cdn = stream_info_list[0]
flv_url = select_cdn.get('sFlvUrl')
stream_name = select_cdn.get('sStreamName')
flv_url_suffix = select_cdn.get('sFlvUrlSuffix')
hls_url = select_cdn.get('sHlsUrl')
hls_url_suffix = select_cdn.get('sHlsUrlSuffix')
flv_anti_code = select_cdn.get('sFlvAntiCode')
def get_anti_code(old_anti_code: str) -> str:
# js地址https://hd.huya.com/cdn_libs/mobile/hysdk-m-202402211431.js
params_t = 100
sdk_version = 2403051612
# sdk_id是13位数毫秒级时间戳
t13 = int(time.time()) * 1000
sdk_sid = t13
# 计算uuid和uid参数值
init_uuid = (int(t13 % 10 ** 10 * 1000) + int(1000 * random.random())) % 4294967295 # 直接初始化
uid = random.randint(1400000000000, 1400009999999) # 经过测试uid也可以使用init_uuid代替
seq_id = uid + sdk_sid # 移动端请求的直播流地址中包含seqId参数
# 计算ws_time参数值(16进制) 可以是当前毫秒时间戳当然也可以直接使用url_query['wsTime'][0]
# 原始最大误差不得慢240000毫秒
target_unix_time = (t13 + 110624) // 1000
ws_time = f"{target_unix_time:x}".lower()
# fm参数值是经过url编码然后base64编码得到的解码结果类似 DWq8BcJ3h6DJt6TY_$0_$1_$2_$3
# 具体细节在上面js中查看大概在32657行代码开始有base64混淆代码请自行替换
url_query = urllib.parse.parse_qs(old_anti_code)
ws_secret_pf = base64.b64decode(urllib.parse.unquote(url_query['fm'][0]).encode()).decode().split("_")[0]
ws_secret_hash = hashlib.md5(f'{seq_id}|{url_query["ctype"][0]}|{params_t}'.encode()).hexdigest()
ws_secret = f'{ws_secret_pf}_{uid}_{stream_name}_{ws_secret_hash}_{ws_time}'
ws_secret_md5 = hashlib.md5(ws_secret.encode()).hexdigest()
anti_code = (
f'wsSecret={ws_secret_md5}&wsTime={ws_time}&seqid={seq_id}&ctype={url_query["ctype"][0]}&ver=1'
f'&fs={url_query["fs"][0]}&uuid={init_uuid}&u={uid}&t={params_t}&sv={sdk_version}'
f'&sdk_sid={sdk_sid}&codec=264'
)
return anti_code
new_anti_code = get_anti_code(flv_anti_code)
flv_url = f'{flv_url}/{stream_name}.{flv_url_suffix}?{new_anti_code}&ratio='
m3u8_url = f'{hls_url}/{stream_name}.{hls_url_suffix}?{new_anti_code}&ratio='
quality_list = flv_anti_code.split('&exsphd=')
if len(quality_list) > 1 and video_quality not in ["原画", "蓝光"]:
pattern = r"(?<=264_)\d+"
quality_list = list(re.findall(pattern, quality_list[1]))[::-1]
while len(quality_list) < 5:
quality_list.append(quality_list[-1])
video_quality_options = {
"超清": quality_list[0],
"高清": quality_list[1],
"标清": quality_list[2],
"流畅": quality_list[3]
}
if video_quality not in video_quality_options:
raise ValueError(
f"Invalid video quality. Available options are: {', '.join(video_quality_options.keys())}")
flv_url = flv_url + str(video_quality_options[video_quality])
m3u8_url = m3u8_url + str(video_quality_options[video_quality])
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url
return result
@trace_error_decorator
def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str, proxy_addr: str) -> Dict[str, Any]:
if not json_data["is_live"]:
return json_data
video_quality_options = {
"原画": '0',
"蓝光": '0',
"超清": '3',
"高清": '2',
"标清": '1',
"流畅": '1'
}
rid = str(json_data["room_id"])
json_data.pop("room_id", None)
rate = video_quality_options.get(video_quality, '0')
flv_data = get_douyu_stream_data(rid, rate, cookies=cookies, proxy_addr=proxy_addr)
rtmp_url = flv_data['data'].get('rtmp_url', None)
rtmp_live = flv_data['data'].get('rtmp_live', None)
if rtmp_live:
flv_url = f'{rtmp_url}/{rtmp_live}'
json_data['flv_url'] = flv_url
json_data['record_url'] = flv_url
return json_data
@trace_error_decorator
def get_yy_stream_url(json_data: dict) -> Dict[str, Any]:
anchor_name = json_data.get('anchor_name', '')
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if 'avp_info_res' in json_data:
stream_line_addr = json_data['avp_info_res']['stream_line_addr']
cdn_info = list(stream_line_addr.values())[0]
flv_url = cdn_info['cdn_info']['url']
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = flv_url
return result
@trace_error_decorator
def get_bilibili_stream_url(json_data: dict, video_quality: str, proxy_addr: str, cookies: str) -> Dict[str, Any]:
anchor_name = json_data["anchor_name"]
if not json_data["live_status"]:
return {
"anchor_name": anchor_name,
"is_live": False
}
room_url = json_data['room_url']
video_quality_options = {
"原画": '10000',
"蓝光": '400',
"超清": '250',
"高清": '150',
"标清": '80',
"流畅": '80'
}
select_quality = video_quality_options[video_quality]
play_url = get_bilibili_stream_data(
room_url, qn=select_quality, platform='web', proxy_addr=proxy_addr, cookies=cookies)
return {
'anchor_name': json_data['anchor_name'],
'is_live': True,
'record_url': play_url
}
@trace_error_decorator
def get_netease_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if not json_data['is_live']:
return json_data
stream_list = json_data['stream_list']['resolution']
order = ['blueray', 'ultra', 'high', 'standard']
sorted_keys = [key for key in order if key in stream_list]
while len(sorted_keys) < 5:
sorted_keys.append(sorted_keys[-1])
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
selected_quality = sorted_keys[quality_list[video_quality]]
flv_url_list = stream_list[selected_quality]['cdn']
selected_cdn = list(flv_url_list.keys())[0]
flv_url = flv_url_list[selected_cdn]
return {
"is_live": True,
"anchor_name": json_data['anchor_name'],
"flv_url": flv_url,
"record_url": flv_url
}
def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', spec: bool = False,
extra_key: Union[str, int] = None) -> Dict[str, Any]:
if not json_data['is_live']:
return json_data
play_url_list = json_data['play_url_list']
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
while len(play_url_list) < 5:
play_url_list.append(play_url_list[-1])
selected_quality = quality_list[video_quality]
data = {
"anchor_name": json_data['anchor_name'],
"is_live": True
}
if url_type == 'm3u8':
m3u8_url = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["m3u8_url"] = json_data['m3u8_url'] if spec else m3u8_url
data["record_url"] = m3u8_url
else:
flv = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["flv_url"] = flv
data["record_url"] = flv
return data

View File

@@ -4,13 +4,15 @@ import os
import functools
import hashlib
import traceback
from logger import logger
from typing import Union, Any
from .logger import logger
import configparser
def trace_error_decorator(func):
def trace_error_decorator(func: callable) -> callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
def wrapper(*args: list, **kwargs: dict) -> Any:
try:
return func(*args, **kwargs)
except Exception as e:
@@ -22,32 +24,18 @@ def trace_error_decorator(func):
return wrapper
def check_md5(file_path):
"""
计算文件的md5值
"""
def check_md5(file_path: str) -> str:
with open(file_path, 'rb') as fp:
file_md5 = hashlib.md5(fp.read()).hexdigest()
return file_md5
def dict_to_cookie_str(cookies_dict):
def dict_to_cookie_str(cookies_dict) -> str:
cookie_str = '; '.join([f"{key}={value}" for key, value in cookies_dict.items()])
return cookie_str
def read_config_value(file_path, section, key):
"""
从配置文件中读取指定键的值
参数:
- file_path: 配置文件的路径
- section: 部分名称
- key: 键名称
返回:
- 键的值如果部分或键不存在则返回None
"""
def read_config_value(file_path, section, key) -> Union[str, None]:
config = configparser.ConfigParser()
try:
@@ -67,16 +55,8 @@ def read_config_value(file_path, section, key):
return None
def update_config(file_path, section, key, new_value):
"""
更新配置文件中的键值
def update_config(file_path, section, key, new_value) -> None:
参数:
- file_path: 配置文件的路径
- section: 要更新的部分名称
- key: 要更新的键名称
- new_value: 新的键值
"""
config = configparser.ConfigParser()
try:
@@ -101,7 +81,7 @@ def update_config(file_path, section, key, new_value):
print(f"写入配置文件时出错: {e}")
def get_file_paths(directory):
def get_file_paths(directory) -> list:
file_paths = []
for root, dirs, files in os.walk(directory):
for file in files:

View File

@@ -11,22 +11,25 @@ import json
import re
import urllib.parse
from typing import Union
import execjs # pip install PyExecJS
import execjs
import requests
import urllib.request
no_proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(no_proxy_handler)
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SAMSUNG SM-G973U) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/14.2 Chrome/87.0.4280.141 Mobile Safari/537.36',
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SAMSUNG SM-G973U) AppleWebKit/537.36 (KHTML, like Gecko) '
'SamsungBrowser/14.2 Chrome/87.0.4280.141 Mobile Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Cookie': 's_v_web_id=verify_lk07kv74_QZYCUApD_xhiB_405x_Ax51_GYO9bUIyZQVf'
}
# X-bogus算法
def get_xbogus(url: str) -> str:
def get_xbogus(url: str, headers: Union[dict, None] = None) -> str:
if not headers or "User-Agent" not in headers and "user-agent" not in headers:
headers = HEADERS
query = urllib.parse.urlparse(url).query
xbogus = execjs.compile(open('./x-bogus.js').read()).call('sign', query, headers["User-Agent"])
# print(xbogus)
@@ -34,7 +37,10 @@ def get_xbogus(url: str) -> str:
# 获取房间ID和用户secID
def get_sec_user_id(url: str, proxy_addr: Union[str, None] = None):
def get_sec_user_id(url: str, proxy_addr: Union[str, None] = None, headers: Union[dict, None] = None):
if not headers or "User-Agent" not in headers and "user-agent" not in headers:
headers = HEADERS
if proxy_addr:
proxies = {
'http': proxy_addr,
@@ -44,26 +50,41 @@ def get_sec_user_id(url: str, proxy_addr: Union[str, None] = None):
else:
response = opener.open(url, timeout=15)
redirect_url = response.url
sec_user_id = re.search(r'sec_user_id=([\w\d_\-]+)&', redirect_url).group(1)
sec_user_id = re.search(r'sec_user_id=([\w_\-]+)&', redirect_url).group(1)
room_id = redirect_url.split('?')[0].rsplit('/', maxsplit=1)[1]
return room_id, sec_user_id
# 获取直播间webID
def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: Union[str, None] = None) -> str:
url = f'https://webcast.amemv.com/webcast/room/reflow/info/?verifyFp=verify_lk07kv74_QZYCUApD_xhiB_405x_Ax51_GYO9bUIyZQVf&type_id=0&live_id=1&room_id={room_id}&sec_user_id={sec_user_id}&app_id=1128&msToken=wrqzbEaTlsxt52-vxyZo_mIoL0RjNi1ZdDe7gzEGMUTVh_HvmbLLkQrA_1HKVOa2C6gkxb6IiY6TY2z8enAkPEwGq--gM-me3Yudck2ailla5Q4osnYIHxd9dI4WtQ=='
xbogus = get_xbogus(url) # 获取X-Bogus算法
url = url + "&X-Bogus=" + xbogus
def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: Union[str, None] = None,
params: Union[dict, None] = None, headers: Union[dict, None] = None) -> str:
if not headers or "User-Agent" not in headers and "user-agent" not in headers:
headers = HEADERS
if not params:
params = {
"verifyFp": "verify_lk07kv74_QZYCUApD_xhiB_405x_Ax51_GYO9bUIyZQVf",
"type_id": "0",
"live_id": "1",
"room_id": room_id,
"sec_user_id": sec_user_id,
"app_id": "1128",
"msToken": "wrqzbEaTlsxt52-vxyZo_mIoL0RjNi1ZdDe7gzEGMUTVh_HvmbLLkQrA_1HKVOa2C6gkxb6IiY6TY2z8enAkPEwGq--gM"
"-me3Yudck2ailla5Q4osnYIHxd9dI4WtQ==",
}
api = f'https://webcast.amemv.com/webcast/room/reflow/info/?{urllib.parse.urlencode(params)}'
xbogus = get_xbogus(api)
api = api + "&X-Bogus=" + xbogus
if proxy_addr:
proxies = {
'http': proxy_addr,
'https': proxy_addr
}
response = requests.get(url, headers=headers, proxies=proxies, timeout=15)
response = requests.get(api, headers=headers, proxies=proxies, timeout=15)
json_str = response.text
else:
req = urllib.request.Request(url, headers=headers)
req = urllib.request.Request(api, headers=headers)
response = opener.open(req, timeout=15)
json_str = response.read().decode('utf-8')
json_data = json.loads(json_str)
@@ -71,9 +92,9 @@ def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: Union[str, None
if __name__ == '__main__':
url = "https://v.douyin.com/iQLgKSj/"
room_url = "https://v.douyin.com/iQLgKSj/"
# url="https://v.douyin.com/iQFeBnt/"
# url="https://v.douyin.com/iehvKttp/"
room_id, sec_user_id = get_sec_user_id(url)
web_rid = get_live_room_id(room_id, sec_user_id)
print(web_rid)
_room_id, sec_uid = get_sec_user_id(room_url)
web_rid = get_live_room_id(_room_id, sec_uid)
print("return web_rid:", web_rid)

556
main.py
View File

@@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-17 23:52:05
Update: 2024-09-25 11:33:00
Update: 2024-10-02 06:15:00
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Function: Record live stream video.
"""
@@ -16,64 +16,22 @@ import signal
import threading
import time
import datetime
import json
import re
import shutil
import random
import base64
import hashlib
import urllib.parse
import urllib.request
from urllib.error import URLError, HTTPError
from typing import Any, Union, Dict
from typing import Any, Union
import configparser
from spider import (
get_douyin_stream_data,
get_douyin_app_stream_data,
get_tiktok_stream_data,
get_kuaishou_stream_data,
get_huya_stream_data,
get_douyu_info_data,
get_douyu_stream_data,
get_yy_stream_data,
get_bilibili_stream_data,
get_bilibili_room_info,
get_xhs_stream_url,
get_bigo_stream_url,
get_blued_stream_url,
get_afreecatv_stream_data,
get_netease_stream_data,
get_qiandurebo_stream_data,
get_pandatv_stream_data,
get_maoerfm_stream_url,
get_winktv_stream_data,
get_flextv_stream_data,
get_looklive_stream_url,
get_popkontv_stream_url,
get_twitcasting_stream_url,
get_baidu_stream_data,
get_weibo_stream_data,
get_kugou_stream_url,
get_twitchtv_stream_data,
get_liveme_stream_url,
get_huajiao_stream_url,
get_liuxing_stream_url,
get_showroom_stream_data,
get_acfun_stream_data,
get_huya_app_stream_url,
get_shiguang_stream_url,
get_yinbo_stream_url,
get_yingke_stream_url,
get_zhihu_stream_url,
get_chzzk_stream_data
from douyinliverecorder import spider
from douyinliverecorder import stream
from douyinliverecorder.utils import (
logger, check_md5, update_config, get_file_paths
)
from utils import (
logger, check_md5,
trace_error_decorator, get_file_paths
from douyinliverecorder.msg_push import (
dingtalk, xizhi, tg_bot, email_message, bark
)
from msg_push import dingtalk, xizhi, tg_bot, email_message, bark
version = "v3.0.8"
platforms = ("\n国内站点:抖音|快手|虎牙|斗鱼|YY|B站|小红书|bigo|blued|网易CC|千度热播|猫耳FM|Look|TwitCasting|百度|微博|"
@@ -262,9 +220,7 @@ def create_ass_file(file_gruop: list):
def change_max_connect():
global max_request
global warning_count
global pre_max_request
global max_request, warning_count, pre_max_request
preset = max_request
start_time = time.time()
pre_max_request = max_request
@@ -296,336 +252,9 @@ def change_max_connect():
print("同一时间访问网络的线程数动态改为", max_request)
@trace_error_decorator
def get_douyin_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
anchor_name = json_data.get('anchor_name', None)
result = {
"anchor_name": anchor_name,
"is_live": False,
}
status = json_data.get("status", 4) # 直播状态 2 是正在直播、4 是未开播
if status == 2:
stream_url = json_data['stream_url']
flv_url_dict = stream_url['flv_pull_url']
flv_url_list = list(flv_url_dict.values())
m3u8_url_dict = stream_url['hls_pull_url_map']
m3u8_url_list = list(m3u8_url_dict.values())
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, "流畅": 4}
quality_index = video_qualities.get(video_quality)
m3u8_url = m3u8_url_list[quality_index]
flv_url = flv_url_list[quality_index]
result['m3u8_url'] = m3u8_url
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = m3u8_url
return result
@trace_error_decorator
def get_tiktok_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if not json_data:
return {"anchor_name": None, "is_live": False}
def get_video_quality_url(stream, q_key) -> list[dict[str, int | Any]]:
play_list = []
for key in stream:
url_info = stream[key]['main']
play_url = url_info[q_key]
sdk_params = url_info['sdk_params']
sdk_params = json.loads(sdk_params)
vbitrate = int(sdk_params['vbitrate'])
resolution = sdk_params['resolution']
if vbitrate != 0 and resolution:
width, height = map(int, resolution.split('x'))
play_list.append({'url': play_url, 'vbitrate': vbitrate, 'resolution': (width, height)})
play_list.sort(key=lambda x: x['vbitrate'], reverse=True)
play_list.sort(key=lambda x: (-x['vbitrate'], -x['resolution'][0], -x['resolution'][1]))
return play_list
live_room = json_data['LiveRoom']['liveRoomUserInfo']
user = live_room['user']
anchor_name = f"{user['nickname']}-{user['uniqueId']}"
status = user.get("status", 4)
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if status == 2:
stream_data = live_room['liveRoom']['streamData']['pull_data']['stream_data']
stream_data = json.loads(stream_data).get('data', {})
flv_url_list = get_video_quality_url(stream_data, 'flv')
m3u8_url_list = get_video_quality_url(stream_data, 'hls')
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
while len(m3u8_url_list) < 5:
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, '流畅': 4}
quality_index = video_qualities.get(video_quality)
result['flv_url'] = flv_url_list[quality_index]['url']
result['m3u8_url'] = m3u8_url_list[quality_index]['url']
result['is_live'] = True
result['record_url'] = flv_url_list[quality_index]['url'].replace("https://", "http://")
return result
@trace_error_decorator
def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if json_data['type'] == 1 and not json_data["is_live"]:
return json_data
live_status = json_data['is_live']
result = {
"type": 2,
"anchor_name": json_data['anchor_name'],
"is_live": live_status,
}
if live_status:
quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
if video_quality in quality_mapping:
quality_index = quality_mapping[video_quality]
if 'm3u8_url_list' in json_data:
m3u8_url_list = json_data['m3u8_url_list'][::-1]
while len(m3u8_url_list) < 5:
m3u8_url_list.append(m3u8_url_list[-1])
m3u8_url = m3u8_url_list[quality_index]['url']
result['m3u8_url'] = m3u8_url
if 'flv_url_list' in json_data:
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result['is_live'] = True
return result
@trace_error_decorator
def get_huya_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
game_live_info = json_data['data'][0]['gameLiveInfo']
stream_info_list = json_data['data'][0]['gameStreamInfoList']
anchor_name = game_live_info.get('nick', '')
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if stream_info_list:
select_cdn = stream_info_list[0]
flv_url = select_cdn.get('sFlvUrl')
stream_name = select_cdn.get('sStreamName')
flv_url_suffix = select_cdn.get('sFlvUrlSuffix')
hls_url = select_cdn.get('sHlsUrl')
hls_url_suffix = select_cdn.get('sHlsUrlSuffix')
flv_anti_code = select_cdn.get('sFlvAntiCode')
def get_anti_code(old_anti_code: str) -> str:
# js地址https://hd.huya.com/cdn_libs/mobile/hysdk-m-202402211431.js
params_t = 100
sdk_version = 2403051612
# sdk_id是13位数毫秒级时间戳
t13 = int(time.time()) * 1000
sdk_sid = t13
# 计算uuid和uid参数值
init_uuid = (int(t13 % 10 ** 10 * 1000) + int(1000 * random.random())) % 4294967295 # 直接初始化
uid = random.randint(1400000000000, 1400009999999) # 经过测试uid也可以使用init_uuid代替
seq_id = uid + sdk_sid # 移动端请求的直播流地址中包含seqId参数
# 计算ws_time参数值(16进制) 可以是当前毫秒时间戳当然也可以直接使用url_query['wsTime'][0]
# 原始最大误差不得慢240000毫秒
target_unix_time = (t13 + 110624) // 1000
ws_time = hex(target_unix_time)[2:].lower()
# fm参数值是经过url编码然后base64编码得到的解码结果类似 DWq8BcJ3h6DJt6TY_$0_$1_$2_$3
# 具体细节在上面js中查看大概在32657行代码开始有base64混淆代码请自行替换
url_query = urllib.parse.parse_qs(old_anti_code)
ws_secret_pf = base64.b64decode(urllib.parse.unquote(url_query['fm'][0]).encode()).decode().split("_")[0]
ws_secret_hash = hashlib.md5(f'{seq_id}|{url_query["ctype"][0]}|{params_t}'.encode()).hexdigest()
ws_secret = f'{ws_secret_pf}_{uid}_{stream_name}_{ws_secret_hash}_{ws_time}'
ws_secret_md5 = hashlib.md5(ws_secret.encode()).hexdigest()
anti_code = (
f'wsSecret={ws_secret_md5}&wsTime={ws_time}&seqid={seq_id}&ctype={url_query["ctype"][0]}&ver=1'
f'&fs={url_query["fs"][0]}&uuid={init_uuid}&u={uid}&t={params_t}&sv={sdk_version}'
f'&sdk_sid={sdk_sid}&codec=264'
)
return anti_code
new_anti_code = get_anti_code(flv_anti_code)
flv_url = f'{flv_url}/{stream_name}.{flv_url_suffix}?{new_anti_code}&ratio='
m3u8_url = f'{hls_url}/{stream_name}.{hls_url_suffix}?{new_anti_code}&ratio='
quality_list = flv_anti_code.split('&exsphd=')
if len(quality_list) > 1 and video_quality not in ["原画", "蓝光"]:
pattern = r"(?<=264_)\d+"
quality_list = [x for x in re.findall(pattern, quality_list[1])][::-1]
while len(quality_list) < 5:
quality_list.append(quality_list[-1])
video_quality_options = {
"超清": quality_list[0],
"高清": quality_list[1],
"标清": quality_list[2],
"流畅": quality_list[3]
}
if video_quality not in video_quality_options:
raise ValueError(
f"Invalid video quality. Available options are: {', '.join(video_quality_options.keys())}")
flv_url = flv_url + str(video_quality_options[video_quality])
m3u8_url = m3u8_url + str(video_quality_options[video_quality])
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url
return result
@trace_error_decorator
def get_douyu_stream_url(json_data: dict, cookies: str, video_quality: str, proxy_address: str) -> Dict[str, Any]:
if not json_data["is_live"]:
return json_data
video_quality_options = {
"原画": '0',
"蓝光": '0',
"超清": '3',
"高清": '2',
"标清": '1',
"流畅": '1'
}
rid = str(json_data["room_id"])
json_data.pop("room_id", None)
rate = video_quality_options.get(video_quality, '0')
flv_data = get_douyu_stream_data(rid, rate, cookies=cookies, proxy_addr=proxy_address)
rtmp_url = flv_data['data'].get('rtmp_url', None)
rtmp_live = flv_data['data'].get('rtmp_live', None)
if rtmp_live:
flv_url = f'{rtmp_url}/{rtmp_live}'
json_data['flv_url'] = flv_url
json_data['record_url'] = flv_url
return json_data
@trace_error_decorator
def get_yy_stream_url(json_data: dict) -> Dict[str, Any]:
anchor_name = json_data.get('anchor_name', '')
result = {
"anchor_name": anchor_name,
"is_live": False,
}
if 'avp_info_res' in json_data:
stream_line_addr = json_data['avp_info_res']['stream_line_addr']
cdn_info = list(stream_line_addr.values())[0]
flv_url = cdn_info['cdn_info']['url']
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = flv_url
return result
@trace_error_decorator
def get_bilibili_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
anchor_name = json_data["anchor_name"]
if not json_data["live_status"]:
return {
"anchor_name": anchor_name,
"is_live": False
}
room_url = json_data['room_url']
video_quality_options = {
"原画": '10000',
"蓝光": '400',
"超清": '250',
"高清": '150',
"标清": '80',
"流畅": '80'
}
select_quality = video_quality_options[video_quality]
play_url = get_bilibili_stream_data(room_url, qn=select_quality, platform='web', proxy_addr=proxy_addr,
cookies=bili_cookie)
return {
'anchor_name': json_data['anchor_name'],
'is_live': True,
'record_url': play_url
}
@trace_error_decorator
def get_netease_stream_url(json_data: dict, video_quality: str) -> Dict[str, Any]:
if not json_data['is_live']:
return json_data
stream_list = json_data['stream_list']['resolution']
order = ['blueray', 'ultra', 'high', 'standard']
sorted_keys = [key for key in order if key in stream_list]
while len(sorted_keys) < 5:
sorted_keys.append(sorted_keys[-1])
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
selected_quality = sorted_keys[quality_list[video_quality]]
flv_url_list = stream_list[selected_quality]['cdn']
selected_cdn = list(flv_url_list.keys())[0]
flv_url = flv_url_list[selected_cdn]
return {
"is_live": True,
"anchor_name": json_data['anchor_name'],
"flv_url": flv_url,
"record_url": flv_url
}
def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', spec: bool = False,
extra_key: Union[str, int] = None) -> Dict[str, Any]:
if not json_data['is_live']:
return json_data
play_url_list = json_data['play_url_list']
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
while len(play_url_list) < 5:
play_url_list.append(play_url_list[-1])
selected_quality = quality_list[video_quality]
data = {
"anchor_name": json_data['anchor_name'],
"is_live": True
}
if url_type == 'm3u8':
m3u8_url = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["m3u8_url"] = json_data['m3u8_url'] if spec else m3u8_url
data["record_url"] = m3u8_url
else:
flv = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["flv_url"] = flv
data["record_url"] = flv
return data
def push_message(content: str) -> Union[str, list]:
@@ -651,8 +280,7 @@ def push_message(content: str) -> Union[str, list]:
def clear_record_info(record_name, record_url):
global monitoring
if record_name in recording:
recording.remove(record_name)
recording.discard(record_name)
if record_url in url_comments and record_url in running_list:
running_list.remove(record_url)
monitoring -= 1
@@ -684,8 +312,7 @@ def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list) ->
def start_record(url_data: tuple, count_variable: int = -1):
global warning_count
global video_save_path
global warning_count, video_save_path
start_pushed = False
while True:
@@ -699,6 +326,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
retry = 0
record_quality, record_url, anchor_name = url_data
proxy_address = proxy_addr
platform = '未知平台'
if proxy_addr:
proxy_address = None
@@ -711,7 +339,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
if extra_enable_proxy_platform_list:
for pt in extra_enable_proxy_platform_list:
if pt and pt.strip() in url:
proxy_address = proxy_addr_bak if proxy_addr_bak else None
proxy_address = proxy_addr_bak or None
# print(f'\r代理地址:{proxy_address}')
# print(f'\r全局代理:{global_proxy}')
@@ -722,49 +350,49 @@ def start_record(url_data: tuple, count_variable: int = -1):
platform = '抖音直播'
with semaphore:
if 'v.douyin.com' not in record_url:
json_data = get_douyin_stream_data(
json_data = spider.get_douyin_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=dy_cookie)
else:
json_data = get_douyin_app_stream_data(
json_data = spider.get_douyin_app_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=dy_cookie)
port_info = get_douyin_stream_url(json_data, record_quality)
port_info = stream.get_douyin_stream_url(json_data, record_quality)
elif record_url.find("https://www.tiktok.com/") > -1:
platform = 'TikTok直播'
with semaphore:
if global_proxy or proxy_address:
json_data = get_tiktok_stream_data(
json_data = spider.get_tiktok_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=tiktok_cookie)
port_info = get_tiktok_stream_url(json_data, record_quality)
port_info = stream.get_tiktok_stream_url(json_data, record_quality)
else:
logger.error(f"错误信息: 网络异常请检查网络是否能正常访问TikTok平台")
logger.error("错误信息: 网络异常请检查网络是否能正常访问TikTok平台")
elif record_url.find("https://live.kuaishou.com/") > -1:
platform = '快手直播'
with semaphore:
json_data = get_kuaishou_stream_data(
json_data = spider.get_kuaishou_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=ks_cookie)
port_info = get_kuaishou_stream_url(json_data, record_quality)
port_info = stream.get_kuaishou_stream_url(json_data, record_quality)
elif record_url.find("https://www.huya.com/") > -1:
platform = '虎牙直播'
with semaphore:
if record_quality not in ['原画', '蓝光', '超清']:
json_data = get_huya_stream_data(
json_data = spider.get_huya_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=hy_cookie)
port_info = get_huya_stream_url(json_data, record_quality)
port_info = stream.get_huya_stream_url(json_data, record_quality)
else:
port_info = get_huya_app_stream_url(
port_info = spider.get_huya_app_stream_url(
url=record_url,
proxy_addr=proxy_address,
cookies=hy_cookie
@@ -773,32 +401,32 @@ def start_record(url_data: tuple, count_variable: int = -1):
elif record_url.find("https://www.douyu.com/") > -1:
platform = '斗鱼直播'
with semaphore:
json_data = get_douyu_info_data(
json_data = spider.get_douyu_info_data(
url=record_url, proxy_addr=proxy_address, cookies=douyu_cookie)
port_info = get_douyu_stream_url(
json_data, proxy_address=proxy_address, cookies=douyu_cookie,
video_quality=record_quality
port_info = stream.get_douyu_stream_url(
json_data, video_quality=record_quality, cookies=douyu_cookie, proxy_addr=proxy_address
)
elif record_url.find("https://www.yy.com/") > -1:
platform = 'YY直播'
with semaphore:
json_data = get_yy_stream_data(
json_data = spider.get_yy_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=yy_cookie)
port_info = get_yy_stream_url(json_data)
port_info = stream.get_yy_stream_url(json_data)
elif record_url.find("https://live.bilibili.com/") > -1:
platform = 'B站直播'
with semaphore:
json_data = get_bilibili_room_info(
json_data = spider.get_bilibili_room_info(
url=record_url, proxy_addr=proxy_address, cookies=bili_cookie)
port_info = get_bilibili_stream_url(json_data, record_quality)
port_info = stream.get_bilibili_stream_url(
json_data, video_quality=record_quality, cookies=bili_cookie, proxy_addr=proxy_address)
elif record_url.find("https://www.redelight.cn/") > -1 or \
record_url.find("https://www.xiaohongshu.com/") > -1 or \
record_url.find("http://xhslink.com/") > -1:
platform = '小红书直播'
if retry > 0:
if retry > 1:
delete_line(url_config_file, record_url)
if record_url in running_list:
running_list.remove(record_url)
@@ -806,95 +434,102 @@ def start_record(url_data: tuple, count_variable: int = -1):
logger.info(f'{record_url} 小红书直播已结束,停止录制')
return
with semaphore:
port_info = get_xhs_stream_url(url=record_url, proxy_addr=proxy_address, cookies=xhs_cookie)
port_info = spider.get_xhs_stream_url(
record_url, proxy_addr=proxy_address, cookies=xhs_cookie)
retry += 1
elif record_url.find("https://www.bigo.tv/") > -1 or record_url.find("slink.bigovideo.tv/") > -1:
platform = 'Bigo直播'
with semaphore:
port_info = get_bigo_stream_url(record_url, proxy_addr=proxy_address, cookies=bigo_cookie)
port_info = spider.get_bigo_stream_url(
record_url, proxy_addr=proxy_address, cookies=bigo_cookie)
elif record_url.find("https://app.blued.cn/") > -1:
platform = 'Blued直播'
with semaphore:
port_info = get_blued_stream_url(record_url, proxy_addr=proxy_address, cookies=blued_cookie)
port_info = spider.get_blued_stream_url(
record_url, proxy_addr=proxy_address, cookies=blued_cookie)
elif record_url.find("afreecatv.com/") > -1:
platform = 'AfreecaTV'
with semaphore:
if global_proxy or proxy_address:
json_data = get_afreecatv_stream_data(
json_data = spider.get_afreecatv_stream_data(
url=record_url, proxy_addr=proxy_address,
cookies=afreecatv_cookie,
username=afreecatv_username,
password=afreecatv_password
)
port_info = get_stream_url(json_data, record_quality, spec=True)
if json_data and json_data.get('new_cookies', None):
update_config(config_file, 'Cookie', 'afreecatv_cookie', json_data['new_cookies'])
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问AfreecaTV平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问AfreecaTV平台")
elif record_url.find("cc.163.com/") > -1:
platform = '网易CC直播'
with semaphore:
json_data = get_netease_stream_data(url=record_url, cookies=netease_cookie)
port_info = get_netease_stream_url(json_data, record_quality)
json_data = spider.get_netease_stream_data(url=record_url, cookies=netease_cookie)
port_info = stream.get_netease_stream_url(json_data, record_quality)
elif record_url.find("qiandurebo.com/") > -1:
platform = '千度热播'
with semaphore:
port_info = get_qiandurebo_stream_data(
port_info = spider.get_qiandurebo_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=qiandurebo_cookie)
elif record_url.find("www.pandalive.co.kr/") > -1:
platform = 'PandaTV'
with semaphore:
if global_proxy or proxy_address:
json_data = get_pandatv_stream_data(
json_data = spider.get_pandatv_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=pandatv_cookie
)
port_info = get_stream_url(json_data, record_quality, spec=True)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问PandaTV直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问PandaTV直播平台")
elif record_url.find("fm.missevan.com/") > -1:
platform = '猫耳FM直播'
with semaphore:
port_info = get_maoerfm_stream_url(
port_info = spider.get_maoerfm_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=maoerfm_cookie)
elif record_url.find("www.winktv.co.kr/") > -1:
platform = 'WinkTV'
with semaphore:
if global_proxy or proxy_address:
json_data = get_winktv_stream_data(
json_data = spider.get_winktv_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=winktv_cookie)
port_info = get_stream_url(json_data, record_quality, spec=True)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问WinkTV直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问WinkTV直播平台")
elif record_url.find("www.flextv.co.kr/") > -1:
platform = 'FlexTV'
with semaphore:
if global_proxy or proxy_address:
json_data = get_flextv_stream_data(
json_data = spider.get_flextv_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=flextv_cookie,
username=flextv_username,
password=flextv_password
)
port_info = get_stream_url(json_data, record_quality, spec=True)
if json_data and json_data.get('new_cookies', None):
update_config(config_file, 'Cookie', 'flextv_cookie', json_data['new_cookies'])
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问FlexTV直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问FlexTV直播平台")
elif record_url.find("look.163.com/") > -1:
platform = 'Look直播'
with semaphore:
port_info = get_looklive_stream_url(
port_info = spider.get_looklive_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=look_cookie
)
@@ -902,7 +537,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
platform = 'PopkonTV'
with semaphore:
if global_proxy or proxy_address:
port_info = get_popkontv_stream_url(
port_info = spider.get_popkontv_stream_url(
url=record_url,
proxy_addr=proxy_address,
access_token=popkontv_access_token,
@@ -910,13 +545,17 @@ def start_record(url_data: tuple, count_variable: int = -1):
password=popkontv_password,
partner_code=popkontv_partner_code
)
if port_info and port_info.get('new_token'):
update_config(config_file, 'Authorization',
'popkontv_token', port_info['new_token'])
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问PopkonTV直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问PopkonTV直播平台")
elif record_url.find("twitcasting.tv/") > -1:
platform = 'TwitCasting'
with semaphore:
port_info = get_twitcasting_stream_url(
port_info = spider.get_twitcasting_stream_url(
url=record_url,
proxy_addr=proxy_address,
cookies=twitcasting_cookie,
@@ -924,110 +563,112 @@ def start_record(url_data: tuple, count_variable: int = -1):
username=twitcasting_username,
password=twitcasting_password
)
if port_info and port_info.get('new_cookies'):
update_config(config_file, 'Cookie', 'twitcasting_cookie', port_info['new_cookies'])
elif record_url.find("live.baidu.com/") > -1:
platform = '百度直播'
with semaphore:
json_data = get_baidu_stream_data(
json_data = spider.get_baidu_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=baidu_cookie)
port_info = get_stream_url(json_data, record_quality)
port_info = stream.get_stream_url(json_data, record_quality)
elif record_url.find("weibo.com/") > -1:
platform = '微博直播'
with semaphore:
json_data = get_weibo_stream_data(
json_data = spider.get_weibo_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=weibo_cookie)
port_info = get_stream_url(json_data, record_quality, extra_key='m3u8_url')
port_info = stream.get_stream_url(json_data, record_quality, extra_key='m3u8_url')
elif record_url.find("kugou.com/") > -1:
platform = '酷狗直播'
with semaphore:
port_info = get_kugou_stream_url(
port_info = spider.get_kugou_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=kugou_cookie)
elif record_url.find("www.twitch.tv/") > -1:
platform = 'TwitchTV'
with semaphore:
if global_proxy or proxy_address:
json_data = get_twitchtv_stream_data(
json_data = spider.get_twitchtv_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=twitch_cookie
)
port_info = get_stream_url(json_data, record_quality, spec=True)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问TwitchTV直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问TwitchTV直播平台")
elif record_url.find("www.liveme.com/") > -1:
if global_proxy or proxy_address:
platform = 'LiveMe'
with semaphore:
port_info = get_liveme_stream_url(
port_info = spider.get_liveme_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=liveme_cookie)
else:
logger.error(f"错误信息: 网络异常请检查本网络是否能正常访问LiveMe直播平台")
logger.error("错误信息: 网络异常请检查本网络是否能正常访问LiveMe直播平台")
elif record_url.find("www.huajiao.com/") > -1:
platform = '花椒直播'
with semaphore:
port_info = get_huajiao_stream_url(
port_info = spider.get_huajiao_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=huajiao_cookie)
elif record_url.find("7u66.com/") > -1:
platform = '流星直播'
with semaphore:
port_info = get_liuxing_stream_url(
port_info = spider.get_liuxing_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=liuxing_cookie)
elif record_url.find("showroom-live.com/") > -1:
platform = 'ShowRoom'
with semaphore:
json_data = get_showroom_stream_data(
json_data = spider.get_showroom_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=showroom_cookie)
port_info = get_stream_url(json_data, record_quality, spec=True)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
elif record_url.find("live.acfun.cn/") > -1 or record_url.find("m.acfun.cn/") > -1:
platform = 'Acfun'
with semaphore:
json_data = get_acfun_stream_data(
json_data = spider.get_acfun_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=acfun_cookie)
port_info = get_stream_url(json_data, record_quality, url_type='flv', extra_key='url')
port_info = stream.get_stream_url(json_data, record_quality, url_type='flv', extra_key='url')
elif record_url.find("rengzu.com/") > -1:
platform = '时光直播'
with semaphore:
port_info = get_shiguang_stream_url(
port_info = spider.get_shiguang_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=shiguang_cookie)
elif record_url.find("ybw1666.com/") > -1:
platform = '音播直播'
with semaphore:
port_info = get_yinbo_stream_url(
port_info = spider.get_yinbo_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=yinbo_cookie)
elif record_url.find("www.inke.cn/") > -1:
platform = '映客直播'
with semaphore:
port_info = get_yingke_stream_url(
port_info = spider.get_yingke_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=yingke_cookie)
elif record_url.find("www.zhihu.com/") > -1:
platform = '知乎直播'
with semaphore:
port_info = get_zhihu_stream_url(
port_info = spider.get_zhihu_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=zhihu_cookie)
elif record_url.find("chzzk.naver.com/") > -1:
platform = 'CHZZK'
with semaphore:
json_data = get_chzzk_stream_data(
json_data = spider.get_chzzk_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=chzzk_cookie)
port_info = get_stream_url(json_data, record_quality, spec=True)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
else:
logger.error(f'{record_url} 未知直播地址')
logger.error(f'{record_url} {platform}直播地址')
return
if anchor_name:
@@ -1073,7 +714,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
if start_pushed:
if over_show_push:
push_content = f"直播间状态更新:[直播间名称] 直播已结束!时间:[时间]"
push_content = "直播间状态更新:[直播间名称] 直播已结束!时间:[时间]"
if over_push_message_text:
push_content = over_push_message_text
@@ -1089,7 +730,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
if live_status_push and not start_pushed:
if begin_show_push:
push_content = f"直播间状态更新:[直播间名称] 正在直播中,时间:[时间]"
push_content = "直播间状态更新:[直播间名称] 正在直播中,时间:[时间]"
if begin_push_message_text:
push_content = begin_push_message_text
@@ -1515,8 +1156,7 @@ def start_record(url_data: tuple, count_variable: int = -1):
count_time = time.time()
if record_finished_2:
if record_name in recording:
recording.remove(record_name)
recording.discard(record_name)
record_finished_2 = False
except Exception as e:
@@ -1644,7 +1284,7 @@ print("| DouyinLiveRecorder |")
print("-----------------------------------------------------")
print(f"版本号: {version}")
print(f"GitHub: https://github.com/ihmily/DouyinLiveRecorder")
print("GitHub: https://github.com/ihmily/DouyinLiveRecorder")
print(f'支持平台: {platforms}')
print('.....................................................')
@@ -1822,13 +1462,11 @@ while True:
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}"
def contains_url(string: str) -> bool:
pattern = (r"(http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?[a-zA-Z0-9][a-zA-Z0-9\-]+(\.["
r"a-zA-Z0-9\-]+)*\.[a-zA-Z]{2,10}(:[0-9]{1,5})?(\/.*)?$")
return re.search(pattern, string) is not None
try:
url_comments = []
with (open(url_config_file, "r", encoding=text_encoding, errors='ignore') as file):
@@ -1990,4 +1628,4 @@ while True:
t2.start()
first_run = False
time.sleep(3)
time.sleep(3)

4
poetry.toml Normal file
View File

@@ -0,0 +1,4 @@
[virtualenvs]
in-project = true
create = true
prefer-active-python = true

24
pyproject.toml Normal file
View File

@@ -0,0 +1,24 @@
[project]
requires-python = ">=3.8,<3.13"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "douyinliverecorder"
version = "3.0.8"
description = "An easy tool for recording live streams"
authors = ["Hmily"]
license = "MIT"
readme = "README.md"
homepage = "https://github.com/ihmily/DouyinLiveRecorder"
repository = "https://github.com/ihmily/DouyinLiveRecorder"
keywords = ["douyin", "live", "recorder"]
[tool.poetry.dependencies]
python = "^3.8"
requests = "^2.25.1"
PyExecJS = "^1.5.1"
loguru = "^0.5.3"
pycryptodome = "^3.10.1"

30
setup.py Normal file
View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
from setuptools import setup, find_packages
setup(
name='douyinliverecorder',
version='3.0.8',
author='Hmily',
description='An easy tool for recording live streams',
long_description=open('README.md', encoding='utf-8').read(),
long_description_content_type='text/markdown',
url='https://github.com/ihmily/DouyinLiveRecorder',
packages=find_packages(),
install_requires=[
'requests',
'PyExecJS',
'loguru',
'pycryptodome'
],
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
]
)