Skip to content

server

Single-generator FastAPI + WebSocket streaming server.

Classes

Functions:

fastvideo.entrypoints.streaming.server.build_app

build_app(serve_config: ServeConfig, generator: _GeneratorProto | None = None, *, pool: GpuPool | None = None, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:`run_server`.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Exactly one of `generator` (backed by :class:`InProcessGpuPool`) or `pool` (for the subprocess-backed production shape) must be given.

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto | None = None,
    *,
    pool: GpuPool | None = None,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Exactly one of ``generator`` (backed by :class:`InProcessGpuPool`)
    or ``pool`` (for the subprocess-backed production shape) must be
    given.

    Args:
        serve_config: Serve configuration; its ``streaming`` block must
            be set.
        generator: Generator to wrap in an :class:`InProcessGpuPool`.
            Mutually exclusive with ``pool``.
        pool: Pre-built GPU pool. Mutually exclusive with ``generator``.
        session_store: Store shared by the pool and the server state.
            A fresh :class:`InMemorySessionStore` is created only when
            this is ``None``.

    Returns:
        A FastAPI app exposing ``GET /health``, the routes from
        ``build_health_router(pool)`` and the ``/v1/stream`` WebSocket
        endpoint. The shared :class:`ServerState` is attached as
        ``app.state.server_state``.

    Raises:
        ValueError: If ``serve_config.streaming`` is ``None``, or if
            ``generator`` and ``pool`` are both given or both omitted.
    """
    streaming = serve_config.streaming
    if streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")
    if (generator is None) == (pool is None):
        raise ValueError("build_app requires exactly one of `generator` or `pool`")

    # Compare against None explicitly: a caller-supplied store that happens
    # to be falsy (e.g. an empty container-like store) must still be used,
    # otherwise the caller and the server would hold different stores.
    store = session_store if session_store is not None else InMemorySessionStore()
    if pool is None:
        # Guaranteed by the exactly-one check above; kept for type narrowing.
        assert generator is not None
        pool = InProcessGpuPool(generator, session_store=store)

    sessions = SessionManager(
        segment_cap=streaming.generation_segment_cap,
        session_timeout_seconds=streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        pool=pool,
        sessions=sessions,
        session_store=store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        # Liveness summary: live session count plus the configured stream mode.
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": streaming.stream_mode,
        })

    app.include_router(build_health_router(pool))

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        # Accept first so a rejection can be reported as an error frame
        # before the socket is closed.
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            # The session may already be in a state that cannot move to
            # ERROR; that is not worth masking the original failure for.
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            # Best-effort release: cleanup below must run even if the
            # pool fails to release the session.
            with contextlib.suppress(Exception):
                await state.pool.release(session.id)
            _cleanup_session(session, state)

    app.state.server_state = state
    return app

fastvideo.entrypoints.streaming.server.run_server

run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:`fastvideo.VideoGenerator` from `serve_config.generator` unless `generator` is provided, then serves `build_app(...)` via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Start the streaming server and block while uvicorn serves it.

    When ``generator`` is omitted, a :class:`fastvideo.VideoGenerator`
    is created from ``serve_config.generator``. The resulting generator
    is handed to ``build_app(...)`` and the app is served on the host
    and port taken from ``serve_config.server``.

    Raises:
        ValueError: If ``serve_config.streaming`` is ``None``.
    """
    # Fail fast on a missing streaming block, before any heavy import.
    if serve_config.streaming is None:
        message = ("ServeConfig.streaming must be set to launch the streaming server; "
                   "got None. Add a `streaming:` block to your serve config.")
        raise ValueError(message)

    import uvicorn

    if generator is None:
        # Imported here rather than at module level to avoid the boot cost.
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)

    application = build_app(serve_config, generator)
    listen = {
        "host": serve_config.server.host,
        "port": serve_config.server.port,
    }
    uvicorn.run(application, **listen)