修复无水印图片采集功能

This commit is contained in:
JoeamAmier
2023-09-14 20:13:30 +08:00
parent b68020152c
commit f3ba085910
8 changed files with 74 additions and 47 deletions

View File

@@ -8,7 +8,8 @@
<img alt="GitHub release (with filter)" src="https://img.shields.io/github/v/release/JoeanAmier/XHS_Downloader?style=for-the-badge&color=44bd32">
<hr>
</div>
<p><b><code>2023/9/11</code>:因小红书更新,无水印图片采集功能失效!</b></p>
<p><del><b><code>2023/9/11</code>:因小红书更新,无水印图片采集功能失效!</b></del></p>
<p><b><code>2023/9/14</code>:已修复无水印图片采集功能!</b></p>
<h1>📝 功能清单</h1>
<ul>
<li>采集小红书图文/视频作品信息</li>
@@ -36,18 +37,20 @@
<p>如果想要获取小红书图文/视频作品信息,可以根据 <code>main.py</code> 的注释提示进行代码调用。</p>
<pre>
# 测试链接
error_demo = "https://www.xiaohongshu.com/explore/"
image_demo = "https://www.xiaohongshu.com/explore/64d1b406000000000103ee8d"
video_demo = "https://www.xiaohongshu.com/explore/64c05652000000000c0378e7"
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
# 实例对象
path = "./" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称(自动创建),默认值:Download
cookie = "" # 小红书网页版 Cookie
proxies = None # 网络代理
timeout = 5 # 网络请求超时限制,默认值:10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
xhs = XHS(
path=path,
folder=folder,
cookie=cookie,
proxies=proxies,
timeout=timeout,
chunk=chunk, ) # 使用自定义参数
@@ -85,6 +88,12 @@ print(xhs.extract(video_demo, download=download))
<td align="center">Download</td>
</tr>
<tr>
<td align="center">cookie</td>
<td align="center">str</td>
<td align="center">小红书网页版 Cookie,无需登录</td>
<td align="center">内置 Cookie</td>
</tr>
<tr>
<td align="center">proxies</td>
<td align="center">str</td>
<td align="center">设置代理</td>

12
main.py
View File

@@ -16,18 +16,20 @@ from source import XHS
def example():
"""通过代码设置参数,适合二次开发"""
# 测试链接
error_demo = "https://www.xiaohongshu.com/explore/"
image_demo = "https://www.xiaohongshu.com/explore/64d1b406000000000103ee8d"
video_demo = "https://www.xiaohongshu.com/explore/64c05652000000000c0378e7"
error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
# 实例对象
path = "./" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称(自动创建),默认值:Download
cookie = "" # 小红书网页版 Cookie
proxies = None # 网络代理
timeout = 5 # 网络请求超时限制,默认值:10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
xhs = XHS(
path=path,
folder=folder,
cookie=cookie,
proxies=proxies,
timeout=timeout,
chunk=chunk, ) # 使用自定义参数
@@ -39,7 +41,7 @@ def example():
print(xhs.extract(video_demo, download=download))
def main():
def program():
"""读取并应用配置文件设置的参数,适合一般作品文件下载需求"""
xhs = XHS(**Settings().run())
if ids := Batch().read_txt():
@@ -89,6 +91,6 @@ class XHSDownloader(App):
if __name__ == '__main__':
# example()
main()
program()
# app = XHSDownloader()
# app.run()

View File

