From 37aeddbe7162afd5ca3818fa664799ab0bb5fa20 Mon Sep 17 00:00:00 2001
From: ihmily <961532186@qq.com>
Date: Wed, 24 Jan 2024 23:14:37 +0800
Subject: [PATCH] feat: add custom live broadcast quality recording
---
main.py | 160 +++++++++++++++++++++++++++++++-----------------------
spider.py | 35 +++++++-----
2 files changed, 113 insertions(+), 82 deletions(-)
diff --git a/main.py b/main.py
index fe58daf..d2e7b41 100644
--- a/main.py
+++ b/main.py
@@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-17 23:52:05
-Update: 2024-01-16 02:15:40
+Update: 2024-01-24 22:30:19
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Function: Record live stream video.
"""
@@ -17,13 +17,14 @@ import urllib.request
import configparser
import subprocess
import threading
-import logging
import datetime
import time
import json
import re
import shutil
import signal
+from typing import Any
+
from spider import (
get_douyin_stream_data,
get_tiktok_stream_data,
@@ -51,7 +52,7 @@ from utils import (
)
from msg_push import dingtalk, xizhi, tg_bot
-version = "v2.0.8"
+version = "v2.0.9"
platforms = "抖音|TikTok|快手|虎牙|斗鱼|YY|B站|小红书|bigo直播|blued直播|AfreecaTV|网易cc"
# --------------------------全局变量-------------------------------------
recording = set()
@@ -77,8 +78,8 @@ backup_dir = './backup_config'
encoding = 'utf-8-sig'
rstr = r"[\/\\\:\*\?\"\<\>\|&u]"
ffmpeg_path = "ffmpeg" # ffmpeg文件路径
-default_path = os.getcwd()
-
+default_path = os.getcwd()+'/downloads'
+os.makedirs(default_path, exist_ok=True)
# --------------------------用到的函数-------------------------------------
def signal_handler(signal, frame):
@@ -130,8 +131,9 @@ def display_info():
no_repeat_recording = list(set(recording))
print(f"正在录制{len(no_repeat_recording)}个直播: ")
for recording_live in no_repeat_recording:
- have_record_time = now_time - recording_time_list[recording_live]
- print(f"{recording_live} 正在录制中 " + str(have_record_time).split('.')[0])
+ rt, qa = recording_time_list[recording_live]
+ have_record_time = now_time - rt
+ print(f"{recording_live}[{qa}] 正在录制中 " +str(have_record_time).split('.')[0])
# print('\n本软件已运行:'+str(now_time - start_display_time).split('.')[0])
print("x" * 60)
@@ -141,7 +143,7 @@ def display_info():
logger.warning(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
-def update_file(file, old_str, new_str):
+def update_file(file: str, old_str: str, new_str: str):
# TODO: 更新文件操作
file_data = ""
with open(file, "r", encoding="utf-8-sig") as f:
@@ -153,7 +155,7 @@ def update_file(file, old_str, new_str):
f.write(file_data)
-def converts_mp4(address):
+def converts_mp4(address: str):
if tsconvert_to_mp4:
_output = subprocess.check_output([
"ffmpeg", "-i", address,
@@ -167,7 +169,7 @@ def converts_mp4(address):
os.remove(address)
-def converts_m4a(address):
+def converts_m4a(address: str):
if tsconvert_to_m4a:
_output = subprocess.check_output([
"ffmpeg", "-i", address,
@@ -181,7 +183,7 @@ def converts_m4a(address):
os.remove(address)
-def create_ass_file(filegruop):
+def create_ass_file(filegruop: list):
# TODO: 录制时生成ass格式的字幕文件
anchor_name = filegruop[0]
ass_filename = filegruop[1]
@@ -251,7 +253,7 @@ def change_max_connect():
@trace_error_decorator
-def get_douyin_stream_url(json_data):
+def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取抖音直播源地址
anchor_name = json_data.get('anchor_name', None)
@@ -276,7 +278,7 @@ def get_douyin_stream_url(json_data):
# "标清": "SD2",
# }
- quality_list = list(m3u8_url_list.keys())
+ quality_list: list = list(m3u8_url_list.keys())
while len(quality_list) < 4:
quality_list.append(quality_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3}
@@ -294,7 +296,7 @@ def get_douyin_stream_url(json_data):
@trace_error_decorator
-def get_tiktok_stream_url(json_data):
+def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取tiktok直播源地址
def get_video_quality_url(stream_data, quality_key):
@@ -333,10 +335,10 @@ def get_tiktok_stream_url(json_data):
@trace_error_decorator
-def get_kuaishou_stream_url(json_data):
+def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取快手直播源地址
- if json_data['type'] == 1:
+ if json_data['type'] == 1 and not json_data["is_live"]:
return json_data
live_status = json_data['is_live']
@@ -357,25 +359,22 @@ def get_kuaishou_stream_url(json_data):
while len(m3u8_url_list) < 4:
m3u8_url_list.append(m3u8_url_list[-1])
m3u8_url = m3u8_url_list[quality_index]['url']
- else:
- m3u8_url = json_data['backup']['m3u8_url']
+ result['m3u8_url'] = m3u8_url
+
if 'flv_url_list' in json_data:
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 4:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
- else:
- flv_url = json_data['backup']['flv_url']
+ result['flv_url'] = flv_url
+ result['record_url'] = flv_url
- result['flv_url'] = flv_url
- result['m3u8_url'] = m3u8_url
result['is_live'] = True
- result['record_url'] = flv_url if flv_url else m3u8_url
return result
@trace_error_decorator
-def get_huya_stream_url(json_data):
+def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取虎牙直播源地址
game_live_info = json_data.get('data', [])[0].get('gameLiveInfo', {})
@@ -389,7 +388,8 @@ def get_huya_stream_url(json_data):
if stream_info_list:
select_cdn = stream_info_list[0]
- flv_url = select_cdn.get('sFlvUrl')
+ # flv_url = select_cdn.get('sFlvUrl')
+ flv_url = 'http://hw.flv.huya.com/src' # 能播放但无法录制,待修复
stream_name = select_cdn.get('sStreamName')
flv_url_suffix = select_cdn.get('sFlvUrlSuffix')
hls_url = select_cdn.get('sHlsUrl')
@@ -425,12 +425,12 @@ def get_huya_stream_url(json_data):
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url # 虎牙使用flv视频流录制
-
return result
@trace_error_decorator
-def get_douyu_stream_url(json_data, cookies):
+def get_douyu_stream_url(json_data: dict, cookies: str, video_quality: str) -> dict:
+
# TODO: 获取斗鱼直播源地址
video_quality_options = {
"原画": '0',
@@ -462,7 +462,8 @@ def get_douyu_stream_url(json_data, cookies):
@trace_error_decorator
-def get_yy_stream_url(json_data):
+def get_yy_stream_url(json_data: dict) -> dict:
+
# TODO: 获取YY直播源地址
anchor_name = json_data.get('anchor_name', '')
result = {
@@ -481,8 +482,10 @@ def get_yy_stream_url(json_data):
@trace_error_decorator
-def get_bilibili_stream_url(json_data):
+def get_bilibili_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取B站直播源地址
+ if "is_live" in json_data and not json_data['anchor_name']:
+ return json_data
anchor_name = json_data.get('roomInfoRes', {}).get('data', {}).get('anchor_info', {}).get('base_info', {}).get(
'uname', '')
@@ -536,7 +539,8 @@ def get_bilibili_stream_url(json_data):
@trace_error_decorator
-def get_netease_stream_url(json_data):
+def get_netease_stream_url(json_data: dict, video_quality: str) -> dict:
+
if not json_data['is_live']:
return json_data
stream_list = json_data['stream_list']['resolution']
@@ -557,7 +561,7 @@ def get_netease_stream_url(json_data):
}
-def start_record(url_tuple, count_variable=-1):
+def start_record(url_data: tuple, count_variable: int = -1):
global warning_count
global video_save_path
global live_list
@@ -573,8 +577,8 @@ def start_record(url_tuple, count_variable=-1):
no_error = True
new_record_url = ''
count_time = time.time()
- record_url = url_tuple[0]
- anchor_name = url_tuple[1]
+
+ record_quality, record_url, anchor_name = url_data
print(f"\r运行新线程,传入地址 {record_url}")
while True:
@@ -584,8 +588,8 @@ def start_record(url_tuple, count_variable=-1):
# 判断如果是浏览器长链接
with semaphore:
# 使用semaphore来控制同时访问资源的线程数量
- json_data = get_douyin_stream_data(record_url, dy_cookie)
- port_info = get_douyin_stream_url(json_data)
+ json_data = get_douyin_stream_data(record_url, cookies=dy_cookie)
+ port_info = get_douyin_stream_url(json_data, record_quality)
elif record_url.find("https://v.douyin.com/") > -1:
# 判断如果是app分享链接
is_long_url = True
@@ -596,64 +600,64 @@ def start_record(url_tuple, count_variable=-1):
new_record_url = "https://live.douyin.com/" + str(web_rid)
not_record_list.append(new_record_url)
with semaphore:
- json_data = get_douyin_stream_data(new_record_url, dy_cookie)
- port_info = get_douyin_stream_url(json_data)
+ json_data = get_douyin_stream_data(new_record_url, cookies=dy_cookie)
+ port_info = get_douyin_stream_url(json_data, record_quality)
elif record_url.find("https://www.tiktok.com/") > -1:
with semaphore:
if use_proxy:
if global_proxy or proxy_addr != '':
- json_data = get_tiktok_stream_data(record_url, proxy_addr, tiktok_cookie)
- port_info = get_tiktok_stream_url(json_data)
+ json_data = get_tiktok_stream_data(record_url, proxy_addr=proxy_addr, cookies=tiktok_cookie)
+ port_info = get_tiktok_stream_url(json_data, record_quality)
elif record_url.find("https://live.kuaishou.com/") > -1:
with semaphore:
- json_data = get_kuaishou_stream_data(record_url, ks_cookie)
- port_info = get_kuaishou_stream_url(json_data)
+ json_data = get_kuaishou_stream_data(record_url, cookies=ks_cookie)
+ port_info = get_kuaishou_stream_url(json_data, record_quality)
elif record_url.find("https://www.huya.com/") > -1:
with semaphore:
- json_data = get_huya_stream_data(record_url, hy_cookie)
- port_info = get_huya_stream_url(json_data)
+ json_data = get_huya_stream_data(record_url, cookies=hy_cookie)
+ port_info = get_huya_stream_url(json_data, record_quality)
elif record_url.find("https://www.douyu.com/") > -1:
with semaphore:
json_data = get_douyu_info_data(record_url)
- port_info = get_douyu_stream_url(json_data, douyu_cookie)
+ port_info = get_douyu_stream_url(json_data, cookies=douyu_cookie, video_quality=record_quality)
elif record_url.find("https://www.yy.com/") > -1:
with semaphore:
- json_data = get_yy_stream_data(record_url, yy_cookie)
+ json_data = get_yy_stream_data(record_url, cookies=yy_cookie)
port_info = get_yy_stream_url(json_data)
elif record_url.find("https://live.bilibili.com/") > -1:
with semaphore:
- json_data = get_bilibili_stream_data(record_url, bili_cookie)
- port_info = get_bilibili_stream_url(json_data)
+ json_data = get_bilibili_stream_data(record_url, cookies=bili_cookie)
+ port_info = get_bilibili_stream_url(json_data, record_quality)
elif record_url.find("https://www.xiaohongshu.com/") > -1:
with semaphore:
- port_info = get_xhs_stream_url(record_url, xhs_cookie)
+ port_info = get_xhs_stream_url(record_url, cookies=xhs_cookie)
elif record_url.find("https://www.bigo.tv/") > -1:
with semaphore:
- port_info = get_bigo_stream_url(record_url, bigo_cookie)
+ port_info = get_bigo_stream_url(record_url, cookies=bigo_cookie)
elif record_url.find("https://app.blued.cn/") > -1:
with semaphore:
- port_info = get_blued_stream_url(record_url, blued_cookie)
+ port_info = get_blued_stream_url(record_url, cookies=blued_cookie)
elif record_url.find("afreecatv.com/") > -1:
with semaphore:
- port_info = get_afreecatv_stream_url(record_url, afreecatv_cookie)
+ port_info = get_afreecatv_stream_url(record_url, proxy_addr=proxy_addr, cookies=afreecatv_cookie)
elif record_url.find("cc.163.com/") > -1:
with semaphore:
- json_data = get_netease_stream_data(record_url, netease_cookie)
- port_info = get_netease_stream_url(json_data)
+ json_data = get_netease_stream_data(record_url, cookies=netease_cookie)
+ port_info = get_netease_stream_url(json_data, record_quality)
if anchor_name:
- anchor_split = anchor_name.split('主播:')
+ anchor_split: list= anchor_name.split('主播:')
if len(anchor_split) > 1 and anchor_split[1].strip():
anchor_name = anchor_split[1].strip()
else:
@@ -662,7 +666,7 @@ def start_record(url_tuple, count_variable=-1):
anchor_name = port_info.get("anchor_name", '')
if anchor_name == '':
- print(f'序号{count_variable} 网址内容获取失败,进行重试中...获取失败的地址是:{url_tuple}')
+ print(f'序号{count_variable} 网址内容获取失败,进行重试中...获取失败的地址是:{url_data}')
warning_count += 1
else:
anchor_name = re.sub(rstr, "_", anchor_name) # 过滤不能作为文件名的字符,替换为下划线
@@ -673,7 +677,7 @@ def start_record(url_tuple, count_variable=-1):
name_list.append(f'{record_url}|#{record_url}')
return
- if url_tuple[1] == "" and run_once is False:
+ if url_data[-1] == "" and run_once is False:
if is_long_url:
name_list.append(f'{record_url}|{new_record_url},主播: {anchor_name.strip()}')
else:
@@ -708,7 +712,7 @@ def start_record(url_tuple, count_variable=-1):
else:
video_save_path = default_path + '/'
- video_save_path = video_save_path.replace("\\", "/")
+ video_save_path = video_save_path.replace("\\", '/')
full_path = f'{video_save_path}{anchor_name}'
if not os.path.exists(full_path):
os.makedirs(full_path)
@@ -755,7 +759,7 @@ def start_record(url_tuple, count_variable=-1):
recording.add(record_name)
start_record_time = datetime.datetime.now()
- recording_time_list[record_name] = start_record_time
+ recording_time_list[record_name] = [start_record_time, record_quality]
rec_info = f"\r{anchor_name} 录制视频中: {full_path}"
filename_short = full_path + '/' + anchor_name + '_' + now
@@ -1057,7 +1061,7 @@ def start_record(url_tuple, count_variable=-1):
time.sleep(2)
-def backup_file(file_path, backup_dir):
+def backup_file(file_path: str, backup_dir: str):
"""
备份配置文件到备份目录,分别保留最新 10 个文件
"""
@@ -1158,8 +1162,9 @@ except Exception as e:
print('INFO:未检测到全局/规则网络代理,请检查代理配置(若无需录制TikTok直播请忽略此条提示)')
-def read_config_value(config, section, option, default_value):
+def read_config_value(config: configparser.RawConfigParser, section: str, option: str, default_value: Any) -> Any:
try:
+
config.read(config_file, encoding=encoding)
if '录制设置' not in config.sections():
config.add_section('录制设置')
@@ -1257,12 +1262,18 @@ while True:
print("直播视频保存为TS格式")
- def transform_int_to_time(seconds):
+ def transform_int_to_time(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}"
+ def contains_url(string: str) -> bool:
+ # 正则表达式匹配各种格式的网址,包括裸域名和子域名
+ pattern = r"(http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?[a-zA-Z0-9][a-zA-Z0-9\-]+(\.[a-zA-Z0-9\-]+)*\.[a-zA-Z]{2,10}(:[0-9]{1,5})?(\/.*)?$"
+ return re.search(pattern, string) is not None
+
+
# 读取url_config.ini文件
try:
with open(url_config_file, "r", encoding=encoding) as file:
@@ -1275,7 +1286,19 @@ while True:
split_line = re.split('[,,]', line)
else:
split_line = [line, '']
- url = split_line[0]
+
+ if len(split_line) == 1:
+ url = split_line[0]
+ quality, name = [video_quality, '']
+ elif len(split_line) == 2:
+ if contains_url(split_line[0]):
+ quality = video_quality
+ url, name = split_line
+ else:
+ quality, url = split_line
+ name = ''
+ else:
+ quality, url, name = split_line
if ('http://' not in url) and ('https://' not in url):
url = 'https://' + url
@@ -1298,7 +1321,7 @@ while True:
'cc.163.com'
]
if url_host in host_list:
- new_line = (url, split_line[1])
+ new_line = (quality, url, name)
url_tuples_list.append(new_line)
else:
print(f"{url} 未知链接.此条跳过")
@@ -1311,21 +1334,22 @@ while True:
if len(url_tuples_list) > 0:
text_no_repeat_url = list(set(url_tuples_list))
+
if len(text_no_repeat_url) > 0:
for url_tuple in text_no_repeat_url:
- if url_tuple[0] in not_record_list:
+ if url_tuple[1] in not_record_list:
continue
- if url_tuple[0] not in runing_list:
+ if url_tuple[1] not in runing_list:
if not first_start:
- print(f"新增链接: {url_tuple[0]}")
+ print(f"新增链接: {url_tuple[1]}")
monitoring += 1
args = [url_tuple, monitoring]
# TODO: 执行开始录制的操作
create_var['thread' + str(monitoring)] = threading.Thread(target=start_record, args=args)
create_var['thread' + str(monitoring)].daemon = True
create_var['thread' + str(monitoring)].start()
- runing_list.append(url_tuple[0])
+ runing_list.append(url_tuple[1])
time.sleep(local_delay_default)
url_tuples_list = []
first_start = False
@@ -1341,4 +1365,4 @@ while True:
first_run = False
- time.sleep(3)
\ No newline at end of file
+ time.sleep(3)
diff --git a/spider.py b/spider.py
index a710289..14c00ef 100644
--- a/spider.py
+++ b/spider.py
@@ -4,7 +4,7 @@
Author: Hmily
GitHub:https://github.com/ihmily
Date: 2023-07-15 23:15:00
-Update: 2024-01-16 01:51:29
+Update: 2024-01-24 22:57:03
Copyright (c) 2023 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
@@ -122,7 +122,7 @@ def get_kuaishou_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
print(f"Failed to parse JSON data from {url}. Error: {e}")
return {"type": 1, "is_live": False}
- result = {"type": 1, "is_live": False}
+ result = {"type": 2, "is_live": False}
if 'errorType' in play_list or 'liveStream' not in play_list:
error_msg = play_list.get('errorType', {}).get('title', '') + play_list.get('errorType', {}).get('content', '')
@@ -138,8 +138,8 @@ def get_kuaishou_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
result.update({"anchor_name": anchor_name})
if play_list['liveStream'].get("playUrls"):
- play_url = play_list['liveStream']['playUrls'][0]['adaptationSet']['representation'][0].get('url', '')
- result.update({"flv_url": play_url, "record_url": play_url, "is_live": True})
+ play_url_list = play_list['liveStream']['playUrls'][0]['adaptationSet']['representation']
+ result.update({"flv_url_list": play_url_list, "is_live": True})
return result
@@ -334,12 +334,15 @@ def get_bilibili_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
}
if cookies:
headers['Cookie'] = cookies
- req = urllib.request.Request(url, headers=headers)
- response = opener.open(req, timeout=15)
- html_str = response.read().decode('utf-8')
- json_str = re.search('