更新项目代码

This commit is contained in:
JoeamAmier
2023-12-07 23:09:39 +08:00
parent 413168f122
commit a1d296191b
13 changed files with 185 additions and 82 deletions

View File

@@ -19,7 +19,9 @@
</ul>
<h1>📸 程序截图</h1>
<br>
<img src="static/程序运行截图.png" alt="">
<img src="static/程序运行截图1.png" alt="">
<hr>
<img src="static/程序运行截图2.png" alt="">
<h1>🔗 支持链接</h1>
<ul>
<li><code>https://www.xiaohongshu.com/explore/作品ID</code></li>
@@ -50,26 +52,28 @@ image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
multiple_demo = f"{image_demo} {video_demo}"
# 实例对象
path = "" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称自动创建默认值Download
path = "" # 作品数据/文件保存根路径,默认值:项目根路径
folder_name = "Download" # 作品文件储存文件夹名称(自动创建),默认值:Download
user_agent = "" # 请求头 User-Agent
proxy = None # 网络代理
timeout = 5 # 网络请求超时限制默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
proxy = "" # 网络代理
timeout = 5 # 网络请求超时限制,单位:秒,默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
max_retry = 2 # 请求数据失败时,重试的最大次数,单位:次,默认值:5
# async with XHS() as xhs:
# pass # 使用默认参数
async with XHS(path=path,
folder=folder,
folder_name=folder_name,
user_agent=user_agent,
proxy=proxy,
timeout=timeout,
chunk=chunk) as xhs: # 使用自定义参数
chunk=chunk,
max_retry=max_retry, ) as xhs: # 使用自定义参数
download = True # 是否下载作品文件默认值False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download=download)) # 获取数据失败时返回空字典
print(await xhs.extract(image_demo, download=download))
print(await xhs.extract(video_demo, download=download))
print(await xhs.extract(multiple_demo, download=download)) # 支持传入多个作品链接
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典
print(await xhs.extract(image_demo, download))
print(await xhs.extract(video_demo, download))
print(await xhs.extract(multiple_demo, download)) # 支持传入多个作品链接
</pre>
<h1>⚙️ 配置文件</h1>
<p>项目根目录下的 <code>settings.json</code> 文件,首次运行自动生成,可以自定义部分运行参数。</p>
@@ -90,7 +94,7 @@ async with XHS(path=path,
<td align="center">项目根路径</td>
</tr>
<tr>
<td align="center">folder</td>
<td align="center">folder_name</td>
<td align="center">str</td>
<td align="center">作品文件储存文件夹名称</td>
<td align="center">Download</td>
@@ -99,7 +103,7 @@ async with XHS(path=path,
<td align="center">user_agent</td>
<td align="center">str</td>
<td align="center">请求头 User-Agent</td>
<td align="center">内置 UA</td>
<td align="center">默认 UA</td>
</tr>
<tr>
<td align="center">proxy</td>
@@ -119,6 +123,12 @@ async with XHS(path=path,
<td align="center">下载文件时,每次从服务器获取的数据块大小,单位:字节</td>
<td align="center">1048576(1 MB)</td>
</tr>
<tr>
<td align="center">max_retry</td>
<td align="center">int</td>
<td align="center">请求数据失败时,重试的最大次数,单位:次</td>
<td align="center">5</td>
</tr>
</tbody>
</table>
<h1>♥️ 支持项目</h1>

35
main.py
View File

@@ -1,6 +1,7 @@
from asyncio import run
from source import XHS
from source import XHSDownloader
async def example():
@@ -11,29 +12,35 @@ async def example():
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
multiple_demo = f"{image_demo} {video_demo}"
# 实例对象
path = "" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称自动创建默认值Download
path = "" # 作品数据/文件保存根路径,默认值:项目根路径
folder_name = "Download" # 作品文件储存文件夹名称(自动创建),默认值:Download
user_agent = "" # 请求头 User-Agent
proxy = None # 网络代理
timeout = 5 # 网络请求超时限制默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
proxy = "" # 网络代理
timeout = 5 # 网络请求超时限制,单位:秒,默认值10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
max_retry = 2 # 请求数据失败时,重试的最大次数,单位:次,默认值:5
# async with XHS() as xhs:
# pass # 使用默认参数
async with XHS(path=path,
folder=folder,
folder_name=folder_name,
user_agent=user_agent,
proxy=proxy,
timeout=timeout,
chunk=chunk) as xhs: # 使用自定义参数
chunk=chunk,
max_retry=max_retry, ) as xhs: # 使用自定义参数
download = True # 是否下载作品文件默认值False
# 返回作品详细信息,包括下载地址
print(await xhs.extract(error_demo, download=download)) # 获取数据失败时返回空字典
print(await xhs.extract(image_demo, download=download))
print(await xhs.extract(video_demo, download=download))
print(await xhs.extract(multiple_demo, download=download)) # 支持传入多个作品链接
print(await xhs.extract(error_demo, download)) # 获取数据失败时返回空字典
print(await xhs.extract(image_demo, download))
print(await xhs.extract(video_demo, download))
print(await xhs.extract(multiple_demo, download)) # 支持传入多个作品链接
async def main():
    """Open the XHSDownloader app as an async context manager and run it."""
    async with XHSDownloader() as app:
        # run_async() is awaited inside the context so __aexit__ runs afterwards.
        await app.run_async()
if __name__ == '__main__':
run(example())
# with XHSDownloader() as xhs:
# xhs.run()
# run(example())
run(main())

View File

@@ -1,2 +1,3 @@
aiohttp>=3.9.0
textual>=0.40.0
pyperclip>=1.8.2

View File

@@ -2,7 +2,11 @@ from pathlib import Path
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError
from rich.text import Text
from .Html import retry
__all__ = ['Download']
@@ -15,9 +19,10 @@ class Download:
root: Path,
path: str,
folder: str,
proxy: str = None,
proxy: str = "",
chunk=1024 * 1024,
timeout=10, ):
timeout=10,
retry_=5, ):
self.manager = manager
self.temp = manager.temp
self.root = self.__init_root(root, path, folder)
@@ -26,6 +31,7 @@ class Download:
self.session = ClientSession(
headers=manager.headers,
timeout=ClientTimeout(connect=timeout))
self.retry = retry_
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
@@ -45,11 +51,13 @@ class Download:
else:
raise ValueError
@retry
async def __download(self, url: str, name: str, log, bar):
temp = self.temp.joinpath(name)
file = self.root.joinpath(name)
if self.manager.is_exists(file):
return
self.rich_log(log, f"{name} 已存在,跳过下载")
return True
try:
async with self.session.get(url, proxy=self.proxy) as response:
self.__create_progress(
@@ -62,9 +70,16 @@ class Download:
self.__update_progress(bar, len(chunk))
self.manager.move(temp, file)
self.__create_progress(bar, None)
except ServerTimeoutError:
self.rich_log(log, f"{name} 下载成功")
return True
except (
ServerTimeoutError,
ServerDisconnectedError,
):
self.manager.delete(temp)
self.__create_progress(bar, None)
self.rich_log(log, f"{name} 下载失败", "bright_red")
return False
@staticmethod
def __create_progress(bar, total: int | None):
@@ -75,3 +90,10 @@ class Download:
def __update_progress(bar, advance: int):
if bar:
bar.advance(advance)
@staticmethod
def rich_log(log, text, style="bright_green"):
if log:
log.write(Text(text, style=f"b {style}"))
else:
print(text)

View File

@@ -1,24 +1,40 @@
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError
__all__ = ['Html']
def retry(function):
    """Decorate an async method so a falsy result triggers repeat attempts.

    The wrapped coroutine is awaited once; while its result is falsy it is
    awaited again, up to ``self.retry`` additional times. The first truthy
    result is returned immediately; otherwise the last (falsy) result is
    returned.

    Args:
        function: Async method whose first positional argument is an object
            exposing an integer ``retry`` attribute.

    Returns:
        The wrapping coroutine function, carrying the wrapped function's
        ``__name__`` and ``__doc__``.
    """
    # Local import keeps this block self-contained; wraps() preserves the
    # decorated coroutine's metadata for debugging and introspection.
    from functools import wraps

    @wraps(function)
    async def inner(self, *args, **kwargs):
        result = await function(self, *args, **kwargs)
        for _ in range(self.retry):
            if result:
                break
            result = await function(self, *args, **kwargs)
        return result

    return inner
class Html:
def __init__(
self,
headers: dict,
proxy: str = None,
timeout=10, ):
proxy: str = "",
timeout=10,
retry_=5, ):
self.proxy = proxy
self.session = ClientSession(
headers=headers | {
"Referer": "https://www.xiaohongshu.com/", },
timeout=ClientTimeout(connect=timeout),
)
self.retry = retry_
@retry
async def request_url(
self,
url: str,
@@ -28,8 +44,11 @@ class Html:
url,
proxy=self.proxy,
) as response:
return await response.text() if text else response.url
except ServerTimeoutError:
return await response.text() if text else str(response.url)
except (
ServerTimeoutError,
ServerDisconnectedError,
):
return ""
@staticmethod

