新增支持命令行下载作品文件

This commit is contained in:
JoeanAmier 2024-03-07 23:25:32 +08:00
parent 18f92134d8
commit a997fe1a1d
14 changed files with 200 additions and 34 deletions

View File

@ -26,8 +26,8 @@
<li>✅ 作品文件储存至单独文件夹</li>
<li>✅ 后台监听剪贴板下载作品</li>
<li>✅ 记录已下载作品 ID</li>
<li>✅ 支持命令行下载作品文件</li>
<li>☑️ 支持 API 调用功能</li>
<li>☑️ 支持命令行参数下载作品文件</li>
</ul>
<ul><b>脚本功能</b>
<li>✅ 下载小红书无水印作品文件</li>
@ -67,6 +67,11 @@
<li>下载本项目最新的源码或 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 发布的源码至本地</li>
<li>运行 <code>main.py</code> 即可使用</li>
</ol>
<h1>🛠 命令行模式</h1>
<p>项目支持命令行运行模式,若想要下载图文作品的部分图片,可以使用此模式传入需要下载的图片序号!</p>
<img src="static/screenshot/命令行模式截图1.png" alt="">
<hr>
<img src="static/screenshot/命令行模式截图2.png" alt="">
<h1>🕹 用户脚本</h1>
<img src="static/screenshot/用户脚本截图1.png" alt="">
<hr>
@ -111,9 +116,11 @@ async def example():
download = True # 是否下载作品文件默认值False
efficient = True # 高效模式,禁用请求延时
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_link, download, efficient)) # 获取数据失败时返回空字典
print(await xhs.extract(demo_link, download, efficient))
print(await xhs.extract(multiple_links, download, efficient)) # 支持传入多个作品链接
# 获取数据失败时返回空字典
print(await xhs.extract(error_link, download, efficient=efficient))
print(await xhs.extract(demo_link, download, efficient=efficient))
# 支持传入多个作品链接
print(await xhs.extract(multiple_links, download, efficient=efficient))
</pre>
<h1>⚙️ 配置文件</h1>
<p>项目根目录下的 <code>settings.json</code> 文件,首次运行自动生成,可以自定义部分运行参数。</p>
@ -235,16 +242,10 @@ async def example():
<p>如果您愿意,可以考虑提供资助为 <b>XHS-Downloader</b> 提供额外的支持!</p>
<h1>✉️ 联系作者</h1>
<ul>
<li>QQ: 2437596031联系请说明来意</li>
<li>QQ Group: <a href="https://github.com/JoeanAmier/XHS-Downloader/blob/master/static/QQ%E7%BE%A4%E8%81%8A%E4%BA%8C%E7%BB%B4%E7%A0%81.png">点击扫码加入群聊</a></li>
<li>Email: yonglelolu@gmail.com</li>
<li>微信: Downloader_Tools</li>
<li>微信公众号: Downloader Tools</li>
<li>QQ 群聊(使用交流): <a href="https://github.com/JoeanAmier/XHS-Downloader/blob/master/static/QQ%E7%BE%A4%E8%81%8A%E4%BA%8C%E7%BB%B4%E7%A0%81.png">扫码加入群聊</a></li>
</ul>
<p>
<b>如果您在使用 XHS-Downloader 的时候遇到问题,请先阅读<a href="https://github.com/ryanhanwu/How-To-Ask-Questions-The-Smart-Way/blob/main/README-zh_CN.md">《提问的智慧》</a>,然后加入 QQ 群聊寻求帮助!</b>
</p>
<p>
<b>如果您通过 Email 联系我,我可能无法及时查看并回复信息,我会尽力在七天内回复您的邮件;如果有紧急事项或需要更快的回复,请通过其他方式与我联系,谢谢理解!</b>
</p>
<p><b>如果您对抖音 / TikTok 感兴趣,可以了解一下我的另一个开源项目 <a href="https://github.com/JoeanAmier/TikTokDownloader">TikTokDownloader</a></b></p>
<h1>⚠️ 免责声明</h1>
<ul>
@ -264,5 +265,7 @@ async def example():
# 💡 代码参考
* https://textual.textualize.io/
* https://docs.aiohttp.org/en/stable/
* https://textual.textualize.io/
* https://aiosqlite.omnilib.dev/en/stable/
* https://click.palletsprojects.com/en/8.1.x/

