fix douyin stream fetch

This commit is contained in:
ihmily 2025-10-23 19:55:56 +08:00
parent abb204e6e9
commit 200e5b5b58
3 changed files with 544 additions and 20 deletions

36
main.py
View File

@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-17 23:52:05
Update: 2025-07-19 17:43:00
Update: 2025-10-23 19:48:05
Copyright (c) 2023-2025 by Hmily, All Rights Reserved.
Function: Record live stream video.
"""
@ -38,7 +38,7 @@ from ffmpeg_install import (
check_ffmpeg, ffmpeg_path, current_env_path
)
version = "v4.0.6"
version = "v4.0.7"
platforms = ("\n国内站点:抖音|快手|虎牙|斗鱼|YY|B站|小红书|bigo|blued|网易CC|千度热播|猫耳FM|Look|TwitCasting|百度|微博|"
"酷狗|花椒|流星|Acfun|畅聊|映客|音播|知乎|嗨秀|VV星球|17Live|浪Live|漂漂|六间房|乐嗨|花猫|淘宝|京东|咪咕|连接|来秀"
"\n海外站点TikTok|SOOP|PandaTV|WinkTV|FlexTV|PopkonTV|TwitchTV|LiveMe|ShowRoom|CHZZK|Shopee|"
@ -383,7 +383,6 @@ def clear_record_info(record_name: str, record_url: str) -> None:
def direct_download_stream(source_url: str, save_path: str, record_name: str, live_url: str, platform: str) -> bool:
try:
with open(save_path, 'wb') as f:
client = httpx.Client(timeout=None)
@ -398,16 +397,16 @@ def direct_download_stream(source_url: str, save_path: str, record_name: str, li
if response.status_code != 200:
logger.error(f"请求直播流失败,状态码: {response.status_code}")
return False
downloaded = 0
chunk_size = 1024 * 16
for chunk in response.iter_bytes(chunk_size):
if live_url in url_comments or exit_recording:
color_obj.print_colored(f"[{record_name}]录制时已被注释或请求停止,下载中断", color_obj.YELLOW)
clear_record_info(record_name, live_url)
return False
if chunk:
f.write(chunk)
downloaded += len(chunk)
@ -416,8 +415,8 @@ def direct_download_stream(source_url: str, save_path: str, record_name: str, li
except Exception as e:
logger.error(f"FLV下载错误: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
return False
def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, save_type: str,
script_command: str | None = None) -> bool:
save_file_path = ffmpeg_command[-1]
@ -511,6 +510,7 @@ def get_quality_code(qn):
}
return QUALITY_MAPPING.get(qn)
def get_record_headers(platform, live_url):
live_domain = '/'.join(live_url.split('/')[0:3])
record_headers = {
@ -581,7 +581,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
platform = '抖音直播'
with semaphore:
if 'v.douyin.com' not in record_url and '/user/' not in record_url:
json_data = asyncio.run(spider.get_douyin_stream_data(
json_data = asyncio.run(spider.get_douyin_web_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=dy_cookie))
@ -1326,15 +1326,16 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
recording.add(record_name)
start_record_time = datetime.datetime.now()
recording_time_list[record_name] = [start_record_time, record_quality_zh]
download_success = direct_download_stream(
flv_url, save_file_path, record_name, record_url, platform
)
if download_success:
record_finished = True
print(f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
print(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
recording.discard(record_name)
else:
logger.debug("未找到FLV直播流跳过录制")
@ -1932,10 +1933,10 @@ while True:
check_path = video_save_path or default_path
if utils.check_disk_capacity(check_path, show=first_run) < disk_space_limit:
exit_recording = True
if exit_recording and not recording:
logger.warning(f"Disk space remaining is below {disk_space_limit} GB. "
f"Exiting program due to the disk space limit being reached.")
sys.exit(-1)
if not recording:
logger.warning(f"Disk space remaining is below {disk_space_limit} GB. "
f"Exiting program due to the disk space limit being reached.")
sys.exit(-1)
def contains_url(string: str) -> bool:
@ -2150,4 +2151,3 @@ while True:
first_run = False
time.sleep(3)

454
src/ab_sign.py Normal file
View File

@ -0,0 +1,454 @@
# -*- encoding: utf-8 -*-
import math
import time
def rc4_encrypt(plaintext: str, key: str) -> str:
    """Apply the RC4 stream cipher to ``plaintext`` using ``key``.

    Both arguments are treated as sequences of code points (via ``ord``).
    The return value is a string with one character per input character,
    each being the input code point XOR-ed with one keystream byte.
    Because RC4 is an XOR stream cipher, applying the function twice with
    the same key restores the input.
    """
    key_len = len(key)
    state = list(range(256))

    # Key scheduling: permute the state array under control of the key.
    swap_idx = 0
    for pos in range(256):
        swap_idx = (swap_idx + state[pos] + ord(key[pos % key_len])) % 256
        state[pos], state[swap_idx] = state[swap_idx], state[pos]

    def keystream():
        """Yield keystream bytes, mutating ``state`` as RC4 prescribes."""
        a = b = 0
        while True:
            a = (a + 1) % 256
            b = (b + state[a]) % 256
            state[a], state[b] = state[b], state[a]
            yield state[(state[a] + state[b]) % 256]

    # ``plaintext`` is listed first so zip stops before pulling a spare byte.
    return ''.join(
        chr(stream_byte ^ ord(char))
        for char, stream_byte in zip(plaintext, keystream())
    )
def left_rotate(x: int, n: int) -> int:
    """Rotate ``x`` left by ``n`` bits (``n`` taken modulo 32) and keep the low 32 bits."""
    shift = n % 32
    upper = x << shift
    lower = x >> (32 - shift)
    return (upper | lower) & 0xFFFFFFFF
def get_t_j(j: int) -> int:
    """Return the round constant for round ``j``.

    Rounds 0-15 use 0x79CC4519 and rounds 16-63 use 0x7A879D8A.

    Raises:
        ValueError: if ``j`` is outside ``0 <= j < 64``.
    """
    if not 0 <= j < 64:
        raise ValueError("invalid j for constant Tj")
    return 0x79CC4519 if j < 16 else 0x7A879D8A
def ff_j(j: int, x: int, y: int, z: int) -> int:
    """Boolean function FF for round ``j``, truncated to 32 bits.

    Rounds 0-15 return ``x ^ y ^ z``; rounds 16-63 return the bitwise
    majority of the three words.

    Raises:
        ValueError: if ``j`` is outside ``0 <= j < 64``.
    """
    if not 0 <= j < 64:
        raise ValueError("invalid j for bool function FF")
    if j < 16:
        mixed = x ^ y ^ z
    else:
        mixed = (x & y) | (x & z) | (y & z)
    return mixed & 0xFFFFFFFF
def gg_j(j: int, x: int, y: int, z: int) -> int:
    """Boolean function GG for round ``j``, truncated to 32 bits.

    Rounds 0-15 return ``x ^ y ^ z``; rounds 16-63 return the bitwise
    choice ``(x & y) | (~x & z)`` (bits of ``y`` where ``x`` is set, bits
    of ``z`` elsewhere).

    Raises:
        ValueError: if ``j`` is outside ``0 <= j < 64``.
    """
    if not 0 <= j < 64:
        raise ValueError("invalid j for bool function GG")
    if j < 16:
        mixed = x ^ y ^ z
    else:
        mixed = (x & y) | (~x & z)
    return mixed & 0xFFFFFFFF
class SM3:
    """Pure-Python SM3-style hash with an incremental ``write``/``sum`` API.

    Instance state:
        reg:   eight 32-bit words holding the running digest.
        chunk: list of byte values buffered but not yet compressed.
        size:  number of message bytes written since the last reset.
    """

    def __init__(self):
        self.reg = []
        self.chunk = []
        self.size = 0
        self.reset()

    def reset(self):
        """Restore the initial register values and discard buffered input."""
        # Initial register values, kept identical to the JS version this
        # code mirrors.
        self.reg = [
            1937774191, 1226093241, 388252375, 3666478592,
            2842636476, 372324522, 3817729613, 2969243214
        ]
        self.chunk = []
        self.size = 0

    def write(self, data):
        """Feed ``data`` into the hash.

        Args:
            data: a ``str`` (hashed as its UTF-8 encoding) or any iterable
                of byte values (``list``, ``bytes``, ``bytearray``, ...).
        """
        if isinstance(data, str):
            incoming = list(data.encode('utf-8'))
        else:
            # Always copy into a list: the buffer must stay a mutable list
            # because ``_fill`` appends to it. Keeping a ``bytes``/``tuple``
            # slice here would make padding fail with AttributeError.
            incoming = list(data)

        self.size += len(incoming)
        self.chunk.extend(incoming)

        # Compress every complete 64-byte block; keep the remainder buffered.
        complete = len(self.chunk) - len(self.chunk) % 64
        for start in range(0, complete, 64):
            self._compress(self.chunk[start:start + 64])
        self.chunk = self.chunk[complete:]

    def _fill(self):
        """Append the padding and the 64-bit big-endian bit length to ``chunk``."""
        bit_length = 8 * self.size

        # Mandatory 0x80 marker byte.
        self.chunk.append(0x80)
        # Zero-pad until exactly 8 bytes remain in the current 64-byte block,
        # spilling into an extra block when fewer than 8 bytes are left.
        self.chunk.extend([0] * ((56 - len(self.chunk)) % 64))
        # Message length in bits: high 32 bits first, then low 32 bits.
        self.chunk.extend((bit_length >> shift) & 0xFF for shift in range(56, -8, -8))

    def _compress(self, data):
        """Fold the first 64 bytes of ``data`` into the registers.

        Raises:
            ValueError: if ``data`` holds fewer than 64 bytes.
        """
        if len(data) < 64:
            raise ValueError("compress error: not enough data")

        # w[0..67] is the expanded message, w[68..131] is the derived w'.
        w = [0] * 132

        # Load sixteen big-endian 32-bit words from the block.
        for t in range(16):
            w[t] = (data[4 * t] << 24) | (data[4 * t + 1] << 16) | (data[4 * t + 2] << 8) | data[4 * t + 3]
            w[t] &= 0xFFFFFFFF

        # Message expansion.
        for j in range(16, 68):
            a = w[j - 16] ^ w[j - 9] ^ left_rotate(w[j - 3], 15)
            a = a ^ left_rotate(a, 15) ^ left_rotate(a, 23)
            w[j] = (a ^ left_rotate(w[j - 13], 7) ^ w[j - 6]) & 0xFFFFFFFF

        # Derive w'.
        for j in range(64):
            w[j + 68] = (w[j] ^ w[j + 4]) & 0xFFFFFFFF

        # 64 compression rounds over a copy of the registers.
        a, b, c, d, e, f, g, h = self.reg
        for j in range(64):
            ss1 = left_rotate((left_rotate(a, 12) + e + left_rotate(get_t_j(j), j)) & 0xFFFFFFFF, 7)
            ss2 = ss1 ^ left_rotate(a, 12)
            tt1 = (ff_j(j, a, b, c) + d + ss2 + w[j + 68]) & 0xFFFFFFFF
            tt2 = (gg_j(j, e, f, g) + h + ss1 + w[j]) & 0xFFFFFFFF

            d = c
            c = left_rotate(b, 9)
            b = a
            a = tt1
            h = g
            g = left_rotate(f, 19)
            f = e
            e = (tt2 ^ left_rotate(tt2, 9) ^ left_rotate(tt2, 17)) & 0xFFFFFFFF

        # Fold the round output back into the running digest.
        for index, word in enumerate((a, b, c, d, e, f, g, h)):
            self.reg[index] ^= word

    def sum(self, data=None, output_format=None):
        """Finish the hash and return the digest, then reset the instance.

        Args:
            data: optional input; when given, the state is reset and only
                ``data`` is hashed. When ``None``, whatever was previously
                passed to ``write`` is hashed.
            output_format: ``'hex'`` for a 64-character lowercase hex
                string; anything else returns a list of 32 byte values.

        Returns:
            The digest as a hex string or as a list of ints in 0..255.
        """
        # One-shot mode: start from a clean state.
        if data is not None:
            self.reset()
            self.write(data)

        self._fill()

        # After padding, ``chunk`` holds one or two whole 64-byte blocks.
        for start in range(0, len(self.chunk), 64):
            self._compress(self.chunk[start:start + 64])

        if output_format == 'hex':
            result = ''.join(f'{val:08x}' for val in self.reg)
        else:
            # Big-endian byte expansion of the eight registers.
            result = [
                (word >> shift) & 0xFF
                for word in self.reg
                for shift in (24, 16, 8, 0)
            ]

        self.reset()
        return result
def result_encrypt(long_str: str, num: str | None = None) -> str:
# 魔改base64编码表
encoding_tables = {
"s0": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=",
"s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe",
"s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe"
}
# 位移常量
masks = [16515072, 258048, 4032, 63] # 对应 0, 1, 2 的掩码添加63作为第四个掩码
shifts = [18, 12, 6, 0] # 对应的位移量
encoding_table = encoding_tables[num]
result = ""
round_num = 0
long_int = get_long_int(round_num, long_str)
total_chars = math.ceil(len(long_str) / 3 * 4)
for i in range(total_chars):
# 每4个字符处理一组3字节
if i // 4 != round_num:
round_num += 1
long_int = get_long_int(round_num, long_str)
# 计算当前位置的索引
index = i % 4
# 使用掩码和位移提取6位值
char_index = (long_int & masks[index]) >> shifts[index]
result += encoding_table[char_index]
return result
def get_long_int(round_num: int, long_str: str) -> int:
    """Pack the ``round_num``-th group of three characters into one integer.

    The group starts at index ``round_num * 3``. Its characters contribute
    their code points at bit offsets 16, 8 and 0; positions at or beyond
    the end of ``long_str`` contribute 0.
    """
    base = round_num * 3
    length = len(long_str)

    packed = 0
    for offset in range(3):
        position = base + offset
        # Positions at or past the end of the string read as zero.
        code = ord(long_str[position]) if position < length else 0
        packed = (packed << 8) | code
    return packed
def gener_random(random_num: int, option: list[int]) -> list[int]:
    """Interleave the two low bytes of ``random_num`` with two option bytes.

    For each pair (random byte, option byte) two values are produced:
    first the random byte's bits under mask 0xAA merged with the option
    byte's bits under mask 0x55, then the same with the masks swapped.
    The low byte is paired with ``option[0]`` and the next byte with
    ``option[1]``, giving four values in total.
    """
    low_byte = random_num & 255
    high_byte = (random_num >> 8) & 255

    mixed = []
    for value, opt in ((low_byte, option[0]), (high_byte, option[1])):
        mixed.append((value & 0xAA) | (opt & 0x55))
        mixed.append((value & 0x55) | (opt & 0xAA))
    return mixed
def generate_random_str() -> str:
    """Build the 12-character prefix string.

    Despite the name the output is deterministic: the three "random"
    values are fixed constants, matching the JS version this mirrors.

    Returns:
        A string of twelve characters, four per (value, option) pair.
    """
    # (fixed "random" value, option bytes) for each of the three groups.
    seeded_groups = (
        (0.123456789, [3, 45]),
        (0.987654321, [1, 0]),
        (0.555555555, [1, 5]),
    )
    return ''.join(
        chr(byte)
        for value, option in seeded_groups
        for byte in gener_random(int(value * 10000), option)
    )
def generate_rc4_bb_str(url_search_params: str, user_agent: str, window_env_str: str,
                        suffix: str = "cus", arguments: list[int] | None = None) -> str:
    """Assemble the signature byte sequence and return it RC4-encrypted.

    The function hashes the query string, the suffix and the user agent,
    mixes selected digest bytes with timestamps, the ``arguments`` values
    and fixed configuration numbers into a byte list, appends the
    code points of ``window_env_str`` and an XOR checksum, and finally
    encrypts the result with ``rc4_encrypt`` under the one-character key
    ``chr(121)``.

    Args:
        url_search_params: query string to sign.
        user_agent: user-agent string to bind into the signature.
        window_env_str: environment description appended verbatim (as code points).
        suffix: string appended to ``url_search_params`` before hashing and
            also hashed on its own.
        arguments: three integers; defaults to ``[0, 1, 14]``.

    Returns:
        The RC4-encrypted byte sequence as a string.

    Note:
        The output depends on the current time (``time.time()``), so two
        calls with the same arguments generally differ.
    """
    if arguments is None:
        arguments = [0, 1, 14]

    sm3 = SM3()
    # Current time in milliseconds.
    start_time = int(time.time() * 1000)

    # Three hashing steps
    # 1: SM3 applied twice to url_search_params + suffix (byte-list digest)
    url_search_params_list = sm3.sum(sm3.sum(url_search_params + suffix))
    # 2: SM3 applied twice to the suffix
    cus = sm3.sum(sm3.sum(suffix))
    # 3: user agent -> RC4 -> custom base64 ("s3") -> SM3
    ua_key = chr(0) + chr(1) + chr(14)  # [1/256, 1, 14]
    ua = sm3.sum(result_encrypt(
        rc4_encrypt(user_agent, ua_key),
        "s3"
    ))

    # The end time is simply start_time + 100 ms, not a measured value.
    end_time = start_time + 100

    # Build the configuration object; integer keys are slot numbers that
    # the rest of the function reads from and writes to.
    b = {
        8: 3,
        10: end_time,
        15: {
            "aid": 6383,
            "pageId": 110624,
            "boe": False,
            "ddrt": 7,
            "paths": {
                "include": [{} for _ in range(7)],
                "exclude": []
            },
            "track": {
                "mode": 0,
                "delay": 300,
                "paths": []
            },
            "dump": True,
            "rpU": "hwj"
        },
        16: start_time,
        18: 44,
        19: [1, 0, 1, 5],
    }

    def split_to_bytes(num: int) -> list[int]:
        # Low 32 bits of ``num`` as four bytes, most significant first.
        return [
            (num >> 24) & 255,
            (num >> 16) & 255,
            (num >> 8) & 255,
            num & 255
        ]

    # Start timestamp: slots 20-23 hold its low 32 bits, 24/25 the next two bytes
    start_time_bytes = split_to_bytes(b[16])
    b[20] = start_time_bytes[0]
    b[21] = start_time_bytes[1]
    b[22] = start_time_bytes[2]
    b[23] = start_time_bytes[3]
    b[24] = int(b[16] / 256 / 256 / 256 / 256) & 255
    b[25] = int(b[16] / 256 / 256 / 256 / 256 / 256) & 255

    # Arguments values
    arg0_bytes = split_to_bytes(arguments[0])
    b[26] = arg0_bytes[0]
    b[27] = arg0_bytes[1]
    b[28] = arg0_bytes[2]
    b[29] = arg0_bytes[3]
    b[30] = int(arguments[1] / 256) & 255
    b[31] = (arguments[1] % 256) & 255
    arg1_bytes = split_to_bytes(arguments[1])
    b[32] = arg1_bytes[0]
    b[33] = arg1_bytes[1]
    arg2_bytes = split_to_bytes(arguments[2])
    b[34] = arg2_bytes[0]
    b[35] = arg2_bytes[1]
    b[36] = arg2_bytes[2]
    b[37] = arg2_bytes[3]

    # Selected bytes of the three digests computed above
    b[38] = url_search_params_list[21]
    b[39] = url_search_params_list[22]
    b[40] = cus[21]
    b[41] = cus[22]
    b[42] = ua[23]
    b[43] = ua[24]

    # End timestamp, laid out like the start timestamp
    end_time_bytes = split_to_bytes(b[10])
    b[44] = end_time_bytes[0]
    b[45] = end_time_bytes[1]
    b[46] = end_time_bytes[2]
    b[47] = end_time_bytes[3]
    b[48] = b[8]
    b[49] = int(b[10] / 256 / 256 / 256 / 256) & 255
    b[50] = int(b[10] / 256 / 256 / 256 / 256 / 256) & 255

    # Configuration values: pageId big-endian in 52-55, aid little-endian in 57-60.
    # Slots 51 and 56 are written but not read again in this function.
    b[51] = b[15]['pageId']
    page_id_bytes = split_to_bytes(b[15]['pageId'])
    b[52] = page_id_bytes[0]
    b[53] = page_id_bytes[1]
    b[54] = page_id_bytes[2]
    b[55] = page_id_bytes[3]

    b[56] = b[15]['aid']
    b[57] = b[15]['aid'] & 255
    b[58] = (b[15]['aid'] >> 8) & 255
    b[59] = (b[15]['aid'] >> 16) & 255
    b[60] = (b[15]['aid'] >> 24) & 255

    # Environment information: code points of the string and its length
    # (low byte in slot 65, next byte in slot 66).
    window_env_list = [ord(char) for char in window_env_str]
    b[64] = len(window_env_list)
    b[65] = b[64] & 255
    b[66] = (b[64] >> 8) & 255

    # Slot 69 is written but not read again in this function.
    b[69] = 0
    b[70] = 0
    b[71] = 0

    # XOR checksum over the slots below.
    # NOTE(review): b[34] is emitted in ``bb`` but is not part of this XOR;
    # presumably this mirrors the reference implementation - confirm before changing.
    b[72] = b[18] ^ b[20] ^ b[26] ^ b[30] ^ b[38] ^ b[40] ^ b[42] ^ b[21] ^ b[27] ^ b[31] ^ \
            b[35] ^ b[39] ^ b[41] ^ b[43] ^ b[22] ^ b[28] ^ b[32] ^ b[36] ^ b[23] ^ b[29] ^ \
            b[33] ^ b[37] ^ b[44] ^ b[45] ^ b[46] ^ b[47] ^ b[48] ^ b[49] ^ b[50] ^ b[24] ^ \
            b[25] ^ b[52] ^ b[53] ^ b[54] ^ b[55] ^ b[57] ^ b[58] ^ b[59] ^ b[60] ^ b[65] ^ \
            b[66] ^ b[70] ^ b[71]

    # Final byte array: the slot order here is significant.
    bb = [
        b[18], b[20], b[52], b[26], b[30], b[34], b[58], b[38], b[40], b[53], b[42], b[21],
        b[27], b[54], b[55], b[31], b[35], b[57], b[39], b[41], b[43], b[22], b[28], b[32],
        b[60], b[36], b[23], b[29], b[33], b[37], b[44], b[45], b[59], b[46], b[47], b[48],
        b[49], b[50], b[24], b[25], b[65], b[66], b[70], b[71]
    ]
    bb.extend(window_env_list)
    bb.append(b[72])

    # Encrypt the byte sequence (as a string of code points) with key chr(121).
    return rc4_encrypt(
        ''.join(chr(byte) for byte in bb),
        chr(121)
    )
def ab_sign(url_search_params: str, user_agent: str) -> str:
    """Compute the ``a_bogus`` signature for a query string and user agent.

    The signature is the fixed prefix from ``generate_random_str`` followed
    by the RC4-encrypted body from ``generate_rc4_bb_str``, encoded with
    the ``"s4"`` alphabet and terminated with a literal ``"="``.
    """
    window_env_str = "1920|1080|1920|1040|0|30|0|0|1872|92|1920|1040|1857|92|1|24|Win32"

    # 1. Prefix string.
    prefix = generate_random_str()
    # 2. RC4-encrypted main body.
    body = generate_rc4_bb_str(url_search_params, user_agent, window_env_str)
    # 3. Final encoding plus the trailing equals sign.
    encoded = result_encrypt(prefix + body, "s4")
    return encoded + "="

View File

@ -4,7 +4,7 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2025-07-19 17:43:00
Update: 2025-10-23 18:28:00
Copyright (c) 2023-2025 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
@ -29,6 +29,7 @@ from .utils import trace_error_decorator, generate_random_string
from .logger import script_path
from .room import get_sec_user_id, get_unique_id, UnsupportedUrlError
from .http_clients.async_http import async_req
from .ab_sign import ab_sign
ssl_context = ssl.create_default_context()
@ -159,6 +160,75 @@ async def get_douyin_app_stream_data(url: str, proxy_addr: OptionalStr = None, c
return room_data
async def get_douyin_web_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: OptionalStr = None):
    """Fetch room data for a Douyin live URL via the signed web "enter" API.

    Args:
        url: live room URL; the room id is the part after
            ``live.douyin.com/`` with any query string removed.
        proxy_addr: forwarded to ``async_req``.
        cookies: when truthy, replaces the built-in default cookie header.

    Returns:
        The room dict from the API response with an added ``anchor_name``
        key. When the room status is 2 and origin stream data is present,
        an ``ORIGIN`` entry is placed first in
        ``stream_url['hls_pull_url_map']`` and ``stream_url['flv_pull_url']``.
        On any exception the error is printed and ``{'anchor_name': ""}``
        is returned instead.
    """
    default_cookie = ('ttwid=1%7C2iDIYVmjzMcpZ20fcaFde0VghXAA3NaNXE_SLR68IyE%7C1761045455'
                      '%7Cab35197d5cfb21df6cbb2fa7ef1c9262206b062c315b9d04da746d0b37dfbc7d')
    headers = {
        'cookie': cookies if cookies else default_cookie,
        'referer': 'https://live.douyin.com/335354047186',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/116.0.5845.97 Safari/537.36 Core/1.116.567.400 QQBrowser/19.7.6764.400',
    }

    try:
        web_rid = url.split('?')[0].split('live.douyin.com/')[-1]
        query = urllib.parse.urlencode({
            "aid": "6383",
            "app_name": "douyin_web",
            "live_id": "1",
            "device_platform": "web",
            "language": "zh-CN",
            "browser_language": "zh-CN",
            "browser_platform": "Win32",
            "browser_name": "Chrome",
            "browser_version": "116.0.0.0",
            "web_rid": web_rid,
            'msToken': '',
        })
        api = f'https://live.douyin.com/webcast/room/web/enter/?{query}'

        # Sign the query string and attach the signature as a_bogus.
        signature = ab_sign(urllib.parse.urlparse(api).query, headers['user-agent'])
        api = api + "&a_bogus=" + signature

        response_text = await async_req(url=api, proxy_addr=proxy_addr, headers=headers)
        payload = json.loads(response_text)['data']
        room_data = payload['data'][0]
        room_data['anchor_name'] = payload['user']['nickname']

        # Only status 2 carries stream information worth post-processing.
        if room_data['status'] != 2:
            return room_data

        if 'stream_url' not in room_data:
            raise RuntimeError(
                "The live streaming type or gameplay is not supported on the computer side yet, please use the "
                "app to share the link for recording."
            )

        stream_url = room_data['stream_url']
        live_core_sdk_data = stream_url['live_core_sdk_data']
        pull_datas = stream_url['pull_datas']
        if not live_core_sdk_data:
            return room_data

        # Prefer the first entry of pull_datas; otherwise use the SDK pull data.
        if pull_datas:
            first_key = list(pull_datas.keys())[0]
            raw_stream_data = pull_datas[first_key]['stream_data']
        else:
            raw_stream_data = live_core_sdk_data['pull_data']['stream_data']
        stream_info = json.loads(raw_stream_data)
        if 'origin' not in stream_info['data']:
            return room_data

        # The codec always comes from the SDK pull data, while the URLs come
        # from whichever source was selected above.
        sdk_stream_data = live_core_sdk_data['pull_data']['stream_data']
        sdk_origin = json.loads(sdk_stream_data)['data']['origin']['main']
        sdk_params = json.loads(sdk_origin['sdk_params'])
        origin_codec = sdk_params.get('VCodec') or ''

        origin_main = stream_info['data']['origin']['main']
        origin_m3u8 = {'ORIGIN': origin_main["hls"] + '&codec=' + origin_codec}
        origin_flv = {'ORIGIN': origin_main["flv"] + '&codec=' + origin_codec}

        # Put ORIGIN first; existing entries follow and win on key clashes.
        existing_hls = stream_url['hls_pull_url_map']
        existing_flv = stream_url['flv_pull_url']
        stream_url['hls_pull_url_map'] = {**origin_m3u8, **existing_hls}
        stream_url['flv_pull_url'] = {**origin_flv, **existing_flv}
    except Exception as e:
        print(f"Error message: {e} Error line: {e.__traceback__.tb_lineno}")
        room_data = {'anchor_name': ""}
    return room_data
@trace_error_decorator
async def get_douyin_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: OptionalStr = None) -> dict:
headers = {
@ -3250,4 +3320,4 @@ async def get_picarto_stream_url(url: str, proxy_addr: OptionalStr = None, cooki
title = json_data['channel']['title']
m3u8_url = f"https://1-edge1-us-newyork.picarto.tv/stream/hls/golive+{anchor_name}/index.m3u8"
result |= {'is_live': True, 'title': title, 'm3u8_url': m3u8_url, 'record_url': m3u8_url}
return result
return result