mirror of
https://github.com/JoeanAmier/XHS-Downloader.git
synced 2026-03-22 06:57:16 +08:00
更新 TUI 界面
This commit is contained in:
16
main.py
16
main.py
@@ -90,20 +90,20 @@ class XHSDownloader(App):
|
|||||||
|
|
||||||
def solo(self):
|
def solo(self):
|
||||||
url = self.query_one(Input).value
|
url = self.query_one(Input).value
|
||||||
self.APP.extract(url, download=True)
|
self.APP.extract(url, True, self.query_one(Log))
|
||||||
# self.query_one(Log).write_line()
|
|
||||||
|
|
||||||
def batch(self):
|
def batch(self):
|
||||||
urls = self.Batch.read_txt()
|
urls = self.Batch.read_txt()
|
||||||
|
log = self.query_one(Log)
|
||||||
if not urls:
|
if not urls:
|
||||||
self.query_one(Log).write_line("未检测到 xhs.txt 文件 或者 该文件为空!")
|
log.write_line("未检测到 xhs.txt 文件 或者 该文件为空!")
|
||||||
for url in urls:
|
for url in urls:
|
||||||
self.query_one(Log).write_line(f"当前作品链接: {url}")
|
log.write_line(f"当前作品链接: {url}")
|
||||||
self.APP.extract(url)
|
self.APP.extract(url, True, log)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# example()
|
# example()
|
||||||
program()
|
# program()
|
||||||
# app = XHSDownloader()
|
app = XHSDownloader()
|
||||||
# app.run()
|
app.run()
|
||||||
|
|||||||
@@ -40,18 +40,18 @@ class Download:
|
|||||||
self.temp.mkdir()
|
self.temp.mkdir()
|
||||||
return root
|
return root
|
||||||
|
|
||||||
def run(self, urls: list, name: str, type_: int):
|
def run(self, urls: list, name: str, type_: int, log):
|
||||||
if type_ == 0:
|
if type_ == 0:
|
||||||
self.__download(urls[0], f"{name}.mp4")
|
self.__download(urls[0], f"{name}.mp4", log)
|
||||||
elif type_ == 1:
|
elif type_ == 1:
|
||||||
for index, url in enumerate(urls):
|
for index, url in enumerate(urls):
|
||||||
self.__download(url, f"{name}_{index + 1}.jpeg")
|
self.__download(url, f"{name}_{index + 1}.jpeg", log)
|
||||||
|
|
||||||
def __download(self, url: str, name: str):
|
def __download(self, url: str, name: str, log):
|
||||||
temp = self.temp.joinpath(name)
|
temp = self.temp.joinpath(name)
|
||||||
file = self.root.joinpath(name)
|
file = self.root.joinpath(name)
|
||||||
if self.manager.is_exists(file):
|
if self.manager.is_exists(file):
|
||||||
print(f"{name} 已存在,跳过下载!")
|
self.output_prompt(f"文件 {name} 已存在,跳过下载!", log)
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
with get(url, headers=self.headers, proxies=self.proxies, stream=True) as response:
|
with get(url, headers=self.headers, proxies=self.proxies, stream=True) as response:
|
||||||
@@ -59,13 +59,20 @@ class Download:
|
|||||||
for chunk in response.iter_content(chunk_size=self.chunk):
|
for chunk in response.iter_content(chunk_size=self.chunk):
|
||||||
f.write(chunk)
|
f.write(chunk)
|
||||||
self.manager.move(temp, file)
|
self.manager.move(temp, file)
|
||||||
print(f"{name} 下载成功!")
|
self.output_prompt(f"文件 {name} 下载成功!", log)
|
||||||
except exceptions.ChunkedEncodingError:
|
except exceptions.ChunkedEncodingError:
|
||||||
self.manager.delete(temp)
|
self.manager.delete(temp)
|
||||||
print(f"网络异常,{name} 下载失败!")
|
self.output_prompt(f"网络异常,文件 {name} 下载失败!", log)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __delete_cookie(headers: dict) -> dict:
|
def __delete_cookie(headers: dict) -> dict:
|
||||||
download_headers = headers.copy()
|
download_headers = headers.copy()
|
||||||
del download_headers["Cookie"]
|
del download_headers["Cookie"]
|
||||||
return download_headers
|
return download_headers
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def output_prompt(tip: str, log):
|
||||||
|
if log:
|
||||||
|
log.write_line(tip)
|
||||||
|
else:
|
||||||
|
print(tip)
|
||||||
|
|||||||
@@ -46,19 +46,19 @@ class XHS:
|
|||||||
proxies,
|
proxies,
|
||||||
chunk)
|
chunk)
|
||||||
|
|
||||||
def __get_image(self, container: dict, html: str, download):
|
def __get_image(self, container: dict, html: str, download, log):
|
||||||
urls = self.image.get_image_link(html)
|
urls = self.image.get_image_link(html)
|
||||||
if download:
|
if download:
|
||||||
self.download.run(urls, self.__naming_rules(container), 1)
|
self.download.run(urls, self.__naming_rules(container), 1, log)
|
||||||
container["下载地址"] = urls
|
container["下载地址"] = urls
|
||||||
|
|
||||||
def __get_video(self, container: dict, html: str, download):
|
def __get_video(self, container: dict, html: str, download, log):
|
||||||
url = self.video.get_video_link(html)
|
url = self.video.get_video_link(html)
|
||||||
if download:
|
if download:
|
||||||
self.download.run(url, self.__naming_rules(container), 0)
|
self.download.run(url, self.__naming_rules(container), 0, log)
|
||||||
container["下载地址"] = url
|
container["下载地址"] = url
|
||||||
|
|
||||||
def extract(self, url: str, download=False) -> dict:
|
def extract(self, url: str, download=False, log=None) -> dict:
|
||||||
if not self.__check(url):
|
if not self.__check(url):
|
||||||
print(f"无效的作品链接: {url}")
|
print(f"无效的作品链接: {url}")
|
||||||
return {}
|
return {}
|
||||||
@@ -70,9 +70,9 @@ class XHS:
|
|||||||
print(f"获取作品数据失败: {url}")
|
print(f"获取作品数据失败: {url}")
|
||||||
return {}
|
return {}
|
||||||
if data["作品类型"] == "视频":
|
if data["作品类型"] == "视频":
|
||||||
self.__get_video(data, html, download)
|
self.__get_video(data, html, download, log)
|
||||||
else:
|
else:
|
||||||
self.__get_image(data, html, download)
|
self.__get_image(data, html, download, log)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def __check(self, url: str):
|
def __check(self, url: str):
|
||||||
|
|||||||
@@ -8,4 +8,5 @@ Label {
|
|||||||
color: white;
|
color: white;
|
||||||
content-align-horizontal: center;
|
content-align-horizontal: center;
|
||||||
content-align-vertical: middle;
|
content-align-vertical: middle;
|
||||||
|
text-style: bold;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user