feat: add custom live broadcast quality recording

This commit is contained in:
ihmily
2024-01-24 23:14:37 +08:00
parent 848903ec8f
commit 37aeddbe71
2 changed files with 113 additions and 82 deletions

160
main.py
View File

@@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-17 23:52:05
Update: 2024-01-16 02:15:40
Update: 2024-01-24 22:30:19
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Function: Record live stream video.
"""
@@ -17,13 +17,14 @@ import urllib.request
import configparser
import subprocess
import threading
import logging
import datetime
import time
import json
import re
import shutil
import signal
from typing import Any
from spider import (
get_douyin_stream_data,
get_tiktok_stream_data,
@@ -51,7 +52,7 @@ from utils import (
)
from msg_push import dingtalk, xizhi, tg_bot
version = "v2.0.8"
version = "v2.0.9"
platforms = "抖音|TikTok|快手|虎牙|斗鱼|YY|B站|小红书|bigo直播|blued直播|AfreecaTV|网易cc"
# --------------------------全局变量-------------------------------------
recording = set()
@@ -77,8 +78,8 @@ backup_dir = './backup_config'
encoding = 'utf-8-sig'
rstr = r"[\/\\\:\*\?\"\<\>\|&u]"
ffmpeg_path = "ffmpeg" # ffmpeg文件路径
default_path = os.getcwd()
default_path = os.getcwd()+'/downloads'
os.makedirs(default_path, exist_ok=True)
# --------------------------用到的函数-------------------------------------
def signal_handler(signal, frame):
@@ -130,8 +131,9 @@ def display_info():
no_repeat_recording = list(set(recording))
print(f"正在录制{len(no_repeat_recording)}个直播: ")
for recording_live in no_repeat_recording:
have_record_time = now_time - recording_time_list[recording_live]
print(f"{recording_live} 正在录制中 " + str(have_record_time).split('.')[0])
rt, qa = recording_time_list[recording_live]
have_record_time = now_time - rt
print(f"{recording_live}[{qa}] 正在录制中 " +str(have_record_time).split('.')[0])
# print('\n本软件已运行'+str(now_time - start_display_time).split('.')[0])
print("x" * 60)
@@ -141,7 +143,7 @@ def display_info():
logger.warning(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
def update_file(file, old_str, new_str):
def update_file(file: str, old_str: str, new_str: str):
# TODO: 更新文件操作
file_data = ""
with open(file, "r", encoding="utf-8-sig") as f:
@@ -153,7 +155,7 @@ def update_file(file, old_str, new_str):
f.write(file_data)
def converts_mp4(address):
def converts_mp4(address: str):
if tsconvert_to_mp4:
_output = subprocess.check_output([
"ffmpeg", "-i", address,
@@ -167,7 +169,7 @@ def converts_mp4(address):
os.remove(address)
def converts_m4a(address):
def converts_m4a(address: str):
if tsconvert_to_m4a:
_output = subprocess.check_output([
"ffmpeg", "-i", address,
@@ -181,7 +183,7 @@ def converts_m4a(address):
os.remove(address)
def create_ass_file(filegruop):
def create_ass_file(filegruop: list):
# TODO: 录制时生成ass格式的字幕文件
anchor_name = filegruop[0]
ass_filename = filegruop[1]
@@ -251,7 +253,7 @@ def change_max_connect():
@trace_error_decorator
def get_douyin_stream_url(json_data):
def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取抖音直播源地址
anchor_name = json_data.get('anchor_name', None)
@@ -276,7 +278,7 @@ def get_douyin_stream_url(json_data):
# "标清": "SD2",
# }
quality_list = list(m3u8_url_list.keys())
quality_list: list = list(m3u8_url_list.keys())
while len(quality_list) < 4:
quality_list.append(quality_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3}
@@ -294,7 +296,7 @@ def get_douyin_stream_url(json_data):
@trace_error_decorator
def get_tiktok_stream_url(json_data):
def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取tiktok直播源地址
def get_video_quality_url(stream_data, quality_key):
@@ -333,10 +335,10 @@ def get_tiktok_stream_url(json_data):
@trace_error_decorator
def get_kuaishou_stream_url(json_data):
def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取快手直播源地址
if json_data['type'] == 1:
if json_data['type'] == 1 and not json_data["is_live"]:
return json_data
live_status = json_data['is_live']
@@ -357,25 +359,22 @@ def get_kuaishou_stream_url(json_data):
while len(m3u8_url_list) < 4:
m3u8_url_list.append(m3u8_url_list[-1])
m3u8_url = m3u8_url_list[quality_index]['url']
else:
m3u8_url = json_data['backup']['m3u8_url']
result['m3u8_url'] = m3u8_url
if 'flv_url_list' in json_data:
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 4:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
else:
flv_url = json_data['backup']['flv_url']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url if flv_url else m3u8_url
return result
@trace_error_decorator
def get_huya_stream_url(json_data):
def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取虎牙直播源地址
game_live_info = json_data.get('data', [])[0].get('gameLiveInfo', {})
@@ -389,7 +388,8 @@ def get_huya_stream_url(json_data):
if stream_info_list:
select_cdn = stream_info_list[0]
flv_url = select_cdn.get('sFlvUrl')
# flv_url = select_cdn.get('sFlvUrl')
flv_url = 'http://hw.flv.huya.com/src' # 能播放但无法录制,待修复
stream_name = select_cdn.get('sStreamName')
flv_url_suffix = select_cdn.get('sFlvUrlSuffix')
hls_url = select_cdn.get('sHlsUrl')
@@ -425,12 +425,12 @@ def get_huya_stream_url(json_data):
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url # 虎牙使用flv视频流录制
return result
@trace_error_decorator
def get_douyu_stream_url(json_data, cookies):
def get_douyu_stream_url(json_data: dict, cookies: str, video_quality: str) -> dict:
# TODO: 获取斗鱼直播源地址
video_quality_options = {
"原画": '0',
@@ -462,7 +462,8 @@ def get_douyu_stream_url(json_data, cookies):
@trace_error_decorator
def get_yy_stream_url(json_data):
def get_yy_stream_url(json_data: dict) -> dict:
# TODO: 获取YY直播源地址
anchor_name = json_data.get('anchor_name', '')
result = {
@@ -481,8 +482,10 @@ def get_yy_stream_url(json_data):
@trace_error_decorator
def get_bilibili_stream_url(json_data):
def get_bilibili_stream_url(json_data: dict, video_quality: str) -> dict:
# TODO: 获取B站直播源地址
if "is_live" in json_data and not json_data['anchor_name']:
return json_data
anchor_name = json_data.get('roomInfoRes', {}).get('data', {}).get('anchor_info', {}).get('base_info', {}).get(
'uname', '')
@@ -536,7 +539,8 @@ def get_bilibili_stream_url(json_data):
@trace_error_decorator
def get_netease_stream_url(json_data):
def get_netease_stream_url(json_data: dict, video_quality: str) -> dict:
if not json_data['is_live']:
return json_data
stream_list = json_data['stream_list']['resolution']
@@ -557,7 +561,7 @@ def get_netease_stream_url(json_data):
}
def start_record(url_tuple, count_variable=-1):
def start_record(url_data: tuple, count_variable: int = -1):
global warning_count
global video_save_path
global live_list
@@ -573,8 +577,8 @@ def start_record(url_tuple, count_variable=-1):
no_error = True
new_record_url = ''
count_time = time.time()
record_url = url_tuple[0]
anchor_name = url_tuple[1]
record_quality, record_url, anchor_name = url_data
print(f"\r运行新线程,传入地址 {record_url}")
while True:
@@ -584,8 +588,8 @@ def start_record(url_tuple, count_variable=-1):
# 判断如果是浏览器长链接
with semaphore:
# 使用semaphore来控制同时访问资源的线程数量
json_data = get_douyin_stream_data(record_url, dy_cookie)
port_info = get_douyin_stream_url(json_data)
json_data = get_douyin_stream_data(record_url, cookies=dy_cookie)
port_info = get_douyin_stream_url(json_data, record_quality)
elif record_url.find("https://v.douyin.com/") > -1:
# 判断如果是app分享链接
is_long_url = True
@@ -596,64 +600,64 @@ def start_record(url_tuple, count_variable=-1):
new_record_url = "https://live.douyin.com/" + str(web_rid)
not_record_list.append(new_record_url)
with semaphore:
json_data = get_douyin_stream_data(new_record_url, dy_cookie)
port_info = get_douyin_stream_url(json_data)
json_data = get_douyin_stream_data(new_record_url, cookies=dy_cookie)
port_info = get_douyin_stream_url(json_data, record_quality)
elif record_url.find("https://www.tiktok.com/") > -1:
with semaphore:
if use_proxy:
if global_proxy or proxy_addr != '':
json_data = get_tiktok_stream_data(record_url, proxy_addr, tiktok_cookie)
port_info = get_tiktok_stream_url(json_data)
json_data = get_tiktok_stream_data(record_url, proxy_addr=proxy_addr, cookies=tiktok_cookie)
port_info = get_tiktok_stream_url(json_data, record_quality)
elif record_url.find("https://live.kuaishou.com/") > -1:
with semaphore:
json_data = get_kuaishou_stream_data(record_url, ks_cookie)
port_info = get_kuaishou_stream_url(json_data)
json_data = get_kuaishou_stream_data(record_url, cookies=ks_cookie)
port_info = get_kuaishou_stream_url(json_data, record_quality)
elif record_url.find("https://www.huya.com/") > -1:
with semaphore:
json_data = get_huya_stream_data(record_url, hy_cookie)
port_info = get_huya_stream_url(json_data)
json_data = get_huya_stream_data(record_url, cookies=hy_cookie)
port_info = get_huya_stream_url(json_data, record_quality)
elif record_url.find("https://www.douyu.com/") > -1:
with semaphore:
json_data = get_douyu_info_data(record_url)
port_info = get_douyu_stream_url(json_data, douyu_cookie)
port_info = get_douyu_stream_url(json_data, cookies=douyu_cookie, video_quality=record_quality)
elif record_url.find("https://www.yy.com/") > -1:
with semaphore:
json_data = get_yy_stream_data(record_url, yy_cookie)
json_data = get_yy_stream_data(record_url, cookies=yy_cookie)
port_info = get_yy_stream_url(json_data)
elif record_url.find("https://live.bilibili.com/") > -1:
with semaphore:
json_data = get_bilibili_stream_data(record_url, bili_cookie)
port_info = get_bilibili_stream_url(json_data)
json_data = get_bilibili_stream_data(record_url, cookies=bili_cookie)
port_info = get_bilibili_stream_url(json_data, record_quality)
elif record_url.find("https://www.xiaohongshu.com/") > -1:
with semaphore:
port_info = get_xhs_stream_url(record_url, xhs_cookie)
port_info = get_xhs_stream_url(record_url, cookies=xhs_cookie)
elif record_url.find("https://www.bigo.tv/") > -1:
with semaphore:
port_info = get_bigo_stream_url(record_url, bigo_cookie)
port_info = get_bigo_stream_url(record_url, cookies=bigo_cookie)
elif record_url.find("https://app.blued.cn/") > -1:
with semaphore:
port_info = get_blued_stream_url(record_url, blued_cookie)
port_info = get_blued_stream_url(record_url, cookies=blued_cookie)
elif record_url.find("afreecatv.com/") > -1:
with semaphore:
port_info = get_afreecatv_stream_url(record_url, afreecatv_cookie)
port_info = get_afreecatv_stream_url(record_url, proxy_addr=proxy_addr, cookies=afreecatv_cookie)
elif record_url.find("cc.163.com/") > -1:
with semaphore:
json_data = get_netease_stream_data(record_url, netease_cookie)
port_info = get_netease_stream_url(json_data)
json_data = get_netease_stream_data(record_url, cookies=netease_cookie)
port_info = get_netease_stream_url(json_data, record_quality)
if anchor_name:
anchor_split = anchor_name.split('主播:')
anchor_split: list= anchor_name.split('主播:')
if len(anchor_split) > 1 and anchor_split[1].strip():
anchor_name = anchor_split[1].strip()
else:
@@ -662,7 +666,7 @@ def start_record(url_tuple, count_variable=-1):
anchor_name = port_info.get("anchor_name", '')
if anchor_name == '':
print(f'序号{count_variable} 网址内容获取失败,进行重试中...获取失败的地址是:{url_tuple}')
print(f'序号{count_variable} 网址内容获取失败,进行重试中...获取失败的地址是:{url_data}')
warning_count += 1
else:
anchor_name = re.sub(rstr, "_", anchor_name) # 过滤不能作为文件名的字符,替换为下划线
@@ -673,7 +677,7 @@ def start_record(url_tuple, count_variable=-1):
name_list.append(f'{record_url}|#{record_url}')
return
if url_tuple[1] == "" and run_once is False:
if url_data[-1] == "" and run_once is False:
if is_long_url:
name_list.append(f'{record_url}|{new_record_url},主播: {anchor_name.strip()}')
else:
@@ -708,7 +712,7 @@ def start_record(url_tuple, count_variable=-1):
else:
video_save_path = default_path + '/'
video_save_path = video_save_path.replace("\\", "/")
video_save_path = video_save_path.replace("\\", '/')
full_path = f'{video_save_path}{anchor_name}'
if not os.path.exists(full_path):
os.makedirs(full_path)
@@ -755,7 +759,7 @@ def start_record(url_tuple, count_variable=-1):
recording.add(record_name)
start_record_time = datetime.datetime.now()
recording_time_list[record_name] = start_record_time
recording_time_list[record_name] = [start_record_time, record_quality]
rec_info = f"\r{anchor_name} 录制视频中: {full_path}"
filename_short = full_path + '/' + anchor_name + '_' + now
@@ -1057,7 +1061,7 @@ def start_record(url_tuple, count_variable=-1):
time.sleep(2)
def backup_file(file_path, backup_dir):
def backup_file(file_path: str, backup_dir: str):
"""
备份配置文件到备份目录,分别保留最新 10 个文件
"""
@@ -1158,8 +1162,9 @@ except Exception as e:
print('INFO未检测到全局/规则网络代理请检查代理配置若无需录制TikTok直播请忽略此条提示')
def read_config_value(config, section, option, default_value):
def read_config_value(config: configparser.RawConfigParser, section: str, option: str, default_value: Any) -> Any:
try:
config.read(config_file, encoding=encoding)
if '录制设置' not in config.sections():
config.add_section('录制设置')
@@ -1257,12 +1262,18 @@ while True:
print("直播视频保存为TS格式")
def transform_int_to_time(seconds):
def transform_int_to_time(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}"
def contains_url(string: str) -> bool:
# 正则表达式匹配各种格式的网址,包括裸域名和子域名
pattern = r"(http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?[a-zA-Z0-9][a-zA-Z0-9\-]+(\.[a-zA-Z0-9\-]+)*\.[a-zA-Z]{2,10}(:[0-9]{1,5})?(\/.*)?$"
return re.search(pattern, string) is not None
# 读取url_config.ini文件
try:
with open(url_config_file, "r", encoding=encoding) as file:
@@ -1275,7 +1286,19 @@ while True:
split_line = re.split('[,]', line)
else:
split_line = [line, '']
url = split_line[0]
if len(split_line) == 1:
url = split_line[0]
quality, name = [video_quality, '']
elif len(split_line) == 2:
if contains_url(split_line[0]):
quality = video_quality
url, name = split_line
else:
quality, url = split_line
name = ''
else:
quality, url, name = split_line
if ('http://' not in url) and ('https://' not in url):
url = 'https://' + url
@@ -1298,7 +1321,7 @@ while True:
'cc.163.com'
]
if url_host in host_list:
new_line = (url, split_line[1])
new_line = (quality, url, name)
url_tuples_list.append(new_line)
else:
print(f"{url} 未知链接.此条跳过")
@@ -1311,21 +1334,22 @@ while True:
if len(url_tuples_list) > 0:
text_no_repeat_url = list(set(url_tuples_list))
if len(text_no_repeat_url) > 0:
for url_tuple in text_no_repeat_url:
if url_tuple[0] in not_record_list:
if url_tuple[1] in not_record_list:
continue
if url_tuple[0] not in runing_list:
if url_tuple[1] not in runing_list:
if not first_start:
print(f"新增链接: {url_tuple[0]}")
print(f"新增链接: {url_tuple[1]}")
monitoring += 1
args = [url_tuple, monitoring]
# TODO: 执行开始录制的操作
create_var['thread' + str(monitoring)] = threading.Thread(target=start_record, args=args)
create_var['thread' + str(monitoring)].daemon = True
create_var['thread' + str(monitoring)].start()
runing_list.append(url_tuple[0])
runing_list.append(url_tuple[1])
time.sleep(local_delay_default)
url_tuples_list = []
first_start = False
@@ -1341,4 +1365,4 @@ while True:
first_run = False
time.sleep(3)
time.sleep(3)

View File

@@ -4,7 +4,7 @@
Author: Hmily
GitHub:https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2024-01-16 01:51:29
Update: 2024-01-24 22:57:03
Copyright (c) 2023 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
@@ -122,7 +122,7 @@ def get_kuaishou_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
print(f"Failed to parse JSON data from {url}. Error: {e}")
return {"type": 1, "is_live": False}
result = {"type": 1, "is_live": False}
result = {"type": 2, "is_live": False}
if 'errorType' in play_list or 'liveStream' not in play_list:
error_msg = play_list.get('errorType', {}).get('title', '') + play_list.get('errorType', {}).get('content', '')
@@ -138,8 +138,8 @@ def get_kuaishou_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
result.update({"anchor_name": anchor_name})
if play_list['liveStream'].get("playUrls"):
play_url = play_list['liveStream']['playUrls'][0]['adaptationSet']['representation'][0].get('url', '')
result.update({"flv_url": play_url, "record_url": play_url, "is_live": True})
play_url_list = play_list['liveStream']['playUrls'][0]['adaptationSet']['representation']
result.update({"flv_url_list": play_url_list, "is_live": True})
return result
@@ -334,12 +334,15 @@ def get_bilibili_stream_data(url: str, cookies: Union[str, None] = None) -> Dict
}
if cookies:
headers['Cookie'] = cookies
req = urllib.request.Request(url, headers=headers)
response = opener.open(req, timeout=15)
html_str = response.read().decode('utf-8')
json_str = re.search('<script>window.__NEPTUNE_IS_MY_WAIFU__=(.*?)</script><script>', html_str, re.S).group(1)
json_data = json.loads(json_str)
return json_data
try:
req = urllib.request.Request(url, headers=headers)
response = opener.open(req, timeout=15)
html_str = response.read().decode('utf-8')
json_str = re.search('<script>window.__NEPTUNE_IS_MY_WAIFU__=(.*?)</script><script>', html_str, re.S).group(1)
json_data = json.loads(json_str)
return json_data
except Exception:
return {"anchor_name": '', "is_live": False}
@trace_error_decorator
@@ -513,7 +516,6 @@ def get_afreecatv_stream_url(url: str, proxy_addr: Union[str, None] = None, cook
}
url2 = 'http://api.m.afreecatv.com/broad/a/watch'
if proxy_addr:
proxies = {
'http': proxy_addr,
@@ -531,8 +533,13 @@ def get_afreecatv_stream_url(url: str, proxy_addr: Union[str, None] = None, cook
json_data = json.loads(json_str)
anchor_name = json_data['data']['user_nick']
if not anchor_name:
if json_data['data']['code'] == -3004:
print("AfreecaTV直播获取失败:", json_data['data']['message'])
elif json_data['data']['code'] == -3002:
print("AfreecaTV直播获取失败:", json_data['data']['message'])
result = {
"anchor_name": anchor_name,
"anchor_name": '' if anchor_name is None else anchor_name,
"is_live": False,
}
@@ -557,7 +564,7 @@ def get_netease_stream_data(url: str, cookies: Union[str, None] = None) -> Dict[
}
if cookies:
headers['Cookie'] = cookies
url = url+'/' if url[-1] != '/' else url
req = urllib.request.Request(url, headers=headers)
response = opener.open(req, timeout=15)
html_str = response.read().decode('utf-8')
@@ -608,4 +615,4 @@ if __name__ == '__main__':
# print(get_bigo_stream_url(url))
# print(get_blued_stream_url(url))
# print(get_afreecatv_stream_url(url, proxy_addr=''))
# print(get_netease_stream_data(url))
# print(get_netease_stream_data(url))