diff --git a/main.py b/main.py index 1b2e74c..f5758ec 100644 --- a/main.py +++ b/main.py @@ -4,7 +4,7 @@ Author: Hmily GitHub: https://github.com/ihmily Date: 2023-07-17 23:52:05 -Update: 2025-07-19 17:43:00 +Update: 2025-10-23 19:48:05 Copyright (c) 2023-2025 by Hmily, All Rights Reserved. Function: Record live stream video. """ @@ -38,7 +38,7 @@ from ffmpeg_install import ( check_ffmpeg, ffmpeg_path, current_env_path ) -version = "v4.0.6" +version = "v4.0.7" platforms = ("\n国内站点:抖音|快手|虎牙|斗鱼|YY|B站|小红书|bigo|blued|网易CC|千度热播|猫耳FM|Look|TwitCasting|百度|微博|" "酷狗|花椒|流星|Acfun|畅聊|映客|音播|知乎|嗨秀|VV星球|17Live|浪Live|漂漂|六间房|乐嗨|花猫|淘宝|京东|咪咕|连接|来秀" "\n海外站点:TikTok|SOOP|PandaTV|WinkTV|FlexTV|PopkonTV|TwitchTV|LiveMe|ShowRoom|CHZZK|Shopee|" @@ -383,7 +383,6 @@ def clear_record_info(record_name: str, record_url: str) -> None: def direct_download_stream(source_url: str, save_path: str, record_name: str, live_url: str, platform: str) -> bool: - try: with open(save_path, 'wb') as f: client = httpx.Client(timeout=None) @@ -398,16 +397,16 @@ def direct_download_stream(source_url: str, save_path: str, record_name: str, li if response.status_code != 200: logger.error(f"请求直播流失败,状态码: {response.status_code}") return False - + downloaded = 0 chunk_size = 1024 * 16 - + for chunk in response.iter_bytes(chunk_size): if live_url in url_comments or exit_recording: color_obj.print_colored(f"[{record_name}]录制时已被注释或请求停止,下载中断", color_obj.YELLOW) clear_record_info(record_name, live_url) return False - + if chunk: f.write(chunk) downloaded += len(chunk) @@ -416,8 +415,8 @@ def direct_download_stream(source_url: str, save_path: str, record_name: str, li except Exception as e: logger.error(f"FLV下载错误: {e} 发生错误的行数: {e.__traceback__.tb_lineno}") return False - - + + def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, save_type: str, script_command: str | None = None) -> bool: save_file_path = ffmpeg_command[-1] @@ -511,6 +510,7 @@ def get_quality_code(qn): } return QUALITY_MAPPING.get(qn) + def get_record_headers(platform, live_url): live_domain = '/'.join(live_url.split('/')[0:3]) record_headers = { @@ -581,7 +581,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None: platform = '抖音直播' with semaphore: if 'v.douyin.com' not in record_url and '/user/' not in record_url: - json_data = asyncio.run(spider.get_douyin_stream_data( + json_data = asyncio.run(spider.get_douyin_web_stream_data( url=record_url, proxy_addr=proxy_address, cookies=dy_cookie)) @@ -1326,15 +1326,16 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None: recording.add(record_name) start_record_time = datetime.datetime.now() recording_time_list[record_name] = [start_record_time, record_quality_zh] - + download_success = direct_download_stream( flv_url, save_file_path, record_name, record_url, platform ) - + if download_success: record_finished = True - print(f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n") - + print( + f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n") + recording.discard(record_name) else: logger.debug("未找到FLV直播流,跳过录制") @@ -1932,10 +1933,10 @@ while True: check_path = video_save_path or default_path if utils.check_disk_capacity(check_path, show=first_run) < disk_space_limit: exit_recording = True - if exit_recording and not recording: - logger.warning(f"Disk space remaining is below {disk_space_limit} GB. " - f"Exiting program due to the disk space limit being reached.") - sys.exit(-1) + if not recording: + logger.warning(f"Disk space remaining is below {disk_space_limit} GB. " + f"Exiting program due to the disk space limit being reached.") + sys.exit(-1) def contains_url(string: str) -> bool: @@ -2150,4 +2151,3 @@ while True: first_run = False time.sleep(3) - diff --git a/src/ab_sign.py b/src/ab_sign.py new file mode 100644 index 0000000..ad04d32 --- /dev/null +++ b/src/ab_sign.py @@ -0,0 +1,454 @@ +# -*- encoding: utf-8 -*- +import math +import time + + +def rc4_encrypt(plaintext: str, key: str) -> str: + # 初始化状态数组 + s = list(range(256)) + + # 使用密钥对状态数组进行置换 + j = 0 + for i in range(256): + j = (j + s[i] + ord(key[i % len(key)])) % 256 + s[i], s[j] = s[j], s[i] + + # 生成密钥流并加密 + i = j = 0 + result = [] + for char in plaintext: + i = (i + 1) % 256 + j = (j + s[i]) % 256 + s[i], s[j] = s[j], s[i] + t = (s[i] + s[j]) % 256 + result.append(chr(s[t] ^ ord(char))) + + return ''.join(result) + + +def left_rotate(x: int, n: int) -> int: + n %= 32 + return ((x << n) | (x >> (32 - n))) & 0xFFFFFFFF + + +def get_t_j(j: int) -> int: + if 0 <= j < 16: + return 2043430169 # 0x79CC4519 + elif 16 <= j < 64: + return 2055708042 # 0x7A879D8A + else: + raise ValueError("invalid j for constant Tj") + + +def ff_j(j: int, x: int, y: int, z: int) -> int: + if 0 <= j < 16: + return (x ^ y ^ z) & 0xFFFFFFFF + elif 16 <= j < 64: + return ((x & y) | (x & z) | (y & z)) & 0xFFFFFFFF + else: + raise ValueError("invalid j for bool function FF") + + +def gg_j(j: int, x: int, y: int, z: int) -> int: + if 0 <= j < 16: + return (x ^ y ^ z) & 0xFFFFFFFF + elif 16 <= j < 64: + return ((x & y) | (~x & z)) & 0xFFFFFFFF + else: + raise ValueError("invalid j for bool function GG") + + +class SM3: + def __init__(self): + self.reg = [] + self.chunk = [] + self.size = 0 + self.reset() + + def reset(self): + # 初始化寄存器值 - 修正为与JS版本相同的值 + self.reg = [ + 1937774191, 1226093241, 388252375, 3666478592, + 2842636476, 372324522, 3817729613, 2969243214 + ] + self.chunk = [] + self.size = 0 + + def write(self, data): + # 将输入转换为字节数组 + if isinstance(data, str): + # 直接转换为UTF-8字节列表 + a = list(data.encode('utf-8')) + else: + a = data + + self.size += len(a) + f = 64 - len(self.chunk) + + if len(a) < f: + # 如果数据长度小于剩余空间,直接添加 + self.chunk.extend(a) + else: + # 否则分块处理 + self.chunk.extend(a[:f]) + + while len(self.chunk) >= 64: + self._compress(self.chunk) + if f < len(a): + self.chunk = a[f:min(f + 64, len(a))] + else: + self.chunk = [] + f += 64 + + def _fill(self): + # 计算比特长度 + bit_length = 8 * self.size + + # 添加填充位 + padding_pos = len(self.chunk) + self.chunk.append(0x80) + padding_pos = (padding_pos + 1) % 64 + + # 如果剩余空间不足8字节,则填充到下一个块 + if 64 - padding_pos < 8: + padding_pos -= 64 + + # 填充0直到剩余8字节用于存储长度 + while padding_pos < 56: + self.chunk.append(0) + padding_pos += 1 + + # 添加消息长度(高32位) + high_bits = bit_length // 4294967296 + for i in range(4): + self.chunk.append((high_bits >> (8 * (3 - i))) & 0xFF) + + # 添加消息长度(低32位) + for i in range(4): + self.chunk.append((bit_length >> (8 * (3 - i))) & 0xFF) + + def _compress(self, data): + if len(data) < 64: + raise ValueError("compress error: not enough data") + else: + # 消息扩展 + w = [0] * 132 + + # 将字节数组转换为字 + for t in range(16): + w[t] = (data[4 * t] << 24) | (data[4 * t + 1] << 16) | (data[4 * t + 2] << 8) | data[4 * t + 3] + w[t] &= 0xFFFFFFFF + + # 消息扩展 + for j in range(16, 68): + a = w[j - 16] ^ w[j - 9] ^ left_rotate(w[j - 3], 15) + a = a ^ left_rotate(a, 15) ^ left_rotate(a, 23) + w[j] = (a ^ left_rotate(w[j - 13], 7) ^ w[j - 6]) & 0xFFFFFFFF + + # 计算w' + for j in range(64): + w[j + 68] = (w[j] ^ w[j + 4]) & 0xFFFFFFFF + + # 压缩 + a, b, c, d, e, f, g, h = self.reg + + for j in range(64): + ss1 = left_rotate((left_rotate(a, 12) + e + left_rotate(get_t_j(j), j)) & 0xFFFFFFFF, 7) + ss2 = ss1 ^ left_rotate(a, 12) + tt1 = (ff_j(j, a, b, c) + d + ss2 + w[j + 68]) & 0xFFFFFFFF + tt2 = (gg_j(j, e, f, g) + h + ss1 + w[j]) & 0xFFFFFFFF + + d = c + c = left_rotate(b, 9) + b = a + a = tt1 + h = g + g = left_rotate(f, 19) + f = e + e = (tt2 ^ left_rotate(tt2, 9) ^ left_rotate(tt2, 17)) & 0xFFFFFFFF + + # 更新寄存器 + self.reg[0] ^= a + self.reg[1] ^= b + self.reg[2] ^= c + self.reg[3] ^= d + self.reg[4] ^= e + self.reg[5] ^= f + self.reg[6] ^= g + self.reg[7] ^= h + + def sum(self, data=None, output_format=None): + """ + 计算哈希值 + """ + # 如果提供了输入,则重置并写入 + if data is not None: + self.reset() + self.write(data) + + self._fill() + + # 分块压缩 + for f in range(0, len(self.chunk), 64): + self._compress(self.chunk[f:f + 64]) + + if output_format == 'hex': + # 十六进制输出 + result = ''.join(f'{val:08x}' for val in self.reg) + else: + # 字节数组输出 + result = [] + for f in range(8): + c = self.reg[f] + result.append((c >> 24) & 0xFF) + result.append((c >> 16) & 0xFF) + result.append((c >> 8) & 0xFF) + result.append(c & 0xFF) + + self.reset() + return result + + +def result_encrypt(long_str: str, num: str | None = None) -> str: + # 魔改base64编码表 + encoding_tables = { + "s0": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=", + "s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", + "s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", + "s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe", + "s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe" + } + + # 位移常量 + masks = [16515072, 258048, 4032, 63] # 对应 0, 1, 2 的掩码,添加63作为第四个掩码 + shifts = [18, 12, 6, 0] # 对应的位移量 + + encoding_table = encoding_tables[num] + + result = "" + round_num = 0 + long_int = get_long_int(round_num, long_str) + + total_chars = math.ceil(len(long_str) / 3 * 4) + + for i in range(total_chars): + # 每4个字符处理一组3字节 + if i // 4 != round_num: + round_num += 1 + long_int = get_long_int(round_num, long_str) + + # 计算当前位置的索引 + index = i % 4 + + # 使用掩码和位移提取6位值 + char_index = (long_int & masks[index]) >> shifts[index] + + result += encoding_table[char_index] + + return result + + +def get_long_int(round_num: int, long_str: str) -> int: + round_num = round_num * 3 + + # 获取字符串中的字符,如果超出范围则使用0 + char1 = ord(long_str[round_num]) if round_num < len(long_str) else 0 + char2 = ord(long_str[round_num + 1]) if round_num + 1 < len(long_str) else 0 + char3 = ord(long_str[round_num + 2]) if round_num + 2 < len(long_str) else 0 + + return (char1 << 16) | (char2 << 8) | char3 + + +def gener_random(random_num: int, option: list[int]) -> list[int]: + byte1 = random_num & 255 + byte2 = (random_num >> 8) & 255 + + return [ + (byte1 & 170) | (option[0] & 85), # 偶数位与option[0]的奇数位合并 + (byte1 & 85) | (option[0] & 170), # 奇数位与option[0]的偶数位合并 + (byte2 & 170) | (option[1] & 85), # 偶数位与option[1]的奇数位合并 + (byte2 & 85) | (option[1] & 170), # 奇数位与option[1]的偶数位合并 + ] + + +def generate_random_str() -> str: + """ + 生成随机字符串 + + Returns: + 随机字符串 + """ + # 使用与JS版本相同的固定随机值 + random_values = [0.123456789, 0.987654321, 0.555555555] + + # 生成三组随机字节并合并 + random_bytes = [] + random_bytes.extend(gener_random(int(random_values[0] * 10000), [3, 45])) + random_bytes.extend(gener_random(int(random_values[1] * 10000), [1, 0])) + random_bytes.extend(gener_random(int(random_values[2] * 10000), [1, 5])) + + return ''.join(chr(b) for b in random_bytes) + + +def generate_rc4_bb_str(url_search_params: str, user_agent: str, window_env_str: str, + suffix: str = "cus", arguments: list[int] | None = None) -> str: + if arguments is None: + arguments = [0, 1, 14] + + sm3 = SM3() + start_time = int(time.time() * 1000) + + # 三次加密处理 + # 1: url_search_params两次sm3之的结果 + url_search_params_list = sm3.sum(sm3.sum(url_search_params + suffix)) + # 2: 对后缀两次sm3之的结果 + cus = sm3.sum(sm3.sum(suffix)) + # 3: 对ua处理之后的结果 + ua_key = chr(0) + chr(1) + chr(14) # [1/256, 1, 14] + ua = sm3.sum(result_encrypt( + rc4_encrypt(user_agent, ua_key), + "s3" + )) + + end_time = start_time + 100 + + # 构建配置对象 + b = { + 8: 3, + 10: end_time, + 15: { + "aid": 6383, + "pageId": 110624, + "boe": False, + "ddrt": 7, + "paths": { + "include": [{} for _ in range(7)], + "exclude": [] + }, + "track": { + "mode": 0, + "delay": 300, + "paths": [] + }, + "dump": True, + "rpU": "hwj" + }, + 16: start_time, + 18: 44, + 19: [1, 0, 1, 5], + } + + def split_to_bytes(num: int) -> list[int]: + return [ + (num >> 24) & 255, + (num >> 16) & 255, + (num >> 8) & 255, + num & 255 + ] + + # 处理时间戳 + start_time_bytes = split_to_bytes(b[16]) + b[20] = start_time_bytes[0] + b[21] = start_time_bytes[1] + b[22] = start_time_bytes[2] + b[23] = start_time_bytes[3] + b[24] = int(b[16] / 256 / 256 / 256 / 256) & 255 + b[25] = int(b[16] / 256 / 256 / 256 / 256 / 256) & 255 + + # 处理Arguments参数 + arg0_bytes = split_to_bytes(arguments[0]) + b[26] = arg0_bytes[0] + b[27] = arg0_bytes[1] + b[28] = arg0_bytes[2] + b[29] = arg0_bytes[3] + + b[30] = int(arguments[1] / 256) & 255 + b[31] = (arguments[1] % 256) & 255 + + arg1_bytes = split_to_bytes(arguments[1]) + b[32] = arg1_bytes[0] + b[33] = arg1_bytes[1] + + arg2_bytes = split_to_bytes(arguments[2]) + b[34] = arg2_bytes[0] + b[35] = arg2_bytes[1] + b[36] = arg2_bytes[2] + b[37] = arg2_bytes[3] + + # 处理加密结果 + b[38] = url_search_params_list[21] + b[39] = url_search_params_list[22] + b[40] = cus[21] + b[41] = cus[22] + b[42] = ua[23] + b[43] = ua[24] + + # 处理结束时间 + end_time_bytes = split_to_bytes(b[10]) + b[44] = end_time_bytes[0] + b[45] = end_time_bytes[1] + b[46] = end_time_bytes[2] + b[47] = end_time_bytes[3] + b[48] = b[8] + b[49] = int(b[10] / 256 / 256 / 256 / 256) & 255 + b[50] = int(b[10] / 256 / 256 / 256 / 256 / 256) & 255 + + # 处理配置项 + b[51] = b[15]['pageId'] + + page_id_bytes = split_to_bytes(b[15]['pageId']) + b[52] = page_id_bytes[0] + b[53] = page_id_bytes[1] + b[54] = page_id_bytes[2] + b[55] = page_id_bytes[3] + + b[56] = b[15]['aid'] + b[57] = b[15]['aid'] & 255 + b[58] = (b[15]['aid'] >> 8) & 255 + b[59] = (b[15]['aid'] >> 16) & 255 + b[60] = (b[15]['aid'] >> 24) & 255 + + # 处理环境信息 + window_env_list = [ord(char) for char in window_env_str] + b[64] = len(window_env_list) + b[65] = b[64] & 255 + b[66] = (b[64] >> 8) & 255 + + b[69] = 0 + b[70] = 0 + b[71] = 0 + + # 计算校验和 + b[72] = b[18] ^ b[20] ^ b[26] ^ b[30] ^ b[38] ^ b[40] ^ b[42] ^ b[21] ^ b[27] ^ b[31] ^ \ + b[35] ^ b[39] ^ b[41] ^ b[43] ^ b[22] ^ b[28] ^ b[32] ^ b[36] ^ b[23] ^ b[29] ^ \ + b[33] ^ b[37] ^ b[44] ^ b[45] ^ b[46] ^ b[47] ^ b[48] ^ b[49] ^ b[50] ^ b[24] ^ \ + b[25] ^ b[52] ^ b[53] ^ b[54] ^ b[55] ^ b[57] ^ b[58] ^ b[59] ^ b[60] ^ b[65] ^ \ + b[66] ^ b[70] ^ b[71] + + # 构建最终字节数组 + bb = [ + b[18], b[20], b[52], b[26], b[30], b[34], b[58], b[38], b[40], b[53], b[42], b[21], + b[27], b[54], b[55], b[31], b[35], b[57], b[39], b[41], b[43], b[22], b[28], b[32], + b[60], b[36], b[23], b[29], b[33], b[37], b[44], b[45], b[59], b[46], b[47], b[48], + b[49], b[50], b[24], b[25], b[65], b[66], b[70], b[71] + ] + bb.extend(window_env_list) + bb.append(b[72]) + + return rc4_encrypt( + ''.join(chr(byte) for byte in bb), + chr(121) + ) + + +def ab_sign(url_search_params: str, user_agent: str) -> str: + window_env_str = "1920|1080|1920|1040|0|30|0|0|1872|92|1920|1040|1857|92|1|24|Win32" + + # 1. 生成随机字符串前缀 + # 2. 生成RC4加密的主体部分 + # 3. 对结果进行最终加密并添加等号后缀 + return result_encrypt( + generate_random_str() + + generate_rc4_bb_str(url_search_params, user_agent, window_env_str), + "s4" + ) + "=" diff --git a/src/spider.py b/src/spider.py index 2b5c682..4614c97 100644 --- a/src/spider.py +++ b/src/spider.py @@ -4,7 +4,7 @@ Author: Hmily GitHub: https://github.com/ihmily Date: 2023-07-15 23:15:00 -Update: 2025-07-19 17:43:00 +Update: 2025-10-23 18:28:00 Copyright (c) 2023-2025 by Hmily, All Rights Reserved. Function: Get live stream data. """ @@ -29,6 +29,7 @@ from .utils import trace_error_decorator, generate_random_string from .logger import script_path from .room import get_sec_user_id, get_unique_id, UnsupportedUrlError from .http_clients.async_http import async_req +from .ab_sign import ab_sign ssl_context = ssl.create_default_context() @@ -159,6 +160,75 @@ async def get_douyin_app_stream_data(url: str, proxy_addr: OptionalStr = None, c return room_data +async def get_douyin_web_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: OptionalStr = None): + headers = { + 'cookie': 'ttwid=1%7C2iDIYVmjzMcpZ20fcaFde0VghXAA3NaNXE_SLR68IyE%7C1761045455' + '%7Cab35197d5cfb21df6cbb2fa7ef1c9262206b062c315b9d04da746d0b37dfbc7d', + 'referer': 'https://live.douyin.com/335354047186', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/116.0.5845.97 Safari/537.36 Core/1.116.567.400 QQBrowser/19.7.6764.400', + } + if cookies: + headers['cookie'] = cookies + + try: + web_rid = url.split('?')[0].split('live.douyin.com/')[-1] + params = { + "aid": "6383", + "app_name": "douyin_web", + "live_id": "1", + "device_platform": "web", + "language": "zh-CN", + "browser_language": "zh-CN", + "browser_platform": "Win32", + "browser_name": "Chrome", + "browser_version": "116.0.0.0", + "web_rid": web_rid, + 'msToken': '', + } + + api = f'https://live.douyin.com/webcast/room/web/enter/?{urllib.parse.urlencode(params)}' + a_bogus = ab_sign(urllib.parse.urlparse(api).query, headers['user-agent']) + api += "&a_bogus=" + a_bogus + json_str = await async_req(url=api, proxy_addr=proxy_addr, headers=headers) + json_data = json.loads(json_str)['data'] + room_data = json_data['data'][0] + room_data['anchor_name'] = json_data['user']['nickname'] + + if room_data['status'] == 2: + if 'stream_url' not in room_data: + raise RuntimeError( + "The live streaming type or gameplay is not supported on the computer side yet, please use the " + "app to share the link for recording." + ) + live_core_sdk_data = room_data['stream_url']['live_core_sdk_data'] + pull_datas = room_data['stream_url']['pull_datas'] + if live_core_sdk_data: + if pull_datas: + key = list(pull_datas.keys())[0] + json_str = pull_datas[key]['stream_data'] + else: + json_str = live_core_sdk_data['pull_data']['stream_data'] + json_data = json.loads(json_str) + if 'origin' in json_data['data']: + stream_data = live_core_sdk_data['pull_data']['stream_data'] + origin_data = json.loads(stream_data)['data']['origin']['main'] + sdk_params = json.loads(origin_data['sdk_params']) + origin_hls_codec = sdk_params.get('VCodec') or '' + + origin_url_list = json_data['data']['origin']['main'] + origin_m3u8 = {'ORIGIN': origin_url_list["hls"] + '&codec=' + origin_hls_codec} + origin_flv = {'ORIGIN': origin_url_list["flv"] + '&codec=' + origin_hls_codec} + hls_pull_url_map = room_data['stream_url']['hls_pull_url_map'] + flv_pull_url = room_data['stream_url']['flv_pull_url'] + room_data['stream_url']['hls_pull_url_map'] = {**origin_m3u8, **hls_pull_url_map} + room_data['stream_url']['flv_pull_url'] = {**origin_flv, **flv_pull_url} + except Exception as e: + print(f"Error message: {e} Error line: {e.__traceback__.tb_lineno}") + room_data = {'anchor_name': ""} + return room_data + + @trace_error_decorator async def get_douyin_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: OptionalStr = None) -> dict: headers = { @@ -3250,4 +3320,4 @@ async def get_picarto_stream_url(url: str, proxy_addr: OptionalStr = None, cooki title = json_data['channel']['title'] m3u8_url = f"https://1-edge1-us-newyork.picarto.tv/stream/hls/golive+{anchor_name}/index.m3u8" result |= {'is_live': True, 'title': title, 'm3u8_url': m3u8_url, 'record_url': m3u8_url} - return result \ No newline at end of file + return result