发布 1.7 版本

This commit is contained in:
JoeamAmier 2023-12-16 11:24:42 +08:00
parent 84a0889c8f
commit e0ba7af1f8
15 changed files with 487 additions and 346 deletions

View File

@ -6,7 +6,7 @@
<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/JoeanAmier/XHS-Downloader?style=for-the-badge&color=fff200">
<img alt="GitHub all releases" src="https://img.shields.io/github/downloads/JoeanAmier/XHS-Downloader/total?style=for-the-badge&color=1b9cfc">
<img alt="GitHub release (with filter)" src="https://img.shields.io/github/v/release/JoeanAmier/XHS-Downloader?style=for-the-badge&color=44bd32">
<hr>
<p>🔥 <b>小红书作品采集工具</b>:采集小红书作品信息;提取小红书作品下载地址;下载小红书无水印作品文件!</p>
</div>
<h1>📑 功能清单</h1>
<ul>
@ -16,14 +16,14 @@
<li>✅ 自动跳过已下载的作品文件</li>
<li>✅ 作品文件完整性处理机制</li>
<li>✅ 持久化储存作品信息至文件</li>
<li>✅ 作品文件储存至单独文件夹</li>
<li>☑️ 后台监听剪贴板下载作品</li>
<li>☑️ 支持 API 调用功能</li>
</ul>
<h1>📸 程序截图</h1>
<br>
<img src="static/程序运行截图1.png" alt="">
<hr>
<img src="static/程序运行截图2.png" alt="">
<p><b>🎥 点击图片观看演示视频</b></p>
<a href="https://www.bilibili.com/video/BV1nQ4y137it/"><img src="static/程序运行截图.png" alt=""></a>
<h1>🔗 支持链接</h1>
<ul>
<li><code>https://www.xiaohongshu.com/explore/作品ID</code></li>
@ -35,18 +35,19 @@
<h1>🪟 关于终端</h1>
<p>⭐ 推荐使用 <a href="https://learn.microsoft.com/zh-cn/windows/terminal/install">Windows 终端</a>(Windows 11 自带默认终端)运行程序以便获得最佳显示效果!</p>
<h1>🥣 使用方法</h1>
<p>如果仅需下载作品文件,选择 <b>直接运行</b> 或者 <b>源码运行</b> 均可,如果需要获取作品信息,则需要进行二次开发进行调用。</p>
<h2>🖱 直接运行</h2>
<p>前往 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 下载程序压缩包,解压后打开程序文件夹,双击运行 <code>main.exe</code> 即可使用。</p>
<p>如果仅需下载无水印作品文件,建议选择 <b>程序运行</b>;如果有其他需求,建议选择 <b>源码运行</b></p>
<h2>🖱 程序运行</h2>
<p>Windows 10 及以上用户可前往 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 下载程序压缩包,解压后打开程序文件夹,双击运行 <code>main.exe</code> 即可使用。</p>
<p>若通过此方式使用程序,文件默认下载路径:<code>.\_internal\Download</code>;配置文件路径:<code>.\_internal\settings.json</code></p>
<h2>⌨️ 源码运行</h2>
<ol>
<li>安装版本号不低于 <code>3.12</code> 的 Python 解释器</li>
<li>运行 <code>pip install -r requirements.txt</code> 命令安装程序所需模块</li>
<li>运行 <code>pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt</code> 命令安装程序所需模块</li>
<li>下载本项目最新的源码或 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 发布的源码至本地</li>
<li>运行 <code>main.py</code> 即可使用</li>
</ol>
<h2>💻 二次开发</h2>
<p>如果需要获取小红书图文/视频作品信息,可以根据 <code>main.py</code> 的注释提示进行代码调用</p>
<h1>💻 二次开发</h1>
<p>如果有其他需求,可以根据 <code>main.py</code> 的注释提示进行代码调用或修改!</p>
<pre>
# 测试链接
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
@ -58,12 +59,15 @@ path = "" # 作品数据/文件保存根路径,默认值:项目根路径
folder_name = "Download" # 作品文件储存文件夹名称(自动创建),默认值:Download
user_agent = "" # 请求头 User-Agent
cookie = "" # 小红书网页版 Cookie,无需登录
proxy = "" # 网络代理
timeout = 5 # 网络请求超时限制单位默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
proxy = None # 网络代理
timeout = 5 # 请求数据超时限制,单位:秒,默认值:10
chunk = 1024 * 1024 * 10 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
max_retry = 2 # 请求数据失败时重试的最大次数,单位:次,默认值:5
# async with XHS() as xhs:
# pass # 使用默认参数
record_data = False # 是否记录作品数据至文件
image_format = "jpg" # 图文作品文件名称后缀
folder_mode = False # 是否将每个作品的文件储存至单独的文件夹
async with XHS() as xhs:
pass # 使用默认参数
async with XHS(path=path,
folder_name=folder_name,
user_agent=user_agent,
@ -71,7 +75,11 @@ async with XHS(path=path,
proxy=proxy,
timeout=timeout,
chunk=chunk,
max_retry=max_retry, ) as xhs: # 使用自定义参数
max_retry=max_retry,
record_data=record_data,
image_format=image_format,
folder_mode=folder_mode,
) as xhs: # 使用自定义参数
download = True # 是否下载作品文件,默认值:False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典
@ -81,6 +89,7 @@ async with XHS(path=path,
</pre>
<h1>⚙️ 配置文件</h1>
<p>项目根目录下的 <code>settings.json</code> 文件,首次运行自动生成,可以自定义部分运行参数。</p>
<p>如果您的计算机没有合适的程序编辑 JSON 文件,建议使用 <a href="https://try8.cn/tool/format/json">JSON 在线工具</a> 编辑配置文件内容</p>
<table>
<thead>
<tr>
@ -112,14 +121,14 @@ async with XHS(path=path,
<tr>
<td align="center">cookie</td>
<td align="center">str</td>
<td align="center">小红书网页版 Cookie无需登录</td>
<td align="center">小红书网页版 Cookie,<b>无需登录</b></td>
<td align="center">默认 Cookie</td>
</tr>
<tr>
<td align="center">proxy</td>
<td align="center">str</td>
<td align="center">设置代理</td>
<td align="center"></td>
<td align="center">设置程序代理</td>
<td align="center">null</td>
</tr>
<tr>
<td align="center">timeout</td>
@ -142,15 +151,27 @@ async with XHS(path=path,
<tr>
<td align="center">record_data</td>
<td align="center">bool</td>
<td align="center">是否记录作品数据至文件</td>
<td align="center">是否记录作品数据至 <code>TXT</code> 文件</td>
<td align="center">false</td>
</tr>
<tr>
<td align="center">image_format</td>
<td align="center">str</td>
<td align="center">图文作品文件名称后缀,例如:<code>jpg</code><code>png</code></td>
<td align="center">图文作品文件名称后缀,不影响实际文件格式</td>
<td align="center">webp</td>
</tr>
<tr>
<td align="center">video_format</td>
<td align="center">str</td>
<td align="center">视频作品文件名称后缀,不影响实际文件格式</td>
<td align="center">mp4</td>
</tr>
<tr>
<td align="center">folder_mode</td>
<td align="center">bool</td>
<td align="center">是否将每个作品的文件储存至单独的文件夹;文件夹名称与文件名称保持一致</td>
<td align="center">false</td>
</tr>
</tbody>
</table>
<h1>🌐 Cookie</h1>

19
main.py
View File

@ -16,12 +16,15 @@ async def example():
folder_name = "Download" # 作品文件储存文件夹名称自动创建默认值Download
user_agent = "" # 请求头 User-Agent
cookie = "" # 小红书网页版 Cookie无需登录
proxy = "" # 网络代理
timeout = 5 # 网络请求超时限制单位默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
proxy = None # 网络代理
timeout = 5 # 请求数据超时限制单位默认值10
chunk = 1024 * 1024 * 10 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
max_retry = 2 # 请求数据失败时重试的最大次数单位默认值5
# async with XHS() as xhs:
# pass # 使用默认参数
record_data = False # 是否记录作品数据至文件
image_format = "jpg" # 图文作品文件名称后缀
folder_mode = False # 是否将每个作品的文件储存至单独的文件夹
async with XHS() as xhs:
pass # 使用默认参数
async with XHS(path=path,
folder_name=folder_name,
user_agent=user_agent,
@ -29,7 +32,11 @@ async def example():
proxy=proxy,
timeout=timeout,
chunk=chunk,
max_retry=max_retry, ) as xhs: # 使用自定义参数
max_retry=max_retry,
record_data=record_data,
image_format=image_format,
folder_mode=folder_mode,
) as xhs: # 使用自定义参数
download = True # 是否下载作品文件默认值False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典

140
source/App.py Normal file
View File

@ -0,0 +1,140 @@
from re import compile
from .Downloader import Download
from .Explore import Explore
from .Html import Html
from .Image import Image
from .Manager import Manager
from .Static import (
ROOT,
ERROR,
WARNING,
)
from .Video import Video
class XHS:
LINK = compile(r"https?://www\.xiaohongshu\.com/explore/[a-z0-9]+")
SHARE = compile(r"https?://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https?://xhslink\.com/[A-Za-z0-9]+")
__INSTANCE = None
TYPE = {
"视频": "v",
"图文": "n",
}
def __new__(cls, *args, **kwargs):
if not cls.__INSTANCE:
cls.__INSTANCE = super().__new__(cls)
return cls.__INSTANCE
def __init__(
self,
path="",
folder_name="Download",
user_agent: str = None,
cookie: str = None,
proxy: str = None,
timeout=10,
chunk=1024 * 1024,
max_retry=5,
record_data=False,
image_format="webp",
video_format="mp4",
folder_mode=False,
):
self.manager = Manager(
ROOT,
path,
folder_name,
user_agent,
chunk,
cookie,
proxy,
timeout,
max_retry,
record_data,
image_format,
video_format,
folder_mode,
)
self.html = Html(self.manager)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(self.manager, )
self.rich_log = self.download.rich_log
def __extract_image(self, container: dict, html: str):
container["下载地址"] = self.image.get_image_link(html)
def __extract_video(self, container: dict, html: str):
container["下载地址"] = self.video.get_video_link(html)
async def __download_files(self, container: dict, download: bool, log, bar):
name = self.__naming_rules(container)
if (u := container["下载地址"]) and download:
await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
elif not u:
self.rich_log(log, "提取作品文件下载地址失败!", ERROR)
self.manager.save_data(name, container)
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
# return # 调试代码
urls = await self.__extract_links(url)
if not urls:
self.rich_log(log, "提取小红书作品链接失败!", WARNING)
else:
self.rich_log(log, f"{len(urls)} 个小红书作品待处理...")
# return urls # 调试代码
return [await self.__deal_extract(i, download, log, bar) for i in urls]
async def __extract_links(self, url: str) -> list:
urls = []
for i in url.split():
if u := self.SHORT.search(i):
i = await self.html.request_url(
u.group(), False)
if u := self.SHARE.search(i):
urls.append(u.group())
elif u := self.LINK.search(i):
urls.append(u.group())
return urls
async def __deal_extract(self, url: str, download: bool, log, bar):
self.rich_log(log, f"开始处理作品:{url}")
html = await self.html.request_url(url)
# self.rich_log(log, html) # 调试代码
if not html:
self.rich_log(log, f"{url} 获取数据失败!", ERROR)
return {}
data = self.explore.run(html)
# self.rich_log(log, data) # 调试代码
if not data:
self.rich_log(log, f"{url} 提取数据失败!", ERROR)
return {}
match data["作品类型"]:
case "视频":
self.__extract_video(data, html)
case "图文":
self.__extract_image(data, html)
case _:
data["下载地址"] = []
await self.__download_files(data, download, log, bar)
self.rich_log(log, f"作品处理完成:{url}")
return data
def __naming_rules(self, data: dict) -> str:
"""下载文件默认使用 作品标题 或 作品 ID 作为文件名称,可修改此方法自定义文件名称格式"""
return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
async def close(self):
self.manager.clean()
await self.html.session.close()
await self.download.session.close()

View File

@ -1,3 +1,6 @@
from pathlib import Path
from aiohttp import ClientOSError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
@ -5,47 +8,53 @@ from aiohttp import ServerTimeoutError
from rich.text import Text
from .Html import retry as re_download
from .Static import ERROR, INFO
__all__ = ['Download']
class Download:
def __init__(
self,
manager,
proxy: str = "",
chunk=1024 * 1024,
timeout=10):
def __init__(self, manager, ):
self.manager = manager
self.folder = manager.folder
self.temp = manager.temp
self.proxy = proxy
self.chunk = chunk
self.proxy = manager.proxy
self.chunk = manager.chunk
self.session = ClientSession(
headers={"User-Agent": manager.headers["User-Agent"]},
timeout=ClientTimeout(connect=timeout))
timeout=ClientTimeout(connect=manager.timeout))
self.retry = manager.retry
self.folder_mode = manager.folder_mode
self.video_format = manager.video_format
self.image_format = manager.image_format
async def run(self, urls: list, name: str, type_: str, log, bar):
path = self.__generate_path(name)
if type_ == "v":
await self.__download(urls[0], f"{name}.mp4", log, bar)
await self.__download(urls[0], path, f"{name}", self.video_format, log, bar)
elif type_ == "n":
for index, url in enumerate(urls, start=1):
await self.__download(url, f"{name}_{index}.{self.image_format}", log, bar)
await self.__download(url, path, f"{name}_{index}", self.image_format, log, bar)
else:
raise ValueError
def __generate_path(self, name: str):
path = self.manager.archive(self.folder, name, self.folder_mode)
path.mkdir(exist_ok=True)
return path
@re_download
async def __download(self, url: str, name: str, log, bar):
temp = self.temp.joinpath(name)
file = self.folder.joinpath(name)
if self.manager.is_exists(file):
self.rich_log(log, f"{name} 已存在,跳过下载")
return True
async def __download(self, url: str, path: Path, name: str, format_: str, log, bar):
try:
async with self.session.get(url, proxy=self.proxy) as response:
suffix = self.__extract_type(
response.headers.get("Content-Type", "")) or format_
temp = self.temp.joinpath(name)
file = path.joinpath(name).with_suffix(f".{suffix}")
if self.manager.is_exists(file):
self.rich_log(log, f"{name} 已存在,跳过下载!")
return True
# self.__create_progress(
# bar, int(
# response.headers.get(
@ -56,30 +65,36 @@ class Download:
# self.__update_progress(bar, len(chunk))
self.manager.move(temp, file)
# self.__create_progress(bar, None)
self.rich_log(log, f"{name} 下载成功")
self.rich_log(log, f"{name} 下载成功")
return True
except (
ServerTimeoutError,
ServerDisconnectedError,
ClientOSError,
):
self.manager.delete(temp)
# self.__create_progress(bar, None)
self.rich_log(log, f"{name} 下载失败", "bright_red")
self.rich_log(log, f"{name} 下载失败", ERROR)
return False
# @staticmethod
# def __create_progress(bar, total: int | None):
# if bar:
# bar.update(total=total)
# @staticmethod
# def __update_progress(bar, advance: int):
# if bar:
# bar.advance(advance)
@staticmethod
def __create_progress(bar, total: int | None):
if bar:
bar.update(total=total)
@staticmethod
def rich_log(log, text, style="bright_green"):
def __update_progress(bar, advance: int):
if bar:
bar.advance(advance)
@staticmethod
def __extract_type(content: str) -> str:
return "" if content == "application/octet-stream" else content.split(
"/")[-1]
@staticmethod
def rich_log(log, text, style=INFO):
if log:
log.write(Text(text, style=f"b {style}"))
log.write(Text(text, style=style))
else:
print(text)
print(Text(text, style=style))

View File

@ -1,3 +1,4 @@
from aiohttp import ClientOSError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
@ -20,19 +21,14 @@ def retry(function):
class Html:
def __init__(
self,
headers: dict,
proxy: str = "",
timeout=10,
retry_=5, ):
self.proxy = proxy
def __init__(self, manager, ):
self.proxy = manager.proxy
self.session = ClientSession(
headers=headers | {
headers=manager.headers | {
"Referer": "https://www.xiaohongshu.com/", },
timeout=ClientTimeout(connect=timeout),
timeout=ClientTimeout(connect=manager.timeout),
)
self.retry = retry_
self.retry = manager.retry
@retry
async def request_url(
@ -48,6 +44,7 @@ class Html:
except (
ServerTimeoutError,
ServerDisconnectedError,
ClientOSError,
):
return ""

View File

@ -1,39 +1,18 @@
from json import loads
from re import compile
from .Html import Html
__all__ = ['Image']
class Image:
IMAGE_INFO = compile(r'("infoList":\[\{.*?}])')
IMAGE_TOKEN = compile(
r"http://sns-webpic-qc.xhscdn.com/\d+/\w+/(\w+)!")
r'"urlDefault":"http:\\u002F\\u002Fsns-webpic-qc\.xhscdn\.com\\u002F\d+?\\u002F\S+?\\u002F(\S+?)!')
def get_image_link(self, html: str) -> list:
data = self.__extract_image_data(html)
data = self.__format_image_data(data)
return self.__extract_image_urls(data)
def __extract_image_data(self, html: str) -> list[str]:
return self.IMAGE_INFO.findall(html)
@staticmethod
def __format_image_data(data: list[str]) -> list[dict]:
return [loads(f"{{{i}}}") for i in data]
return [Html.format_url(self.__generate_image_link(i))
for i in self.IMAGE_TOKEN.findall(html)]
@staticmethod
def __generate_image_link(token: str) -> str:
return f"https://sns-img-bd.xhscdn.com/{token}"
def __extract_image_token(self, url: str) -> str:
return self.__generate_image_link(token.group(1)) if (
token := self.IMAGE_TOKEN.search(url)) else ""
def __extract_image_urls(self, data: list[dict]) -> list[str]:
urls = []
for i in data:
for j in i.get("infoList", []):
if j.get("imageScene", "") == "WB_DFT":
urls.append(self.__extract_image_token(j.get("url", "")))
break
return [i for i in urls if i]

View File

@ -18,10 +18,15 @@ class Manager:
path: str,
folder: str,
user_agent: str,
chunk: int,
cookie: str,
proxy: str,
timeout: int,
retry: int,
record_data: bool,
image_format: str,
video_format: str,
folder_mode: bool,
):
self.root = root
self.temp = root.joinpath("./temp")
@ -37,8 +42,13 @@ class Manager:
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
self.retry = retry
self.chunk = chunk
self.record_data = record_data
self.image_format = image_format
self.video_format = video_format
self.folder_mode = folder_mode
self.timeout = timeout
self.proxy = proxy
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
@ -57,6 +67,10 @@ class Manager:
def delete(path: Path):
path.unlink()
@staticmethod
def archive(root: Path, name: str, folder_mode: bool) -> Path:
return root.joinpath(name) if folder_mode else root
@staticmethod
def move(temp: Path, path: Path):
move(temp.resolve(), path.resolve())

View File

@ -12,12 +12,14 @@ class Settings:
"folder_name": "Download",
"user_agent": "",
"cookie": "",
"proxy": "",
"proxy": None,
"timeout": 10,
"chunk": 1024 * 1024,
"max_retry": 5,
"record_data": False,
"image_format": "webp",
"video_format": "mp4",
"folder_mode": False,
}
encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"