View File

@@ -26,13 +26,14 @@ class Image:
return f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
def __extract_image_token(self, url: str) -> str:
return self.__generate_image_link(token.group(1)) if (token := self.IMAGE_TOKEN.search(url)) else ""
return self.__generate_image_link(token.group(1)) if (
token := self.IMAGE_TOKEN.search(url)) else ""
def __extract_image_urls(self, data: list[dict]) -> list[str]:
urls = []
for i in data:
for j in i.get("infoList", []):
if j.get("imageScene", "").startswith("CRD_WM_"):
if j.get("imageScene", "") == "WB_DFT":
urls.append(self.__extract_image_token(j.get("url", "")))
break
return [i for i in urls if i]

View File

@@ -6,11 +6,12 @@ __all__ = ["Manager"]
class Manager:
def __init__(self, root: Path, ua: str):
def __init__(self, root: Path, ua: str, retry: int):
self.temp = root.joinpath("./temp")
self.headers = {
"User-Agent": ua or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0", }
self.retry = retry
@staticmethod
def is_exists(path: Path) -> bool:

View File

@@ -8,11 +8,12 @@ __all__ = ['Settings']
class Settings:
default = {
"path": "",
"folder": "Download",
"folder_name": "Download",
"user_agent": "",
"proxy": "",
"timeout": 10,
"chunk": 1024 * 1024,
"max_retry": 5,
}
def __init__(self, root: Path):

View File

@@ -27,9 +27,13 @@ from .Video import Video
__all__ = ['XHS', 'XHSDownloader']
RELEASES = "https://github.com/JoeanAmier/XHS-Downloader/releases/latest"
VERSION = 1.6
BETA = False
ROOT = Path(__file__).resolve().parent.parent
class XHS:
ROOT = Path(__file__).resolve().parent.parent
LINK = compile(r"https://www\.xiaohongshu\.com/explore/[a-z0-9]+")
SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
@@ -43,26 +47,33 @@ class XHS:
def __init__(
self,
path="",
folder="Download",
folder_name="Download",
user_agent: str = None,
proxy: str = None,
proxy: str = "",
timeout=10,
chunk=1024 * 1024,
max_retry=5,
**kwargs,
):
self.manager = Manager(self.ROOT, user_agent)
self.html = Html(self.manager.headers, proxy, timeout)
self.manager = Manager(ROOT, user_agent, max_retry)
self.html = Html(
self.manager.headers,
proxy,
timeout,
self.manager.retry)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(
self.manager,
self.ROOT,
ROOT,
path,
folder,
folder_name,
proxy,
chunk,
timeout)
timeout,
self.manager.retry, )
self.rich_log = self.download.rich_log
async def __get_image(self, container: dict, html: str, download, log, bar):
urls = self.image.get_image_link(html)
@@ -81,7 +92,10 @@ class XHS:
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
# return # 调试代码
urls = await self.__deal_links(url)
# self.rich_log(log, urls) # 调试代码
if not urls:
self.rich_log(log, "提取小红书作品链接失败", "bright_red")
else:
self.rich_log(log, f"{len(urls)} 个小红书作品待处理")
# return urls # 调试代码
return [await self.__deal_extract(i, download, log, bar) for i in urls]
@@ -98,18 +112,22 @@ class XHS:
return urls
async def __deal_extract(self, url: str, download: bool, log, bar):
    """Request one post URL, parse the page and pass it to a media handler.

    Progress and failures are reported through ``self.rich_log``.

    Returns:
        The dict produced by ``self.explore.run``; an empty dict when the
        page request or the parsing step yields a falsy result.
    """
    self.rich_log(log, f"开始处理:{url}")
    page = await self.html.request_url(url)
    if not page:
        self.rich_log(log, f"{url} 获取数据失败", "bright_red")
        return {}
    info = self.explore.run(page)
    if not info:
        self.rich_log(log, f"{url} 提取数据失败", "bright_red")
        return {}
    # The "作品类型" field selects the handler: "视频" goes to __get_video,
    # anything else goes to __get_image.
    handler = self.__get_video if info["作品类型"] == "视频" else self.__get_image
    await handler(info, page, download, log, bar)
    self.rich_log(log, f"完成处理:{url}")
    return info
