diff --git a/main.py b/main.py index d2e7b41..7a58928 100644 --- a/main.py +++ b/main.py @@ -4,7 +4,7 @@ Author: Hmily GitHub: https://github.com/ihmily Date: 2023-07-17 23:52:05 -Update: 2024-01-24 22:30:19 +Update: 2024-01-25 12:47:12 Copyright (c) 2023-2024 by Hmily, All Rights Reserved. Function: Record live stream video. """ @@ -23,13 +23,12 @@ import json import re import shutil import signal -from typing import Any +from typing import Any, Union from spider import ( get_douyin_stream_data, get_tiktok_stream_data, get_kuaishou_stream_data, - get_kuaishou_stream_data2, get_huya_stream_data, get_douyu_info_data, get_douyu_stream_data, @@ -53,7 +52,7 @@ from utils import ( from msg_push import dingtalk, xizhi, tg_bot version = "v2.0.9" -platforms = "抖音|TikTok|快手|虎牙|斗鱼|YY|B站|小红书|bigo直播|blued直播|AfreecaTV|网易cc" +platforms = "抖音|TikTok|快手|虎牙|斗鱼|YY|B站|小红书|bigo直播|blued直播|AfreecaTV|网易CC" # --------------------------全局变量------------------------------------- recording = set() unrecording = set() @@ -78,11 +77,12 @@ backup_dir = './backup_config' encoding = 'utf-8-sig' rstr = r"[\/\\\:\*\?\"\<\>\|&u]" ffmpeg_path = "ffmpeg" # ffmpeg文件路径 -default_path = os.getcwd()+'/downloads' +default_path = os.getcwd() + '/downloads' os.makedirs(default_path, exist_ok=True) + # --------------------------用到的函数------------------------------------- -def signal_handler(signal, frame): +def signal_handler(_signal, _frame): sys.exit(0) @@ -113,7 +113,7 @@ def display_info(): if split_video_by_time: print(f"录制分段开启: {split_time}秒", end=" | ") print(f"是否生成时间文件: {'是' if create_time_file else '否'}", end=" | ") - print(f"录制视频质量为: {video_quality}", end=" | ") + print(f"录制视频质量为: {video_record_quality}", end=" | ") print(f"录制视频格式为: {video_save_type}", end=" | ") print(f"目前瞬时错误数为: {warning_count}", end=" | ") format_now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) @@ -133,7 +133,7 @@ def display_info(): for recording_live in no_repeat_recording: rt, qa = recording_time_list[recording_live] have_record_time = now_time - rt - print(f"{recording_live}[{qa}] 正在录制中 " +str(have_record_time).split('.')[0]) + print(f"{recording_live}[{qa}] 正在录制中 " + str(have_record_time).split('.')[0]) # print('\n本软件已运行:'+str(now_time - start_display_time).split('.')[0]) print("x" * 60) @@ -143,15 +143,15 @@ def display_info(): logger.warning(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}") -def update_file(file: str, old_str: str, new_str: str): +def update_file(file_path: str, old_str: str, new_str: str): # TODO: 更新文件操作 file_data = "" - with open(file, "r", encoding="utf-8-sig") as f: + with open(file_path, "r", encoding="utf-8-sig") as f: for text_line in f: if old_str in text_line: text_line = text_line.replace(old_str, new_str) file_data += text_line - with open(file, "w", encoding="utf-8-sig") as f: + with open(file_path, "w", encoding="utf-8-sig") as f: f.write(file_data) @@ -299,10 +299,10 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict: def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict: # TODO: 获取tiktok直播源地址 - def get_video_quality_url(stream_data, quality_key): + def get_video_quality_url(stream, q_key): return { - 'hls': re.sub("https", "http", stream_data[quality_key]['main']['hls']), - 'flv': re.sub("https", "http", stream_data[quality_key]['main']['flv']), + 'hls': re.sub("https", "http", stream[q_key]['main']['hls']), + 'flv': re.sub("https", "http", stream[q_key]['main']['flv']), } live_room = json_data['LiveRoom']['liveRoomUserInfo'] @@ -349,7 +349,7 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict: } if live_status: - quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, } + quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3} if video_quality in quality_mapping: @@ -430,7 +430,6 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict: @trace_error_decorator def get_douyu_stream_url(json_data: dict, cookies: str, video_quality: str) -> dict: - # TODO: 获取斗鱼直播源地址 video_quality_options = { "原画": '0', @@ -463,7 +462,6 @@ def get_douyu_stream_url(json_data: dict, cookies: str, video_quality: str) -> d @trace_error_decorator def get_yy_stream_url(json_data: dict) -> dict: - # TODO: 获取YY直播源地址 anchor_name = json_data.get('anchor_name', '') result = { @@ -509,11 +507,10 @@ def get_bilibili_stream_url(json_data: dict, video_quality: str) -> dict: extra = stream_data['url_info'][0]['extra'] url_type = format_list[m] qn = str(accept_qn_list[n]) - quality = quality_list[qn] - base_url = re.sub(r'_(\d+)' + f'(?={url_type}\?)', quality, base_url) + select_quality = quality_list[qn] + base_url = re.sub(r'_(\d+)' + f'(?={url_type}\\?)', select_quality, base_url) extra = re.sub('&qn=0', f'&qn={qn}', extra) - url = host + base_url + extra - return url + return host + base_url + extra if video_quality == "原画" or video_quality == "蓝光": flv_url = get_url(0, 0) @@ -540,7 +537,6 @@ def get_bilibili_stream_url(json_data: dict, video_quality: str) -> dict: @trace_error_decorator def get_netease_stream_url(json_data: dict, video_quality: str) -> dict: - if not json_data['is_live']: return json_data stream_list = json_data['stream_list']['resolution'] @@ -585,12 +581,14 @@ def start_record(url_data: tuple, count_variable: int = -1): try: port_info = [] if record_url.find("https://live.douyin.com/") > -1: + platform = '抖音直播' # 判断如果是浏览器长链接 with semaphore: # 使用semaphore来控制同时访问资源的线程数量 json_data = get_douyin_stream_data(record_url, cookies=dy_cookie) port_info = get_douyin_stream_url(json_data, record_quality) elif record_url.find("https://v.douyin.com/") > -1: + platform = '抖音直播' # 判断如果是app分享链接 is_long_url = True room_id, sec_user_id = get_sec_user_id(record_url) @@ -604,60 +602,77 @@ def start_record(url_data: tuple, count_variable: int = -1): port_info = get_douyin_stream_url(json_data, record_quality) elif record_url.find("https://www.tiktok.com/") > -1: + platform = 'TikTok直播' with semaphore: if use_proxy: if global_proxy or proxy_addr != '': - json_data = get_tiktok_stream_data(record_url, proxy_addr=proxy_addr, cookies=tiktok_cookie) + json_data = get_tiktok_stream_data(record_url, proxy_addr=proxy_addr, + cookies=tiktok_cookie) port_info = get_tiktok_stream_url(json_data, record_quality) elif record_url.find("https://live.kuaishou.com/") > -1: + platform = '快手直播' with semaphore: json_data = get_kuaishou_stream_data(record_url, cookies=ks_cookie) port_info = get_kuaishou_stream_url(json_data, record_quality) elif record_url.find("https://www.huya.com/") > -1: + platform = '虎牙直播' with semaphore: json_data = get_huya_stream_data(record_url, cookies=hy_cookie) port_info = get_huya_stream_url(json_data, record_quality) elif record_url.find("https://www.douyu.com/") > -1: + platform = '斗鱼直播' with semaphore: json_data = get_douyu_info_data(record_url) - port_info = get_douyu_stream_url(json_data, cookies=douyu_cookie, video_quality=record_quality) + port_info = get_douyu_stream_url(json_data, cookies=douyu_cookie, + video_quality=record_quality) elif record_url.find("https://www.yy.com/") > -1: + platform = 'YY直播' with semaphore: json_data = get_yy_stream_data(record_url, cookies=yy_cookie) port_info = get_yy_stream_url(json_data) elif record_url.find("https://live.bilibili.com/") > -1: + platform = 'B站直播' with semaphore: json_data = get_bilibili_stream_data(record_url, cookies=bili_cookie) port_info = get_bilibili_stream_url(json_data, record_quality) elif record_url.find("https://www.xiaohongshu.com/") > -1: + platform = '小红书直播' with semaphore: port_info = get_xhs_stream_url(record_url, cookies=xhs_cookie) elif record_url.find("https://www.bigo.tv/") > -1: + platform = 'bigo直播' with semaphore: port_info = get_bigo_stream_url(record_url, cookies=bigo_cookie) elif record_url.find("https://app.blued.cn/") > -1: + platform = 'blued直播' with semaphore: port_info = get_blued_stream_url(record_url, cookies=blued_cookie) elif record_url.find("afreecatv.com/") > -1: + platform = 'AfreecaTv直播' with semaphore: - port_info = get_afreecatv_stream_url(record_url, proxy_addr=proxy_addr, cookies=afreecatv_cookie) + port_info = get_afreecatv_stream_url(record_url, proxy_addr=proxy_addr, + cookies=afreecatv_cookie) elif record_url.find("cc.163.com/") > -1: + platform = '网易CC直播' with semaphore: json_data = get_netease_stream_data(record_url, cookies=netease_cookie) port_info = get_netease_stream_url(json_data, record_quality) + else: + logger.warning(f'{record_url} 未知直播地址') + return if anchor_name: - anchor_split: list= anchor_name.split('主播:') + anchor_split: list = anchor_name.split('主播:') if len(anchor_split) > 1 and anchor_split[1].strip(): anchor_name = anchor_split[1].strip() else: @@ -700,20 +715,19 @@ def start_record(url_data: tuple, count_variable: int = -1): tg_bot(tg_chat_id, tg_token, content) real_url = port_info['record_url'] - full_path = f'{default_path}/{anchor_name}' - if real_url != "": + full_path = f'{default_path}/{platform}/{anchor_name}' + if len(real_url) > 0: live_list.append(anchor_name) - now = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time())) + now = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S") try: if len(video_save_path) > 0: - if video_save_path[-1] != "/": + if video_save_path[-1] not in ["/", "\\"]: video_save_path = video_save_path + "/" - else: - video_save_path = default_path + '/' + full_path = f'{video_save_path}{platform}/{anchor_name}' + + full_path = full_path.replace("\\", '/') - video_save_path = video_save_path.replace("\\", '/') - full_path = f'{video_save_path}{anchor_name}' if not os.path.exists(full_path): os.makedirs(full_path) except Exception as e: @@ -1061,38 +1075,38 @@ def start_record(url_data: tuple, count_variable: int = -1): time.sleep(2) -def backup_file(file_path: str, backup_dir: str): +def backup_file(file_path: str, backup_dir_path: str): """ - 备份配置文件到备份目录,分别保留最新 10 个文件 + 备份配置文件到备份目录,分别保留最新 5 个文件 """ try: - if not os.path.exists(backup_dir): - os.makedirs(backup_dir) + if not os.path.exists(backup_dir_path): + os.makedirs(backup_dir_path) timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') backup_file_name = os.path.basename(file_path) + '_' + timestamp - backup_file_path = os.path.join(backup_dir, backup_file_name).replace("\\", "/") + backup_file_path = os.path.join(backup_dir_path, backup_file_name).replace("\\", "/") shutil.copy2(file_path, backup_file_path) print(f'\r已备份配置文件 {file_path} 到 {backup_file_path}') # 删除多余的备份文件 - files = os.listdir(backup_dir) + files = os.listdir(backup_dir_path) url_files = [f for f in files if f.startswith('URL_config.ini')] config_files = [f for f in files if f.startswith('config.ini')] - url_files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir, x))) - config_files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir, x))) + url_files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir_path, x))) + config_files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir_path, x))) while len(url_files) > 5: oldest_file = url_files[0] - os.remove(os.path.join(backup_dir, oldest_file)) + os.remove(os.path.join(backup_dir_path, oldest_file)) # print(f'\r已删除最旧的 URL_config.ini 备份文件 {oldest_file}') url_files = url_files[1:] while len(config_files) > 5: oldest_file = config_files[0] - os.remove(os.path.join(backup_dir, oldest_file)) + os.remove(os.path.join(backup_dir_path, oldest_file)) # print(f'\r已删除最旧的 config.ini 备份文件 {oldest_file}') config_files = config_files[1:] @@ -1117,7 +1131,7 @@ def backup_file_start(): if new_url_config_md5 != url_config_md5: backup_file(url_config_file, backup_dir) url_config_md5 = new_url_config_md5 - time.sleep(60) # 每1分钟检测一次文件是否有修改 + time.sleep(600) # 每10分钟检测一次文件是否有修改 except Exception as e: print(f'执行脚本异常:{str(e)}') @@ -1156,27 +1170,27 @@ try: print('系统代理检测中,请耐心等待...') response_g = urllib.request.urlopen("https://www.google.com/", timeout=15) global_proxy = True - print('系统代理已开启√ 注意:配置文件中的代理设置也要开启才会生效哦!') - -except Exception as e: - print('INFO:未检测到全局/规则网络代理,请检查代理配置(若无需录制TikTok直播请忽略此条提示)') + print('全局/规则网络代理已开启√ 注意:配置文件中的代理设置也要开启才会生效哦!') +except Exception: + print('INFO:未检测到全局/规则网络代理,请检查代理配置(若无需录制TikTok/AfreecaTV直播请忽略此条提示)') -def read_config_value(config: configparser.RawConfigParser, section: str, option: str, default_value: Any) -> Any: +def read_config_value(config_parser: configparser.RawConfigParser, section: str, option: str, default_value: Any) -> ( + Union)[str, int, bool]: try: - config.read(config_file, encoding=encoding) - if '录制设置' not in config.sections(): - config.add_section('录制设置') - if '推送配置' not in config.sections(): - config.add_section('推送配置') - if 'Cookie' not in config.sections(): - config.add_section('Cookie') - return config.get(section, option) + config_parser.read(config_file, encoding=encoding) + if '录制设置' not in config_parser.sections(): + config_parser.add_section('录制设置') + if '推送配置' not in config_parser.sections(): + config_parser.add_section('推送配置') + if 'Cookie' not in config_parser.sections(): + config_parser.add_section('Cookie') + return config_parser.get(section, option) except (configparser.NoSectionError, configparser.NoOptionError): - config.set(section, option, str(default_value)) + config_parser.set(section, option, str(default_value)) with open(config_file, 'w', encoding=encoding) as f: - config.write(f) + config_parser.write(f) return default_value @@ -1187,26 +1201,26 @@ while True: config = configparser.RawConfigParser() try: - with open(config_file, 'r', encoding=encoding) as f: - config.read_file(f) + with open(config_file, 'r', encoding=encoding) as file: + config.read_file(file) except IOError: - with open(config_file, 'w', encoding=encoding) as f: + with open(config_file, 'w', encoding=encoding) as file: pass if os.path.isfile(url_config_file): - with open(url_config_file, 'r', encoding=encoding) as f: - ini_content = f.read() + with open(url_config_file, 'r', encoding=encoding) as file: + ini_content = file.read() else: ini_content = "" if len(ini_content) == 0: input_url = input('请输入要录制的主播直播间网址(尽量使用PC网页端的直播间地址):\n') - with open(url_config_file, 'a+', encoding=encoding) as f: - f.write(input_url) + with open(url_config_file, 'a+', encoding=encoding) as file: + file.write(input_url) video_save_path = read_config_value(config, '录制设置', '直播保存路径(不填则默认)', "") video_save_type = read_config_value(config, '录制设置', '视频保存格式TS|MKV|FLV|MP4|TS音频|MKV音频', "mp4") - video_quality = read_config_value(config, '录制设置', '原画|超清|高清|标清', "原画") + video_record_quality = read_config_value(config, '录制设置', '原画|超清|高清|标清', "原画") use_proxy = options.get(read_config_value(config, '录制设置', '是否使用代理ip(是/否)', "是"), False) proxy_addr = read_config_value(config, '录制设置', '代理地址', "") max_request = int(read_config_value(config, '录制设置', '同一时间访问网络的线程数', 3)) @@ -1269,12 +1283,12 @@ while True: def contains_url(string: str) -> bool: - # 正则表达式匹配各种格式的网址,包括裸域名和子域名 - pattern = r"(http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?[a-zA-Z0-9][a-zA-Z0-9\-]+(\.[a-zA-Z0-9\-]+)*\.[a-zA-Z]{2,10}(:[0-9]{1,5})?(\/.*)?$" + pattern = (r"(http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?[a-zA-Z0-9][a-zA-Z0-9\-]+(\.[" + r"a-zA-Z0-9\-]+)*\.[a-zA-Z]{2,10}(:[0-9]{1,5})?(\/.*)?$") return re.search(pattern, string) is not None - # 读取url_config.ini文件 + # 读取URL_config.ini文件 try: with open(url_config_file, "r", encoding=encoding) as file: for line in file: @@ -1289,10 +1303,10 @@ while True: if len(split_line) == 1: url = split_line[0] - quality, name = [video_quality, ''] + quality, name = [video_record_quality, ''] elif len(split_line) == 2: if contains_url(split_line[0]): - quality = video_quality + quality = video_record_quality url, name = split_line else: quality, url = split_line @@ -1300,6 +1314,9 @@ while True: else: quality, url, name = split_line + if quality not in ["原画", "蓝光", "超清", "高清", "标清"]: + quality = '原画' + if ('http://' not in url) and ('https://' not in url): url = 'https://' + url @@ -1354,8 +1371,8 @@ while True: url_tuples_list = [] first_start = False - except Exception as e: - logger.warning(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}") + except Exception as err: + logger.warning(f"错误信息: {err} 发生错误的行数: {err.__traceback__.tb_lineno}") if first_run: t = threading.Thread(target=display_info, args=(), daemon=True)