1. 准备工作与环境搭建
1.1 安装与环境准备
在开始使用 Python urllib3 发送 HTTP 请求之前,建议你搭建一个干净的开发环境,确保依赖可控。使用 虚拟环境 可以避免全局包冲突,并方便不同项目的隔离管理。
首先需要安装 urllib3,以及可选的辅助工具以提升安全性与兼容性。常用的安装命令为 pip,例如
pip install urllib3。如果你使用虚拟环境,确保已经激活后再执行安装。
下面提供一个快速的虚拟环境创建与激活示例,确保你具备一个可重复的环境:
python3 -m venv venv
source venv/bin/activate # Linux/macOS
venv\Scripts\activate # Windows
pip install urllib3
1.2 证书与依赖
对于 HTTPS 请求,证书校验是重要的安全环节。使用 certifi 这样的 CA 证书包可以提升对服务器证书的信任度,避免中间人攻击等风险,同时保持与主流站点的兼容性。
在实际工程中,除了安装 urllib3,通常还会安装 certifi,并在请求中开启证书校验。常见的做法是通过 ca_certs 或系统默认证书路径完成校验。
请确保你的环境具备网络访问权限,以便下载必要的依赖包与更新的 CA 证书链。若遇到证书错误,可以临时开启禁用证书校验进行排错,但生产环境强烈不建议这样做,需使用正确的证书链。
2. urllib3 的核心概念与基本用法
2.1 PoolManager 与请求流程
在 urllib3 中,核心的请求入口是 PoolManager,它提供连接池以复用 TCP 连接,显著提升吞吐量并减少连接建立成本。通过实例化一个 PoolManager,你可以发起对任意 URL 的同步请求。
请求流程通常包括:建立连接、发送请求、接收响应、对响应体进行读取,同时连接会被回收到池中以供后续重用。正确地管理连接池可以带来稳定的并发性能。
下面给出一个简化的示例,展示如何创建 PoolManager 并发起一个 GET 请求:
import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'https://httpbin.org/get', retries=2, timeout=urllib3.Timeout(connect=2.0, read=5.0))
print(r.status)
print(r.data)
2.2 基本用法示例
除了 GET,urllib3 还支持 POST、PUT、DELETE 等常用方法,并且提供了对超时、重试、代理等特性的内置支持。最常见的用法是通过 PoolManager 来统一管理请求。
在实践中,你可能会结合自定义头信息、查询参数、以及请求体来完成复杂的交互。下面是一个同时演示 GET 参数与自定义头信息的示例:
import urllib3
import json
http = urllib3.PoolManager()
headers = {'User-Agent': 'MyClient/1.0', 'Accept': 'application/json'}
params = {'key1': 'value1', 'key2': 'value2'}
r = http.request(
'GET',
'https://httpbin.org/get',
fields=params,
headers=headers
)
print(r.status)
print(r.data == json.dumps({'args': params}).encode())
3. GET 与 POST 请求的实操
3.1 GET 请求
GET 请求用于获取远端资源,参数通常以查询字符串形式附加在 URL 中。通过 PoolManager.request 的 fields 参数即可实现自动编码的查询参数拼接。
在实际场景中,你可能需要设置自定义的请求头、控制超时以及对响应进行流式处理。以下示例演示完整的 GET 请求流程与简单断言:
import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'https://httpbin.org/get', fields={'q':'urllib3','page':'1'})
assert r.status == 200
print(r.data.decode('utf-8'))
3.2 POST 请求
POST 请求用于提交数据到服务端,常见的场景包括提交 JSON、表单数据等。你需要在请求中设置正确的 Content-Type,并将数据以字节串形式传送。
下面的示例以 JSON 作为载荷进行 POST,请求头部显式指定了 Content-Type,并将 Python 字典序列化为 JSON 字符串:
import urllib3, json
http = urllib3.PoolManager()
payload = {'name': 'Alice', 'age': 30}
headers = {'Content-Type': 'application/json'}
r = http.request(
'POST',
'https://httpbin.org/post',
body=json.dumps(payload).encode('utf-8'),
headers=headers
)
print(r.status)
print(r.data.decode('utf-8'))
4. 超时、重试与错误处理
4.1 超时设置
合理的超时设置可以防止请求阻塞和资源耗尽。urllib3 提供了专门的 Timeout 对象,用于分离连接建立与读取/写入阶段的超时阈值。
通过传递 timeout 给 PoolManager,或者在单次请求中覆盖默认超时,可以实现对网络波动的鲁棒处理。下面示例展示了一个带有连接与读取超时的请求:
import urllib3
from urllib3.util.timeout import Timeout
timeout = Timeout(connect=2.0, read=5.0)
http = urllib3.PoolManager(timeout=timeout)
r = http.request('GET', 'https://httpbin.org/get')
print(r.status)
4.2 重试策略
网络环境不稳定时,重试机制可以提升成功率。Retry 对象提供了多种策略,常见的有总重试次数、按状态码触发重试,以及指数退避等。将 Retry 与 PoolManager 结合使用,可以实现灵活的断路保护。
下面给出一个简单的重试示例,遇到 5xx 状态码时自动重试,并在每次重试之间进行等待:
from urllib3 import PoolManager
from urllib3.util.retry import Retry
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
http = PoolManager(retries=retry)
r = http.request('GET', 'https://httpbin.org/status/503')
print(r.status)
5. 下载与流式传输实战
5.1 流式下载
对于大文件下载,理想的做法是开启流式读取以避免将整块数据一次性加载到内存中。使用 preload_content=False,并对响应体进行分块读取。
下面的示例演示如何以 1KB 为单位进行分块下载并将数据写入本地文件:
import urllib3
http = urllib3.PoolManager()
r = http.request('GET', 'https://speed.hetzner.de/100MB.bin', preload_content=False)
with open('100MB.bin', 'wb') as f:
for chunk in r.stream(1024):
f.write(chunk)
r.release_conn()
5.2 小文件与错误处理的结合
对于较小的资源,可以直接使用 r.data 获取完整响应体;对于流式场景,记得在完成写入后调用 r.release_conn() 以释放连接回池。若出现网络中断,务必捕捉 urllib3.exceptions.HTTPError 等异常并进行日志记录,以便排错。
综合示例展示了从请求、流式读取到错误处理的完整流程,以及如何在应用层面对失败进行回退策略:
import urllib3
from urllib3.exceptions import HTTPError
http = urllib3.PoolManager()
try:
r = http.request('GET', 'https://httpbin.org/gzip', preload_content=False)
with open('gzip.bin', 'wb') as f:
for chunk in r.stream(1024):
f.write(chunk)
r.release_conn()
except HTTPError as e:
print('请求失败:', e)
6. 实战案例:并发请求与速率控制
6.1 使用 ThreadPoolExecutor 实现并发
在需要对同一目标进行大量并发请求时,可以借助 concurrent.futures.ThreadPoolExecutor 将 urllib3 请求分发到多个线程实现并发。但要注意对服务器的压力与本地网络带宽的限制。
下面的案例演示了如何用线程池同时发起多个请求,并对结果进行聚合处理:
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib3
http = urllib3.PoolManager()
urls = [f'https://httpbin.org/get?i={i}' for i in range(20)]
def fetch(url):
r = http.request('GET', url)
return r.status, r.data.decode('utf-8')
results = []
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
status, body = future.result()
results.append((url, status, body))
except Exception as exc:
results.append((url, None, str(exc)))
print('Total:', len(results))
6.2 简单的速率限制与重试配合
为了避免对目标服务造成突发压力,可以结合简易的速率控制与前述的重试策略。在实际应用中,常见做法是按批次延迟发送请求或使用带有 backoff 的重试策略进行自适应节流。
以下示例展示了将并发请求与每批次延时结合的思路,保持较低的并发峰值,同时在遇到错误时进行有限次重试:
import time
from concurrent.futures import ThreadPoolExecutor
from urllib3.util.retry import Retry
import urllib3
retry = Retry(total=2, backoff_factor=0.3, status_forcelist=[500, 502])
http = urllib3.PoolManager(retries=retry)
urls = [f'https://httpbin.org/get?i={i}' for i in range(30)]
def worker(url):
r = http.request('GET', url)
return url, r.status
with ThreadPoolExecutor(max_workers=6) as executor:
for i in range(0, len(urls), 6):
batch = urls[i:i+6]
futures = [executor.submit(worker, u) for u in batch]
for f in futures:
print(f.result())
time.sleep(0.5) # 简单批次延时,控制速率


