mirror of
https://github.com/JoeanAmier/XHS-Downloader.git
synced 2025-12-26 04:48:05 +08:00
新增保存作品数据功能
This commit is contained in:
parent
5f1e4a6657
commit
af8e3cabb1
@ -1,5 +1,3 @@
|
||||
from pathlib import Path
|
||||
|
||||
from aiohttp import ClientSession
|
||||
from aiohttp import ClientTimeout
|
||||
from aiohttp import ServerDisconnectedError
|
||||
@ -16,31 +14,18 @@ class Download:
|
||||
def __init__(
|
||||
self,
|
||||
manager,
|
||||
root: Path,
|
||||
path: str,
|
||||
folder: str,
|
||||
proxy: str = "",
|
||||
chunk=1024 * 1024,
|
||||
timeout=10,
|
||||
retry_=5, ):
|
||||
timeout=10):
|
||||
self.manager = manager
|
||||
self.folder = manager.folder
|
||||
self.temp = manager.temp
|
||||
self.root = self.__init_root(root, path, folder)
|
||||
self.proxy = proxy
|
||||
self.chunk = chunk
|
||||
self.session = ClientSession(
|
||||
headers={"User-Agent": manager.headers["User-Agent"]},
|
||||
timeout=ClientTimeout(connect=timeout))
|
||||
self.retry = retry_
|
||||
|
||||
def __init_root(self, root: Path, path: str, folder: str) -> Path:
|
||||
if path and (r := Path(path)).is_dir():
|
||||
root = r.joinpath(folder or "Download")
|
||||
else:
|
||||
root = root.joinpath(folder or "Download")
|
||||
root.mkdir(exist_ok=True)
|
||||
self.temp.mkdir(exist_ok=True)
|
||||
return root
|
||||
self.retry = manager.retry
|
||||
|
||||
async def run(self, urls: list, name: str, type_: int, log, bar):
|
||||
if type_ == 0:
|
||||
@ -54,7 +39,7 @@ class Download:
|
||||
@retry
|
||||
async def __download(self, url: str, name: str, log, bar):
|
||||
temp = self.temp.joinpath(name)
|
||||
file = self.root.joinpath(name)
|
||||
file = self.folder.joinpath(name)
|
||||
if self.manager.is_exists(file):
|
||||
self.rich_log(log, f"{name} 已存在,跳过下载")
|
||||
return True
|
||||
|
||||
@ -1,4 +1,8 @@
|
||||
from datetime import datetime
|
||||
from json import dumps
|
||||
from pathlib import Path
|
||||
from re import compile
|
||||
from re import sub
|
||||
from shutil import move
|
||||
from shutil import rmtree
|
||||
|
||||
@ -6,11 +10,22 @@ __all__ = ["Manager"]
|
||||
|
||||
|
||||
class Manager:
|
||||
def __init__(self, root: Path, ua: str, cookie: str, retry: int):
|
||||
NAME = compile(r"[^\u4e00-\u9fa5a-zA-Z0-9_]")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
root: Path,
|
||||
path: str,
|
||||
folder: str,
|
||||
user_agent: str,
|
||||
cookie: str,
|
||||
retry: int):
|
||||
self.root = root
|
||||
self.temp = root.joinpath("./temp")
|
||||
self.folder = self.__init_root(root, path, folder)
|
||||
self.headers = {
|
||||
"User-Agent": ua or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
"Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
|
||||
"User-Agent": user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gec"
|
||||
"ko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Cookie": cookie or "abRequestId=54c534bb-a2c6-558f-8e03-5b4c5c45635c; xsecappid=xhs-pc-web; a1=18c286a400"
|
||||
"4jy56qvzejvp631col0hd3032h4zjez50000106381; webId=779c977da3a15b5623015be94bdcc9e9; g"
|
||||
"id=yYSJYK0qDW8KyYSJYK048quV84Vv2KAhudVhJduUKqySlx2818xfq4888y8KqYy8y2y2f8Jy; web_sess"
|
||||
@ -20,6 +35,15 @@ class Manager:
|
||||
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
|
||||
self.retry = retry
|
||||
|
||||
def __init_root(self, root: Path, path: str, folder: str) -> Path:
    """Pick the download directory, create it, and create the temp directory.

    :param root: Fallback base directory.
    :param path: Optional base directory supplied as a string; it is used
        only when it is non-empty and already exists as a directory.
    :param folder: Name of the download sub-folder; ``"Download"`` is used
        when this is empty.
    :return: The download directory that was created (or already existed).
    """
    leaf = folder or "Download"
    base = root
    if path:
        candidate = Path(path)
        if candidate.is_dir():
            base = candidate
    target = base.joinpath(leaf)
    # Only the last path component is created; the base must already exist.
    target.mkdir(exist_ok=True)
    self.temp.mkdir(exist_ok=True)
    return target
|
||||
|
||||
@staticmethod
|
||||
def is_exists(path: Path) -> bool:
|
||||
return path.exists()
|
||||
@ -34,3 +58,20 @@ class Manager:
|
||||
|
||||
def clean(self):
    """Delete the temp directory (``self.temp``) together with its contents."""
    temp_dir = self.temp.resolve()
    rmtree(temp_dir)
|
||||
|
||||
def filter_name(self, name: str) -> str:
    """Return ``name`` made safe for use as a file name.

    Every character rejected by the ``self.NAME`` pattern is replaced with
    an underscore, then each run of consecutive underscores is collapsed
    into a single one.
    """
    return sub(r"_+", "_", self.NAME.sub("_", name))
|
||||
|
||||
def save_data(self, name: str, data: dict):
    """Append a timestamped JSON record of ``data`` to ``<name>.txt``.

    The file lives in ``self.folder`` and is opened in append mode, so
    repeated calls with the same ``name`` accumulate records. Each record
    is a 50-character header line (the current local time centred between
    ``=`` fill characters) followed by ``data`` as 4-space-indented JSON
    with non-ASCII characters written unescaped.

    :param name: File stem; the record goes to ``f"{name}.txt"``.
    :param data: JSON-serialisable mapping to record.
    :raises TypeError: If ``data`` contains values ``json.dumps`` cannot
        serialise; the file is not opened in that case.
    """
    # Build the record with ordinary expressions: line breaks and nested
    # double quotes inside f-string replacement fields only parse on
    # Python 3.12+, so they are kept out of the f-string entirely.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    header = timestamp.center(50, "=")
    # Serialise before opening the file so a failure leaves nothing behind.
    body = dumps(data, indent=4, ensure_ascii=False)
    with self.folder.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
        f.write(f"{header}\n{body}\n")
|
||||
|
||||
@ -56,7 +56,13 @@ class XHS:
|
||||
max_retry=5,
|
||||
**kwargs,
|
||||
):
|
||||
self.manager = Manager(ROOT, user_agent, cookie, max_retry)
|
||||
self.manager = Manager(
|
||||
ROOT,
|
||||
path,
|
||||
folder_name,
|
||||
user_agent,
|
||||
cookie,
|
||||
max_retry)
|
||||
self.html = Html(
|
||||
self.manager.headers,
|
||||
proxy,
|
||||
@ -67,28 +73,28 @@ class XHS:
|
||||
self.explore = Explore()
|
||||
self.download = Download(
|
||||
self.manager,
|
||||
ROOT,
|
||||
path,
|
||||
folder_name,
|
||||
proxy,
|
||||
chunk,
|
||||
timeout,
|
||||
self.manager.retry, )
|
||||
timeout, )
|
||||
self.rich_log = self.download.rich_log
|
||||
|
||||
async def __get_image(self, container: dict, html: str, download, log, bar):
|
||||
urls = self.image.get_image_link(html)
|
||||
# self.rich_log(log, urls) # 调试代码
|
||||
name = self.__naming_rules(container)
|
||||
if download:
|
||||
await self.download.run(urls, self.__naming_rules(container), 1, log, bar)
|
||||
await self.download.run(urls, name, 1, log, bar)
|
||||
container["下载地址"] = urls
|
||||
self.manager.save_data(name, container)
|
||||
|
||||
async def __get_video(self, container: dict, html: str, download, log, bar):
|
||||
url = self.video.get_video_link(html)
|
||||
# self.rich_log(log, url) # 调试代码
|
||||
name = self.__naming_rules(container)
|
||||
if download:
|
||||
await self.download.run(url, self.__naming_rules(container), 0, log, bar)
|
||||
await self.download.run(url, name, 0, log, bar)
|
||||
container["下载地址"] = url
|
||||
self.manager.save_data(name, container)
|
||||
|
||||
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
|
||||
# return # 调试代码
|
||||
@ -131,10 +137,9 @@ class XHS:
|
||||
self.rich_log(log, f"完成处理:{url}")
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def __naming_rules(data: dict) -> str:
|
||||
"""下载文件默认使用作品 ID 作为文件名,可修改此方法自定义文件名格式"""
|
||||
return data["作品ID"]
|
||||
def __naming_rules(self, data: dict) -> str:
|
||||
"""下载文件默认使用 作品标题 或 作品 ID 作为文件名称,可修改此方法自定义文件名称格式"""
|
||||
return self.manager.filter_name(data["作品标题"]) or data["作品ID"]
|
||||
|
||||
async def __aenter__(self):
    """Enter the ``async with`` block, returning this instance as the bound value."""
    return self
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user