mirror of
https://github.com/JoeanAmier/XHS-Downloader.git
synced 2025-12-26 12:56:22 +08:00
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
from pathlib import Path
|
|
|
|
from requests import exceptions
|
|
from requests import get
|
|
|
|
from .Manager import Manager
|
|
|
|
# Public API of this module: only the Download class is exported by
# `from <module> import *`.
__all__ = ['Download']
|
|
|
|
|
|
class Download:
    """Download media files to a temp directory, then move them into place.

    Each file is streamed into ``<root>/temp`` first and only handed to the
    manager for moving into the final download folder once the transfer has
    completed, so an interrupted download does not leave a partial file in
    the destination folder.
    """

    # Shared helper used for the existence check, the final move and the
    # cleanup of temp files (its implementation lives in the Manager module).
    manager = Manager()

    def __init__(
            self,
            root: Path,
            path: str,
            folder: str,
            headers: dict,
            proxies=None,
            chunk=256 * 1024, ):
        """Prepare the target directories and the request settings.

        Args:
            root: Base directory; the temp directory is created beneath it,
                and it is the fallback parent of the download folder.
            path: Optional custom parent directory for the download folder;
                used only when it is non-empty and exists on disk.
            folder: Name of the download folder; "Download" when empty.
            headers: Request headers; a copy without the "Cookie" entry is
                used for the downloads.
            proxies: Proxy applied to the http, https and ftp schemes
                (None leaves them unset).
            chunk: Chunk size in bytes used when streaming the response.
        """
        self.temp = root.joinpath("./temp")
        self.root = self.__init_root(root, path, folder)
        self.headers = self.__delete_cookie(headers)
        self.proxies = {
            "http": proxies,
            "https": proxies,
            "ftp": proxies,
        }
        self.chunk = chunk

    def __init_root(self, root: Path, path: str, folder: str) -> Path:
        """Resolve and create the download folder and the temp directory.

        Returns:
            The directory into which finished files are moved.
        """
        # Prefer the user-supplied parent directory when it exists.
        if path and (r := Path(path)).exists():
            root = r.joinpath(folder or "Download")
        else:
            root = root.joinpath(folder or "Download")
        # parents/exist_ok: tolerate missing parent directories and avoid the
        # check-then-create race. A non-directory at the path still raises
        # FileExistsError, as before.
        root.mkdir(parents=True, exist_ok=True)
        self.temp.mkdir(parents=True, exist_ok=True)
        return root

    def run(self, urls: list, name: str, type_: int, log):
        """Download the given URLs under file names derived from ``name``.

        Args:
            urls: Download addresses. For ``type_ == 0`` only the first one
                is used; for ``type_ == 1`` every entry is downloaded.
            name: Base file name without extension.
            type_: 0 saves a single ``.mp4`` file; 1 saves numbered
                ``.jpeg`` files (``name_1.jpeg``, ``name_2.jpeg``, ...).
                Any other value does nothing.
            log: Passed through to ``output_prompt``.
        """
        if not urls:
            # Nothing to download; avoids an IndexError on urls[0] and
            # matches the no-op behaviour of the type_ == 1 branch.
            return
        if type_ == 0:
            self.__download(urls[0], f"{name}.mp4", log)
        elif type_ == 1:
            for index, url in enumerate(urls):
                self.__download(url, f"{name}_{index + 1}.jpeg", log)

    def __download(self, url: str, name: str, log):
        """Stream one URL into the temp directory and move it into place.

        The download is skipped when the manager reports that the target
        file already exists. Request failures are reported through
        ``output_prompt`` instead of being raised.
        """
        temp = self.temp.joinpath(name)
        file = self.root.joinpath(name)
        if self.manager.is_exists(file):
            self.output_prompt(f"文件 {name} 已存在,跳过下载!", log)
            return
        try:
            with get(url, headers=self.headers, proxies=self.proxies, stream=True) as response:
                # Reject 4xx/5xx responses so an error page is never stored
                # as a media file and reported as a successful download.
                response.raise_for_status()
                with temp.open("wb") as f:
                    for chunk in response.iter_content(chunk_size=self.chunk):
                        f.write(chunk)
        except exceptions.RequestException:
            # RequestException is the base class of ChunkedEncodingError, so
            # connection errors, timeouts and HTTP errors are handled too.
            # The temp file may not exist if the request failed before any
            # data was written.
            if temp.exists():
                self.manager.delete(temp)
            self.output_prompt(f"网络异常,文件 {name} 下载失败!", log)
        else:
            # Move only after both context managers have closed the file.
            self.manager.move(temp, file)
            self.output_prompt(f"文件 {name} 下载成功!", log)

    @staticmethod
    def __delete_cookie(headers: dict) -> dict:
        """Return a copy of ``headers`` without the "Cookie" entry.

        The input dict is not modified. Headers without a "Cookie" key are
        copied unchanged.
        """
        download_headers = headers.copy()
        # pop with a default: a missing Cookie must not raise KeyError.
        download_headers.pop("Cookie", None)
        return download_headers

    @staticmethod
    def output_prompt(tip: str, log):
        """Emit ``tip`` via ``log.write_line`` when ``log`` is truthy, else print it."""
        if log:
            log.write_line(tip)
        else:
            print(tip)