Mirror of https://github.com/JoeanAmier/XHS-Downloader.git, synced 2025-12-26 04:48:05 +08:00
Improve the feature for saving works data to file (完善保存作品数据功能)
This commit is contained in:
parent af8e3cabb1
commit 84a0889c8f

README.md (18 changed lines)
@@ -11,11 +11,13 @@
 <h1>📑 Feature List</h1>
 <ul>
     <li>✅ Collect XiaoHongShu image-text/video works information</li>
-    <li>✅ Extract download links for XiaoHongShu image-text/video work files</li>
+    <li>✅ Extract download links for XiaoHongShu image-text/video works</li>
     <li>✅ Download watermark-free XiaoHongShu image-text/video work files</li>
     <li>✅ Automatically skip already-downloaded work files</li>
     <li>✅ Work-file integrity handling mechanism</li>
-    <li>☑️ Save collected works information to file</li>
+    <li>✅ Persistently store works information to file</li>
     <li>☑️ Monitor the clipboard in the background to download works</li>
     <li>☑️ Support API calls</li>
 </ul>
 <h1>📸 Program Screenshots</h1>
 <br>
@@ -137,6 +139,18 @@ async with XHS(path=path,
         <td align="center">Maximum number of retries when a data request fails</td>
         <td align="center">5</td>
     </tr>
+    <tr>
+        <td align="center">record_data</td>
+        <td align="center">bool</td>
+        <td align="center">Whether to record works data to a file</td>
+        <td align="center">false</td>
+    </tr>
+    <tr>
+        <td align="center">image_format</td>
+        <td align="center">str</td>
+        <td align="center">File name extension for image-text works, e.g. <code>jpg</code>, <code>png</code></td>
+        <td align="center">webp</td>
+    </tr>
 </tbody>
 </table>
 <h1>🌐 Cookie</h1>
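A minimal usage sketch for the two new options, based on the `async with XHS(path=path, ...)` example this hunk sits inside; the import path, demo URL, and the exact `extract` call are assumptions, not README text:

import asyncio

from source import XHS  # import path assumed


async def demo():
    async with XHS(
            record_data=True,    # append each work's data to a text file
            image_format="png",  # save image-text works as .png instead of webp
    ) as xhs:
        # download=True triggers file downloads; data is recorded either way
        print(await xhs.extract("https://xhslink.com/example", download=True))


asyncio.run(demo())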
@@ -4,7 +4,7 @@ from aiohttp import ServerDisconnectedError
 from aiohttp import ServerTimeoutError
 from rich.text import Text
 
-from .Html import retry
+from .Html import retry as re_download
 
 __all__ = ['Download']
 
@@ -26,17 +26,18 @@ class Download:
             headers={"User-Agent": manager.headers["User-Agent"]},
             timeout=ClientTimeout(connect=timeout))
         self.retry = manager.retry
+        self.image_format = manager.image_format
 
-    async def run(self, urls: list, name: str, type_: int, log, bar):
-        if type_ == 0:
+    async def run(self, urls: list, name: str, type_: str, log, bar):
+        if type_ == "v":
             await self.__download(urls[0], f"{name}.mp4", log, bar)
-        elif type_ == 1:
+        elif type_ == "n":
             for index, url in enumerate(urls, start=1):
-                await self.__download(url, f"{name}_{index}.png", log, bar)
+                await self.__download(url, f"{name}_{index}.{self.image_format}", log, bar)
         else:
             raise ValueError
 
-    @retry
+    @re_download
     async def __download(self, url: str, name: str, log, bar):
         temp = self.temp.joinpath(name)
         file = self.folder.joinpath(name)
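`run` now dispatches on the string codes "v"/"n" instead of 0/1, and image suffixes come from the new `image_format` setting. A self-contained sketch of the resulting file-naming rules (the function name and defaults are illustrative only):

def planned_filenames(urls: list, name: str, type_: str, image_format: str = "webp") -> list:
    # Video works: a single file with a fixed .mp4 suffix; only urls[0] is used.
    if type_ == "v":
        return [f"{name}.mp4"]
    # Image-text works: one file per URL, suffix taken from image_format.
    if type_ == "n":
        return [f"{name}_{index}.{image_format}" for index, _ in enumerate(urls, start=1)]
    raise ValueError(f"unknown type code: {type_!r}")


print(planned_filenames(["u1", "u2"], "demo", "n", "png"))  # ['demo_1.png', 'demo_2.png']
print(planned_filenames(["u1"], "demo", "v"))               # ['demo.mp4']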
@@ -9,6 +9,7 @@ class Explore:
     explore_data = compile(
         r'"currentTime":\d{13},"note":(.*?)}},"serverRequestInfo"')
     time_format = "%Y-%m-%d %H:%M:%S"
+    explore_type = {"video": "视频", "normal": "图文"}
 
     def run(self, html: str) -> dict:
         data = self.__get_json_data(html)
@@ -41,14 +42,11 @@ class Explore:
         tags = data.get("tagList", [])
         container["作品标签"] = [i.get("name", "") for i in tags]
 
-    @staticmethod
-    def __extract_info(container: dict, data: dict):
+    def __extract_info(self, container: dict, data: dict):
         container["作品ID"] = data.get("noteId")
         container["作品标题"] = data.get("title")
         container["作品描述"] = data.get("desc")
-        container["作品类型"] = {
-            "video": "视频", "normal": "图文"}.get(
-            data.get("type"), "未知")
+        container["作品类型"] = self.explore_type.get(data.get("type"), "未知")
         container["IP归属地"] = data.get("ipLocation")
 
     def __extract_time(self, container: dict, data: dict):
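Hoisting the inline dict to the class attribute `explore_type` builds the mapping once and reuses it across calls; the lookup with its "未知" (unknown) fallback behaves like this sketch:

EXPLORE_TYPE = {"video": "视频", "normal": "图文"}  # mirrors Explore.explore_type


def classify(raw_type) -> str:
    # Missing or unrecognized types fall back to "未知" (unknown).
    return EXPLORE_TYPE.get(raw_type, "未知")


assert classify("video") == "视频"
assert classify("live") == "未知"
assert classify(None) == "未知"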
@@ -3,7 +3,7 @@ from aiohttp import ClientTimeout
 from aiohttp import ServerDisconnectedError
 from aiohttp import ServerTimeoutError
 
-__all__ = ['Html']
+__all__ = ["Html", "retry"]
 
 
 def retry(function):
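`retry` is now exported so Download can reuse it (aliased as `re_download` above). Its body is cut off in this view; the following is only a plausible sketch, consistent with the aiohttp imports in this hunk and the `self.retry` attribute both classes keep, not the repo's exact code:

from functools import wraps

from aiohttp import ServerDisconnectedError, ServerTimeoutError


def retry(function):
    @wraps(function)
    async def inner(self, *args, **kwargs):
        # One initial attempt plus up to self.retry extra attempts on
        # transient aiohttp failures; an assumption, not the real body.
        for _ in range(self.retry + 1):
            try:
                return await function(self, *args, **kwargs)
            except (ServerDisconnectedError, ServerTimeoutError):
                continue
    return inner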
@@ -23,7 +23,7 @@ class Image:
 
     @staticmethod
     def __generate_image_link(token: str) -> str:
-        return f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
+        return f"https://sns-img-bd.xhscdn.com/{token}"
 
     def __extract_image_token(self, url: str) -> str:
         return self.__generate_image_link(token.group(1)) if (
@@ -19,7 +19,10 @@ class Manager:
                  folder: str,
                  user_agent: str,
                  cookie: str,
-                 retry: int):
+                 retry: int,
+                 record_data: bool,
+                 image_format: str,
+                 ):
         self.root = root
         self.temp = root.joinpath("./temp")
         self.folder = self.__init_root(root, path, folder)
@@ -34,6 +37,8 @@ class Manager:
             "-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
             "e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
         self.retry = retry
+        self.record_data = record_data
+        self.image_format = image_format
 
     def __init_root(self, root: Path, path: str, folder: str) -> Path:
         if path and (r := Path(path)).is_dir():
@@ -61,9 +66,11 @@
 
     def filter_name(self, name: str) -> str:
         name = self.NAME.sub("_", name)
-        return sub(r"_+", "_", name)
+        return sub(r"_+", "_", name).strip("_")
 
     def save_data(self, name: str, data: dict):
+        if not self.record_data:
+            return
         with self.folder.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
             time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
             content = f"{
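The added `.strip("_")` removes the leading/trailing underscores the substitutions can leave behind. A sketch of the effect; the `NAME` pattern below is an assumption, since the real one lies outside this hunk:

from re import compile, sub

NAME = compile(r"\W")  # assumed stand-in for Manager.NAME


def filter_name_old(name: str) -> str:
    return sub(r"_+", "_", NAME.sub("_", name))


def filter_name_new(name: str) -> str:
    return sub(r"_+", "_", NAME.sub("_", name)).strip("_")


print(filter_name_old("【Demo】 Title!"))  # _Demo_Title_
print(filter_name_new("【Demo】 Title!"))  # Demo_Title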
@@ -16,6 +16,8 @@ class Settings:
         "timeout": 10,
         "chunk": 1024 * 1024,
         "max_retry": 5,
+        "record_data": False,
+        "image_format": "webp",
     }
     encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"
 
@@ -38,6 +38,10 @@ class XHS:
     SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
     SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
     __INSTANCE = None
+    TYPE = {
+        "视频": "v",
+        "图文": "n",
+    }
 
     def __new__(cls, *args, **kwargs):
         if not cls.__INSTANCE:
@@ -54,6 +58,8 @@ class XHS:
             timeout=10,
             chunk=1024 * 1024,
             max_retry=5,
+            record_data=False,
+            image_format="webp",
             **kwargs,
     ):
         self.manager = Manager(
@@ -62,7 +68,10 @@ class XHS:
             folder_name,
             user_agent,
             cookie,
-            max_retry)
+            max_retry,
+            record_data,
+            image_format,
+        )
         self.html = Html(
             self.manager.headers,
             proxy,
@@ -78,27 +87,21 @@ class XHS:
             timeout, )
         self.rich_log = self.download.rich_log
 
-    async def __get_image(self, container: dict, html: str, download, log, bar):
-        urls = self.image.get_image_link(html)
-        # self.rich_log(log, urls)  # 调试代码
-        name = self.__naming_rules(container)
-        if download:
-            await self.download.run(urls, name, 1, log, bar)
-        container["下载地址"] = urls
-        self.manager.save_data(name, container)
+    def __extract_image(self, container: dict, html: str):
+        container["下载地址"] = self.image.get_image_link(html)
 
-    async def __get_video(self, container: dict, html: str, download, log, bar):
-        url = self.video.get_video_link(html)
-        # self.rich_log(log, url)  # 调试代码
+    def __extract_video(self, container: dict, html: str):
+        container["下载地址"] = self.video.get_video_link(html)
+
+    async def __download_files(self, container: dict, download: bool, log, bar):
         name = self.__naming_rules(container)
-        if download:
-            await self.download.run(url, name, 0, log, bar)
-        container["下载地址"] = url
+        if download and (u := container["下载地址"]):
+            await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
         self.manager.save_data(name, container)
 
     async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
         # return  # 调试代码
-        urls = await self.__deal_links(url)
+        urls = await self.__extract_links(url)
         if not urls:
             self.rich_log(log, "提取小红书作品链接失败", "bright_red")
         else:
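The refactor separates concerns: `__extract_image`/`__extract_video` only fill `container["下载地址"]`, while the new `__download_files` decides whether to download and always hands the record to `Manager.save_data`. A condensed, self-contained sketch of that flow; the fake helpers stand in for the real downloader and manager:

import asyncio

TYPE = {"视频": "v", "图文": "n"}  # mirrors XHS.TYPE from this diff


async def fake_download(urls, name, type_code):
    print(f"download {len(urls)} file(s) as {name!r}, type {type_code}")


def fake_save_data(name, container):
    print(f"record {name!r} -> {container}")  # real code skips this unless record_data


async def download_files(container: dict, download: bool):
    # Mirrors XHS.__download_files: downloading is optional, recording is not.
    name = container["作品标题"]  # real code applies its naming rules here
    if download and (u := container["下载地址"]):
        await fake_download(u, name, TYPE[container["作品类型"]])
    fake_save_data(name, container)


asyncio.run(download_files(
    {"作品标题": "demo", "作品类型": "图文", "下载地址": ["u1", "u2"]},
    download=True,
))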
@@ -106,7 +109,7 @@ class XHS:
         # return urls  # 调试代码
         return [await self.__deal_extract(i, download, log, bar) for i in urls]
 
-    async def __deal_links(self, url: str) -> list:
+    async def __extract_links(self, url: str) -> list:
         urls = []
         for i in url.split():
             if u := self.SHORT.search(i):
@@ -130,10 +133,14 @@ class XHS:
         if not data:
             self.rich_log(log, f"{url} 提取数据失败", "bright_red")
             return {}
-        if data["作品类型"] == "视频":
-            await self.__get_video(data, html, download, log, bar)
-        else:
-            await self.__get_image(data, html, download, log, bar)
+        match data["作品类型"]:
+            case "视频":
+                self.__extract_video(data, html)
+            case "图文":
+                self.__extract_image(data, html)
+            case _:
+                data["下载地址"] = []
+        await self.__download_files(data, download, log, bar)
         self.rich_log(log, f"完成处理:{url}")
         return data
 
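Note the behavioral fix hidden in this hunk: the old `if`/`else` treated every non-video work as image-text, while the new `match` gives unknown types an explicit empty download list (and `match` itself requires Python 3.10+). A minimal sketch of the fallback:

def route(work_type: str) -> list:
    match work_type:
        case "视频":
            return ["video link"]
        case "图文":
            return ["image links"]
        case _:
            return []  # mirrors: data["下载地址"] = []


assert route("直播") == []  # unknown type is no longer handled as image-text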
@@ -145,6 +152,9 @@
         return self
 
     async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.close()
+
+    async def close(self):
         self.manager.clean()
         await self.html.session.close()
         await self.download.session.close()
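Extracting the cleanup into a public `close()` lets callers that cannot use `async with` release both aiohttp sessions explicitly; `__aexit__` now just delegates to it. A sketch of the manual lifecycle (import path assumed):

import asyncio

from source import XHS  # import path assumed


async def main():
    xhs = XHS()
    try:
        print(await xhs.extract("https://xhslink.com/example"))
    finally:
        await xhs.close()  # cleans temp files, closes html and download sessions


asyncio.run(main())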