diff --git a/README.md b/README.md
index 3cebd5b..5418256 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,8 @@
2023/9/11:因小红书更新,无水印图片采集功能失效!
2023/9/11:因小红书更新,无水印图片采集功能失效!
2023/9/14:已修复无水印图片采集功能!
如果想要获取小红书图文/视频作品信息,可以根据 main.py 的注释提示进行代码调用。
# 测试链接
-error_demo = "https://www.xiaohongshu.com/explore/"
-image_demo = "https://www.xiaohongshu.com/explore/64d1b406000000000103ee8d"
-video_demo = "https://www.xiaohongshu.com/explore/64c05652000000000c0378e7"
+error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
+image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
+video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
# 实例对象
path = "./" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称(自动创建),默认值:Download
+cookie = "" # 小红书网页版 Cookie
proxies = None # 网络代理
timeout = 5 # 网络请求超时限制,默认值:10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
xhs = XHS(
path=path,
folder=folder,
+ cookie=cookie,
proxies=proxies,
timeout=timeout,
chunk=chunk, ) # 使用自定义参数
@@ -85,6 +88,12 @@ print(xhs.extract(video_demo, download=download))
Download
+cookie
+str
+小红书网页版 Cookie,无需登录
+内置 Cookie
+
+
proxies
str
设置代理
diff --git a/main.py b/main.py
index afc7a5a..18882c3 100644
--- a/main.py
+++ b/main.py
@@ -16,18 +16,20 @@ from source import XHS
def example():
"""通过代码设置参数,适合二次开发"""
# 测试链接
- error_demo = "https://www.xiaohongshu.com/explore/"
- image_demo = "https://www.xiaohongshu.com/explore/64d1b406000000000103ee8d"
- video_demo = "https://www.xiaohongshu.com/explore/64c05652000000000c0378e7"
+ error_demo = "https://github.com/JoeanAmier/XHS_Downloader"
+ image_demo = "https://www.xiaohongshu.com/explore/63b275a30000000019020185"
+ video_demo = "https://www.xiaohongshu.com/explore/64edb460000000001f03cadc"
# 实例对象
path = "./" # 作品下载储存根路径,默认值:当前路径
folder = "Download" # 作品下载文件夹名称(自动创建),默认值:Download
+ cookie = "" # 小红书网页版 Cookie
proxies = None # 网络代理
timeout = 5 # 网络请求超时限制,默认值:10
chunk = 1024 * 1024 # 下载文件时,每次从服务器获取的数据块大小,单位字节
xhs = XHS(
path=path,
folder=folder,
+ cookie=cookie,
proxies=proxies,
timeout=timeout,
chunk=chunk, ) # 使用自定义参数
@@ -39,7 +41,7 @@ def example():
print(xhs.extract(video_demo, download=download))
-def main():
+def program():
"""读取并应用配置文件设置的参数,适合一般作品文件下载需求"""
xhs = XHS(**Settings().run())
if ids := Batch().read_txt():
@@ -89,6 +91,6 @@ class XHSDownloader(App):
if __name__ == '__main__':
# example()
- main()
+ program()
# app = XHSDownloader()
# app.run()
diff --git a/source/Download.py b/source/Download.py
index 6c88a4b..87ea265 100644
--- a/source/Download.py
+++ b/source/Download.py
@@ -10,17 +10,18 @@ __all__ = ['Download']
class Download:
manager = Manager()
- temp = Path("./Temp")
def __init__(
self,
+ root: Path,
path: str,
folder: str,
headers: dict,
proxies=None,
chunk=256 * 1024, ):
- self.root = self.init_root(path, folder)
- self.headers = headers
+ self.temp = root.joinpath("./Temp")
+ self.root = self.__init_root(root, path, folder)
+ self.headers = self.__delete_cookie(headers)
self.proxies = {
"http": proxies,
"https": proxies,
@@ -28,22 +29,25 @@ class Download:
}
self.chunk = chunk
-    def init_root(self, path: str, folder: str) -> Path:
-        root = Path(path).joinpath(folder)
+    def __init_root(self, root: Path, path: str, folder: str) -> Path:  # returns the download directory; also creates self.temp
+        if path and (r := Path(path)).is_dir():  # is_dir(), not exists(): a regular file at `path` cannot contain the folder
+            root = r.joinpath(folder or "Download")
+        else:
+            root = root.joinpath(folder or "Download")  # unusable `path`: fall back to the `root` argument
         if not root.is_dir():
             root.mkdir()
         if not self.temp.is_dir():
             self.temp.mkdir()
         return root
