Skip to content

main

Router FastAPI entry point.

Exposes the same /v1/stream WebSocket path the backend servers do, accepts a client, picks a healthy replica from the registry, and proxies frames bidirectionally.

PR 7.9 ships the minimum-viable shape: an explicit replica list, a single primary, JSON + binary passthrough in both directions, and a `/status` endpoint for operators. Sticky-session routing (so a reconnect lands on the same backend) is left for a follow-up.

Classes

Functions

fastvideo.entrypoints.streaming.router.main.build_router_app

build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

`registry` can be injected for tests; defaults to one built from `config.replicas`.

Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    The returned app exposes:

    * ``GET /status`` -- a JSON snapshot of every replica in the registry
      (url, primary flag, health status, last success time, last latency,
      consecutive failure count), intended for operators.
    * ``WS /v1/stream`` -- accepts a client, picks a replica with
      ``registry.select()`` and bridges the session to it. If no replica is
      selected, the client receives an ``error`` frame with code
      ``gpu_unavailable`` and the socket is closed with code 1013.

    During the app lifespan, a background task running
    ``run_health_check_loop`` is started. On shutdown, ``stop_event`` is set
    and the task is awaited.

    Args:
        config: Router configuration. ``config.replicas`` seeds the default
            registry, and the whole config is handed to the health-check loop.
        registry: Optional pre-built registry (useful for tests). Defaults to
            one built from ``config.replicas``. An explicitly supplied
            registry is always used, even if it happens to be falsy/empty.

    Returns:
        The configured FastAPI application. The shared router state is also
        exposed as ``app.state.router_state``.
    """
    # Explicit None check: ``registry or ...`` would discard an injected
    # registry that evaluates falsy (e.g. an empty one defining __len__).
    if registry is None:
        registry = ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        # The same event object is reused for every lifespan cycle of this
        # app. Clear it so a restart (e.g. a second TestClient context) does
        # not start a health loop that has already been told to stop.
        state.stop_event.clear()
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            # NOTE(review): assumes run_health_check_loop returns once
            # stop_event is set -- confirm in the health module.
            state.stop_event.set()
            task = state.health_task
            if task is not None:
                try:
                    with contextlib.suppress(asyncio.CancelledError):
                        await task
                except Exception:
                    # A crashed background loop should not turn app
                    # shutdown into a lifespan failure; record it instead.
                    logger.exception(
                        "router: health-check loop exited with an error")

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            # Best effort: the client may already have gone away, in which
            # case sending raises RuntimeError or WebSocketDisconnect.
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.send_json({
                    "type": "error",
                    "code": "gpu_unavailable",
                    "message": "router: no healthy replica available",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.close(code=1013, reason="no_healthy_replica")
            return

        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            # Notifying the client is best effort. A socket that is already
            # closed raises RuntimeError; a client that is already gone can
            # raise WebSocketDisconnect. Neither should escape the handler.
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.send_json({
                    "type": "error",
                    "code": "worker_failed",
                    "message": f"router bridge failed: {exc}",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError, WebSocketDisconnect):
                await websocket.close(code=1011)

    app.state.router_state = state
    return app