@staticmethod
@@ -125,42 +143,39 @@ class XHS:
await self.html.session.close()
await self.download.session.close()
@staticmethod
def rich_log(log, text, style="b bright_green"):
if log:
log.write(Text(text, style=style))
else:
print(text)
class XHSDownloader(App):
VERSION = 1.6
BETA = True
ROOT = Path(__file__).resolve().parent.parent
# APP = XHS(**Settings(ROOT).run())
CSS_PATH = ROOT.joinpath(
"static/XHS-Downloader.tcss")
BINDINGS = [
Binding(key="q", action="quit", description="结束运行"),
Binding(key="q", action="quit", description="退出程序"),
("d", "toggle_dark", "切换主题"),
Binding(key="u", action="check_update", description="检查更新"),
]
def __init__(self):
    """Initialise the app and build the XHS backend from ``Settings(ROOT).run()``."""
    super().__init__()
    self.APP = XHS(**Settings(ROOT).run())
    # Widget references start empty; __init_objects fills them in on first use.
    self.url = self.log_ = self.bar = None
def __enter__(self):
async def __aenter__(self):
await self.APP.__aenter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.APP.manager.clean()
async def __aexit__(self, exc_type, exc_value, traceback):
await self.APP.__aexit__(exc_type, exc_value, traceback)
def compose(self) -> ComposeResult:
# yield LoadingIndicator()
yield Header()
yield ScrollableContainer(Label("请输入小红书图文/视频作品链接:"),
yield ScrollableContainer(Label(Text("请输入小红书图文/视频作品链接:", style="b bright_blue")),
Input(placeholder="多个链接之间使用空格分隔"),
HorizontalScroll(Button("下载无水印图片/视频", id="deal"),
Button("读取剪贴板", id="paste"),
Button("清空输入框", id="reset"), ),
Label(Text("程序状态", style="b bright_blue")),
)
with Center():
yield ProgressBar(total=None)
@@ -168,26 +183,49 @@ class XHSDownloader(App):
yield Footer()
def on_mount(self) -> None:
self.title = f"XHS-Downloader V{
self.VERSION}{
" Beta" if self.BETA else ""}"
self.title = f"XHS-Downloader V{VERSION}{" Beta" if BETA else ""}"
def on_button_pressed(self, event: Button.Pressed) -> None:
async def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "deal":
self.deal()
await self.deal()
elif event.button.id == "reset":
self.query_one(Input).value = ""
elif event.button.id == "paste":
self.query_one(Input).value = paste()
def deal(self):
url = self.query_one(Input)
log = self.query_one(RichLog)
bar = self.query_one(ProgressBar)
if not url.value:
log.write(Text("未输入任何小红书作品链接!", style="yellow"))
async def deal(self):
self.__init_objects()
if not self.url.value:
self.log_.write(Text("未输入任何小红书作品链接", style="b bright_yellow"))
return
_ = self.APP.extract(url.value, True, log=log, bar=bar)
if not _:
log.write(Text("获取小红书作品数据失败!", style="red"))
url.value = ""
if any(await self.APP.extract(self.url.value, True, log=self.log_, bar=self.bar)):
self.url.value = ""
else:
self.log_.write(Text("下载小红书作品文件失败", style="b bright_red"))
def __init_objects(self):
    """Look up the Input, RichLog and ProgressBar widgets and store them.

    Does nothing when any of ``self.url``, ``self.log_`` or ``self.bar`` is
    already truthy.
    """
    if self.url or self.log_ or self.bar:
        return
    # Resolved one at a time, in the same order as the attributes are declared.
    for attribute, widget_type in (
            ("url", Input),
            ("log_", RichLog),
            ("bar", ProgressBar),
    ):
        setattr(self, attribute, self.query_one(widget_type))
