# -*- encoding: utf-8 -*- """ Author: Hmily GitHub: https://github.com/ihmily Date: 2023-07-15 23:15:00 Update: 2024-07-01 23:04:00 Copyright (c) 2023 by Hmily, All Rights Reserved. Function: Get live stream data. """ import gzip import hashlib import random import string import time import urllib.parse import urllib.error from urllib.request import Request from typing import Union, Dict, Any, Tuple, List import requests import ssl import re import json import execjs import urllib.request from utils import ( trace_error_decorator, update_config, dict_to_cookie_str ) from logger import script_path import http.cookiejar from web_rid import get_sec_user_id no_proxy_handler = urllib.request.ProxyHandler({}) opener = urllib.request.build_opener(no_proxy_handler) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE def get_req( url: str, proxy_addr: Union[str, None] = None, headers: Union[dict, None] = None, data: Union[dict, bytes, None] = None, json_data: Union[dict, list, None] = None, timeout: int = 20, abroad: bool = False, content_conding: str = 'utf-8', redirect_url: bool = False, ) -> Union[str, Any]: if headers is None: headers = {} try: if proxy_addr: proxies = { 'http': proxy_addr, 'https': proxy_addr } if data or json_data: response = requests.post(url, data=data, json=json_data, headers=headers, proxies=proxies, timeout=timeout) else: response = requests.get(url, headers=headers, proxies=proxies, timeout=timeout) if redirect_url: return response.url resp_str = response.text else: if data and not isinstance(data, bytes): data = urllib.parse.urlencode(data).encode(content_conding) if json_data and isinstance(json_data, (dict, list)): data = json.dumps(json_data).encode(content_conding) req = urllib.request.Request(url, data=data, headers=headers) try: if abroad: response = urllib.request.urlopen(req, timeout=timeout) else: response = opener.open(req, timeout=timeout) if redirect_url: return response.url content_encoding = response.info().get('Content-Encoding') try: if content_encoding == 'gzip': with gzip.open(response, 'rt', encoding=content_conding) as gzipped: resp_str = gzipped.read() else: resp_str = response.read().decode(content_conding) finally: response.close() except urllib.error.HTTPError as e: if e.code == 400: resp_str = e.read().decode(content_conding) else: raise except urllib.error.URLError as e: print("URL Error:", e) raise except Exception as e: print("An error occurred:", e) raise except Exception as e: resp_str = str(e) return resp_str def get_params(url: str, params: str) -> Union[str, None]: parsed_url = urllib.parse.urlparse(url) query_params = urllib.parse.parse_qs(parsed_url.query) if params in query_params: return query_params[params][0] def generate_random_string(length): characters = string.ascii_uppercase + string.digits random_string = ''.join(random.choices(characters, k=length)) return random_string def jsonp_to_json(jsonp_str: str) -> Union[dict, None]: pattern = r'(\w+)\((.*)\);?$' match = re.search(pattern, jsonp_str) if match: _, json_str = match.groups() json_obj = json.loads(json_str) return json_obj else: raise Exception("No JSON data found in JSONP response.") def replace_url(file_path: str, old: str, new: str) -> None: with open(file_path, 'r', encoding='utf-8-sig') as f: content = f.read() if old in content: with open(file_path, 'w', encoding='utf-8-sig') as f: f.write(content.replace(old, new)) def get_play_url_list(m3u8: str, proxy: Union[str, None] = None, header: Union[dict, None] = None, abroad: bool = False) -> List[str]: resp = get_req(url=m3u8, proxy_addr=proxy, headers=header, abroad=abroad) play_url_list = [] for i in resp.split('\n'): if i.startswith('https://'): play_url_list.append(i.strip()) if not play_url_list: for i in resp.split('\n'): if i.strip().endswith('m3u8'): play_url_list.append(i.strip()) bandwidth_pattern = re.compile(r'BANDWIDTH=(\d+)') bandwidth_list = bandwidth_pattern.findall(resp) url_to_bandwidth = {url: int(bandwidth) for bandwidth, url in zip(bandwidth_list, play_url_list)} play_url_list = sorted(play_url_list, key=lambda url: url_to_bandwidth[url], reverse=True) return play_url_list @trace_error_decorator def get_douyin_app_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies: Union[str, None] = None) -> \ Dict[str, Any]: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Referer': 'https://live.douyin.com/', 'Cookie': 'ttwid=1%7CB1qls3GdnZhUov9o2NxOMxxYS2ff6OSvEWbv0ytbES4%7C1680522049%7C280d802d6d478e3e78d0c807f7c487e7ffec0ae4e5fdd6a0fe74c3c6af149511; my_rd=1; passport_csrf_token=3ab34460fa656183fccfb904b16ff742; passport_csrf_token_default=3ab34460fa656183fccfb904b16ff742; d_ticket=9f562383ac0547d0b561904513229d76c9c21; n_mh=hvnJEQ4Q5eiH74-84kTFUyv4VK8xtSrpRZG1AhCeFNI; store-region=cn-fj; store-region-src=uid; LOGIN_STATUS=1; __security_server_data_status=1; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%7D; pwa2=%223%7C0%7C3%7C0%22; download_guide=%223%2F20230729%2F0%22; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.6%7D; strategyABtestKey=%221690824679.923%22; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A1536%2C%5C%22screen_height%5C%22%3A864%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A8%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A150%7D%22; VIDEO_FILTER_MEMO_SELECT=%7B%22expireTime%22%3A1691443863751%2C%22type%22%3Anull%7D; home_can_add_dy_2_desktop=%221%22; __live_version__=%221.1.1.2169%22; device_web_cpu_core=8; device_web_memory_size=8; xgplayer_user_id=346045893336; csrf_session_id=2e00356b5cd8544d17a0e66484946f28; odin_tt=724eb4dd23bc6ffaed9a1571ac4c757ef597768a70c75fef695b95845b7ffcd8b1524278c2ac31c2587996d058e03414595f0a4e856c53bd0d5e5f56dc6d82e24004dc77773e6b83ced6f80f1bb70627; __ac_nonce=064caded4009deafd8b89; __ac_signature=_02B4Z6wo00f01HLUuwwAAIDBh6tRkVLvBQBy9L-AAHiHf7; ttcid=2e9619ebbb8449eaa3d5a42d8ce88ec835; webcast_leading_last_show_time=1691016922379; webcast_leading_total_show_times=1; webcast_local_quality=sd; live_can_add_dy_2_desktop=%221%22; msToken=1JDHnVPw_9yTvzIrwb7cQj8dCMNOoesXbA_IooV8cezcOdpe4pzusZE7NB7tZn9TBXPr0ylxmv-KMs5rqbNUBHP4P7VBFUu0ZAht_BEylqrLpzgt3y5ne_38hXDOX8o=; msToken=jV_yeN1IQKUd9PlNtpL7k5vthGKcHo0dEh_QPUQhr8G3cuYv-Jbb4NnIxGDmhVOkZOCSihNpA2kvYtHiTW25XNNX_yrsv5FN8O6zm3qmCIXcEe0LywLn7oBO2gITEeg=; tt_scid=mYfqpfbDjqXrIGJuQ7q-DlQJfUSG51qG.KUdzztuGP83OjuVLXnQHjsz-BRHRJu4e986' } if cookies: headers['Cookie'] = cookies def get_app_data(): room_id, sec_uid = get_sec_user_id(url=url, proxy_addr=proxy_addr) api2 = f'https://webcast.amemv.com/webcast/room/reflow/info/?verifyFp=verify_lxj5zv70_7szNlAB7_pxNY_48Vh_ALKF_GA1Uf3yteoOY&type_id=0&live_id=1&room_id={room_id}&sec_user_id={sec_uid}&version_code=99.99.99&app_id=1128' json_str2 = get_req(url=api2, proxy_addr=proxy_addr, headers=headers) json_data2 = json.loads(json_str2)['data'] room_data2 = json_data2['room'] room_data2['anchor_name'] = room_data2['owner']['nickname'] return room_data2 try: web_rid = url.split('?')[0].split('live.douyin.com/') if len(web_rid) > 1: web_rid = web_rid[1] api = f'https://live.douyin.com/webcast/room/web/enter/?aid=6383&app_name=douyin_web&live_id=1&device_platform=web&language=zh-CN&browser_language=zh-CN&browser_platform=Win32&browser_name=Chrome&browser_version=116.0.0.0&web_rid={web_rid}' json_str = get_req(url=api, proxy_addr=proxy_addr, headers=headers) json_data = json.loads(json_str)['data'] room_data = json_data['data'][0] room_data['anchor_name'] = json_data['user']['nickname'] else: room_data = get_app_data() if 'stream_url' not in room_data: raise RuntimeError('该直播类型或玩法电脑端暂未支持,请使用app端分享链接进行录制') live_core_sdk_data = room_data['stream_url']['live_core_sdk_data'] if room_data['status'] == 2: if live_core_sdk_data: json_str = live_core_sdk_data['pull_data']['stream_data'] json_data = json.loads(json_str) if 'origin' in json_data['data']: origin_url_list = json_data['data']['origin']['main'] origin_m3u8 = {'ORIGIN': origin_url_list["hls"]} origin_flv = {'ORIGIN': origin_url_list["flv"]} hls_pull_url_map = room_data['stream_url']['hls_pull_url_map'] flv_pull_url = room_data['stream_url']['flv_pull_url'] room_data['stream_url']['hls_pull_url_map'] = {**origin_m3u8, **hls_pull_url_map} room_data['stream_url']['flv_pull_url'] = {**origin_flv, **flv_pull_url} except Exception as e: print(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}") room_data = {'anchor_name': ""} return room_data @trace_error_decorator def get_douyin_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies: Union[str, None] = None) -> \ Dict[str, Any]: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Referer': 'https://live.douyin.com/', 'Cookie': 'ttwid=1%7CB1qls3GdnZhUov9o2NxOMxxYS2ff6OSvEWbv0ytbES4%7C1680522049%7C280d802d6d478e3e78d0c807f7c487e7ffec0ae4e5fdd6a0fe74c3c6af149511; my_rd=1; passport_csrf_token=3ab34460fa656183fccfb904b16ff742; passport_csrf_token_default=3ab34460fa656183fccfb904b16ff742; d_ticket=9f562383ac0547d0b561904513229d76c9c21; n_mh=hvnJEQ4Q5eiH74-84kTFUyv4VK8xtSrpRZG1AhCeFNI; store-region=cn-fj; store-region-src=uid; LOGIN_STATUS=1; __security_server_data_status=1; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%7D; pwa2=%223%7C0%7C3%7C0%22; download_guide=%223%2F20230729%2F0%22; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.6%7D; strategyABtestKey=%221690824679.923%22; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A1536%2C%5C%22screen_height%5C%22%3A864%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A8%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A150%7D%22; VIDEO_FILTER_MEMO_SELECT=%7B%22expireTime%22%3A1691443863751%2C%22type%22%3Anull%7D; home_can_add_dy_2_desktop=%221%22; __live_version__=%221.1.1.2169%22; device_web_cpu_core=8; device_web_memory_size=8; xgplayer_user_id=346045893336; csrf_session_id=2e00356b5cd8544d17a0e66484946f28; odin_tt=724eb4dd23bc6ffaed9a1571ac4c757ef597768a70c75fef695b95845b7ffcd8b1524278c2ac31c2587996d058e03414595f0a4e856c53bd0d5e5f56dc6d82e24004dc77773e6b83ced6f80f1bb70627; __ac_nonce=064caded4009deafd8b89; __ac_signature=_02B4Z6wo00f01HLUuwwAAIDBh6tRkVLvBQBy9L-AAHiHf7; ttcid=2e9619ebbb8449eaa3d5a42d8ce88ec835; webcast_leading_last_show_time=1691016922379; webcast_leading_total_show_times=1; webcast_local_quality=sd; live_can_add_dy_2_desktop=%221%22; msToken=1JDHnVPw_9yTvzIrwb7cQj8dCMNOoesXbA_IooV8cezcOdpe4pzusZE7NB7tZn9TBXPr0ylxmv-KMs5rqbNUBHP4P7VBFUu0ZAht_BEylqrLpzgt3y5ne_38hXDOX8o=; msToken=jV_yeN1IQKUd9PlNtpL7k5vthGKcHo0dEh_QPUQhr8G3cuYv-Jbb4NnIxGDmhVOkZOCSihNpA2kvYtHiTW25XNNX_yrsv5FN8O6zm3qmCIXcEe0LywLn7oBO2gITEeg=; tt_scid=mYfqpfbDjqXrIGJuQ7q-DlQJfUSG51qG.KUdzztuGP83OjuVLXnQHjsz-BRHRJu4e986' } if cookies: headers['Cookie'] = cookies try: origin_url_list = None html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers) match_json_str = re.search(r'(\{\\"state\\":.*?)]\\n"]\)', html_str) if not match_json_str: match_json_str = re.search(r'(\{\\"common\\":.*?)]\\n"]\)