更新项目代码

This commit is contained in:
JoeamAmier 2023-12-02 10:33:51 +08:00
parent 88cc09c371
commit cc72e54302
10 changed files with 90 additions and 106 deletions

View File

@ -25,46 +25,44 @@
<h1>🥣 使用方法</h1>
<p>如果仅需下载作品文件,选择 <b>直接运行</b> 或者 <b>源码运行</b> 均可,如果需要获取作品信息,则需要进行二次开发进行调用。</p>
<h2>🖱 直接运行</h2>
<p>前往 Releases 下载程序压缩包,解压后打开程序文件夹,双击运行 <code>main.exe</code> 即可使用。</p>
<p>前往 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 下载程序压缩包,解压后打开程序文件夹,双击运行 <code>main.exe</code> 即可使用。</p>
<h2>⌨️ 源码运行</h2>
<ol>
<li>安装版本号不低于 <code>3.12</code> 的 Python 解释器</li>
<li>运行 <code>pip install -r requirements.txt</code> 命令安装程序所需模块</li>
<li>下载本项目最新的源码或 <code>Releases</code> 发布的源码至本地</li>
<li>下载本项目最新的源码或 <a href="https://github.com/JoeanAmier/XHS-Downloader/releases/latest">Releases</a> 发布的源码至本地</li>
<li>运行 <code>main.py</code> 即可使用</li>
</ol>
<h2>💻 二次开发</h2>
<p>如果想要获取小红书图文/视频作品信息,可以根据 <code>main.py</code> 的注释提示进行代码调用。</p>
<pre>
# 测试链接
error_demo = "https://github.com/JoeanAmier/XHS-Downloader"
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
multiple_demo = f"{image_demo} {video_demo}"
# 实例对象
path = "" # 作品下载储存根路径,默认值:当前路径
path = "D:\\" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称(自动创建),默认值:Download
cookie = "" # 小红书网页版 Cookie
proxies = None # 网络代理
timeout = 5 # 网络请求超时限制,单位:秒,默认值:10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位:字节
xhs = XHS(
path=path,
folder=folder,
cookie=cookie,
proxies=proxies,
timeout=timeout,
chunk=chunk, ) # 使用自定义参数
# xhs = XHS() # 使用默认参数
download = True # 是否下载作品文件
download = True # 是否下载作品文件,默认值:False
# 返回作品详细信息,包括下载地址
print(xhs.extract(error_demo)) # 获取数据失败时返回空字典
print(xhs.extract(image_demo, download=download))
print(xhs.extract(video_demo, download=download))
print(xhs.extract(multiple_demo, download=download))
</pre>
<h1>⛓ 批量下载</h1>
<p>在程序所在文件夹创建一个 <code>xhs.txt</code> 文本文件,然后将待处理的作品链接输入文件,每行输入一个作品链接,编辑完成后保存文件,然后运行程序,点击 <code>读取 xhs.txt 文件并批量下载作品</code> 按钮,程序会批量下载每个链接对应的作品文件。</p>
<h1>⚙️ 配置文件</h1>
<p>根目录下的 <code>settings.json</code> 文件,可以自定义部分运行参数。</p>
<p>项目根目录下的 <code>settings.json</code> 文件,可以自定义部分运行参数。</p>
<table>
<thead>
<tr>
@ -88,12 +86,6 @@ print(xhs.extract(video_demo, download=download))
<td align="center">Download</td>
</tr>
<tr>
<td align="center">cookie</td>
<td align="center">str</td>
<td align="center">小红书网页版 Cookie,无需登录,建议自行设置</td>
<td align="center">内置 Cookie</td>
</tr>
<tr>
<td align="center">proxies</td>
<td align="center">str</td>
<td align="center">设置代理</td>
@ -109,20 +101,10 @@ print(xhs.extract(video_demo, download=download))
<td align="center">chunk</td>
<td align="center">int</td>
<td align="center">下载文件时,每次从服务器获取的数据块大小,单位:字节</td>
<td align="center">262144(256KB)</td>
<td align="center">1048576(1 MB)</td>
</tr>
</tbody>
</table>
<h1>🌐 Cookie</h1>
<ol>
<li>打开浏览器(可选无痕模式启动),访问小红书任意网页</li>
<li><code>F12</code> 打开开发人员工具</li>
<li>选择 <code>控制台</code> 选项卡</li>
<li>输入 <code>document.cookie</code> 后回车确认</li>
<li>输出内容即为所需 Cookie</li>
</ol>
<br>
<img src="static/获取Cookie示意图.png" alt="">
<h1>♥️ 支持项目</h1>
<p>如果 <b>XHS-Downloader</b> 对您有帮助,请考虑为它点个 <b>Star</b> ⭐,感谢您的支持!</p>
<table>
@ -141,7 +123,7 @@ print(xhs.extract(video_demo, download=download))
<p>如果您愿意,可以考虑提供资助为 <b>XHS-Downloader</b> 提供额外的支持!</p>
<h1>✉️ 联系作者</h1>
<ul>
<li>QQ: 2437596031</li>
<li>QQ: 2437596031(联系请说明来意)</li>
<li>QQ Group: <a href="https://github.com/JoeanAmier/XHS-Downloader/blob/master/static/QQ%E7%BE%A4%E8%81%8A%E4%BA%8C%E7%BB%B4%E7%A0%81.png">点击扫码加入群聊</a></li>
<li>Email: yonglelolu@gmail.com</li>
</ul>

