diff --git a/README.md b/README.md
index 9323f52..8274e23 100644
--- a/README.md
+++ b/README.md
@@ -6,24 +6,24 @@
-
+🔥 小红书作品采集工具:采集小红书作品信息;提取小红书作品下载地址;下载小红书无水印作品文件!
📑 功能清单
-- ✅ 采集小红书图文/视频作品信息
-- ✅ 提取小红书图文/视频作品下载地址
-- ✅ 下载小红书无水印图文/视频作品文件
+- ✅ 采集小红书图文 / 视频作品信息
+- ✅ 提取小红书图文 / 视频作品下载地址
+- ✅ 下载小红书无水印图文 / 视频作品文件
- ✅ 自动跳过已下载的作品文件
- ✅ 作品文件完整性处理机制
- ✅ 持久化储存作品信息至文件
+- ✅ 作品文件储存至单独文件夹
- ☑️ 后台监听剪贴板下载作品
- ☑️ 支持 API 调用功能
📸 程序截图
-
-
-
+🎥 点击图片观看演示视频
+
🔗 支持链接
https://www.xiaohongshu.com/explore/作品ID
@@ -35,18 +35,19 @@
🪟 关于终端
⭐ 推荐使用 Windows 终端 (Windows 11 自带默认终端)运行程序以便获得最佳显示效果!
🥣 使用方法
-如果仅需下载作品文件,选择 直接运行 或者 源码运行 均可,如果需要获取作品信息,则需要进行二次开发进行调用。
-🖱 直接运行
-前往 Releases 下载程序压缩包,解压后打开程序文件夹,双击运行 main.exe 即可使用。
+如果仅需下载无水印作品文件,建议选择 程序运行;如果有其他需求,建议选择 源码运行!
+🖱 程序运行
+Windows 10 及以上用户可前往 Releases 下载程序压缩包,解压后打开程序文件夹,双击运行 main.exe 即可使用。
+若通过此方式使用程序,文件默认下载路径:.\_internal\Download;配置文件路径:.\_internal\settings.json
⌨️ 源码运行
- 安装版本号不低于
3.12 的 Python 解释器
-- 运行
pip install -r requirements.txt 命令安装程序所需模块
+- 运行
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt 命令安装程序所需模块
- 下载本项目最新的源码或 Releases 发布的源码至本地
- 运行
main.py 即可使用
-💻 二次开发
-如果需要获取小红书图文/视频作品信息,可以根据 main.py 的注释提示进行代码调用。
+💻 二次开发
+如果有其他需求,可以根据 main.py 的注释提示进行代码调用或修改!
# 测试链接
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
@@ -58,12 +59,15 @@ path = "" # 作品数据/文件保存根路径,默认值:项目根路径
folder_name = "Download" # 作品文件储存文件夹名称(自动创建),默认值:Download
user_agent = "" # 请求头 User-Agent
cookie = "" # 小红书网页版 Cookie,无需登录
-proxy = "" # 网络代理
-timeout = 5 # 网络请求超时限制,单位:秒,默认值:10
-chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
+proxy = None # 网络代理
+timeout = 5 # 请求数据超时限制,单位:秒,默认值:10
+chunk = 1024 * 1024 * 10 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
max_retry = 2 # 请求数据失败时,重试的最大次数,单位:秒,默认值:5
-# async with XHS() as xhs:
-# pass # 使用默认参数
+record_data = False # 是否记录作品数据至文件
+image_format = "jpg" # 图文作品文件名称后缀
+folder_mode = False # 是否将每个作品的文件储存至单独的文件夹
+async with XHS() as xhs:
+ pass # 使用默认参数
async with XHS(path=path,
folder_name=folder_name,
user_agent=user_agent,
@@ -71,7 +75,11 @@ async with XHS(path=path,
proxy=proxy,
timeout=timeout,
chunk=chunk,
- max_retry=max_retry, ) as xhs: # 使用自定义参数
+ max_retry=max_retry,
+ record_data=record_data,
+ image_format=image_format,
+ folder_mode=folder_mode,
+ ) as xhs: # 使用自定义参数
download = True # 是否下载作品文件,默认值:False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典
@@ -81,6 +89,7 @@ async with XHS(path=path,
⚙️ 配置文件
项目根目录下的 settings.json 文件,首次运行自动生成,可以自定义部分运行参数。
+如果您的计算机没有合适的程序编辑 JSON 文件,建议使用 JSON 在线工具 编辑配置文件内容!
@@ -112,14 +121,14 @@ async with XHS(path=path,
| cookie |
str |
-小红书网页版 Cookie,无需登录 |
+小红书网页版 Cookie,无需登录 |
默认 Cookie |
| proxy |
str |
-设置代理 |
-无 |
+设置程序代理 |
+null |
| timeout |
@@ -142,15 +151,27 @@ async with XHS(path=path,
| record_data |
bool |
-是否记录作品数据至文件 |
+是否记录作品数据至 TXT 文件 |
false |
| image_format |
str |
-图文作品文件名称后缀,例如:jpg、png |
+图文作品文件名称后缀,不影响实际文件格式 |
webp |
+
+| video_format |
+str |
+视频作品文件名称后缀,不影响实际文件格式 |
+mp4 |
+
+
+| folder_mode |
+bool |
+是否将每个作品的文件储存至单独的文件夹;文件夹名称与文件名称保持一致 |
+false |
+
🌐 Cookie
diff --git a/main.py b/main.py
index 638e454..ef05d9f 100644
--- a/main.py
+++ b/main.py
@@ -16,12 +16,15 @@ async def example():
folder_name = "Download" # 作品文件储存文件夹名称(自动创建),默认值:Download
user_agent = "" # 请求头 User-Agent
cookie = "" # 小红书网页版 Cookie,无需登录
- proxy = "" # 网络代理
- timeout = 5 # 网络请求超时限制,单位:秒,默认值:10
- chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
+ proxy = None # 网络代理
+ timeout = 5 # 请求数据超时限制,单位:秒,默认值:10
+ chunk = 1024 * 1024 * 10 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
max_retry = 2 # 请求数据失败时,重试的最大次数,单位:秒,默认值:5
- # async with XHS() as xhs:
- # pass # 使用默认参数
+ record_data = False # 是否记录作品数据至文件
+ image_format = "jpg" # 图文作品文件名称后缀
+ folder_mode = False # 是否将每个作品的文件储存至单独的文件夹
+ async with XHS() as xhs:
+ pass # 使用默认参数
async with XHS(path=path,
folder_name=folder_name,
user_agent=user_agent,
@@ -29,7 +32,11 @@ async def example():
proxy=proxy,
timeout=timeout,
chunk=chunk,
- max_retry=max_retry, ) as xhs: # 使用自定义参数
+ max_retry=max_retry,
+ record_data=record_data,
+ image_format=image_format,
+ folder_mode=folder_mode,
+ ) as xhs: # 使用自定义参数
download = True # 是否下载作品文件,默认值:False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典
diff --git a/source/App.py b/source/App.py
new file mode 100644
index 0000000..be8d2af
--- /dev/null
+++ b/source/App.py
@@ -0,0 +1,140 @@
+from re import compile
+
+from .Downloader import Download
+from .Explore import Explore
+from .Html import Html
+from .Image import Image
+from .Manager import Manager
+from .Static import (
+ ROOT,
+ ERROR,
+ WARNING,
+)
+from .Video import Video
+
+
+class XHS:
+ """Collect Xiaohongshu post data, extract download addresses and optionally download the files.
+
+ The class wires together the sibling Manager, Html, Image, Video, Explore and
+ Download helpers and can be used as an async context manager; leaving the
+ context calls close().
+ """
+ # Patterns used to recognise links in user input: post link, share link, short link.
+ LINK = compile(r"https?://www\.xiaohongshu\.com/explore/[a-z0-9]+")
+ SHARE = compile(r"https?://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
+ SHORT = compile(r"https?://xhslink\.com/[A-Za-z0-9]+")
+ # Holds the single shared instance handed out by __new__.
+ __INSTANCE = None
+ # Maps the "作品类型" value of a parsed post to the type code passed to Download.run.
+ TYPE = {
+ "视频": "v",
+ "图文": "n",
+ }
+
+ def __new__(cls, *args, **kwargs):
+ """Return the shared instance, creating it on the first call.
+
+ NOTE: Python still runs __init__ on the returned object for every XHS(...)
+ call, so the helper objects are rebuilt each time.
+ """
+ if not cls.__INSTANCE:
+ cls.__INSTANCE = super().__new__(cls)
+ return cls.__INSTANCE
+
+ def __init__(
+ self,
+ path="",
+ folder_name="Download",
+ user_agent: str = None,
+ cookie: str = None,
+ proxy: str = None,
+ timeout=10,
+ chunk=1024 * 1024,
+ max_retry=5,
+ record_data=False,
+ image_format="webp",
+ video_format="mp4",
+ folder_mode=False,
+ ):
+ """Build the Manager from the given settings and create the helper objects.
+
+ Parameters (meanings taken from the project README):
+ path: root directory for saved data/files.
+ folder_name: name of the folder that stores downloaded files.
+ user_agent: User-Agent request header.
+ cookie: Xiaohongshu web cookie.
+ proxy: network proxy.
+ timeout: request timeout in seconds.
+ chunk: size in bytes of each data block fetched while downloading.
+ max_retry: maximum number of retries after a failed request.
+ record_data: whether post data is recorded to a file.
+ image_format: file-name suffix for image posts.
+ video_format: file-name suffix for video posts.
+ folder_mode: whether each post's files go into a separate folder.
+ """
+ # Arguments are positional, so their order has to match Manager.__init__.
+ self.manager = Manager(
+ ROOT,
+ path,
+ folder_name,
+ user_agent,
+ chunk,
+ cookie,
+ proxy,
+ timeout,
+ max_retry,
+ record_data,
+ image_format,
+ video_format,
+ folder_mode,
+ )
+ self.html = Html(self.manager)
+ self.image = Image()
+ self.video = Video()
+ self.explore = Explore()
+ self.download = Download(self.manager, )
+ # Reuse the downloader's logging helper for this class's own messages.
+ self.rich_log = self.download.rich_log
+
+ def __extract_image(self, container: dict, html: str):
+ """Store the image links found in `html` under container["下载地址"]."""
+ container["下载地址"] = self.image.get_image_link(html)
+
+ def __extract_video(self, container: dict, html: str):
+ """Store the video links found in `html` under container["下载地址"]."""
+ container["下载地址"] = self.video.get_video_link(html)
+
+ async def __download_files(self, container: dict, download: bool, log, bar):
+ """Download the post's files if requested, then pass the data to Manager.save_data.
+
+ Files are downloaded only when `download` is true and a download address
+ exists; a missing address is logged as an error.
+ NOTE(review): indentation is lost in this diff; save_data appears to run
+ unconditionally - confirm.
+ """
+ name = self.__naming_rules(container)
+ if (u := container["下载地址"]) and download:
+ await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
+ elif not u:
+ self.rich_log(log, "提取作品文件下载地址失败!", ERROR)
+ self.manager.save_data(name, container)
+
+ async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
+ """Process every recognised link in `url` and return one dict per link.
+
+ `url` may hold several whitespace-separated links. Links are handled one
+ after another; a link that fails yields an empty dict.
+ """
+ # return # debug code
+ urls = await self.__extract_links(url)
+ if not urls:
+ self.rich_log(log, "提取小红书作品链接失败!", WARNING)
+ else:
+ self.rich_log(log, f"共 {len(urls)} 个小红书作品待处理...")
+ # return urls # debug code
+ return [await self.__deal_extract(i, download, log, bar) for i in urls]
+
+ async def __extract_links(self, url: str) -> list:
+ """Split `url` on whitespace and collect every share or post link found.
+
+ A short link is first passed to html.request_url(..., False) and the
+ returned text is matched instead - presumably the resolved address;
+ confirm against Html.request_url.
+ """
+ urls = []
+ for i in url.split():
+ if u := self.SHORT.search(i):
+ i = await self.html.request_url(
+ u.group(), False)
+ if u := self.SHARE.search(i):
+ urls.append(u.group())
+ elif u := self.LINK.search(i):
+ urls.append(u.group())
+ return urls
+
+ async def __deal_extract(self, url: str, download: bool, log, bar):
+ """Fetch one post page, parse it, add its "下载地址" entry and process its files.
+
+ Returns the parsed data dict, or {} when the page or the parsed data is empty.
+ """
+ self.rich_log(log, f"开始处理作品:{url}")
+ html = await self.html.request_url(url)
+ # self.rich_log(log, html) # debug code
+ if not html:
+ self.rich_log(log, f"{url} 获取数据失败!", ERROR)
+ return {}
+ data = self.explore.run(html)
+ # self.rich_log(log, data) # debug code
+ if not data:
+ self.rich_log(log, f"{url} 提取数据失败!", ERROR)
+ return {}
+ # Unknown post types get an empty address list so later steps still find the key.
+ match data["作品类型"]:
+ case "视频":
+ self.__extract_video(data, html)
+ case "图文":
+ self.__extract_image(data, html)
+ case _:
+ data["下载地址"] = []
+ await self.__download_files(data, download, log, bar)
+ self.rich_log(log, f"作品处理完成:{url}")
+ return data
+
+ def __naming_rules(self, data: dict) -> str:
+ """Return the file name for a post: the filtered post title, or the post ID when that is empty.
+
+ Modify this method to customise the file-name format.
+ """
+ return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
+
+ async def __aenter__(self):
+ """Enter the async context and return this instance."""
+ return self
+
+ async def __aexit__(self, exc_type, exc_value, traceback):
+ """Leave the async context by calling close()."""
+ await self.close()
+
+ async def close(self):
+ """Call Manager.clean() and close the Html and Download client sessions."""
+ self.manager.clean()
+ await self.html.session.close()
+ await self.download.session.close()
diff --git a/source/Downloader.py b/source/Downloader.py
index 330fc25..bde1f17 100644
--- a/source/Downloader.py
+++ b/source/Downloader.py
@@ -1,3 +1,6 @@
+from pathlib import Path
+
+from aiohttp import ClientOSError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
@@ -5,47 +8,53 @@ from aiohttp import ServerTimeoutError
from rich.text import Text
from .Html import retry as re_download
+from .Static import ERROR, INFO
__all__ = ['Download']
class Download:
- def __init__(
- self,
- manager,
- proxy: str = "",
- chunk=1024 * 1024,
- timeout=10):
+ def __init__(self, manager, ):
self.manager = manager
self.folder = manager.folder
self.temp = manager.temp
- self.proxy = proxy
- self.chunk = chunk
+ self.proxy = manager.proxy
+ self.chunk = manager.chunk
self.session = ClientSession(
headers={"User-Agent": manager.headers["User-Agent"]},
- timeout=ClientTimeout(connect=timeout))
+ timeout=ClientTimeout(connect=manager.timeout))
self.retry = manager.retry
+ self.folder_mode = manager.folder_mode
+ self.video_format = manager.video_format
self.image_format = manager.image_format
async def run(self, urls: list, name: str, type_: str, log, bar):
+ path = self.__generate_path(name)
if type_ == "v":
- await self.__download(urls[0], f"{name}.mp4", log, bar)
+ await self.__download(urls[0], path, f"{name}", self.video_format, log, bar)
elif type_ == "n":
for index, url in enumerate(urls, start=1):
- await self.__download(url, f"{name}_{index}.{self.image_format}", log, bar)
+ await self.__download(url, path, f"{name}_{index}", self.image_format, log, bar)
else:
raise ValueError
+    def __generate_path(self, name: str):
+        """Return the directory used for the files of post `name`, creating it if missing.
+
+        The location is chosen by Manager.archive from the download folder,
+        the post name and the folder_mode setting.
+        """
+        target = self.manager.archive(self.folder, name, self.folder_mode)
+        target.mkdir(exist_ok=True)
+        return target
+
@re_download
- async def __download(self, url: str, name: str, log, bar):
- temp = self.temp.joinpath(name)
- file = self.folder.joinpath(name)
- if self.manager.is_exists(file):
- self.rich_log(log, f"{name} 已存在,跳过下载")
- return True
+ async def __download(self, url: str, path: Path, name: str, format_: str, log, bar):
try:
async with self.session.get(url, proxy=self.proxy) as response:
+ suffix = self.__extract_type(
+ response.headers.get("Content-Type", "")) or format_
+ temp = self.temp.joinpath(name)
+ file = path.joinpath(name).with_suffix(f".{suffix}")
+ if self.manager.is_exists(file):
+ self.rich_log(log, f"{name} 已存在,跳过下载!")
+ return True
# self.__create_progress(
# bar, int(
# response.headers.get(
@@ -56,30 +65,36 @@ class Download:
# self.__update_progress(bar, len(chunk))
self.manager.move(temp, file)
# self.__create_progress(bar, None)
- self.rich_log(log, f"{name} 下载成功")
+ self.rich_log(log, f"{name} 下载成功!")
return True
except (
ServerTimeoutError,
ServerDisconnectedError,
+ ClientOSError,
):
self.manager.delete(temp)
# self.__create_progress(bar, None)
- self.rich_log(log, f"{name} 下载失败", "bright_red")
+ self.rich_log(log, f"{name} 下载失败!", ERROR)
return False
- # @staticmethod
- # def __create_progress(bar, total: int | None):
- # if bar:
- # bar.update(total=total)
-
- # @staticmethod
- # def __update_progress(bar, advance: int):
- # if bar:
- # bar.advance(advance)
+    @staticmethod
+    def __create_progress(bar, total: int | None):
+        """Set the total of progress bar `bar`; do nothing when `bar` is falsy."""
+        if not bar:
+            return
+        bar.update(total=total)
@staticmethod
- def rich_log(log, text, style="bright_green"):
+ def __update_progress(bar, advance: int):
+ if bar:
+ bar.advance(advance)
+
+    @staticmethod
+    def __extract_type(content: str) -> str:
+        """Derive a file suffix from a Content-Type header value.
+
+        Returns the subtype of the media type (e.g. "jpeg" for "image/jpeg"),
+        or "" for an empty value or "application/octet-stream" so that the
+        caller falls back to the configured suffix.
+        """
+        # A Content-Type value may carry parameters ("image/jpeg; charset=UTF-8");
+        # only the media type itself may end up in a file name. Media types are
+        # case-insensitive, so normalise before comparing.
+        media_type = content.split(";", 1)[0].strip().lower()
+        if media_type == "application/octet-stream":
+            return ""
+        return media_type.split("/")[-1]
+
+ @staticmethod
+ def rich_log(log, text, style=INFO):
if log:
- log.write(Text(text, style=f"b {style}"))
+ log.write(Text(text, style=style))
else:
- print(text)
+ print(Text(text, style=style))
diff --git a/source/Html.py b/source/Html.py
index 9925eb9..9eee55a 100644
--- a/source/Html.py
+++ b/source/Html.py
@@ -1,3 +1,4 @@
+from aiohttp import ClientOSError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
@@ -20,19 +21,14 @@ def retry(function):
class Html:
- def __init__(
- self,
- headers: dict,
- proxy: str = "",
- timeout=10,
- retry_=5, ):
- self.proxy = proxy
+ def __init__(self, manager, ):
+ self.proxy = manager.proxy
self.session = ClientSession(
- headers=headers | {
+ headers=manager.headers | {
"Referer": "https://www.xiaohongshu.com/", },
- timeout=ClientTimeout(connect=timeout),
+ timeout=ClientTimeout(connect=manager.timeout),
)
- self.retry = retry_
+ self.retry = manager.retry
@retry
async def request_url(
@@ -48,6 +44,7 @@ class Html:
except (
ServerTimeoutError,
ServerDisconnectedError,
+ ClientOSError,
):
return ""
diff --git a/source/Image.py b/source/Image.py
index 5577f65..f614c7b 100644
--- a/source/Image.py
+++ b/source/Image.py
@@ -1,39 +1,18 @@
-from json import loads
from re import compile
+from .Html import Html
+
__all__ = ['Image']
class Image:
- IMAGE_INFO = compile(r'("infoList":\[\{.*?}])')
IMAGE_TOKEN = compile(
- r"http://sns-webpic-qc.xhscdn.com/\d+/\w+/(\w+)!")
+ r'"urlDefault":"http:\\u002F\\u002Fsns-webpic-qc\.xhscdn\.com\\u002F\d+?\\u002F\S+?\\u002F(\S+?)!')
def get_image_link(self, html: str) -> list:
- data = self.__extract_image_data(html)
- data = self.__format_image_data(data)
- return self.__extract_image_urls(data)
-
- def __extract_image_data(self, html: str) -> list[str]:
- return self.IMAGE_INFO.findall(html)
-
- @staticmethod
- def __format_image_data(data: list[str]) -> list[dict]:
- return [loads(f"{{{i}}}") for i in data]
+ return [Html.format_url(self.__generate_image_link(i))
+ for i in self.IMAGE_TOKEN.findall(html)]
@staticmethod
def __generate_image_link(token: str) -> str:
return f"https://sns-img-bd.xhscdn.com/{token}"
-
- def __extract_image_token(self, url: str) -> str:
- return self.__generate_image_link(token.group(1)) if (
- token := self.IMAGE_TOKEN.search(url)) else ""
-
- def __extract_image_urls(self, data: list[dict]) -> list[str]:
- urls = []
- for i in data:
- for j in i.get("infoList", []):
- if j.get("imageScene", "") == "WB_DFT":
- urls.append(self.__extract_image_token(j.get("url", "")))
- break
- return [i for i in urls if i]
diff --git a/source/Manager.py b/source/Manager.py
index 3b27ffe..9b5f7cf 100644
--- a/source/Manager.py
+++ b/source/Manager.py
@@ -18,10 +18,15 @@ class Manager:
path: str,
folder: str,
user_agent: str,
+ chunk: int,
cookie: str,
+ proxy: str,
+ timeout: int,
retry: int,
record_data: bool,
image_format: str,
+ video_format: str,
+ folder_mode: bool,
):
self.root = root
self.temp = root.joinpath("./temp")
@@ -37,8 +42,13 @@ class Manager:
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
self.retry = retry
+ self.chunk = chunk
self.record_data = record_data
self.image_format = image_format
+ self.video_format = video_format
+ self.folder_mode = folder_mode
+ self.timeout = timeout
+ self.proxy = proxy
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
@@ -57,6 +67,10 @@ class Manager:
def delete(path: Path):
path.unlink()
+    @staticmethod
+    def archive(root: Path, name: str, folder_mode: bool) -> Path:
+        """Return `root/name` when `folder_mode` is true, otherwise `root` itself."""
+        if folder_mode:
+            return root.joinpath(name)
+        return root
+
@staticmethod
def move(temp: Path, path: Path):
move(temp.resolve(), path.resolve())
diff --git a/source/Settings.py b/source/Settings.py
index b3e4964..481eada 100644
--- a/source/Settings.py
+++ b/source/Settings.py
@@ -12,12 +12,14 @@ class Settings:
"folder_name": "Download",
"user_agent": "",
"cookie": "",
- "proxy": "",
+ "proxy": None,
"timeout": 10,
"chunk": 1024 * 1024,
"max_retry": 5,
"record_data": False,
"image_format": "webp",
+ "video_format": "mp4",
+ "folder_mode": False,
}
encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"
diff --git a/source/Static.py b/source/Static.py
new file mode 100644
index 0000000..4f73a64
--- /dev/null
+++ b/source/Static.py
@@ -0,0 +1,53 @@
+from pathlib import Path
+
+__all__ = [
+ "VERSION_MAJOR",
+ "VERSION_MINOR",
+ "VERSION_BETA",
+ "ROOT",
+ "REPOSITORY",
+ "LICENCE",
+ "RELEASES",
+ "MASTER",
+ "PROMPT",
+ "GENERAL",
+ "PROGRESS",
+ "ERROR",
+ "WARNING",
+ "INFO",
+ "DISCLAIMER_TEXT"
+]
+
+# Program version: shown in the TUI title and compared with the latest release tag
+# by the update check. VERSION_BETA marks a development build.
+VERSION_MAJOR = 1
+VERSION_MINOR = 7
+VERSION_BETA = False
+# Directory two levels above this file, i.e. the parent of the `source` package.
+ROOT = Path(__file__).resolve().parent.parent
+
+REPOSITORY = "https://github.com/JoeanAmier/XHS-Downloader"
+LICENCE = "GNU General Public License v3.0"
+RELEASES = "https://github.com/JoeanAmier/XHS-Downloader/releases/latest"
+DISCLAIMER_TEXT = (
+ "关于 XHS-Downloader 的 免责声明:",
+ "",
+ "1. 使用者对本项目的使用由使用者自行决定,并自行承担风险。作者对使用者使用本项目所产生的任何损失、责任、或风险概不负责。",
+ "2. 本项目的作者提供的代码和功能是基于现有知识和技术的开发成果。作者尽力确保代码的正确性和安全性,但不保证代码完全没有错误或缺陷。",
+ "3. 使用者在使用本项目时必须严格遵守 GNU General Public License v3.0 的要求,并在适当的地方注明使用了 GNU General Public License v3.0 的代码。",
+ "4. 使用者在任何情况下均不得将本项目的作者、贡献者或其他相关方与使用者的使用行为联系起来,或要求其对使用者使用本项目所产生的任何损失或损害负责。",
+ "5. 使用者在使用本项目的代码和功能时,必须自行研究相关法律法规,并确保其使用行为合法合规。任何因违反法律法规而导致的法律责任和风险,均由使用者自行承担。",
+ "6. 本项目的作者不会提供 XHS-Downloader 项目的付费版本,也不会提供与 XHS-Downloader 项目相关的任何商业服务。",
+ "7. 基于本项目进行的任何二次开发、修改或编译的程序与原创作者无关,原创作者不承担与二次开发行为或其结果相关的任何责任,使用者应自行对因"
+ "二次开发可能带来的各种情况负全部责任。",
+ "",
+ "在使用本项目的代码和功能之前,请您认真考虑并接受以上免责声明。如果您对上述声明有任何疑问或不同意,请不要使用本项目的代码和功能。如果"
+ "您使用了本项目的代码和功能,则视为您已完全理解并接受上述免责声明,并自愿承担使用本项目的一切风险和后果。",
+ "",
+ ">" * 50,
+)
+
+# Style strings passed as `style=` to rich Text objects; the leading "b" is part of each value.
+MASTER = "b #fff200"
+PROMPT = "b turquoise2"
+GENERAL = "b bright_white"
+PROGRESS = "b bright_magenta"
+ERROR = "b bright_red"
+WARNING = "b bright_yellow"
+INFO = "b bright_green"
diff --git a/source/TUI.py b/source/TUI.py
new file mode 100644
index 0000000..85765dd
--- /dev/null
+++ b/source/TUI.py
@@ -0,0 +1,148 @@
+from pyperclip import paste
+from rich.text import Text
+from textual.app import App
+from textual.app import ComposeResult
+from textual.binding import Binding
+from textual.containers import Center
+from textual.containers import HorizontalScroll
+from textual.containers import ScrollableContainer
+from textual.widgets import Button
+from textual.widgets import Footer
+from textual.widgets import Header
+from textual.widgets import Input
+from textual.widgets import Label
+from textual.widgets import ProgressBar
+from textual.widgets import RichLog
+
+from .App import XHS
+from .Settings import Settings
+from .Static import (
+ VERSION_MAJOR,
+ VERSION_MINOR,
+ VERSION_BETA,
+ ROOT,
+ PROMPT,
+ MASTER,
+ ERROR,
+ WARNING,
+ INFO,
+ LICENCE,
+ REPOSITORY,
+ RELEASES,
+ GENERAL,
+ DISCLAIMER_TEXT,
+)
+
+
+def show_state(function):
+    """Decorate a TUI coroutine method so the progress bar reflects that it is running.
+
+    Before the call the start-up text is cleared (close_show) and the bar is
+    set to 100/100; afterwards the bar total is reset to None and a separator
+    line is written to the log. The wrapped coroutine's result is returned.
+    """
+
+    async def inner(self, *args, **kwargs):
+        self.close_show()
+        self.bar.update(total=100, progress=100)
+        outcome = await function(self, *args, **kwargs)
+        self.bar.update(total=None)
+        separator = Text(">" * 50, style=GENERAL)
+        self.tip.write(separator)
+        return outcome
+
+    return inner
+
+
+class XHSDownloader(App):
+    """Textual terminal UI that passes user-entered links to an XHS instance.
+
+    Usable as an async context manager; entering and leaving the context is
+    delegated to the wrapped XHS object.
+    """
+
+    CSS_PATH = ROOT.joinpath(
+        "static/XHS-Downloader.tcss")
+    BINDINGS = [
+        Binding(key="q", action="quit", description="退出程序"),
+        # ("d", "toggle_dark", "切换主题"),
+        Binding(key="u", action="check_update", description="检查更新"),
+    ]
+
+    def __init__(self):
+        """Create the XHS backend from the saved settings; widgets are looked up in on_ready."""
+        super().__init__()
+        self.APP = XHS(**Settings(ROOT).run())
+        self.url = None  # Input widget, set in on_ready
+        self.tip = None  # RichLog widget, set in on_ready
+        self.bar = None  # ProgressBar widget, set in on_ready
+        # True until the start-up disclaimer has been cleared from the log.
+        self.show = True
+
+    async def __aenter__(self):
+        """Enter the wrapped XHS context and return this app."""
+        await self.APP.__aenter__()
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        """Leave the wrapped XHS context."""
+        await self.APP.__aexit__(exc_type, exc_value, traceback)
+
+    def compose(self) -> ComposeResult:
+        """Yield the widgets of the screen: header, input area, progress bar, log and footer."""
+        yield Header()
+        yield ScrollableContainer(Label(Text(f"开源协议:{LICENCE}", style=MASTER)),
+                                  Label(
+                                      Text(
+                                          f"项目地址:{REPOSITORY}",
+                                          style=MASTER)),
+                                  Label(Text("请输入小红书图文/视频作品链接:",
+                                             style=PROMPT), id="prompt"),
+                                  Input(placeholder="多个链接之间使用空格分隔"),
+                                  HorizontalScroll(Button("下载无水印图片/视频", id="deal"),
+                                                   Button("读取剪贴板", id="paste"),
+                                                   Button("清空输入框", id="reset"), ),
+                                  # Label(Text("准备就绪", style=INFO), id="state"),
+                                  )
+        # NOTE(review): indentation is lost in this diff; only the progress bar is
+        # assumed to sit inside Center - confirm against the original file.
+        with Center():
+            yield ProgressBar(total=None, show_percentage=False, show_eta=False)
+        yield RichLog(markup=True)
+        yield Footer()
+
+    def on_mount(self) -> None:
+        """Set the window title to the program name and version."""
+        self.title = f"XHS-Downloader V{VERSION_MAJOR}.{VERSION_MINOR}{" Beta" if VERSION_BETA else ""}"
+
+    def on_ready(self) -> None:
+        """Cache the input, log and progress-bar widgets and print the disclaimer."""
+        self.url = self.query_one(Input)
+        self.tip = self.query_one(RichLog)
+        self.bar = self.query_one(ProgressBar)
+        self.tip.write(Text("\n".join(DISCLAIMER_TEXT), style=MASTER))
+
+    def close_show(self):
+        """Clear the log once, the first time it is called."""
+        if self.show:
+            self.tip.clear()
+            self.show = False
+
+    async def on_button_pressed(self, event: Button.Pressed) -> None:
+        """Dispatch a button press by button id: download, clear the input, or paste the clipboard."""
+        if event.button.id == "deal":
+            await self.deal()
+        elif event.button.id == "reset":
+            self.query_one(Input).value = ""
+        elif event.button.id == "paste":
+            self.query_one(Input).value = paste()
+
+    @show_state
+    async def deal(self):
+        """Download the posts named in the input box.
+
+        The input is cleared when at least one post was processed successfully;
+        otherwise an error line is written to the log.
+        """
+        if not self.url.value:
+            self.tip.write(Text("未输入任何小红书作品链接!", style=WARNING))
+            return
+        if any(await self.APP.extract(self.url.value, True, log=self.tip)):
+            self.url.value = ""
+        else:
+            self.tip.write(Text("下载小红书作品文件失败!", style=ERROR))
+
+    @show_state
+    async def action_check_update(self):
+        """Compare the running version with the latest release tag and report the result.
+
+        The tag is the last path segment of the text returned for RELEASES and
+        must have the form "<major>.<minor>"; anything else is reported as a
+        failed check.
+        """
+        self.tip.write(Text("正在检查新版本,请稍等...", style=WARNING))
+        try:
+            url = await self.APP.html.request_url(RELEASES, False)
+            latest_major, latest_minor = map(
+                int, url.split("/")[-1].split(".", 1))
+            # Compare as (major, minor) tuples: comparing the two numbers
+            # independently would report e.g. 1.9 as newer than 2.0.
+            latest = (latest_major, latest_minor)
+            current = (VERSION_MAJOR, VERSION_MINOR)
+            if latest > current:
+                self.tip.write(
+                    Text(
+                        f"检测到新版本:{latest_major}.{latest_minor}",
+                        style=WARNING))
+                self.tip.write(RELEASES)
+            elif latest == current and VERSION_BETA:
+                self.tip.write(
+                    Text("当前版本为开发版, 可更新至正式版!", style=WARNING))
+                self.tip.write(RELEASES)
+            elif VERSION_BETA:
+                self.tip.write(Text("当前已是最新开发版!", style=WARNING))
+            else:
+                self.tip.write(Text("当前已是最新正式版!", style=INFO))
+        except ValueError:
+            self.tip.write(Text("检测新版本失败!", style=ERROR))
diff --git a/source/__init__.py b/source/__init__.py
index a355081..3cf8dd0 100644
--- a/source/__init__.py
+++ b/source/__init__.py
@@ -1,247 +1,4 @@
-from pathlib import Path
-from re import compile
-
-from pyperclip import paste
-from rich.text import Text
-from textual.app import App
-from textual.app import ComposeResult
-from textual.binding import Binding
-# from textual.containers import Center
-from textual.containers import HorizontalScroll
-from textual.containers import ScrollableContainer
-from textual.widgets import Button
-from textual.widgets import Footer
-from textual.widgets import Header
-from textual.widgets import Input
-from textual.widgets import Label
-# from textual.widgets import ProgressBar
-from textual.widgets import RichLog
-
-from .Downloader import Download
-from .Explore import Explore
-from .Html import Html
-from .Image import Image
-from .Manager import Manager
-from .Settings import Settings
-from .Video import Video
+from .App import XHS
+from .TUI import XHSDownloader
__all__ = ['XHS', 'XHSDownloader']
-
-RELEASES = "https://github.com/JoeanAmier/XHS-Downloader/releases/latest"
-VERSION = 1.7
-BETA = True
-ROOT = Path(__file__).resolve().parent.parent
-
-
-class XHS:
- LINK = compile(r"https://www\.xiaohongshu\.com/explore/[a-z0-9]+")
- SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
- SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
- __INSTANCE = None
- TYPE = {
- "视频": "v",
- "图文": "n",
- }
-
- def __new__(cls, *args, **kwargs):
- if not cls.__INSTANCE:
- cls.__INSTANCE = super().__new__(cls)
- return cls.__INSTANCE
-
- def __init__(
- self,
- path="",
- folder_name="Download",
- user_agent: str = None,
- cookie: str = None,
- proxy: str = "",
- timeout=10,
- chunk=1024 * 1024,
- max_retry=5,
- record_data=False,
- image_format="webp",
- **kwargs,
- ):
- self.manager = Manager(
- ROOT,
- path,
- folder_name,
- user_agent,
- cookie,
- max_retry,
- record_data,
- image_format,
- )
- self.html = Html(
- self.manager.headers,
- proxy,
- timeout,
- self.manager.retry)
- self.image = Image()
- self.video = Video()
- self.explore = Explore()
- self.download = Download(
- self.manager,
- proxy,
- chunk,
- timeout, )
- self.rich_log = self.download.rich_log
-
- def __extract_image(self, container: dict, html: str):
- container["下载地址"] = self.image.get_image_link(html)
-
- def __extract_video(self, container: dict, html: str):
- container["下载地址"] = self.video.get_video_link(html)
-
- async def __download_files(self, container: dict, download: bool, log, bar):
- name = self.__naming_rules(container)
- if download and (u := container["下载地址"]):
- await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
- self.manager.save_data(name, container)
-
- async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
- # return # 调试代码
- urls = await self.__extract_links(url)
- if not urls:
- self.rich_log(log, "提取小红书作品链接失败", "bright_red")
- else:
- self.rich_log(log, f"共 {len(urls)} 个小红书作品待处理")
- # return urls # 调试代码
- return [await self.__deal_extract(i, download, log, bar) for i in urls]
-
- async def __extract_links(self, url: str) -> list:
- urls = []
- for i in url.split():
- if u := self.SHORT.search(i):
- i = await self.html.request_url(
- u.group(), False)
- if u := self.SHARE.search(i):
- urls.append(u.group())
- elif u := self.LINK.search(i):
- urls.append(u.group())
- return urls
-
- async def __deal_extract(self, url: str, download: bool, log, bar):
- self.rich_log(log, f"开始处理:{url}")
- html = await self.html.request_url(url)
- # self.rich_log(log, html) # 调试代码
- if not html:
- self.rich_log(log, f"{url} 获取数据失败", "bright_red")
- return {}
- data = self.explore.run(html)
- # self.rich_log(log, data) # 调试代码
- if not data:
- self.rich_log(log, f"{url} 提取数据失败", "bright_red")
- return {}
- match data["作品类型"]:
- case "视频":
- self.__extract_video(data, html)
- case "图文":
- self.__extract_image(data, html)
- case _:
- data["下载地址"] = []
- await self.__download_files(data, download, log, bar)
- self.rich_log(log, f"完成处理:{url}")
- return data
-
- def __naming_rules(self, data: dict) -> str:
- """下载文件默认使用 作品标题 或 作品 ID 作为文件名称,可修改此方法自定义文件名称格式"""
- return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
-
- async def __aenter__(self):
- return self
-
- async def __aexit__(self, exc_type, exc_value, traceback):
- await self.close()
-
- async def close(self):
- self.manager.clean()
- await self.html.session.close()
- await self.download.session.close()
-
-
-class XHSDownloader(App):
- CSS_PATH = ROOT.joinpath(
- "static/XHS-Downloader.tcss")
- BINDINGS = [
- Binding(key="q", action="quit", description="退出程序"),
- ("d", "toggle_dark", "切换主题"),
- Binding(key="u", action="check_update", description="检查更新"),
- ]
-
- def __init__(self):
- super().__init__()
- self.APP = XHS(**Settings(ROOT).run())
- self.url = None
- self.log_ = None
- self.bar = None
-
- async def __aenter__(self):
- await self.APP.__aenter__()
- return self
-
- async def __aexit__(self, exc_type, exc_value, traceback):
- await self.APP.__aexit__(exc_type, exc_value, traceback)
-
- def compose(self) -> ComposeResult:
- # yield LoadingIndicator()
- yield Header()
- yield ScrollableContainer(Label(Text("请输入小红书图文/视频作品链接:", style="b bright_blue")),
- Input(placeholder="多个链接之间使用空格分隔"),
- HorizontalScroll(Button("下载无水印图片/视频", id="deal"),
- Button("读取剪贴板", id="paste"),
- Button("清空输入框", id="reset"), ),
- # Label(Text("程序状态", style="b bright_blue")),
- )
- # with Center():
- # yield ProgressBar(total=None)
- yield RichLog(markup=True)
- yield Footer()
-
- def on_mount(self) -> None:
- self.title = f"XHS-Downloader V{VERSION}{" Beta" if BETA else ""}"
-
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- if event.button.id == "deal":
- await self.deal()
- elif event.button.id == "reset":
- self.query_one(Input).value = ""
- elif event.button.id == "paste":
- self.query_one(Input).value = paste()
-
- async def deal(self):
- self.__init_objects()
- if not self.url.value:
- self.log_.write(Text("未输入任何小红书作品链接", style="b bright_yellow"))
- return
- if any(await self.APP.extract(self.url.value, True, log=self.log_, bar=self.bar)):
- self.url.value = ""
- else:
- self.log_.write(Text("下载小红书作品文件失败", style="b bright_red"))
-
- def __init_objects(self):
- if any((self.url, self.log_, self.bar)):
- return
- self.url = self.query_one(Input)
- self.log_ = self.query_one(RichLog)
- # self.bar = self.query_one(ProgressBar)
-
- async def action_check_update(self):
- self.__init_objects()
- try:
- url = await self.APP.html.request_url(RELEASES, False)
- tag = float(url.split("/")[-1])
- if tag > VERSION:
- self.log_.write(
- Text(f"检测到新版本: {tag}", style="b bright_yellow"))
- self.log_.write(RELEASES)
- elif tag == VERSION and BETA:
- self.log_.write(
- Text("当前版本为开发版, 可更新至正式版", style="b bright_yellow"))
- self.log_.write(RELEASES)
- elif BETA:
- self.log_.write(Text("当前已是最新开发版", style="b bright_yellow"))
- else:
- self.log_.write(Text("当前已是最新正式版", style="b bright_green"))
- except ValueError:
- self.log_.write(Text("检测新版本失败", style="b bright_red"))
diff --git a/static/XHS-Downloader.tcss b/static/XHS-Downloader.tcss
index ff007b1..f466f13 100644
--- a/static/XHS-Downloader.tcss
+++ b/static/XHS-Downloader.tcss
@@ -11,11 +11,19 @@ Button#reset {
}
Label {
width: 100%;
- padding: 1;
content-align-horizontal: center;
content-align-vertical: middle;
text-style: bold;
}
-Bar > .bar--indeterminate {
- color: #2ed573;
+Label#prompt {
+ padding: 1;
+}
+Bar {
+ width: 33vw;
+}
+Bar > .bar--indeterminate {
+ color: #7bed9f;
+}
+Bar > .bar--complete {
+ color: #ff7f50;
}
diff --git a/static/程序运行截图.png b/static/程序运行截图.png
new file mode 100644
index 0000000..96e44c9
Binary files /dev/null and b/static/程序运行截图.png differ
diff --git a/static/程序运行截图1.png b/static/程序运行截图1.png
deleted file mode 100644
index aab11f2..0000000
Binary files a/static/程序运行截图1.png and /dev/null differ
diff --git a/static/程序运行截图2.png b/static/程序运行截图2.png
deleted file mode 100644
index 6a1f5ea..0000000
Binary files a/static/程序运行截图2.png and /dev/null differ