diff --git a/main.py b/main.py index fa07db3..5e9c424 100644 --- a/main.py +++ b/main.py @@ -462,6 +462,18 @@ def clean_name(input_text): return cleaned_name or '空白昵称' +def get_quality_code(qn): + QUALITY_MAPPING = { + "原画": "OD", + "蓝光": "BD", + "超清": "UHD", + "高清": "HD", + "标清": "SD", + "流畅": "LD" + } + return QUALITY_MAPPING.get(qn) + + def start_record(url_data: tuple, count_variable: int = -1) -> None: global error_count @@ -473,7 +485,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None: new_record_url = '' count_time = time.time() retry = 0 - record_quality, record_url, anchor_name = url_data + record_quality_zh, record_url, anchor_name = url_data + record_quality = get_quality_code(record_quality_zh) proxy_address = proxy_addr platform = '未知平台' live_domain = '/'.join(record_url.split('/')[0:3]) @@ -1103,7 +1116,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None: recording.add(record_name) start_record_time = datetime.datetime.now() - recording_time_list[record_name] = [start_record_time, record_quality] + recording_time_list[record_name] = [start_record_time, record_quality_zh] rec_info = f"\r{anchor_name} 准备开始录制视频: {full_path}" if show_url: re_plat = ('WinkTV', 'PandaTV', 'ShowRoom', 'CHZZK', 'Youtube') diff --git a/streamget/stream.py b/streamget/stream.py index 311e858..9240ee8 100644 --- a/streamget/stream.py +++ b/streamget/stream.py @@ -22,6 +22,20 @@ from .spider import ( get_douyu_stream_data, get_bilibili_stream_data ) +QUALITY_MAPPING = {"OD": 0, "BD": 0, "UHD": 1, "HD": 2, "SD": 3, "LD": 4} + + +def get_quality_index(quality): + if not quality: + return 0 + + quality_str = str(quality).upper() + if quality_str.isdigit(): + quality_int = int(quality_str[0]) + return quality_int if quality_int in QUALITY_MAPPING.values() else 0 + else: + return QUALITY_MAPPING.get(quality_str, 0) + @trace_error_decorator def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict: @@ -45,13 +59,13 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict: flv_url_list.append(flv_url_list[-1]) m3u8_url_list.append(m3u8_url_list[-1]) - video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, "流畅": 4} - quality_index = video_qualities.get(video_quality) + quality_index = get_quality_index(video_quality) m3u8_url = m3u8_url_list[quality_index] flv_url = flv_url_list[quality_index] result |= { 'is_live': True, 'title': json_data['title'], + 'quality': video_quality, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': m3u8_url or flv_url, @@ -101,13 +115,13 @@ def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict: flv_url_list.append(flv_url_list[-1]) while len(m3u8_url_list) < 5: m3u8_url_list.append(m3u8_url_list[-1]) - video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, '流畅': 4} - quality_index = video_qualities.get(video_quality) + quality_index = get_quality_index(video_quality) flv_url = flv_url_list[quality_index]['url'].replace("https://", "http://") m3u8_url = m3u8_url_list[quality_index]['url'].replace("https://", "http://") result |= { 'is_live': True, 'title': live_room['liveRoom']['title'], + 'quality': video_quality, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': m3u8_url or flv_url, @@ -128,11 +142,10 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict: } if live_status: - quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4} - quality_mapping_bitrate = {'原画': 99999, '蓝光': 4000, '超清': 2000, '高清': 1000, '标清': 800, '流畅': 600} - if video_quality in quality_mapping: + quality_mapping_bitrate = {'OD': 99999, 'BD': 4000, 'UHD': 2000, 'HD': 1000, 'SD': 800, 'LD': 600} + if video_quality in QUALITY_MAPPING: - quality_index = quality_mapping[video_quality] + quality_index = get_quality_index(video_quality) if 'm3u8_url_list' in json_data: m3u8_url_list = json_data['m3u8_url_list'][::-1] while len(m3u8_url_list) < 5: @@ -154,6 +167,7 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict: # latest quality quality_index = len(flv_url_list) - 1 flv_url = flv_url_list[quality_index]['url'] + result['flv_url'] = flv_url result['record_url'] = flv_url else: @@ -165,6 +179,7 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict: flv_url = flv_url_list[quality_index]['url'] result |= {'flv_url': flv_url, 'record_url': flv_url} result['is_live'] = True + result['quality'] = video_quality return result @@ -230,17 +245,17 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict: m3u8_url = f'{hls_url}/{stream_name}.{hls_url_suffix}?{new_anti_code}&ratio=' quality_list = flv_anti_code.split('&exsphd=') - if len(quality_list) > 1 and video_quality not in ["原画", "蓝光"]: + if len(quality_list) > 1 and video_quality not in ["OD", "BD"]: pattern = r"(?<=264_)\d+" quality_list = list(re.findall(pattern, quality_list[1]))[::-1] while len(quality_list) < 5: quality_list.append(quality_list[-1]) video_quality_options = { - "超清": quality_list[0], - "高清": quality_list[1], - "标清": quality_list[2], - "流畅": quality_list[3] + "UHD": quality_list[0], + "HD": quality_list[1], + "SD": quality_list[2], + "LD": quality_list[3] } if video_quality not in video_quality_options: @@ -253,6 +268,7 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict: result |= { 'is_live': True, 'title': live_title, + 'quality': video_quality, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': flv_url or m3u8_url @@ -266,12 +282,12 @@ async def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str return json_data video_quality_options = { - "原画": '0', - "蓝光": '0', - "超清": '3', - "高清": '2', - "标清": '1', - "流畅": '1' + "OD": '0', + "BD": '0', + "UHD": '3', + "HD": '2', + "SD": '1', + "LD": '1' } rid = str(json_data["room_id"]) @@ -282,7 +298,7 @@ async def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str rtmp_live = flv_data['data'].get('rtmp_live') if rtmp_live: flv_url = f'{rtmp_url}/{rtmp_live}' - json_data |= {'flv_url': flv_url, 'record_url': flv_url} + json_data |= {'quality': video_quality, 'flv_url': flv_url, 'record_url': flv_url} return json_data @@ -300,6 +316,7 @@ def get_yy_stream_url(json_data: dict) -> dict: result |= { 'is_live': True, 'title': json_data['title'], + 'quality': 'OD', 'flv_url': flv_url, 'record_url': flv_url } @@ -318,12 +335,12 @@ async def get_bilibili_stream_url(json_data: dict, video_quality: str, proxy_add room_url = json_data['room_url'] video_quality_options = { - "原画": '10000', - "蓝光": '400', - "超清": '250', - "高清": '150', - "标清": '80', - "流畅": '80' + "OD": '10000', + "BD": '400', + "UHD": '250', + "HD": '150', + "SD": '80', + "LD": '80' } select_quality = video_quality_options[video_quality] @@ -333,6 +350,7 @@ async def get_bilibili_stream_url(json_data: dict, video_quality: str, proxy_add 'anchor_name': json_data['anchor_name'], 'is_live': True, 'title': json_data['title'], + 'quality': video_quality, 'record_url': play_url } @@ -346,8 +364,7 @@ def get_netease_stream_url(json_data: dict, video_quality: str) -> dict: sorted_keys = [key for key in order if key in stream_list] while len(sorted_keys) < 5: sorted_keys.append(sorted_keys[-1]) - quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4} - selected_quality = sorted_keys[quality_list[video_quality]] + selected_quality = sorted_keys[get_quality_index(video_quality)] flv_url_list = stream_list[selected_quality]['cdn'] selected_cdn = list(flv_url_list.keys())[0] flv_url = flv_url_list[selected_cdn] @@ -355,6 +372,7 @@ def get_netease_stream_url(json_data: dict, video_quality: str) -> dict: "is_live": True, "anchor_name": json_data['anchor_name'], "title": json_data['title'], + 'quality': video_quality, "flv_url": flv_url, "record_url": flv_url } @@ -366,11 +384,10 @@ def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', return json_data play_url_list = json_data['play_url_list'] - quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4} while len(play_url_list) < 5: play_url_list.append(play_url_list[-1]) - selected_quality = quality_list[video_quality] + selected_quality = get_quality_index(video_quality) data = { "anchor_name": json_data['anchor_name'], "is_live": True @@ -395,4 +412,5 @@ def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', flv_url = get_url(flv_extra_key) data |= {"flv_url": flv_url, "record_url": flv_url} data['title'] = json_data.get('title') + data['quality'] = video_quality return data