使用 HTTPX 替代 AIOHTTP

This commit is contained in:
JoeanAmier
2024-06-27 18:45:14 +08:00
parent 01711be950
commit e7c4d6daee
18 changed files with 222 additions and 105 deletions

View File

@@ -5,10 +5,15 @@ from shutil import move
from shutil import rmtree
from typing import Callable
from aiohttp import ClientSession
from aiohttp import ClientTimeout
from httpx import AsyncClient
from httpx import RequestError
from httpx import TimeoutException
from httpx import get
from .static import HEADERS
from .static import USERAGENT
from .static import WARNING
from .tools import logging
__all__ = ["Manager"]
@@ -30,6 +35,10 @@ class Manager:
'作者昵称',
'作者ID',
)
NO_PROXY = {
"http://": None,
"https://": None,
}
SEPARATE = "_"
def __init__(
@@ -39,8 +48,9 @@ class Manager:
folder: str,
name_format: str,
chunk: int,
user_agent: str,
cookie: str,
proxy: str,
proxy: str | dict,
timeout: int,
retry: int,
record_data: bool,
@@ -51,12 +61,14 @@ class Manager:
folder_mode: bool,
# server: bool,
transition: Callable[[str], str],
_print: bool,
):
self.root = root
self.temp = root.joinpath("./temp")
self.path = self.__check_path(path)
self.folder = self.__check_folder(folder)
self.blank_headers = HEADERS
self.message = transition
self.blank_headers = HEADERS | {"User-Agent": user_agent or USERAGENT}
self.headers = self.blank_headers | {"Cookie": cookie}
self.retry = retry
self.chunk = chunk
@@ -64,16 +76,20 @@ class Manager:
self.record_data = self.check_bool(record_data, False)
self.image_format = self.__check_image_format(image_format)
self.folder_mode = self.check_bool(folder_mode, False)
self.proxy = proxy
self.request_session = ClientSession(
self.proxy_tip = None
self.proxy = self.__check_proxy(proxy)
self.print_proxy_tip(_print, )
self.request_client = AsyncClient(
headers=self.headers | {
"Referer": "https://www.xiaohongshu.com/explore", },
timeout=ClientTimeout(connect=timeout),
timeout=timeout,
**self.proxy,
)
self.download_session = ClientSession(
self.download_client = AsyncClient(
headers=self.blank_headers,
timeout=ClientTimeout(connect=timeout))
self.message = transition
timeout=timeout,
**self.proxy,
)
self.image_download = self.check_bool(image_download, True)
self.video_download = self.check_bool(video_download, True)
self.live_download = self.check_bool(live_download, True)
@@ -134,8 +150,8 @@ class Manager:
return value if isinstance(value, bool) else default
async def close(self):
await self.request_session.close()
await self.download_session.close()
await self.request_client.aclose()
await self.download_client.aclose()
self.__clean()
def __check_name_format(self, format_: str) -> str:
@@ -148,3 +164,38 @@ class Manager:
),
format_,
)
def __check_proxy(
self,
proxy: str | dict,
url="https://www.baidu.com/",
) -> dict:
if not proxy:
return {"proxies": self.NO_PROXY}
if isinstance(proxy, str):
kwarg = {"proxy": proxy}
elif isinstance(proxy, dict):
kwarg = {"proxies": proxy}
else:
self.proxy_tip = (
self.message("proxy 参数 {0} 设置错误,程序将不会使用代理").format(proxy), WARNING,)
return {"proxies": self.NO_PROXY}
try:
response = get(
url,
**kwarg, )
if response.status_code < 400:
self.proxy_tip = (self.message("代理 {0} 测试成功").format(proxy),)
return kwarg
except TimeoutException:
self.proxy_tip = (
self.message("代理 {0} 测试超时").format(proxy), WARNING,)
except RequestError as e:
self.proxy_tip = (
self.message("代理 {0} 测试失败:{1}").format(
proxy, e), WARNING,)
return {"proxies": self.NO_PROXY}
def print_proxy_tip(self, _print: bool = True, log=None, ) -> None:
if _print and self.proxy_tip:
logging(log, *self.proxy_tip)