广告

如何用 Python urllib3 发送 HTTP 请求:完整教程与实战案例

1. 准备工作与环境搭建

1.1 安装与环境准备

在开始使用 Python urllib3 发送 HTTP 请求之前,建议你搭建一个干净的开发环境,确保依赖可控。使用 虚拟环境 可以避免全局包冲突,并方便不同项目的隔离管理。

首先需要安装 urllib3,以及可选的辅助工具以提升安全性与兼容性。常用的安装命令为 pip,例如

pip install urllib3
。如果你使用虚拟环境,确保已经激活后再执行安装。

下面提供一个快速的虚拟环境创建与激活示例,确保你具备一个可重复的环境:

python3 -m venv venv
source venv/bin/activate  # Linux/macOS
venv\Scripts\activate     # Windows
pip install urllib3

1.2 证书与依赖

对于 HTTPS 请求,证书校验是重要的安全环节。使用 certifi 这样的 CA 证书包可以提升对服务器证书的信任度,避免中间人攻击等风险,同时保持与主流站点的兼容性。

在实际工程中,除了安装 urllib3,通常还会安装 certifi,并在请求中开启证书校验。常见的做法是通过 ca_certs 或系统默认证书路径完成校验。

请确保你的环境具备网络访问权限,以便下载必要的依赖包与更新的 CA 证书链。若遇到证书错误,可以临时开启禁用证书校验进行排错,但生产环境强烈不建议这样做,需使用正确的证书链。

2. urllib3 的核心概念与基本用法

2.1 PoolManager 与请求流程

urllib3 中,核心的请求入口是 PoolManager,它提供连接池以复用 TCP 连接,显著提升吞吐量并减少连接建立成本。通过实例化一个 PoolManager,你可以发起对任意 URL 的同步请求。

请求流程通常包括:建立连接、发送请求、接收响应、对响应体进行读取,同时连接会被回收到池中以供后续重用。正确地管理连接池可以带来稳定的并发性能。

下面给出一个简化的示例,展示如何创建 PoolManager 并发起一个 GET 请求:

import urllib3

http = urllib3.PoolManager()
r = http.request('GET', 'https://httpbin.org/get', retries=2, timeout=urllib3.Timeout(connect=2.0, read=5.0))

print(r.status)
print(r.data)

2.2 基本用法示例

除了 GET,urllib3 还支持 POST、PUT、DELETE 等常用方法,并且提供了对超时、重试、代理等特性的内置支持。最常见的用法是通过 PoolManager 来统一管理请求。

在实践中,你可能会结合自定义头信息、查询参数、以及请求体来完成复杂的交互。下面是一个同时演示 GET 参数与自定义头信息的示例:

import urllib3
import json

http = urllib3.PoolManager()
headers = {'User-Agent': 'MyClient/1.0', 'Accept': 'application/json'}
params = {'key1': 'value1', 'key2': 'value2'}

r = http.request(
    'GET',
    'https://httpbin.org/get',
    fields=params,
    headers=headers
)

print(r.status)
print(r.data == json.dumps({'args': params}).encode())

3. GET 与 POST 请求的实操

3.1 GET 请求

GET 请求用于获取远端资源,参数通常以查询字符串形式附加在 URL 中。通过 PoolManager.requestfields 参数即可实现自动编码的查询参数拼接。

在实际场景中,你可能需要设置自定义的请求头、控制超时以及对响应进行流式处理。以下示例演示完整的 GET 请求流程与简单断言:

import urllib3

http = urllib3.PoolManager()
r = http.request('GET', 'https://httpbin.org/get', fields={'q':'urllib3','page':'1'})

assert r.status == 200
print(r.data.decode('utf-8'))

3.2 POST 请求

POST 请求用于提交数据到服务端,常见的场景包括提交 JSON、表单数据等。你需要在请求中设置正确的 Content-Type,并将数据以字节串形式传送。

下面的示例以 JSON 作为载荷进行 POST,请求头部显式指定了 Content-Type,并将 Python 字典序列化为 JSON 字符串:

import urllib3, json

http = urllib3.PoolManager()
payload = {'name': 'Alice', 'age': 30}
headers = {'Content-Type': 'application/json'}

r = http.request(
    'POST',
    'https://httpbin.org/post',
    body=json.dumps(payload).encode('utf-8'),
    headers=headers
)

print(r.status)
print(r.data.decode('utf-8'))

4. 超时、重试与错误处理

4.1 超时设置