16
main.py
View File

@ -1,7 +1,9 @@
from asyncio import run
from sys import argv
from source import XHS
from source import XHSDownloader
from source import cli
async def example():
@ -39,9 +41,11 @@ async def example():
download = True # 是否下载作品文件默认值False
efficient = True # 高效模式,禁用请求延时
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_link, download, efficient)) # 获取数据失败时返回空字典
print(await xhs.extract(demo_link, download, efficient))
print(await xhs.extract(multiple_links, download, efficient)) # 支持传入多个作品链接
# 获取数据失败时返回空字典
print(await xhs.extract(error_link, download, efficient=efficient))
print(await xhs.extract(demo_link, download, efficient=efficient))
# 支持传入多个作品链接
print(await xhs.extract(multiple_links, download, efficient=efficient))
async def main():
@ -50,5 +54,7 @@ async def main():
if __name__ == '__main__':
# run(example())
run(main())
if len(argv) > 1:
cli()
else:
run(main())

View File

@ -4,3 +4,4 @@ pyperclip>=1.8.2
lxml>=5.1.0
PyYAML>=6.0.1
aiosqlite>=0.20.0
click>=8.1.7

View File

@ -0,0 +1,3 @@
from .main import cli
__all__ = ["cli"]

7
source/CLI/help.py Normal file
View File

@ -0,0 +1,7 @@
from click import Context
__all__ = ["help"]
def help(ctx: Context, *args, **kwargs):
ctx.exit()

121
source/CLI/main.py Normal file
View File

@ -0,0 +1,121 @@
from asyncio import run
from contextlib import suppress
from pathlib import Path as Root
from click import (
command,
option,
Path,
Choice,
pass_context,
Context,
echo,
)
from source.application import XHS
from source.module import (
ROOT,
PROJECT,
)
from source.module import Settings
from .help import help
__all__ = ["cli"]
class CLI:
def __init__(self, ctx: Context, **kwargs):
self.ctx = ctx
self.url = kwargs.pop("url")
self.index = self.__format_index(kwargs.pop("index"))
self.path = kwargs.pop("settings")
self.update = kwargs.pop("update_settings")
self.settings = Settings(self.__check_settings_path())
self.parameter = self.settings.run() | self.__clean_params(kwargs)
self.APP = XHS(**self.parameter)
async def __aenter__(self):
await self.APP.__aenter__()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.APP.__aexit__(exc_type, exc_value, traceback)
async def run(self):
if not self.url:
echo("No URL specified")
self.ctx.exit()
await self.APP.extract_cli(self.url, index=self.index)
self.__update_settings()
def __update_settings(self):
if self.update:
self.settings.update(self.parameter)
def __check_settings_path(self) -> Path:
if not self.path:
return ROOT
return s.parent if (s := Root(self.path)).is_file() else ROOT
@staticmethod
def __clean_params(data: dict) -> dict:
return {k: v for k, v in data.items() if v}
def __check_params(self):
pass
@staticmethod
def __format_index(index: str) -> list:
if index:
result = []
values = index.split()
for i in values:
with suppress(ValueError):
result.append(int(i))
return result
return []
@staticmethod
def version(ctx: Context, *args, **kwargs):
echo(PROJECT)
ctx.exit()
@command(name="XHS-Downloader", help=PROJECT)
@option("--url", "-u", type=str, help="小红书作品链接", )
@option("--index", "-i", type=str, help="下载指定序号的图片文件,仅对图文作品生效", )
@option("--work_path",
"-wp",
type=Path(file_okay=False),
help="作品数据 / 文件保存根路径",
)
@option("--folder_name", "-fn", type=str, help="作品文件储存文件夹名称", )
@option("--user_agent", "-ua", type=str, help="请求头 User-Agent", )
@option("--cookie", "-ck", type=str, help="小红书网页版 Cookie无需登录", )
@option("--proxy", "-p", type=str, help="设置程序代理", )
@option("--timeout", "-t", type=int, help="请求数据超时限制,单位:秒", )
@option("--chunk", "-c", type=int, help="下载文件时,每次从服务器获取的数据块大小,单位:字节", )
@option("--max_retry", "-mr", type=int, help="请求数据失败时,重试的最大次数,单位:秒", )
@option("--record_data", "-rd", type=bool, help="是否记录作品数据至 TXT 文件", )
@option("--image_format", "-if", type=Choice(["png", "PNG", "webp", "WEBP"]),
help="图文作品文件下载格式支持PNG、WEBP", )
@option("--folder_mode", "-fm", type=bool, help="是否将每个作品的文件储存至单独的文件夹", )
@option("--language", "-l",
type=Choice(["zh-CN", "en-GB"]), help="设置程序语言目前支持zh-CN、en-GB", )
@option("--settings", "-s", type=Path(dir_okay=False), help="读取指定配置文件", )
@option("--update_settings", "-us", type=bool, help="是否更新配置文件", )
@option("-h",
is_flag=True,
is_eager=True,
expose_value=False,
help="查看详细参数说明",
callback=help)
@option("--version", "-v", is_flag=True, is_eager=True,
expose_value=False, help="查看程序版本信息", callback=CLI.version)
@pass_context
def cli(ctx, **kwargs):
async def main():
async with CLI(ctx, **kwargs) as xhs:
await xhs.run()
run(main())

