Router FastAPI entry point.
Exposes the same /v1/stream WebSocket path as the backend servers,
accepts a client, picks a healthy replica from the registry, and
proxies frames bidirectionally.
PR 7.9 ships the minimum-viable shape: explicit replica list, single
primary, JSON + binary passthrough in both directions, and a
/status endpoint for operators. Sticky-session routing (so a
reconnect lands on the same backend) is left for a follow-up.
Classes
Functions
fastvideo.entrypoints.streaming.router.main.build_router_app
Build the router FastAPI app.
registry can be injected for tests; defaults to one built from
config.replicas.
Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    The returned app exposes:

    * ``GET /status`` -- a JSON snapshot of every replica in the registry
      (URL, primary flag, health status, last-OK timestamp, last latency,
      consecutive failures) for operators.
    * ``WS /v1/stream`` -- accepts a client, picks a replica with
      ``registry.select()`` and bridges the session to that replica's
      WebSocket URL.

    While the app's lifespan is active, a background task runs
    ``run_health_check_loop``. It is signalled to stop through a shared
    ``asyncio.Event`` and awaited on shutdown.

    Args:
        config: Router configuration. ``config.replicas`` seeds the default
            registry, and the whole config is passed to the health-check loop.
        registry: Optional pre-built registry, injectable for tests. When
            ``None`` (the default), one is built from ``config.replicas``.

    Returns:
        The configured FastAPI application. The shared router state is also
        attached as ``app.state.router_state``.
    """
    # Use an explicit ``is None`` check. ``registry or ...`` would silently
    # replace an injected registry that happens to be falsy (e.g. one that
    # defines ``__len__`` and is currently empty).
    if registry is None:
        registry = ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        # Health checks run for exactly as long as the app is serving.
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            # Signal the loop to exit, then wait for it so shutdown is clean.
            # NOTE(review): assumes run_health_check_loop returns promptly once
            # stop_event is set -- confirm, otherwise shutdown blocks here.
            state.stop_event.set()
            if state.health_task is not None:
                with contextlib.suppress(asyncio.CancelledError):
                    await state.health_task

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    async def _send_error_and_close(
        websocket: WebSocket,
        *,
        code: str,
        message: str,
        close_code: int,
        reason: str | None = None,
    ) -> None:
        """Best-effort: send a retryable error frame, then close the socket.

        The client may already be gone, so each step swallows the errors
        Starlette raises for a closed or disconnected socket. These are
        ``RuntimeError`` after a close was sent, and ``WebSocketDisconnect``
        when the underlying transport has dropped.
        """
        with contextlib.suppress(RuntimeError, WebSocketDisconnect):
            await websocket.send_json({
                "type": "error",
                "code": code,
                "message": message,
                "retryable": True,
            })
        with contextlib.suppress(RuntimeError, WebSocketDisconnect):
            await websocket.close(code=close_code, reason=reason)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        # Accept first so the client can receive a structured error frame
        # even when no replica can serve it.
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            await _send_error_and_close(
                websocket,
                code="gpu_unavailable",
                message="router: no healthy replica available",
                close_code=1013,  # RFC 6455 registry: "Try Again Later"
                reason="no_healthy_replica",
            )
            return
        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            # NOTE(review): the raw exception text is forwarded to the client
            # and may include internal replica details -- confirm that is
            # acceptable.
            await _send_error_and_close(
                websocket,
                code="worker_failed",
                message=f"router bridge failed: {exc}",
                close_code=1011,  # RFC 6455: "Internal Error"
            )

    app.state.router_state = state
    return app
|