背景与技术选型
后端实时推送是现代应用提升用户体验的关键能力,而传统的轮询模式在高并发场景下往往表现出轮询成本高、带宽浪费与低效的资源利用率。
在面向浏览器的场景中,SSE(服务器发送事件)和 WebSocket 是两条主线,分别适用于不同的业务需求:SSE提供简单、单向的事件流,而 WebSocket提供全双工、低延迟的通信通道,从而让应用实现真正的实时推送,告别轮询成为现实。
以 FastAPI 为后端框架,可以通过异步编程模型快速搭建两种推送机制的实现,结合中间件与部署策略,能够在高并发场景中保持稳定性与可维护性,真正实现 FastAPI 实现后端实时推送 的目标。
FastAPI中的SSE实现实战
场景定位:新闻聚合、监控指标、社交更新等场景更适合使用 SSE 的单向推送能力,前端只需要接收数据、更新界面。
实现要点包括:定义稳定的 事件流格式、保证 连接保持、以及对客户端断线的自动重连处理,以确保实时性与可用性。
场景与限制
SSE 以文本数据流的形式传输,浏览器原生支持便于快速落地,但在需要双向交互或复杂通信时,需要结合其他通道实现。单向推送的特点使得实现简单、性能可控。
代理与缓存对 SSE 也会产生影响,因此在部署时需要设置 代理/网关的连接保持时间与 慢请求处理策略,确保事件流不会被中间层截断。
代码示例与要点
下面给出一个基于 FastAPI 与 sse-starlette 的简单示例,展示如何实现持续推送的数据流,帮助你快速理解整体架构。
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import time
app = FastAPI()
async def event_stream():
while True:
yield f"data: {{\"time\": \"{time.time()}\"}}\\n\\n"
await asyncio.sleep(1)
@app.get("/stream")
async def stream():
return EventSourceResponse(event_stream())
要点总结:使用 EventSourceResponse 将异步生成器直接转换为 SSE 流,保持连接与合适的心跳间隔是稳定性的关键,能够实现真正的 后端实时推送。
FastAPI中的WebSocket实现实战
核心优势在于全双工通信与低延迟,适合需要即时交互、双向数据传输的应用场景,例如在线协作、实时聊天等。
为了实现稳定的高并发 WebSocket 服务,通常需要对连接进行管理、实现广播能力,并设计健壮的断线处理机制,确保连接恢复与状态一致性。
连接管理
要实现稳定的多连接广播,需要在服务端维护一个 活跃连接集合,并在断线时执行清理操作以释放资源,避免连接泄漏带来的内存压力。心跳机制还能帮助快速发现异常连接,提高系统可用性。
在实现中,并发保护和正确的异常处理是关键,确保单个连接异常不会影响到整体广播逻辑。
广播与分组
通过将连接按主题或房间进行归类,可以实现针对性广播,例如通知订阅某个话题的客户端,提升带宽使用效率与消息相关性。分组机制是实现高效广播的核心。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
active_connections: List[WebSocket] = []
async def broadcast(message: str):
for connection in active_connections:
await connection.send_text(message)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
data = await websocket.receive_text()
# 将收到的消息广播给所有客户端
await broadcast(f"echo: {data}")
except WebSocketDisconnect:
active_connections.remove(websocket)
注意事项:在生产环境中,应考虑 连接上限、横向扩展、以及 分布式广播 的实现方式,确保系统在高并发下仍然稳定。
SSE与WebSocket对比与选型要点
在设计后端实时推送方案时,明确 场景匹配 是第一步,SSE 与 WebSocket 各有适用边界,选型要结合业务需求与基础设施能力。
综合考虑,延迟、吞吐、以及 兼容性等因素,决定了最终方案的边界条件与开发成本。
场景匹配
若需求侧主张单向推送、浏览器原生支持、实现成本低,SSE 是更简单的选择,尤其适用于传统的新闻、监控、日志等推送场景。告别轮询的优势在此时尤为突出。
若需要双向通信、低延迟交互、对客户端状态有实时控制,WebSocket 能带来更高的交互性,但也需要更多的资源与管理工作以保证稳定性。
性能与兼容性
SSE 在浏览器端原生支持良好,代理和负载均衡的配置需要额外关注,以避免中间节点对事件流的截断。心跳与重连策略是提升稳定性的关键。
WebSocket 虽然在多平台有良好支持,但要处理 连接数、内存占用、以及横向扩展 等问题,尤其是在多实例部署时,需要采用一致性哈希、消息代理或专门的广播中间件来实现跨实例广播。
快速落地的代码骨架
下面给出一个简明的代码骨架,帮助你在一个 FastAPI 服务中同时支持 SSE 与 WebSocket 的实时推送能力,便于对比与迭代。
快速服务器骨架整合了两种推送方式,便于你在同一个后端结构中进行性能对比与稳定性验证。
快速服务器骨架
该骨架中包含 FastAPI 应用、SSE 的端点,以及 WebSocket 的端点,方便你在同一个服务内进行对比与验证。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from sse_starlette.sse import EventSourceResponse
import asyncio
import time
from typing import List
app = FastAPI()
# SSE 流数据
async def sse_stream():
while True:
yield f"data: {{\"time\": \"{time.time()}\"}}\\n\\n"
await asyncio.sleep(1)
# WebSocket 连接集合
connections: List[WebSocket] = []
async def broadcast(message: str):
for ws in connections:
await ws.send_text(message)
@app.get("/sse/stream")
async def sse_endpoint():
return EventSourceResponse(sse_stream())
@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
await websocket.accept()
connections.append(websocket)
try:
while True:
msg = await websocket.receive_text()
await websocket.send_text(f"received: {msg}")
await broadcast(f"broadcast: {msg}")
except WebSocketDisconnect:
connections.remove(websocket) 