53
source/Static.py Normal file
View File

@ -0,0 +1,53 @@
from pathlib import Path
# Public names of this module, grouped by purpose.
__all__ = [
    "VERSION_MAJOR", "VERSION_MINOR", "VERSION_BETA",
    "ROOT",
    "REPOSITORY", "LICENCE", "RELEASES",
    "MASTER", "PROMPT", "GENERAL", "PROGRESS", "ERROR", "WARNING", "INFO",
    "DISCLAIMER_TEXT",
]

# --- Version of this release -------------------------------------------------
VERSION_MAJOR = 1
VERSION_MINOR = 7
VERSION_BETA = False

# --- Locations ---------------------------------------------------------------
# Two levels above this file.
ROOT = Path(__file__).resolve().parent.parent
REPOSITORY = "https://github.com/JoeanAmier/XHS-Downloader"
RELEASES = f"{REPOSITORY}/releases/latest"

# --- Legal text --------------------------------------------------------------
LICENCE = "GNU General Public License v3.0"

# Disclaimer as a tuple of display lines; the last entry is a separator rule.
DISCLAIMER_TEXT = (
    "关于 XHS-Downloader 的 免责声明:",
    "",
    "1. 使用者对本项目的使用由使用者自行决定,并自行承担风险。作者对使用者使用本项目所产生的任何损失、责任、或风险概不负责。",
    "2. 本项目的作者提供的代码和功能是基于现有知识和技术的开发成果。作者尽力确保代码的正确性和安全性,但不保证代码完全没有错误或缺陷。",
    f"3. 使用者在使用本项目时必须严格遵守 {LICENCE} 的要求,并在适当的地方注明使用了 {LICENCE} 的代码。",
    "4. 使用者在任何情况下均不得将本项目的作者、贡献者或其他相关方与使用者的使用行为联系起来,或要求其对使用者使用本项目所产生的任何损失或损害负责。",
    "5. 使用者在使用本项目的代码和功能时,必须自行研究相关法律法规,并确保其使用行为合法合规。任何因违反法律法规而导致的法律责任和风险,均由使用者自行承担。",
    "6. 本项目的作者不会提供 XHS-Downloader 项目的付费版本,也不会提供与 XHS-Downloader 项目相关的任何商业服务。",
    "7. 基于本项目进行的任何二次开发、修改或编译的程序与原创作者无关,原创作者不承担与二次开发行为或其结果相关的任何责任,使用者应自行对因二次开发可能带来的各种情况负全部责任。",
    "",
    "在使用本项目的代码和功能之前,请您认真考虑并接受以上免责声明。如果您对上述声明有任何疑问或不同意,请不要使用本项目的代码和功能。如果您使用了本项目的代码和功能,则视为您已完全理解并接受上述免责声明,并自愿承担使用本项目的一切风险和后果。",
    "",
    50 * ">",
)