View File

@ -3,31 +3,31 @@ from pathlib import Path
from requests import exceptions
from requests import get
from .Manager import Manager
__all__ = ['Download']
class Download:
manager = Manager()
def __init__(
self,
manager,
root: Path,
path: str,
folder: str,
headers: dict,
proxies=None,
chunk=1024 * 1024, ):
self.temp = root.joinpath("./temp")
chunk=1024 * 1024,
timeout=10, ):
self.manager = manager
self.temp = manager.temp
self.headers = manager.headers
self.root = self.__init_root(root, path, folder)
self.headers = self.__delete_cookie(headers)
self.proxies = {
"http": proxies,
"https": proxies,
"ftp": proxies,
}
self.chunk = chunk
self.timeout = timeout
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
@ -38,39 +38,29 @@ class Download:
self.temp.mkdir(exist_ok=True)
return root
def run(self, urls: list, name: str, type_: int, log):
def run(self, urls: list, name: str, type_: int):
if type_ == 0:
self.__download(urls[0], f"{name}.mp4", log)
self.__download(urls[0], f"{name}.mp4")
elif type_ == 1:
for index, url in enumerate(urls):
self.__download(url, f"{name}_{index + 1}.jpeg", log)
self.__download(url, f"{name}_{index + 1}.png")
def __download(self, url: str, name: str, log):
def __download(self, url: str, name: str):
temp = self.temp.joinpath(name)
file = self.root.joinpath(name)
if self.manager.is_exists(file):
self.output_prompt(f"文件 {name} 已存在,跳过下载!", log)
return
try:
with get(url, headers=self.headers, proxies=self.proxies, stream=True) as response:
with get(url, headers=self.headers, proxies=self.proxies, stream=True, timeout=self.timeout) as response:
with temp.open("wb") as f:
for chunk in response.iter_content(chunk_size=self.chunk):
f.write(chunk)
self.manager.move(temp, file)
self.output_prompt(f"文件 {name} 下载成功!", log)
except exceptions.ChunkedEncodingError:
except (
exceptions.ProxyError,
exceptions.SSLError,
exceptions.ChunkedEncodingError,
exceptions.ConnectionError,
exceptions.ReadTimeout,
):
self.manager.delete(temp)
self.output_prompt(f"网络异常,文件 {name} 下载失败!", log)
@staticmethod
def __delete_cookie(headers: dict) -> dict:
download_headers = headers.copy()
del download_headers["Cookie"]
return download_headers
@staticmethod
def output_prompt(tip: str, log):
if log:
log.write_line(tip)
else:
print(tip)

View File

@ -15,8 +15,8 @@ class Explore:
return self.__extract_data(data)
def __get_json_data(self, html: str) -> dict:
data = self.explore_data.findall(html)
return {} if len(data) != 1 else loads(data[0])
data = self.explore_data.search(html)
return loads(data.group(1)) if data else {}
def __extract_data(self, data: dict) -> dict:
result = {}

View File

@ -19,11 +19,12 @@ class Html:
}
self.timeout = timeout
def get_html(
def request_url(
self,
url: str,
params=None,
headers=None, ) -> str:
headers=None,
text=True, ) -> str:
try:
response = get(
url,
@ -38,9 +39,9 @@ class Html:
exceptions.ConnectionError,
exceptions.ReadTimeout,
):
print("获取网页源码失败,请尝试设置 Cookie 后重试")
print("网络异常,获取网页源码失败!")
return ""
return response.text
return response.text if text else response.url
@staticmethod
def format_url(url: str) -> str:

View File

@ -1,5 +1,6 @@
from json import loads
from re import compile
__all__ = ['Image']
@ -25,8 +26,7 @@ class Image:
return f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
def __extract_image_token(self, url: str) -> str:
return self.__generate_image_link(token[0]) if len(
token := self.IMAGE_TOKEN.findall(url)) == 1 else ""
return self.__generate_image_link(token.group(1)) if (token := self.IMAGE_TOKEN.search(url)) else ""
def __extract_image_urls(self, data: list[dict]) -> list[str]:
urls = []

View File

@ -5,6 +5,14 @@ __all__ = ['Manager']
class Manager:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/119.0.0.0 Safari/537.36",
}
def __init__(self, root: Path):
self.temp = root.joinpath("./temp")
@staticmethod
def is_exists(path: Path) -> bool:
return path.exists()

View File

@ -6,16 +6,17 @@ __all__ = ['Settings']
class Settings:
file = Path(__file__).resolve().parent.parent.joinpath("./settings.json")
default = {
"path": "",
"folder": "Download",
"cookie": "",
"proxies": None,
"timeout": 10,
"chunk": 1024 * 1024,
}
def __init__(self, root: Path):
self.file = root.joinpath("./settings.json")
def run(self):
return self.read() if self.file.is_file() else self.create()