-    def run(self, urls: list, name: str):
-        if (l := len(urls)) > 1:
+    def run(self, urls: list, name: str, type_: int):  # type_ 0: first link saved as .mp4; type_ 1: every link saved as a numbered .jpeg
+        if type_ == 0 and urls:  # an empty link list is skipped instead of raising IndexError on urls[0]
+            self.__download(urls[0], f"{name}.mp4")
+        elif type_ == 1:
             for index, url in enumerate(urls):
-                self.download(url, f"{name}_{index + 1}.webp")
-        elif l == 1:
-            self.download(urls[0], f"{name}.mp4")
+                self.__download(url, f"{name}_{index + 1}.jpeg")
- def download(self, url: str, name: str):
+ def __download(self, url: str, name: str):
temp = self.temp.joinpath(name)
file = self.root.joinpath(name)
if self.manager.is_exists(file):
@@ -59,3 +63,18 @@ class Download:
except exceptions.ChunkedEncodingError:
self.manager.delete(temp)
print(f"网络异常,{name} 下载失败!")
+
+    @staticmethod
+    def __delete_cookie(headers: dict) -> dict:
+        """Return a copy of ``headers`` with the ``Cookie`` entry removed.
+
+        The caller's mapping is left untouched; only the copy is modified.
+
+        :param headers: request headers, with or without a ``Cookie`` key
+        :return: a new dict that has no ``Cookie`` key
+        """
+        download_headers = headers.copy()
+        # pop() with a default instead of ``del`` so that headers carrying no
+        # Cookie do not raise KeyError.
+        download_headers.pop("Cookie", None)
+        return download_headers
diff --git a/source/Html.py b/source/Html.py
index f39c621..7f405bc 100644
--- a/source/Html.py
+++ b/source/Html.py
@@ -42,3 +42,17 @@ class Html:
print("获取网页源码失败,请尝试设置 Cookie 后重试!")
return ""
return response.text
+
+    @staticmethod
+    def format_url(url: str) -> str:
+        r"""Resolve backslash escapes such as ``\u002F`` embedded in ``url``.
+
+        :param url: text that may contain Python-style escape sequences
+        :return: the text with those escape sequences decoded
+        :raises UnicodeDecodeError: if ``url`` contains a malformed escape
+        """
+        # Encode to ASCII with "backslashreplace" first: each non-ASCII
+        # character becomes an escape of its own and survives the decode,
+        # whereas UTF-8 bytes would be reinterpreted as Latin-1 mojibake.
+        escaped = url.encode("ascii", "backslashreplace")
+        return escaped.decode("unicode_escape")
diff --git a/source/Image.py b/source/Image.py
index a91a89e..84dc00f 100644
--- a/source/Image.py
+++ b/source/Image.py
@@ -1,20 +1,12 @@
from re import compile
+from .Html import Html
+
__all__ = ['Image']
class Image:
- IMAGE_API = "https://sns-img-qc.xhscdn.com/"
- IMAGE_ID = compile(r'"traceId":"(.*?)"')
+ IMAGE_URL = compile(r'"CRD_WM_.*?","url":"(.*?)"')
- def get_image_link(self, html: str):
- return self.__get_image_links(html)
-
- def __get_id(self, html: str) -> list:
- return self.IMAGE_ID.findall(html)
-
- def __generate_url(self, ids: list) -> list:
- return [self.IMAGE_API + i for i in ids]
-
- def __get_image_links(self, html: str) -> list:
- return self.__generate_url(self.__get_id(html))
+ def get_image_link(self, html: str) -> list:
+ return [Html.format_url(i) for i in self.IMAGE_URL.findall(html)]
diff --git a/source/Settings.py b/source/Settings.py
index 7f510ac..d459229 100644
--- a/source/Settings.py
+++ b/source/Settings.py
@@ -6,24 +6,25 @@ __all__ = ['Settings', 'Batch']
class Settings:
- path = Path("./settings.json")
+ file = Path(__file__).resolve().parent.parent.joinpath("./settings.json")
default = {
"path": "./",
"folder": "Download",
+ "cookie": "",
"proxies": None,
"timeout": 10,
"chunk": 256 * 1024,
}
def run(self):
- return self.read() if self.path.is_file() else self.create()
+ return self.read() if self.file.is_file() else self.create()
def read(self):
- with self.path.open("r", encoding="utf-8") as f:
+ with self.file.open("r", encoding="utf-8") as f:
return load(f)
def create(self):
- with self.path.open("w", encoding="utf-8") as f:
+ with self.file.open("w", encoding="utf-8") as f:
dump(self.default, f, indent=2)
return self.default
diff --git a/source/Video.py b/source/Video.py
index 1d9f064..8747430 100644
--- a/source/Video.py
+++ b/source/Video.py
@@ -1,5 +1,7 @@
from re import compile
+from .Html import Html
+
__all__ = ['Video']
@@ -7,11 +9,4 @@ class Video:
VIDEO_ID = compile(r'"masterUrl":"(.*?)"')
def get_video_link(self, html: str):
- return self.__get_video_link(html)
-
- def __get_video_link(self, html: str) -> list:
- return [self.clean_url(u) for u in self.VIDEO_ID.findall(html)]
-
- @staticmethod
- def clean_url(url: str) -> str:
- return bytes(url, "utf-8").decode("unicode_escape")
+ return [Html.format_url(u) for u in self.VIDEO_ID.findall(html)]
diff --git a/source/__init__.py b/source/__init__.py
index 3a6e3ae..e7e6a3c 100644
--- a/source/__init__.py
+++ b/source/__init__.py
@@ -1,3 +1,4 @@
+from pathlib import Path
from re import compile
from .Download import Download
@@ -12,8 +13,14 @@ __all__ = ['XHS', 'Settings', 'Batch']
class XHS:
+ ROOT = Path(__file__).resolve().parent.parent
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
+ "Cookie": "abRequestId=27dafe41-28af-5b33-9f22-fe05d8c4ac2f; xsecappid=xhs-pc-web; a1=18a363d90c9gw7eaz2krqhj4c"
+ "x2gtwgotul1wur8950000289463; webId=27fb29ed7ff41eadd4bc58197a465b63; web_session=030037a3d84590608f6"
+ "da85793234a9a6588ed; gid=yY0qKqfd2Y9qyY0qKqfj877FSjkEWd0uJTFA1YjxV4SCJy28k9EklE888JYj4Kq82242dKiY; w"
+ "ebBuild=3.8.1; websectiga=3633fe24d49c7dd0eb923edc8205740f10fdb18b25d424d2a2322c6196d2a4ad; sec_pois"
+ "on_id=179f847f-ba58-4ede-86bf-977d710da3b2; cache_feeds=[]",
}
links = compile(r"https://www.xiaohongshu.com/explore/[0-9a-z]+")
@@ -21,15 +28,18 @@ class XHS:
self,
path="./",
folder="Download",
+ cookie=None,
proxies=None,
timeout=10,
chunk=256 * 1024,
):
+ self.__update_cookie(cookie)
self.html = Html(self.headers, proxies, timeout)
self.image = Image()
self.video = Video()
self.explore = Explore()
self.download = Download(
+ self.ROOT,
path,
folder,
self.headers,
@@ -39,13 +49,13 @@ class XHS:
def __get_image(self, container: dict, html: str, download):
urls = self.image.get_image_link(html)
if download:
- self.download.run(urls, self.__naming_rules(container))
+ self.download.run(urls, self.__naming_rules(container), 1)
container["下载地址"] = urls
def __get_video(self, container: dict, html: str, download):
url = self.video.get_video_link(html)
if download:
- self.download.run(url, self.__naming_rules(container))
+ self.download.run(url, self.__naming_rules(container), 0)
container["下载地址"] = url
def extract(self, url: str, download=False) -> dict:
@@ -72,3 +82,17 @@ class XHS:
def __naming_rules(data: dict) -> str:
"""下载文件默认使用作品 ID 作为文件名,可修改此方法自定义文件名格式"""
return data["作品ID"]
+
+    def __update_cookie(self, cookie: str) -> None:
+        """Give this instance its own headers, using ``cookie`` when provided.
+
+        ``headers`` is declared on the class, so writing into it in place
+        would change the Cookie seen by every other ``XHS`` instance.
+
+        :param cookie: Cookie header value; used only when it is a non-empty
+            ``str``, otherwise the class-level Cookie is kept
+        """
+        headers = dict(self.headers)  # private copy; the class dict stays untouched
+        if cookie and isinstance(cookie, str):
+            headers["Cookie"] = cookie
+        self.headers = headers