# --- Rich style strings (all bold) -------------------------------------------
MASTER = "b #fff200"
PROMPT = "b turquoise2"
GENERAL = "b bright_white"
PROGRESS = "b bright_magenta"
ERROR = "b bright_red"
WARNING = "b bright_yellow"
INFO = "b bright_green"

148
source/TUI.py Normal file
View File

@ -0,0 +1,148 @@
from pyperclip import paste
from rich.text import Text
from textual.app import App
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Center
from textual.containers import HorizontalScroll
from textual.containers import ScrollableContainer
from textual.widgets import Button
from textual.widgets import Footer
from textual.widgets import Header
from textual.widgets import Input
from textual.widgets import Label
from textual.widgets import ProgressBar
from textual.widgets import RichLog
from .App import XHS
from .Settings import Settings
from .Static import (
VERSION_MAJOR,
VERSION_MINOR,
VERSION_BETA,
ROOT,
PROMPT,
MASTER,
ERROR,
WARNING,
INFO,
LICENCE,
REPOSITORY,
RELEASES,
GENERAL,
DISCLAIMER_TEXT,
)
def show_state(function):
    """Decorate a coroutine method of the TUI so the progress bar tracks its run.

    Before the wrapped coroutine starts, the first-run text is cleared via
    ``self.close_show()`` and the bar is set to ``total=100, progress=100``.
    When the coroutine ends the bar is put back to ``total=None``, also if it
    raised, so a failure cannot leave the bar stuck.  After a successful run a
    separator line is written to the log and the coroutine's result returned.

    Args:
        function: Coroutine function taking ``self`` as its first argument;
            ``self`` must provide ``close_show()``, ``bar`` and ``tip``.

    Returns:
        The wrapping coroutine function, carrying the original's metadata.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    @wraps(function)
    async def inner(self, *args, **kwargs):
        self.close_show()
        self.bar.update(total=100, progress=100)
        try:
            result = await function(self, *args, **kwargs)
        finally:
            self.bar.update(total=None)
        self.tip.write(Text(">" * 50, style=GENERAL))
        return result

    return inner
class XHSDownloader(App):
    """Terminal UI for ``XHS``: a link input, three buttons, a progress bar and a log.

    Use it as an async context manager so the wrapped ``XHS`` object is
    entered and exited (and its sessions closed) together with the app.
    """

    CSS_PATH = ROOT.joinpath(
        "static/XHS-Downloader.tcss")
    BINDINGS = [
        Binding(key="q", action="quit", description="退出程序"),
        Binding(key="u", action="check_update", description="检查更新"),
    ]

    def __init__(self):
        super().__init__()
        # Core object configured from the settings file.
        self.APP = XHS(**Settings(ROOT).run())
        # Widget handles; filled in by on_ready once the widgets exist.
        self.url = None
        self.tip = None
        self.bar = None
        # True while the disclaimer is still displayed in the log.
        self.show = True

    async def __aenter__(self):
        await self.APP.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.APP.__aexit__(exc_type, exc_value, traceback)

    def compose(self) -> ComposeResult:
        """Yield the widgets: header, input area, progress bar, log, footer."""
        yield Header()
        yield ScrollableContainer(
            Label(Text(f"开源协议:{LICENCE}", style=MASTER)),
            Label(Text(f"项目地址:{REPOSITORY}", style=MASTER)),
            Label(Text("请输入小红书图文/视频作品链接:", style=PROMPT), id="prompt"),
            Input(placeholder="多个链接之间使用空格分隔"),
            HorizontalScroll(
                Button("下载无水印图片/视频", id="deal"),
                Button("读取剪贴板", id="paste"),
                Button("清空输入框", id="reset"),
            ),
        )
        # NOTE(review): only the progress bar is taken to sit inside Center();
        # the nesting was reconstructed, confirm against the project file.
        with Center():
            yield ProgressBar(total=None, show_percentage=False, show_eta=False)
        yield RichLog(markup=True)
        yield Footer()

    def on_mount(self) -> None:
        """Set the window title to the program name and version."""
        beta = " Beta" if VERSION_BETA else ""
        self.title = f"XHS-Downloader V{VERSION_MAJOR}.{VERSION_MINOR}{beta}"

    def on_ready(self) -> None:
        """Cache the widget handles and show the disclaimer in the log."""
        self.url = self.query_one(Input)
        self.tip = self.query_one(RichLog)
        self.bar = self.query_one(ProgressBar)
        self.tip.write(Text("\n".join(DISCLAIMER_TEXT), style=MASTER))

    def close_show(self):
        """Clear the log once, the first time an action runs."""
        if self.show:
            self.tip.clear()
            self.show = False

    async def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch a button press by widget id: download, clear or paste."""
        if event.button.id == "deal":
            await self.deal()
        elif event.button.id == "reset":
            self.query_one(Input).value = ""
        elif event.button.id == "paste":
            self.query_one(Input).value = paste()

    @show_state
    async def deal(self):
        """Download the posts named in the input box.

        The input is cleared when at least one post yielded data; otherwise a
        failure message is logged and the text is kept for correction.
        """
        if not self.url.value:
            self.tip.write(Text("未输入任何小红书作品链接!", style=WARNING))
            return
        if any(await self.APP.extract(self.url.value, True, log=self.tip)):
            self.url.value = ""
        else:
            self.tip.write(Text("下载小红书作品文件失败!", style=ERROR))

    @show_state
    async def action_check_update(self):
        """Compare this build with the latest release and log the outcome.

        The releases URL is requested and the last path segment of the
        returned URL is parsed as ``<major>.<minor>``.  Anything that does not
        parse that way (including a failed request) is reported as a failed
        check.
        """
        self.tip.write(Text("正在检查新版本,请稍等...", style=WARNING))
        try:
            url = await self.APP.html.request_url(RELEASES, False)
            # A falsy result is turned into "" so it fails as ValueError below
            # instead of raising an unhandled AttributeError.
            latest_major, latest_minor = map(
                int, (url or "").split("/")[-1].split(".", 1))
            latest = (latest_major, latest_minor)
            current = (VERSION_MAJOR, VERSION_MINOR)
            # Tuples compare major first, then minor; comparing the two parts
            # independently would call 1.9 newer than 2.0.
            if latest > current:
                self.tip.write(
                    Text(
                        f"检测到新版本:{latest_major}.{latest_minor}",
                        style=WARNING))
                self.tip.write(RELEASES)
            elif latest == current and VERSION_BETA:
                self.tip.write(
                    Text("当前版本为开发版, 可更新至正式版!", style=WARNING))
                self.tip.write(RELEASES)
            elif VERSION_BETA:
                self.tip.write(Text("当前已是最新开发版!", style=WARNING))
            else:
                self.tip.write(Text("当前已是最新正式版!", style=INFO))
        except ValueError:
            self.tip.write(Text("检测新版本失败!", style=ERROR))