async def action_check_update(self):
    """Compare the latest release tag with VERSION and log the outcome.

    The last path segment of the URL returned by
    ``self.APP.html.request_url(RELEASES, False)`` is parsed as a float.
    If that parse raises ``ValueError``, a failure message is logged instead.
    """
    self.__init_objects()
    try:
        latest_url = await self.APP.html.request_url(RELEASES, False)
        latest = float(latest_url.split("/")[-1])
        # Choose the message first; the release link is only shown when an
        # update is actually available.
        if latest > VERSION:
            notice = Text(f"检测到新版本: {latest}", style="b bright_yellow")
            show_link = True
        elif latest == VERSION and BETA:
            notice = Text("当前版本为开发版, 可更新至正式版", style="b bright_yellow")
            show_link = True
        elif BETA:
            notice = Text("当前已是最新开发版", style="b bright_yellow")
            show_link = False
        else:
            notice = Text("当前已是最新正式版", style="b bright_green")
            show_link = False
        self.log_.write(notice)
        if show_link:
            self.log_.write(RELEASES)
    except ValueError:
        self.log_.write(Text("检测新版本失败", style="b bright_red"))

View File

@@ -4,10 +4,10 @@ Button {
text-style: bold;
}
Button#deal, Button#paste {
tint: green 40%;
tint: #27ae60 60%;
}
Button#reset {
tint: red 40%;
tint: #c0392b 60%;
}
Label {
width: 100%;
@@ -16,3 +16,6 @@ Label {
content-align-vertical: middle;
text-style: bold;
}
Bar > .bar--indeterminate {
color: #2ed573;
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 140 KiB