@@ -10,17 +10,18 @@ __all__ = ['Download']
class Download:
manager = Manager()
temp = Path("./Temp")
def __init__(
self,
root: Path,
path: str,
folder: str,
headers: dict,
proxies=None,
chunk=256 * 1024, ):
self.root = self.init_root(path, folder)
self.headers = headers
self.temp = root.joinpath("./Temp")
self.root = self.__init_root(root, path, folder)
self.headers = self.__delete_cookie(headers)
self.proxies = {
"http": proxies,
"https": proxies,
@@ -28,22 +29,25 @@ class Download:
}
self.chunk = chunk
def init_root(self, path: str, folder: str) -> Path:
root = Path(path).joinpath(folder)
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).exists():
root = r.joinpath(folder or "Download")
else:
root = root.joinpath(folder or "Download")
if not root.is_dir():
root.mkdir()
if not self.temp.is_dir():
self.temp.mkdir()
return root
def run(self, urls: list, name: str):
if (l := len(urls)) > 1:
def run(self, urls: list, name: str, type_: int):
if type_ == 0:
self.__download(urls[0], f"{name}.mp4")
elif type_ == 1:
for index, url in enumerate(urls):
self.download(url, f"{name}_{index + 1}.webp")
elif l == 1:
self.download(urls[0], f"{name}.mp4")
self.__download(url, f"{name}_{index + 1}.jpeg")
def download(self, url: str, name: str):
def __download(self, url: str, name: str):
temp = self.temp.joinpath(name)
file = self.root.joinpath(name)
if self.manager.is_exists(file):
@@ -59,3 +63,9 @@ class Download:
except exceptions.ChunkedEncodingError:
self.manager.delete(temp)
print(f"网络异常,{name} 下载失败!")
@staticmethod
def __delete_cookie(headers: dict) -> dict:
    """Return a copy of ``headers`` with the ``Cookie`` entry removed.

    The mapping passed in is never modified; only the returned copy lacks
    the ``Cookie`` key.

    Args:
        headers: Request headers, with or without a ``Cookie`` entry.

    Returns:
        A new dict holding every entry of ``headers`` except ``Cookie``.
    """
    download_headers = headers.copy()
    # pop() with a default tolerates headers that carry no Cookie at all,
    # where a bare ``del`` would raise KeyError.
    download_headers.pop("Cookie", None)
    return download_headers

View File

@@ -42,3 +42,7 @@ class Html:
print("获取网页源码失败,请尝试设置 Cookie 后重试!")
return ""
return response.text
@staticmethod
def format_url(url: str) -> str:
    r"""Decode backslash escape sequences (e.g. ``\u002F``) found in ``url``.

    Args:
        url: Text that may contain Python-style backslash escapes.

    Returns:
        ``url`` with every escape sequence replaced by the character it
        denotes. Characters that were not escaped, including non-ASCII
        ones, are returned unchanged.

    Raises:
        UnicodeDecodeError: If ``url`` contains a malformed escape
            sequence (for example a trailing single backslash).
    """
    # "unicode_escape" interprets raw bytes as latin-1, so going through
    # UTF-8 would turn any non-ASCII character into mojibake. Encoding as
    # latin-1 with "backslashreplace" keeps characters up to U+00FF as
    # single bytes and rewrites everything above as \uXXXX escapes, which
    # the decode step then restores.
    return url.encode("latin-1", "backslashreplace").decode("unicode_escape")

View File

@@ -1,20 +1,12 @@
from re import compile
from .Html import Html
__all__ = ['Image']
class Image:
IMAGE_API = "https://sns-img-qc.xhscdn.com/"
IMAGE_ID = compile(r'"traceId":"(.*?)"')
IMAGE_URL = compile(r'"CRD_WM_.*?","url":"(.*?)"')
def get_image_link(self, html: str):
return self.__get_image_links(html)
def __get_id(self, html: str) -> list:
return self.IMAGE_ID.findall(html)
def __generate_url(self, ids: list) -> list:
return [self.IMAGE_API + i for i in ids]
def __get_image_links(self, html: str) -> list:
return self.__generate_url(self.__get_id(html))
def get_image_link(self, html: str) -> list:
return [Html.format_url(i) for i in self.IMAGE_URL.findall(html)]

View File

