refactor: simplify code structure and reduce redundancy

This commit is contained in:
ihmily 2024-11-29 15:29:14 +08:00
parent 7cbd995591
commit b963d8ac6f
4 changed files with 166 additions and 238 deletions

View File

@ -815,10 +815,7 @@ def get_xhs_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Option
if 'xhslink.com' in url:
url = get_req(url, proxy_addr=proxy_addr, headers=headers, redirect_url=True)
result = {
"anchor_name": '',
"is_live": False,
}
result = {"anchor_name": '', "is_live": False}
flv_url = ''
room_id = re.search('/livestream/(.*?)(?=/|\\?|$)', url)
if room_id:
@ -829,11 +826,8 @@ def get_xhs_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Option
json_data = json.loads(json_str)
anchor_name = json_data['data']['host_info']['nickname']
live_title = json_data['data']['room']['name']
result["anchor_name"] = anchor_name
result['title'] = live_title
flv_url = f'http://live-play.xhscdn.com/live/{room_id}.flv'
result['flv_url'] = flv_url
result['record_url'] = flv_url
result |= {"anchor_name": anchor_name, "title": live_title, "flv_url": flv_url, 'record_url': flv_url}
user_id = re.search('/user/profile/(.*?)(?=/|\\?|$)', url)
user_id = user_id.group(1) if user_id else get_params(url, 'host_id')
@ -847,16 +841,12 @@ def get_xhs_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Option
json_data = json.loads(json_str)
if json_data["success"]:
if json_data['data']:
result['is_live'] = True
live_link = json_data['data'][0]['live_link']
result['anchor_name'] = get_params(live_link, "host_nickname")
anchor_name = get_params(live_link, "host_nickname")
if flv_url and get_response_status(flv_url, proxy_addr=proxy_addr, headers=headers):
return result
flv_url = get_params(live_link, "flvUrl")
result['flv_url'] = flv_url
result['record_url'] = flv_url
else:
result['is_live'] = False
result |= {"anchor_name": anchor_name, "is_live": True, "flv_url": flv_url, 'record_url': flv_url}
else:
print(f"xhs {json_data['msg']}")
return result
@ -891,18 +881,14 @@ def get_bigo_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Optio
json_data = json.loads(json_str)
anchor_name = json_data['data']['nick_name']
live_status = json_data['data']['alive']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status == 1:
live_title = json_data['data']['roomTopic']
result['title'] = live_title
m3u8_url = json_data['data']['hls_src']
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = m3u8_url
result |= {"title": live_title, "is_live": True, "m3u8_url": m3u8_url, 'record_url': m3u8_url}
elif result['anchor_name'] == '':
html_str = get_req(url=f'https://www.bigo.tv/cn/{room_id}', proxy_addr=proxy_addr, headers=headers)
result['anchor_name'] = re.search('<title>欢迎来到(.*?)的直播间</title>', html_str, re.DOTALL).group(1)
@ -926,16 +912,11 @@ def get_blued_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
json_data = json.loads(json_str)
anchor_name = json_data['userInfo']['name']
live_status = json_data['userInfo']['onLive']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status:
m3u8_url = "http:" + json_data['liveInfo']['liveUrl']
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = m3u8_url
result |= {"is_live": True, "m3u8_url": m3u8_url, 'record_url': m3u8_url}
return result
@ -1097,10 +1078,8 @@ def get_sooplive_stream_data(
anchor_name = f"{anchor_name}-{json_data['data']['bj_id']}"
else:
anchor_name = ''
result = {
"anchor_name": '' if anchor_name is None else anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name or '' ,"is_live": False}
def get_url_list(m3u8: str) -> List[str]:
resp = get_req(url=m3u8, proxy_addr=proxy_addr, headers=headers, abroad=True)
@ -1122,17 +1101,19 @@ def get_sooplive_stream_data(
print("sooplive platform login successful! Starting to fetch live streaming data...")
return cookie
def fetch_data(cookie) -> dict:
def fetch_data(cookie, _result) -> dict:
aid_token = get_sooplive_tk(url, rtype='aid', proxy_addr=proxy_addr, cookies=cookie)
_anchor_name, _broad_no = get_sooplive_tk(url, rtype='info', proxy_addr=proxy_addr, cookies=cookie)
_view_url = get_sooplive_cdn_url(_broad_no, proxy_addr=proxy_addr)['view_url']
_m3u8_url = _view_url + '?aid=' + aid_token
result['anchor_name'] = _anchor_name
result['m3u8_url'] = _m3u8_url
result['is_live'] = True
result['play_url_list'] = get_url_list(_m3u8_url)
result['new_cookies'] = cookie
return result
_result |= {
"anchor_name": _anchor_name,
"is_live": True,
"m3u8_url": _m3u8_url,
'play_url_list': get_url_list(_m3u8_url),
'new_cookies': cookie
}
return _result
if json_data['data']['code'] == -3001:
print("sooplive live stream failed to retrieve, the live stream just ended.")
@ -1144,12 +1125,12 @@ def get_sooplive_stream_data(
"please ensure it is configured.")
new_cookie = handle_login()
if new_cookie and len(new_cookie) > 0:
return fetch_data(new_cookie)
return fetch_data(new_cookie, result)
raise RuntimeError("sooplive login failed, please check if the account and password are correct")
elif json_data['data']['code'] == -3004:
if cookies and len(cookies) > 0:
return fetch_data(cookies)
return fetch_data(cookies, result)
else:
raise RuntimeError("sooplive login failed, please check if the account and password are correct")
elif json_data['data']['code'] == -6001:
@ -1161,9 +1142,7 @@ def get_sooplive_stream_data(
hls_authentication_key = json_data['data']['hls_authentication_key']
view_url = get_sooplive_cdn_url(broad_no, proxy_addr=proxy_addr)['view_url']
m3u8_url = view_url + '?aid=' + hls_authentication_key
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['play_url_list'] = get_url_list(m3u8_url)
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'play_url_list': get_url_list(m3u8_url)}
result['new_cookies'] = None
return result
@ -1190,10 +1169,12 @@ def get_netease_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: O
if 'quickplay' not in live_data:
result["anchor_name"] = room_data['nickname']
else:
result["anchor_name"] = live_data['nickname']
result["is_live"] = True
result["title"] = live_data['title']
result["stream_list"] = live_data['quickplay']
result |= {
'anchor_name': live_data['nickname'],
'is_live': True,
'title': live_data['title'],
'stream_list': live_data['quickplay']
}
return result
@ -1219,10 +1200,12 @@ def get_qiandurebo_stream_data(url: str, proxy_addr: OptionalStr = None, cookies
play_url = re.findall('"play_url": "(.*?)",\r\n', data)
if len(play_url) > 0 and 'common-text-center" style="display:block' not in html_str:
result['anchor_name'] = anchor_name[0]
result['flv_url'] = play_url[0]
result['is_live'] = True
result['record_url'] = play_url[0]
result |= {
'anchor_name': anchor_name[0],
'is_live': True,
'flv_url': play_url[0],
'record_url': play_url[0]
}
return result
@ -1253,7 +1236,6 @@ def get_pandatv_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: O
}
result = {"anchor_name": "", "is_live": False}
json_str = get_req('https://api.pandalive.co.kr/v1/member/bj',
proxy_addr=proxy_addr, headers=headers, data=data, abroad=True)
json_data = json.loads(json_str)
@ -1272,9 +1254,8 @@ def get_pandatv_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: O
else:
raise RuntimeError(json_data['errorData']['code'], json_data['message'])
play_url = json_data['PlayList']['hls'][0]['url']
result['m3u8_url'] = play_url
result['is_live'] = True
result['play_url_list'] = get_play_url_list(m3u8=play_url, proxy=proxy_addr, header=headers, abroad=True)
play_url_list = get_play_url_list(m3u8=play_url, proxy=proxy_addr, header=headers, abroad=True)
result |= {'is_live': True, 'm3u8_url': play_url, 'play_url_list': play_url_list}
return result
@ -1299,20 +1280,14 @@ def get_maoerfm_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Op
live_status = False
if 'room' in json_data['info']:
live_status = json_data['info']['room']['status']['broadcasting']
result = {
"anchor_name": anchor_name,
"is_live": live_status,
}
result = {"anchor_name": anchor_name, "is_live": live_status}
if live_status:
stream_list = json_data['info']['room']['channel']
m3u8_url = stream_list['hls_pull_url']
flv_url = stream_list['flv_pull_url']
result['title'] = json_data['info']['room']['name']
result['m3u8_url'] = m3u8_url
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = m3u8_url
title = json_data['info']['room']['name']
result |= {'is_live': True, 'title': title, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': m3u8_url}
return result
@ -1598,10 +1573,12 @@ def get_looklive_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: O
else:
play_url_list = json_data['data']['roomInfo']['liveUrl']
live_title = json_data['data']['roomInfo']['title']
result["title"] = live_title
result["flv_url"] = play_url_list['httpPullUrl']
result["m3u8_url"] = play_url_list['hlsPullUrl']
result["record_url"] = play_url_list['hlsPullUrl']
result |= {
"title": live_title,
"flv_url": play_url_list['httpPullUrl'],
"m3u8_url": play_url_list['hlsPullUrl'],
"record_url": play_url_list['hlsPullUrl'],
}
return result
@ -1802,12 +1779,10 @@ def get_popkontv_stream_url(
json_str = fetch_data(headers, partner_code)
json_data = json.loads(json_str)
m3u8_url = json_data['data']['castHlsUrl']
result["m3u8_url"] = m3u8_url
result["record_url"] = m3u8_url
result |= {"m3u8_url": m3u8_url, "record_url": m3u8_url}
elif json_data['statusCd'] == "L0000":
m3u8_url = json_data['data']['castHlsUrl']
result["m3u8_url"] = m3u8_url
result["record_url"] = m3u8_url
result |= {"m3u8_url": m3u8_url, "record_url": m3u8_url}
else:
raise RuntimeError("Failed to retrieve live stream source,", status_msg)
result['new_token'] = new_token
@ -1888,7 +1863,6 @@ def get_twitcasting_stream_url(
}
anchor_id = url.split('/')[3]
if cookies:
headers['Cookie'] = cookies
@ -1928,10 +1902,7 @@ def get_twitcasting_stream_url(
result["anchor_name"] = anchor_name
if live_status == 'true':
play_url = f'https://twitcasting.tv/{anchor_id}/metastream.m3u8/?video=1&mode=source'
result['m3u8_url'] = play_url
result['record_url'] = play_url
result['is_live'] = True
result['title'] = live_title
result |= {'title': live_title, 'is_live': True, "m3u8_url": play_url, "record_url": play_url}
result['new_cookies'] = new_cookie
return result
@ -1975,10 +1946,7 @@ def get_baidu_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
key = list(json_data['data'].keys())[0]
data = json_data['data'][key]
anchor_name = data['host']['name']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if data['status'] == "0":
result["is_live"] = True
live_title = data['video']['title']
@ -1995,9 +1963,7 @@ def get_baidu_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
url_list.append(prefix + i['urls'][0]['hls'].rsplit('?', maxsplit=1)[0].rsplit('/', maxsplit=1)[1])
if url_list:
result['play_url_list'] = url_list
result['title'] = live_title
result['is_live'] = True
result |= {"is_live": True, "title": live_title, 'play_url_list': url_list}
return result
@ -2024,20 +1990,15 @@ def get_weibo_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
room_id = i['page_info']['object_id']
break
result = {
"anchor_name": '',
"is_live": False,
}
result = {"anchor_name": '', "is_live": False}
if room_id:
app_api = f'https://weibo.com/l/pc/anchor/live?live_id={room_id}'
# app_api = f'https://weibo.com/l/!/2/wblive/room/show_pc_live.json?live_id={room_id}'
json_str = get_req(url=app_api, proxy_addr=proxy_addr, headers=headers)
json_data = json.loads(json_str)
anchor_name = json_data['data']['user_info']['name']
result["anchor_name"] = anchor_name
live_status = json_data['data']['item']['status']
if live_status == 1:
result["is_live"] = True
live_title = json_data['data']['item']['desc']
@ -2049,7 +2010,6 @@ def get_weibo_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
{"m3u8_url": m3u8_url, "flv_url": flv_url},
{"m3u8_url": m3u8_url.split('_')[0] + '.m3u8', "flv_url": flv_url.split('_')[0] + '.flv'}
]
return result
@ -2070,14 +2030,10 @@ def get_kugou_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
room_id = url.split('?')[0].rsplit('/', maxsplit=1)[1]
app_api = f'https://service2.fanxing.kugou.com/roomcen/room/web/cdn/getEnterRoomInfo?roomId={room_id}'
json_str = get_req(url=app_api, proxy_addr=proxy_addr, headers=headers)
json_data = json.loads(json_str)
anchor_name = json_data['data']['normalRoomInfo']['nickName']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if not anchor_name:
raise RuntimeError(
"Music channel live rooms are not supported for recording, please switch to a different live room."
@ -2097,15 +2053,12 @@ def get_kugou_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
'_': str(int(time.time() * 1000)),
}
api = f'https://fx1.service.kugou.com/video/pc/live/pull/mutiline/streamaddr?{urllib.parse.urlencode(params)}'
json_str2 = get_req(api, proxy_addr=proxy_addr, headers=headers)
json_data2 = json.loads(json_str2)
stream_data = json_data2['data']['lines']
if stream_data:
result["is_live"] = True
result['flv_url'] = stream_data[-1]['streamProfiles'][0]['httpsFlv'][0]
result['record_url'] = result['flv_url']
flv_url = stream_data[-1]['streamProfiles'][0]['httpsFlv'][0]
result |= {"is_live": True, "flv_url": flv_url, "record_url": flv_url}
return result
@ -2208,9 +2161,7 @@ def get_twitchtv_stream_data(url: str, proxy_addr: OptionalStr = None, cookies:
access_key = urllib.parse.urlencode(params)
m3u8_url = f'https://usher.ttvnw.net/api/channel/hls/{uid}.m3u8?{access_key}'
play_url_list = get_play_url_list(m3u8=m3u8_url, proxy=proxy_addr, header=headers, abroad=True)
result['m3u8_url'] = m3u8_url
result['play_url_list'] = play_url_list
result |= {'m3u8_url': m3u8_url, 'play_url_list': play_url_list}
return result
@ -2244,17 +2195,16 @@ def get_liveme_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
stream_data = json_data['data']['video_info']
anchor_name = stream_data['uname']
live_status = stream_data['status']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status == "0":
result["is_live"] = True
m3u8_url = stream_data['hlsvideosource']
flv_url = stream_data['videosource']
result['m3u8_url'] = m3u8_url
result['flv_url'] = flv_url
result['record_url'] = m3u8_url if m3u8_url else flv_url
result |= {
'is_live': True,
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': m3u8_url or flv_url
}
return result
@ -2392,9 +2342,11 @@ def get_huajiao_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Op
api = f'https://live.huajiao.com/live/substream?{urllib.parse.urlencode(params)}'
json_str = get_req(url=api, proxy_addr=proxy_addr, headers=headers)
json_data = json.loads(json_str)
result["is_live"] = True
result['flv_url'] = json_data['data']['h264_url']
result['record_url'] = json_data['data']['h264_url']
result |= {
'is_live': True,
'flv_url': json_data['data']['h264_url'],
'record_url': json_data['data']['h264_url'],
}
return result
@ -2424,19 +2376,16 @@ def get_liuxing_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Op
live_status = room_info["live_stat"]
result = {"anchor_name": anchor_name, "is_live": False}
if live_status == 1:
result["is_live"] = True
idx = room_info['idx']
live_id = room_info['liveId1']
flv_url = f'https://txpull1.5see.com/live/{idx}/{live_id}.flv'
result['flv_url'] = flv_url
result['record_url'] = flv_url
result |= {'is_live': True, 'flv_url': flv_url, 'record_url': flv_url}
return result
@trace_error_decorator
def get_showroom_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: OptionalStr = None) -> dict:
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0',
}
@ -2538,9 +2487,7 @@ def get_acfun_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
videoPlayRes = json_data['data']['videoPlayRes']
play_url_list = json.loads(videoPlayRes)['liveAdaptiveManifest'][0]['adaptationSet']['representation']
play_url_list = sorted(play_url_list, key=itemgetter('bitrate'), reverse=True)
result['play_url_list'] = play_url_list
result['title'] = live_title
result |= {'play_url_list': play_url_list, 'title': live_title}
return result
@ -2581,10 +2528,7 @@ def get_changliao_stream_url(url: str, proxy_addr: OptionalStr = None, cookies:
live_id = json_data['data']['roomInfo']['liveID']
flv_url = f'{flv_domain}/{live_id}.flv'
m3u8_url = f'{hls_domain}/{live_id}.m3u8'
result["is_live"] = True
result["m3u8_url"] = m3u8_url
result["flv_url"] = flv_url
result["record_url"] = flv_url
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': flv_url}
return result
@ -2617,10 +2561,7 @@ def get_yingke_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
if live_status == 1:
m3u8_url = json_data['data']['live_addr'][0]['hls_stream_addr']
flv_url = json_data['data']['live_addr'][0]['stream_addr']
result["is_live"] = True
result["m3u8_url"] = m3u8_url
result["flv_url"] = flv_url
result["record_url"] = m3u8_url
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': m3u8_url}
return result
@ -2662,10 +2603,7 @@ def get_yinbo_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
live_id = room_data['liveID']
flv_url = f'{flv_domain}/{live_id}.flv'
m3u8_url = f'{hls_domain}/{live_id}.m3u8'
result["is_live"] = True
result["m3u8_url"] = m3u8_url
result["flv_url"] = flv_url
result["record_url"] = flv_url
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': flv_url}
return result
@ -2698,11 +2636,13 @@ def get_zhihu_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
if live_status == 1:
live_title = live_data['theme']
play_url = live_data['drama']['playInfo']
result["is_live"] = True
result["title"] = live_title
result["m3u8_url"] = play_url['hlsUrl']
result["flv_url"] = play_url['playUrl']
result["record_url"] = play_url['hlsUrl']
result |= {
'is_live': True,
'title': live_title,
'm3u8_url': play_url['hlsUrl'],
'flv_url': play_url['playUrl'],
'record_url': play_url['hlsUrl']
}
return result
@ -2730,12 +2670,10 @@ def get_chzzk_stream_data(url: str, proxy_addr: OptionalStr = None, cookies: Opt
if live_status == 'OPEN':
play_data = json.loads(live_data['livePlaybackJson'])
m3u8_url = play_data['media'][0]['path']
result["is_live"] = True
result["m3u8_url"] = m3u8_url
m3u8_url_list = get_play_url_list(m3u8_url, proxy=proxy_addr, header=headers, abroad=True)
prefix = m3u8_url.split('?')[0].rsplit('/', maxsplit=1)[0]
m3u8_url_list = [prefix + '/' + i for i in m3u8_url_list]
result["play_url_list"] = m3u8_url_list
result |= {"is_live": True, "m3u8_url": m3u8_url, "play_url_list": m3u8_url_list}
return result
@ -2781,15 +2719,10 @@ def get_haixiu_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
stream_data = json_data['data']
anchor_name = stream_data['nickname']
live_status = stream_data['live_status']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status == 1:
result["is_live"] = True
flv_url = stream_data['media_url_web']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result |= {'is_live': True, 'flv_url': flv_url, 'record_url': flv_url}
return result
@ -2825,17 +2758,11 @@ def get_vvxqiu_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
json_data = json.loads(json_str)
anchor_name = json_data['data']['memberVO']['memberName']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
m3u8_url = f'https://liveplay-pro.wasaixiu.com/live/1400442770_{room_id}_{room_id[2:]}_single.m3u8'
resp = get_req(m3u8_url, proxy_addr=proxy_addr, headers=headers)
if 'Not Found' not in resp:
result["is_live"] = True
result['m3u8_url'] = m3u8_url
result['record_url'] = m3u8_url
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'record_url': m3u8_url}
return result
@ -2855,10 +2782,7 @@ def get_17live_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
json_str = get_req(api_1, proxy_addr=proxy_addr, headers=headers)
json_data = json.loads(json_str)
anchor_name = json_data["displayName"]
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
json_data = {
'liveStreamID': room_id,
}
@ -2867,10 +2791,8 @@ def get_17live_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
json_data = json.loads(json_str)
live_status = json_data.get("status")
if live_status and live_status == 2:
result["is_live"] = True
flv_url = json_data['pullURLsInfo']['rtmpURLs'][0]['urlHighQuality']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result |= {'is_live': True, 'flv_url': flv_url, 'record_url': flv_url}
return result
@ -2892,17 +2814,11 @@ def get_langlive_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: O
live_info = json_data['data']['live_info']
anchor_name = live_info['nickname']
live_status = live_info['live_status']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status == 1:
result["is_live"] = True
flv_url = json_data['data']['live_info']['liveurl']
m3u8_url = json_data['data']['live_info']['liveurl_hls']
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['record_url'] = m3u8_url
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'flv_url': flv_url, 'record_url': m3u8_url}
return result
@ -2935,14 +2851,10 @@ def get_pplive_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
live_info = json_data['data']
anchor_name = live_info['name']
live_status = live_info['living']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if live_status:
result["is_live"] = True
result['m3u8_url'] = live_info['pullUrl']
result['record_url'] = live_info['pullUrl']
m3u8_url = live_info['pullUrl']
result |= {'is_live': True, 'm3u8_url': m3u8_url, 'record_url': m3u8_url}
return result
@ -2975,15 +2887,11 @@ def get_6room_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opti
json_data = json.loads(json_str)
flv_title = json_data['content']['liveinfo']['flvtitle']
anchor_name = json_data['content']['roominfo']['alias']
result = {
"anchor_name": anchor_name,
"is_live": False,
}
result = {"anchor_name": anchor_name, "is_live": False}
if flv_title:
result["is_live"] = True
flv_url = f'https://wlive.6rooms.com/httpflv/{flv_title}.flv'
result['flv_url'] = flv_url
result['record_url'] = get_req(flv_url, proxy_addr=proxy_addr, headers=headers, redirect_url=True)
record_url = get_req(flv_url, proxy_addr=proxy_addr, headers=headers, redirect_url=True)
result |= {'is_live': True, 'flv_url': flv_url, 'record_url': record_url}
return result
@ -3040,9 +2948,7 @@ def get_shopee_stream_url(url: str, proxy_addr: OptionalStr = None, cookies: Opt
result["anchor_name"] = anchor_name
result['uid'] = f'uid={uid}&session={session_id}'
if live_status == 1 and is_living:
result["is_live"] = True
flv_url = json_data['data']['session']['play_url']
result['title'] = json_data['data']['session']['title']
result['flv_url'] = flv_url
result['record_url'] = flv_url
return result
title = json_data['data']['session']['title']
result |= {'is_live': True, 'title': title, 'flv_url': flv_url, 'record_url': flv_url}
return result

View File

@ -49,11 +49,13 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
quality_index = video_qualities.get(video_quality)
m3u8_url = m3u8_url_list[quality_index]
flv_url = flv_url_list[quality_index]
result['title'] = json_data['title']
result['m3u8_url'] = m3u8_url
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = m3u8_url or flv_url
result |= {
'is_live': True,
'title': json_data['title'],
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': m3u8_url or flv_url,
}
return result
@ -103,11 +105,13 @@ def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
quality_index = video_qualities.get(video_quality)
flv_url = flv_url_list[quality_index]['url'].replace("https://", "http://")
m3u8_url = m3u8_url_list[quality_index]['url'].replace("https://", "http://")
result['title'] = live_room['liveRoom']['title']
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = m3u8_url or flv_url
result |= {
'is_live': True,
'title': live_room['liveRoom']['title'],
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': m3u8_url or flv_url,
}
return result
@ -143,8 +147,9 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
flv_url_list = sorted(flv_url_list, key=lambda x: x['bitrate'], reverse=True)
# uses quality_mapping_bitrate to get the index of the quality
quality_index_bitrate_value = quality_mapping_bitrate[video_quality]
# finds the value which less than the quality_index_bitrate_value, if not then uses the previous value
quality_index = next((i for i, x in enumerate(flv_url_list) if x['bitrate'] <= quality_index_bitrate_value), None)
# find the value below `quality_index_bitrate_value`, or else use the previous one.
quality_index = next(
(i for i, x in enumerate(flv_url_list) if x['bitrate'] <= quality_index_bitrate_value), None)
if quality_index is None:
# latest quality
quality_index = len(flv_url_list) - 1
@ -152,14 +157,13 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
result['flv_url'] = flv_url
result['record_url'] = flv_url
else:
# TODO: Old version which not working at 20241128, could be removed if not working confirmed, pls also clean the quality_mapping mapping
# TODO: Old version which not working at 20241128, could be removed if not working confirmed,
# please also clean the quality_mapping mapping
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
result['flv_url'] = flv_url
result['record_url'] = flv_url
result |= {'flv_url': flv_url, 'record_url': flv_url}
result['is_live'] = True
return result
@ -167,6 +171,7 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
@trace_error_decorator
def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
game_live_info = json_data['data'][0]['gameLiveInfo']
live_title = game_live_info['introduction']
stream_info_list = json_data['data'][0]['gameStreamInfoList']
anchor_name = game_live_info.get('nick', '')
@ -245,11 +250,13 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
flv_url = flv_url + str(video_quality_options[video_quality])
m3u8_url = m3u8_url + str(video_quality_options[video_quality])
result['title'] = game_live_info['introduction']
result['flv_url'] = flv_url
result['m3u8_url'] = m3u8_url
result['is_live'] = True
result['record_url'] = flv_url or m3u8_url
result |= {
'is_live': True,
'title': live_title,
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': flv_url or m3u8_url
}
return result
@ -275,8 +282,7 @@ def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str, prox
rtmp_live = flv_data['data'].get('rtmp_live')
if rtmp_live:
flv_url = f'{rtmp_url}/{rtmp_live}'
json_data['flv_url'] = flv_url
json_data['record_url'] = flv_url
json_data |= {'flv_url': flv_url, 'record_url': flv_url}
return json_data
@ -291,10 +297,12 @@ def get_yy_stream_url(json_data: dict) -> dict:
stream_line_addr = json_data['avp_info_res']['stream_line_addr']
cdn_info = list(stream_line_addr.values())[0]
flv_url = cdn_info['cdn_info']['url']
result['title'] = json_data['title']
result['flv_url'] = flv_url
result['is_live'] = True
result['record_url'] = flv_url
result |= {
'is_live': True,
'title': json_data['title'],
'flv_url': flv_url,
'record_url': flv_url
}
return result
@ -369,11 +377,9 @@ def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8',
}
if url_type == 'm3u8':
m3u8_url = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["m3u8_url"] = json_data['m3u8_url'] if spec else m3u8_url
data["record_url"] = m3u8_url
data |= {"m3u8_url": json_data['m3u8_url'] if spec else m3u8_url, "record_url": m3u8_url}
else:
flv = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data["flv_url"] = flv
data["record_url"] = flv
flv_url = play_url_list[selected_quality][extra_key] if extra_key else play_url_list[selected_quality]
data |= {"flv_url": flv_url, "record_url": flv_url}
data['title'] = json_data.get('title')
return data

View File

@ -14,6 +14,21 @@ from .logger import logger
import configparser
class Color:
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
RESET = "\033[0m"
@staticmethod
def print_colored(text, color):
print(f"{color}{text}{Color.RESET}")
def trace_error_decorator(func: callable) -> callable:
@functools.wraps(func)
def wrapper(*args: list, **kwargs: dict) -> Any:
@ -134,5 +149,5 @@ def check_disk_capacity(file_path: str | Path, show: bool = False) -> float:
if show:
print(f"{disk_root} Total: {disk_usage.total / (1024 ** 3):.2f} GB "
f"Used: {disk_usage.used / (1024 ** 3):.2f} GB "
f"Free: {free_space_gb:.2f} GB")
f"Free: {free_space_gb:.2f} GB\n")
return free_space_gb

29
main.py
View File

@ -73,6 +73,7 @@ os.makedirs(default_path, exist_ok=True)
file_update_lock = threading.Lock()
os_type = os.name
clear_command = "cls" if os_type == 'nt' else "clear"
color_obj = utils.Color()
def signal_handler(_signal, _frame):
@ -182,7 +183,6 @@ def get_startup_info(system_type: str):
def segment_video(converts_file_path: str, segment_save_file_path: str, segment_format: str, segment_time: str,
is_original_delete: bool = True) -> None:
if os.path.exists(converts_file_path) and os.path.getsize(converts_file_path) > 0:
ffmpeg_command = [
"ffmpeg",
@ -313,7 +313,7 @@ def push_message(record_name: str, live_url: str, content: str) -> None:
print(f'提示信息:已经将[{record_name}]直播状态消息推送至你的{platform},'
f' 成功{len(result["success"])}, 失败{len(result["error"])}')
except Exception as e:
print(f"直播消息推送到{platform}失败: {e}")
color_obj.print_colored(f"直播消息推送到{platform}失败: {e}", color_obj.RED)
def run_script(command: str) -> None:
@ -342,7 +342,7 @@ def clear_record_info(record_name: str, record_url: str) -> None:
if record_url in url_comments and record_url in running_list:
running_list.remove(record_url)
monitoring -= 1
print(f"[{record_name}]已经从录制列表中移除")
color_obj.print_colored(f"[{record_name}]已经从录制列表中移除\n", color_obj.YELLOW)
def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, save_type: str,
@ -363,7 +363,7 @@ def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, sa
while process.poll() is None:
if record_url in url_comments or exit_recording:
print(f"[{record_name}]录制时已被注释,本条线程将会退出")
color_obj.print_colored(f"[{record_name}]录制时已被注释,本条线程将会退出", color_obj.YELLOW)
clear_record_info(record_name, record_url)
process.terminate()
process.wait()
@ -407,7 +407,7 @@ def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, sa
logger.debug("脚本命令执行结束!")
else:
print(f"\n{record_name} {stop_time} 直播录制出错,返回码: {return_code}\n")
color_obj.print_colored(f"\n{record_name} {stop_time} 直播录制出错,返回码: {return_code}\n", color_obj.RED)
recording.discard(record_name)
return False
@ -1063,13 +1063,13 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
_filepath, _ = urllib.request.urlretrieve(flv_url, save_file_path)
record_finished = True
recording.discard(record_name)
print(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
print(f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
else:
logger.debug("未找到FLV直播流跳过录制")
except Exception as e:
print(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制出错,请检查网络\n")
color_obj.print_colored(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制出错,请检查网络\n",
color_obj.RED)
logger.error(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
with max_request_lock:
error_count += 1
@ -1361,7 +1361,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
if error_count > 20:
x = x + 60
print("瞬时错误太多,延迟加60秒")
color_obj.print_colored("\r瞬时错误太多,延迟加60秒", color_obj.YELLOW)
# 这里是.如果录制结束后,循环时间会暂时变成30s后检测一遍. 这样一定程度上防止主播卡顿造成少录
# 当30秒过后检测一遍后. 会回归正常设置的循环秒数
@ -1524,7 +1524,7 @@ try:
except HTTPError as err:
print(f"HTTP error occurred: {err.code} - {err.reason}")
except URLError as err:
print('INFO未检测到全局/规则网络代理,请检查代理配置(若无需录制海外直播请忽略此条提示)')
color_obj.print_colored(f"INFO未检测到全局/规则网络代理,请检查代理配置(若无需录制海外直播请忽略此条提示)", color_obj.YELLOW)
except Exception as err:
print("An unexpected error occurred:", err)
@ -1672,7 +1672,7 @@ while True:
logger.warning(f"Disk space remaining is below {disk_space_limit} GB. "
f"Exiting program due to the disk space limit being reached.")
sys.exit(-1)
print("")
def contains_url(string: str) -> bool:
pattern = r"(https?://)?(www\.)?[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+(:\d+)?(/.*)?"
@ -1826,7 +1826,7 @@ while True:
url_tuples_list.append(new_line)
else:
if not origin_line.startswith('#'):
print(f"\r{origin_line.strip()} 本行包含未知链接.此条跳过")
color_obj.print_colored(f"\r{origin_line.strip()} 本行包含未知链接.此条跳过", color_obj.YELLOW)
update_file(url_config_file, old_str=origin_line, new_str=origin_line, start_str='#')
while len(need_update_line_list):
@ -1873,4 +1873,5 @@ while True:
first_run = False
time.sleep(3)