View File

@ -1,12 +1,11 @@
from re import compile
from .Html import Html
__all__ = ['Video']
class Video:
VIDEO_ID = compile(r'"masterUrl":"(.*?)"')
VIDEO_TOKEN = compile(r'"originVideoKey":"pre_post\\u002F(\S+?)"')
def get_video_link(self, html: str):
return [Html.format_url(u) for u in self.VIDEO_ID.findall(html)]
def get_video_link(self, html: str) -> list:
return [f"https://sns-video-hw.xhscdn.com/pre_post/{
t.group(1)}"] if (t := self.VIDEO_TOKEN.search(html)) else []

View File

@ -14,10 +14,11 @@ from textual.widgets import Input
from textual.widgets import Label
from textual.widgets import Log
from .Download import Download
from .Downloader import Download
from .Explore import Explore
from .Html import Html
from .Image import Image
from .Manager import Manager
from .Settings import Settings
from .Video import Video
@ -26,78 +27,79 @@ __all__ = ['XHS', 'XHSDownloader']
class XHS:
ROOT = Path(__file__).resolve().parent.parent
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
"/116.0.0.0 Safari/537.36",
"Cookie": "abRequestId=c76828f5-4f37-5b3b-8cc3-036eb91b2edb; webBuild=3.14.1; xsecappid=xhs-pc-web; "
"a1=18ba9b2b23co9uwihz4adkebwsw05g8upycgsldyj50000141248; webId=23ee7745020025247828cf8d6d0decff; "
"websectiga=6169c1e84f393779a5f7de7303038f3b47a78e47be716e7bec57ccce17d45f99; "
"sec_poison_id=ae001863-a9db-4463-ad78-ede3aac4e5b1; gid=yYD0jDJDWyU4yYD0jDJDJv1fqSlj7E3xu40fSvVTd"
"DEMEk2882kY7M888y4yJ4Y8D8SK0iiK; web_session=030037a2797dde5008c3e66f32224a8af75429; ",
}
links = compile(r"https://www.xiaohongshu.com/explore/[0-9a-z]+")
link = compile(r"https://www\.xiaohongshu\.com/explore/[a-z0-9]+")
share = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
short = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
def __init__(
self,
path="",
folder="Download",
cookie=None,
proxies=None,
timeout=10,
chunk=1024 * 1024,
**kwargs,
):
self.__update_cookie(cookie)
self.html = Html(self.headers, proxies, timeout)
self.manager = Manager(self.ROOT)
self.html = Html(self.manager.headers, proxies, timeout)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(
self.manager,
self.ROOT,
path,
folder,
self.headers,
proxies,
chunk)
chunk,
timeout)
def __get_image(self, container: dict, html: str, download, log):
def __get_image(self, container: dict, html: str, download):
urls = self.image.get_image_link(html)
if download:
self.download.run(urls, self.__naming_rules(container), 1, log)
self.download.run(urls, self.__naming_rules(container), 1)
container["下载地址"] = urls
def __get_video(self, container: dict, html: str, download, log):
def __get_video(self, container: dict, html: str, download):
url = self.video.get_video_link(html)
if download:
self.download.run(url, self.__naming_rules(container), 0, log)
self.download.run(url, self.__naming_rules(container), 0)
container["下载地址"] = url
def extract(self, url: str, download=False, log=None) -> dict | list[dict]:
if not self.__check(url):
return {}
html = self.html.get_html(url)
def extract(self, url: str, download=False) -> list[dict]:
urls = self.__deal_links(url)
# return urls
return [self.__deal_extract(i, download) for i in urls]
def __deal_links(self, url: str) -> list:
urls = []
for i in url.split():
if u := self.short.search(i):
i = self.html.request_url(u.group(), headers=self.manager.headers, text=False)
if u := self.share.search(i):
urls.append(u.group())
elif u := self.link.search(i):
urls.append(u.group())
return urls
def __deal_extract(self, url: str, download: bool):
html = self.html.request_url(url)
if not html:
return {}
data = self.explore.run(html)
if not data:
return {}
if data["作品类型"] == "视频":
self.__get_video(data, html, download, log)
self.__get_video(data, html, download)
else:
self.__get_image(data, html, download, log)
self.__get_image(data, html, download)
return data
def __check(self, url: str):
return self.links.match(url)
@staticmethod
def __naming_rules(data: dict) -> str:
"""下载文件默认使用作品 ID 作为文件名,可修改此方法自定义文件名格式"""
return data["作品ID"]
def __update_cookie(self, cookie: str) -> None:
if cookie and isinstance(cookie, str):
self.headers["Cookie"] = cookie
class XHSDownloader(App):
VERSION = 1.6
@ -109,7 +111,8 @@ class XHSDownloader(App):
Binding(key="q", action="quit", description="退出程序"),
("d", "toggle_dark", "切换主题"),
]
APP = XHS(**Settings().run())
# APP = XHS(**Settings().run())
def compose(self) -> ComposeResult:
yield Header()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB