更新代码

This commit is contained in:
JoeamAmier 2023-08-27 17:12:17 +08:00
parent 198f07cb05
commit d9eb59d128
3 changed files with 61 additions and 13 deletions

26
main.py
View File

@ -3,11 +3,31 @@ from source import XHS
def example():
    """Usage example: extract note data from XHS links and download the files.

    Demonstrates constructing an ``XHS`` instance with custom parameters and
    calling ``extract`` on an image note and a video note.
    """
    # Demo links: one image note, one video note.
    image_demo = "https://www.xiaohongshu.com/explore/64d1b406000000000103ee8d"
    video_demo = "https://www.xiaohongshu.com/explore/64c05652000000000c0378e7"
    path = "./"  # root path for downloaded works; default: current directory
    folder = "Download"  # download folder name, created automatically; default: "Download"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.203",
    }  # request headers
    proxies = None  # proxy for network requests
    timeout = 10  # network request timeout; default: 10
    cookie = ""  # XHS web cookie; try setting it manually if extraction fails
    xhs = XHS(
        path=path,
        folder=folder,
        headers=headers,
        proxies=proxies,
        timeout=timeout,
        cookie=cookie)  # instance with custom parameters
    # xhs = XHS()  # instance with default parameters
    # No need to distinguish image notes from video notes:
    # extract() returns detailed note data, including download links.
    download = True  # enable automatic download of note files
    print(xhs.extract(image_demo, download=download))
    print(xhs.extract(video_demo, download=download))
if __name__ == '__main__':

View File

@ -1,23 +1,50 @@
from pathlib import Path
from requests import exceptions
from requests import get
class Download:
    """Save a note's image/video files into a local download folder."""

    chunk = 262144  # streaming chunk size in bytes (256 KiB)

    def __init__(
            self,
            path,
            folder,
            headers: dict,
            proxies=None, ):
        # Root directory for downloads; created on demand.
        self.root = self.init_root(path, folder)
        # Keep only the User-Agent from the caller-supplied headers.
        self.headers = self.init_headers(headers)
        # The same proxy is applied to every scheme requests may use.
        self.proxies = {
            "http": proxies,
            "https": proxies,
            "ftp": proxies,
        }

    @staticmethod
    def init_headers(headers: dict) -> dict:
        """Return minimal request headers containing only the User-Agent.

        Raises KeyError if ``headers`` has no "User-Agent" entry.
        """
        return {"User-Agent": headers["User-Agent"]}

    @staticmethod
    def init_root(path: str, folder: str) -> Path:
        """Join ``path``/``folder`` and create the directory if missing."""
        root = Path(path).joinpath(folder)
        if not root.is_dir():
            # parents=True: do not fail when an intermediate directory
            # of ``path`` does not exist yet.
            root.mkdir(parents=True)
        return root

    def run(self, urls: list, name: str):
        """Download every file of a note.

        Multiple URLs are treated as images (saved as ``<name>_<i>.webp``);
        a single URL is treated as a video (saved as ``<name>.mp4``).
        An empty list is ignored.
        """
        if (count := len(urls)) > 1:
            for index, url in enumerate(urls):
                self.download(url, f"{name}_{index + 1}.webp")
        elif count == 1:
            self.download(urls[0], f"{name}.mp4")

    def download(self, url: str, name: str):
        """Stream ``url`` into ``self.root/name`` and report the outcome.

        NOTE(review): a failed download may leave a partial file on disk.
        """
        try:
            with get(url, headers=self.headers, proxies=self.proxies, stream=True) as response:
                with self.root.joinpath(name).open("wb") as f:
                    for chunk in response.iter_content(chunk_size=self.chunk):
                        f.write(chunk)
            print(f"{name} 下载成功!")
        except exceptions.ChunkedEncodingError:
            print("网络异常,下载文件失败!")

View File

@ -7,8 +7,9 @@ from .Video import Video
class XHS:
# Default request headers. The pre-commit duplicates of "User-Agent" and
# "Cookie" (silently overwritten by later keys) are removed; only the
# post-commit values remain.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
    "Referer": "https://www.xiaohongshu.com/",
    "Cookie": "abRequestId=27dafe41-28af-5b33-9f22-fe05d8c4ac2f; xsecappid=xhs-pc-web; a1=18a363d90c9gw7eaz2krqhj4cx2gtwgotul1wur8950000289463; webId=27fb29ed7ff41eadd4bc58197a465b63; websectiga=cffd9dcea65962b05ab048ac76962acee933d26157113bb213105a116241fa6c; sec_poison_id=3a1e34ee-3535-4ee9-8186-4d574da5291e; web_session=030037a3d84590608f6da85793234a9a6588ed; gid=yY0qKqfd2Y9qyY0qKqfj877FSjkEWd0uJTFA1YjxV4SCJy28k9EklE888JYj4Kq82242dKiY; webBuild=3.6.0; cache_feeds=[]",
}
def __init__(
@ -33,13 +34,13 @@ class XHS:
def get_image(self, container: dict, html: str, download):
    """Extract image links from ``html`` and store them in ``container``.

    When ``download`` is truthy, the files are downloaded and named after
    the note ID (``container["作品ID"]``). The links are always written
    to ``container["下载地址"]``.
    """
    urls = self.image.get_image_link(html)
    if download:
        self.download.run(urls, container["作品ID"])
    container["下载地址"] = urls
def get_video(self, container: dict, html: str, download):
    """Extract the video link from ``html`` and store it in ``container``.

    When ``download`` is truthy, the file is downloaded and named after
    the note ID (``container["作品ID"]``). The link is always written
    to ``container["下载地址"]``.
    """
    url = self.video.get_video_link(html)
    if download:
        # Download.run expects a list of URLs; wrap the single video URL so
        # it is saved as "<note id>.mp4" instead of len(url) iterating the
        # characters of the URL string.
        self.download.run([url], container["作品ID"])
    container["下载地址"] = url
def extract(self, url: str, download=False) -> dict: