Add image download format setting

This commit is contained in:
JoeanAmier 2024-01-04 21:22:54 +08:00
parent 8ed4e81a24
commit 48cfa60862
18 changed files with 514 additions and 417 deletions


@@ -9,17 +9,20 @@
<img alt="GitHub code size in bytes" src="https://img.shields.io/github/languages/code-size/JoeanAmier/XHS-Downloader?style=for-the-badge&color=73d13d">
<img alt="GitHub release (with filter)" src="https://img.shields.io/github/v/release/JoeanAmier/XHS-Downloader?style=for-the-badge&color=40a9ff">
<img alt="GitHub all releases" src="https://img.shields.io/github/downloads/JoeanAmier/XHS-Downloader/total?style=for-the-badge&color=f759ab">
<br>
<p>🔥 <b>Xiaohongshu post collection tool</b>: collect Xiaohongshu post information; extract Xiaohongshu post download links; download watermark-free Xiaohongshu post files!</p>
<p>❤️ The author releases XHS-Downloader only on GitHub, has no partnership with any individual or website, and has no paid plans of any kind!</p>
</div>
<h1>📑 Feature List</h1>
<h1>📑 Project Features</h1>
<ul>
<li>✅ Collect information from Xiaohongshu image / video posts</li>
<li>✅ Extract download links for Xiaohongshu image / video posts</li>
<li>✅ Download watermark-free Xiaohongshu image / video post files</li>
<li>✅ Support the Tampermonkey userscript</li>
<li>✅ Batch-download an account's posts (together with the userscript)</li>
<li>✅ Automatically skip post files that were already downloaded</li>
<li>✅ Integrity safeguard for downloaded post files</li>
<li>✅ Customizable download format for image post files</li>
<li>✅ Persist post information to files</li>
<li>✅ Store each post's files in a separate folder</li>
<li>☑️ Monitor the clipboard in the background and download posts</li>
@@ -54,13 +57,12 @@
<h1>💻 Secondary Development</h1>
<p>For other needs, you can call or modify the code following the comment hints in <code>main.py</code>!</p>
<pre>
# Test links
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
multiple_demo = f"{image_demo} {video_demo}"
# Example links
error_link = "https://github.com/JoeanAmier/XHS_Downloader"
demo_link = "https://www.xiaohongshu.com/explore/xxxxxxxxxx"
multiple_links = f"{demo_link} {demo_link} {demo_link}"
# Instance parameters
path = ""  # root path for saving post data/files; default: project root
work_path = "D:\\"  # root path for saving post data/files; default: project root
folder_name = "Download"  # name of the folder for post files (created automatically); default: Download
user_agent = ""  # request header User-Agent
cookie = ""  # Xiaohongshu web Cookie, no login required
@@ -69,11 +71,11 @@ timeout = 5  # request timeout limit, in seconds; default: 10
chunk = 1024 * 1024 * 10  # size of each data chunk fetched from the server when downloading files, in bytes
max_retry = 2  # maximum number of retries after a failed request; default: 5
record_data = False  # whether to record post data to a file
image_format = "jpg"  # file-name extension for image post files
image_format = "WEBP"  # download format for image post files; supported: PNG, WEBP
folder_mode = False  # whether to store each post's files in a separate folder
async with XHS() as xhs:
pass  # use default parameters
async with XHS(path=path,
async with XHS(work_path=work_path,
folder_name=folder_name,
user_agent=user_agent,
cookie=cookie,
@@ -87,10 +89,9 @@ async with XHS(path=path,
) as xhs:  # use custom parameters
download = True  # whether to download post files; default: False
# returns detailed post information, including download links
print(await xhs.extract(error_demo, download))  # returns an empty dict when data retrieval fails
print(await xhs.extract(image_demo, download))
print(await xhs.extract(video_demo, download))
print(await xhs.extract(multiple_demo, download))  # multiple post links are supported
print(await xhs.extract(error_link, download))  # returns an empty dict when data retrieval fails
print(await xhs.extract(demo_link, download))
print(await xhs.extract(multiple_links, download))  # multiple post links are supported
</pre>
<h1>⚙️ Configuration File</h1>
<p>The <code>settings.json</code> file in the project root is generated automatically on first run and lets you customize some runtime parameters (see the example after this table).</p>
@@ -106,7 +107,7 @@ async with XHS(path=path,
</thead>
<tbody>
<tr>
<td align="center">path</td>
<td align="center">work_path</td>
<td align="center">str</td>
<td align="center">作品数据 / 文件保存根路径</td>
<td align="center">项目根路径</td>
@ -162,14 +163,8 @@ async with XHS(path=path,
<tr>
<td align="center">image_format</td>
<td align="center">str</td>
<td align="center">图文作品文件名称后缀,不影响实际文件格式,仅在无法判断文件类型时生效</td>
<td align="center">webp</td>
</tr>
<tr>
<td align="center">video_format</td>
<td align="center">str</td>
<td align="center">视频作品文件名称后缀,不影响实际文件格式,仅在无法判断文件类型时生效</td>
<td align="center">mp4</td>
<td align="center">图文作品文件下载格式,支持:<code>PNG</code><code>WEBP</code></td>
<td align="center">PNG</td>
</tr>
<tr>
<td align="center">folder_mode</td>

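An illustrative example of a freshly generated settings.json, using only the keys and defaults visible in this diff (the lines elided by the hunk presumably cover the proxy and timeout options):
<pre>
{
    "work_path": "",
    "folder_name": "Download",
    "user_agent": "",
    "cookie": "",
    "chunk": 1048576,
    "max_retry": 5,
    "record_data": false,
    "image_format": "PNG",
    "folder_mode": false
}
</pre>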
main.py

@@ -6,13 +6,12 @@ from source import XHSDownloader
async def example():
"""通过代码设置参数,适合二次开发"""
# 测试链接
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
multiple_demo = f"{image_demo} {video_demo}"
# Example links
error_link = "https://github.com/JoeanAmier/XHS_Downloader"
demo_link = "https://www.xiaohongshu.com/explore/xxxxxxxxxx"
multiple_links = f"{demo_link} {demo_link} {demo_link}"
# Instance parameters
path = ""  # root path for saving post data/files; default: project root
work_path = "D:\\"  # root path for saving post data/files; default: project root
folder_name = "Download"  # name of the folder for post files (created automatically); default: Download
user_agent = ""  # request header User-Agent
cookie = ""  # Xiaohongshu web Cookie, no login required
@@ -21,11 +20,11 @@ async def example():
chunk = 1024 * 1024 * 10  # size of each data chunk fetched from the server when downloading files, in bytes
max_retry = 2  # maximum number of retries after a failed request; default: 5
record_data = False  # whether to record post data to a file
image_format = "jpg"  # file-name extension for image post files
image_format = "WEBP"  # download format for image post files; supported: PNG, WEBP
folder_mode = False  # whether to store each post's files in a separate folder
async with XHS() as xhs:
pass  # use default parameters
async with XHS(path=path,
async with XHS(work_path=work_path,
folder_name=folder_name,
user_agent=user_agent,
cookie=cookie,
@@ -39,10 +38,9 @@ async def example():
) as xhs:  # use custom parameters
download = True  # whether to download post files; default: False
# returns detailed post information, including download links
print(await xhs.extract(error_demo, download))  # returns an empty dict when data retrieval fails
print(await xhs.extract(image_demo, download))
print(await xhs.extract(video_demo, download))
print(await xhs.extract(multiple_demo, download))  # multiple post links are supported
print(await xhs.extract(error_link, download))  # returns an empty dict when data retrieval fails
print(await xhs.extract(demo_link, download))
print(await xhs.extract(multiple_links, download))  # multiple post links are supported
async def main():


@@ -1,3 +1,5 @@
aiohttp>=3.9.0
textual>=0.40.0
pyperclip>=1.8.2
lxml>=4.9.3
PyYAML>=6.0.1


@@ -1,5 +1,7 @@
from re import compile
from .Converter import Converter
from .Converter import Namespace
from .Downloader import Download
from .Explore import Explore
from .Html import Html
@@ -10,18 +12,17 @@ from .Static import (
ERROR,
WARNING,
)
from .Tools import logging
from .Video import Video
__all__ = ["XHS"]
class XHS:
LINK = compile(r"https?://www\.xiaohongshu\.com/explore/[a-z0-9]+")
SHARE = compile(r"https?://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https?://xhslink\.com/[A-Za-z0-9]+")
__INSTANCE = None
TYPE = {
"视频": "v",
"图文": "n",
}
def __new__(cls, *args, **kwargs):
if not cls.__INSTANCE:
@@ -30,7 +31,7 @@ class XHS:
def __init__(
self,
path="",
work_path="",
folder_name="Download",
user_agent: str = None,
cookie: str = None,
@@ -39,13 +40,12 @@
chunk=1024 * 1024,
max_retry=5,
record_data=False,
image_format="webp",
video_format="mp4",
image_format="PNG",
folder_mode=False,
):
self.manager = Manager(
ROOT,
path,
work_path,
folder_name,
user_agent,
chunk,
@@ -55,46 +55,47 @@
max_retry,
record_data,
image_format,
video_format,
folder_mode,
)
self.html = Html(self.manager)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(self.manager, )
self.rich_log = self.download.rich_log
self.convert = Converter()
self.download = Download(self.manager)
def __extract_image(self, container: dict, html: str):
container["下载地址"] = self.image.get_image_link(html)
def __extract_image(self, container: dict, data: Namespace):
container["下载地址"] = self.image.get_image_link(
data, self.manager.image_format)
def __extract_video(self, container: dict, html: str):
container["下载地址"] = self.video.get_video_link(html)
def __extract_video(self, container: dict, data: Namespace):
container["下载地址"] = self.video.get_video_link(data)
async def __download_files(self, container: dict, download: bool, log, bar):
name = self.__naming_rules(container)
path = self.manager.folder
if (u := container["下载地址"]) and download:
await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
path = await self.download.run(u, name, container["作品类型"], log, bar)
elif not u:
self.rich_log(log, "提取作品文件下载地址失败!", ERROR)
self.manager.save_data(name, container)
logging(log, "提取作品文件下载地址失败!", ERROR)
self.manager.save_data(path, name, container)
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
# return  # debug code
urls = await self.__extract_links(url)
urls = await self.__extract_links(url, log)
if not urls:
self.rich_log(log, "提取小红书作品链接失败!", WARNING)
logging(log, "提取小红书作品链接失败!", WARNING)
else:
self.rich_log(log, f"{len(urls)} 个小红书作品待处理...")
logging(log, f"{len(urls)} 个小红书作品待处理...")
# return urls  # debug code
return [await self.__deal_extract(i, download, log, bar) for i in urls]
async def __extract_links(self, url: str) -> list:
async def __extract_links(self, url: str, log) -> list:
urls = []
for i in url.split():
if u := self.SHORT.search(i):
i = await self.html.request_url(
u.group(), False)
u.group(), False, log)
if u := self.SHARE.search(i):
urls.append(u.group())
elif u := self.LINK.search(i):
@@ -102,28 +103,33 @@
return urls
async def __deal_extract(self, url: str, download: bool, log, bar):
self.rich_log(log, f"开始处理作品:{url}")
html = await self.html.request_url(url)
# self.rich_log(log, html)  # debug code
logging(log, f"开始处理作品:{url}")
html = await self.html.request_url(url, log=log)
# logging(log, html)  # debug code
if not html:
self.rich_log(log, f"{url} 获取数据失败!", ERROR)
logging(log, f"{url} 获取数据失败!", ERROR)
return {}
data = self.explore.run(html)
# self.rich_log(log, data)  # debug code
namespace = self.__generate_data_object(html)
data = self.explore.run(namespace)
# logging(log, data)  # debug code
if not data:
self.rich_log(log, f"{url} 提取数据失败!", ERROR)
logging(log, f"{url} 提取数据失败!", ERROR)
return {}
match data["作品类型"]:
case "视频":
self.__extract_video(data, html)
self.__extract_video(data, namespace)
case "图文":
self.__extract_image(data, html)
self.__extract_image(data, namespace)
case _:
data["下载地址"] = []
await self.__download_files(data, download, log, bar)
self.rich_log(log, f"作品处理完成:{url}")
logging(log, f"作品处理完成:{url}")
return data
def __generate_data_object(self, html: str) -> Namespace:
data = self.convert.run(html)
return Namespace(data)
def __naming_rules(self, data: dict) -> str:
"""下载文件默认使用 作品标题 或 作品 ID 作为文件名称,可修改此方法自定义文件名称格式"""
return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
@@ -135,6 +141,4 @@ class XHS:
await self.close()
async def close(self):
self.manager.clean()
await self.html.session.close()
await self.download.session.close()
await self.manager.close()

source/Converter.py (new file)

@@ -0,0 +1,116 @@
from copy import deepcopy
from types import SimpleNamespace
from lxml.etree import HTML
from yaml import safe_load
__all__ = ["Converter", "Namespace"]
class Converter:
INITIAL_STATE = "(//script)[last()]/text()"
KEYS_LINK = (
"note",
"noteDetailMap",
"[-1]",
"note",
)
def run(self, content: str) -> dict:
return self.__filter_object(
self.__convert_object(
self.__extract_object(content)))
def __extract_object(self, html: str) -> str:
html_tree = HTML(html)
return d[0] if (d := html_tree.xpath(self.INITIAL_STATE)) else ""
@staticmethod
def __convert_object(text: str) -> dict:
return safe_load(text.lstrip("window.__INITIAL_STATE__="))
@classmethod
def __filter_object(cls, data: dict) -> dict:
return cls.deep_get(data, cls.KEYS_LINK) or {}
@classmethod
def deep_get(cls, data: dict, keys: list | tuple, default=None):
try:
for key in keys:
if key.startswith("[") and key.endswith("]"):
data = cls.safe_get(data, int(key[1:-1]))
else:
data = data[key]
return data
except (KeyError, IndexError, ValueError):
return default
@staticmethod
def safe_get(data: dict | list | tuple | set, index: int):
if isinstance(data, dict):
return list(data.values())[index]
elif isinstance(data, list | tuple | set):
return data[index]
raise TypeError
class Namespace:
def __init__(self, data: dict):
self.data = self.generate_data_object(data)
@staticmethod
def generate_data_object(data: dict) -> SimpleNamespace:
def depth_conversion(element):
if isinstance(element, dict):
return SimpleNamespace(
**{k: depth_conversion(v) for k, v in element.items()})
elif isinstance(element, list):
return [depth_conversion(item) for item in element]
else:
return element
return depth_conversion(data)
def safe_extract(
self,
attribute_chain: str,
default: str | int | list | dict | SimpleNamespace = ""):
return self.__safe_extract(self.data, attribute_chain, default)
@staticmethod
def __safe_extract(
data_object,
attribute_chain: str,
default: str | int | list | dict | SimpleNamespace = "", ):
data = deepcopy(data_object)
attributes = attribute_chain.split(".")
for attribute in attributes:
if "[" in attribute:
parts = attribute.split("[", 1)
attribute = parts[0]
index = parts[1].split("]", 1)[0]
try:
index = int(index)
data = getattr(data, attribute, None)[index]
except (IndexError, TypeError, ValueError):
return default
else:
data = getattr(data, attribute, None)
if not data:
return default
return data or default
@classmethod
def object_extract(
cls,
data_object: SimpleNamespace,
attribute_chain: str,
default: str | int | list | dict | SimpleNamespace = "",
):
return cls.__safe_extract(
data_object,
attribute_chain,
default, )
def __dict__(self):
return vars(self.data)
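A minimal usage sketch of the new helpers, run on a hypothetical dict shaped like the embedded page state: deep_get treats "[n]"-style keys as positional indexes (so the "[-1]" in KEYS_LINK takes the last entry of noteDetailMap), and Namespace.safe_extract walks dotted attribute chains, falling back to a default when any step is missing.
# Hypothetical input for illustration; real pages embed this in window.__INITIAL_STATE__.
data = {"note": {"noteDetailMap": {"demo_id": {"note": {"title": "demo"}}}}}
note = Converter.deep_get(data, Converter.KEYS_LINK)  # {"title": "demo"}
ns = Namespace(note)
print(ns.safe_extract("title"))          # "demo"
print(ns.safe_extract("user.nickname"))  # "" (default: attribute missing)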


@@ -1,44 +1,40 @@
from pathlib import Path
from aiohttp import ClientOSError
from aiohttp import ClientPayloadError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError
from rich.text import Text
from aiohttp import ClientError
from .Html import retry as re_download
from .Static import ERROR, INFO
from .Manager import Manager
from .Static import ERROR
from .Tools import logging
from .Tools import retry as re_download
__all__ = ['Download']
class Download:
def __init__(self, manager, ):
def __init__(self, manager: Manager, ):
self.manager = manager
self.folder = manager.folder
self.temp = manager.temp
self.proxy = manager.proxy
self.chunk = manager.chunk
self.session = ClientSession(
headers={"User-Agent": manager.headers["User-Agent"]},
timeout=ClientTimeout(connect=manager.timeout))
self.session = manager.download_session
self.retry = manager.retry
self.folder_mode = manager.folder_mode
self.video_format = manager.video_format
self.video_format = "mp4"
self.image_format = manager.image_format
async def run(self, urls: list, name: str, type_: str, log, bar):
async def run(self, urls: list, name: str, type_: str, log, bar) -> Path:
path = self.__generate_path(name)
if type_ == "v":
await self.__download(urls[0], path, f"{name}", self.video_format, log, bar)
elif type_ == "n":
for index, url in enumerate(urls, start=1):
await self.__download(url, path, f"{name}_{index}", self.image_format, log, bar)
else:
raise ValueError
match type_:
case "视频":
await self.__download(urls[0], path, f"{name}", self.video_format, log, bar)
case "图文":
for index, url in enumerate(urls, start=1):
await self.__download(url, path, f"{name}_{index}", self.image_format, log, bar)
case _:
raise ValueError
return path
def __generate_path(self, name: str):
path = self.manager.archive(self.folder, name, self.folder_mode)
@@ -54,7 +50,7 @@ class Download:
temp = self.temp.joinpath(name)
file = path.joinpath(name).with_suffix(f".{suffix}")
if self.manager.is_exists(file):
self.rich_log(log, f"{name} 已存在,跳过下载!")
logging(log, f"{name} 已存在,跳过下载!")
return True
# self.__create_progress(
# bar, int(
@@ -66,17 +62,13 @@
# self.__update_progress(bar, len(chunk))
self.manager.move(temp, file)
# self.__create_progress(bar, None)
self.rich_log(log, f"{name} 下载成功!")
logging(log, f"{name} 下载成功!")
return True
except (
ServerTimeoutError,
ServerDisconnectedError,
ClientOSError,
ClientPayloadError,
):
except ClientError as error:
self.manager.delete(temp)
# self.__create_progress(bar, None)
self.rich_log(log, f"{name} 下载失败!", ERROR)
logging(log, error, ERROR)
logging(log, f"网络异常,{name} 下载失败!", ERROR)
return False
@staticmethod
@@ -93,10 +85,3 @@
def __extract_type(content: str) -> str:
return "" if content == "application/octet-stream" else content.split(
"/")[-1]
@staticmethod
def rich_log(log, text, style=INFO):
if log:
log.write(Text(text, style=style))
else:
print(Text(text, style=style))
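The body of the download loop is elided above; as a reference, a minimal self-contained sketch of the chunked aiohttp read pattern this class appears to follow (the URL and paths are placeholders, not the class's actual code):
import asyncio
from pathlib import Path
from aiohttp import ClientSession

async def fetch(url: str, temp: Path, chunk: int = 1024 * 1024) -> None:
    async with ClientSession() as session:
        async with session.get(url) as response:
            with temp.open("wb") as f:
                # stream the body in fixed-size chunks instead of loading it at once
                async for part in response.content.iter_chunked(chunk):
                    f.write(part)

asyncio.run(fetch("https://example.com/file", Path("file.tmp")))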


@@ -1,25 +1,18 @@
from datetime import datetime
from json import loads
from re import compile
from .Converter import Namespace
__all__ = ['Explore']
class Explore:
explore_data = compile(
r'"currentTime":\d{13},"note":(.*?)}},"serverRequestInfo"')
time_format = "%Y-%m-%d %H:%M:%S"
explore_type = {"video": "视频", "normal": "图文"}
def run(self, html: str) -> dict:
data = self.__get_json_data(html)
def run(self, data: Namespace) -> dict:
return self.__extract_data(data)
def __get_json_data(self, html: str) -> dict:
data = self.explore_data.search(html)
return loads(data.group(1)) if data else {}
def __extract_data(self, data: dict) -> dict:
def __extract_data(self, data: Namespace) -> dict:
result = {}
if data:
self.__extract_interact_info(result, data)
@@ -30,38 +23,39 @@
return result
@staticmethod
def __extract_interact_info(container: dict, data: dict):
interact_info = data.get("interactInfo", {})
container["收藏数量"] = interact_info.get("collectedCount")
container["评论数量"] = interact_info.get("commentCount")
container["分享数量"] = interact_info.get("shareCount")
container["点赞数量"] = interact_info.get("likedCount")
def __extract_interact_info(container: dict, data: Namespace) -> None:
container["收藏数量"] = data.safe_extract(
"interactInfo.collectedCount", -1)
container["评论数量"] = data.safe_extract("interactInfo.commentCount", -1)
container["分享数量"] = data.safe_extract("interactInfo.shareCount", -1)
container["点赞数量"] = data.safe_extract("interactInfo.likedCount", -1)
@staticmethod
def __extract_tags(container: dict, data: dict):
tags = data.get("tagList", [])
container["作品标签"] = [i.get("name", "") for i in tags]
def __extract_tags(container: dict, data: Namespace):
tags = data.safe_extract("tagList", [])
container["作品标签"] = [Namespace.object_extract(i, "name") for i in tags]
def __extract_info(self, container: dict, data: dict):
container["作品ID"] = data.get("noteId")
container["作品标题"] = data.get("title")
container["作品描述"] = data.get("desc")
container["作品类型"] = self.explore_type.get(data.get("type"), "未知")
container["IP归属地"] = data.get("ipLocation")
def __extract_info(self, container: dict, data: Namespace):
container["作品ID"] = data.safe_extract("noteId")
container["作品标题"] = data.safe_extract("title")
container["作品描述"] = data.safe_extract("desc")
container["作品类型"] = self.explore_type.get(
data.safe_extract("type"), "未知")
container["IP归属地"] = data.safe_extract("ipLocation")
def __extract_time(self, container: dict, data: dict):
def __extract_time(self, container: dict, data: Namespace):
container["发布时间"] = datetime.fromtimestamp(
time /
1000).strftime(
self.time_format) if (
time := data.get("time")) else "未知"
time := data.safe_extract("time")) else "未知"
container["最后更新时间"] = datetime.fromtimestamp(
last /
1000).strftime(
self.time_format) if (last := data.get("lastUpdateTime")) else "未知"
self.time_format) if (
last := data.safe_extract("lastUpdateTime")) else "未知"
@staticmethod
def __extract_user(container: dict, data: dict):
user = data.get("user", {})
container["作者昵称"] = user.get("nickname")
container["作者ID"] = user.get("userId")
def __extract_user(container: dict, data: Namespace):
container["作者昵称"] = data.safe_extract("user.nickname")
container["作者ID"] = data.safe_extract("user.userId")

source/Extend.py (new file)

@@ -0,0 +1,5 @@
__all__ = ["Account"]
class Account:
pass


@@ -1,53 +1,35 @@
from aiohttp import ClientOSError
from aiohttp import ClientPayloadError
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError
from aiohttp import ClientError
__all__ = ["Html", "retry"]
from .Manager import Manager
from .Static import ERROR
from .Tools import logging
from .Tools import retry
def retry(function):
async def inner(self, *args, **kwargs):
if result := await function(self, *args, **kwargs):
return result
for _ in range(self.retry):
if result := await function(self, *args, **kwargs):
return result
return result
return inner
__all__ = ["Html"]
class Html:
def __init__(self, manager, ):
def __init__(self, manager: Manager, ):
self.proxy = manager.proxy
self.session = ClientSession(
headers=manager.headers | {
"Referer": "https://www.xiaohongshu.com/", },
timeout=ClientTimeout(connect=manager.timeout),
)
self.retry = manager.retry
self.session = manager.request_session
@retry
async def request_url(
self,
url: str,
text=True, ) -> str:
content=True,
log=None,
) -> str:
try:
async with self.session.get(
url,
proxy=self.proxy,
) as response:
return await response.text() if text else str(response.url)
except (
ServerTimeoutError,
ServerDisconnectedError,
ClientOSError,
ClientPayloadError,
):
return await response.text() if content else str(response.url)
except ClientError as error:
logging(log, error, ERROR)
logging(log, f"网络异常,请求 {url} 失败!", ERROR)
return ""
@staticmethod


@@ -1,18 +1,42 @@
from re import compile
from .Converter import Namespace
from .Html import Html
__all__ = ['Image']
class Image:
IMAGE_TOKEN = compile(
r'"urlDefault":"http:\\u002F\\u002Fsns-webpic-qc\.xhscdn\.com\\u002F\d+?\\u002F\S+?\\u002F(\S+?)!')
def get_image_link(self, html: str) -> list:
return [Html.format_url(self.__generate_image_link(i))
for i in self.IMAGE_TOKEN.findall(html)]
@classmethod
def get_image_link(cls, data: Namespace, format_: str) -> list:
images = data.safe_extract("imageList", [])
match format_:
case "png":
return [
Html.format_url(
cls.__generate_png_link(
cls.__extract_png_token(Namespace.object_extract(
i,
"urlDefault")))) for i in images]
case "webp":
return [
Html.format_url(
cls.__generate_webp_link(
cls.__extract_webp_token(Namespace.object_extract(
i,
"urlDefault")))) for i in images]
raise ValueError
@staticmethod
def __generate_image_link(token: str) -> str:
def __generate_webp_link(token: str) -> str:
return f"https://sns-img-bd.xhscdn.com/{token}"
@staticmethod
def __generate_png_link(token: str) -> str:
return f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
@staticmethod
def __extract_webp_token(url: str) -> str:
return "/".join(url.split("/")[5:]).split("!")[0]
@staticmethod
def __extract_png_token(url: str) -> str:
return url.split("/")[-1].split("!")[0]
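To make the token handling concrete, a short walk-through on a made-up urlDefault value (the real CDN URL layout may differ; this only exercises the string slicing defined above):
# Hypothetical urlDefault for illustration only; real values come from imageList.
url = "http://sns-webpic-qc.xhscdn.com/123456/abcd/spectrum/token123!nd_dft_wlteh_webp_3"
webp_token = "/".join(url.split("/")[5:]).split("!")[0]  # "spectrum/token123"
png_token = url.split("/")[-1].split("!")[0]             # "token123"
print(f"https://sns-img-bd.xhscdn.com/{webp_token}")
print(f"https://ci.xiaohongshu.com/{png_token}?imageView2/2/w/format/png")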


@@ -6,6 +6,12 @@ from re import sub
from shutil import move
from shutil import rmtree
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from .Static import COOKIE
from .Static import USERAGENT
__all__ = ["Manager"]
@@ -25,39 +31,55 @@ class Manager:
retry: int,
record_data: bool,
image_format: str,
video_format: str,
folder_mode: bool,
):
self.root = root
self.temp = root.joinpath("./temp")
self.folder = self.__init_root(root, path, folder)
self.path = self.__check_path(path)
self.folder = self.__check_folder(folder)
self.headers = {
"User-Agent": user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gec"
"ko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": cookie or "abRequestId=54c534bb-a2c6-558f-8e03-5b4c5c45635c; xsecappid=xhs-pc-web; a1=18c286a400"
"4jy56qvzejvp631col0hd3032h4zjez50000106381; webId=779c977da3a15b5623015be94bdcc9e9; g"
"id=yYSJYK0qDW8KyYSJYK048quV84Vv2KAhudVhJduUKqySlx2818xfq4888y8KqYy8y2y2f8Jy; web_sess"
"ion=030037a259ce5f15c8d560dc12224a9fdc2ed1; webBuild=3.19.4; websectiga=984412fef754c"
"018e472127b8effd174be8a5d51061c991aadd200c69a2801d6; sec_poison_id=3dd48845-d604-4535"
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
"User-Agent": user_agent or USERAGENT,
"Cookie": cookie or COOKIE}
self.retry = retry
self.chunk = chunk
self.record_data = record_data
self.image_format = image_format
self.video_format = video_format
self.image_format = self.__check_image_format(image_format)
self.folder_mode = folder_mode
self.timeout = timeout
self.proxy = proxy
self.request_session = ClientSession(
headers=self.headers | {
"Referer": "https://www.xiaohongshu.com/", },
timeout=ClientTimeout(connect=timeout),
)
self.download_session = ClientSession(
headers={"User-Agent": self.headers["User-Agent"]},
timeout=ClientTimeout(connect=timeout))
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
root = r.joinpath(folder or "Download")
else:
root = root.joinpath(folder or "Download")
root.mkdir(exist_ok=True)
def __check_path(self, path: str) -> Path:
if not path:
return self.root
if (r := Path(path)).is_dir():
return r
return r if (r := self.__check_root_again(r)) else self.root
def __check_folder(self, folder: str) -> Path:
folder = self.path.joinpath(folder or "Download")
folder.mkdir(exist_ok=True)
self.temp.mkdir(exist_ok=True)
return root
return folder
@staticmethod
def __check_root_again(root: Path) -> bool | Path:
if root.resolve().parent.is_dir():
root.mkdir()
return root
return False
@staticmethod
def __check_image_format(image_format) -> str:
if image_format in {"png", "PNG", "webp", "WEBP"}:
return image_format.lower()
return "png"
@staticmethod
def is_exists(path: Path) -> bool:
@@ -75,17 +97,17 @@ class Manager:
def move(temp: Path, path: Path):
move(temp.resolve(), path.resolve())
def clean(self):
def __clean(self):
rmtree(self.temp.resolve())
def filter_name(self, name: str) -> str:
name = self.NAME.sub("_", name)
return sub(r"_+", "_", name).strip("_")
def save_data(self, name: str, data: dict):
def save_data(self, path: Path, name: str, data: dict):
if not self.record_data:
return
with self.folder.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
with path.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"{
time.center(
@@ -96,3 +118,8 @@ class Manager:
indent=4,
ensure_ascii=False)}\n"
f.write(content)
async def close(self):
await self.request_session.close()
await self.download_session.close()
self.__clean()

source/Recorder.py (new file)

@@ -0,0 +1,5 @@
__all__ = ["Recorder"]
class Recorder:
pass


@@ -8,7 +8,7 @@ __all__ = ['Settings']
class Settings:
default = {
"path": "",
"work_path": "",
"folder_name": "Download",
"user_agent": "",
"cookie": "",
@@ -17,8 +17,7 @@
"chunk": 1024 * 1024,
"max_retry": 5,
"record_data": False,
"image_format": "webp",
"video_format": "mp4",
"image_format": "PNG",
"folder_mode": False,
}
encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"


@@ -17,6 +17,8 @@ __all__ = [
"INFO",
"DISCLAIMER_TEXT",
"USERSCRIPT",
"USERAGENT",
"COOKIE",
]
VERSION_MAJOR = 1
@@ -47,6 +49,18 @@ DISCLAIMER_TEXT = (
USERSCRIPT = "https://raw.githubusercontent.com/JoeanAmier/XHS-Downloader/master/static/XHS-Downloader.js"
USERAGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 "
"Safari/537.36")
COOKIE = (
"abRequestId=54c534bb-a2c6-558f-8e03-5b4c5c45635c; xsecappid=xhs-pc-web; a1=18c286a400"
"4jy56qvzejvp631col0hd3032h4zjez50000106381; webId=779c977da3a15b5623015be94bdcc9e9; g"
"id=yYSJYK0qDW8KyYSJYK048quV84Vv2KAhudVhJduUKqySlx2818xfq4888y8KqYy8y2y2f8Jy; web_sess"
"ion=030037a259ce5f15c8d560dc12224a9fdc2ed1; webBuild=3.19.4; websectiga=984412fef754c"
"018e472127b8effd174be8a5d51061c991aadd200c69a2801d6; sec_poison_id=3dd48845-d604-4535"
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]")
MASTER = "b #fff200"
PROMPT = "b turquoise2"
GENERAL = "b bright_white"


@@ -36,10 +36,12 @@ from .Static import (
USERSCRIPT,
)
__all__ = ["XHSDownloader"]
def show_state(function):
async def inner(self, *args, **kwargs):
self.close_show()
self.close_disclaimer()
self.bar.update(total=100, progress=100)
result = await function(self, *args, **kwargs)
self.bar.update(total=None)
@@ -65,7 +67,7 @@ class XHSDownloader(App):
self.url = None
self.tip = None
self.bar = None
self.show = True
self.disclaimer = True
async def __aenter__(self):
await self.APP.__aenter__()
@@ -104,10 +106,10 @@ class XHSDownloader(App):
self.bar = self.query_one(ProgressBar)
self.tip.write(Text("\n".join(DISCLAIMER_TEXT), style=MASTER))
def close_show(self):
if self.show:
def close_disclaimer(self):
if self.disclaimer:
self.tip.clear()
self.show = False
self.disclaimer = False
async def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "deal":
@@ -131,7 +133,7 @@
async def action_check_update(self):
self.tip.write(Text("正在检查新版本,请稍等...", style=WARNING))
try:
url = await self.APP.html.request_url(RELEASES, False)
url = await self.APP.html.request_url(RELEASES, False, self.tip)
latest_major, latest_minor = map(
int, url.split("/")[-1].split(".", 1))
if latest_major > VERSION_MAJOR or latest_minor > VERSION_MINOR:

source/Tools.py (new file)

@@ -0,0 +1,25 @@
from rich.text import Text
from .Static import INFO
__all__ = ["retry", "logging"]
def retry(function):
async def inner(self, *args, **kwargs):
if result := await function(self, *args, **kwargs):
return result
for _ in range(self.retry):
if result := await function(self, *args, **kwargs):
return result
return result
return inner
def logging(log, text, style=INFO):
string = Text(text, style=style)
if log:
log.write(string)
else:
print(string)
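A minimal sketch of how these helpers compose, built around a hypothetical class (anything decorated with @retry needs a self.retry attribute giving the number of extra attempts); logging simply prints when no log widget is passed:
import asyncio

class Fetcher:  # hypothetical example class
    def __init__(self):
        self.retry = 2  # two extra attempts after the first one

    @retry
    async def fetch(self, attempts: list) -> bool:
        attempts.append(1)         # record one attempt
        return len(attempts) >= 3  # only succeeds on the third call

print(asyncio.run(Fetcher().fetch([])))  # True: one try plus two retries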


@@ -1,13 +1,17 @@
from re import compile
from .Converter import Namespace
from .Html import Html
__all__ = ['Video']
class Video:
VIDEO_TOKEN = compile(r'"originVideoKey":"(\S+?)"')
VIDEO_LINK = (
"video",
"consumer",
"originVideoKey",
)
def get_video_link(self, html: str) -> list:
return [Html.format_url(f"https://sns-video-hw.xhscdn.com/{
t.group(1)}")] if (t := self.VIDEO_TOKEN.search(html)) else []
@classmethod
def get_video_link(cls, data: Namespace) -> list:
return [Html.format_url(f"https://sns-video-hw.xhscdn.com/{t}")] if (
t := data.safe_extract(".".join(cls.VIDEO_LINK))) else []

File diff suppressed because one or more lines are too long