fix: add urlib response content encording

This commit is contained in:
ihmily 2024-05-10 12:28:05 +08:00
parent cd2a5472d4
commit 3a0079a722

View File

@ -4,11 +4,11 @@
Author: Hmily
GitHub: https://github.com/ihmily
Date: 2023-07-15 23:15:00
Update: 2024-05-09 13:03:17
Update: 2024-05-10 12:25:33
Copyright (c) 2023 by Hmily, All Rights Reserved.
Function: Get live stream data.
"""
import gzip
import hashlib
import random
import time
@ -44,7 +44,8 @@ def get_req(
data: Union[dict, bytes, None] = None,
json_data: Union[dict, list, None] = None,
timeout: int = 20,
abroad: bool = False
abroad: bool = False,
content_conding: str = 'utf-8'
) -> Union[str, Any]:
if headers is None:
headers = {}
@ -62,22 +63,30 @@ def get_req(
resp_str = response.text
else:
if data and not isinstance(data, bytes):
data = urllib.parse.urlencode(data).encode('utf-8')
data = urllib.parse.urlencode(data).encode(content_conding)
if json_data and isinstance(json_data, (dict, list)):
data = json.dumps(json_data).encode('utf-8')
data = json.dumps(json_data).encode(content_conding)
req = urllib.request.Request(url, data=data, headers=headers)
try:
if abroad:
with urllib.request.urlopen(req, timeout=timeout) as response:
resp_str = response.read().decode('utf-8')
response = urllib.request.urlopen(req, timeout=timeout)
else:
with opener.open(req, timeout=timeout) as response:
resp_str = response.read().decode('utf-8')
response = opener.open(req, timeout=timeout)
content_encoding = response.info().get('Content-Encoding')
try:
if content_encoding == 'gzip':
with gzip.open(response, 'rt', encoding=content_conding) as gzipped:
resp_str = gzipped.read()
else:
resp_str = response.read().decode(content_conding)
finally:
response.close()
except urllib.error.HTTPError as e:
if e.code == 400:
resp_str = e.read().decode('utf-8')
resp_str = e.read().decode(content_conding)
else:
raise
except urllib.error.URLError as e:
@ -99,8 +108,6 @@ def get_params(url: str, params: str) -> Union[str, None]:
if params in query_params:
return query_params[params][0]
else:
return None
def jsonp_to_json(jsonp_str: str) -> Union[dict, None]:
@ -112,8 +119,7 @@ def jsonp_to_json(jsonp_str: str) -> Union[dict, None]:
json_obj = json.loads(json_str)
return json_obj
else:
print("No JSON data found in JSONP response.")
return None
raise Exception("No JSON data found in JSONP response.")
def replace_url(file_path: str, old: str, new: str) -> None: