Compare commits

...

61 Commits
v4.0.2 ... main

Author SHA1 Message Date
vain-Liang
add187f8d8
feat: basic support for running with uv (#1265) 2025-11-03 19:55:52 +08:00
ihmily
0333cb4a01 optimize douyin live error catch 2025-10-25 15:17:50 +08:00
ihmily
73857755a7 fix tiktok parse 2025-10-24 18:59:45 +08:00
ihmily
fec734ae74 docs: update readme 2025-10-24 16:04:08 +08:00
ihmily
853d03ea14 fix: update bigo match 2025-10-24 10:53:21 +08:00
ihmily
2fb7f7afd7 feat: add sooplive.com support 2025-10-24 10:43:46 +08:00
ihmily
200e5b5b58 fix douyin stream fetch 2025-10-23 19:55:56 +08:00
Ovear
abb204e6e9
fix: graceful exit when disk usage threshold is reached (#1239) 2025-10-23 19:45:22 +08:00
ihmily
271a53621d fix: update TTinglive 2025-09-01 21:50:06 +08:00
ihmily
d77760f3c9 update readme 2025-09-01 18:21:52 +08:00
ihmily
9c913e23cc update config 2025-09-01 17:48:42 +08:00
ihmily
af37bf28f0 fix: update title fetch 2025-09-01 16:52:56 +08:00
ihmily
d4796409c7 feat: add direct downloader 2025-09-01 16:29:13 +08:00
ihmily
93a12ab41d feat: add direct downloader 2025-09-01 16:10:16 +08:00
ihmily
525b720627 fix live audio record 2025-08-28 18:05:11 +08:00
Hmily
e9f2a55ceb
feat: add laixiu sign js (#1195) 2025-08-28 17:08:15 +08:00
Hmily
3965487746
fix bigo room_id parse and twitcasting login (#1194) 2025-08-28 17:03:10 +08:00
Hmily
e80f1e653a
fix: update liveme room id match 2025-08-27 18:15:45 +08:00
Hmily
63787f1743
fix: update liveme room id match (#1192) 2025-08-27 18:10:28 +08:00
Hmily
a328c6a1c5
fix douyin stream fetch (#1190) 2025-08-27 13:55:22 +08:00
Hmily
be2c3a393f
fix: update flextv endpoint address (#1185) 2025-08-22 18:36:48 +08:00
Hmily
c7e3cf47ce
fix: update twitcast live parse (#1177) 2025-08-12 16:11:36 +08:00
Hmily
199186fb09
fix: update weibo live parse 2025-08-05 19:03:45 +08:00
Hmily
5778ebc4b3
fix: update migu live parse (#1165) 2025-08-02 19:41:24 +08:00
COYG⚡️
d9f985303a
Update LICENSE (#1160) 2025-07-30 15:46:12 +08:00
逆行时光
110d5bded4
feat: add push tunnel: pushplus (#1156) 2025-07-30 09:32:01 +08:00
ihmily
e478d72e62 fix: optimize douyin app short link parse 2025-07-24 16:53:07 +08:00
Hmily
bcfc268c1c
feat: add picarto live (#1149) 2025-07-22 11:51:44 +08:00
ihmily
8e4e9b098f feat: add vcodec flag for tiktok and douyin 2025-07-19 19:06:27 +08:00
ihmily
9f499f3fa6 feat: add lianjie and laixiu live 2025-07-19 17:45:19 +08:00
ihmily
ae8200e01c feat: add migu live record 2025-07-04 17:28:44 +08:00
ihmily
952eeb9b7c fix: handle twitch live record ad issues 2025-06-30 21:53:23 +08:00
727155455
ba8979e4ee
fix script input parameters (#1104) 2025-06-16 20:20:17 +08:00
ihmily
c157c08e5a style: fix lint issues 2025-06-14 14:27:55 +08:00
ihmily
effcfcc76f fix: update bilibili live data fetch 2025-06-13 19:00:31 +08:00
Hmily
d787838ed2
up 2025-06-11 13:36:36 +08:00
Hmily
86a822f0db
Update build-image.yml 2025-06-06 18:58:39 +08:00
ihmily
51f79d70f8 fix: update 2025-06-05 16:06:12 +08:00
ihmily
d37d9f25ad fix: update maoerfm live record 2025-06-05 14:19:53 +08:00
ihmily
c40a10235d fix: update blued stream fetch 2025-06-05 11:58:08 +08:00
ihmily
d860023808 fix: update bilibili live room data fetch 2025-06-05 11:41:52 +08:00
ihmily
57739410f8 fix: update taobao live stream fetch 2025-06-04 18:20:30 +08:00
ihmily
9bc629c1b0 fix: update douyin profile link parse 2025-06-04 15:38:16 +08:00
ihmily
2dd9c42767 fix: update douyin profile link parse 2025-06-04 15:31:59 +08:00
ihmily
fd06bc89da fix: update popkontv live stream fetch 2025-05-30 18:55:47 +08:00
ihmily
019b30203e fix: update rednote live stream fetch 2025-05-30 18:40:51 +08:00
ihmily
0bd2a3a360 refactor: Refactor code structure 2025-05-30 17:12:59 +08:00
某时橙
4fa3fbb773
fix: print flush (#957) 2025-03-20 18:50:01 +08:00
咳咳
ba046660fc
fix: huya prioritizes using TX CDN (#993) 2025-03-20 16:27:18 +08:00
ihmily
f73ce7b68e fix: update huya live record 2025-02-08 19:20:09 +08:00
ihmily
9e494f4377 fix: update flextv live stream fetch 2025-02-08 18:00:52 +08:00
ihmily
151a6a45c9 fix: fix force https record 2025-02-06 18:41:06 +08:00
ihmily
807f4b758e fix: update netease cc live record 2025-02-06 02:37:06 +08:00
ihmily
bf7381bf6c refactor: convert asynchronously func and add readme_pypi 2025-02-05 22:44:21 +08:00
ihmily
d055602e81 refactor: update quality map 2025-02-05 21:34:32 +08:00
ihmily
ef97e01dba refactor: update quality map 2025-02-05 20:52:36 +08:00
ihmily
e189640d3a refactor: update quality map 2025-02-05 20:31:43 +08:00
ihmily
7bb778a875 refactor: rename package from douyinliverecorder to streamget 2025-02-05 11:56:16 +08:00
ihmily
99ea143c78 chore: update httpx dependencies 2025-02-04 07:03:34 +08:00
ihmily
52c0287150 perf: add http2 tcp request 2025-02-04 06:54:14 +08:00
ihmily
246632de78 refactor: optimize code structure 2025-02-04 06:39:58 +08:00
33 changed files with 2242 additions and 790 deletions

View File

@ -4,45 +4,51 @@ on:
push:
tags:
- '*'
workflow_dispatch:
inputs:
tag_name:
description: 'Tag name for the Docker image'
required: false
default: 'latest'
jobs:
build_and_push:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Checkout code
uses: actions/checkout@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Log in to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
registry: docker.io
- name: Log in to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
registry: docker.io
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: |
ihmily/douyin-live-recorder:${{ github.ref_name }}
ihmily/douyin-live-recorder:latest
platforms: linux/amd64,linux/arm64
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: |
ihmily/douyin-live-recorder:${{ github.event.inputs.tag_name || github.ref_name }}
ihmily/douyin-live-recorder:latest
platforms: linux/amd64,linux/arm64
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

2
.gitignore vendored
View File

@ -90,7 +90,7 @@ node-v*.zip
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2023 Hmily
Copyright (c) 2025 Hmily
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

138
README.md
View File

@ -31,7 +31,7 @@
- [x] 猫耳FM
- [x] Look直播
- [x] WinkTV
- [x] FlexTV
- [x] TTingLive(原Flextv)
- [x] PopkonTV
- [x] TwitCasting
- [x] 百度直播
@ -61,6 +61,10 @@
- [x] 淘宝
- [x] 京东
- [x] Faceit
- [x] 咪咕
- [x] 连接直播
- [x] 来秀直播
- [x] Picarto
- [ ] 更多平台正在更新中
</div>
@ -80,6 +84,7 @@
├── utils.py -> (contains utility functions)
├── logger.py -> (logger handdle)
├── room.py -> (get room info)
├── ab_sign.py-> (generate dy token)
├── /javascript -> (some decrypt code)
├── main.py -> (main file)
├── ffmpeg_install.py -> (ffmpeg install script)
@ -144,8 +149,7 @@ https://www.yy.com/22490906/22490906
B站:
https://live.bilibili.com/320
小红书(推荐使用主页地址):
https://www.xiaohongshu.com/user/profile/6330049c000000002303c7ed?appuid=5f3f478a00000000010005b3
小红书(直播间分享地址):
http://xhslink.com/xpJpfM
bigo直播:
@ -175,7 +179,7 @@ https://look.163.com/live?id=65108820&position=3
WinkTV:
https://www.winktv.co.kr/live/play/anjer1004
FlexTV:
FlexTV(TTinglive)::
https://www.flextv.co.kr/channels/593127/live
PopkonTV:
@ -258,6 +262,7 @@ Youtube:
https://www.youtube.com/watch?v=cS6zS5hi1w0
淘宝(需cookie):
https://tbzb.taobao.com/live?liveId=532359023188
https://m.tb.cn/h.TWp0HTd
京东:
@ -265,12 +270,24 @@ https://3.cn/28MLBy-E
Faceit:
https://www.faceit.com/zh/players/Compl1/stream
连接直播:
https://show.lailianjie.com/10000258
咪咕直播:
https://www.miguvideo.com/p/live/120000541321
来秀直播:
https://www.imkktv.com/h5/share/video.html?uid=1845195&roomId=1710496
Picarto:
https://www.picarto.tv/cuteavalanche
```
&emsp;
## 🎃源码运行
使用源码运行,前提要有**Python>=3.10**环境如果没有请先自行安装Python再执行下面步骤。
使用源码运行,可参考下面的步骤。
1.首先拉取或手动下载本仓库项目代码
@ -282,9 +299,94 @@ git clone https://github.com/ihmily/DouyinLiveRecorder.git
```bash
cd DouyinLiveRecorder
pip3 install -r requirements.txt
```
> [!TIP]
> - 不论你是否已安装 **Python>=3.10** 环境, 都推荐使用 [**uv**](https://github.com/astral-sh/uv) 运行, 因为它可以自动管理虚拟环境和方便地管理 **Python** 版本, **不过这完全是可选的**<br />
> 使用以下命令安装
> ```bash
> # 在 macOS 和 Linux 上安装 uv
> curl -LsSf https://astral.sh/uv/install.sh | sh
> ```
> ```powershell
> # 在 Windows 上安装 uv
> powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
> ```
> - 如果安装依赖速度太慢, 你可以考虑使用国内 pip 镜像源:<br />
> 在 `pip` 命令使用 `-i` 参数指定, 如 `pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple`<br />
> 或者在 `uv` 命令 `--index` 选项指定, 如 `uv sync --index https://pypi.tuna.tsinghua.edu.cn/simple`
<details>
<summary>如果已安装 <b>Python>=3.10</b> 环境</summary>
- :white_check_mark: 在虚拟环境中安装 (推荐)
1. 创建虚拟环境
- 使用系统已安装的 Python, 不使用 uv
```bash
python -m venv .venv
```
- 使用 uv, 默认使用系统 Python, 你可以添加 `--python` 选项指定 Python 版本而不使用系统 Python [uv官方文档](https://docs.astral.sh/uv/concepts/python-versions/)
```bash
uv venv
```
2. 在终端激活虚拟环境 (在未安装 uv 或你想要手动激活虚拟环境时执行, 若已安装 uv, 可以跳过这一步, uv 会自动激活并使用虚拟环境)
**Bash**
```bash
source .venv/Scripts/activate
```
**Powershell**
```powershell
.venv\Scripts\activate.ps1
```
**Windows CMD**
```bat
.venv\Scripts\activate.bat
```
3. 安装依赖
```bash
# 使用 pip (若安装太慢或失败, 可使用 `-i` 指定镜像源)
pip3 install -U pip && pip3 install -r requirements.txt
# 或者使用 uv (可使用 `--index` 指定镜像源)
uv sync
# 或者
uv pip sync requirements.txt
```
- :x: 在系统 Python 环境中安装 (不推荐)
```bash
pip3 install -U pip && pip3 install -r requirements.txt
```
</details>
<details>
<summary>如果未安装 <b>Python>=3.10</b> 环境</summary>
你可以使用 [**uv**](https://github.com/astral-sh/uv) 安装依赖
```bash
# uv 将使用 3.10 及以上的最新 python 发行版自动创建并使用虚拟环境, 可使用 --python 选项指定 python 版本, 参见 https://docs.astral.sh/uv/reference/cli/#uv-sync--python 和 https://docs.astral.sh/uv/reference/cli/#uv-pip-sync--python
uv sync
# 或
uv pip sync requirements.txt
```
</details>
3.安装[FFmpeg](https://ffmpeg.org/download.html#build-linux)如果是Windows系统这一步可跳过。对于Linux系统执行以下命令安装
CentOS执行
@ -317,6 +419,12 @@ brew install ffmpeg
```python
python main.py
```
```bash
uv run main.py
```
其中Linux系统请使用`python3 main.py` 运行。
@ -373,6 +481,13 @@ docker-compose stop
&emsp;
## 🤖相关项目
- StreamCap: https://github.com/ihmily/StreamCap
- streamget: https://github.com/ihmily/streamget
&emsp;
## ❤️贡献者
&ensp;&ensp; [![Hmily](https://github.com/ihmily.png?size=50)](https://github.com/ihmily)
@ -392,10 +507,21 @@ docker-compose stop
&ensp;&ensp; [![HoratioShaw](https://github.com/HoratioShaw.png?size=50)](https://github.com/HoratioShaw)
[![nov30th](https://github.com/nov30th.png?size=50)](https://github.com/nov30th)
[![727155455](https://github.com/727155455.png?size=50)](https://github.com/727155455)
[![nixingshiguang](https://github.com/nixingshiguang.png?size=50)](https://github.com/nixingshiguang)
[![1411430556](https://github.com/1411430556.png?size=50)](https://github.com/1411430556)
[![Ovear](https://github.com/Ovear.png?size=50)](https://github.com/Ovear)
&emsp;
## ⏳提交日志
- 20251024
- 修复抖音风控无法获取数据问题
- 新增soop.com录制支持
- 修复bigo录制
- 20250127
- 新增淘宝、京东、faceit直播录制
- 修复小红书直播流录制以及转码问题

View File

@ -30,31 +30,32 @@ mp4格式重新编码为h264 = 否
额外使用代理录制的平台(逗号分隔) =
[推送配置]
# 可选微信|钉钉|tg|邮箱|bark|ntfy 可填多个
直播状态推送渠道 =
钉钉推送接口链接 =
微信推送接口链接 =
bark推送接口链接 =
# 可选微信|钉钉|tg|邮箱|bark|ntfy|pushplus 可填多个
直播状态推送渠道 =
钉钉推送接口链接 =
微信推送接口链接 =
bark推送接口链接 =
bark推送中断级别 = active
bark推送铃声 =
钉钉通知@对象(填手机号) =
bark推送铃声 =
钉钉通知@对象(填手机号) =
钉钉通知@全体(是/否) =
tgapi令牌 =
tg聊天id(个人或者群组id) =
smtp邮件服务器 =
是否使用SMTP服务SSL加密(是/否) =
SMTP邮件服务器端口 =
邮箱登录账号 =
发件人密码(授权码) =
发件人邮箱 =
发件人显示昵称 =
收件人邮箱 =
tgapi令牌 =
tg聊天id(个人或者群组id) =
smtp邮件服务器 =
是否使用SMTP服务SSL加密(是/否) =
SMTP邮件服务器端口 =
邮箱登录账号 =
发件人密码(授权码) =
发件人邮箱 =
发件人显示昵称 =
收件人邮箱 =
ntfy推送地址 = https://ntfy.sh/xxxx
ntfy推送标签 = tada
ntfy推送邮箱 =
自定义推送标题 =
自定义开播推送内容 =
自定义关播推送内容 =
ntfy推送邮箱 =
pushplus推送token =
自定义推送标题 =
自定义开播推送内容 =
自定义关播推送内容 =
只推送通知不录制(是/否) =
直播推送检测频率(秒) = 1800
开播推送开启(是/否) =
@ -63,63 +64,68 @@ ntfy推送邮箱 =
[Cookie]
# 录制抖音必填
抖音cookie = ttwid=1%7CB1qls3GdnZhUov9o2NxOMxxYS2ff6OSvEWbv0ytbES4%7C1680522049%7C280d802d6d478e3e78d0c807f7c487e7ffec0ae4e5fdd6a0fe74c3c6af149511; my_rd=1; passport_csrf_token=3ab34460fa656183fccfb904b16ff742; passport_csrf_token_default=3ab34460fa656183fccfb904b16ff742; d_ticket=9f562383ac0547d0b561904513229d76c9c21; n_mh=hvnJEQ4Q5eiH74-84kTFUyv4VK8xtSrpRZG1AhCeFNI; store-region=cn-fj; store-region-src=uid; LOGIN_STATUS=1; __security_server_data_status=1; FORCE_LOGIN=%7B%22videoConsumedRemainSeconds%22%3A180%7D; pwa2=%223%7C0%7C3%7C0%22; download_guide=%223%2F20230729%2F0%22; volume_info=%7B%22isUserMute%22%3Afalse%2C%22isMute%22%3Afalse%2C%22volume%22%3A0.6%7D; strategyABtestKey=%221690824679.923%22; stream_recommend_feed_params=%22%7B%5C%22cookie_enabled%5C%22%3Atrue%2C%5C%22screen_width%5C%22%3A1536%2C%5C%22screen_height%5C%22%3A864%2C%5C%22browser_online%5C%22%3Atrue%2C%5C%22cpu_core_num%5C%22%3A8%2C%5C%22device_memory%5C%22%3A8%2C%5C%22downlink%5C%22%3A10%2C%5C%22effective_type%5C%22%3A%5C%224g%5C%22%2C%5C%22round_trip_time%5C%22%3A150%7D%22; VIDEO_FILTER_MEMO_SELECT=%7B%22expireTime%22%3A1691443863751%2C%22type%22%3Anull%7D; home_can_add_dy_2_desktop=%221%22; __live_version__=%221.1.1.2169%22; device_web_cpu_core=8; device_web_memory_size=8; xgplayer_user_id=346045893336; csrf_session_id=2e00356b5cd8544d17a0e66484946f28; odin_tt=724eb4dd23bc6ffaed9a1571ac4c757ef597768a70c75fef695b95845b7ffcd8b1524278c2ac31c2587996d058e03414595f0a4e856c53bd0d5e5f56dc6d82e24004dc77773e6b83ced6f80f1bb70627; __ac_nonce=064caded4009deafd8b89; __ac_signature=_02B4Z6wo00f01HLUuwwAAIDBh6tRkVLvBQBy9L-AAHiHf7; ttcid=2e9619ebbb8449eaa3d5a42d8ce88ec835; webcast_leading_last_show_time=1691016922379; webcast_leading_total_show_times=1; webcast_local_quality=sd; live_can_add_dy_2_desktop=%221%22; msToken=1JDHnVPw_9yTvzIrwb7cQj8dCMNOoesXbA_IooV8cezcOdpe4pzusZE7NB7tZn9TBXPr0ylxmv-KMs5rqbNUBHP4P7VBFUu0ZAht_BEylqrLpzgt3y5ne_38hXDOX8o=; msToken=jV_yeN1IQKUd9PlNtpL7k5vthGKcHo0dEh_QPUQhr8G3cuYv-Jbb4NnIxGDmhVOkZOCSihNpA2kvYtHiTW25XNNX_yrsv5FN8O6zm3qmCIXcEe0LywLn7oBO2gITEeg=; tt_scid=mYfqpfbDjqXrIGJuQ7q-DlQJfUSG51qG.KUdzztuGP83OjuVLXnQHjsz-BRHRJu4e986
快手cookie =
tiktok_cookie =
虎牙cookie =
斗鱼cookie =
yy_cookie =
b站cookie =
小红书cookie =
bigo_cookie =
blued_cookie =
sooplive_cookie =
netease_cookie =
千度热播_cookie =
pandatv_cookie =
猫耳fm_cookie =
winktv_cookie =
flextv_cookie =
look_cookie =
twitcasting_cookie =
baidu_cookie =
weibo_cookie =
kugou_cookie =
twitch_cookie =
liveme_cookie =
huajiao_cookie =
liuxing_cookie =
showroom_cookie =
acfun_cookie =
changliao_cookie =
快手cookie =
tiktok_cookie =
虎牙cookie =
斗鱼cookie =
yy_cookie =
b站cookie =
小红书cookie =
bigo_cookie =
blued_cookie =
sooplive_cookie =
netease_cookie =
千度热播_cookie =
pandatv_cookie =
猫耳fm_cookie =
winktv_cookie =
flextv_cookie =
look_cookie =
twitcasting_cookie =
baidu_cookie =
weibo_cookie =
kugou_cookie =
twitch_cookie =
liveme_cookie =
huajiao_cookie =
liuxing_cookie =
showroom_cookie =
acfun_cookie =
changliao_cookie =
yinbo_cookie =
yingke_cookie =
zhihu_cookie =
chzzk_cookie =
haixiu_cookie =
vvxqiu_cookie =
17live_cookie =
langlive_cookie =
pplive_cookie =
6room_cookie =
lehaitv_cookie =
huamao_cookie =
shopee_cookie =
youtube_cookie =
taobao_cookie =
jd_cookie =
faceit_cookie =
yingke_cookie =
zhihu_cookie =
chzzk_cookie =
haixiu_cookie =
vvxqiu_cookie =
17live_cookie =
langlive_cookie =
pplive_cookie =
6room_cookie =
lehaitv_cookie =
huamao_cookie =
shopee_cookie =
youtube_cookie =
taobao_cookie =
jd_cookie =
faceit_cookie =
migu_cookie =
lianjie_cookie =
laixiu_cookie =
picarto_cookie =
[Authorization]
popkontv_token =
popkontv_token =
[账号密码]
sooplive账号 =
sooplive密码 =
flextv账号 =
flextv密码 =
popkontv账号 =
sooplive账号 =
sooplive密码 =
flextv账号 =
flextv密码 =
popkontv账号 =
partner_code = P-00001
popkontv密码 =
popkontv密码 =
twitcasting账号类型 = normal
twitcasting账号 =
twitcasting密码 =
twitcasting账号 =
twitcasting密码 =

26
demo.py
View File

@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
import asyncio
from douyinliverecorder.logger import logger
from douyinliverecorder import spider
from src.logger import logger
from src import spider
# 以下示例直播间链接不保证时效性,请自行查看链接是否能正常访问
# Please note that the following example live room links may not be up-to-date;
# Please note that the following example live room links may not be up-to-date
LIVE_STREAM_CONFIG = {
"douyin": {
"url": "https://live.douyin.com/745964462470",
@ -71,7 +71,7 @@ LIVE_STREAM_CONFIG = {
"func": spider.get_winktv_stream_data,
},
"flextv": {
"url": "https://www.flextv.co.kr/channels/593127/live",
"url": "https://www.ttinglive.com/channels/685479/live",
"func": spider.get_flextv_stream_data,
},
"looklive": {
@ -190,6 +190,22 @@ LIVE_STREAM_CONFIG = {
"faceit": {
"url": "https://www.faceit.com/zh/players/Compl1/stream",
"func": spider.get_faceit_stream_data,
},
"lianjie": {
"url": "https://show.lailianjie.com/10000258",
"func": spider.get_lianjie_stream_url,
},
"migu": {
"url": "https://www.miguvideo.com/p/live/120000541321",
"func": spider.get_migu_stream_url,
},
"laixiu": {
"url": "https://www.imkktv.com/h5/share/video.html?uid=1845195&roomId=1710496",
"func": spider.get_laixiu_stream_url,
},
"picarto": {
"url": "https://www.picarto.tv/cuteavalanche",
"func": spider.get_picarto_stream_url,
}
}
@ -208,4 +224,4 @@ def test_live_stream(platform_name: str, proxy_addr=None, cookies=None) -> None:
if __name__ == "__main__":
platform = "douyin"
test_live_stream(platform)
test_live_stream(platform)

View File

@ -15,7 +15,7 @@ import zipfile
from pathlib import Path
import requests
from tqdm import tqdm
from douyinliverecorder.logger import logger
from src.logger import logger
current_platform = platform.system()
execute_dir = os.path.split(os.path.realpath(sys.argv[0]))[0]
@ -122,7 +122,7 @@ def install_ffmpeg_linux():
logger.debug("Trying to install the stable version of ffmpeg")
result = subprocess.run(['yum', '-y', 'update'], capture_output=True)
if result.returncode != 0:
logger.error(f"Failed to update package lists using yum.")
logger.error("Failed to update package lists using yum.")
return False
result = subprocess.run(['yum', 'install', '-y', 'ffmpeg'], capture_output=True)
@ -218,4 +218,4 @@ def check_ffmpeg_installed() -> bool:
def check_ffmpeg() -> bool:
if not check_ffmpeg_installed():
return install_ffmpeg()
return True
return True

View File

@ -20,7 +20,7 @@ else:
locale_path = Path(execute_dir) / 'i18n'
_tr = init_gettext(locale_path, 'zh_CN')
original_print = builtins.print
package_name = 'douyinliverecorder'
package_name = 'src'
def translated_print(*args, **kwargs):

505
main.py
View File

@ -4,8 +4,8 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-17 23:52:05
Update: 2025-01-26 00:05:00
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Update: 2025-10-23 19:48:05
Copyright (c) 2023-2025 by Hmily, All Rights Reserved.
Function: Record live stream video.
"""
import asyncio
@ -22,27 +22,27 @@ import shutil
import random
import uuid
from pathlib import Path
import urllib.parse
import urllib.request
from urllib.error import URLError, HTTPError
from typing import Any
import configparser
from douyinliverecorder import spider, stream
from douyinliverecorder.proxy import ProxyDetector
from douyinliverecorder.utils import logger
from douyinliverecorder import utils
import httpx
from src import spider, stream
from src.proxy import ProxyDetector
from src.utils import logger
from src import utils
from msg_push import (
dingtalk, xizhi, tg_bot, send_email, bark, ntfy
dingtalk, xizhi, tg_bot, send_email, bark, ntfy, pushplus
)
from ffmpeg_install import (
check_ffmpeg, ffmpeg_path, current_env_path
)
version = "v4.0.2"
version = "v4.0.7"
platforms = ("\n国内站点:抖音|快手|虎牙|斗鱼|YY|B站|小红书|bigo|blued|网易CC|千度热播|猫耳FM|Look|TwitCasting|百度|微博|"
"酷狗|花椒|流星|Acfun|畅聊|映客|音播|知乎|嗨秀|VV星球|17Live|浪Live|漂漂|六间房|乐嗨|花猫|淘宝|京东"
"酷狗|花椒|流星|Acfun|畅聊|映客|音播|知乎|嗨秀|VV星球|17Live|浪Live|漂漂|六间房|乐嗨|花猫|淘宝|京东|咪咕|连接|来秀"
"\n海外站点TikTok|SOOP|PandaTV|WinkTV|FlexTV|PopkonTV|TwitchTV|LiveMe|ShowRoom|CHZZK|Shopee|"
"Youtube|Faceit")
"Youtube|Faceit|Picarto")
recording = set()
error_count = 0
@ -92,6 +92,7 @@ def display_info() -> None:
time.sleep(5)
while True:
try:
sys.stdout.flush()
time.sleep(5)
if Path(sys.executable).name != 'pythonw.exe':
os.system(clear_command)
@ -101,9 +102,9 @@ def display_info() -> None:
if split_video_by_time:
print(f"录制分段开启: {split_time}", end=" | ")
else:
print(f"录制分段开启: 否", end=" | ")
print("录制分段开启: 否", end=" | ")
if create_time_file:
print(f"是否生成时间文件: 是", end=" | ")
print("是否生成时间文件: 是", end=" | ")
print(f"录制视频质量为: {video_record_quality}", end=" | ")
print(f"录制视频格式为: {video_save_type}", end=" | ")
print(f"目前瞬时错误数为: {error_count}", end=" | ")
@ -219,7 +220,7 @@ def converts_mp4(converts_file_path: str, is_original_delete: bool = True) -> No
try:
if os.path.exists(converts_file_path) and os.path.getsize(converts_file_path) > 0:
if converts_to_h264:
color_obj.print_colored(f"正在转码为MP4格式并重新编码为h264\n", color_obj.YELLOW)
color_obj.print_colored("正在转码为MP4格式并重新编码为h264\n", color_obj.YELLOW)
ffmpeg_command = [
"ffmpeg", "-i", converts_file_path,
"-c:v", "libx264",
@ -230,7 +231,7 @@ def converts_mp4(converts_file_path: str, is_original_delete: bool = True) -> No
"-f", "mp4", converts_file_path.rsplit('.', maxsplit=1)[0] + ".mp4",
]
else:
color_obj.print_colored(f"正在转码为MP4格式\n", color_obj.YELLOW)
color_obj.print_colored("正在转码为MP4格式\n", color_obj.YELLOW)
ffmpeg_command = [
"ffmpeg", "-i", converts_file_path,
"-c:v", "copy",
@ -339,6 +340,7 @@ def push_message(record_name: str, live_url: str, content: str) -> None:
'NTFY': lambda: ntfy(
ntfy_api, title=msg_title, content=content, tags=ntfy_tags, action_url=live_url, email=ntfy_email
),
'PUSHPLUS': lambda: pushplus(pushplus_token, msg_title, content),
}
for platform, func in push_functions.items():
@ -365,7 +367,7 @@ def run_script(command: str) -> None:
print(stderr_decoded)
except PermissionError as e:
logger.error(e)
logger.error(f'脚本无执行权限!, 若是Linux环境, 请先执行:chmod +x your_script.sh 授予脚本可执行权限')
logger.error('脚本无执行权限!, 若是Linux环境, 请先执行:chmod +x your_script.sh 授予脚本可执行权限')
except OSError as e:
logger.error(e)
logger.error('Please add `#!/bin/bash` at the beginning of your bash script file.')
@ -380,6 +382,41 @@ def clear_record_info(record_name: str, record_url: str) -> None:
color_obj.print_colored(f"[{record_name}]已经从录制列表中移除\n", color_obj.YELLOW)
def direct_download_stream(source_url: str, save_path: str, record_name: str, live_url: str, platform: str) -> bool:
try:
with open(save_path, 'wb') as f:
client = httpx.Client(timeout=None)
headers = {}
header_params = get_record_headers(platform, live_url)
if header_params:
key, value = header_params.split(":", 1)
headers[key] = value
with client.stream('GET', source_url, headers=headers, follow_redirects=True) as response:
if response.status_code != 200:
logger.error(f"请求直播流失败,状态码: {response.status_code}")
return False
downloaded = 0
chunk_size = 1024 * 16
for chunk in response.iter_bytes(chunk_size):
if live_url in url_comments or exit_recording:
color_obj.print_colored(f"[{record_name}]录制时已被注释或请求停止,下载中断", color_obj.YELLOW)
clear_record_info(record_name, live_url)
return False
if chunk:
f.write(chunk)
downloaded += len(chunk)
print()
return True
except Exception as e:
logger.error(f"FLV下载错误: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
return False
def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, save_type: str,
script_command: str | None = None) -> bool:
save_file_path = ffmpeg_command[-1]
@ -431,7 +468,7 @@ def check_subprocess(record_name: str, record_url: str, ffmpeg_command: list, sa
params = [
f'--record_name "{record_name}"',
f'--save_file_path "{save_file_path}"',
f'--save_type {save_type}'
f'--save_type {save_type}',
f'--split_video_by_time {split_video_by_time}',
f'--converts_to_mp4 {converts_to_mp4}',
]
@ -462,6 +499,49 @@ def clean_name(input_text):
return cleaned_name or '空白昵称'
def get_quality_code(qn):
QUALITY_MAPPING = {
"原画": "OD",
"蓝光": "BD",
"超清": "UHD",
"高清": "HD",
"标清": "SD",
"流畅": "LD"
}
return QUALITY_MAPPING.get(qn)
def get_record_headers(platform, live_url):
live_domain = '/'.join(live_url.split('/')[0:3])
record_headers = {
'PandaTV': 'origin:https://www.pandalive.co.kr',
'WinkTV': 'origin:https://www.winktv.co.kr',
'PopkonTV': 'origin:https://www.popkontv.com',
'FlexTV': 'origin:https://www.flextv.co.kr',
'千度热播': 'referer:https://qiandurebo.com',
'17Live': 'referer:https://17.live/en/live/6302408',
'浪Live': 'referer:https://www.lang.live',
'shopee': f'origin:{live_domain}',
'Blued直播': 'referer:https://app.blued.cn'
}
return record_headers.get(platform)
def is_flv_preferred_platform(link):
return any(i in link for i in ["douyin", "tiktok"])
def select_source_url(link, stream_info):
if is_flv_preferred_platform(link):
codec = utils.get_query_params(stream_info.get('flv_url'), "codec")
if codec and codec[0] == 'h265':
logger.warning("FLV is not supported for h265 codec, use HLS source instead")
else:
return stream_info.get('flv_url')
return stream_info.get('record_url')
def start_record(url_data: tuple, count_variable: int = -1) -> None:
global error_count
@ -473,7 +553,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
new_record_url = ''
count_time = time.time()
retry = 0
record_quality, record_url, anchor_name = url_data
record_quality_zh, record_url, anchor_name = url_data
record_quality = get_quality_code(record_quality_zh)
proxy_address = proxy_addr
platform = '未知平台'
live_domain = '/'.join(record_url.split('/')[0:3])
@ -499,8 +580,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
if record_url.find("douyin.com/") > -1:
platform = '抖音直播'
with semaphore:
if 'v.douyin.com' not in record_url:
json_data = asyncio.run(spider.get_douyin_stream_data(
if 'v.douyin.com' not in record_url and '/user/' not in record_url:
json_data = asyncio.run(spider.get_douyin_web_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=dy_cookie))
@ -509,7 +590,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
url=record_url,
proxy_addr=proxy_address,
cookies=dy_cookie))
port_info = stream.get_douyin_stream_url(json_data, record_quality)
port_info = asyncio.run(
stream.get_douyin_stream_url(json_data, record_quality, proxy_address))
elif record_url.find("https://www.tiktok.com/") > -1:
platform = 'TikTok直播'
@ -519,7 +601,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
url=record_url,
proxy_addr=proxy_address,
cookies=tiktok_cookie))
port_info = stream.get_tiktok_stream_url(json_data, record_quality)
port_info = asyncio.run(
stream.get_tiktok_stream_url(json_data, record_quality, proxy_address))
else:
logger.error("错误信息: 网络异常请检查网络是否能正常访问TikTok平台")
@ -530,17 +613,17 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
url=record_url,
proxy_addr=proxy_address,
cookies=ks_cookie))
port_info = stream.get_kuaishou_stream_url(json_data, record_quality)
port_info = asyncio.run(stream.get_kuaishou_stream_url(json_data, record_quality))
elif record_url.find("https://www.huya.com/") > -1:
platform = '虎牙直播'
with semaphore:
if record_quality not in ['原画', '蓝光', '超清']:
if record_quality not in ['OD', 'BD', 'UHD']:
json_data = asyncio.run(spider.get_huya_stream_data(
url=record_url,
proxy_addr=proxy_address,
cookies=hy_cookie))
port_info = stream.get_huya_stream_url(json_data, record_quality)
port_info = asyncio.run(stream.get_huya_stream_url(json_data, record_quality))
else:
port_info = asyncio.run(spider.get_huya_app_stream_url(
url=record_url,
@ -562,7 +645,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_yy_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=yy_cookie))
port_info = stream.get_yy_stream_url(json_data)
port_info = asyncio.run(stream.get_yy_stream_url(json_data))
elif record_url.find("https://live.bilibili.com/") > -1:
platform = 'B站直播'
@ -572,16 +655,15 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
port_info = asyncio.run(stream.get_bilibili_stream_url(
json_data, video_quality=record_quality, cookies=bili_cookie, proxy_addr=proxy_address))
elif record_url.find("https://www.redelight.cn/") > -1 or \
record_url.find("https://www.xiaohongshu.com/") > -1 or \
record_url.find("http://xhslink.com/") > -1:
elif record_url.find("http://xhslink.com/") > -1 or \
record_url.find("https://www.xiaohongshu.com/") > -1:
platform = '小红书直播'
with semaphore:
port_info = asyncio.run(spider.get_xhs_stream_url(
record_url, proxy_addr=proxy_address, cookies=xhs_cookie))
retry += 1
elif record_url.find("https://www.bigo.tv/") > -1 or record_url.find("slink.bigovideo.tv/") > -1:
elif record_url.find("www.bigo.tv/") > -1 or record_url.find("slink.bigovideo.tv/") > -1:
platform = 'Bigo直播'
with semaphore:
port_info = asyncio.run(spider.get_bigo_stream_url(
@ -593,7 +675,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
port_info = asyncio.run(spider.get_blued_stream_url(
record_url, proxy_addr=proxy_address, cookies=blued_cookie))
elif record_url.find("sooplive.co.kr/") > -1:
elif record_url.find("sooplive.co.kr/") > -1 or record_url.find("sooplive.com/") > -1:
platform = 'SOOP'
with semaphore:
if global_proxy or proxy_address:
@ -607,7 +689,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
utils.update_config(
config_file, 'Cookie', 'sooplive_cookie', json_data['new_cookies']
)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问SOOP平台")
@ -616,7 +698,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_netease_stream_data(
url=record_url, cookies=netease_cookie))
port_info = stream.get_netease_stream_url(json_data, record_quality)
port_info = asyncio.run(stream.get_netease_stream_url(json_data, record_quality))
elif record_url.find("qiandurebo.com/") > -1:
platform = '千度热播'
@ -633,7 +715,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
proxy_addr=proxy_address,
cookies=pandatv_cookie
))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问PandaTV直播平台")
@ -651,11 +733,11 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
url=record_url,
proxy_addr=proxy_address,
cookies=winktv_cookie))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问WinkTV直播平台")
elif record_url.find("www.flextv.co.kr/") > -1:
elif record_url.find("www.flextv.co.kr/") > -1 or record_url.find("www.ttinglive.com/") > -1:
platform = 'FlexTV'
with semaphore:
if global_proxy or proxy_address:
@ -670,7 +752,10 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
utils.update_config(
config_file, 'Cookie', 'flextv_cookie', json_data['new_cookies']
)
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
if 'play_url_list' in json_data:
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
port_info = json_data
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问FlexTV直播平台")
@ -705,7 +790,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
elif record_url.find("twitcasting.tv/") > -1:
platform = 'TwitCasting'
with semaphore:
port_info = asyncio.run(spider.get_twitcasting_stream_url(
json_data = asyncio.run(spider.get_twitcasting_stream_url(
url=record_url,
proxy_addr=proxy_address,
cookies=twitcasting_cookie,
@ -713,6 +798,8 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
username=twitcasting_username,
password=twitcasting_password
))
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=False))
if port_info and port_info.get('new_cookies'):
utils.update_config(
file_path=config_file, section='Cookie', key='twitcasting_cookie',
@ -726,14 +813,15 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
url=record_url,
proxy_addr=proxy_address,
cookies=baidu_cookie))
port_info = stream.get_stream_url(json_data, record_quality)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality))
elif record_url.find("weibo.com/") > -1:
platform = '微博直播'
with semaphore:
json_data = asyncio.run(spider.get_weibo_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=weibo_cookie))
port_info = stream.get_stream_url(json_data, record_quality, hls_extra_key='m3u8_url')
port_info = asyncio.run(stream.get_stream_url(
json_data, record_quality, hls_extra_key='m3u8_url'))
elif record_url.find("kugou.com/") > -1:
platform = '酷狗直播'
@ -750,7 +838,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
proxy_addr=proxy_address,
cookies=twitch_cookie
))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问TwitchTV直播平台")
@ -780,15 +868,15 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_showroom_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=showroom_cookie))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
elif record_url.find("live.acfun.cn/") > -1 or record_url.find("m.acfun.cn/") > -1:
platform = 'Acfun'
with semaphore:
json_data = asyncio.run(spider.get_acfun_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=acfun_cookie))
port_info = stream.get_stream_url(
json_data, record_quality, url_type='flv', flv_extra_key='url')
port_info = asyncio.run(stream.get_stream_url(
json_data, record_quality, url_type='flv', flv_extra_key='url'))
elif record_url.find("live.tlclw.com/") > -1:
platform = '畅聊直播'
@ -819,7 +907,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_chzzk_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=chzzk_cookie))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
elif record_url.find("www.haixiutv.com/") > -1:
platform = '嗨秀直播'
@ -882,17 +970,17 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_youtube_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=youtube_cookie))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
elif record_url.find("tb.cn") > -1:
platform = '淘宝直播'
with semaphore:
json_data = asyncio.run(spider.get_taobao_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=taobao_cookie))
port_info = stream.get_stream_url(
port_info = asyncio.run(stream.get_stream_url(
json_data, record_quality,
url_type='all', hls_extra_key='hlsUrl', flv_extra_key='flvUrl'
)
))
elif record_url.find("3.cn") > -1 or record_url.find("m.jd.com") > -1:
platform = '京东直播'
@ -907,10 +995,34 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
with semaphore:
json_data = asyncio.run(spider.get_faceit_stream_data(
url=record_url, proxy_addr=proxy_address, cookies=faceit_cookie))
port_info = stream.get_stream_url(json_data, record_quality, spec=True)
port_info = asyncio.run(stream.get_stream_url(json_data, record_quality, spec=True))
else:
logger.error("错误信息: 网络异常请检查本网络是否能正常访问faceit直播平台")
elif record_url.find("www.miguvideo.com") > -1 or record_url.find("m.miguvideo.com") > -1:
platform = '咪咕直播'
with semaphore:
port_info = asyncio.run(spider.get_migu_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=migu_cookie))
elif record_url.find("show.lailianjie.com") > -1:
platform = '连接直播'
with semaphore:
port_info = asyncio.run(spider.get_lianjie_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=lianjie_cookie))
elif record_url.find("www.imkktv.com") > -1:
platform = '来秀直播'
with semaphore:
port_info = asyncio.run(spider.get_laixiu_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=laixiu_cookie))
elif record_url.find("www.picarto.tv") > -1:
platform = 'Picarto'
with semaphore:
port_info = asyncio.run(spider.get_picarto_stream_url(
url=record_url, proxy_addr=proxy_address, cookies=picarto_cookie))
elif record_url.find(".m3u8") > -1 or record_url.find(".flv") > -1:
platform = '自定义录制直播'
port_info = {
@ -1002,7 +1114,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
time.sleep(push_check_seconds)
continue
real_url = port_info.get('record_url')
real_url = select_source_url(record_url, port_info)
full_path = f'{default_path}/{platform}'
if real_url:
now = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
@ -1034,12 +1146,13 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
except Exception as e:
logger.error(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
if enable_https_recording and real_url.startswith("http://"):
real_url = real_url.replace("http://", "https://")
if platform != '自定义录制直播':
if enable_https_recording and real_url.startswith("http://"):
real_url = real_url.replace("http://", "https://")
http_record_list = ['shopee']
if platform in http_record_list:
real_url = real_url.replace("https://", "http://")
http_record_list = ['shopee', "migu"]
if platform in http_record_list:
real_url = real_url.replace("https://", "http://")
user_agent = ("Mozilla/5.0 (Linux; Android 11; SAMSUNG SM-G973U) AppleWebKit/537.36 ("
"KHTML, like Gecko) SamsungBrowser/14.2 Chrome/87.0.4280.141 Mobile "
@ -1081,18 +1194,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
"-avoid_negative_ts", "1"
]
record_headers = {
'PandaTV': 'origin:https://www.pandalive.co.kr',
'WinkTV': 'origin:https://www.winktv.co.kr',
'PopkonTV': 'origin:https://www.popkontv.com',
'FlexTV': 'origin:https://www.flextv.co.kr',
'千度热播': 'referer:https://qiandurebo.com',
'17Live': 'referer:https://17.live/en/live/6302408',
'浪Live': 'referer:https://www.lang.live',
'shopee': f'origin:{live_domain}',
}
headers = record_headers.get(platform)
headers = get_record_headers(platform, record_url)
if headers:
ffmpeg_command.insert(11, "-headers")
ffmpeg_command.insert(12, headers)
@ -1103,23 +1205,108 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
recording.add(record_name)
start_record_time = datetime.datetime.now()
recording_time_list[record_name] = [start_record_time, record_quality]
recording_time_list[record_name] = [start_record_time, record_quality_zh]
rec_info = f"\r{anchor_name} 准备开始录制视频: {full_path}"
if show_url:
re_plat = ('WinkTV', 'PandaTV', 'ShowRoom', 'CHZZK', 'Youtube')
if platform in re_plat:
logger.info(f"{platform} | {anchor_name} | 直播源地址: {port_info['m3u8_url']}")
logger.info(
f"{platform} | {anchor_name} | 直播源地址: {port_info.get('m3u8_url')}")
else:
logger.info(
f"{platform} | {anchor_name} | 直播源地址: {port_info['record_url']}")
f"{platform} | {anchor_name} | 直播源地址: {real_url}")
only_flv_record = False
only_flv_platform_list = ['shopee'] if os.name == 'nt' else ['shopee', '花椒直播']
only_flv_platform_list = ['shopee', '花椒直播']
if platform in only_flv_platform_list:
logger.debug(f"提示: {platform} 将强制使用FLV格式录制")
only_flv_record = True
if video_save_type == "FLV" or only_flv_record:
only_audio_record = False
only_audio_platform_list = ['猫耳FM直播', 'Look直播']
if platform in only_audio_platform_list:
only_audio_record = True
record_save_type = video_save_type
if is_flv_preferred_platform(record_url) and port_info.get('flv_url'):
codec = utils.get_query_params(port_info['flv_url'], "codec")
if codec and codec[0] == 'h265':
logger.warning("FLV is not supported for h265 codec, use TS format instead")
record_save_type = "TS"
if only_audio_record or any(i in record_save_type for i in ['MP3', 'M4A']):
try:
now = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
extension = "mp3" if "m4a" not in record_save_type.lower() else "m4a"
name_format = "_%03d" if split_video_by_time else ""
save_file_path = (f"{full_path}/{anchor_name}_{title_in_name}{now}"
f"{name_format}.{extension}")
if split_video_by_time:
print(f'\r{anchor_name} 准备开始录制音频: {save_file_path}')
if "MP3" in record_save_type:
command = [
"-map", "0:a",
"-c:a", "libmp3lame",
"-ab", "320k",
"-f", "segment",
"-segment_time", split_time,
"-reset_timestamps", "1",
save_file_path,
]
else:
command = [
"-map", "0:a",
"-c:a", "aac",
"-bsf:a", "aac_adtstoasc",
"-ab", "320k",
"-f", "segment",
"-segment_time", split_time,
"-segment_format", 'mpegts',
"-reset_timestamps", "1",
save_file_path,
]
else:
if "MP3" in record_save_type:
command = [
"-map", "0:a",
"-c:a", "libmp3lame",
"-ab", "320k",
save_file_path,
]
else:
command = [
"-map", "0:a",
"-c:a", "aac",
"-bsf:a", "aac_adtstoasc",
"-ab", "320k",
"-movflags", "+faststart",
save_file_path,
]
ffmpeg_command.extend(command)
comment_end = check_subprocess(
record_name,
record_url,
ffmpeg_command,
record_save_type,
custom_script
)
if comment_end:
return
except subprocess.CalledProcessError as e:
logger.error(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
with max_request_lock:
error_count += 1
error_window.append(1)
if only_flv_record:
logger.info(f"Use Direct Downloader to Download FLV Stream: {record_url}")
filename = anchor_name + f'_{title_in_name}' + now + '.flv'
save_file_path = f'{full_path}/{filename}'
print(f'{rec_info}/{filename}')
@ -1136,11 +1323,20 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
try:
flv_url = port_info.get('flv_url')
if flv_url:
_filepath, _ = urllib.request.urlretrieve(flv_url, save_file_path)
record_finished = True
recording.add(record_name)
start_record_time = datetime.datetime.now()
recording_time_list[record_name] = [start_record_time, record_quality_zh]
download_success = direct_download_stream(
flv_url, save_file_path, record_name, record_url, platform
)
if download_success:
record_finished = True
print(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
recording.discard(record_name)
print(
f"\n{anchor_name} {time.strftime('%Y-%m-%d %H:%M:%S')} 直播录制完成\n")
else:
logger.debug("未找到FLV直播流跳过录制")
except Exception as e:
@ -1153,6 +1349,54 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
error_count += 1
error_window.append(1)
elif record_save_type == "FLV":
filename = anchor_name + f'_{title_in_name}' + now + ".flv"
print(f'{rec_info}/{filename}')
save_file_path = full_path + '/' + filename
try:
if split_video_by_time:
now = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
save_file_path = f"{full_path}/{anchor_name}_{title_in_name}{now}_%03d.flv"
command = [
"-map", "0",
"-c:v", "copy",
"-c:a", "copy",
"-bsf:a", "aac_adtstoasc",
"-f", "segment",
"-segment_time", split_time,
"-segment_format", "flv",
"-reset_timestamps", "1",
save_file_path
]
else:
command = [
"-map", "0",
"-c:v", "copy",
"-c:a", "copy",
"-bsf:a", "aac_adtstoasc",
"-f", "flv",
"{path}".format(path=save_file_path),
]
ffmpeg_command.extend(command)
comment_end = check_subprocess(
record_name,
record_url,
ffmpeg_command,
record_save_type,
custom_script
)
if comment_end:
return
except subprocess.CalledProcessError as e:
logger.error(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
with max_request_lock:
error_count += 1
error_window.append(1)
try:
if converts_to_mp4:
seg_file_path = f"{full_path}/{anchor_name}_{title_in_name}{now}_%03d.mp4"
@ -1179,7 +1423,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
except Exception as e:
logger.error(f"转码失败: {e} ")
elif video_save_type == "MKV":
elif record_save_type == "MKV":
filename = anchor_name + f'_{title_in_name}' + now + ".mkv"
print(f'{rec_info}/{filename}')
save_file_path = full_path + '/' + filename
@ -1215,7 +1459,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
record_name,
record_url,
ffmpeg_command,
video_save_type,
record_save_type,
custom_script
)
if comment_end:
@ -1227,7 +1471,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
error_count += 1
error_window.append(1)
elif video_save_type == "MP4":
elif record_save_type == "MP4":
filename = anchor_name + f'_{title_in_name}' + now + ".mp4"
print(f'{rec_info}/{filename}')
save_file_path = full_path + '/' + filename
@ -1262,77 +1506,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
record_name,
record_url,
ffmpeg_command,
video_save_type,
custom_script
)
if comment_end:
return
except subprocess.CalledProcessError as e:
logger.error(f"错误信息: {e} 发生错误的行数: {e.__traceback__.tb_lineno}")
with max_request_lock:
error_count += 1
error_window.append(1)
elif "音频" in video_save_type:
try:
now = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
extension = "mp3" if "MP3" in video_save_type else "m4a"
name_format = "_%03d" if split_video_by_time else ""
save_file_path = (f"{full_path}/{anchor_name}_{title_in_name}{now}"
f"{name_format}.{extension}")
if split_video_by_time:
print(f'\r{anchor_name} 准备开始录制音频: {save_file_path}')
if "MP3" in video_save_type:
command = [
"-map", "0:a",
"-c:a", "libmp3lame",
"-ab", "320k",
"-f", "segment",
"-segment_time", split_time,
"-reset_timestamps", "1",
save_file_path,
]
else:
command = [
"-map", "0:a",
"-c:a", "aac",
"-bsf:a", "aac_adtstoasc",
"-ab", "320k",
"-f", "segment",
"-segment_time", split_time,
"-segment_format", 'mpegts',
"-reset_timestamps", "1",
save_file_path,
]
else:
if "MP3" in video_save_type:
command = [
"-map", "0:a",
"-c:a", "libmp3lame",
"-ab", "320k",
save_file_path,
]
else:
command = [
"-map", "0:a",
"-c:a", "aac",
"-bsf:a", "aac_adtstoasc",
"-ab", "320k",
"-movflags", "+faststart",
save_file_path,
]
ffmpeg_command.extend(command)
comment_end = check_subprocess(
record_name,
record_url,
ffmpeg_command,
video_save_type,
record_save_type,
custom_script
)
if comment_end:
@ -1368,7 +1542,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
record_name,
record_url,
ffmpeg_command,
video_save_type,
record_save_type,
custom_script
)
if comment_end:
@ -1412,7 +1586,7 @@ def start_record(url_data: tuple, count_variable: int = -1) -> None:
record_name,
record_url,
ffmpeg_command,
video_save_type,
record_save_type,
custom_script
)
if comment_end:
@ -1600,8 +1774,8 @@ try:
print("System Proxy: http://{}:{}".format(proxy_info.ip, proxy_info.port))
except HTTPError as err:
print(f"HTTP error occurred: {err.code} - {err.reason}")
except URLError as err:
color_obj.print_colored(f"INFO未检测到全局/规则网络代理,请检查代理配置(若无需录制海外直播请忽略此条提示)",
except URLError:
color_obj.print_colored("INFO未检测到全局/规则网络代理,请检查代理配置(若无需录制海外直播请忽略此条提示)",
color_obj.YELLOW)
except Exception as err:
print("An unexpected error occurred:", err)
@ -1643,7 +1817,7 @@ while True:
loop_time = options.get(read_config_value(config, '录制设置', '是否显示循环秒数', ""), False)
show_url = options.get(read_config_value(config, '录制设置', '是否显示直播源地址', ""), False)
split_video_by_time = options.get(read_config_value(config, '录制设置', '分段录制是否开启', ""), False)
enable_https_recording = options.get(read_config_value(config, '录制设置', '强制启用HTTPS录制', ""), False)
enable_https_recording = options.get(read_config_value(config, '录制设置', '是否强制启用https录制', ""), False)
disk_space_limit = float(read_config_value(config, '录制设置', '录制空间剩余阈值(gb)', 1.0))
split_time = str(read_config_value(config, '录制设置', '视频分段时间(秒)', 1800))
converts_to_mp4 = options.get(read_config_value(config, '录制设置', '录制完成后自动转为mp4格式', ""), False)
@ -1680,6 +1854,7 @@ while True:
ntfy_api = read_config_value(config, '推送配置', 'ntfy推送地址', "")
ntfy_tags = read_config_value(config, '推送配置', 'ntfy推送标签', "tada")
ntfy_email = read_config_value(config, '推送配置', 'ntfy推送邮箱', "")
pushplus_token = read_config_value(config, '推送配置', 'pushplus推送token', "")
push_message_title = read_config_value(config, '推送配置', '自定义推送标题', "直播间状态更新通知")
begin_push_message_text = read_config_value(config, '推送配置', '自定义开播推送内容', "")
over_push_message_text = read_config_value(config, '推送配置', '自定义关播推送内容', "")
@ -1744,8 +1919,12 @@ while True:
taobao_cookie = read_config_value(config, 'Cookie', 'taobao_cookie', '')
jd_cookie = read_config_value(config, 'Cookie', 'jd_cookie', '')
faceit_cookie = read_config_value(config, 'Cookie', 'faceit_cookie', '')
migu_cookie = read_config_value(config, 'Cookie', 'migu_cookie', '')
lianjie_cookie = read_config_value(config, 'Cookie', 'lianjie_cookie', '')
laixiu_cookie = read_config_value(config, 'Cookie', 'laixiu_cookie', '')
picarto_cookie = read_config_value(config, 'Cookie', 'picarto_cookie', '')
video_save_type_list = ("FLV", "MKV", "TS", "MP4", "MP3音频", "M4A音频")
video_save_type_list = ("FLV", "MKV", "TS", "MP4", "MP3音频", "M4A音频", "MP3", "M4A")
if video_save_type and video_save_type.upper() in video_save_type_list:
video_save_type = video_save_type.upper()
else:
@ -1773,7 +1952,7 @@ while True:
delete_line(url_config_file, origin_line)
line_list.append(origin_line)
line = origin_line.strip()
if len(line) < 20:
if len(line) < 18:
continue
line_spilt = line.split('主播: ')
@ -1861,15 +2040,23 @@ while True:
'e.tb.cn',
'huodong.m.taobao.com',
'3.cn',
'eco.m.jd.com'
'eco.m.jd.com',
'www.miguvideo.com',
'm.miguvideo.com',
'show.lailianjie.com',
'www.imkktv.com',
'www.picarto.tv'
]
overseas_platform_host = [
'www.tiktok.com',
'play.sooplive.co.kr',
'm.sooplive.co.kr',
'www.sooplive.com',
'm.sooplive.com',
'www.pandalive.co.kr',
'www.winktv.co.kr',
'www.flextv.co.kr',
'www.ttinglive.com',
'www.popkontv.com',
'www.twitch.tv',
'www.liveme.com',
@ -1965,4 +2152,4 @@ while True:
t2.start()
first_run = False
time.sleep(3)
time.sleep(3)

View File

@ -213,6 +213,42 @@ def ntfy(api: str, title: str = "message", content: str = 'test', tags: str = 't
return {"success": success, "error": error}
def pushplus(token: str, title: str, content: str) -> Dict[str, Any]:
"""
PushPlus推送通知
API文档: https://www.pushplus.plus/doc/
"""
success = []
error = []
token_list = token.replace('', ',').split(',') if token.strip() else []
for _token in token_list:
json_data = {
'token': _token,
'title': title,
'content': content
}
try:
url = 'https://www.pushplus.plus/send'
data = json.dumps(json_data).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=headers)
response = opener.open(req, timeout=10)
json_str = response.read().decode('utf-8')
json_data = json.loads(json_str)
if json_data.get('code') == 200:
success.append(_token)
else:
error.append(_token)
print(f'PushPlus推送失败, Token{_token}, 失败信息:{json_data.get("msg", "未知错误")}')
except Exception as e:
error.append(_token)
print(f'PushPlus推送失败, Token{_token}, 错误信息:{e}')
return {"success": success, "error": error}
if __name__ == '__main__':
send_title = '直播通知' # 标题
send_content = '张三 开播了!' # 推送内容
@ -252,4 +288,8 @@ if __name__ == '__main__':
api="https://ntfy.sh/xxxxx",
title="直播推送",
content="xxx已开播",
)
)
# PushPlus推送通知
pushplus_token = '' # 替换成自己的PushPlus Token获取地址https://www.pushplus.plus/
# pushplus(pushplus_token, send_title, send_content)

View File

@ -1,4 +0,0 @@
[virtualenvs]
in-project = true
create = true
prefer-active-python = true

View File

@ -1,24 +1,23 @@
[project]
name = "douyinliverecorder"
version = "4.0.2"
description = "An easy tool for recording live streams"
authors = [{ name = "Hmily" }]
license = {text = "MIT"}
name = "DouyinLiveRecorder"
version = "4.0.7"
description = "可循环值守和多人录制的直播录制软件, 支持抖音、TikTok、Youtube、快手、虎牙、斗鱼、B站、小红书、pandatv、sooplive、flextv、popkontv、twitcasting、winktv、百度、微博、酷狗、17Live、Twitch、Acfun、CHZZK、shopee等40+平台直播录制"
readme = "README.md"
urls = {Repository = "https://github.com/ihmily/DouyinLiveRecorder"}
keywords = ["douyin", "live", "recorder"]
requires-python = ">=3.10,<4.0"
authors = [{name = "Hmily"}]
license = { text = "MIT" }
requires-python = ">=3.10"
dependencies = [
"requests>=2.31.0",
"loguru>=0.7.3",
"pycryptodome>=3.20.0",
"distro>=1.9.0",
"tqdm>=4.67.1",
"httpx==0.28.1",
"httpx[http2]>=0.28.1",
"PyExecJS>=1.5.1"
]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[project.urls]
"Homepage" = "https://github.com/ihmily/DouyinLiveRecorder"
"Documentation" = "https://github.com/ihmily/DouyinLiveRecorder"
"Repository" = "https://github.com/ihmily/DouyinLiveRecorder"
"Issues" = "https://github.com/ihmily/DouyinLiveRecorder/issues"

View File

@ -3,5 +3,5 @@ loguru>=0.7.3
pycryptodome>=3.20.0
distro>=1.9.0
tqdm>=4.67.1
httpx>=0.28.1
httpx[http2]>=0.28.1
PyExecJS>=1.5.1

View File

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
from setuptools import setup, find_packages
setup(
name='douyinliverecorder',
version='4.0.2',
author='Hmily',
description='An easy tool for recording live streams',
long_description=open('README.md', encoding='utf-8').read(),
long_description_content_type='text/markdown',
url='https://github.com/ihmily/DouyinLiveRecorder',
packages=find_packages(),
install_requires=[
'requests>=2.31.0',
'loguru>=0.7.3',
'pycryptodome>=3.20.0',
'distro>=1.9.0',
'tqdm>=4.67.1',
'httpx>=0.28.1'
'PyExecJS>=1.5.1',
],
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
]
)

454
src/ab_sign.py Normal file
View File

@ -0,0 +1,454 @@
# -*- encoding: utf-8 -*-
import math
import time
def rc4_encrypt(plaintext: str, key: str) -> str:
# 初始化状态数组
s = list(range(256))
# 使用密钥对状态数组进行置换
j = 0
for i in range(256):
j = (j + s[i] + ord(key[i % len(key)])) % 256
s[i], s[j] = s[j], s[i]
# 生成密钥流并加密
i = j = 0
result = []
for char in plaintext:
i = (i + 1) % 256
j = (j + s[i]) % 256
s[i], s[j] = s[j], s[i]
t = (s[i] + s[j]) % 256
result.append(chr(s[t] ^ ord(char)))
return ''.join(result)
def left_rotate(x: int, n: int) -> int:
n %= 32
return ((x << n) | (x >> (32 - n))) & 0xFFFFFFFF
def get_t_j(j: int) -> int:
if 0 <= j < 16:
return 2043430169 # 0x79CC4519
elif 16 <= j < 64:
return 2055708042 # 0x7A879D8A
else:
raise ValueError("invalid j for constant Tj")
def ff_j(j: int, x: int, y: int, z: int) -> int:
if 0 <= j < 16:
return (x ^ y ^ z) & 0xFFFFFFFF
elif 16 <= j < 64:
return ((x & y) | (x & z) | (y & z)) & 0xFFFFFFFF
else:
raise ValueError("invalid j for bool function FF")
def gg_j(j: int, x: int, y: int, z: int) -> int:
if 0 <= j < 16:
return (x ^ y ^ z) & 0xFFFFFFFF
elif 16 <= j < 64:
return ((x & y) | (~x & z)) & 0xFFFFFFFF
else:
raise ValueError("invalid j for bool function GG")
class SM3:
def __init__(self):
self.reg = []
self.chunk = []
self.size = 0
self.reset()
def reset(self):
# 初始化寄存器值 - 修正为与JS版本相同的值
self.reg = [
1937774191, 1226093241, 388252375, 3666478592,
2842636476, 372324522, 3817729613, 2969243214
]
self.chunk = []
self.size = 0
def write(self, data):
# 将输入转换为字节数组
if isinstance(data, str):
# 直接转换为UTF-8字节列表
a = list(data.encode('utf-8'))
else:
a = data
self.size += len(a)
f = 64 - len(self.chunk)
if len(a) < f:
# 如果数据长度小于剩余空间,直接添加
self.chunk.extend(a)
else:
# 否则分块处理
self.chunk.extend(a[:f])
while len(self.chunk) >= 64:
self._compress(self.chunk)
if f < len(a):
self.chunk = a[f:min(f + 64, len(a))]
else:
self.chunk = []
f += 64
def _fill(self):
# 计算比特长度
bit_length = 8 * self.size
# 添加填充位
padding_pos = len(self.chunk)
self.chunk.append(0x80)
padding_pos = (padding_pos + 1) % 64
# 如果剩余空间不足8字节则填充到下一个块
if 64 - padding_pos < 8:
padding_pos -= 64
# 填充0直到剩余8字节用于存储长度
while padding_pos < 56:
self.chunk.append(0)
padding_pos += 1
# 添加消息长度高32位
high_bits = bit_length // 4294967296
for i in range(4):
self.chunk.append((high_bits >> (8 * (3 - i))) & 0xFF)
# 添加消息长度低32位
for i in range(4):
self.chunk.append((bit_length >> (8 * (3 - i))) & 0xFF)
def _compress(self, data):
if len(data) < 64:
raise ValueError("compress error: not enough data")
else:
# 消息扩展
w = [0] * 132
# 将字节数组转换为字
for t in range(16):
w[t] = (data[4 * t] << 24) | (data[4 * t + 1] << 16) | (data[4 * t + 2] << 8) | data[4 * t + 3]
w[t] &= 0xFFFFFFFF
# 消息扩展
for j in range(16, 68):
a = w[j - 16] ^ w[j - 9] ^ left_rotate(w[j - 3], 15)
a = a ^ left_rotate(a, 15) ^ left_rotate(a, 23)
w[j] = (a ^ left_rotate(w[j - 13], 7) ^ w[j - 6]) & 0xFFFFFFFF
# 计算w'
for j in range(64):
w[j + 68] = (w[j] ^ w[j + 4]) & 0xFFFFFFFF
# 压缩
a, b, c, d, e, f, g, h = self.reg
for j in range(64):
ss1 = left_rotate((left_rotate(a, 12) + e + left_rotate(get_t_j(j), j)) & 0xFFFFFFFF, 7)
ss2 = ss1 ^ left_rotate(a, 12)
tt1 = (ff_j(j, a, b, c) + d + ss2 + w[j + 68]) & 0xFFFFFFFF
tt2 = (gg_j(j, e, f, g) + h + ss1 + w[j]) & 0xFFFFFFFF
d = c
c = left_rotate(b, 9)
b = a
a = tt1
h = g
g = left_rotate(f, 19)
f = e
e = (tt2 ^ left_rotate(tt2, 9) ^ left_rotate(tt2, 17)) & 0xFFFFFFFF
# 更新寄存器
self.reg[0] ^= a
self.reg[1] ^= b
self.reg[2] ^= c
self.reg[3] ^= d
self.reg[4] ^= e
self.reg[5] ^= f
self.reg[6] ^= g
self.reg[7] ^= h
def sum(self, data=None, output_format=None):
"""
计算哈希值
"""
# 如果提供了输入,则重置并写入
if data is not None:
self.reset()
self.write(data)
self._fill()
# 分块压缩
for f in range(0, len(self.chunk), 64):
self._compress(self.chunk[f:f + 64])
if output_format == 'hex':
# 十六进制输出
result = ''.join(f'{val:08x}' for val in self.reg)
else:
# 字节数组输出
result = []
for f in range(8):
c = self.reg[f]
result.append((c >> 24) & 0xFF)
result.append((c >> 16) & 0xFF)
result.append((c >> 8) & 0xFF)
result.append(c & 0xFF)
self.reset()
return result
def result_encrypt(long_str: str, num: str | None = None) -> str:
# 魔改base64编码表
encoding_tables = {
"s0": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=",
"s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe",
"s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe"
}
# 位移常量
masks = [16515072, 258048, 4032, 63] # 对应 0, 1, 2 的掩码添加63作为第四个掩码
shifts = [18, 12, 6, 0] # 对应的位移量
encoding_table = encoding_tables[num]
result = ""
round_num = 0
long_int = get_long_int(round_num, long_str)
total_chars = math.ceil(len(long_str) / 3 * 4)
for i in range(total_chars):
# 每4个字符处理一组3字节
if i // 4 != round_num:
round_num += 1
long_int = get_long_int(round_num, long_str)
# 计算当前位置的索引
index = i % 4
# 使用掩码和位移提取6位值
char_index = (long_int & masks[index]) >> shifts[index]
result += encoding_table[char_index]
return result
def get_long_int(round_num: int, long_str: str) -> int:
round_num = round_num * 3
# 获取字符串中的字符如果超出范围则使用0
char1 = ord(long_str[round_num]) if round_num < len(long_str) else 0
char2 = ord(long_str[round_num + 1]) if round_num + 1 < len(long_str) else 0
char3 = ord(long_str[round_num + 2]) if round_num + 2 < len(long_str) else 0
return (char1 << 16) | (char2 << 8) | char3
def gener_random(random_num: int, option: list[int]) -> list[int]:
byte1 = random_num & 255
byte2 = (random_num >> 8) & 255
return [
(byte1 & 170) | (option[0] & 85), # 偶数位与option[0]的奇数位合并
(byte1 & 85) | (option[0] & 170), # 奇数位与option[0]的偶数位合并
(byte2 & 170) | (option[1] & 85), # 偶数位与option[1]的奇数位合并
(byte2 & 85) | (option[1] & 170), # 奇数位与option[1]的偶数位合并
]
def generate_random_str() -> str:
"""
生成随机字符串
Returns:
随机字符串
"""
# 使用与JS版本相同的固定随机值
random_values = [0.123456789, 0.987654321, 0.555555555]
# 生成三组随机字节并合并
random_bytes = []
random_bytes.extend(gener_random(int(random_values[0] * 10000), [3, 45]))
random_bytes.extend(gener_random(int(random_values[1] * 10000), [1, 0]))
random_bytes.extend(gener_random(int(random_values[2] * 10000), [1, 5]))
return ''.join(chr(b) for b in random_bytes)
def generate_rc4_bb_str(url_search_params: str, user_agent: str, window_env_str: str,
suffix: str = "cus", arguments: list[int] | None = None) -> str:
if arguments is None:
arguments = [0, 1, 14]
sm3 = SM3()
start_time = int(time.time() * 1000)
# 三次加密处理
# 1: url_search_params两次sm3之的结果
url_search_params_list = sm3.sum(sm3.sum(url_search_params + suffix))
# 2: 对后缀两次sm3之的结果
cus = sm3.sum(sm3.sum(suffix))
# 3: 对ua处理之后的结果
ua_key = chr(0) + chr(1) + chr(14) # [1/256, 1, 14]
ua = sm3.sum(result_encrypt(
rc4_encrypt(user_agent, ua_key),
"s3"
))
end_time = start_time + 100
# 构建配置对象
b = {
8: 3,
10: end_time,
15: {
"aid": 6383,
"pageId": 110624,
"boe": False,
"ddrt": 7,
"paths": {
"include": [{} for _ in range(7)],
"exclude": []
},
"track": {
"mode": 0,
"delay": 300,
"paths": []
},
"dump": True,
"rpU": "hwj"
},
16: start_time,
18: 44,
19: [1, 0, 1, 5],
}
def split_to_bytes(num: int) -> list[int]:
return [
(num >> 24) & 255,
(num >> 16) & 255,
(num >> 8) & 255,
num & 255
]
# 处理时间戳
start_time_bytes = split_to_bytes(b[16])
b[20] = start_time_bytes[0]
b[21] = start_time_bytes[1]
b[22] = start_time_bytes[2]
b[23] = start_time_bytes[3]
b[24] = int(b[16] / 256 / 256 / 256 / 256) & 255
b[25] = int(b[16] / 256 / 256 / 256 / 256 / 256) & 255
# 处理Arguments参数
arg0_bytes = split_to_bytes(arguments[0])
b[26] = arg0_bytes[0]
b[27] = arg0_bytes[1]
b[28] = arg0_bytes[2]
b[29] = arg0_bytes[3]
b[30] = int(arguments[1] / 256) & 255
b[31] = (arguments[1] % 256) & 255
arg1_bytes = split_to_bytes(arguments[1])
b[32] = arg1_bytes[0]
b[33] = arg1_bytes[1]
arg2_bytes = split_to_bytes(arguments[2])
b[34] = arg2_bytes[0]
b[35] = arg2_bytes[1]
b[36] = arg2_bytes[2]
b[37] = arg2_bytes[3]
# 处理加密结果
b[38] = url_search_params_list[21]
b[39] = url_search_params_list[22]
b[40] = cus[21]
b[41] = cus[22]
b[42] = ua[23]
b[43] = ua[24]
# 处理结束时间
end_time_bytes = split_to_bytes(b[10])
b[44] = end_time_bytes[0]
b[45] = end_time_bytes[1]
b[46] = end_time_bytes[2]
b[47] = end_time_bytes[3]
b[48] = b[8]
b[49] = int(b[10] / 256 / 256 / 256 / 256) & 255
b[50] = int(b[10] / 256 / 256 / 256 / 256 / 256) & 255
# 处理配置项
b[51] = b[15]['pageId']
page_id_bytes = split_to_bytes(b[15]['pageId'])
b[52] = page_id_bytes[0]
b[53] = page_id_bytes[1]
b[54] = page_id_bytes[2]
b[55] = page_id_bytes[3]
b[56] = b[15]['aid']
b[57] = b[15]['aid'] & 255
b[58] = (b[15]['aid'] >> 8) & 255
b[59] = (b[15]['aid'] >> 16) & 255
b[60] = (b[15]['aid'] >> 24) & 255
# 处理环境信息
window_env_list = [ord(char) for char in window_env_str]
b[64] = len(window_env_list)
b[65] = b[64] & 255
b[66] = (b[64] >> 8) & 255
b[69] = 0
b[70] = 0
b[71] = 0
# 计算校验和
b[72] = b[18] ^ b[20] ^ b[26] ^ b[30] ^ b[38] ^ b[40] ^ b[42] ^ b[21] ^ b[27] ^ b[31] ^ \
b[35] ^ b[39] ^ b[41] ^ b[43] ^ b[22] ^ b[28] ^ b[32] ^ b[36] ^ b[23] ^ b[29] ^ \
b[33] ^ b[37] ^ b[44] ^ b[45] ^ b[46] ^ b[47] ^ b[48] ^ b[49] ^ b[50] ^ b[24] ^ \
b[25] ^ b[52] ^ b[53] ^ b[54] ^ b[55] ^ b[57] ^ b[58] ^ b[59] ^ b[60] ^ b[65] ^ \
b[66] ^ b[70] ^ b[71]
# 构建最终字节数组
bb = [
b[18], b[20], b[52], b[26], b[30], b[34], b[58], b[38], b[40], b[53], b[42], b[21],
b[27], b[54], b[55], b[31], b[35], b[57], b[39], b[41], b[43], b[22], b[28], b[32],
b[60], b[36], b[23], b[29], b[33], b[37], b[44], b[45], b[59], b[46], b[47], b[48],
b[49], b[50], b[24], b[25], b[65], b[66], b[70], b[71]
]
bb.extend(window_env_list)
bb.append(b[72])
return rc4_encrypt(
''.join(chr(byte) for byte in bb),
chr(121)
)
def ab_sign(url_search_params: str, user_agent: str) -> str:
window_env_str = "1920|1080|1920|1040|0|30|0|0|1872|92|1920|1040|1857|92|1|24|Win32"
# 1. 生成随机字符串前缀
# 2. 生成RC4加密的主体部分
# 3. 对结果进行最终加密并添加等号后缀
return result_encrypt(
generate_random_str() +
generate_rc4_bb_str(url_search_params, user_agent, window_env_str),
"s4"
) + "="

View File

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
import httpx
from typing import Dict, Any
from .. import utils
OptionalStr = str | None
OptionalDict = Dict[str, Any] | None
async def async_req(
url: str,
proxy_addr: OptionalStr = None,
headers: OptionalDict = None,
data: dict | bytes | None = None,
json_data: dict | list | None = None,
timeout: int = 20,
redirect_url: bool = False,
return_cookies: bool = False,
include_cookies: bool = False,
abroad: bool = False,
content_conding: str = 'utf-8',
verify: bool = False,
http2: bool = True
) -> OptionalDict | OptionalStr | tuple:
if headers is None:
headers = {}
try:
proxy_addr = utils.handle_proxy_addr(proxy_addr)
if data or json_data:
async with httpx.AsyncClient(proxy=proxy_addr, timeout=timeout, verify=verify, http2=http2) as client:
response = await client.post(url, data=data, json=json_data, headers=headers)
else:
async with httpx.AsyncClient(proxy=proxy_addr, timeout=timeout, verify=verify, http2=http2) as client:
response = await client.get(url, headers=headers, follow_redirects=True)
if redirect_url:
return str(response.url)
elif return_cookies:
cookies_dict = {name: value for name, value in response.cookies.items()}
return (response.text, cookies_dict) if include_cookies else cookies_dict
else:
resp_str = response.text
except Exception as e:
resp_str = str(e)
return resp_str
async def get_response_status(url: str, proxy_addr: OptionalStr = None, headers: OptionalDict = None,
timeout: int = 10, abroad: bool = False, verify: bool = False, http2=False) -> bool:
try:
proxy_addr = utils.handle_proxy_addr(proxy_addr)
async with httpx.AsyncClient(proxy=proxy_addr, timeout=timeout, verify=verify) as client:
response = await client.head(url, headers=headers, follow_redirects=True)
return response.status_code == 200
except Exception as e:
print(e)
return False

View File

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
import gzip
import urllib.parse
import urllib.error
import requests
import ssl
import json
import urllib.request
no_proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(no_proxy_handler)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
OptionalStr = str | None
OptionalDict = dict | None
def sync_req(
url: str,
proxy_addr: OptionalStr = None,
headers: OptionalDict = None,
data: dict | bytes | None = None,
json_data: dict | list | None = None,
timeout: int = 20,
redirect_url: bool = False,
abroad: bool = False,
content_conding: str = 'utf-8'
) -> str:
if headers is None:
headers = {}
try:
if proxy_addr:
proxies = {
'http': proxy_addr,
'https': proxy_addr
}
if data or json_data:
response = requests.post(
url, data=data, json=json_data, headers=headers, proxies=proxies, timeout=timeout
)
else:
response = requests.get(url, headers=headers, proxies=proxies, timeout=timeout)
if redirect_url:
return response.url
resp_str = response.text
else:
if data and not isinstance(data, bytes):
data = urllib.parse.urlencode(data).encode(content_conding)
if json_data and isinstance(json_data, (dict, list)):
data = json.dumps(json_data).encode(content_conding)
req = urllib.request.Request(url, data=data, headers=headers)
try:
if abroad:
response = urllib.request.urlopen(req, timeout=timeout)
else:
response = opener.open(req, timeout=timeout)
if redirect_url:
return response.url
content_encoding = response.info().get('Content-Encoding')
try:
if content_encoding == 'gzip':
with gzip.open(response, 'rt', encoding=content_conding) as gzipped:
resp_str = gzipped.read()
else:
resp_str = response.read().decode(content_conding)
finally:
response.close()
except urllib.error.HTTPError as e:
if e.code == 400:
resp_str = e.read().decode(content_conding)
else:
raise
except urllib.error.URLError as e:
print(f"URL Error: {e}")
raise
except Exception as e:
print(f"An error occurred: {e}")
raise
except Exception as e:
resp_str = str(e)
return resp_str

33
src/javascript/laixiu.js Normal file
View File

@ -0,0 +1,33 @@
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
function calculateSign() {
const a = new Date().getTime();
const s = generateUUID().replace(/-/g, "");
const u = 'kk792f28d6ff1f34ec702c08626d454b39pro';
const input = "web" + s + a + u;
const hash = CryptoJS.MD5(input).toString();
return {
timestamp: a,
imei: s,
requestId: hash,
inputString: input
};
}
function sign(cryptoJSPath) {
CryptoJS = require(cryptoJSPath);
return calculateSign();
}
module.exports = {
sign
};

143
src/javascript/migu.js Normal file
View File

@ -0,0 +1,143 @@
/**
* Function to get the ddCalcu parameter value
* @param {string} inputUrl - The original URL before encryption
* @returns {Promise<string>} - Returns the calculated ddCalcu value
*/
async function getDdCalcu(inputUrl) {
let wasmInstance = null;
let memory_p = null; // Uint8Array view
let memory_h = null; // Uint32Array view
// Fixed parameter
const f = 'PBTxuWiTEbUPPFcpyxs0ww==';
// Utility function: Convert string to UTF-8 in memory
function stringToUTF8(string, offset) {
const encoder = new TextEncoder();
const encoded = encoder.encode(string);
for (let i = 0; i < encoded.length; i++) {
memory_p[offset + i] = encoded[i];
}
memory_p[offset + encoded.length] = 0; // Null-terminate
}
// Utility function: Read UTF-8 string from memory address
function UTF8ToString(offset) {
let s = '';
let i = 0;
while (memory_p[offset + i]) {
s += String.fromCharCode(memory_p[offset + i]);
i++;
}
return s;
}
// WASM import function stubs
function a(e, t, r, n) {
let s = 0;
for (let i = 0; i < r; i++) {
const d = memory_h[t + 4 >> 2];
t += 8;
s += d;
}
memory_h[n >> 2] = s;
return 0;
}
function b() {}
function c() {}
// Step 1: Retrieve playerVersion
const settingsResp = await fetch('https://app-sc.miguvideo.com/common/v1/settings/H5_DetailPage');
const settingsData = await settingsResp.json();
const playerVersion = JSON.parse(settingsData.body.paramValue).playerVersion;
// Step 2: Load WASM module
const wasmUrl = `https://www.miguvideo.com/mgs/player/prd/${playerVersion}/dist/mgprtcl.wasm`;
const wasmResp = await fetch(wasmUrl);
if (!wasmResp.ok) throw new Error("Failed to download WASM");
const wasmBuffer = await wasmResp.arrayBuffer();
const importObject = {
a: { a, b, c }
};
const { instance } = await WebAssembly.instantiate(wasmBuffer, importObject);
wasmInstance = instance;
const memory = wasmInstance.exports.d;
memory_p = new Uint8Array(memory.buffer);
memory_h = new Uint32Array(memory.buffer);
const exports = {
CallInterface1: wasmInstance.exports.h,
CallInterface2: wasmInstance.exports.i,
CallInterface3: wasmInstance.exports.j,
CallInterface4: wasmInstance.exports.k,
CallInterface6: wasmInstance.exports.m,
CallInterface7: wasmInstance.exports.n,
CallInterface8: wasmInstance.exports.o,
CallInterface9: wasmInstance.exports.p,
CallInterface10: wasmInstance.exports.q,
CallInterface11: wasmInstance.exports.r,
CallInterface14: wasmInstance.exports.t,
malloc: wasmInstance.exports.u,
};
const parsedUrl = new URL(inputUrl);
const query = Object.fromEntries(parsedUrl.searchParams);
const o = query.userid || '';
const a_val = query.timestamp || '';
const s = query.ProgramID || '';
const u = query.Channel_ID || '';
const v = query.puData || '';
// Allocate memory
const d = exports.malloc(o.length + 1);
const h = exports.malloc(a_val.length + 1);
const y = exports.malloc(s.length + 1);
const m = exports.malloc(u.length + 1);
const g = exports.malloc(v.length + 1);
const b_val = exports.malloc(f.length + 1);
const E = exports.malloc(128);
const T = exports.malloc(128);
// Write data to memory
stringToUTF8(o, d);
stringToUTF8(a_val, h);
stringToUTF8(s, y);
stringToUTF8(u, m);
stringToUTF8(v, g);
stringToUTF8(f, b_val);
// Call interface functions
const S = exports.CallInterface6(); // Create context
exports.CallInterface1(S, y, s.length);
exports.CallInterface10(S, h, a_val.length);
exports.CallInterface9(S, d, o.length);
exports.CallInterface3(S, 0, 0);
exports.CallInterface11(S, 0, 0);
exports.CallInterface8(S, g, v.length);
exports.CallInterface2(S, m, u.length);
exports.CallInterface14(S, b_val, f.length, T, 128);
const w = UTF8ToString(T);
const I = exports.malloc(w.length + 1);
stringToUTF8(w, I);
exports.CallInterface7(S, I, w.length);
exports.CallInterface4(S, E, 128);
return UTF8ToString(E);
}
const url = process.argv[2];
getDdCalcu(url).then(result => {
console.log(result);
}).catch(err => {
console.error(err);
process.exit(1);
});

View File

@ -2,15 +2,24 @@
import os
import sys
from loguru import logger
logger.remove()
custom_format = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> - <level>{message}</level>"
os.environ["LOGURU_FORMAT"] = custom_format
from loguru import logger
logger.add(
sink=sys.stderr,
format=custom_format,
level="DEBUG",
colorize=True,
enqueue=True
)
script_path = os.path.split(os.path.realpath(sys.argv[0]))[0]
logger.add(
f"{script_path}/logs/DouyinLiveRecorder.log",
f"{script_path}/logs/streamget.log",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
filter=lambda i: i["level"].name != "INFO",

View File

@ -4,21 +4,24 @@
Author: Hmily
GitHub:https://github.com/ihmily
Date: 2023-07-17 23:52:05
Update: 2025-01-27 22:08:00
Update: 2025-02-04 04:57:00
Copyright (c) 2023 by Hmily, All Rights Reserved.
"""
import json
import re
import urllib.parse
import execjs
import httpx
import urllib.request
from . import JS_SCRIPT_PATH
from .utils import handle_proxy_addr
from . import JS_SCRIPT_PATH, utils
no_proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(no_proxy_handler)
class UnsupportedUrlError(Exception):
pass
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; SAMSUNG SM-G973U) AppleWebKit/537.36 (KHTML, like Gecko) '
'SamsungBrowser/14.2 Chrome/87.0.4280.141 Mobile Safari/537.36',
@ -27,34 +30,33 @@ HEADERS = {
}
HEADERS_PC = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5845.97 '
'Safari/537.36 Core/1.116.438.400 QQBrowser/13.0.6070.400',
'Cookie': 'sessionid=7494ae59ae06784454373ce25761e864; __ac_nonce=0670497840077ee4c9eb2; '
'__ac_signature=_02B4Z6wo00f012DZczQAAIDCJJBb3EjnINdg-XeAAL8-db; '
's_v_web_id=verify_m1ztgtjj_vuHnMLZD_iwZ9_4YO4_BdN1_7wLP3pyqXsf2; ',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0',
'Cookie': 'sessionid=7494ae59ae06784454373ce25761e864; __ac_nonce=0670497840077ee4c9eb2; '
'__ac_signature=_02B4Z6wo00f012DZczQAAIDCJJBb3EjnINdg-XeAAL8-db; '
's_v_web_id=verify_m1ztgtjj_vuHnMLZD_iwZ9_4YO4_BdN1_7wLP3pyqXsf2; '
}
# X-bogus算法
def get_xbogus(url: str, headers: dict | None = None) -> str:
async def get_xbogus(url: str, headers: dict | None = None) -> str:
if not headers or 'user-agent' not in (k.lower() for k in headers):
headers = HEADERS
query = urllib.parse.urlparse(url).query
xbogus = execjs.compile(open(f'{JS_SCRIPT_PATH}/x-bogus.js').read()).call('sign', query, headers.get("User-Agent", "user-agent"))
xbogus = execjs.compile(open(f'{JS_SCRIPT_PATH}/x-bogus.js').read()).call(
'sign', query, headers.get("User-Agent", "user-agent"))
return xbogus
# 获取房间ID和用户secID
async def get_sec_user_id(url: str, proxy_addr: str | None = None, headers: dict | None = None) -> tuple | None:
# 如果没有提供headers或者headers中不包含user-agent和cookie则使用默认headers
if not headers or all(k.lower() not in ['user-agent', 'cookie'] for k in headers):
headers = HEADERS
try:
proxy_addr = handle_proxy_addr(proxy_addr)
proxy_addr = utils.handle_proxy_addr(proxy_addr)
async with httpx.AsyncClient(proxy=proxy_addr, timeout=15) as client:
response = await client.get(url, headers=headers, follow_redirects=True)
redirect_url = response.url
if 'reflow/' in str(redirect_url):
match = re.search(r'sec_user_id=([\w_\-]+)&', str(redirect_url))
@ -63,53 +65,52 @@ async def get_sec_user_id(url: str, proxy_addr: str | None = None, headers: dict
room_id = str(redirect_url).split('?')[0].rsplit('/', maxsplit=1)[1]
return room_id, sec_user_id
else:
print("Could not find sec_user_id in the URL.")
raise RuntimeError("Could not find sec_user_id in the URL.")
else:
print("The redirect URL does not contain 'reflow/'.")
raise UnsupportedUrlError("The redirect URL does not contain 'reflow/'.")
except UnsupportedUrlError as e:
raise e
except Exception as e:
print(f"An error occurred: {e}")
return None
raise RuntimeError(f"An error occurred: {e}")
# 获取抖音号
async def get_unique_id(url: str, proxy_addr: str | None = None, headers: dict | None = None) -> str | None:
# 如果没有提供headers或者headers中不包含user-agent和cookie则使用默认headers
if not headers or all(k.lower() not in ['user-agent', 'cookie'] for k in headers):
headers = HEADERS_PC
headers = HEADERS
try:
proxy_addr = handle_proxy_addr(proxy_addr)
proxy_addr = utils.handle_proxy_addr(proxy_addr)
async with httpx.AsyncClient(proxy=proxy_addr, timeout=15) as client:
# 第一次请求获取重定向后的URL以提取sec_user_id
response = await client.get(url, headers=headers, follow_redirects=True)
redirect_url = str(response.url)
if 'reflow/' in str(redirect_url):
raise UnsupportedUrlError("Unsupported URL")
sec_user_id = redirect_url.split('?')[0].rsplit('/', maxsplit=1)[1]
# 第二次请求获取用户页面内容来提取unique_id
user_page_response = await client.get(f'https://www.douyin.com/user/{sec_user_id}', headers=headers)
# 使用正则表达式查找unique_id
matches = re.findall(r'undefined\\"},\\"uniqueId\\":\\"(.*?)\\",\\"customVerify', user_page_response.text)
headers['Cookie'] = ('ttwid=1%7C4ejCkU2bKY76IySQENJwvGhg1IQZrgGEupSyTKKfuyk%7C1740470403%7Cbc9a'
'd2ee341f1a162f9e27f4641778030d1ae91e31f9df6553a8f2efa3bdb7b4; __ac_nonce=06'
'83e59f3009cc48fbab0; __ac_signature=_02B4Z6wo00f01mG6waQAAIDB9JUCzFb6.TZhmsU'
'AAPBf34; __ac_referer=__ac_blank')
user_page_response = await client.get(f'https://www.iesdouyin.com/share/user/{sec_user_id}',
headers=headers, follow_redirects=True)
matches = re.findall(r'unique_id":"(.*?)","verification_type', user_page_response.text)
if matches:
unique_id = matches[-1]
return unique_id
else:
print("Could not find unique_id in the response.")
return None
raise RuntimeError("Could not find unique_id in the response.")
except UnsupportedUrlError as e:
raise e
except Exception as e:
print(f"An error occurred: {e}")
return None
raise RuntimeError(f"An error occurred: {e}")
# 获取直播间webID
async def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: str | None = None,
params: dict | None = None, headers: dict | None = None) -> str:
# 如果没有提供headers或者headers中不包含user-agent和cookie则使用默认headers
async def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: str | None = None, params: dict | None = None,
headers: dict | None = None) -> str:
if not headers or all(k.lower() not in ['user-agent', 'cookie'] for k in headers):
headers = HEADERS
# 设置默认参数
if not params:
params = {
"verifyFp": "verify_lk07kv74_QZYCUApD_xhiB_405x_Ax51_GYO9bUIyZQVf",
@ -122,21 +123,18 @@ async def get_live_room_id(room_id: str, sec_user_id: str, proxy_addr: str | Non
"-me3Yudck2ailla5Q4osnYIHxd9dI4WtQ==",
}
# 构建API URL并添加X-Bogus签名
api = f'https://webcast.amemv.com/webcast/room/reflow/info/?{urllib.parse.urlencode(params)}'
xbogus = get_xbogus(api)
xbogus = await get_xbogus(api)
api = api + "&X-Bogus=" + xbogus
try:
proxy_addr = handle_proxy_addr(proxy_addr)
proxy_addr = utils.handle_proxy_addr(proxy_addr)
async with httpx.AsyncClient(proxy=proxy_addr,
timeout=15) as client:
response = await client.get(api, headers=headers)
response.raise_for_status() # 检查HTTP响应状态码是否表示成功
response.raise_for_status()
json_data = response.json()
web_rid = json_data['data']['room']['owner']['web_rid']
return web_rid
return json_data['data']['room']['owner']['web_rid']
except httpx.HTTPStatusError as e:
print(f"HTTP status error occurred: {e.response.status_code}")
raise
@ -149,4 +147,4 @@ if __name__ == '__main__':
room_url = "https://v.douyin.com/iQLgKSj/"
_room_id, sec_uid = get_sec_user_id(room_url)
web_rid = get_live_room_id(_room_id, sec_uid)
print("return web_rid:", web_rid)
print("return web_rid:", web_rid)

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,8 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2024-10-27 17:15:00
Copyright (c) 2023-2024 by Hmily, All Rights Reserved.
Update: 2025-02-06 02:28:00
Copyright (c) 2023-2025 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
import base64
@ -21,10 +21,24 @@ from .utils import trace_error_decorator
from .spider import (
get_douyu_stream_data, get_bilibili_stream_data
)
from .http_clients.async_http import get_response_status
QUALITY_MAPPING = {"OD": 0, "BD": 0, "UHD": 1, "HD": 2, "SD": 3, "LD": 4}
def get_quality_index(quality) -> tuple:
if not quality:
return list(QUALITY_MAPPING.items())[0]
quality_str = str(quality).upper()
if quality_str.isdigit():
quality_int = int(quality_str[0])
quality_str = list(QUALITY_MAPPING.keys())[quality_int]
return quality_str, QUALITY_MAPPING.get(quality_str, 0)
@trace_error_decorator
def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
async def get_douyin_stream_url(json_data: dict, video_quality: str, proxy_addr: str) -> dict:
anchor_name = json_data.get('anchor_name')
result = {
@ -32,7 +46,7 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
"is_live": False,
}
status = json_data.get("status", 4) # 直播状态 2 是正在直播、4 是未开播
status = json_data.get("status", 4)
if status == 2:
stream_url = json_data['stream_url']
@ -45,13 +59,18 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
flv_url_list.append(flv_url_list[-1])
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, "流畅": 4}
quality_index = video_qualities.get(video_quality)
video_quality, quality_index = get_quality_index(video_quality)
m3u8_url = m3u8_url_list[quality_index]
flv_url = flv_url_list[quality_index]
ok = await get_response_status(url=m3u8_url, proxy_addr=proxy_addr)
if not ok:
index = quality_index + 1 if quality_index < 4 else quality_index - 1
m3u8_url = m3u8_url_list[index]
flv_url = flv_url_list[index]
result |= {
'is_live': True,
'title': json_data['title'],
'quality': video_quality,
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': m3u8_url or flv_url,
@ -60,7 +79,7 @@ def get_douyin_stream_url(json_data: dict, video_quality: str) -> dict:
@trace_error_decorator
def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
async def get_tiktok_stream_url(json_data: dict, video_quality: str, proxy_addr: str) -> dict:
if not json_data:
return {"anchor_name": None, "is_live": False}
@ -68,10 +87,18 @@ def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
play_list = []
for key in stream:
url_info = stream[key]['main']
play_url = url_info[q_key]
sdk_params = url_info['sdk_params']
sdk_params = json.loads(sdk_params)
vbitrate = int(sdk_params['vbitrate'])
v_codec = sdk_params.get('VCodec', '')
play_url = ''
if url_info.get(q_key):
if url_info[q_key].endswith(".flv") or url_info[q_key].endswith(".m3u8"):
play_url = url_info[q_key] + '?codec=' + v_codec
else:
play_url = url_info[q_key] + '&codec=' + v_codec
resolution = sdk_params['resolution']
if vbitrate != 0 and resolution:
width, height = map(int, resolution.split('x'))
@ -101,13 +128,24 @@ def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
flv_url_list.append(flv_url_list[-1])
while len(m3u8_url_list) < 5:
m3u8_url_list.append(m3u8_url_list[-1])
video_qualities = {"原画": 0, "蓝光": 0, "超清": 1, "高清": 2, "标清": 3, '流畅': 4}
quality_index = video_qualities.get(video_quality)
flv_url = flv_url_list[quality_index]['url'].replace("https://", "http://")
m3u8_url = m3u8_url_list[quality_index]['url'].replace("https://", "http://")
video_quality, quality_index = get_quality_index(video_quality)
flv_dict: dict = flv_url_list[quality_index]
m3u8_dict: dict = m3u8_url_list[quality_index]
check_url = m3u8_dict.get('url') or flv_dict.get('url')
ok = await get_response_status(url=check_url, proxy_addr=proxy_addr, http2=False)
if not ok:
index = quality_index + 1 if quality_index < 4 else quality_index - 1
flv_dict: dict = flv_url_list[index]
m3u8_dict: dict = m3u8_url_list[index]
flv_url = flv_dict['url']
m3u8_url = m3u8_dict['url']
result |= {
'is_live': True,
'title': live_room['liveRoom']['title'],
'quality': video_quality,
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': m3u8_url or flv_url,
@ -116,7 +154,7 @@ def get_tiktok_stream_url(json_data: dict, video_quality: str) -> dict:
@trace_error_decorator
def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
async def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
if json_data['type'] == 1 and not json_data["is_live"]:
return json_data
live_status = json_data['is_live']
@ -128,11 +166,10 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
}
if live_status:
quality_mapping = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
quality_mapping_bitrate = {'原画': 99999, '蓝光': 4000, '超清': 2000, '高清': 1000, '标清': 800, '流畅': 600}
if video_quality in quality_mapping:
quality_mapping_bit = {'OD': 99999, 'BD': 4000, 'UHD': 2000, 'HD': 1000, 'SD': 800, 'LD': 600}
if video_quality in QUALITY_MAPPING:
quality_index = quality_mapping[video_quality]
quality, quality_index = get_quality_index(video_quality)
if 'm3u8_url_list' in json_data:
m3u8_url_list = json_data['m3u8_url_list'][::-1]
while len(m3u8_url_list) < 5:
@ -141,35 +178,36 @@ def get_kuaishou_stream_url(json_data: dict, video_quality: str) -> dict:
result['m3u8_url'] = m3u8_url
if 'flv_url_list' in json_data:
# checks if bitrate in flv_url_list
if 'bitrate' in json_data['flv_url_list'][0]:
flv_url_list = json_data['flv_url_list']
flv_url_list = sorted(flv_url_list, key=lambda x: x['bitrate'], reverse=True)
# uses quality_mapping_bitrate to get the index of the quality
quality_index_bitrate_value = quality_mapping_bitrate[video_quality]
# find the value below `quality_index_bitrate_value`, or else use the previous one.
quality_str = str(video_quality).upper()
if quality_str.isdigit():
video_quality, quality_index_bitrate_value = list(quality_mapping_bit.items())[int(quality_str)]
else:
quality_index_bitrate_value = quality_mapping_bit.get(quality_str, 99999)
video_quality = quality_str
quality_index = next(
(i for i, x in enumerate(flv_url_list) if x['bitrate'] <= quality_index_bitrate_value), None)
if quality_index is None:
# latest quality
quality_index = len(flv_url_list) - 1
flv_url = flv_url_list[quality_index]['url']
result['flv_url'] = flv_url
result['record_url'] = flv_url
else:
# TODO: Old version which not working at 20241128, could be removed if not working confirmed,
# please also clean the quality_mapping mapping
flv_url_list = json_data['flv_url_list'][::-1]
while len(flv_url_list) < 5:
flv_url_list.append(flv_url_list[-1])
flv_url = flv_url_list[quality_index]['url']
result |= {'flv_url': flv_url, 'record_url': flv_url}
result['is_live'] = True
result['quality'] = video_quality
return result
@trace_error_decorator
def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
async def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
game_live_info = json_data['data'][0]['gameLiveInfo']
live_title = game_live_info['introduction']
stream_info_list = json_data['data'][0]['gameStreamInfoList']
@ -230,17 +268,17 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
m3u8_url = f'{hls_url}/{stream_name}.{hls_url_suffix}?{new_anti_code}&ratio='
quality_list = flv_anti_code.split('&exsphd=')
if len(quality_list) > 1 and video_quality not in ["原画", "蓝光"]:
if len(quality_list) > 1 and video_quality not in ["OD", "BD"]:
pattern = r"(?<=264_)\d+"
quality_list = list(re.findall(pattern, quality_list[1]))[::-1]
while len(quality_list) < 5:
quality_list.append(quality_list[-1])
video_quality_options = {
"超清": quality_list[0],
"高清": quality_list[1],
"标清": quality_list[2],
"流畅": quality_list[3]
"UHD": quality_list[0],
"HD": quality_list[1],
"SD": quality_list[2],
"LD": quality_list[3]
}
if video_quality not in video_quality_options:
@ -253,6 +291,7 @@ def get_huya_stream_url(json_data: dict, video_quality: str) -> dict:
result |= {
'is_live': True,
'title': live_title,
'quality': video_quality,
'm3u8_url': m3u8_url,
'flv_url': flv_url,
'record_url': flv_url or m3u8_url
@ -266,12 +305,12 @@ async def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str
return json_data
video_quality_options = {
"原画": '0',
"蓝光": '0',
"超清": '3',
"高清": '2',
"标清": '1',
"流畅": '1'
"OD": '0',
"BD": '0',
"UHD": '3',
"HD": '2',
"SD": '1',
"LD": '1'
}
rid = str(json_data["room_id"])
@ -282,12 +321,12 @@ async def get_douyu_stream_url(json_data: dict, video_quality: str, cookies: str
rtmp_live = flv_data['data'].get('rtmp_live')
if rtmp_live:
flv_url = f'{rtmp_url}/{rtmp_live}'
json_data |= {'flv_url': flv_url, 'record_url': flv_url}
json_data |= {'quality': video_quality, 'flv_url': flv_url, 'record_url': flv_url}
return json_data
@trace_error_decorator
def get_yy_stream_url(json_data: dict) -> dict:
async def get_yy_stream_url(json_data: dict) -> dict:
anchor_name = json_data.get('anchor_name', '')
result = {
"anchor_name": anchor_name,
@ -300,6 +339,7 @@ def get_yy_stream_url(json_data: dict) -> dict:
result |= {
'is_live': True,
'title': json_data['title'],
'quality': 'OD',
'flv_url': flv_url,
'record_url': flv_url
}
@ -318,12 +358,12 @@ async def get_bilibili_stream_url(json_data: dict, video_quality: str, proxy_add
room_url = json_data['room_url']
video_quality_options = {
"原画": '10000',
"蓝光": '400',
"超清": '250',
"高清": '150',
"标清": '80',
"流畅": '80'
"OD": '10000',
"BD": '400',
"UHD": '250',
"HD": '150',
"SD": '80',
"LD": '80'
}
select_quality = video_quality_options[video_quality]
@ -333,44 +373,51 @@ async def get_bilibili_stream_url(json_data: dict, video_quality: str, proxy_add
'anchor_name': json_data['anchor_name'],
'is_live': True,
'title': json_data['title'],
'quality': video_quality,
'record_url': play_url
}
@trace_error_decorator
def get_netease_stream_url(json_data: dict, video_quality: str) -> dict:
async def get_netease_stream_url(json_data: dict, video_quality: str) -> dict:
if not json_data['is_live']:
return json_data
stream_list = json_data['stream_list']['resolution']
order = ['blueray', 'ultra', 'high', 'standard']
sorted_keys = [key for key in order if key in stream_list]
while len(sorted_keys) < 5:
sorted_keys.append(sorted_keys[-1])
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
selected_quality = sorted_keys[quality_list[video_quality]]
flv_url_list = stream_list[selected_quality]['cdn']
selected_cdn = list(flv_url_list.keys())[0]
flv_url = flv_url_list[selected_cdn]
m3u8_url = json_data['m3u8_url']
flv_url = None
if json_data.get('stream_list'):
stream_list = json_data['stream_list']['resolution']
order = ['blueray', 'ultra', 'high', 'standard']
sorted_keys = [key for key in order if key in stream_list]
while len(sorted_keys) < 5:
sorted_keys.append(sorted_keys[-1])
video_quality, quality_index = get_quality_index(video_quality)
selected_quality = sorted_keys[quality_index]
flv_url_list = stream_list[selected_quality]['cdn']
selected_cdn = list(flv_url_list.keys())[0]
flv_url = flv_url_list[selected_cdn]
return {
"is_live": True,
"anchor_name": json_data['anchor_name'],
"title": json_data['title'],
'quality': video_quality,
"m3u8_url": m3u8_url,
"flv_url": flv_url,
"record_url": flv_url
"record_url": flv_url or m3u8_url
}
def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', spec: bool = False,
hls_extra_key: str | int = None, flv_extra_key: str | int = None) -> dict:
async def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8', spec: bool = False,
hls_extra_key: str | int = None, flv_extra_key: str | int = None) -> dict:
if not json_data['is_live']:
return json_data
play_url_list = json_data['play_url_list']
quality_list = {'原画': 0, '蓝光': 0, '超清': 1, '高清': 2, '标清': 3, '流畅': 4}
while len(play_url_list) < 5:
play_url_list.append(play_url_list[-1])
selected_quality = quality_list[video_quality]
video_quality, selected_quality = get_quality_index(video_quality)
data = {
"anchor_name": json_data['anchor_name'],
"is_live": True
@ -395,4 +442,5 @@ def get_stream_url(json_data: dict, video_quality: str, url_type: str = 'm3u8',
flv_url = get_url(flv_extra_key)
data |= {"flv_url": flv_url, "record_url": flv_url}
data['title'] = json_data.get('title')
return data
data['quality'] = video_quality
return data

View File

@ -1,18 +1,24 @@
# -*- coding: utf-8 -*-
import json
import os
import random
import shutil
import string
from pathlib import Path
import functools
import hashlib
import re
import traceback
from typing import Any
from urllib.parse import parse_qs, urlparse
from collections import OrderedDict
import execjs
from .logger import logger
import configparser
OptionalStr = str | None
OptionalDict = dict | None
class Color:
RED = "\033[31m"
@ -160,3 +166,41 @@ def handle_proxy_addr(proxy_addr):
else:
proxy_addr = None
return proxy_addr
def generate_random_string(length: int) -> str:
characters = string.ascii_uppercase + string.digits
random_string = ''.join(random.choices(characters, k=length))
return random_string
def jsonp_to_json(jsonp_str: str) -> OptionalDict:
pattern = r'(\w+)\((.*)\);?$'
match = re.search(pattern, jsonp_str)
if match:
_, json_str = match.groups()
json_obj = json.loads(json_str)
return json_obj
else:
raise Exception("No JSON data found in JSONP response.")
def replace_url(file_path: str | Path, old: str, new: str) -> None:
with open(file_path, 'r', encoding='utf-8-sig') as f:
content = f.read()
if old in content:
with open(file_path, 'w', encoding='utf-8-sig') as f:
f.write(content.replace(old, new))
def get_query_params(url: str, param_name: OptionalStr) -> dict | list[str]:
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
if param_name is None:
return query_params
else:
values = query_params.get(param_name, [])
return values