合理的超时设置可以防止请求阻塞和资源耗尽。urllib3 提供了专门的 Timeout 对象,用于分离连接建立与读取/写入阶段的超时阈值。

通过传递 timeout 给 PoolManager,或者在单次请求中覆盖默认超时,可以实现对网络波动的鲁棒处理。下面示例展示了一个带有连接与读取超时的请求:

import urllib3
from urllib3.util.timeout import Timeout

timeout = Timeout(connect=2.0, read=5.0)
http = urllib3.PoolManager(timeout=timeout)

r = http.request('GET', 'https://httpbin.org/get')
print(r.status)

4.2 重试策略

网络环境不稳定时,重试机制可以提升成功率。Retry 对象提供了多种策略,常见的有总重试次数、按状态码触发重试,以及指数退避等。将 RetryPoolManager 结合使用,可以实现灵活的断路保护。

下面给出一个简单的重试示例,遇到 5xx 状态码时自动重试,并在每次重试之间进行等待:

from urllib3 import PoolManager
from urllib3.util.retry import Retry

retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
http = PoolManager(retries=retry)

r = http.request('GET', 'https://httpbin.org/status/503')
print(r.status)

5. 下载与流式传输实战

5.1 流式下载

对于大文件下载,理想的做法是开启流式读取以避免将整块数据一次性加载到内存中。使用 preload_content=False,并对响应体进行分块读取。

下面的示例演示如何以 1KB 为单位进行分块下载并将数据写入本地文件:

import urllib3

http = urllib3.PoolManager()
r = http.request('GET', 'https://speed.hetzner.de/100MB.bin', preload_content=False)

with open('100MB.bin', 'wb') as f:
    for chunk in r.stream(1024):
        f.write(chunk)

r.release_conn()

5.2 小文件与错误处理的结合

对于较小的资源,可以直接使用 r.data 获取完整响应体;对于流式场景,记得在完成写入后调用 r.release_conn() 以释放连接回池。若出现网络中断,务必捕捉 urllib3.exceptions.HTTPError 等异常并进行日志记录,以便排错。

综合示例展示了从请求、流式读取到错误处理的完整流程,以及如何在应用层面对失败进行回退策略:

import urllib3
from urllib3.exceptions import HTTPError

http = urllib3.PoolManager()
try:
    r = http.request('GET', 'https://httpbin.org/gzip', preload_content=False)
    with open('gzip.bin', 'wb') as f:
        for chunk in r.stream(1024):
            f.write(chunk)
    r.release_conn()
except HTTPError as e:
    print('请求失败:', e)

6. 实战案例:并发请求与速率控制

6.1 使用 ThreadPoolExecutor 实现并发

在需要对同一目标进行大量并发请求时,可以借助 concurrent.futures.ThreadPoolExecutorurllib3 请求分发到多个线程实现并发。但要注意对服务器的压力与本地网络带宽的限制。

下面的案例演示了如何用线程池同时发起多个请求,并对结果进行聚合处理:

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib3

http = urllib3.PoolManager()
urls = [f'https://httpbin.org/get?i={i}' for i in range(20)]

def fetch(url):
    r = http.request('GET', url)
    return r.status, r.data.decode('utf-8')

results = []
with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            status, body = future.result()
            results.append((url, status, body))
        except Exception as exc:
            results.append((url, None, str(exc)))

print('Total:', len(results))

6.2 简单的速率限制与重试配合

为了避免对目标服务造成突发压力,可以结合简易的速率控制与前述的重试策略。在实际应用中,常见做法是按批次延迟发送请求或使用带有 backoff 的重试策略进行自适应节流。

以下示例展示了将并发请求与每批次延时结合的思路,保持较低的并发峰值,同时在遇到错误时进行有限次重试:

import time
from concurrent.futures import ThreadPoolExecutor
from urllib3.util.retry import Retry
import urllib3

retry = Retry(total=2, backoff_factor=0.3, status_forcelist=[500, 502])
http = urllib3.PoolManager(retries=retry)

urls = [f'https://httpbin.org/get?i={i}' for i in range(30)]
def worker(url):
    r = http.request('GET', url)
    return url, r.status

with ThreadPoolExecutor(max_workers=6) as executor:
    for i in range(0, len(urls), 6):
        batch = urls[i:i+6]
        futures = [executor.submit(worker, u) for u in batch]
        for f in futures:
            print(f.result())
        time.sleep(0.5)  # 简单批次延时,控制速率

广告

后端开发标签