View File

@ -90,7 +90,7 @@ class Index(Screen):
self.deal()
else:
self.tip.write(Text(self.prompt.invalid_link, style=WARNING))
self.tip.write(Text(">" * 50, style=GENERAL))
self.tip.write(Text(">" * 50, style=GENERAL))
@on(Button.Pressed, "#reset")
def reset_button(self):
@ -107,4 +107,5 @@ class Index(Screen):
self.url.value = ""
else:
self.tip.write(Text(self.prompt.download_failure, style=ERROR))
self.tip.write(Text(">" * 50, style=GENERAL))
self.app.pop_screen()

View File

@ -1,4 +1,5 @@
from .CLI import cli
from .TUI import XHSDownloader
from .application import XHS
__all__ = ['XHS', 'XHSDownloader']
__all__ = ['XHS', 'XHSDownloader', 'cli']

View File

@ -94,14 +94,14 @@ class XHS:
def __extract_video(self, container: dict, data: Namespace):
container["下载地址"] = self.video.get_video_link(data)
async def __download_files(self, container: dict, download: bool, log, bar):
async def __download_files(self, container: dict, download: bool, index, log, bar):
name = self.__naming_rules(container)
path = self.manager.folder
if (u := container["下载地址"]) and download:
if await self.skip_download(i := container["作品ID"]):
logging(log, self.prompt.exist_record(i))
else:
path, result = await self.download.run(u, name, container["作品类型"], log, bar)
path, result = await self.download.run(u, index, name, container["作品类型"], log, bar)
await self.__add_record(i, result)
elif not u:
logging(log, self.prompt.download_link_error, ERROR)
@ -111,7 +111,13 @@ class XHS:
if all(result):
await self.recorder.add(id_)
async def extract(self, url: str, download=False, efficient=False, log=None, bar=None) -> list[dict]:
async def extract(self,
url: str,
download=False,
index: list | tuple = None,
efficient=False,
log=None,
bar=None) -> list[dict]:
# return # 调试代码
urls = await self.__extract_links(url, log)
if not urls:
@ -119,7 +125,20 @@ class XHS:
else:
logging(log, self.prompt.pending_processing(len(urls)))
# return urls # 调试代码
return [await self.__deal_extract(i, download, efficient, log, bar) for i in urls]
return [await self.__deal_extract(i, download, index, efficient, log, bar) for i in urls]
async def extract_cli(self,
url: str,
download=True,
index: list | tuple = None,
efficient=True,
log=None,
bar=None) -> None:
url = await self.__extract_links(url, log)
if not url:
logging(log, self.prompt.extract_link_failure, WARNING)
else:
await self.__deal_extract(url[0], download, index, efficient, log, bar)
async def __extract_links(self, url: str, log) -> list:
urls = []
@ -133,7 +152,7 @@ class XHS:
urls.append(u.group())
return urls
async def __deal_extract(self, url: str, download: bool, efficient: bool, log, bar):
async def __deal_extract(self, url: str, download: bool, index: list | tuple | None, efficient: bool, log, bar):
logging(log, self.prompt.start_processing(url))
html = await self.html.request_url(url, log=log)
namespace = self.__generate_data_object(html)
@ -153,7 +172,7 @@ class XHS:
self.__extract_image(data, namespace)
case _:
data["下载地址"] = []
await self.__download_files(data, download, log, bar)
await self.__download_files(data, download, index, log, bar)
logging(log, self.prompt.processing_completed(url))
return data

