From 1570ba320cea4e58cdacfe514b181c35f768fc82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=A8?= Date: Sat, 15 Feb 2025 21:30:24 +0800 Subject: [PATCH] =?UTF-8?q?style:=20=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=8C=96=E5=92=8C=E5=AD=97=E7=AC=A6=E4=B8=B2=E5=A4=84=E7=90=86?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 优化代码缩进和换行,提高可读性 - 统一使用单引号或双引号,保持一致性 - 移除冗余的空格和括号,精简代码 --- example.py | 7 +- locale/generate_path.py | 8 +- locale/po_to_mo.py | 6 +- main.py | 14 +- pyproject.toml | 79 +++++++++++ source/CLI/main.py | 226 ++++++++++++++++++++++++------- source/TUI/__init__.py | 2 +- source/TUI/about.py | 43 +++--- source/TUI/app.py | 54 ++++++-- source/TUI/index.py | 48 ++++--- source/TUI/loading.py | 4 +- source/TUI/monitor.py | 20 ++- source/TUI/record.py | 26 +++- source/TUI/setting.py | 228 ++++++++++++++++++++++++-------- source/TUI/update.py | 30 +++-- source/__init__.py | 7 +- source/application/app.py | 118 ++++++++++++----- source/application/download.py | 82 ++++++++---- source/application/explore.py | 44 +++--- source/application/image.py | 18 +-- source/application/request.py | 46 +++++-- source/application/video.py | 9 +- source/expansion/browser.py | 35 +++-- source/expansion/cleaner.py | 18 ++- source/expansion/converter.py | 4 +- source/expansion/file_folder.py | 4 +- source/expansion/namespace.py | 21 +-- source/expansion/truncate.py | 2 +- source/module/manager.py | 33 ++--- source/module/recorder.py | 16 ++- source/module/settings.py | 2 +- source/module/static.py | 42 +++--- source/module/tools.py | 5 +- source/translation/translate.py | 5 +- 34 files changed, 948 insertions(+), 358 deletions(-) diff --git a/example.py b/example.py index 9e40326..212980d 100644 --- a/example.py +++ b/example.py @@ -60,7 +60,12 @@ async def example(): async def test(): url = "" async with XHS() as xhs: - print(await xhs.extract(url, download=True, )) + print( + await xhs.extract( + url, + download=True, + ) + ) if __name__ == "__main__": diff --git a/locale/generate_path.py b/locale/generate_path.py index 0dc0a79..9e84c62 100644 --- a/locale/generate_path.py +++ b/locale/generate_path.py @@ -4,14 +4,14 @@ ROOT = Path(__file__).resolve().parent.parent def find_python_files(dir_, file): - with open(file, 'w', encoding='utf-8') as f: - for py_file in dir_.rglob('*.py'): # 递归查找所有 .py 文件 - f.write(str(py_file) + '\n') # 写入文件路径 + with open(file, "w", encoding="utf-8") as f: + for py_file in dir_.rglob("*.py"): # 递归查找所有 .py 文件 + f.write(str(py_file) + "\n") # 写入文件路径 # 设置源目录和输出文件 source_directory = ROOT.joinpath("source") # 源目录 -output_file = 'py_files.txt' # 输出文件名 +output_file = "py_files.txt" # 输出文件名 find_python_files(source_directory, output_file) print(f"所有 .py 文件路径已保存到 {output_file}") diff --git a/locale/po_to_mo.py b/locale/po_to_mo.py index a571bad..391cbf3 100644 --- a/locale/po_to_mo.py +++ b/locale/po_to_mo.py @@ -6,9 +6,7 @@ ROOT = Path(__file__).resolve().parent def scan_directory(): return [ - item.joinpath("LC_MESSAGES/xhs.po") - for item in ROOT.iterdir() - if item.is_dir() + item.joinpath("LC_MESSAGES/xhs.po") for item in ROOT.iterdir() if item.is_dir() ] @@ -18,7 +16,7 @@ def generate_map(files: list[Path]): def generate_mo(maps: list[tuple[Path, Path]]): for i, j in maps: - command = f"msgfmt --check -o \"{j}\" \"{i}\"" + command = f'msgfmt --check -o "{j}" "{i}"' print(run(command, shell=True, text=True)) diff --git a/main.py b/main.py index a831cca..520d3bf 100644 --- a/main.py +++ b/main.py @@ -14,12 +14,20 @@ async def app(): await xhs.run_async() -async def server(host="0.0.0.0", port=8000, log_level="info", ): +async def server( + host="0.0.0.0", + port=8000, + log_level="info", +): async with XHS(**Settings().run()) as xhs: - await xhs.run_server(host, port, log_level, ) + await xhs.run_server( + host, + port, + log_level, + ) -if __name__ == '__main__': +if __name__ == "__main__": with suppress( KeyboardInterrupt, CancelledError, diff --git a/pyproject.toml b/pyproject.toml index dc68cb1..7818d92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,3 +28,82 @@ Repository = "https://github.com/JoeanAmier/XHS-Downloader" [tool.uv.pip] index-url = "https://mirrors.ustc.edu.cn/pypi/simple" + +[tool.ruff] +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", +] + +# Same as Black. +line-length = 88 +indent-width = 4 + +# Assume Python 3.12 +target-version = "py312" + +[tool.ruff.lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +select = ["E4", "E7", "E9", "F"] +ignore = [] + +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[tool.ruff.format] +# Like Black, use double quotes for strings. +quote-style = "double" + +# Like Black, indent with spaces, rather than tabs. +indent-style = "space" + +# Like Black, respect magic trailing commas. +skip-magic-trailing-comma = false + +# Like Black, automatically detect the appropriate line ending. +line-ending = "auto" + +# Enable auto-formatting of code examples in docstrings. Markdown, +# reStructuredText code/literal blocks and doctests are all supported. +# +# This is currently disabled by default, but it is planned for this +# to be opt-out in the future. +docstring-code-format = false + +# Set the line length limit used when formatting code snippets in +# docstrings. +# +# This only has an effect when the `docstring-code-format` setting is +# enabled. +docstring-code-line-length = "dynamic" diff --git a/source/CLI/main.py b/source/CLI/main.py index 48fc680..15cfe0d 100644 --- a/source/CLI/main.py +++ b/source/CLI/main.py @@ -99,7 +99,12 @@ class CLI: @staticmethod @check_value def read_cookie(ctx: Context, param, value) -> str: - return BrowserCookie.get(value, domains=["xiaohongshu.com", ]) + return BrowserCookie.get( + value, + domains=[ + "xiaohongshu.com", + ], + ) @staticmethod @check_value @@ -110,12 +115,24 @@ class CLI: table.add_column("parameter", no_wrap=True, style="bold") table.add_column("abbreviation", no_wrap=True, style="bold") table.add_column("type", no_wrap=True, style="bold") - table.add_column("description", no_wrap=True, ) + table.add_column( + "description", + no_wrap=True, + ) options = ( ("--url", "-u", "str", _("小红书作品链接")), - ("--index", "-i", "str", - fill(_("下载指定序号的图片文件,仅对图文作品生效;多个序号输入示例:\"1 3 5 7\""), width=55),), + ( + "--index", + "-i", + "str", + fill( + _( + '下载指定序号的图片文件,仅对图文作品生效;多个序号输入示例:"1 3 5 7"' + ), + width=55, + ), + ), ("--work_path", "-wp", "str", _("作品数据 / 文件保存根路径")), ("--folder_name", "-fn", "str", _("作品文件储存文件夹名称")), ("--name_format", "-nf", "str", _("作品文件名称格式")), @@ -125,22 +142,51 @@ class CLI: ("--cookie", "-ck", "str", _("小红书网页版 Cookie,无需登录")), ("--proxy", "-p", "str", _("网络代理")), ("--timeout", "-t", "int", _("请求数据超时限制,单位:秒")), - ("--chunk", "-c", "int", fill(_("下载文件时,每次从服务器获取的数据块大小,单位:字节"), width=55),), + ( + "--chunk", + "-c", + "int", + fill( + _("下载文件时,每次从服务器获取的数据块大小,单位:字节"), width=55 + ), + ), ("--max_retry", "-mr", "int", _("请求数据失败时,重试的最大次数")), ("--record_data", "-rd", "bool", _("是否记录作品数据至文件")), - ("--image_format", "-if", "choice", _("图文作品文件下载格式,支持:PNG、WEBP")), + ( + "--image_format", + "-if", + "choice", + _("图文作品文件下载格式,支持:PNG、WEBP"), + ), ("--live_download", "-ld", "bool", _("动态图片下载开关")), ("--download_record", "-dr", "bool", _("作品下载记录开关")), - ("--folder_mode", "-fm", "bool", _("是否将每个作品的文件储存至单独的文件夹")), + ( + "--folder_mode", + "-fm", + "bool", + _("是否将每个作品的文件储存至单独的文件夹"), + ), ("--language", "-l", "choice", _("设置程序语言,目前支持:zh_CN、en_US")), ("--settings", "-s", "str", _("读取指定配置文件")), - ("--browser_cookie", "-bc", "choice", - fill(_("从指定的浏览器读取小红书网页版 Cookie,支持:{0}; 输入浏览器名称或序号").format( - ", ".join(f"{i}: {j}" for i, j in enumerate( - BrowserCookie.SUPPORT_BROWSER.keys(), - start=1, - )) - ), width=55)), + ( + "--browser_cookie", + "-bc", + "choice", + fill( + _( + "从指定的浏览器读取小红书网页版 Cookie,支持:{0}; 输入浏览器名称或序号" + ).format( + ", ".join( + f"{i}: {j}" + for i, j in enumerate( + BrowserCookie.SUPPORT_BROWSER.keys(), + start=1, + ) + ) + ), + width=55, + ), + ), ("--update_settings", "-us", "flag", _("是否更新配置文件")), ("--help", "-h", "flag", _("查看详细参数说明")), ("--version", "-v", "flag", _("查看 XHS-Downloader 版本")), @@ -154,45 +200,123 @@ class CLI: table, border_style="bold", title="XHS-Downloader CLI Parameters", - title_align="left")) + title_align="left", + ) + ) @command(name="XHS-Downloader", help=PROJECT) -@option("--url", "-u", ) -@option("--index", "-i", ) -@option("--work_path", - "-wp", - type=Path(file_okay=False), - ) -@option("--folder_name", "-fn", ) -@option("--name_format", "-nf", ) -@option("--user_agent", "-ua", ) -@option("--cookie", "-ck", ) -@option("--proxy", "-p", ) -@option("--timeout", "-t", type=int, ) -@option("--chunk", "-c", type=int, ) -@option("--max_retry", "-mr", type=int, ) -@option("--record_data", "-rd", type=bool, ) -@option("--image_format", "-if", type=Choice(["png", "PNG", "webp", "WEBP"]), ) -@option("--live_download", "-ld", type=bool, ) -@option("--download_record", "-dr", type=bool, ) -@option("--folder_mode", "-fm", type=bool, ) -@option("--language", "-l", - type=Choice(["zh_CN", "en_US"]), ) -@option("--settings", "-s", type=Path(dir_okay=False), ) -@option("--browser_cookie", "-bc", type=Choice( - list(BrowserCookie.SUPPORT_BROWSER.keys() - ) + [str(i) for i in range(1, len(BrowserCookie.SUPPORT_BROWSER) + 1)]), callback=CLI.read_cookie, ) -@option("--update_settings", "-us", type=bool, - is_flag=True, ) -@option("-h", - "--help", - is_flag=True, ) -@option("--version", "-v", - is_flag=True, - is_eager=True, - expose_value=False, - callback=CLI.version, ) +@option( + "--url", + "-u", +) +@option( + "--index", + "-i", +) +@option( + "--work_path", + "-wp", + type=Path(file_okay=False), +) +@option( + "--folder_name", + "-fn", +) +@option( + "--name_format", + "-nf", +) +@option( + "--user_agent", + "-ua", +) +@option( + "--cookie", + "-ck", +) +@option( + "--proxy", + "-p", +) +@option( + "--timeout", + "-t", + type=int, +) +@option( + "--chunk", + "-c", + type=int, +) +@option( + "--max_retry", + "-mr", + type=int, +) +@option( + "--record_data", + "-rd", + type=bool, +) +@option( + "--image_format", + "-if", + type=Choice(["png", "PNG", "webp", "WEBP"]), +) +@option( + "--live_download", + "-ld", + type=bool, +) +@option( + "--download_record", + "-dr", + type=bool, +) +@option( + "--folder_mode", + "-fm", + type=bool, +) +@option( + "--language", + "-l", + type=Choice(["zh_CN", "en_US"]), +) +@option( + "--settings", + "-s", + type=Path(dir_okay=False), +) +@option( + "--browser_cookie", + "-bc", + type=Choice( + list(BrowserCookie.SUPPORT_BROWSER.keys()) + + [str(i) for i in range(1, len(BrowserCookie.SUPPORT_BROWSER) + 1)] + ), + callback=CLI.read_cookie, +) +@option( + "--update_settings", + "-us", + type=bool, + is_flag=True, +) +@option( + "-h", + "--help", + is_flag=True, +) +@option( + "--version", + "-v", + is_flag=True, + is_eager=True, + expose_value=False, + callback=CLI.version, +) @pass_context def cli(ctx, help, language, **kwargs): # Step 1: 切换语言 @@ -217,4 +341,4 @@ if __name__ == "__main__": from click.testing import CliRunner runner = CliRunner() - result = runner.invoke(cli, ['-l', 'en_US', '-u', '']) + result = runner.invoke(cli, ["-l", "en_US", "-u", ""]) diff --git a/source/TUI/__init__.py b/source/TUI/__init__.py index 7aed1a2..f23dfda 100644 --- a/source/TUI/__init__.py +++ b/source/TUI/__init__.py @@ -1,3 +1,3 @@ from .app import XHSDownloader -__all__ = ['XHSDownloader'] +__all__ = ["XHSDownloader"] diff --git a/source/TUI/about.py b/source/TUI/about.py index e538efe..85424e7 100644 --- a/source/TUI/about.py +++ b/source/TUI/about.py @@ -20,46 +20,53 @@ __all__ = ["About"] class About(Screen): BINDINGS = [ - Binding( - key="Q", - action="quit", - description=_("退出程序")), - Binding( - key="U", - action="update", - description=_("检查更新")), - Binding( - key="B", - action="back", - description=_("返回首页")), + Binding(key="Q", action="quit", description=_("退出程序")), + Binding(key="U", action="update", description=_("检查更新")), + Binding(key="B", action="back", description=_("返回首页")), ] - def __init__(self, ): + def __init__( + self, + ): super().__init__() def compose(self) -> ComposeResult: yield Header() yield Label( Text( - _("如果 XHS-Downloader 对您有帮助,请考虑为它点个 Star,感谢您的支持!"), + _( + "如果 XHS-Downloader 对您有帮助,请考虑为它点个 Star,感谢您的支持!" + ), style=INFO, ), classes="prompt", ) - yield Label(Text(_("Discord 社区"), style=PROMPT), classes="prompt", ) + yield Label( + Text(_("Discord 社区"), style=PROMPT), + classes="prompt", + ) yield Link( _("邀请链接:") + "https://discord.com/invite/ZYtmgKud9Y", url="https://discord.com/invite/ZYtmgKud9Y", tooltip=_("点击访问"), ) - yield Label(Text(_("作者的其他开源项目"), style=PROMPT), classes="prompt", ) - yield Label(Text("TikTokDownloader (抖音 / TikTok)", style=MASTER), classes="prompt", ) + yield Label( + Text(_("作者的其他开源项目"), style=PROMPT), + classes="prompt", + ) + yield Label( + Text("TikTokDownloader (抖音 / TikTok)", style=MASTER), + classes="prompt", + ) yield Link( "https://github.com/JoeanAmier/TikTokDownloader", url="https://github.com/JoeanAmier/TikTokDownloader", tooltip=_("点击访问"), ) - yield Label(Text("KS-Downloader (快手)", style=MASTER), classes="prompt", ) + yield Label( + Text("KS-Downloader (快手)", style=MASTER), + classes="prompt", + ) yield Link( "https://github.com/JoeanAmier/KS-Downloader", url="https://github.com/JoeanAmier/KS-Downloader", diff --git a/source/TUI/app.py b/source/TUI/app.py index 1692e71..6999aea 100644 --- a/source/TUI/app.py +++ b/source/TUI/app.py @@ -49,20 +49,31 @@ class XHSDownloader(App): Setting( self.parameter, ), - name="setting") - self.install_screen(Index(self.APP, ), name="index") + name="setting", + ) + self.install_screen( + Index( + self.APP, + ), + name="index", + ) self.install_screen(Loading(), name="loading") self.install_screen(About(), name="about") - self.install_screen(Record(self.APP, ), name="record") + self.install_screen( + Record( + self.APP, + ), + name="record", + ) await self.push_screen("index") self.SETTINGS.check_keys( self.parameter, logging, self.query_one(RichLog), - _("配置文件 settings.json 缺少必要的参数,请删除该文件,然后重新运行程序,自动生成默认配置文件!") + - f"\n{ - ">" * - 50}", + _( + "配置文件 settings.json 缺少必要的参数,请删除该文件,然后重新运行程序,自动生成默认配置文件!" + ) + + f"\n{'>' * 50}", ERROR, ) @@ -84,22 +95,41 @@ class XHSDownloader(App): self.uninstall_screen("loading") self.uninstall_screen("about") self.uninstall_screen("record") - self.install_screen(Index(self.APP, ), name="index") + self.install_screen( + Index( + self.APP, + ), + name="index", + ) self.install_screen( Setting( self.parameter, ), - name="setting") + name="setting", + ) self.install_screen(Loading(), name="loading") self.install_screen(About(), name="about") - self.install_screen(Record(self.APP, ), name="record") + self.install_screen( + Record( + self.APP, + ), + name="record", + ) await self.push_screen("index") def update_result(self, args: tuple[str, str]) -> None: - self.notify(args[0], severity=args[1], ) + self.notify( + args[0], + severity=args[1], + ) async def action_update(self): - await self.push_screen(Update(self.APP, ), callback=self.update_result) + await self.push_screen( + Update( + self.APP, + ), + callback=self.update_result, + ) async def close_database(self): await self.APP.id_recorder.cursor.close() diff --git a/source/TUI/index.py b/source/TUI/index.py index 5bfebc3..df8a5cb 100644 --- a/source/TUI/index.py +++ b/source/TUI/index.py @@ -42,7 +42,10 @@ class Index(Screen): Binding(key="A", action="about", description=_("关于项目")), ] - def __init__(self, app: XHS, ): + def __init__( + self, + app: XHS, + ): super().__init__() self.xhs = app self.url = None @@ -51,11 +54,7 @@ class Index(Screen): def compose(self) -> ComposeResult: yield Header() yield ScrollableContainer( - Label( - Text( - _("开源协议: ") + LICENCE, - style=MASTER) - ), + Label(Text(_("开源协议: ") + LICENCE, style=MASTER)), Link( Text( _("项目地址: ") + REPOSITORY, @@ -65,9 +64,8 @@ class Index(Screen): tooltip=_("点击访问"), ), Label( - Text( - _("请输入小红书图文/视频作品链接"), - style=PROMPT), classes="prompt", + Text(_("请输入小红书图文/视频作品链接"), style=PROMPT), + classes="prompt", ), Input(placeholder=_("多个链接之间使用空格分隔")), HorizontalScroll( @@ -76,7 +74,11 @@ class Index(Screen): Button(_("清空输入框"), id="reset"), ), ) - yield RichLog(markup=True, wrap=True, auto_scroll=True, ) + yield RichLog( + markup=True, + wrap=True, + auto_scroll=True, + ) yield Footer() def on_mount(self) -> None: @@ -84,15 +86,12 @@ class Index(Screen): self.url = self.query_one(Input) self.tip = self.query_one(RichLog) self.tip.write( - Text( - _("免责声明\n") + - f"\n{ - ">" * - 50}", - style=MASTER), + Text(_("免责声明\n") + f"\n{'>' * 50}", style=MASTER), scroll_end=True, ) - self.xhs.manager.print_proxy_tip(log=self.tip, ) + self.xhs.manager.print_proxy_tip( + log=self.tip, + ) @on(Button.Pressed, "#deal") async def deal_button(self): @@ -119,7 +118,14 @@ class Index(Screen): @work(exclusive=True) async def deal(self): await self.app.push_screen("loading") - if any(await self.xhs.extract(self.url.value, True, log=self.tip, data=False, )): + if any( + await self.xhs.extract( + self.url.value, + True, + log=self.tip, + data=False, + ) + ): self.url.value = "" else: self.tip.write( @@ -143,7 +149,11 @@ class Index(Screen): await self.app.run_action("settings") async def action_monitor(self): - await self.app.push_screen(Monitor(self.xhs, )) + await self.app.push_screen( + Monitor( + self.xhs, + ) + ) async def action_about(self): await self.app.push_screen("about") diff --git a/source/TUI/loading.py b/source/TUI/loading.py index 6f3d09a..585e912 100644 --- a/source/TUI/loading.py +++ b/source/TUI/loading.py @@ -10,7 +10,9 @@ __all__ = ["Loading"] class Loading(ModalScreen): - def __init__(self, ): + def __init__( + self, + ): super().__init__() def compose(self) -> ComposeResult: diff --git a/source/TUI/monitor.py b/source/TUI/monitor.py index f8f2787..7460a85 100644 --- a/source/TUI/monitor.py +++ b/source/TUI/monitor.py @@ -27,7 +27,10 @@ class Monitor(Screen): Binding(key="C", action="close", description=_("关闭监听")), ] - def __init__(self, app: XHS, ): + def __init__( + self, + app: XHS, + ): super().__init__() self.xhs = app @@ -44,15 +47,22 @@ class Monitor(Screen): @work(exclusive=True) async def run_monitor(self): - await self.xhs.monitor(download=True, log=self.query_one(RichLog), data=False, ) + await self.xhs.monitor( + download=True, + log=self.query_one(RichLog), + data=False, + ) await self.action_close() def on_mount(self) -> None: self.title = PROJECT self.query_one(RichLog).write( - Text(_( - "程序会自动读取并提取剪贴板中的小红书作品链接,并自动下载链接对应的作品文件,如需关闭,请点击关闭按钮,或者向剪贴板写入 “close” 文本!"), - style=MASTER), + Text( + _( + "程序会自动读取并提取剪贴板中的小红书作品链接,并自动下载链接对应的作品文件,如需关闭,请点击关闭按钮,或者向剪贴板写入 “close” 文本!" + ), + style=MASTER, + ), scroll_end=True, ) self.run_monitor() diff --git a/source/TUI/record.py b/source/TUI/record.py index 0bade2a..e607e7b 100644 --- a/source/TUI/record.py +++ b/source/TUI/record.py @@ -14,23 +14,37 @@ __all__ = ["Record"] class Record(ModalScreen): - def __init__(self, app: XHS, ): + def __init__( + self, + app: XHS, + ): super().__init__() self.xhs = app def compose(self) -> ComposeResult: yield Grid( Label(_("请输入待删除的小红书作品链接或作品 ID"), classes="prompt"), - Input(placeholder=_("支持输入作品 ID 或包含作品 ID 的作品链接,多个链接或 ID 之间使用空格分隔"), - id="id", ), + Input( + placeholder=_( + "支持输入作品 ID 或包含作品 ID 的作品链接,多个链接或 ID 之间使用空格分隔" + ), + id="id", + ), HorizontalScroll( - Button(_("删除指定作品 ID"), id="enter", ), - Button(_("返回首页"), id="close"), ), + Button( + _("删除指定作品 ID"), + id="enter", + ), + Button(_("返回首页"), id="close"), + ), id="record", ) async def delete(self, text: str): - text = await self.xhs.extract_links(text, None, ) + text = await self.xhs.extract_links( + text, + None, + ) text = self.xhs.extract_id(text) await self.xhs.id_recorder.delete(text) self.app.notify(_("删除下载记录成功")) diff --git a/source/TUI/setting.py b/source/TUI/setting.py index be2f96f..6822b03 100644 --- a/source/TUI/setting.py +++ b/source/TUI/setting.py @@ -23,49 +23,151 @@ class Setting(Screen): Binding(key="B", action="index", description=_("返回首页")), ] - def __init__(self, data: dict, ): + def __init__( + self, + data: dict, + ): super().__init__() self.data = data def compose(self) -> ComposeResult: yield Header() yield ScrollableContainer( - Label(_("作品数据 / 文件保存根路径"), classes="params", ), - Input(self.data["work_path"], placeholder=_("程序根路径"), valid_empty=True, - id="work_path", ), - Label(_("作品文件储存文件夹名称"), classes="params", ), - Input(self.data["folder_name"], placeholder="Download", id="folder_name", ), - Label(_("作品文件名称格式"), classes="params", ), - Input(self.data["name_format"], placeholder="发布时间 作者昵称 作品标题", valid_empty=True, - id="name_format", ), - Label("User-Agent", classes="params", ), - Input(self.data["user_agent"], placeholder=_("内置 Chrome User Agent"), valid_empty=True, - id="user_agent", ), - Label(_("小红书网页版 Cookie"), classes="params", ), - Input(placeholder=self.__check_cookie(), valid_empty=True, id="cookie", ), - Label(_("网络代理"), classes="params", ), - Input(self.data["proxy"], placeholder=_("不使用代理"), valid_empty=True, id="proxy", ), - Label(_("请求数据超时限制,单位:秒"), classes="params", ), - Input(str(self.data["timeout"]), placeholder="10", type="integer", id="timeout", ), - Label(_("下载文件时,每次从服务器获取的数据块大小,单位:字节"), classes="params", ), - Input(str(self.data["chunk"]), placeholder="1048576", type="integer", id="chunk", ), - Label(_("请求数据失败时,重试的最大次数"), classes="params", ), - Input(str(self.data["max_retry"]), placeholder="5", type="integer", id="max_retry", ), + Label( + _("作品数据 / 文件保存根路径"), + classes="params", + ), + Input( + self.data["work_path"], + placeholder=_("程序根路径"), + valid_empty=True, + id="work_path", + ), + Label( + _("作品文件储存文件夹名称"), + classes="params", + ), + Input( + self.data["folder_name"], + placeholder="Download", + id="folder_name", + ), + Label( + _("作品文件名称格式"), + classes="params", + ), + Input( + self.data["name_format"], + placeholder="发布时间 作者昵称 作品标题", + valid_empty=True, + id="name_format", + ), + Label( + "User-Agent", + classes="params", + ), + Input( + self.data["user_agent"], + placeholder=_("内置 Chrome User Agent"), + valid_empty=True, + id="user_agent", + ), + Label( + _("小红书网页版 Cookie"), + classes="params", + ), + Input( + placeholder=self.__check_cookie(), + valid_empty=True, + id="cookie", + ), + Label( + _("网络代理"), + classes="params", + ), + Input( + self.data["proxy"], + placeholder=_("不使用代理"), + valid_empty=True, + id="proxy", + ), + Label( + _("请求数据超时限制,单位:秒"), + classes="params", + ), + Input( + str(self.data["timeout"]), + placeholder="10", + type="integer", + id="timeout", + ), + Label( + _("下载文件时,每次从服务器获取的数据块大小,单位:字节"), + classes="params", + ), + Input( + str(self.data["chunk"]), + placeholder="1048576", + type="integer", + id="chunk", + ), + Label( + _("请求数据失败时,重试的最大次数"), + classes="params", + ), + Input( + str(self.data["max_retry"]), + placeholder="5", + type="integer", + id="max_retry", + ), Label(), Container( - Checkbox(_("记录作品详细数据"), id="record_data", value=self.data["record_data"], ), - Checkbox(_("作品文件夹归档模式"), id="folder_mode", value=self.data["folder_mode"], ), - Checkbox(_("视频作品下载开关"), id="video_download", value=self.data["video_download"], ), - Checkbox(_("图文作品下载开关"), id="image_download", value=self.data["image_download"], ), - classes="horizontal-layout"), + Checkbox( + _("记录作品详细数据"), + id="record_data", + value=self.data["record_data"], + ), + Checkbox( + _("作品文件夹归档模式"), + id="folder_mode", + value=self.data["folder_mode"], + ), + Checkbox( + _("视频作品下载开关"), + id="video_download", + value=self.data["video_download"], + ), + Checkbox( + _("图文作品下载开关"), + id="image_download", + value=self.data["image_download"], + ), + classes="horizontal-layout", + ), Label(), Container( - Checkbox(_("动图文件下载开关"), id="live_download", value=self.data["live_download"], ), - Checkbox(_("作品下载记录开关"), id="download_record", value=self.data["download_record"], ), - classes="horizontal-layout"), + Checkbox( + _("动图文件下载开关"), + id="live_download", + value=self.data["live_download"], + ), + Checkbox( + _("作品下载记录开关"), + id="download_record", + value=self.data["download_record"], + ), + classes="horizontal-layout", + ), Container( - Label(_("图片下载格式"), classes="params", ), - Label(_("程序语言"), classes="params", ), + Label( + _("图片下载格式"), + classes="params", + ), + Label( + _("程序语言"), + classes="params", + ), classes="horizontal-layout", ), Label(), @@ -74,17 +176,27 @@ class Setting(Screen): ("PNG", "WEBP"), value=self.data["image_format"].upper(), allow_blank=False, - id="image_format"), + id="image_format", + ), Select.from_values( ["zh_CN", "en_US"], value=self.data["language"], allow_blank=False, - id="language", ), - classes="horizontal-layout"), + id="language", + ), + classes="horizontal-layout", + ), Container( - Button(_("保存配置"), id="save", ), - Button(_("放弃更改"), id="abandon", ), - classes="settings_button", ), + Button( + _("保存配置"), + id="save", + ), + Button( + _("放弃更改"), + id="abandon", + ), + classes="settings_button", + ), ) yield Footer() @@ -98,25 +210,27 @@ class Setting(Screen): @on(Button.Pressed, "#save") def save_settings(self): - self.dismiss({ - "work_path": self.query_one("#work_path").value, - "folder_name": self.query_one("#folder_name").value, - "name_format": self.query_one("#name_format").value, - "user_agent": self.query_one("#user_agent").value, - "cookie": self.query_one("#cookie").value or self.data["cookie"], - "proxy": self.query_one("#proxy").value or None, - "timeout": int(self.query_one("#timeout").value), - "chunk": int(self.query_one("#chunk").value), - "max_retry": int(self.query_one("#max_retry").value), - "record_data": self.query_one("#record_data").value, - "image_format": self.query_one("#image_format").value, - "folder_mode": self.query_one("#folder_mode").value, - "language": self.query_one("#language").value, - "image_download": self.query_one("#image_download").value, - "video_download": self.query_one("#video_download").value, - "live_download": self.query_one("#live_download").value, - "download_record": self.query_one("#download_record").value, - }) + self.dismiss( + { + "work_path": self.query_one("#work_path").value, + "folder_name": self.query_one("#folder_name").value, + "name_format": self.query_one("#name_format").value, + "user_agent": self.query_one("#user_agent").value, + "cookie": self.query_one("#cookie").value or self.data["cookie"], + "proxy": self.query_one("#proxy").value or None, + "timeout": int(self.query_one("#timeout").value), + "chunk": int(self.query_one("#chunk").value), + "max_retry": int(self.query_one("#max_retry").value), + "record_data": self.query_one("#record_data").value, + "image_format": self.query_one("#image_format").value, + "folder_mode": self.query_one("#folder_mode").value, + "language": self.query_one("#language").value, + "image_download": self.query_one("#image_download").value, + "video_download": self.query_one("#video_download").value, + "live_download": self.query_one("#live_download").value, + "download_record": self.query_one("#download_record").value, + } + ) @on(Button.Pressed, "#abandon") def reset(self): diff --git a/source/TUI/update.py b/source/TUI/update.py index b12cc5e..dab1fa9 100644 --- a/source/TUI/update.py +++ b/source/TUI/update.py @@ -15,7 +15,10 @@ __all__ = ["Update"] class Update(ModalScreen): - def __init__(self, app: XHS, ): + def __init__( + self, + app: XHS, + ): super().__init__() self.xhs = app @@ -29,9 +32,16 @@ class Update(ModalScreen): @work(exclusive=True) async def check_update(self) -> None: try: - url = await self.xhs.html.request_url(RELEASES, False, None, timeout=5, ) + url = await self.xhs.html.request_url( + RELEASES, + False, + None, + timeout=5, + ) version = url.split("/")[-1] - match self.compare_versions(f"{XHS.VERSION_MAJOR}.{XHS.VERSION_MINOR}", version, XHS.VERSION_BETA): + match self.compare_versions( + f"{XHS.VERSION_MAJOR}.{XHS.VERSION_MINOR}", version, XHS.VERSION_BETA + ): case 4: args = ( _("检测到新版本:{0}.{1}").format( @@ -58,7 +68,10 @@ class Update(ModalScreen): case _: raise ValueError except ValueError: - args = (_("检测新版本失败"), "error",) + args = ( + _("检测新版本失败"), + "error", + ) self.dismiss(args) def on_mount(self) -> None: @@ -66,11 +79,10 @@ class Update(ModalScreen): @staticmethod def compare_versions( - current_version: str, - target_version: str, - is_development: bool) -> int: - current_major, current_minor = map(int, current_version.split('.')) - target_major, target_minor = map(int, target_version.split('.')) + current_version: str, target_version: str, is_development: bool + ) -> int: + current_major, current_minor = map(int, current_version.split(".")) + target_major, target_minor = map(int, target_version.split(".")) if target_major > current_major: return 4 diff --git a/source/__init__.py b/source/__init__.py index 3818bec..35b8dc6 100644 --- a/source/__init__.py +++ b/source/__init__.py @@ -3,4 +3,9 @@ from .TUI import XHSDownloader from .application import XHS from .module import Settings -__all__ = ['XHS', 'XHSDownloader', 'cli', 'Settings', ] +__all__ = [ + "XHS", + "XHSDownloader", + "cli", + "Settings", +] diff --git a/source/application/app.py b/source/application/app.py index 4ece032..9f26de5 100644 --- a/source/application/app.py +++ b/source/application/app.py @@ -48,11 +48,17 @@ __all__ = ["XHS"] def _data_cache(function): - async def inner(self, data: dict, ): + async def inner( + self, + data: dict, + ): if self.manager.record_data: download = data["下载地址"] lives = data["动图地址"] - await function(self, data, ) + await function( + self, + data, + ) data["下载地址"] = download data["动图地址"] = lives @@ -137,11 +143,14 @@ class XHS: def __extract_image(self, container: dict, data: Namespace): container["下载地址"], container["动图地址"] = self.image.get_image_link( - data, self.manager.image_format) + data, self.manager.image_format + ) def __extract_video(self, container: dict, data: Namespace): container["下载地址"] = self.video.get_video_link(data) - container["动图地址"] = [None, ] + container["动图地址"] = [ + None, + ] async def __download_files( self, @@ -154,8 +163,7 @@ class XHS: name = self.__naming_rules(container) if (u := container["下载地址"]) and download: if await self.skip_download(i := container["作品ID"]): - logging( - log, _("作品 {0} 存在下载记录,跳过下载").format(i)) + logging(log, _("作品 {0} 存在下载记录,跳过下载").format(i)) else: path, result = await self.download.run( u, @@ -172,7 +180,10 @@ class XHS: await self.save_data(container) @_data_cache - async def save_data(self, data: dict, ): + async def save_data( + self, + data: dict, + ): data["采集时间"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") data["下载地址"] = " ".join(data["下载地址"]) data["动图地址"] = " ".join(i or "NaN" for i in data["动图地址"]) @@ -196,10 +207,19 @@ class XHS: if not urls: logging(log, _("提取小红书作品链接失败"), WARNING) else: - logging( - log, _("共 {0} 个小红书作品待处理...").format(len(urls))) + logging(log, _("共 {0} 个小红书作品待处理...").format(len(urls))) # return urls # 调试代码 - return [await self.__deal_extract(i, download, index, log, bar, data, ) for i in urls] + return [ + await self.__deal_extract( + i, + download, + index, + log, + bar, + data, + ) + for i in urls + ] async def extract_cli( self, @@ -214,7 +234,14 @@ class XHS: if not url: logging(log, _("提取小红书作品链接失败"), WARNING) else: - await self.__deal_extract(url[0], download, index, log, bar, data, ) + await self.__deal_extract( + url[0], + download, + index, + log, + bar, + data, + ) async def extract_links(self, url: str, log) -> list: urls = [] @@ -253,7 +280,11 @@ class XHS: logging(log, msg) return {"message": msg} logging(log, _("开始处理作品:{0}").format(i)) - html = await self.html.request_url(url, log=log, cookie=cookie, ) + html = await self.html.request_url( + url, + log=log, + cookie=cookie, + ) namespace = self.__generate_data_object(html) if not namespace: logging(log, _("{0} 获取数据失败").format(i), ERROR) @@ -299,10 +330,12 @@ class XHS: return beautify_string( self.CLEANER.filter_name( self.manager.SEPARATE.join(values), - default=self.manager.SEPARATE.join(( - data["作者ID"], - data["作品ID"], - )), + default=self.manager.SEPARATE.join( + ( + data["作者ID"], + data["作品ID"], + ) + ), ), length=128, ) @@ -315,10 +348,13 @@ class XHS: return self.manager.filter_name(data["作者昵称"]) or data["作者ID"] def __get_name_title(self, data: dict) -> str: - return beautify_string( - self.manager.filter_name(data["作品标题"]), - 64, - ) or data["作品ID"] + return ( + beautify_string( + self.manager.filter_name(data["作品标题"]), + 64, + ) + or data["作品ID"] + ) async def monitor( self, @@ -331,11 +367,15 @@ class XHS: logging( None, _( - "程序会自动读取并提取剪贴板中的小红书作品链接,并自动下载链接对应的作品文件,如需关闭,请点击关闭按钮,或者向剪贴板写入 “close” 文本!"), + "程序会自动读取并提取剪贴板中的小红书作品链接,并自动下载链接对应的作品文件,如需关闭,请点击关闭按钮,或者向剪贴板写入 “close” 文本!" + ), style=MASTER, ) self.event.clear() - await gather(self.__push_link(delay), self.__receive_link(delay, download, None, log, bar, data)) + await gather( + self.__push_link(delay), + self.__receive_link(delay, download, None, log, bar, data), + ) async def __push_link(self, delay: int): while not self.event.is_set(): @@ -373,10 +413,16 @@ class XHS: @staticmethod def read_browser_cookie(value: str | int) -> str: - return BrowserCookie.get( - value, - domains=["xiaohongshu.com", ], - ) if value else "" + return ( + BrowserCookie.get( + value, + domains=[ + "xiaohongshu.com", + ], + ) + if value + else "" + ) # @staticmethod # async def index(request): @@ -425,11 +471,17 @@ class XHS: # await self.runner.cleanup() # logging(log, _("Web API 服务器已关闭!")) - async def run_server(self, host="0.0.0.0", port=8000, log_level="info", ): + async def run_server( + self, + host="0.0.0.0", + port=8000, + log_level="info", + ): self.server = FastAPI( debug=self.VERSION_BETA, title="XHS-Downloader", - version=f"{self.VERSION_MAJOR}.{self.VERSION_MINOR}") + version=f"{self.VERSION_MAJOR}.{self.VERSION_MINOR}", + ) self.setup_routes() config = Config( self.server, @@ -445,7 +497,10 @@ class XHS: async def index(): return RedirectResponse(url=REPOSITORY) - @self.server.post("/xhs/", response_model=ExtractData, ) + @self.server.post( + "/xhs/", + response_model=ExtractData, + ) async def handle(extract: ExtractParams): url = await self.extract_links(extract.url, None) if not url: @@ -466,6 +521,5 @@ class XHS: msg = _("获取小红书作品数据失败") data = None return ExtractData( - message=msg, - url=url[0] if url else extract.url, - data=data) + message=msg, url=url[0] if url else extract.url, data=data + ) diff --git a/source/application/download.py b/source/application/download.py index 8e608fb..0806d9a 100644 --- a/source/application/download.py +++ b/source/application/download.py @@ -23,7 +23,7 @@ from ..translation import _ if TYPE_CHECKING: from httpx import AsyncClient -__all__ = ['Download'] +__all__ = ["Download"] class Download: @@ -38,7 +38,10 @@ class Download: "audio/mpeg": "mp3", } - def __init__(self, manager: Manager, ): + def __init__( + self, + manager: Manager, + ): self.manager = manager self.folder = manager.folder self.temp = manager.temp @@ -98,7 +101,8 @@ class Download: format_, log, bar, - ) for url, name, format_ in tasks + ) + for url, name, format_ in tasks ] tasks = await gather(*tasks) return path, tasks @@ -109,11 +113,8 @@ class Download: return path def __ready_download_video( - self, - urls: list[str], - path: Path, - name: str, - log) -> list: + self, urls: list[str], path: Path, name: str, log + ) -> list: if not self.video_download: logging(log, _("视频作品下载功能已关闭,跳过下载")) return [] @@ -128,7 +129,8 @@ class Download: index: list | tuple | None, path: Path, name: str, - log) -> list: + log, + ) -> list: tasks = [] if not self.image_download: logging(log, _("图文作品下载功能已关闭,跳过下载")) @@ -146,28 +148,38 @@ class Download: for s in self.image_format_list ): tasks.append([j[0], file, self.image_format]) - if not self.live_download or not j[1] or self.__check_exists_path( + if ( + not self.live_download + or not j[1] + or self.__check_exists_path( path, f"{file}.{self.live_format}", log, + ) ): continue tasks.append([j[1], file, self.live_format]) return tasks - def __check_exists_glob(self, path: Path, name: str, log, ) -> bool: + def __check_exists_glob( + self, + path: Path, + name: str, + log, + ) -> bool: if any(path.glob(name)): - logging( - log, _( - "{0} 文件已存在,跳过下载").format(name)) + logging(log, _("{0} 文件已存在,跳过下载").format(name)) return True return False - def __check_exists_path(self, path: Path, name: str, log, ) -> bool: + def __check_exists_path( + self, + path: Path, + name: str, + log, + ) -> bool: if path.joinpath(name).exists(): - logging( - log, _( - "{0} 文件已存在,跳过下载").format(name)) + logging(log, _("{0} 文件已存在,跳过下载").format(name)) return True return False @@ -199,9 +211,16 @@ class Download: # return False # temp = self.temp.joinpath(f"{name}.{suffix}") temp = self.temp.joinpath(f"{name}.{format_}") - self.__update_headers_range(headers, temp, ) + self.__update_headers_range( + headers, + temp, + ) try: - async with self.client.stream("GET", url, headers=headers, ) as response: + async with self.client.stream( + "GET", + url, + headers=headers, + ) as response: await sleep_time() if response.status_code == 416: raise CacheError( @@ -234,8 +253,9 @@ class Download: # self.__create_progress(bar, None) logging( log, - _( - "网络异常,{0} 下载失败,错误信息: {1}").format(name, repr(error)), + _("网络异常,{0} 下载失败,错误信息: {1}").format( + name, repr(error) + ), ERROR, ) return False @@ -248,7 +268,11 @@ class Download: ) @staticmethod - def __create_progress(bar, total: int | None, completed=0, ): + def __create_progress( + bar, + total: int | None, + completed=0, + ): if bar: bar.update(total=total, completed=completed) @@ -273,10 +297,8 @@ class Download: ) await sleep_time() response.raise_for_status() - suffix = self.__extract_type( - response.headers.get("Content-Type")) or suffix - length = response.headers.get( - "Content-Length", 0) + suffix = self.__extract_type(response.headers.get("Content-Type")) or suffix + length = response.headers.get("Content-Length", 0) return int(length), suffix @staticmethod @@ -303,12 +325,14 @@ class Download: async with open(temp, "rb") as f: file_start = await f.read(FILE_SIGNATURES_LENGTH) for offset, signature, suffix in FILE_SIGNATURES: - if file_start[offset:offset + len(signature)] == signature: + if file_start[offset: offset + len(signature)] == signature: return path.joinpath(f"{name}.{suffix}") except Exception as error: logging( log, - _("文件 {0} 格式判断失败,错误信息:{1}").format(temp.name, repr(error)), + _("文件 {0} 格式判断失败,错误信息:{1}").format( + temp.name, repr(error) + ), ERROR, ) return path.joinpath(f"{name}.{default_suffix}") diff --git a/source/application/explore.py b/source/application/explore.py index 4b989a1..68da201 100644 --- a/source/application/explore.py +++ b/source/application/explore.py @@ -3,7 +3,7 @@ from datetime import datetime from ..expansion import Namespace from ..translation import _ -__all__ = ['Explore'] +__all__ = ["Explore"] class Explore: @@ -27,10 +27,8 @@ class Explore: @staticmethod def __extract_interact_info(container: dict, data: Namespace) -> None: - container["收藏数量"] = data.safe_extract( - "interactInfo.collectedCount", "-1") - container["评论数量"] = data.safe_extract( - "interactInfo.commentCount", "-1") + container["收藏数量"] = data.safe_extract("interactInfo.collectedCount", "-1") + container["评论数量"] = data.safe_extract("interactInfo.commentCount", "-1") container["分享数量"] = data.safe_extract("interactInfo.shareCount", "-1") container["点赞数量"] = data.safe_extract("interactInfo.likedCount", "-1") @@ -38,33 +36,37 @@ class Explore: def __extract_tags(container: dict, data: Namespace): tags = data.safe_extract("tagList", []) container["作品标签"] = " ".join( - Namespace.object_extract( - i, "name") for i in tags) + Namespace.object_extract(i, "name") for i in tags + ) def __extract_info(self, container: dict, data: Namespace): container["作品ID"] = data.safe_extract("noteId") - container["作品链接"] = f"https://www.xiaohongshu.com/explore/{container["作品ID"]}" + container["作品链接"] = ( + f"https://www.xiaohongshu.com/explore/{container['作品ID']}" + ) container["作品标题"] = data.safe_extract("title") container["作品描述"] = data.safe_extract("desc") container["作品类型"] = self.explore_type.get( - data.safe_extract("type"), _("未知")) + data.safe_extract("type"), _("未知") + ) # container["IP归属地"] = data.safe_extract("ipLocation") def __extract_time(self, container: dict, data: Namespace): - container["发布时间"] = datetime.fromtimestamp( - time / - 1000).strftime( - self.time_format) if ( - time := data.safe_extract("time")) else _("未知") - container["最后更新时间"] = datetime.fromtimestamp( - last / - 1000).strftime( - self.time_format) if ( - last := data.safe_extract("lastUpdateTime")) else _("未知") + container["发布时间"] = ( + datetime.fromtimestamp(time / 1000).strftime(self.time_format) + if (time := data.safe_extract("time")) + else _("未知") + ) + container["最后更新时间"] = ( + datetime.fromtimestamp(last / 1000).strftime(self.time_format) + if (last := data.safe_extract("lastUpdateTime")) + else _("未知") + ) @staticmethod def __extract_user(container: dict, data: Namespace): container["作者昵称"] = data.safe_extract("user.nickname") container["作者ID"] = data.safe_extract("user.userId") - container["作者链接"] = f"https://www.xiaohongshu.com/user/profile/{ - container["作者ID"]}" + container["作者链接"] = ( + f"https://www.xiaohongshu.com/user/profile/{container['作者ID']}" + ) diff --git a/source/application/image.py b/source/application/image.py index cc9045d..2455f65 100644 --- a/source/application/image.py +++ b/source/application/image.py @@ -1,7 +1,7 @@ from source.expansion import Namespace from .request import Html -__all__ = ['Image'] +__all__ = ["Image"] class Image: @@ -10,16 +10,18 @@ class Image: images = data.safe_extract("imageList", []) live_link = cls.__get_live_link(images) token_list = [ - cls.__extract_image_token( - Namespace.object_extract( - i, "urlDefault")) for i in images] + cls.__extract_image_token(Namespace.object_extract(i, "urlDefault")) + for i in images + ] match format_: case "png": - return [Html.format_url(cls.__generate_png_link(i)) - for i in token_list], live_link + return [ + Html.format_url(cls.__generate_png_link(i)) for i in token_list + ], live_link case "webp": - return [Html.format_url(cls.__generate_webp_link(i)) - for i in token_list], live_link + return [ + Html.format_url(cls.__generate_webp_link(i)) for i in token_list + ], live_link case _: raise ValueError diff --git a/source/application/request.py b/source/application/request.py index cc806a9..62f9213 100644 --- a/source/application/request.py +++ b/source/application/request.py @@ -11,7 +11,10 @@ __all__ = ["Html"] class Html: - def __init__(self, manager: Manager, ): + def __init__( + self, + manager: Manager, + ): self.retry = manager.retry self.client = manager.request_client self.headers = manager.headers @@ -26,23 +29,32 @@ class Html: cookie: str = None, **kwargs, ) -> str: - headers = self.select_headers(url, cookie, ) + headers = self.select_headers( + url, + cookie, + ) try: match content: case True: - response = await self.__request_url_get(url, headers, **kwargs, ) + response = await self.__request_url_get( + url, + headers, + **kwargs, + ) await sleep_time() response.raise_for_status() return response.text case False: - response = await self.__request_url_head(url, headers, **kwargs, ) + response = await self.__request_url_head( + url, + headers, + **kwargs, + ) await sleep_time() return str(response.url) except HTTPError as error: logging( - log, - _("网络异常,{0} 请求失败: {1}").format(url, repr(error)), - ERROR + log, _("网络异常,{0} 请求失败: {1}").format(url, repr(error)), ERROR ) return "" @@ -50,19 +62,33 @@ class Html: def format_url(url: str) -> str: return bytes(url, "utf-8").decode("unicode_escape") - def select_headers(self, url: str, cookie: str = None, ) -> dict: + def select_headers( + self, + url: str, + cookie: str = None, + ) -> dict: if "explore" not in url: return self.blank_headers return self.headers | {"Cookie": cookie} if cookie else self.headers - async def __request_url_head(self, url: str, headers: dict, **kwargs, ): + async def __request_url_head( + self, + url: str, + headers: dict, + **kwargs, + ): return await self.client.head( url, headers=headers, **kwargs, ) - async def __request_url_get(self, url: str, headers: dict, **kwargs, ): + async def __request_url_get( + self, + url: str, + headers: dict, + **kwargs, + ): return await self.client.get( url, headers=headers, diff --git a/source/application/video.py b/source/application/video.py index e8367ce..678b819 100644 --- a/source/application/video.py +++ b/source/application/video.py @@ -1,7 +1,7 @@ from source.expansion import Namespace from .request import Html -__all__ = ['Video'] +__all__ = ["Video"] class Video: @@ -13,5 +13,8 @@ class Video: @classmethod def get_video_link(cls, data: Namespace) -> list: - return [Html.format_url(f"https://sns-video-bd.xhscdn.com/{t}")] if ( - t := data.safe_extract(".".join(cls.VIDEO_LINK))) else [] + return ( + [Html.format_url(f"https://sns-video-bd.xhscdn.com/{t}")] + if (t := data.safe_extract(".".join(cls.VIDEO_LINK))) + else [] + ) diff --git a/source/expansion/browser.py b/source/expansion/browser.py index 28a5752..33e08e3 100644 --- a/source/expansion/browser.py +++ b/source/expansion/browser.py @@ -38,25 +38,44 @@ class BrowserCookie: } @classmethod - def run(cls, domains: list[str], console: Console = None, ) -> str: + def run( + cls, + domains: list[str], + console: Console = None, + ) -> str: console = console or Console() - options = "\n".join(f"{i}. {k}: {v[1]}" for i, (k, v) in enumerate(cls.SUPPORT_BROWSER.items(), start=1)) + options = "\n".join( + f"{i}. {k}: {v[1]}" + for i, (k, v) in enumerate(cls.SUPPORT_BROWSER.items(), start=1) + ) if browser := console.input( - _("读取指定浏览器的 Cookie 并写入配置文件\n" - "Windows 系统需要以管理员身份运行程序才能读取 Chromium、Chrome、Edge 浏览器 Cookie!\n" - "{options}\n请输入浏览器名称或序号:").format(options=options), ): - return cls.get(browser, domains, console, ) + _( + "读取指定浏览器的 Cookie 并写入配置文件\n" + "Windows 系统需要以管理员身份运行程序才能读取 Chromium、Chrome、Edge 浏览器 Cookie!\n" + "{options}\n请输入浏览器名称或序号:" + ).format(options=options), + ): + return cls.get( + browser, + domains, + console, + ) console.print(_("未选择浏览器!")) @classmethod - def get(cls, browser: str | int, domains: list[str], console: Console = None, ) -> str: + def get( + cls, + browser: str | int, + domains: list[str], + console: Console = None, + ) -> str: console = console or Console() if not (browser := cls.__browser_object(browser)): console.print(_("浏览器名称或序号输入错误!")) return "" try: cookies = browser(domains=domains) - return "; ".join(f"{i["name"]}={i["value"]}" for i in cookies) + return "; ".join(f"{i['name']}={i['value']}" for i in cookies) except RuntimeError: console.print(_("获取 Cookie 失败,未找到 Cookie 数据!")) return "" diff --git a/source/expansion/cleaner.py b/source/expansion/cleaner.py index 68bb661..154cc09 100644 --- a/source/expansion/cleaner.py +++ b/source/expansion/cleaner.py @@ -30,7 +30,7 @@ class Cleaner: "|": "", "<": "", ">": "", - "\"": "", + '"': "", "?": "", ":": "", "*": "", @@ -80,7 +80,10 @@ class Cleaner: text = self.filter(text) - text = replace_emoji(text, replace, ) + text = replace_emoji( + text, + replace, + ) text = self.clear_spaces(text) @@ -94,9 +97,16 @@ class Cleaner: return " ".join(string.split()) @classmethod - def remove_control_characters(cls, text, replace="", ): + def remove_control_characters( + cls, + text, + replace="", + ): # 使用正则表达式匹配所有控制字符 - return cls.CONTROL_CHARACTERS.sub(replace, text, ) + return cls.CONTROL_CHARACTERS.sub( + replace, + text, + ) if __name__ == "__main__": diff --git a/source/expansion/converter.py b/source/expansion/converter.py index 578b141..d1e01bd 100644 --- a/source/expansion/converter.py +++ b/source/expansion/converter.py @@ -16,9 +16,7 @@ class Converter: ) def run(self, content: str) -> dict: - return self._filter_object( - self._convert_object( - self._extract_object(content))) + return self._filter_object(self._convert_object(self._extract_object(content))) def _extract_object(self, html: str) -> str: if not html: diff --git a/source/expansion/file_folder.py b/source/expansion/file_folder.py index 237cafd..acb3adc 100644 --- a/source/expansion/file_folder.py +++ b/source/expansion/file_folder.py @@ -15,7 +15,9 @@ def remove_empty_directories(path: Path) -> None: "\\_", "\\__", } - for dir_path, dir_names, file_names in path.walk(top_down=False, ): + for dir_path, dir_names, file_names in path.walk( + top_down=False, + ): if any(i in str(dir_path) for i in exclude): continue if not dir_names and not file_names: diff --git a/source/expansion/namespace.py b/source/expansion/namespace.py index ddf55de..08017c6 100644 --- a/source/expansion/namespace.py +++ b/source/expansion/namespace.py @@ -14,7 +14,8 @@ class Namespace: def depth_conversion(element): if isinstance(element, dict): return SimpleNamespace( - **{k: depth_conversion(v) for k, v in element.items()}) + **{k: depth_conversion(v) for k, v in element.items()} + ) elif isinstance(element, list): return [depth_conversion(item) for item in element] else: @@ -25,14 +26,16 @@ class Namespace: def safe_extract( self, attribute_chain: str, - default: Union[str, int, list, dict, SimpleNamespace] = ""): + default: Union[str, int, list, dict, SimpleNamespace] = "", + ): return self.__safe_extract(self.data, attribute_chain, default) @staticmethod def __safe_extract( data_object: SimpleNamespace, attribute_chain: str, - default: Union[str, int, list, dict, SimpleNamespace] = "", ): + default: Union[str, int, list, dict, SimpleNamespace] = "", + ): data = deepcopy(data_object) attributes = attribute_chain.split(".") for attribute in attributes: @@ -61,7 +64,8 @@ class Namespace: return cls.__safe_extract( data_object, attribute_chain, - default, ) + default, + ) @property def __dict__(self): @@ -70,10 +74,11 @@ class Namespace: @classmethod def convert_to_dict(cls, data) -> dict: return { - key: cls.convert_to_dict(value) if isinstance( - value, - SimpleNamespace) else value for key, - value in vars(data).items()} + key: cls.convert_to_dict(value) + if isinstance(value, SimpleNamespace) + else value + for key, value in vars(data).items() + } def __bool__(self): return bool(vars(self.data)) diff --git a/source/expansion/truncate.py b/source/expansion/truncate.py index 96efced..ea99010 100644 --- a/source/expansion/truncate.py +++ b/source/expansion/truncate.py @@ -2,7 +2,7 @@ from unicodedata import name def is_chinese_char(char: str) -> bool: - return 'CJK' in name(char, "") + return "CJK" in name(char, "") def truncate_string(s: str, length: int = 64) -> str: diff --git a/source/module/manager.py b/source/module/manager.py index 98cb779..cdd1541 100644 --- a/source/module/manager.py +++ b/source/module/manager.py @@ -72,10 +72,10 @@ class Manager: self.path = self.__check_path(path) self.folder = self.__check_folder(folder) self.blank_headers = HEADERS | { - 'user-agent': user_agent or USERAGENT, + "user-agent": user_agent or USERAGENT, } self.headers = self.blank_headers | { - 'cookie': cookie, + "cookie": cookie, } self.retry = retry self.chunk = chunk @@ -86,10 +86,13 @@ class Manager: self.download_record = self.check_bool(download_record, True) self.proxy_tip = None self.proxy = self.__check_proxy(proxy) - self.print_proxy_tip(_print, ) + self.print_proxy_tip( + _print, + ) self.request_client = AsyncClient( - headers=self.headers | { - 'referer': 'https://www.xiaohongshu.com/', + headers=self.headers + | { + "referer": "https://www.xiaohongshu.com/", }, timeout=timeout, verify=False, @@ -177,11 +180,7 @@ class Manager: def __check_name_format(self, format_: str) -> str: keys = format_.split() return next( - ( - "发布时间 作者昵称 作品标题" - for key in keys - if key not in self.NAME_KEYS - ), + ("发布时间 作者昵称 作品标题" for key in keys if key not in self.NAME_KEYS), format_, ) @@ -198,7 +197,7 @@ class Manager: timeout=10, headers={ "User-Agent": USERAGENT, - } + }, ) response.raise_for_status() self.proxy_tip = (_("代理 {0} 测试成功").format(proxy),) @@ -220,7 +219,11 @@ class Manager: WARNING, ) - def print_proxy_tip(self, _print: bool = True, log=None, ) -> None: + def print_proxy_tip( + self, + _print: bool = True, + log=None, + ) -> None: if _print and self.proxy_tip: logging(log, *self.proxy_tip) @@ -240,6 +243,6 @@ class Manager: # 使用空字符串替换匹配到的部分 cookie_string = sub(pattern, "", cookie_string) # 去除多余的分号和空格 - cookie_string = sub(r';\s*$', "", cookie_string) # 删除末尾的分号和空格 - cookie_string = sub(r';\s*;', ";", cookie_string) # 删除中间多余分号后的空格 - return cookie_string.strip('; ') + cookie_string = sub(r";\s*$", "", cookie_string) # 删除末尾的分号和空格 + cookie_string = sub(r";\s*;", ";", cookie_string) # 删除中间多余分号后的空格 + return cookie_string.strip("; ") diff --git a/source/module/recorder.py b/source/module/recorder.py index f18fc66..3096e21 100644 --- a/source/module/recorder.py +++ b/source/module/recorder.py @@ -5,7 +5,10 @@ from aiosqlite import connect from ..module import Manager -__all__ = ["IDRecorder", "DataRecorder", ] +__all__ = [ + "IDRecorder", + "DataRecorder", +] class IDRecorder: @@ -18,7 +21,9 @@ class IDRecorder: async def _connect_database(self): self.database = await connect(self.file) self.cursor = await self.database.cursor() - await self.database.execute("CREATE TABLE IF NOT EXISTS explore_id (ID TEXT PRIMARY KEY);") + await self.database.execute( + "CREATE TABLE IF NOT EXISTS explore_id (ID TEXT PRIMARY KEY);" + ) await self.database.commit() async def select(self, id_: str): @@ -95,11 +100,14 @@ class DataRecorder(IDRecorder): async def add(self, **kwargs) -> None: if self.switch: - await self.database.execute(f"""REPLACE INTO explore_data ( + await self.database.execute( + f"""REPLACE INTO explore_data ( {", ".join(i[0] for i in self.DATA_TABLE)} ) VALUES ( {", ".join("?" for _ in kwargs)} - );""", self.__generate_values(kwargs)) + );""", + self.__generate_values(kwargs), + ) await self.database.commit() async def __delete(self, id_: str) -> None: diff --git a/source/module/settings.py b/source/module/settings.py index 0258ed9..f6e2e5e 100644 --- a/source/module/settings.py +++ b/source/module/settings.py @@ -6,7 +6,7 @@ from platform import system from .static import ROOT from .static import USERAGENT -__all__ = ['Settings'] +__all__ = ["Settings"] class Settings: diff --git a/source/module/static.py b/source/module/static.py index c69ad6e..64b34c2 100644 --- a/source/module/static.py +++ b/source/module/static.py @@ -3,10 +3,11 @@ from pathlib import Path VERSION_MAJOR = 2 VERSION_MINOR = 5 VERSION_BETA = True -__version__ = f"{VERSION_MAJOR}.{VERSION_MINOR}.{"beta" if VERSION_BETA else "stable"}" +__version__ = f"{VERSION_MAJOR}.{VERSION_MINOR}.{'beta' if VERSION_BETA else 'stable'}" ROOT = Path(__file__).resolve().parent.parent.parent -PROJECT = f"XHS-Downloader V{VERSION_MAJOR}.{ -VERSION_MINOR} {"Beta" if VERSION_BETA else "Stable"}" +PROJECT = f"XHS-Downloader V{VERSION_MAJOR}.{VERSION_MINOR} { +'Beta' if VERSION_BETA else 'Stable' +}" REPOSITORY = "https://github.com/JoeanAmier/XHS-Downloader" LICENCE = "GNU General Public License v3.0" @@ -14,8 +15,10 @@ RELEASES = "https://github.com/JoeanAmier/XHS-Downloader/releases/latest" USERSCRIPT = "https://raw.githubusercontent.com/JoeanAmier/XHS-Downloader/master/static/XHS-Downloader.js" -USERAGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 " - "Safari/537.36") +USERAGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 " + "Safari/537.36" +) HEADERS = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8," @@ -32,26 +35,35 @@ ERROR = "b bright_red" WARNING = "b bright_yellow" INFO = "b bright_green" -FILE_SIGNATURES: tuple[tuple[int, bytes, str,], ...] = ( +FILE_SIGNATURES: tuple[ + tuple[ + int, + bytes, + str, + ], + ..., +] = ( # 分别为偏移量(字节)、十六进制签名、后缀 # 参考:https://en.wikipedia.org/wiki/List_of_file_signatures # 参考:https://www.garykessler.net/library/file_sigs.html - (0, b"\xFF\xD8\xFF", "jpeg"), - (0, b"\x89\x50\x4E\x47\x0D\x0A\x1A\x0A", "png"), + (0, b"\xff\xd8\xff", "jpeg"), + (0, b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a", "png"), (4, b"\x66\x74\x79\x70\x61\x76\x69\x66", "avif"), (4, b"\x66\x74\x79\x70\x68\x65\x69\x63", "heic"), (8, b"\x57\x45\x42\x50", "webp"), - (4, b"\x66\x74\x79\x70\x4D\x53\x4E\x56", "mp4"), - (4, b"\x66\x74\x79\x70\x69\x73\x6F\x6D", "mp4"), - (4, b"\x66\x74\x79\x70\x6D\x70\x34\x32", "m4v"), + (4, b"\x66\x74\x79\x70\x4d\x53\x4e\x56", "mp4"), + (4, b"\x66\x74\x79\x70\x69\x73\x6f\x6d", "mp4"), + (4, b"\x66\x74\x79\x70\x6d\x70\x34\x32", "m4v"), (4, b"\x66\x74\x79\x70\x71\x74\x20\x20", "mov"), - (0, b"\x1A\x45\xDF\xA3", "mkv"), - (0, b"\x00\x00\x01\xB3", "mpg"), - (0, b"\x00\x00\x01\xBA", "mpg"), + (0, b"\x1a\x45\xdf\xa3", "mkv"), + (0, b"\x00\x00\x01\xb3", "mpg"), + (0, b"\x00\x00\x01\xba", "mpg"), (0, b"\x46\x4c\x56\x01", "flv"), (8, b"\x41\x56\x49\x20", "avi"), ) -FILE_SIGNATURES_LENGTH = max(offset + len(signature) for offset, signature, _ in FILE_SIGNATURES) +FILE_SIGNATURES_LENGTH = max( + offset + len(signature) for offset, signature, _ in FILE_SIGNATURES +) MAX_WORKERS: int = 4 diff --git a/source/module/tools.py b/source/module/tools.py index 49ae686..8a830fe 100644 --- a/source/module/tools.py +++ b/source/module/tools.py @@ -22,7 +22,10 @@ def retry(function): def logging(log, text, style=INFO): string = Text(text, style=style) if log: - log.write(string, scroll_end=True, ) + log.write( + string, + scroll_end=True, + ) else: print(string) diff --git a/source/translation/translate.py b/source/translation/translate.py index 691c808..0e649e1 100644 --- a/source/translation/translate.py +++ b/source/translation/translate.py @@ -18,7 +18,7 @@ class TranslationManager: def __init__(self, domain="xhs", localedir=None): self.domain = domain if not localedir: - localedir = ROOT.joinpath('locale') + localedir = ROOT.joinpath("locale") self.localedir = Path(localedir) self.current_translator = self.setup_translation( self.get_language_code(), @@ -41,7 +41,8 @@ class TranslationManager: ) except FileNotFoundError as e: print( - f"Warning: Translation files for '{self.domain}' not found. Error: {e}") + f"Warning: Translation files for '{self.domain}' not found. Error: {e}" + ) return translation(self.domain, fallback=True) def switch_language(self, language: str = "en_US"):