fix(download.py): 修复文件下载失败的问题

This commit is contained in:
JoeanAmier 2024-08-09 19:07:05 +08:00
parent e2eee03feb
commit 0915574bc6

View File

@ -1,8 +1,10 @@
from asyncio import gather
from pathlib import Path
from typing import TYPE_CHECKING
from aiofiles import open
from httpx import HTTPError
from typing import TYPE_CHECKING
from source.module import ERROR
from source.module import Manager
from source.module import logging
@ -126,8 +128,9 @@ class Download:
@re_download
async def __download(self, url: str, path: Path, name: str, format_: str, log, bar):
headers = self.headers.copy()
try:
length, suffix = await self.__hand_file(url, format_, )
length, suffix = await self.__head_file(url, headers, format_, )
except HTTPError as error:
logging(log, str(error), ERROR)
logging(
@ -139,9 +142,10 @@ class Download:
return False
temp = self.temp.joinpath(f"{name}.{suffix}")
real = path.joinpath(f"{name}.{suffix}")
self.__update_headers_range(temp, )
self.__update_headers_range(headers, temp, )
try:
async with self.client.stream("GET", url, headers=self.headers) as response:
# print(f"{url} Stream Headers:", headers) # 调试代码
async with self.client.stream("GET", url, headers=headers, ) as response:
response.raise_for_status()
# self.__create_progress(
# bar,
@ -183,14 +187,16 @@ class Download:
def __extract_type(cls, content: str) -> str:
return cls.CONTENT_TYPE_MAP.get(content, "")
async def __hand_file(self,
async def __head_file(self,
url: str,
headers: dict[str, str],
suffix: str,
) -> [int, str]:
response = await self.client.head(url,
headers=self.headers | {
"Range": "bytes=0-",
}, )
# print(f"{url} Head Headers:", headers) # 调试代码
response = await self.client.head(
url,
headers=headers,
)
response.raise_for_status()
suffix = self.__extract_type(
response.headers.get("Content-Type")) or suffix
@ -202,6 +208,10 @@ class Download:
def __get_resume_byte_position(file: Path) -> int:
return file.stat().st_size if file.is_file() else 0
def __update_headers_range(self, file: Path) -> int:
self.headers["Range"] = f"bytes={(p := self.__get_resume_byte_position(file))}-"
def __update_headers_range(
self,
headers: dict[str, str],
file: Path,
) -> int:
headers["Range"] = f"bytes={(p := self.__get_resume_byte_position(file))}-"
return p