# -*- encoding: utf-8 -*- """ Author: Hmily GitHub:https://github.com/ihmily Date: 2023-07-15 23:15:00 Update: 2024-05-08 12:40:18 Copyright (c) 2023 by Hmily, All Rights Reserved. Function: Get live stream data. """ import hashlib import random import time import urllib.parse import urllib.error from urllib.request import Request from typing import Union, Dict, Any, Tuple, List import requests import ssl import re import json import execjs import urllib.request from utils import ( trace_error_decorator, update_config, dict_to_cookie_str ) from logger import script_path import http.cookiejar no_proxy_handler = urllib.request.ProxyHandler({}) opener = urllib.request.build_opener(no_proxy_handler) ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE def get_req( url: str, proxy_addr: Union[str, None] = None, headers: Union[dict, None] = None, data: Union[dict, bytes, None] = None, json_data: Union[dict, list, None] = None, timeout: int = 20, abroad: bool = False ) -> Union[str, Any]: if headers is None: headers = {} try: if proxy_addr: proxies = { 'http': proxy_addr, 'https': proxy_addr } if data or json_data: response = requests.post(url, data=data, json=json_data, headers=headers, proxies=proxies, timeout=timeout) else: response = requests.get(url, headers=headers, proxies=proxies, timeout=timeout) resp_str = response.text else: if data and not isinstance(data, bytes): data = urllib.parse.urlencode(data).encode('utf-8') if json_data and isinstance(json_data, (dict, list)): data = json.dumps(json_data).encode('utf-8') req = urllib.request.Request(url, data=data, headers=headers) try: if abroad: with urllib.request.urlopen(req, timeout=timeout) as response: resp_str = response.read().decode('utf-8') else: with opener.open(req, timeout=timeout) as response: resp_str = response.read().decode('utf-8') except urllib.error.HTTPError as e: if e.code == 400: resp_str = e.read().decode('utf-8') else: raise except urllib.error.URLError as e: print("URL Error:", e) raise except Exception as e: print("An error occurred:", e) raise except Exception as e: resp_str = str(e) return resp_str def get_params(url: str, params: str) -> Union[str, None]: parsed_url = urllib.parse.urlparse(url) query_params = urllib.parse.parse_qs(parsed_url.query) if params in query_params: return query_params[params][0] else: return None def jsonp_to_json(jsonp_str: str) -> Union[dict, None]: pattern = r'(\w+)\((.*)\);?$' match = re.search(pattern, jsonp_str) if match: _, json_str = match.groups() json_obj = json.loads(json_str) return json_obj else: print("No JSON data found in JSONP response.") return None def replace_url(file_path: str, old: str, new: str) -> None: with open(file_path, 'r', encoding='utf-8-sig') as f: content = f.read() if old in content: with open(file_path, 'w', encoding='utf-8-sig') as f: f.write(content.replace(old, new)) def get_play_url_list(m3u8: str, proxy: Union[str, None] = None, header: Union[dict, None] = None, abroad: bool = False) -> List[str]: resp = get_req(url=m3u8, proxy_addr=proxy, headers=header, abroad=abroad) play_url_list = [] for i in resp.split('\n'): if i.startswith('https://'): play_url_list.append(i.strip()) bandwidth_pattern = re.compile(r'BANDWIDTH=(\d+)') bandwidth_list = bandwidth_pattern.findall(resp) url_to_bandwidth = {url: int(bandwidth) for bandwidth, url in zip(bandwidth_list, play_url_list)} play_url_list = sorted(play_url_list, key=lambda url: url_to_bandwidth[url], reverse=True) return play_url_list @trace_error_decorator def get_douyin_stream_data(url: str, proxy_addr: Union[str, None] = None, cookies: Union[str, None] = None) -> \ Dict[str, Any]: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Referer': 'https://live.douyin.com/', 'Cookie': 'ttwid=1%7CB1qls3GdnZhUov9o2NxOMxxYS2ff6OSvEWbv0ytbES4%7C1680522049%7C280d802d6d478e3e78d0c807f7c487e7ffec0ae4e5fdd6a0fe74c3c6af149511; my_rd=1; passport_csrf_token=3ab34460fa656183fccfb904b16ff742; passport_csrf_token_default=3ab34460fa656183fccfb904b16ff742; d_ticket=9f562383ac0547d0b561904513229d76c9c21; n_mh=hvnJEQ4Q5eiH74-84kTFUyv4VK8xtSrpRZG1AhCeFNI; store-region=cn-fj; store-region-src=uid; LOGIN_STATUS=1; __security_server_data_status=1; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%7D; pwa2=%223%7C0%7C3%7C0%22; download_guide=%223%2F20230729%2F0%22; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.6%7D; strategyABtestKey=%221690824679.923%22; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A1536%2C%5C%22screen_height%5C%22%3A864%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A8%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A150%7D%22; VIDEO_FILTER_MEMO_SELECT=%7B%22expireTime%22%3A1691443863751%2C%22type%22%3Anull%7D; home_can_add_dy_2_desktop=%221%22; __live_version__=%221.1.1.2169%22; device_web_cpu_core=8; device_web_memory_size=8; xgplayer_user_id=346045893336; csrf_session_id=2e00356b5cd8544d17a0e66484946f28; odin_tt=724eb4dd23bc6ffaed9a1571ac4c757ef597768a70c75fef695b95845b7ffcd8b1524278c2ac31c2587996d058e03414595f0a4e856c53bd0d5e5f56dc6d82e24004dc77773e6b83ced6f80f1bb70627; __ac_nonce=064caded4009deafd8b89; __ac_signature=_02B4Z6wo00f01HLUuwwAAIDBh6tRkVLvBQBy9L-AAHiHf7; ttcid=2e9619ebbb8449eaa3d5a42d8ce88ec835; webcast_leading_last_show_time=1691016922379; webcast_leading_total_show_times=1; webcast_local_quality=sd; live_can_add_dy_2_desktop=%221%22; msToken=1JDHnVPw_9yTvzIrwb7cQj8dCMNOoesXbA_IooV8cezcOdpe4pzusZE7NB7tZn9TBXPr0ylxmv-KMs5rqbNUBHP4P7VBFUu0ZAht_BEylqrLpzgt3y5ne_38hXDOX8o=; msToken=jV_yeN1IQKUd9PlNtpL7k5vthGKcHo0dEh_QPUQhr8G3cuYv-Jbb4NnIxGDmhVOkZOCSihNpA2kvYtHiTW25XNNX_yrsv5FN8O6zm3qmCIXcEe0LywLn7oBO2gITEeg=; tt_scid=mYfqpfbDjqXrIGJuQ7q-DlQJfUSG51qG.KUdzztuGP83OjuVLXnQHjsz-BRHRJu4e986' } if cookies: headers['Cookie'] = cookies try: html_str = get_req(url=url, proxy_addr=proxy_addr, headers=headers) match_json_str = re.search(r'(\{\\"state\\":.*?)]\\n"]\)', html_str) if not match_json_str: match_json_str = re.search(r'(\{\\"common\\":.*?)]\\n"]\)