@@ -6,24 +6,25 @@ __all__ = ['Settings', 'Batch']
class Settings:
path = Path("./settings.json")
file = Path(__file__).resolve().parent.parent.joinpath("./settings.json")
default = {
"path": "./",
"folder": "Download",
"cookie": "",
"proxies": None,
"timeout": 10,
"chunk": 256 * 1024,
}
def run(self):
return self.read() if self.path.is_file() else self.create()
return self.read() if self.file.is_file() else self.create()
def read(self):
with self.path.open("r", encoding="utf-8") as f:
with self.file.open("r", encoding="utf-8") as f:
return load(f)
def create(self):
with self.path.open("w", encoding="utf-8") as f:
with self.file.open("w", encoding="utf-8") as f:
dump(self.default, f, indent=2)
return self.default

View File

@@ -1,5 +1,7 @@
from re import compile
from .Html import Html
__all__ = ['Video']
@@ -7,11 +9,4 @@ class Video:
VIDEO_ID = compile(r'"masterUrl":"(.*?)"')
def get_video_link(self, html: str):
return self.__get_video_link(html)
def __get_video_link(self, html: str) -> list:
return [self.clean_url(u) for u in self.VIDEO_ID.findall(html)]
@staticmethod
def clean_url(url: str) -> str:
return bytes(url, "utf-8").decode("unicode_escape")
return [Html.format_url(u) for u in self.VIDEO_ID.findall(html)]

View File

@@ -1,3 +1,4 @@
from pathlib import Path
from re import compile
from .Download import Download
@@ -12,8 +13,14 @@ __all__ = ['XHS', 'Settings', 'Batch']
class XHS:
ROOT = Path(__file__).resolve().parent.parent
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
"Cookie": "abRequestId=27dafe41-28af-5b33-9f22-fe05d8c4ac2f; xsecappid=xhs-pc-web; a1=18a363d90c9gw7eaz2krqhj4c"
"x2gtwgotul1wur8950000289463; webId=27fb29ed7ff41eadd4bc58197a465b63; web_session=030037a3d84590608f6"
"da85793234a9a6588ed; gid=yY0qKqfd2Y9qyY0qKqfj877FSjkEWd0uJTFA1YjxV4SCJy28k9EklE888JYj4Kq82242dKiY; w"
"ebBuild=3.8.1; websectiga=3633fe24d49c7dd0eb923edc8205740f10fdb18b25d424d2a2322c6196d2a4ad; sec_pois"
"on_id=179f847f-ba58-4ede-86bf-977d710da3b2; cache_feeds=[]",
}
links = compile(r"https://www.xiaohongshu.com/explore/[0-9a-z]+")
@@ -21,15 +28,18 @@ class XHS:
self,
path="./",
folder="Download",
cookie=None,
proxies=None,
timeout=10,
chunk=256 * 1024,
):
self.__update_cookie(cookie)
self.html = Html(self.headers, proxies, timeout)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(
self.ROOT,
path,
folder,
self.headers,
@@ -39,13 +49,13 @@ class XHS:
def __get_image(self, container: dict, html: str, download):
urls = self.image.get_image_link(html)
if download:
self.download.run(urls, self.__naming_rules(container))
self.download.run(urls, self.__naming_rules(container), 1)
container["下载地址"] = urls
def __get_video(self, container: dict, html: str, download):
url = self.video.get_video_link(html)
if download:
self.download.run(url, self.__naming_rules(container))
self.download.run(url, self.__naming_rules(container), 0)
container["下载地址"] = url
def extract(self, url: str, download=False) -> dict:
@@ -72,3 +82,7 @@ class XHS:
def __naming_rules(data: dict) -> str:
"""下载文件默认使用作品 ID 作为文件名,可修改此方法自定义文件名格式"""
return data["作品ID"]
def __update_cookie(self, cookie: str) -> None:
    """Use ``cookie`` as this instance's ``Cookie`` request header.

    Nothing happens when ``cookie`` is empty or not a string, so the
    existing ``headers`` value stays in effect.

    Args:
        cookie: Cookie string to send with requests.
    """
    if cookie and isinstance(cookie, str):
        # ``headers`` is defined on the class; writing into it in place
        # would leak this cookie into every other instance. Rebind a
        # per-instance copy instead.
        self.headers = {**self.headers, "Cookie": cookie}