View File

@ -1,247 +1,4 @@
from pathlib import Path
from re import compile
from pyperclip import paste
from rich.text import Text
from textual.app import App
from textual.app import ComposeResult
from textual.binding import Binding
# from textual.containers import Center
from textual.containers import HorizontalScroll
from textual.containers import ScrollableContainer
from textual.widgets import Button
from textual.widgets import Footer
from textual.widgets import Header
from textual.widgets import Input
from textual.widgets import Label
# from textual.widgets import ProgressBar
from textual.widgets import RichLog
from .Downloader import Download
from .Explore import Explore
from .Html import Html
from .Image import Image
from .Manager import Manager
from .Settings import Settings
from .Video import Video
from .App import XHS
from .TUI import XHSDownloader
__all__ = ['XHS', 'XHSDownloader']
RELEASES = "https://github.com/JoeanAmier/XHS-Downloader/releases/latest"
VERSION = 1.7
BETA = True
ROOT = Path(__file__).resolve().parent.parent
class XHS:
LINK = compile(r"https://www\.xiaohongshu\.com/explore/[a-z0-9]+")
SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
__INSTANCE = None
TYPE = {
"视频": "v",
"图文": "n",
}
def __new__(cls, *args, **kwargs):
if not cls.__INSTANCE:
cls.__INSTANCE = super().__new__(cls)
return cls.__INSTANCE
def __init__(
self,
path="",
folder_name="Download",
user_agent: str = None,
cookie: str = None,
proxy: str = "",
timeout=10,
chunk=1024 * 1024,
max_retry=5,
record_data=False,
image_format="webp",
**kwargs,
):
self.manager = Manager(
ROOT,
path,
folder_name,
user_agent,
cookie,
max_retry,
record_data,
image_format,
)
self.html = Html(
self.manager.headers,
proxy,
timeout,
self.manager.retry)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(
self.manager,
proxy,
chunk,
timeout, )
self.rich_log = self.download.rich_log
def __extract_image(self, container: dict, html: str):
container["下载地址"] = self.image.get_image_link(html)
def __extract_video(self, container: dict, html: str):
container["下载地址"] = self.video.get_video_link(html)
async def __download_files(self, container: dict, download: bool, log, bar):
name = self.__naming_rules(container)
if download and (u := container["下载地址"]):
await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
self.manager.save_data(name, container)
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
# return # 调试代码
urls = await self.__extract_links(url)
if not urls:
self.rich_log(log, "提取小红书作品链接失败", "bright_red")
else:
self.rich_log(log, f"{len(urls)} 个小红书作品待处理")
# return urls # 调试代码
return [await self.__deal_extract(i, download, log, bar) for i in urls]
async def __extract_links(self, url: str) -> list:
urls = []
for i in url.split():
if u := self.SHORT.search(i):
i = await self.html.request_url(
u.group(), False)
if u := self.SHARE.search(i):
urls.append(u.group())
elif u := self.LINK.search(i):
urls.append(u.group())
return urls
async def __deal_extract(self, url: str, download: bool, log, bar):
self.rich_log(log, f"开始处理:{url}")
html = await self.html.request_url(url)
# self.rich_log(log, html) # 调试代码
if not html:
self.rich_log(log, f"{url} 获取数据失败", "bright_red")
return {}
data = self.explore.run(html)
# self.rich_log(log, data) # 调试代码
if not data:
self.rich_log(log, f"{url} 提取数据失败", "bright_red")
return {}
match data["作品类型"]:
case "视频":
self.__extract_video(data, html)
case "图文":
self.__extract_image(data, html)
case _:
data["下载地址"] = []
await self.__download_files(data, download, log, bar)
self.rich_log(log, f"完成处理:{url}")
return data
def __naming_rules(self, data: dict) -> str:
"""下载文件默认使用 作品标题 或 作品 ID 作为文件名称,可修改此方法自定义文件名称格式"""
return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
async def close(self):
self.manager.clean()
await self.html.session.close()
await self.download.session.close()
class XHSDownloader(App):
CSS_PATH = ROOT.joinpath(
"static/XHS-Downloader.tcss")
BINDINGS = [
Binding(key="q", action="quit", description="退出程序"),
("d", "toggle_dark", "切换主题"),
Binding(key="u", action="check_update", description="检查更新"),
]
def __init__(self):
super().__init__()
self.APP = XHS(**Settings(ROOT).run())
self.url = None
self.log_ = None
self.bar = None
async def __aenter__(self):
await self.APP.__aenter__()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.APP.__aexit__(exc_type, exc_value, traceback)
def compose(self) -> ComposeResult:
# yield LoadingIndicator()
yield Header()
yield ScrollableContainer(Label(Text("请输入小红书图文/视频作品链接:", style="b bright_blue")),
Input(placeholder="多个链接之间使用空格分隔"),
HorizontalScroll(Button("下载无水印图片/视频", id="deal"),
Button("读取剪贴板", id="paste"),
Button("清空输入框", id="reset"), ),
# Label(Text("程序状态", style="b bright_blue")),
)
# with Center():
# yield ProgressBar(total=None)
yield RichLog(markup=True)
yield Footer()
def on_mount(self) -> None:
self.title = f"XHS-Downloader V{VERSION}{" Beta" if BETA else ""}"
async def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "deal":
await self.deal()
elif event.button.id == "reset":
self.query_one(Input).value = ""
elif event.button.id == "paste":
self.query_one(Input).value = paste()
async def deal(self):
self.__init_objects()
if not self.url.value:
self.log_.write(Text("未输入任何小红书作品链接", style="b bright_yellow"))
return
if any(await self.APP.extract(self.url.value, True, log=self.log_, bar=self.bar)):
self.url.value = ""
else:
self.log_.write(Text("下载小红书作品文件失败", style="b bright_red"))
def __init_objects(self):
if any((self.url, self.log_, self.bar)):
return
self.url = self.query_one(Input)
self.log_ = self.query_one(RichLog)
# self.bar = self.query_one(ProgressBar)
async def action_check_update(self):
self.__init_objects()
try:
url = await self.APP.html.request_url(RELEASES, False)
tag = float(url.split("/")[-1])
if tag > VERSION:
self.log_.write(
Text(f"检测到新版本: {tag}", style="b bright_yellow"))
self.log_.write(RELEASES)
elif tag == VERSION and BETA:
self.log_.write(
Text("当前版本为开发版, 可更新至正式版", style="b bright_yellow"))
self.log_.write(RELEASES)
elif BETA:
self.log_.write(Text("当前已是最新开发版", style="b bright_yellow"))
else:
self.log_.write(Text("当前已是最新正式版", style="b bright_green"))
except ValueError:
self.log_.write(Text("检测新版本失败", style="b bright_red"))

View File

@ -11,11 +11,19 @@ Button#reset {
}
Label {
width: 100%;
padding: 1;
content-align-horizontal: center;
content-align-vertical: middle;
text-style: bold;
}
Bar > .bar--indeterminate {
color: #2ed573;
Label#prompt {
padding: 1;
}
Bar {
width: 33vw;
}
Bar > .bar--indeterminate {
color: #7bed9f;
}
Bar > .bar--complete {
color: #ff7f50;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 123 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 140 KiB