View File

@ -33,13 +33,14 @@ class Download:
self.video_format = "mp4"
self.image_format = manager.image_format
async def run(self, urls: list, name: str, type_: str, log, bar) -> tuple[Path, tuple]:
async def run(self, urls: list, index: list | tuple | None, name: str, type_: str, log, bar) -> tuple[Path, tuple]:
path = self.__generate_path(name)
match type_:
case "视频":
tasks = self.__ready_download_video(urls, path, name, log)
case "图文":
tasks = self.__ready_download_image(urls, path, name, log)
tasks = self.__ready_download_image(
urls, index, path, name, log)
case _:
raise ValueError
tasks = [
@ -74,11 +75,14 @@ class Download:
def __ready_download_image(
self,
urls: list[str],
index: list | tuple | None,
path: Path,
name: str,
log) -> list:
tasks = []
for i, j in enumerate(urls, start=1):
if index and i not in index:
continue
file = f"{name}_{i}"
if any(path.glob(f"{file}.*")):
logging(log, self.prompt.skip_download(file))

View File

@ -14,7 +14,7 @@ class IDRecorder:
async def __connect_database(self):
self.database = await connect(self.file)
self.cursor = await self.database.cursor()
await self.cursor.execute("CREATE TABLE IF NOT EXISTS explore_ids (ID TEXT PRIMARY KEY);")
await self.database.execute("CREATE TABLE IF NOT EXISTS explore_ids (ID TEXT PRIMARY KEY);")
await self.database.commit()
async def select(self, id_: str):
@ -22,12 +22,12 @@ class IDRecorder:
return await self.cursor.fetchone()
async def add(self, id_: str) -> None:
await self.cursor.execute("REPLACE INTO explore_ids VALUES (?);", (id_,))
await self.database.execute("REPLACE INTO explore_ids VALUES (?);", (id_,))
await self.database.commit()
async def delete(self, id_: str) -> None:
if id_:
await self.cursor.execute("DELETE FROM explore_ids WHERE ID=?", (id_,))
await self.database.execute("DELETE FROM explore_ids WHERE ID=?", (id_,))
await self.database.commit()
async def delete_many(self, ids: list | tuple):

View File

@ -22,8 +22,8 @@ __all__ = [
]
VERSION_MAJOR = 1
VERSION_MINOR = 8
VERSION_BETA = False
VERSION_MINOR = 9
VERSION_BETA = True
ROOT = Path(__file__).resolve().parent.parent.parent
PROJECT = f"XHS-Downloader V{VERSION_MAJOR}.{
VERSION_MINOR}{" Beta" if VERSION_BETA else ""}"

Binary file not shown.

After

Width:  |  Height:  |  Size: 160 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 212 KiB