
streaming

Classes

fastvideo.entrypoints.streaming.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A ContinuationState payload can reference large tensors stored in a BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.
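A minimal sketch of the reference-by-id idea. It assumes a plain dict stands in for the blob store and uses a hypothetical `latents_blob_id` field name. The real ContinuationState schema is not shown on this page.

```python
import json
import uuid

# A plain dict stands in for a BlobStore (hypothetical stand-in).
blobs: dict[str, bytes] = {}


def put_blob(data: bytes) -> str:
    """Store bytes out-of-band and return an opaque id."""
    blob_id = uuid.uuid4().hex
    blobs[blob_id] = data
    return blob_id


# Pretend this is a large serialized tensor (4 MiB of zeros).
latents = bytes(4 * 1024 * 1024)

# The JSON payload carries only the id, not the tensor bytes.
# "latents_blob_id" is a hypothetical field name.
payload = {"segment_idx": 3, "latents_blob_id": put_blob(latents)}
wire = json.dumps(payload)

# The receiving side resolves the reference back to bytes.
restored = blobs[json.loads(wire)["latents_blob_id"]]
```

The wire payload stays under a hundred bytes however large the tensor is. The cost is that both ends must share access to the same store.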

Methods:

fastvideo.entrypoints.streaming.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""

fastvideo.entrypoints.streaming.FragmentedMP4Chunk dataclass

FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.
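Because the init segment must reach the client's SourceBuffer first, a consumer can enforce that ordering per stream. This is a hypothetical client-side guard. `Chunk` mirrors the fields above, and `append_order` is an illustrative name, not a FastVideo API.

```python
from dataclasses import dataclass
from typing import Iterable, Literal


@dataclass
class Chunk:
    """Mirror of the FragmentedMP4Chunk fields shown above."""
    kind: Literal["init", "media"]
    data: bytes
    stream_id: str
    segment_idx: int


def append_order(chunks: Iterable[Chunk]) -> list[bytes]:
    """Return payloads in SourceBuffer append order.

    Raises ValueError if a media chunk arrives before the init chunk
    of its own stream.
    """
    seen_init: set[str] = set()
    ordered: list[bytes] = []
    for chunk in chunks:
        if chunk.kind == "init":
            seen_init.add(chunk.stream_id)
        elif chunk.stream_id not in seen_init:
            raise ValueError(f"media for stream {chunk.stream_id!r} arrived before init")
        ordered.append(chunk.data)
    return ordered


good = [Chunk("init", b"moov", "s0", 0), Chunk("media", b"moof", "s0", 0)]
ordered = append_order(good)

try:
    append_order([Chunk("media", b"moof", "s1", 1)])
    rejected = False
except ValueError:
    rejected = True
```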

fastvideo.entrypoints.streaming.FragmentedMP4Encoder

FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder at each ltx2_segment_start boundary, so each segment becomes one media fragment that the client can append independently.

Example:

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
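This listing does not show the ffmpeg command that `_spawn()` builds. The sketch below is a hypothetical command builder that uses standard ffmpeg options: raw RGB24 frames are read on stdin and H.264 fragmented MP4 is written on stdout. The exact flags are assumptions, not FastVideo's actual command line.

```python
from __future__ import annotations


def build_ffmpeg_cmd(
    *,
    width: int,
    height: int,
    fps: int,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> list[str]:
    """Hypothetical argv: raw RGB24 frames on stdin, fMP4 on stdout."""
    return [
        ffmpeg_path,
        "-loglevel", "error",
        # Input: headerless RGB24 frames of a fixed size and rate.
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", f"{width}x{height}",
        "-r", str(fps),
        "-i", "pipe:0",
        # Output: H.264 in fragmented MP4 that Media Source Extensions can play.
        "-c:v", "libx264",
        "-preset", preset,
        "-pix_fmt", pixel_format_out,
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        *(extra_args or []),
        "-f", "mp4",
        "pipe:1",
    ]


cmd = build_ffmpeg_cmd(width=1024, height=576, fps=24)
# e.g. subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
```

`empty_moov` makes ffmpeg write the moov box up front, which gives the client an init segment before any media. `frag_keyframe` starts a new fragment at each keyframe.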

Methods:

fastvideo.entrypoints.streaming.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task
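The read side of `encode()` can be exercised without ffmpeg. In this sketch `io.BytesIO` stands in for ffmpeg's stdout. The sketch reproduces only the read-and-tag loop, and `read_chunks` and `collect` are hypothetical names. As in the listing, the tag follows read order rather than MP4 box boundaries, so the first 64 KiB read is labelled "init" whatever it contains.

```python
import asyncio
import io
from typing import AsyncIterator, BinaryIO


async def read_chunks(
    reader: BinaryIO, chunk_size: int = 64 * 1024
) -> AsyncIterator[tuple[str, bytes]]:
    """Yield (kind, data) pairs, tagging the first non-empty read as "init"."""
    loop = asyncio.get_running_loop()
    first = True
    while True:
        # The blocking read runs in the default thread pool, as in encode().
        data = await loop.run_in_executor(None, reader.read, chunk_size)
        if not data:
            break  # EOF: the writer side closed the pipe
        yield ("init" if first else "media"), bytes(data)
        first = False


async def collect(reader: BinaryIO) -> list[tuple[str, int]]:
    """Return (kind, size) for each chunk, for inspection."""
    return [(kind, len(data)) async for kind, data in read_chunks(reader)]


# io.BytesIO stands in for ffmpeg's stdout pipe.
sizes = asyncio.run(collect(io.BytesIO(b"\x00" * 150_000)))
```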

fastvideo.entrypoints.streaming.GpuPool

Bases: ABC

Abstract GPU pool.

acquire binds a session to a worker and holds that binding across segments so continuation state can stay hot. run submits a single GenerationRequest for a bound session.

Acquire / release are independent of run — a session can run many segments on one acquired worker, and must release on disconnect.
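The acquire/run/release contract can be pictured with a toy pool. This is a sketch only. `StickyPool` and its `run(session_id, request)` signature are hypothetical, and the real `GpuPool` method signatures are not shown here. Re-acquiring a bound session returns the same worker, and `release` returns the worker to the free queue.

```python
import asyncio


class StickyPool:
    """Toy pool: acquire() pins a session to one worker until release()."""

    def __init__(self, worker_ids: list[str]) -> None:
        self._free: asyncio.Queue[str] = asyncio.Queue()
        for wid in worker_ids:
            self._free.put_nowait(wid)
        self._bound: dict[str, str] = {}

    async def acquire(self, session_id: str) -> str:
        # Re-acquiring an already bound session returns the same worker.
        if session_id in self._bound:
            return self._bound[session_id]
        worker = await self._free.get()
        self._bound[session_id] = worker
        return worker

    async def run(self, session_id: str, request: str) -> str:
        worker = self._bound[session_id]  # KeyError if not acquired first
        return f"{worker}:{request}"

    async def release(self, session_id: str) -> None:
        worker = self._bound.pop(session_id, None)
        if worker is not None:
            self._free.put_nowait(worker)


async def demo() -> list[str]:
    pool = StickyPool(["gpu0", "gpu1"])
    await pool.acquire("s1")
    try:
        # Many segments run on the one acquired worker.
        return [await pool.run("s1", f"seg{i}") for i in range(3)]
    finally:
        await pool.release("s1")  # always release on disconnect


results = asyncio.run(demo())
```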

fastvideo.entrypoints.streaming.InMemoryBlobStore

InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory BlobStore for single-process servers.

No eviction policy: callers are responsible for calling drop when a blob's owning state is replaced or a session ends. A Redis- or filesystem-backed BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
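Below is a sketch of how an in-memory store can satisfy the BlobStore contract documented above: `get` raises `KeyError` when the id is absent, and `drop` is a no-op for missing ids. The `BlobRecord` shape and the use of hex UUIDs as ids are assumptions, and `DictBlobStore` is a hypothetical name.

```python
import threading
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class BlobRecord:
    data: bytes
    mime: str


class DictBlobStore:
    """Sketch of a thread-safe in-memory blob store (no eviction)."""

    def __init__(self) -> None:
        self._blobs: dict[str, BlobRecord] = {}
        self._lock = threading.Lock()

    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
        blob_id = uuid.uuid4().hex
        with self._lock:
            self._blobs[blob_id] = BlobRecord(bytes(data), mime)
        return blob_id

    def get(self, blob_id: str) -> bytes:
        with self._lock:
            return self._blobs[blob_id].data  # KeyError if absent

    def drop(self, blob_id: str) -> None:
        with self._lock:
            self._blobs.pop(blob_id, None)  # missing ids are a no-op


store = DictBlobStore()
blob_id = store.put(b"latents")
fetched = store.get(blob_id)
store.drop(blob_id)
store.drop(blob_id)  # second drop is harmless
try:
    store.get(blob_id)
    still_present = True
except KeyError:
    still_present = False
```

With no eviction, the owner of a state must call `drop` on the old blob id when it stores a replacement state. Otherwise memory grows for the life of the process.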

fastvideo.entrypoints.streaming.InMemorySessionStore

InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction, TTL, or bounded capacity: sessions only leave via drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()

fastvideo.entrypoints.streaming.InProcessGpuPool

InProcessGpuPool(generator: _GeneratorLike, *, gpu_id: int = 0, session_store: SessionStore | None = None)

Bases: GpuPool

Single-process pool backed by one _GeneratorLike.

This is what PR 7.5's server uses by default; PR 7.6 adds the real SubprocessGpuPool alternative but keeps this one for tests and small deployments.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator: _GeneratorLike,
    *,
    gpu_id: int = 0,
    session_store: SessionStore | None = None,
) -> None:
    self._generator = generator
    self._gpu_id = gpu_id
    self._worker_id = f"inproc-{uuid.uuid4().hex[:6]}"
    self._session_store = session_store or InMemorySessionStore()
    self._active: dict[str, PoolAssignment] = {}
    self._lock = asyncio.Lock()
    self._gen_lock = asyncio.Lock()

fastvideo.entrypoints.streaming.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.
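The async-first design lets a synchronous backend join by offloading its blocking call with `asyncio.to_thread`. The actual LLMProvider method names are not shown on this page, so `ProviderLike`, `name` and `complete(prompt, *, model)` are hypothetical.

```python
import asyncio
import time
from typing import Protocol, runtime_checkable


@runtime_checkable
class ProviderLike(Protocol):
    """Hypothetical provider shape; the real LLMProvider members may differ."""
    name: str

    async def complete(self, prompt: str, *, model: str) -> str: ...


class BlockingEchoProvider:
    """A synchronous backend wrapped so it satisfies the async protocol."""
    name = "echo"

    def _call_blocking(self, prompt: str, model: str) -> str:
        time.sleep(0.01)  # stands in for a blocking SDK call
        return f"[{model}] {prompt}"

    async def complete(self, prompt: str, *, model: str) -> str:
        # Run the blocking call in a worker thread so the event loop stays responsive.
        return await asyncio.to_thread(self._call_blocking, prompt, model)


provider = BlockingEchoProvider()
reply = asyncio.run(provider.complete("a fox", model="m"))
```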

fastvideo.entrypoints.streaming.MockGenerator dataclass

MockGenerator(sleep_ms: float = 0.0)

Generator stand-in that returns synthetic gradient frames.

Each call produces one segment's worth of frames whose pixels vary by a constant derived from the request seed and segment index. Latency is configurable via sleep_ms so the caller can exercise slow-generation scenarios without spinning up a GPU.
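A sketch of how such synthetic frames might be produced without NumPy or a GPU. The `(seed * 31 + segment_idx * 17) % 256` mix, the horizontal gradient and the RGB24 `bytes` frame format are illustrative assumptions. `mock_segment` is a hypothetical name, not MockGenerator's implementation.

```python
import time


def mock_segment(
    *,
    seed: int,
    segment_idx: int,
    num_frames: int,
    height: int,
    width: int,
    sleep_ms: float = 0.0,
) -> list[bytes]:
    """Return num_frames RGB24 frames, each height * width * 3 bytes."""
    if sleep_ms > 0:
        time.sleep(sleep_ms / 1000.0)  # simulate generation latency
    # Illustrative constant: any deterministic mix of seed and segment works.
    offset = (seed * 31 + segment_idx * 17) % 256
    # One row of a horizontal gradient, shifted by the offset (R = G = B).
    row = bytes(((x + offset) % 256) for x in range(width) for _ in range(3))
    frame = row * height
    return [frame] * num_frames


frames = mock_segment(seed=0, segment_idx=0, num_frames=2, height=2, width=4)
next_segment = mock_segment(seed=0, segment_idx=1, num_frames=2, height=2, width=4)
```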

fastvideo.entrypoints.streaming.PoolAcquireTimeout

Bases: RuntimeError

Raised when acquire times out waiting for a free worker.
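A bounded wait for a free worker can be sketched with `asyncio.wait_for` around a queue `get()`. In this sketch `AcquireTimeout` stands in for PoolAcquireTimeout, and the queue of worker indices is an assumption modelled on the `_available` queue in SubprocessGpuPool.

```python
import asyncio


class AcquireTimeout(RuntimeError):
    """Stand-in for PoolAcquireTimeout."""


async def acquire_worker(free: asyncio.Queue, timeout_s: float) -> int:
    """Wait up to timeout_s for a free worker index."""
    try:
        return await asyncio.wait_for(free.get(), timeout=timeout_s)
    except asyncio.TimeoutError as exc:
        raise AcquireTimeout(f"no free worker after {timeout_s}s") from exc


async def demo() -> tuple[int, str]:
    free: asyncio.Queue[int] = asyncio.Queue()
    free.put_nowait(0)
    first = await acquire_worker(free, timeout_s=0.1)  # gets worker 0
    try:
        await acquire_worker(free, timeout_s=0.05)  # pool is now empty
    except AcquireTimeout as exc:
        return first, type(exc).__name__
    return first, "no timeout"


outcome = asyncio.run(demo())
```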

fastvideo.entrypoints.streaming.PromptEnhancer

PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

Usage:

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
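The structured fallback can be sketched as a loop over providers in priority order, with each attempt capped by `timeout_ms`. Here providers are modelled as plain async callables, and `enhance_with_fallback` is a hypothetical name. The enhancer's real method internals are not shown.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# Hypothetical provider shape: an async callable from prompt to text.
Provider = Callable[[str], Awaitable[str]]


async def enhance_with_fallback(
    prompt: str, providers: Sequence[Provider], *, timeout_ms: int = 20000
) -> tuple[int, str]:
    """Try providers in order; return (index, text) from the first that succeeds."""
    failures: list[str] = []
    for idx, provider in enumerate(providers):
        try:
            text = await asyncio.wait_for(provider(prompt), timeout=timeout_ms / 1000)
            return idx, text
        except Exception as exc:  # timeouts and provider errors both fall through
            failures.append(f"{idx}: {type(exc).__name__}")
    raise RuntimeError("all providers failed (" + ", ".join(failures) + ")")


async def slow(prompt: str) -> str:
    await asyncio.sleep(1.0)  # exceeds the 50 ms budget below
    return "slow"


async def broken(prompt: str) -> str:
    raise ConnectionError("upstream down")


async def healthy(prompt: str) -> str:
    return f"cinematic shot of {prompt}"


winner = asyncio.run(enhance_with_fallback("a fox", [slow, broken, healthy], timeout_ms=50))
```

Catching `Exception` leaves `asyncio.CancelledError` (a `BaseException` since Python 3.8) uncaught, so cancelling the caller still stops the whole chain.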

Methods:

fastvideo.entrypoints.streaming.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; priority=-1 (default) appends as a fallback.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
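The same branching applied to a plain list shows two edge cases. Any negative priority appends, not only `-1`. An index past the end also appends, because `list.insert` clamps out-of-range indices. The provider names here are placeholders.

```python
def register(providers: list[str], provider: str, *, priority: int = -1) -> None:
    # Same branching as register_provider, on a plain list.
    if priority < 0:
        providers.append(provider)
    else:
        providers.insert(priority, provider)


chain = ["cerebras", "groq"]
register(chain, "local")                # appended as the last fallback
register(chain, "openai", priority=0)   # becomes primary
register(chain, "backup", priority=99)  # list.insert clamps: lands at the end
```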
fastvideo.entrypoints.streaming.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")
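The on-disk layout that `_load_system_prompts()` expects is not shown. This sketch assumes one `<name>.txt` file per prompt, overlaid on built-in defaults. `DEFAULT_PROMPTS` and `load_system_prompts` are hypothetical.

```python
from __future__ import annotations

import pathlib
import tempfile

# Hypothetical built-in defaults used when no directory is configured.
DEFAULT_PROMPTS = {"enhance": "Rewrite the prompt with vivid visual detail."}


def load_system_prompts(prompt_dir: str | None) -> dict[str, str]:
    """Return defaults overlaid with <name>.txt files from prompt_dir."""
    prompts = dict(DEFAULT_PROMPTS)
    if prompt_dir is None:
        return prompts
    for path in sorted(pathlib.Path(prompt_dir).glob("*.txt")):
        prompts[path.stem] = path.read_text(encoding="utf-8").strip()
    return prompts


with tempfile.TemporaryDirectory() as d:
    pathlib.Path(d, "enhance.txt").write_text("Be concise.\n", encoding="utf-8")
    pathlib.Path(d, "extend.txt").write_text("Continue the scene.", encoding="utf-8")
    loaded = load_system_prompts(d)  # a reload simply calls this again
```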

fastvideo.entrypoints.streaming.PromptSafetyFilter

PromptSafetyFilter(*, classifier_path: str | None, enabled: bool = True, block_threshold: float = 0.5)

Minimal fastText-backed prompt safety filter.

Loads the classifier lazily on first use so the streaming server can construct the filter eagerly at startup without paying the model-load cost when safety is disabled.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def __init__(
    self,
    *,
    classifier_path: str | None,
    enabled: bool = True,
    block_threshold: float = 0.5,
) -> None:
    self._classifier_path = classifier_path
    self._enabled = enabled
    self._block_threshold = block_threshold
    self._model: Any | None = None
    self._load_attempted = False
    self._load_lock = threading.Lock()
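The `_model`, `_load_attempted` and `_load_lock` fields suggest a load-once pattern. The sketch below is one way to implement it, using double-checked locking, and is not the filter's actual code. `LazyModel` and the `loader` callable are hypothetical. A failed load is recorded as attempted, so a missing fastText install is not retried on every prompt.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class LazyModel:
    """Load a model at most once, on first use, even under concurrent callers."""

    def __init__(self, loader: Callable[[], Any], *, enabled: bool = True) -> None:
        self._loader = loader
        self._enabled = enabled
        self._model: Any | None = None
        self._load_attempted = False
        self._load_lock = threading.Lock()

    def get(self) -> Any | None:
        if not self._enabled:
            return None  # disabled: never pay the load cost
        if self._load_attempted:  # fast path without taking the lock
            return self._model
        with self._load_lock:
            if not self._load_attempted:  # re-check under the lock
                try:
                    self._model = self._loader()
                except Exception:
                    self._model = None  # e.g. fastText not installed
                self._load_attempted = True
        return self._model


load_calls: list[int] = []


def fake_loader() -> str:
    load_calls.append(1)
    return "classifier"


lazy = LazyModel(fake_loader)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```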

fastvideo.entrypoints.streaming.SafetyDecision

Bases: Enum

Attributes

fastvideo.entrypoints.streaming.SafetyDecision.UNAVAILABLE class-attribute instance-attribute
UNAVAILABLE = 'unavailable'

Returned when the classifier can't run (not configured, fastText missing). Safety is opt-in; the server treats UNAVAILABLE as ALLOW but logs it so operators know the filter is off.
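A sketch of the decision mapping and the server policy described above. Only UNAVAILABLE (and, by reference, ALLOW) appear on this page, so the `BLOCK` member and the `score >= block_threshold` comparison are assumptions. `Decision`, `decide` and `admit` are hypothetical names.

```python
from __future__ import annotations

import enum
import logging

logger = logging.getLogger("safety-sketch")


class Decision(enum.Enum):
    ALLOW = "allow"
    BLOCK = "block"  # hypothetical member name
    UNAVAILABLE = "unavailable"


def decide(score: float | None, block_threshold: float = 0.5) -> Decision:
    """Map a classifier score (None means the classifier could not run)."""
    if score is None:
        return Decision.UNAVAILABLE
    return Decision.BLOCK if score >= block_threshold else Decision.ALLOW


def admit(decision: Decision) -> bool:
    """Server policy: UNAVAILABLE is admitted like ALLOW, but logged."""
    if decision is Decision.UNAVAILABLE:
        logger.warning("safety filter unavailable; allowing prompt")
        return True
    return decision is Decision.ALLOW
```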

fastvideo.entrypoints.streaming.Session dataclass

Session(id: str = (lambda: hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())

Methods:

fastvideo.entrypoints.streaming.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
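A self-contained replica of the check above, with a hypothetical edge table. The real `_VALID_TRANSITIONS` is defined elsewhere and may differ. Because of the `target is not self.state` clause, the check as written accepts a self-loop from every state, not only ACTIVE. A terminal state with no outgoing edges still rejects any move to a different state.

```python
import enum
import time


class State(enum.Enum):
    INITIALIZING = "initializing"
    ACTIVE = "active"
    COMPLETE = "complete"
    ERROR = "error"
    TIMEOUT = "timeout"
    REJECTED = "rejected"


# Hypothetical edge table; the real _VALID_TRANSITIONS may differ.
VALID = {
    State.INITIALIZING: frozenset({State.ACTIVE, State.REJECTED, State.ERROR}),
    State.ACTIVE: frozenset({State.COMPLETE, State.ERROR, State.TIMEOUT}),
}


class InvalidTransition(Exception):
    pass


class Sess:
    def __init__(self) -> None:
        self.state = State.INITIALIZING
        self.last_activity = time.monotonic()

    def transition(self, target: State) -> None:
        allowed = VALID.get(self.state, frozenset())
        # Same check as the source: any self-loop is accepted.
        if target not in allowed and target is not self.state:
            raise InvalidTransition(f"{self.state.value} -> {target.value}")
        self.state = target
        self.last_activity = time.monotonic()


s = Sess()
s.transition(State.ACTIVE)
s.transition(State.ACTIVE)  # re-assert ACTIVE after a segment
s.transition(State.COMPLETE)
try:
    s.transition(State.ACTIVE)  # terminal state: no outgoing edges
    reopened = True
except InvalidTransition:
    reopened = False
```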

fastvideo.entrypoints.streaming.SessionLogEvent dataclass

SessionLogEvent(session_id: str, event: str, payload: dict[str, Any] = dict(), ts: float = time())

One line in the session JSONL file.

fastvideo.entrypoints.streaming.SessionLogger

SessionLogger(log_dir: str | None)

Append-only JSONL logger keyed by session id.

Thread-safe; the server may be writing from multiple asyncio tasks (fMP4 encoder thread + control-frame handler) for the same session.

Source code in fastvideo/entrypoints/streaming/session_logger.py
def __init__(self, log_dir: str | None) -> None:
    self._log_dir = log_dir
    self._files: dict[str, TextIO] = {}
    self._locks: dict[str, threading.Lock] = {}
    self._registry_lock = threading.Lock()
    self._ensure_dir()
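The per-session locking scheme can be sketched as follows. The registry lock guards creation of one lock per session id, and that per-session lock serializes appends to that session's file. For simplicity the sketch reopens the file on each write, whereas the source caches handles in `_files`. `LogEvent`, `JsonlLogger` and the `<session_id>.jsonl` file name are assumptions.

```python
from __future__ import annotations

import json
import os
import tempfile
import threading
import time
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class LogEvent:
    session_id: str
    event: str
    payload: dict[str, Any] = field(default_factory=dict)
    ts: float = field(default_factory=time.time)


class JsonlLogger:
    """Append-only JSONL per session; one lock per session file."""

    def __init__(self, log_dir: str | None) -> None:
        self._log_dir = log_dir
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    def _lock_for(self, session_id: str) -> threading.Lock:
        with self._registry_lock:
            return self._locks.setdefault(session_id, threading.Lock())

    def log(self, event: LogEvent) -> None:
        if not self._log_dir:
            return  # logging disabled
        path = os.path.join(self._log_dir, f"{event.session_id}.jsonl")
        line = json.dumps(asdict(event), separators=(",", ":"))
        with self._lock_for(event.session_id):
            with open(path, "a", encoding="utf-8") as fh:
                fh.write(line + "\n")


with tempfile.TemporaryDirectory() as d:
    log = JsonlLogger(d)
    log.log(LogEvent("abc", "segment_start", {"segment_idx": 0}))
    log.log(LogEvent("abc", "segment_done"))
    with open(os.path.join(d, "abc.jsonl"), encoding="utf-8") as fh:
        records = [json.loads(line) for line in fh]
```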

fastvideo.entrypoints.streaming.SessionManager

SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}

Methods:

fastvideo.entrypoints.streaming.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
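The TODO above calls for a background driver. One possible shape is a periodic task that calls a `reap()` function and closes each returned id. `reap_loop` and the `close` callback are hypothetical. Because `reap_timed_out` only identifies sessions, the caller must remove each one after closing it, or the next pass will report it again.

```python
import asyncio
from typing import Awaitable, Callable


async def reap_loop(
    reap: Callable[[], list[str]],
    close: Callable[[str], Awaitable[None]],
    *,
    interval_s: float,
    stop: asyncio.Event,
) -> None:
    """Periodically close sessions that reap() reports as idle."""
    while not stop.is_set():
        for sid in reap():
            await close(sid)  # e.g. send session_timeout, then drop the socket
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # interval elapsed; run another pass


async def demo() -> list[str]:
    closed: list[str] = []
    pending = {"idle-1"}  # pretend this session exceeded the timeout

    def reap() -> list[str]:
        dead = sorted(pending)
        pending.clear()  # closing removes the session from the manager
        return dead

    async def close(sid: str) -> None:
        closed.append(sid)

    stop = asyncio.Event()
    task = asyncio.create_task(reap_loop(reap, close, interval_s=0.01, stop=stop))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return closed


closed_ids = asyncio.run(demo())
```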

fastvideo.entrypoints.streaming.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls store after each segment and snapshot when a client explicitly asks for an exportable state handle.

Methods:

fastvideo.entrypoints.streaming.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""

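An in-memory sketch of the four SessionStore methods, using a dict in place of ContinuationState. It follows the documented hydrate semantics: a fresh UUID4 is allocated when `session_id` is None, and a provided id is used verbatim, overwriting any prior state. Formatting the id as `str(uuid.uuid4())` is an assumption, and `DictSessionStore` is a hypothetical name.

```python
from __future__ import annotations

import threading
import uuid
from typing import Any

State = dict[str, Any]  # stand-in for ContinuationState


class DictSessionStore:
    def __init__(self) -> None:
        self._sessions: dict[str, State] = {}
        self._lock = threading.Lock()

    def store(self, session_id: str, state: State) -> None:
        with self._lock:
            self._sessions[session_id] = state  # replaces any prior value

    def snapshot(self, session_id: str) -> State | None:
        with self._lock:
            return self._sessions.get(session_id)

    def hydrate(self, state: State, *, session_id: str | None = None) -> str:
        sid = session_id if session_id is not None else str(uuid.uuid4())
        with self._lock:
            self._sessions[sid] = state  # a provided id is overwritten verbatim
        return sid

    def drop(self, session_id: str) -> None:
        with self._lock:
            self._sessions.pop(session_id, None)  # missing ids are a no-op


st = DictSessionStore()
sid = st.hydrate({"segment_idx": 1})
same = st.hydrate({"segment_idx": 2}, session_id=sid)
latest = st.snapshot(sid)
st.drop(sid)
st.drop(sid)
```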
fastvideo.entrypoints.streaming.SubprocessGpuPool

SubprocessGpuPool(generator_config: GeneratorConfig, *, pool_config: GpuPoolConfig, warmup_config: WarmupConfig | None = None, session_store: SessionStore | None = None, worker_factory: WorkerFactory | None = None)

Bases: GpuPool

One multiprocessing.Process per GPU.

Each worker boots fastvideo.VideoGenerator from a typed GeneratorConfig inside the child process (after CUDA_VISIBLE_DEVICES has been set) and consumes jobs from a multiprocessing Queue.

This is the production shape: the parent process stays CPU-only, and GPU state never crosses process boundaries. Continuation state is serialized through SessionStore for cross-GPU handoff.

PR 7.6 ships this as an opt-in; PR 7.5's in-process pool remains the default until nightly runs validate the subprocess path.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator_config: GeneratorConfig,
    *,
    pool_config: GpuPoolConfig,
    warmup_config: WarmupConfig | None = None,
    session_store: SessionStore | None = None,
    worker_factory: WorkerFactory | None = None,
) -> None:
    self._generator_config = generator_config
    self._pool_config = pool_config
    self._warmup_config = warmup_config or WarmupConfig()
    self._session_store = session_store or InMemorySessionStore()
    self._worker_factory = worker_factory or _default_worker_factory
    self._workers: list[_WorkerHandle] = []
    self._available: asyncio.Queue[int] = asyncio.Queue()
    self._assignments: dict[str, PoolAssignment] = {}
    self._worker_by_id: dict[str, _WorkerHandle] = {}
    self._pending: dict[str, _PendingJob] = {}
    self._lock = asyncio.Lock()
    self._result_reader_tasks: list[asyncio.Task] = []
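A sketch of what a per-GPU worker loop could look like. The worker pins its GPU through `CUDA_VISIBLE_DEVICES` before loading anything, boots once, reports ready, then serves jobs until it receives a `None` sentinel. In the real pool this body would be a `multiprocessing.Process` target reading from `multiprocessing.Queue`. Here it runs inline with `queue.Queue` so the loop can be exercised without spawning processes. `worker_loop`, the message tuples and the sentinel are assumptions, not the actual `worker_main`.

```python
from __future__ import annotations

import os
import queue
from typing import Any, Callable


def worker_loop(
    gpu_id: int,
    load_generator: Callable[[], Callable[[Any], Any]],
    jobs: queue.Queue[Any],
    results: queue.Queue[Any],
) -> None:
    """Child-process body: pin the GPU, boot once, then serve jobs until None."""
    # Must happen before any CUDA initialisation in this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    generate = load_generator()
    results.put(("ready", gpu_id))
    while True:
        job = jobs.get()
        if job is None:  # shutdown sentinel
            break
        job_id, request = job
        try:
            results.put(("ok", job_id, generate(request)))
        except Exception as exc:  # report the failure and keep serving
            results.put(("error", job_id, repr(exc)))


def load_generator() -> Callable[[str], str]:
    def generate(prompt: str) -> str:
        if not prompt:
            raise ValueError("empty prompt")
        return prompt.upper()
    return generate


jobs: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
for item in [("j1", "fox"), ("j2", ""), None]:
    jobs.put(item)
worker_loop(0, load_generator, jobs, results)
drained = [results.get_nowait() for _ in range(results.qsize())]
```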

Methods:

fastvideo.entrypoints.streaming.SubprocessGpuPool.start async
start() -> None

Spawn worker processes and wait for each to report ready.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
async def start(self) -> None:
    """Spawn worker processes and wait for each to report ready."""
    num_workers = self._pool_config.num_workers or 1
    for gpu_id in range(num_workers):
        handle = self._worker_factory(
            gpu_id=gpu_id,
            generator_config=self._generator_config,
            warmup_config=self._warmup_config,
        )
        self._workers.append(handle)
        self._worker_by_id[handle.worker_id] = handle

    # Wait for each worker's ready event in a thread to avoid
    # blocking the event loop.
    loop = asyncio.get_running_loop()
    await asyncio.gather(*[
        loop.run_in_executor(None, handle.ready.wait, self._warmup_config.timeout_seconds)
        for handle in self._workers
    ])

    # Start background result readers — one task per worker
    # drains its result queue and resolves futures in _pending.
    for handle in self._workers:
        task = asyncio.create_task(self._drain_results(handle))
        self._result_reader_tasks.append(task)

    # Only admit workers that successfully booted. Anything that
    # failed boot (timeout, crash, error sentinel) stays out of the
    # available queue so we never assign a session to it.
    for idx, handle in enumerate(self._workers):
        if handle.boot_ok.is_set():
            await self._available.put(idx)
        else:
            logger.error(
                "pool: worker %s failed to boot; skipping",
                handle.worker_id,
            )
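The boot sequence above waits for ready events off the event loop and then admits only workers that booted successfully. In this reduced sketch, `threading.Event` objects stand in for the worker handles' `ready` and `boot_ok`, and `admit_booted` is a hypothetical name. A worker that never signals simply times out in its executor thread.

```python
import asyncio
import threading


async def admit_booted(
    ready: list[threading.Event], boot_ok: list[threading.Event], timeout_s: float
) -> list[int]:
    """Wait (off-loop) for each worker's ready event, then admit good boots."""
    loop = asyncio.get_running_loop()
    # Event.wait blocks, so each wait runs in the default executor.
    await asyncio.gather(*[
        loop.run_in_executor(None, ev.wait, timeout_s) for ev in ready
    ])
    return [idx for idx, ok in enumerate(boot_ok) if ok.is_set()]


ready = [threading.Event() for _ in range(3)]
boot_ok = [threading.Event() for _ in range(3)]
ready[0].set()
boot_ok[0].set()  # worker 0 booted cleanly
ready[1].set()    # worker 1 signalled, but its boot failed
# worker 2 never signals; its wait times out
admitted = asyncio.run(admit_booted(ready, boot_ok, timeout_s=0.05))
```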

Functions:

fastvideo.entrypoints.streaming.build_app

build_app(serve_config: ServeConfig, generator: _GeneratorProto | None = None, *, pool: GpuPool | None = None, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Exactly one of generator (backed by InProcessGpuPool) or pool (for the subprocess-backed production shape) must be given.

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto | None = None,
    *,
    pool: GpuPool | None = None,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Exactly one of ``generator`` (backed by :class:`InProcessGpuPool`)
    or ``pool`` (for the subprocess-backed production shape) must be
    given.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")
    streaming = serve_config.streaming
    if (generator is None) == (pool is None):
        raise ValueError("build_app requires exactly one of `generator` or `pool`")

    store = session_store or InMemorySessionStore()
    if pool is None:
        assert generator is not None
        pool = InProcessGpuPool(generator, session_store=store)

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        pool=pool,
        sessions=sessions,
        session_store=store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": streaming.stream_mode,
        })

    app.include_router(build_health_router(pool))

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            with contextlib.suppress(Exception):
                await state.pool.release(session.id)
            _cleanup_session(session, state)

    app.state.server_state = state
    return app

fastvideo.entrypoints.streaming.build_health_router

build_health_router(pool: PoolRef = None) -> APIRouter

Build a router exposing streaming liveness/readiness endpoints.

pool may be either a concrete pool or a zero-argument callable that returns the current pool. The callable form lets product servers keep their own lifespan-managed runtime singleton without adding public global state.

Source code in fastvideo/entrypoints/streaming/health.py
def build_health_router(pool: PoolRef = None) -> APIRouter:
    """Build a router exposing streaming liveness/readiness endpoints.

    ``pool`` may be either a concrete pool or a zero-argument callable that
    returns the current pool. The callable form lets product servers keep their
    own lifespan-managed runtime singleton without adding public global state.
    """
    router = APIRouter()

    @router.get("/health")
    @router.get("/healthz")
    async def get_healthz() -> dict[str, Any]:
        """Liveness probe for process-level health."""
        return {
            "status": "ok",
            "service": SERVICE_NAME,
            "ts": _utc_now_iso(),
        }

    @router.get("/readyz")
    async def get_readyz() -> dict[str, Any]:
        """Readiness probe for router/load-balancer health checks."""
        status_payload = await get_pool_status(pool)
        ready_workers = _ready_worker_count(status_payload)
        return {
            "status": "ready" if ready_workers > 0 else "warming",
            "service": SERVICE_NAME,
            "ready_gpu_workers": ready_workers,
            "total_gpus": _as_int(status_payload.get("total_gpus")),
            "available_gpus": _as_int(status_payload.get("available_gpus")),
            "warmup_successful_gpus": _as_int(status_payload.get("warmup_successful_gpus")),
            "warmup_failed_gpus": _as_int(status_payload.get("warmup_failed_gpus")),
            "queue_size": _as_int(status_payload.get("queue_size")),
            "ts": _utc_now_iso(),
        }

    @router.get("/status")
    async def get_status() -> dict[str, Any]:
        """Get the current status of the GPU pool."""
        return await get_pool_status(pool)

    return router

fastvideo.entrypoints.streaming.build_mock_app

build_mock_app(*, sleep_ms: float = 0.0)

Build a FastAPI app backed by MockGenerator.

Source code in fastvideo/entrypoints/streaming/mock_server.py
def build_mock_app(*, sleep_ms: float = 0.0):
    """Build a FastAPI app backed by :class:`MockGenerator`."""
    serve_config = ServeConfig(
        generator=GeneratorConfig(model_path="/models/mock"),
        streaming=StreamingConfig(
            session_timeout_seconds=120,
            generation_segment_cap=6,
        ),
    )
    serve_config.default_request.sampling = SamplingConfig(
        num_frames=24,
        height=256,
        width=256,
        fps=24,
        num_inference_steps=1,
    )
    return build_app(serve_config, MockGenerator(sleep_ms=sleep_ms))

fastvideo.entrypoints.streaming.get_pool_status async

get_pool_status(pool: PoolRef = None) -> dict[str, Any]

Return the generic GPU pool status payload used by /status.

Source code in fastvideo/entrypoints/streaming/health.py
async def get_pool_status(pool: PoolRef = None) -> dict[str, Any]:
    """Return the generic GPU pool status payload used by ``/status``."""
    resolved = _resolve_pool(pool)
    if resolved is None:
        return _zero_pool_status()
    if _has_get_status(resolved):
        return dict(resolved.get_status())
    if _has_health(resolved):
        return _status_from_health(resolved.health())
    return _zero_pool_status()
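A synchronous sketch of the dispatch above. A PoolRef may be None, a pool, or a zero-argument callable returning a pool. Pools that expose `get_status()` report directly. The `health()` branch is omitted because its report shape is not shown here. The zeroed key set and the `callable()` test in `resolve_pool_ref` are assumptions: a pool class that defines `__call__` would be misresolved by this sketch.

```python
from __future__ import annotations

from typing import Any


def zero_status() -> dict[str, Any]:
    # Hypothetical zeroed payload; the real key set is not shown above.
    return {"total_gpus": 0, "available_gpus": 0, "queue_size": 0}


def resolve_pool_ref(pool_ref: Any) -> Any:
    """Accept None, a pool object, or a zero-argument callable returning one."""
    return pool_ref() if callable(pool_ref) else pool_ref


def pool_status(pool_ref: Any = None) -> dict[str, Any]:
    pool = resolve_pool_ref(pool_ref)
    if pool is None:
        return zero_status()
    if callable(getattr(pool, "get_status", None)):
        return dict(pool.get_status())  # shallow copy of the pool's report
    return zero_status()  # the health() fallback is omitted in this sketch


class FakePool:
    def get_status(self) -> dict[str, int]:
        return {"total_gpus": 2, "available_gpus": 1, "queue_size": 0}
```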

fastvideo.entrypoints.streaming.run_server

run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

Modules

fastvideo.entrypoints.streaming.gpu_pool

GPU pool manager for the streaming server.

Replaces the single-generator path in PR 7.5 with a typed pool abstraction. Three classes ship here:

  • InProcessGpuPool: one in-process VideoGenerator; used by tests and single-GPU dev deployments.
  • SubprocessGpuPool: one multiprocessing.Process per GPU, each running worker_main against a GeneratorConfig. Jobs are dispatched via multiprocessing.Queue.
  • GpuPool (abstract): the interface both implementations share.

Session-to-GPU binding lives in the pool so continuation state stays on the GPU that generated the previous segment (matching the internal gpu_pool.py's per-GPU cache behavior). Cross-GPU handoff is supported via SessionStore snapshot + hydrate, which serializes the state before the migration and rehydrates it on the new worker.
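Below is a sketch of the snapshot and hydrate handoff. For illustration it models each worker as owning a store. The real pool may share one SessionStore, and `Store` and `migrate` are hypothetical names. The JSON round trip stands in for whatever serialization happens when state crosses a process boundary. The session keeps its id, so the client sees no change.

```python
from __future__ import annotations

import json
from typing import Any


class Store:
    """Minimal per-worker session store (stand-in for SessionStore)."""

    def __init__(self) -> None:
        self.sessions: dict[str, dict[str, Any]] = {}

    def snapshot(self, sid: str) -> dict[str, Any] | None:
        return self.sessions.get(sid)

    def hydrate(self, state: dict[str, Any], *, session_id: str) -> str:
        self.sessions[session_id] = state
        return session_id

    def drop(self, sid: str) -> None:
        self.sessions.pop(sid, None)


def migrate(sid: str, src: Store, dst: Store) -> None:
    state = src.snapshot(sid)
    if state is None:
        raise KeyError(sid)
    wire = json.dumps(state)  # serialize before crossing the process boundary
    dst.hydrate(json.loads(wire), session_id=sid)  # keep the same session id
    src.drop(sid)  # the old worker forgets the session


old_worker, new_worker = Store(), Store()
old_worker.hydrate({"segment_idx": 4, "latents_blob_id": "abc"}, session_id="s1")
migrate("s1", old_worker, new_worker)
```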

Typed config: workers start from a :class:GeneratorConfig (no flat LTX-2 kwargs), which satisfies the PR 6 and PR 7 contract that the public surface must not reintroduce the legacy kwarg bag.

Classes

fastvideo.entrypoints.streaming.gpu_pool.GpuPool

Bases: ABC

Abstract GPU pool.

acquire binds a session to a worker and holds that binding across segments so continuation state can stay hot. run submits a single GenerationRequest for a bound session.

Acquire / release are independent of run — a session can run many segments on one acquired worker, and must release on disconnect.
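The acquire / run / release lifecycle can be sketched with a toy pool. ToyPool, Assignment, and handle_session are hypothetical names, and run here just echoes the prompt instead of submitting a GenerationRequest. The point is the shape: one acquire, many runs on the same worker, and a release in finally so a disconnect still frees the worker.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Assignment:
    gpu_id: int
    worker_id: str


class ToyPool:
    """Hypothetical pool illustrating acquire / run / release; not the library class."""

    def __init__(self, num_workers: int) -> None:
        self._free = list(range(num_workers))
        self._bound: dict[str, Assignment] = {}

    async def acquire(self, session_id: str) -> Assignment:
        # Sticky binding: re-acquiring a bound session returns the same worker.
        if session_id in self._bound:
            return self._bound[session_id]
        gpu_id = self._free.pop(0)  # IndexError if no worker is free (no waiting here)
        assignment = Assignment(gpu_id=gpu_id, worker_id=f"worker-{gpu_id}")
        self._bound[session_id] = assignment
        return assignment

    async def run(self, session_id: str, prompt: str) -> str:
        assignment = self._bound[session_id]  # KeyError if the session never acquired
        return f"{assignment.worker_id}:{prompt}"

    async def release(self, session_id: str) -> None:
        assignment = self._bound.pop(session_id, None)
        if assignment is not None:
            self._free.append(assignment.gpu_id)


async def handle_session(pool: ToyPool, session_id: str, prompts: list[str]) -> list[str]:
    await pool.acquire(session_id)
    try:
        # Many segments run on the one acquired worker.
        return [await pool.run(session_id, p) for p in prompts]
    finally:
        # Release even if the client disconnects mid-stream.
        await pool.release(session_id)


pool = ToyPool(num_workers=1)
outputs = asyncio.run(handle_session(pool, "s1", ["a", "b"]))
```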

fastvideo.entrypoints.streaming.gpu_pool.InProcessGpuPool
InProcessGpuPool(generator: _GeneratorLike, *, gpu_id: int = 0, session_store: SessionStore | None = None)

Bases: GpuPool

Single-process pool backed by one :class:_GeneratorLike.

This is what PR 7.5's server uses by default; PR 7.6 adds the real SubprocessGpuPool alternative but keeps this one for tests and small deployments.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator: _GeneratorLike,
    *,
    gpu_id: int = 0,
    session_store: SessionStore | None = None,
) -> None:
    self._generator = generator
    self._gpu_id = gpu_id
    self._worker_id = f"inproc-{uuid.uuid4().hex[:6]}"
    self._session_store = session_store or InMemorySessionStore()
    self._active: dict[str, PoolAssignment] = {}
    self._lock = asyncio.Lock()
    self._gen_lock = asyncio.Lock()
fastvideo.entrypoints.streaming.gpu_pool.PoolAcquireTimeout

Bases: RuntimeError

Raised when acquire times out waiting for a free worker.
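A minimal sketch of how an acquire timeout can surface, assuming free workers are handed out as indices from an asyncio.Queue (like the _available queue SubprocessGpuPool.__init__ creates). acquire_worker is hypothetical, and PoolAcquireTimeout is redefined locally so the example runs without fastvideo.

```python
import asyncio


class PoolAcquireTimeout(RuntimeError):
    """Local stand-in mirroring the documented exception."""


async def acquire_worker(available: asyncio.Queue, timeout_s: float) -> int:
    """Wait for a free worker index; raise PoolAcquireTimeout if none frees up in time."""
    try:
        return await asyncio.wait_for(available.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        raise PoolAcquireTimeout(f"no free worker within {timeout_s}s") from None


async def demo() -> tuple[int, bool]:
    available: asyncio.Queue = asyncio.Queue()
    await available.put(0)
    first = await acquire_worker(available, timeout_s=0.1)  # takes worker 0
    try:
        await acquire_worker(available, timeout_s=0.05)  # queue is now empty
        timed_out = False
    except PoolAcquireTimeout:
        timed_out = True
    return first, timed_out


result = asyncio.run(demo())
```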

fastvideo.entrypoints.streaming.gpu_pool.PoolAssignment dataclass
PoolAssignment(gpu_id: int, worker_id: str, pinned_at: float = monotonic())

The worker a session is currently bound to.

fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool
SubprocessGpuPool(generator_config: GeneratorConfig, *, pool_config: GpuPoolConfig, warmup_config: WarmupConfig | None = None, session_store: SessionStore | None = None, worker_factory: WorkerFactory | None = None)

Bases: GpuPool

One multiprocessing.Process per GPU.

Each worker boots :class:fastvideo.VideoGenerator from a typed :class:GeneratorConfig inside the child process (after CUDA_VISIBLE_DEVICES has been set) and consumes jobs from a multiprocessing queue.

This is the production shape: the parent process stays CPU-only, and GPU state never crosses process boundaries. Continuation state is serialized through :class:SessionStore for cross-GPU handoff.

PR 7.6 ships this as an opt-in; PR 7.5's in-process pool remains the default until nightly runs validate the subprocess path.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator_config: GeneratorConfig,
    *,
    pool_config: GpuPoolConfig,
    warmup_config: WarmupConfig | None = None,
    session_store: SessionStore | None = None,
    worker_factory: WorkerFactory | None = None,
) -> None:
    self._generator_config = generator_config
    self._pool_config = pool_config
    self._warmup_config = warmup_config or WarmupConfig()
    self._session_store = session_store or InMemorySessionStore()
    self._worker_factory = worker_factory or _default_worker_factory
    self._workers: list[_WorkerHandle] = []
    self._available: asyncio.Queue[int] = asyncio.Queue()
    self._assignments: dict[str, PoolAssignment] = {}
    self._worker_by_id: dict[str, _WorkerHandle] = {}
    self._pending: dict[str, _PendingJob] = {}
    self._lock = asyncio.Lock()
    self._result_reader_tasks: list[asyncio.Task] = []
Methods:
fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool.start async
start() -> None

Spawn worker processes and wait for each to report ready.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
async def start(self) -> None:
    """Spawn worker processes and wait for each to report ready."""
    num_workers = self._pool_config.num_workers or 1
    for gpu_id in range(num_workers):
        handle = self._worker_factory(
            gpu_id=gpu_id,
            generator_config=self._generator_config,
            warmup_config=self._warmup_config,
        )
        self._workers.append(handle)
        self._worker_by_id[handle.worker_id] = handle

    # Wait for each worker's ready event in a thread to avoid
    # blocking the event loop.
    loop = asyncio.get_running_loop()
    await asyncio.gather(*[
        loop.run_in_executor(None, handle.ready.wait, self._warmup_config.timeout_seconds)
        for handle in self._workers
    ])

    # Start background result readers — one task per worker
    # drains its result queue and resolves futures in _pending.
    for handle in self._workers:
        task = asyncio.create_task(self._drain_results(handle))
        self._result_reader_tasks.append(task)

    # Only admit workers that successfully booted. Anything that
    # failed boot (timeout, crash, error sentinel) stays out of the
    # available queue so we never assign a session to it.
    for idx, handle in enumerate(self._workers):
        if handle.boot_ok.is_set():
            await self._available.put(idx)
        else:
            logger.error(
                "pool: worker %s failed to boot; skipping",
                handle.worker_id,
            )
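The boot-gated admission in start can be reproduced with fake handles. This sketch assumes ready means "boot finished, successfully or not" and boot_ok means "boot succeeded"; FakeHandle and fake_boot are hypothetical. A worker that never signals simply times out and stays out of the available queue.

```python
import asyncio
import threading
from dataclasses import dataclass, field


@dataclass
class FakeHandle:
    worker_id: str
    ready: threading.Event = field(default_factory=threading.Event)
    boot_ok: threading.Event = field(default_factory=threading.Event)


def fake_boot(handle: FakeHandle, succeed: bool) -> None:
    # A real worker would load the model here; we only signal the (assumed) outcome.
    if succeed:
        handle.boot_ok.set()
    handle.ready.set()


async def start(handles: list[FakeHandle], timeout_s: float) -> asyncio.Queue:
    loop = asyncio.get_running_loop()
    # Event.wait blocks, so run each wait in the default thread pool.
    await asyncio.gather(*[
        loop.run_in_executor(None, h.ready.wait, timeout_s) for h in handles
    ])
    available: asyncio.Queue = asyncio.Queue()
    for idx, h in enumerate(handles):
        if h.boot_ok.is_set():  # only successfully booted workers are admitted
            await available.put(idx)
    return available


handles = [FakeHandle("w0"), FakeHandle("w1"), FakeHandle("w2")]
fake_boot(handles[0], succeed=True)
fake_boot(handles[1], succeed=False)
# handles[2] never signals, so its wait times out.


async def main() -> list[int]:
    available = await start(handles, timeout_s=0.1)
    return [available.get_nowait() for _ in range(available.qsize())]


admitted = asyncio.run(main())
```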

Functions:

fastvideo.entrypoints.streaming.gpu_pool.worker_main
worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child spawned by SubprocessGpuPool. Blocking VideoGenerator construction + generation happens here, not in the parent's event loop.

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
    import os

    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            break
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })
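The job loop in worker_main follows a poll-with-timeout plus None-sentinel protocol. The sketch below keeps that shape but swaps multiprocessing for threads and queue.Queue so it runs in one process; job_loop and fake_generate are hypothetical, and no model is loaded.

```python
from __future__ import annotations

import queue
import threading
from typing import Any, Callable


def job_loop(
    generate: Callable[[Any], Any],
    job_queue: queue.Queue[dict | None],
    result_queue: queue.Queue[dict],
    shutdown_event: threading.Event,
) -> None:
    """Same poll/sentinel shape as worker_main's loop, minus model loading."""
    while not shutdown_event.is_set():
        try:
            # Short timeout so shutdown_event is re-checked regularly.
            item = job_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        if item is None:  # sentinel: stop after the jobs queued before it
            break
        try:
            result = generate(item["request"])
            result_queue.put({"kind": "result", "job_id": item["job_id"], "result": result})
        except Exception as exc:  # one failing job must not kill the worker
            result_queue.put({"kind": "error", "job_id": item["job_id"], "error": repr(exc)})


def fake_generate(request: str) -> str:
    if request == "boom":
        raise ValueError("bad request")
    return request.upper()


jobs: queue.Queue[dict | None] = queue.Queue()
results: queue.Queue[dict] = queue.Queue()
for job_id, request in [("j1", "fox"), ("j2", "boom")]:
    jobs.put({"job_id": job_id, "request": request})
jobs.put(None)

worker = threading.Thread(target=job_loop, args=(fake_generate, jobs, results, threading.Event()))
worker.start()
worker.join(timeout=5)
collected = [results.get_nowait() for _ in range(results.qsize())]
```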

fastvideo.entrypoints.streaming.health

Health, readiness, and status routes for streaming servers.

Functions:

fastvideo.entrypoints.streaming.health.build_health_router
build_health_router(pool: PoolRef = None) -> APIRouter

Build a router exposing streaming liveness/readiness endpoints.

pool may be either a concrete pool or a zero-argument callable that returns the current pool. The callable form lets product servers keep their own lifespan-managed runtime singleton without adding public global state.
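A minimal sketch of resolving a PoolRef, assuming a concrete pool is recognizable by having get_status or health (the two methods get_pool_status checks for). resolve_pool, FakePool, and Runtime are hypothetical. The callable form lets the router be built before the lifespan handler has created the pool.

```python
from __future__ import annotations

from typing import Any


def resolve_pool(pool: Any) -> Any | None:
    """Return the concrete pool, calling a zero-argument factory if given one."""
    if pool is None:
        return None
    looks_like_pool = hasattr(pool, "get_status") or hasattr(pool, "health")
    if callable(pool) and not looks_like_pool:
        return pool()  # factory: look the current pool up at request time
    return pool


class FakePool:
    def get_status(self) -> dict[str, int]:
        return {"total_gpus": 1}


class Runtime:
    """Hypothetical lifespan-managed holder; pool stays None until startup."""

    def __init__(self) -> None:
        self.pool: FakePool | None = None


runtime = Runtime()


def current_pool() -> FakePool | None:
    return runtime.pool


before = resolve_pool(current_pool)  # None: startup has not run yet
runtime.pool = FakePool()
after = resolve_pool(current_pool)   # the live pool
```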

Source code in fastvideo/entrypoints/streaming/health.py
def build_health_router(pool: PoolRef = None) -> APIRouter:
    """Build a router exposing streaming liveness/readiness endpoints.

    ``pool`` may be either a concrete pool or a zero-argument callable that
    returns the current pool. The callable form lets product servers keep their
    own lifespan-managed runtime singleton without adding public global state.
    """
    router = APIRouter()

    @router.get("/health")
    @router.get("/healthz")
    async def get_healthz() -> dict[str, Any]:
        """Liveness probe for process-level health."""
        return {
            "status": "ok",
            "service": SERVICE_NAME,
            "ts": _utc_now_iso(),
        }

    @router.get("/readyz")
    async def get_readyz() -> dict[str, Any]:
        """Readiness probe for router/load-balancer health checks."""
        status_payload = await get_pool_status(pool)
        ready_workers = _ready_worker_count(status_payload)
        return {
            "status": "ready" if ready_workers > 0 else "warming",
            "service": SERVICE_NAME,
            "ready_gpu_workers": ready_workers,
            "total_gpus": _as_int(status_payload.get("total_gpus")),
            "available_gpus": _as_int(status_payload.get("available_gpus")),
            "warmup_successful_gpus": _as_int(status_payload.get("warmup_successful_gpus")),
            "warmup_failed_gpus": _as_int(status_payload.get("warmup_failed_gpus")),
            "queue_size": _as_int(status_payload.get("queue_size")),
            "ts": _utc_now_iso(),
        }

    @router.get("/status")
    async def get_status() -> dict[str, Any]:
        """Get the current status of the GPU pool."""
        return await get_pool_status(pool)

    return router
fastvideo.entrypoints.streaming.health.get_pool_status async
get_pool_status(pool: PoolRef = None) -> dict[str, Any]

Return the generic GPU pool status payload used by /status.

Source code in fastvideo/entrypoints/streaming/health.py
async def get_pool_status(pool: PoolRef = None) -> dict[str, Any]:
    """Return the generic GPU pool status payload used by ``/status``."""
    resolved = _resolve_pool(pool)
    if resolved is None:
        return _zero_pool_status()
    if _has_get_status(resolved):
        return dict(resolved.get_status())
    if _has_health(resolved):
        return _status_from_health(resolved.health())
    return _zero_pool_status()

fastvideo.entrypoints.streaming.mock_server

Mock streaming server — a frontend dev aid.

Boots the same FastAPI app the real streaming server uses, but backs it with :class:InProcessGpuPool wrapping a synthetic generator that emits pre-baked RGB frames. No GPU or model weights required.

Use cases:

  • Frontend development without a real model loaded.
  • Integration tests that exercise the WS protocol end-to-end.
  • Reproducing protocol bugs locally.

Launch: python -m fastvideo.entrypoints.streaming.mock_server.

Classes

fastvideo.entrypoints.streaming.mock_server.MockGenerator dataclass
MockGenerator(sleep_ms: float = 0.0)

Generator stand-in that returns synthetic gradient frames.

Each call produces one segment's worth of frames whose pixels vary by a constant derived from the request seed and segment index. Latency is configurable via sleep_ms so callers can exercise slow-generation scenarios without spinning up a GPU.
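A sketch of deterministic synthetic frames in the spirit of MockGenerator. The mixing constants (31, 17) and the gradient formula are hypothetical; only the idea of an offset derived from seed and segment index comes from the description. Frames are packed RGB bytes so the example needs no NumPy.

```python
from dataclasses import dataclass


@dataclass
class Frame:
    width: int
    height: int
    rgb: bytes  # packed RGB, row-major, 3 bytes per pixel


def synthetic_segment(seed: int, segment_idx: int, *, num_frames: int, width: int, height: int) -> list[Frame]:
    """Gradient frames shifted by a constant derived from (seed, segment_idx)."""
    offset = (seed * 31 + segment_idx * 17) % 256  # hypothetical mixing constants
    frames = []
    for t in range(num_frames):
        pixels = bytearray()
        for y in range(height):
            for x in range(width):
                base = (x + y + t + offset) % 256
                pixels += bytes((base, (base + 85) % 256, (base + 170) % 256))
        frames.append(Frame(width, height, bytes(pixels)))
    return frames


seg = synthetic_segment(seed=7, segment_idx=0, num_frames=2, width=4, height=2)
```

Because the output depends only on its arguments, the same seed and segment index always reproduce the same frames, which keeps protocol tests stable.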

Functions:

fastvideo.entrypoints.streaming.mock_server.build_mock_app
build_mock_app(*, sleep_ms: float = 0.0)

Build a FastAPI app backed by :class:MockGenerator.

Source code in fastvideo/entrypoints/streaming/mock_server.py
def build_mock_app(*, sleep_ms: float = 0.0):
    """Build a FastAPI app backed by :class:`MockGenerator`."""
    serve_config = ServeConfig(
        generator=GeneratorConfig(model_path="/models/mock"),
        streaming=StreamingConfig(
            session_timeout_seconds=120,
            generation_segment_cap=6,
        ),
    )
    serve_config.default_request.sampling = SamplingConfig(
        num_frames=24,
        height=256,
        width=256,
        fps=24,
        num_inference_steps=1,
    )
    return build_app(serve_config, MockGenerator(sleep_ms=sleep_ms))

fastvideo.entrypoints.streaming.prompt

Prompt pipeline for the streaming server.

  • :mod:providers — LLM backend abstraction + built-in adapters
  • :mod:enhancer — provider-agnostic enhance / auto-extend / rewrite operations on top of the provider layer

All of this is optional; the streaming server runs fine without it (PR 7.5's skeleton never invokes the enhancer). When the operator sets ServeConfig.streaming.prompt.enabled, the server routes each curated prompt from session_init_v2 through enhance before the first segment.

Classes

fastvideo.entrypoints.streaming.prompt.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.
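A sketch of a synchronous backend wrapped in asyncio.to_thread. The Provider protocol here, with a name attribute and an async complete method taking a Request, is a hypothetical shape: the real LLMProvider method names and LLMRequest fields are not shown on this page. EchoSyncClient stands in for a blocking vendor SDK.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass
class Request:  # hypothetical stand-in for LLMRequest
    system: str
    user: str


class Provider(Protocol):  # hypothetical shape; the real LLMProvider may differ
    name: str

    async def complete(self, request: Request) -> str: ...


class EchoSyncClient:
    """A blocking client, e.g. a vendor SDK without async support."""

    def chat(self, system: str, user: str) -> str:
        return f"[{system}] {user}"


class EchoProvider:
    name = "echo"

    def __init__(self) -> None:
        self._client = EchoSyncClient()

    async def complete(self, request: Request) -> str:
        # Run the blocking call on a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._client.chat, request.system, request.user)


async def call(provider: Provider) -> str:
    return await provider.complete(Request(system="enhance", user="a fox"))


text = asyncio.run(call(EchoProvider()))
```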

fastvideo.entrypoints.streaming.prompt.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.
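One way an adapter might map HTTP status codes onto the retryable flag, following the families named above (5xx and 429 retryable, other 4xx not). error_for_status is hypothetical; LLMProviderError is redefined locally with the documented constructor so the example runs standalone.

```python
class LLMProviderError(RuntimeError):
    """Local mirror of the documented constructor."""

    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable


def error_for_status(status: int, body: str) -> LLMProviderError:
    """Hypothetical mapping: 5xx and 429 fall back to the next provider; other 4xx do not."""
    retryable = status >= 500 or status == 429
    return LLMProviderError(f"HTTP {status}: {body}", retryable=retryable)


server_error = error_for_status(503, "upstream unavailable")
rate_limited = error_for_status(429, "slow down")
auth_error = error_for_status(401, "bad key")
```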

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
fastvideo.entrypoints.streaming.prompt.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

Usage:

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
Methods:
fastvideo.entrypoints.streaming.prompt.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; priority=-1 (default) appends as a fallback.
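register_provider's ordering follows plain list semantics, replayed below on provider names with a hypothetical register helper. Two consequences worth knowing: any negative priority appends (so priority=-2 does not mean "second to last"), and a priority past the end also appends, because list.insert clamps the index.

```python
def register(providers: list[str], provider: str, *, priority: int = -1) -> None:
    """Same branch as register_provider: negative appends, otherwise list.insert."""
    if priority < 0:
        providers.append(provider)
    else:
        providers.insert(priority, provider)


chain = ["cerebras", "groq"]
register(chain, "local")                # ['cerebras', 'groq', 'local']
register(chain, "primary", priority=0)  # ['primary', 'cerebras', 'groq', 'local']
register(chain, "late", priority=99)    # index clamped: appended at the end
register(chain, "tail", priority=-2)    # any negative value appends
```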

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
fastvideo.entrypoints.streaming.prompt.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.
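A sketch of a reloadable prompt loader. The directory layout (one <op>.txt file per operation) and the DEFAULT_PROMPTS texts are hypothetical; this page does not document the real file names or defaults. Reloading is simply calling the loader again, so edited files take effect without a restart.

```python
from __future__ import annotations

import tempfile
from pathlib import Path

# Hypothetical built-in defaults; the real enhancer's defaults are not shown here.
DEFAULT_PROMPTS = {
    "enhance": "Polish the user's video prompt.",
    "auto_extend": "Write the next shot that continues the scene.",
    "rewrite": "Rewrite the seed prompt as alternatives, one per line.",
}


def load_system_prompts(prompt_dir: str | None) -> dict[str, str]:
    """Read <prompt_dir>/<op>.txt for each op, falling back to the default."""
    prompts = dict(DEFAULT_PROMPTS)
    if prompt_dir is None:
        return prompts
    for op in DEFAULT_PROMPTS:
        path = Path(prompt_dir) / f"{op}.txt"
        if path.is_file():
            prompts[op] = path.read_text(encoding="utf-8").strip()
    return prompts


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "enhance.txt").write_text("Add cinematic camera detail.\n", encoding="utf-8")
    prompts = load_system_prompts(tmp)
```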

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")

Modules

fastvideo.entrypoints.streaming.prompt.enhancer

Provider-agnostic prompt orchestration for the streaming server.

Three operations the streaming server needs:

  • enhance — polish a user prompt (add cinematic detail, fix syntax)
  • auto_extend — generate a follow-on prompt for loop generation
  • rewrite — rewrite a seed prompt for a user-directed rewrite flow

All three share the same orchestration: pick a provider in priority order, submit an LLMRequest, fall back to the next provider on retryable errors, and surface a structured :class:LLMResponse back to the caller.
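The shared orchestration can be sketched as a loop over providers in priority order. Providers are modeled as (name, async callable) pairs and Response is a hypothetical stand-in for LLMResponse with a subset of the fields build_rewrite reads; raising a non-retryable error once every provider has failed is an assumption of this sketch.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable


class LLMProviderError(RuntimeError):
    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable


@dataclass
class Response:  # hypothetical stand-in for LLMResponse
    content: str
    provider: str
    latency_ms: float
    fallback_used: bool


Call = Callable[[str], Awaitable[str]]


async def run_with_fallback(providers: list[tuple[str, Call]], prompt: str) -> Response:
    """Try providers in priority order; move on only when the error is retryable."""
    last_error: LLMProviderError | None = None
    for idx, (name, call) in enumerate(providers):
        start = time.monotonic()
        try:
            content = await call(prompt)
        except LLMProviderError as exc:
            if not exc.retryable:
                raise  # e.g. 4xx auth / bad request: surface immediately
            last_error = exc
            continue
        latency_ms = (time.monotonic() - start) * 1000
        return Response(content, name, latency_ms, fallback_used=idx > 0)
    raise LLMProviderError("all providers failed", retryable=False) from last_error


async def flaky(prompt: str) -> str:
    raise LLMProviderError("503 from upstream", retryable=True)


async def healthy(prompt: str) -> str:
    return prompt + ", cinematic lighting"


resp = asyncio.run(run_with_fallback([("primary", flaky), ("backup", healthy)], "a fox"))
```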

System prompts are loaded from system_prompt_dir on construction and can be hot-reloaded via :meth:PromptEnhancer.reload_system_prompts. The streaming server's management endpoint calls that method in response to a rewrite_seed_prompts_started frame.

Classes
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

Usage:

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
Methods:
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; priority=-1 (default) appends as a fallback.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")
fastvideo.entrypoints.streaming.prompt.providers

LLM provider implementations used by the prompt enhancer.

Classes
fastvideo.entrypoints.streaming.prompt.providers.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

Cerebras inference adapter.

api_key falls back to CEREBRAS_API_KEY when unset.
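A sketch of the api_key fallback on a provider-style dataclass. ExampleProvider, its placeholder base_url, and the env_var field are hypothetical; whether the real adapter resolves the key at construction or per request is not documented here, so this version resolves it at call time.

```python
from __future__ import annotations

import os
from dataclasses import dataclass


@dataclass
class ExampleProvider:
    """Hypothetical provider showing the api_key -> environment fallback."""

    api_key: str | None = None
    base_url: str = "https://api.example.invalid/v1"  # placeholder, not the real default
    name: str = "example"
    env_var: str = "CEREBRAS_API_KEY"  # the variable this page names for Cerebras

    def resolved_api_key(self) -> str:
        # An explicit argument wins; otherwise read the environment at call time.
        key = self.api_key or os.environ.get(self.env_var)
        if not key:
            raise RuntimeError(f"no API key: pass api_key= or set {self.env_var}")
        return key


os.environ["EXAMPLE_API_KEY"] = "env-key"
from_env = ExampleProvider(env_var="EXAMPLE_API_KEY").resolved_api_key()
explicit = ExampleProvider(api_key="arg-key", env_var="EXAMPLE_API_KEY").resolved_api_key()
```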

fastvideo.entrypoints.streaming.prompt.providers.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

Groq inference adapter.

Identical wire format to :class:CerebrasProvider; both go through :func:complete_openai_compatible. The two providers differ only in base URL, env var, and model id conventions.
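Both adapters speak the OpenAI-style chat completions format. This sketch only builds the request (URL, headers, JSON body) without sending it; build_chat_request is hypothetical, and whether complete_openai_compatible shapes the payload exactly this way (for example, whether it omits max_tokens when None) is an assumption.

```python
from __future__ import annotations

import json


def build_chat_request(
    base_url: str,
    api_key: str,
    *,
    model: str,
    system: str,
    user: str,
    temperature: float,
    max_tokens: int | None,
) -> tuple[str, dict[str, str], bytes]:
    """Return (url, headers, body) for an OpenAI-style POST /chat/completions."""
    payload: dict = {
        "model": model,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        "temperature": temperature,
    }
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens
    url = base_url.rstrip("/") + "/chat/completions"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    return url, headers, json.dumps(payload).encode("utf-8")


url, headers, body = build_chat_request(
    "https://api.example.invalid/v1/", "k",
    model="gpt-oss-120b", system="Polish the prompt.", user="a fox",
    temperature=0.7, max_tokens=256,
)
```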

fastvideo.entrypoints.streaming.prompt.providers.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.

fastvideo.entrypoints.streaming.prompt.providers.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.providers.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
Modules
fastvideo.entrypoints.streaming.prompt.providers.base

LLM provider protocol + DTOs used by the prompt enhancer.

Third-party users add a new provider by implementing :class:LLMProvider and registering it with a prompt enhancer instance. The shipped providers live in sibling modules (cerebras.py, groq.py) and each is ~100-200 LOC — the provider layer is intentionally thin so the enhancer stays provider-agnostic.

Classes
fastvideo.entrypoints.streaming.prompt.providers.base.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.

fastvideo.entrypoints.streaming.prompt.providers.base.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.providers.base.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
fastvideo.entrypoints.streaming.prompt.providers.cerebras

Cerebras LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.cerebras.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

Cerebras inference adapter.

api_key falls back to CEREBRAS_API_KEY when unset.

fastvideo.entrypoints.streaming.prompt.providers.groq

Groq LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.groq.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

Groq inference adapter.

Identical wire format to :class:CerebrasProvider; both go through :func:complete_openai_compatible. The two providers differ only in base URL, env var, and model id conventions.

fastvideo.entrypoints.streaming.prompt.rewrite

Rewrite payload builder.

The UI's "rewrite seed prompts" flow asks the enhancer to produce a batch of alternative prompts given one seed. This module packages the seed + options into the payload the enhancer expects and unpacks the response back into a typed :class:RewriteResult.

Separating this from :mod:enhancer keeps the enhancer provider-agnostic; anything UI-specific (how many alternatives to request, how to split the response, temperature) lives here.
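build_rewrite delegates splitting to _split_response, whose body is not shown on this page. A plausible sketch, assuming the model returns one alternative per line, possibly with list markers: split_response strips markers such as "1.", "2)", "-", "*", drops blank lines, and keeps at most limit entries.

```python
from __future__ import annotations

import re

# Leading list markers: "-", "*", bullet, or "N." / "N)".
_LIST_MARKER = re.compile(r"^\s*(?:[-*\u2022]|\d+[.)])\s*")


def split_response(content: str, *, limit: int) -> list[str]:
    """Hypothetical splitter: one alternative per non-empty line, markers stripped."""
    cleaned = [_LIST_MARKER.sub("", line).strip() for line in content.splitlines()]
    return [text for text in cleaned if text][:limit]


content = (
    "1. A fox sprints across fresh snow\n"
    "2) Snow sprays behind a running fox\n"
    "\n"
    "- A red fox bounds through a blizzard\n"
    "* An extra line past the limit"
)
alternatives = split_response(content, limit=3)
```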

Classes
fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions dataclass
RewriteOptions(count: int = 3, temperature: float | None = None)
Attributes
fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions.count class-attribute instance-attribute
count: int = 3

Number of alternative prompts to request.

Functions:
fastvideo.entrypoints.streaming.prompt.rewrite.build_rewrite async
build_rewrite(enhancer: PromptEnhancer, seed_prompt: str, *, options: RewriteOptions | None = None) -> RewriteResult

Run a rewrite op through the enhancer and return a typed result.

Source code in fastvideo/entrypoints/streaming/prompt/rewrite.py
async def build_rewrite(
    enhancer: PromptEnhancer,
    seed_prompt: str,
    *,
    options: RewriteOptions | None = None,
) -> RewriteResult:
    """Run a rewrite op through the enhancer and return a typed result."""
    if not seed_prompt.strip():
        raise ValueError("rewrite seed prompt must be non-empty")
    options = options or RewriteOptions()
    response = await enhancer.rewrite(seed_prompt)
    alternatives = _split_response(response.content, limit=options.count)
    return RewriteResult(
        seed_prompt=seed_prompt,
        alternatives=alternatives,
        provider=response.provider,
        model=response.model,
        latency_ms=response.latency_ms,
        fallback_used=response.fallback_used,
    )
fastvideo.entrypoints.streaming.prompt.safety

Optional prompt safety filter.

Uses a fastText classifier to score prompts against a banned-content rubric. Only loaded when ServeConfig.streaming.safety.enabled is True and fastText is installed — users who don't need it see no runtime cost.

Install: pip install fastvideo[prompt-safety] (ships fasttext as an optional extra) or install fasttext directly.

Classes
fastvideo.entrypoints.streaming.prompt.safety.PromptSafetyFilter
PromptSafetyFilter(*, classifier_path: str | None, enabled: bool = True, block_threshold: float = 0.5)

Minimal fastText-backed prompt safety filter.

Loads the classifier lazily on first use so the streaming server can construct the filter eagerly at startup without paying the model-load cost when safety is disabled.
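The lazy-load pattern can be sketched with the same attributes PromptSafetyFilter.__init__ sets (_model, _load_attempted, _load_lock). LazyModel and its loader argument are hypothetical, and treating ImportError/OSError as "classifier unavailable" is an assumption. The lock plus re-check ensures concurrent first callers trigger only one load.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class LazyModel:
    """Load-once holder: construction is cheap, the first get() pays the load cost."""

    def __init__(self, loader: Callable[[], Any], *, enabled: bool = True) -> None:
        self._loader = loader
        self._enabled = enabled
        self._model: Any | None = None
        self._load_attempted = False
        self._load_lock = threading.Lock()

    def get(self) -> Any | None:
        if not self._enabled:
            return None  # disabled: never touch the loader
        if self._load_attempted:  # fast path once loading has been tried
            return self._model
        with self._load_lock:
            if not self._load_attempted:  # re-check: another thread may have loaded it
                try:
                    self._model = self._loader()
                except (ImportError, OSError):
                    self._model = None  # e.g. fastText missing or bad path
                finally:
                    self._load_attempted = True
        return self._model


calls: list[int] = []


def load() -> str:
    calls.append(1)
    return "classifier"


lazy = LazyModel(load)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```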

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def __init__(
    self,
    *,
    classifier_path: str | None,
    enabled: bool = True,
    block_threshold: float = 0.5,
) -> None:
    self._classifier_path = classifier_path
    self._enabled = enabled
    self._block_threshold = block_threshold
    self._model: Any | None = None
    self._load_attempted = False
    self._load_lock = threading.Lock()
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision

Bases: Enum

Attributes
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision.UNAVAILABLE class-attribute instance-attribute
UNAVAILABLE = 'unavailable'

Returned when the classifier can't run (not configured, fastText missing). Safety is opt-in; the server treats UNAVAILABLE as ALLOW but logs it so operators know the filter is off.
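A sketch of the decision flow around UNAVAILABLE. Only UNAVAILABLE's value is documented here, so the ALLOW and BLOCK values, and the rule that a score at or above block_threshold blocks, are assumptions; decide and admit are hypothetical helpers.

```python
from __future__ import annotations

import logging
from enum import Enum

logger = logging.getLogger("safety-sketch")


class Decision(Enum):  # local mirror; ALLOW/BLOCK values are assumed
    ALLOW = "allow"
    BLOCK = "block"
    UNAVAILABLE = "unavailable"


def decide(score: float | None, *, block_threshold: float = 0.5) -> Decision:
    """Hypothetical scoring rule: None means the classifier could not run."""
    if score is None:
        return Decision.UNAVAILABLE
    return Decision.BLOCK if score >= block_threshold else Decision.ALLOW


def admit(decision: Decision) -> bool:
    """Server-side gate: UNAVAILABLE is treated as ALLOW, but logged."""
    if decision is Decision.UNAVAILABLE:
        logger.warning("prompt safety filter unavailable; allowing prompt")
        return True
    return decision is Decision.ALLOW
```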

Functions:
fastvideo.entrypoints.streaming.prompt.safety.first_blocked
first_blocked(filter_: PromptSafetyFilter, prompts: list[str]) -> SafetyResult | None

Return the first prompt the filter blocks, or None.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def first_blocked(
    filter_: PromptSafetyFilter,
    prompts: list[str],
) -> SafetyResult | None:
    """Return the first prompt the filter blocks, or ``None``."""
    for prompt in prompts:
        result = filter_.classify(prompt)
        if result.decision is SafetyDecision.BLOCK:
            return result
    return None

fastvideo.entrypoints.streaming.protocol

JSON WebSocket protocol schemas for the streaming server.

Every control message shares the envelope {"type": <str>, ...}. Pydantic models live here so the server can parse / validate incoming frames and emit well-typed outgoing frames without hand-rolled dicts.

The message catalogue matches the contract in docs/design/server_contracts/streaming.md; additions must land in both places in the same PR.

Classes

fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot

Bases: BaseModel

Attributes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot.state instance-attribute
state: dict[str, Any]

{kind, payload} dict matching :class:fastvideo.api.ContinuationState.

fastvideo.entrypoints.streaming.protocol.MediaInit

Bases: BaseModel

Descriptor for the fMP4 initialization segment that follows.

fastvideo.entrypoints.streaming.protocol.SegmentPromptSource

Bases: BaseModel

Request a new segment using a specific prompt.

fastvideo.entrypoints.streaming.protocol.SessionInitV2

Bases: BaseModel

Opening frame the client sends after the WebSocket handshake.

Attributes
fastvideo.entrypoints.streaming.protocol.SessionInitV2.continuation_state class-attribute instance-attribute
continuation_state: dict[str, Any] | None = None

Optional {kind, payload} dict; hydrated into :class:fastvideo.api.ContinuationState server-side.

fastvideo.entrypoints.streaming.protocol.SnapshotState

Bases: BaseModel

Request the current ContinuationState for export.

Functions:

fastvideo.entrypoints.streaming.protocol.parse_client_message
parse_client_message(raw: dict[str, Any]) -> ClientMessage

Parse an incoming WebSocket dict into a typed client message.

Unknown type values raise :class:pydantic.ValidationError; the server handler turns that into an error frame with code="invalid_message".
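The envelope dispatch and error-frame mapping can be sketched without Pydantic. This swaps TypeAdapter validation for a plain dict of dataclasses keyed by "type", so it is a simplification, not the module's implementation. The "snapshot_state" type string, the message fields, and the "ack" reply are hypothetical; "session_init_v2" and code="invalid_message" come from this page.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class InvalidMessage(ValueError):
    pass


@dataclass
class SessionInit:  # hypothetical subset of SessionInitV2's fields
    type: str
    continuation_state: dict[str, Any] | None = None


@dataclass
class Snapshot:  # hypothetical; SnapshotState's real "type" string is not shown
    type: str


_MESSAGES = {"session_init_v2": SessionInit, "snapshot_state": Snapshot}


def parse(raw: dict[str, Any]) -> Any:
    """Pick the model from the envelope's "type" key; unknown types are rejected."""
    msg_type = raw.get("type")
    cls = _MESSAGES.get(msg_type) if isinstance(msg_type, str) else None
    if cls is None:
        raise InvalidMessage(f"unknown message type: {msg_type!r}")
    try:
        return cls(**raw)
    except TypeError as exc:  # unexpected or missing fields
        raise InvalidMessage(str(exc)) from exc


def handle(raw: dict[str, Any]) -> dict[str, Any]:
    """Turn parse failures into an error frame with code="invalid_message"."""
    try:
        msg = parse(raw)
    except InvalidMessage as exc:
        return {"type": "error", "code": "invalid_message", "message": str(exc)}
    return {"type": "ack", "received": msg.type}  # hypothetical success frame
```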

Source code in fastvideo/entrypoints/streaming/protocol.py
def parse_client_message(raw: dict[str, Any]) -> ClientMessage:
    """Parse an incoming WebSocket dict into a typed client message.

    Unknown ``type`` values raise :class:`pydantic.ValidationError`; the
    server handler turns that into an ``error`` frame with
    ``code="invalid_message"``.
    """
    from pydantic import TypeAdapter

    return TypeAdapter(ClientMessage).validate_python(raw)

fastvideo.entrypoints.streaming.router

Multi-replica load balancer + WebSocket proxy for the streaming server.

Sits in front of one-or-more streaming-server replicas and forwards WebSocket sessions to a healthy primary, with failover to secondaries. Kept in-repo under fastvideo/entrypoints/streaming/router/ per the PR plan's default; the alternative (separate package) is an open question deferred to review.

Classes

fastvideo.entrypoints.streaming.router.ReplicaRegistry
ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → :class:Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def __init__(self, replicas: list[ReplicaEndpoint]) -> None:
    if not replicas:
        raise ValueError("ReplicaRegistry requires at least one replica")
    self._replicas: dict[str, Replica] = {endpoint.url: Replica(endpoint=endpoint) for endpoint in replicas}
    self._lock = asyncio.Lock()
Methods:
fastvideo.entrypoints.streaming.router.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def select(self) -> Replica | None:
    """Pick the best healthy replica.

    Priority order:

    1. The first healthy primary (insertion order).
    2. The first healthy non-primary (insertion order).
    3. ``None`` when nothing is healthy.

    This MVP picks the first match within each tier; it does NOT
    load-balance across multiple healthy replicas of the same tier.
    Round-robin and weighted distribution are deferred until a real
    N-way active deployment exists.
    """
    healthy_primaries = [r for r in self._replicas.values() if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in self._replicas.values() if r.is_healthy]
    if healthy:
        return healthy[0]
    return None
fastvideo.entrypoints.streaming.router.RouterConfig dataclass
RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example:

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.
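Load-time validation of this kind can be sketched with a dataclass __post_init__. MiniRouterConfig and Endpoint below are hypothetical stand-ins that keep only the fields the checks need; the error messages are illustrative, not the library's.

```python
from dataclasses import dataclass, field
from urllib.parse import urlparse


@dataclass
class Endpoint:
    # Hypothetical stand-in for ReplicaEndpoint.
    url: str
    primary: bool = False


@dataclass
class MiniRouterConfig:
    """Hypothetical subset of RouterConfig illustrating fail-fast validation."""
    replicas: list[Endpoint] = field(default_factory=list)
    health_check_interval_seconds: float = 5.0
    health_check_timeout_seconds: float = 2.0
    failure_threshold: int = 3
    recovery_threshold: int = 2

    def __post_init__(self) -> None:
        if not self.replicas:
            raise ValueError("router config needs at least one replica")
        if self.health_check_interval_seconds <= 0 or self.health_check_timeout_seconds <= 0:
            raise ValueError("health-check interval and timeout must be positive")
        if self.failure_threshold < 1 or self.recovery_threshold < 1:
            raise ValueError("thresholds must be >= 1")
        for ep in self.replicas:
            if urlparse(ep.url).scheme not in ("http", "https"):
                raise ValueError(f"replica url must be http(s): {ep.url!r}")
        if sum(ep.primary for ep in self.replicas) > 1:
            raise ValueError("at most one replica may be primary")
```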

Functions:

fastvideo.entrypoints.streaming.router.build_router_app
build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    ``registry`` can be injected for tests; defaults to one built from
    ``config.replicas``.
    """
    registry = registry or ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            state.stop_event.set()
            if state.health_task is not None:
                with contextlib.suppress(asyncio.CancelledError):
                    await state.health_task

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            await websocket.send_json({
                "type": "error",
                "code": "gpu_unavailable",
                "message": "router: no healthy replica available",
                "retryable": True,
            })
            await websocket.close(code=1013, reason="no_healthy_replica")
            return

        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            with contextlib.suppress(RuntimeError):
                await websocket.send_json({
                    "type": "error",
                    "code": "worker_failed",
                    "message": f"router bridge failed: {exc}",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError):
                await websocket.close(code=1011)

    app.state.router_state = state
    return app

Modules

fastvideo.entrypoints.streaming.router.config

Typed router configuration.

Classes
fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint dataclass
ReplicaEndpoint(url: str, name: str | None = None, primary: bool = False, weight: float = 1.0)

One backend replica the router can route to.

Attributes
fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.primary class-attribute instance-attribute
primary: bool = False

If True, prefer this replica over the others in steady state.

fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.url instance-attribute
url: str

HTTP base URL, e.g. http://host:8000. The WebSocket URL is derived automatically by replacing the scheme.
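Scheme replacement usually follows the standard WebSocket URI pairing (http → ws, https → wss). The helper below is a hypothetical sketch, not the router's private _websocket_url_for. Appending the /v1/stream path is an assumption based on the endpoint the router and servers expose.

```python
from urllib.parse import urlsplit, urlunsplit

# Standard WebSocket scheme for each HTTP scheme.
_WS_SCHEMES = {"http": "ws", "https": "wss"}


def websocket_url_for(base_url: str, path: str = "/v1/stream") -> str:
    """Hypothetical helper: swap http(s) for ws(s) and append the stream path."""
    parts = urlsplit(base_url)
    scheme = _WS_SCHEMES.get(parts.scheme)
    if scheme is None:
        raise ValueError(f"expected an http(s) URL, got {base_url!r}")
    # Avoid a double slash when the base URL already ends in "/".
    full_path = parts.path.rstrip("/") + path
    return urlunsplit((scheme, parts.netloc, full_path, parts.query, parts.fragment))
```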

fastvideo.entrypoints.streaming.router.config.RouterConfig dataclass
RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example:

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.

fastvideo.entrypoints.streaming.router.main

Router FastAPI entry point.

Exposes the same /v1/stream WebSocket path the backend servers do, accepts a client, picks a healthy replica from the registry, and proxies frames bidirectionally.

PR 7.9 ships the minimum-viable shape: explicit replica list, single primary, JSON + binary passthrough in both directions, and a /status endpoint for operators. Sticky-session routing (so a reconnect lands on the same backend) is left for a follow-up.
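Bidirectional proxying is commonly built as one pump task per direction: wait for whichever finishes first, then cancel the survivor. The sketch below substitutes asyncio.Queue objects and a hypothetical _CLOSED sentinel for the two WebSockets so that it runs standalone. It shows the general pattern only; the real _bridge_session is not listed on this page and may be structured differently.

```python
import asyncio
from typing import Any

_CLOSED = object()  # hypothetical sentinel a fake socket yields when its peer hangs up


async def pump(src: asyncio.Queue, dst: asyncio.Queue) -> None:
    """Copy frames (str for JSON text, bytes for binary) until src closes."""
    while True:
        frame = await src.get()
        if frame is _CLOSED:
            return
        await dst.put(frame)


async def bridge(client_in, client_out, backend_in, backend_out) -> None:
    """Run both directions; when either side closes, cancel the other."""
    tasks = {
        asyncio.create_task(pump(client_in, backend_out)),  # client -> replica
        asyncio.create_task(pump(backend_in, client_out)),  # replica -> client
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raise any error from the direction that finished


async def demo() -> tuple[Any, Any]:
    client_in, client_out, backend_in, backend_out = (asyncio.Queue() for _ in range(4))
    await client_in.put('{"type": "session_init"}')  # JSON text frame from the client
    await backend_in.put(b"\x00fmp4")  # binary media frame from the replica
    await client_in.put(_CLOSED)  # client hangs up
    await bridge(client_in, client_out, backend_in, backend_out)
    return backend_out.get_nowait(), client_out.get_nowait()
```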

Functions:
fastvideo.entrypoints.streaming.router.main.build_router_app
build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

fastvideo.entrypoints.streaming.router.registry

Replica registry + health-check loop.

The registry tracks the set of known backend replicas and their live health. The router consults it when picking a backend for each session, and a background task updates it from periodic HTTP probes.

State machine per replica:

HEALTHY ──(N consecutive failures)──▶ UNHEALTHY
   ▲                                     │
   └──────(M consecutive successes)──────┘

Where N is RouterConfig.failure_threshold and M is RouterConfig.recovery_threshold.
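The N/M hysteresis can be modeled with two streak counters, where any opposite result resets the other streak. HealthTracker and the Status values below are hypothetical names. Starting a replica in HEALTHY is also an assumption, since the real initial status is not documented here.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthTracker:
    """Hypothetical per-replica tracker for the HEALTHY/UNHEALTHY hysteresis."""
    failure_threshold: int  # N
    recovery_threshold: int  # M
    status: Status = Status.HEALTHY  # assumed initial state
    consecutive_failures: int = 0
    consecutive_successes: int = 0

    def record(self, ok: bool) -> Status:
        if ok:
            self.consecutive_failures = 0
            self.consecutive_successes += 1
            if self.status is Status.UNHEALTHY and self.consecutive_successes >= self.recovery_threshold:
                self.status = Status.HEALTHY
        else:
            self.consecutive_successes = 0
            self.consecutive_failures += 1
            if self.status is Status.HEALTHY and self.consecutive_failures >= self.failure_threshold:
                self.status = Status.UNHEALTHY
        return self.status
```

With N=3 and M=2, a single failed probe in the middle of a recovery resets the success streak, so a flapping replica stays out of rotation.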

Attributes
fastvideo.entrypoints.streaming.router.registry.HttpProbe module-attribute
HttpProbe = Any

Structural alias for health-probe callables. The concrete signature is async def __call__(url: str, *, timeout: float) -> tuple[float, str | None]; typing.Callable cannot express keyword-only parameters, so duck typing is the pragmatic compromise.

Classes
fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry
ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Methods:
fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Functions:
fastvideo.entrypoints.streaming.router.registry.run_health_check_loop async
run_health_check_loop(registry: ReplicaRegistry, config: RouterConfig, *, stop_event: Event, http_get: HttpProbe | None = None) -> None

Poll all replicas' health endpoints in parallel on a fixed interval.

http_get is pluggable so unit tests can inject a deterministic probe without hitting the network. The default builds a single httpx.AsyncClient shared across the loop's lifetime so the common case (steady polling against a stable replica set) reuses TCP/TLS connections instead of paying handshake cost per probe.

Probes within one polling cycle run concurrently via asyncio.gather so a slow replica doesn't push the cycle past health_check_interval_seconds.

Source code in fastvideo/entrypoints/streaming/router/registry.py
async def run_health_check_loop(
    registry: ReplicaRegistry,
    config: RouterConfig,
    *,
    stop_event: asyncio.Event,
    http_get: HttpProbe | None = None,
) -> None:
    """Poll all replicas' health endpoints in parallel on a fixed interval.

    ``http_get`` is pluggable so unit tests can inject a deterministic
    probe without hitting the network. The default builds a single
    ``httpx.AsyncClient`` shared across the loop's lifetime so the
    common case (steady polling against a stable replica set) reuses
    TCP/TLS connections instead of paying handshake cost per probe.

    Probes within one polling cycle run concurrently via ``asyncio.gather``
    so a slow replica doesn't push the cycle past
    ``health_check_interval_seconds``.
    """
    if http_get is not None:
        await _run_loop(registry, config, stop_event, http_get)
        return
    async with _build_default_probe(config) as probe:
        await _run_loop(registry, config, stop_event, probe)

fastvideo.entrypoints.streaming.server

Single-generator FastAPI + WebSocket streaming server.

Functions:

fastvideo.entrypoints.streaming.server.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto | None = None, *, pool: GpuPool | None = None, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Pass exactly one of generator (which build_app wraps in an InProcessGpuPool) or pool (the subprocess-backed production setup).

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto | None = None,
    *,
    pool: GpuPool | None = None,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Exactly one of ``generator`` (backed by :class:`InProcessGpuPool`)
    or ``pool`` (for the subprocess-backed production shape) must be
    given.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")
    streaming = serve_config.streaming
    if (generator is None) == (pool is None):
        raise ValueError("build_app requires exactly one of `generator` or `pool`")

    store = session_store or InMemorySessionStore()
    if pool is None:
        assert generator is not None
        pool = InProcessGpuPool(generator, session_store=store)

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        pool=pool,
        sessions=sessions,
        session_store=store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": streaming.stream_mode,
        })

    app.include_router(build_health_router(pool))

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            with contextlib.suppress(Exception):
                await state.pool.release(session.id)
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.server.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

fastvideo.entrypoints.streaming.session

Per-connection session lifecycle for the streaming server.

Each WebSocket opens exactly one Session. SessionManager enforces the generation_segment_cap and session_timeout_seconds budgets from fastvideo.api.StreamingConfig.

Classes

fastvideo.entrypoints.streaming.session.InvalidSessionTransition

Bases: RuntimeError

Raised when a session is asked to transition along an illegal edge.

fastvideo.entrypoints.streaming.session.Session dataclass
Session(id: str = (lambda: hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())
Methods:
fastvideo.entrypoints.streaming.session.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises InvalidSessionTransition on illegal moves. A self-loop (target equal to the current state) is always accepted, for every state, so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
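The check reads an edge table keyed by the current state and then falls back to accepting self-loops. The sketch below reproduces that logic. The State members are taken from names on this page, but the edge table is hypothetical, because the real _VALID_TRANSITIONS is not listed here.

```python
from enum import Enum


class State(Enum):
    # Hypothetical subset of SessionState; member names appear on this page.
    INITIALIZING = "initializing"
    ACTIVE = "active"
    COMPLETE = "complete"
    ERROR = "error"


# Hypothetical edge table; terminal states have no outgoing edges.
VALID: dict[State, frozenset[State]] = {
    State.INITIALIZING: frozenset({State.ACTIVE, State.ERROR}),
    State.ACTIVE: frozenset({State.COMPLETE, State.ERROR}),
}


class InvalidTransition(RuntimeError):
    pass


def transition(current: State, target: State) -> State:
    allowed = VALID.get(current, frozenset())  # missing key -> no outgoing edges
    # As in Session.transition, target == current is accepted for every state.
    if target not in allowed and target is not current:
        raise InvalidTransition(f"{current.value} -> {target.value}")
    return target
```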
fastvideo.entrypoints.streaming.session.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}
Methods:
fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them; this method only identifies dead sessions, so the server can emit session_timeout frames before dropping the WebSocket. Sessions already in a terminal state (COMPLETE, ERROR, TIMEOUT, REJECTED) are skipped.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper is meant to catch sessions stuck before any receive (e.g. a future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
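The per-connection idle enforcement mentioned in the TODO (asyncio.wait_for around receive_json) can be sketched as follows. receive_or_timeout is hypothetical, and receive_json/send_json are plain callables standing in for the WebSocket methods. The {"type": "session_timeout"} frame shape is an assumption, because the exact frame is not specified here.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def receive_or_timeout(receive_json: Callable[[], Awaitable[Any]],
                             send_json: Callable[[dict], Awaitable[None]],
                             idle_seconds: float) -> Any:
    """Hypothetical helper: return the next client frame, or None after a timeout frame."""
    try:
        return await asyncio.wait_for(receive_json(), timeout=idle_seconds)
    except asyncio.TimeoutError:
        # Assumed frame shape; sent before the caller closes the WebSocket.
        await send_json({"type": "session_timeout", "idle_seconds": idle_seconds})
        return None


async def demo() -> list[dict]:
    sent: list[dict] = []
    never = asyncio.Event()

    async def receive_json() -> Any:
        await never.wait()  # simulates a client that stays silent

    async def send_json(frame: dict) -> None:
        sent.append(frame)

    result = await receive_or_timeout(receive_json, send_json, idle_seconds=0.01)
    assert result is None
    return sent
```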
fastvideo.entrypoints.streaming.session.SessionRejected

Bases: RuntimeError

Raised when session creation fails (queue full, auth, etc.).

fastvideo.entrypoints.streaming.session.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.session_init_image

Persist the initial-image blob attached to a streaming session.

Classes

fastvideo.entrypoints.streaming.session_init_image.SessionInitImage dataclass
SessionInitImage(path: str, display_name: str, mime: str)

Location of the persisted init image.

Callers pass path to InputConfig.image_path; display_name is only used for logs.

Functions:

fastvideo.entrypoints.streaming.session_init_image.persist_session_init_image
persist_session_init_image(payload: Any, *, output_dir: str | None = None) -> SessionInitImage | None

Decode a client init-image blob and persist it to disk.

payload shape (matches the internal UI protocol):

{
    "mime": "image/png",
    "name": "ref.png",
    "data": "<base64 bytes>",
}

Returns None when payload is falsy (no init image). Raises ValueError on schema, size, or decode errors so the caller can surface a user-facing error frame.

Source code in fastvideo/entrypoints/streaming/session_init_image.py
def persist_session_init_image(
    payload: Any,
    *,
    output_dir: str | None = None,
) -> SessionInitImage | None:
    """Decode a client init-image blob and persist it to disk.

    ``payload`` shape (matches the internal UI protocol)::

        {
            "mime": "image/png",
            "name": "ref.png",
            "data": "<base64 bytes>",
        }

    Returns ``None`` when ``payload`` is falsy (no init image). Raises
    :class:`ValueError` on schema / size / decode errors so the caller
    can surface a user-facing ``error`` frame.
    """
    if not payload:
        return None
    if not isinstance(payload, dict):
        raise ValueError("session init image must be an object")

    mime = payload.get("mime")
    if mime not in _ACCEPTED_MIMES:
        raise ValueError(f"session init image mime {mime!r} is not one of "
                         f"{sorted(_ACCEPTED_MIMES)}")
    data_b64 = payload.get("data")
    if not isinstance(data_b64, str):
        raise ValueError("session init image data must be a base64 string")
    try:
        data = base64.b64decode(data_b64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError(f"session init image data is not valid base64: {exc}") from exc
    if len(data) > _MAX_IMAGE_BYTES:
        raise ValueError(f"session init image is {len(data)} bytes; limit is "
                         f"{_MAX_IMAGE_BYTES}")
    if len(data) == 0:
        raise ValueError("session init image data is empty")

    ext = _ACCEPTED_MIMES[mime]
    display_name = _sanitize_display_name(payload.get("name")) or f"init{ext}"
    fd, path = tempfile.mkstemp(prefix="fastvideo-init-", suffix=ext, dir=output_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
    except Exception:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)
        raise
    return SessionInitImage(path=path, display_name=display_name, mime=mime)
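On the client side, the payload can be produced with the standard base64 module. The two helpers below are hypothetical. Only "image/png" is confirmed as an accepted MIME type on this page, and the server-side size limit (_MAX_IMAGE_BYTES) is not reproduced. is_strict_base64 mirrors the server's strict decode: with validate=True, any character outside the base64 alphabet raises binascii.Error.

```python
import base64
import binascii
from typing import Any


def build_init_image_payload(data: bytes, *, name: str, mime: str = "image/png") -> dict[str, Any]:
    """Client-side sketch: produce the {mime, name, data} shape shown above."""
    return {"mime": mime, "name": name, "data": base64.b64encode(data).decode("ascii")}


def is_strict_base64(text: str) -> bool:
    """Same strictness as persist_session_init_image's b64decode(validate=True)."""
    try:
        base64.b64decode(text, validate=True)
    except (binascii.Error, ValueError):
        return False
    return True
```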

fastvideo.entrypoints.streaming.session_logger

Per-session JSONL event logger.

Each session gets its own JSONL file under the configured log root so post-hoc analytics (enhancer latency, GPU assignment, segment timings) can be recovered without a tracing backend. The internal UI uses this format; keeping the same shape makes log tooling portable.

Classes

fastvideo.entrypoints.streaming.session_logger.SessionLogEvent dataclass
SessionLogEvent(session_id: str, event: str, payload: dict[str, Any] = dict(), ts: float = time())

One line in the session JSONL file.

fastvideo.entrypoints.streaming.session_logger.SessionLogger
SessionLogger(log_dir: str | None)

Append-only JSONL logger keyed by session id.

Thread-safe: the server may write for the same session from several concurrent contexts (for example, the fMP4 encoder thread and the control-frame handler task).

Source code in fastvideo/entrypoints/streaming/session_logger.py
def __init__(self, log_dir: str | None) -> None:
    self._log_dir = log_dir
    self._files: dict[str, TextIO] = {}
    self._locks: dict[str, threading.Lock] = {}
    self._registry_lock = threading.Lock()
    self._ensure_dir()
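An append-only JSONL logger with one lock per session can be sketched as follows. LogEvent uses the same fields as SessionLogEvent. However, the one-object-per-line layout produced by dataclasses.asdict is an assumption, and MiniJsonlLogger is hypothetical. For simplicity it reopens the file on every write, whereas the real class caches open handles in _files.

```python
import json
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class LogEvent:
    # Same fields as SessionLogEvent; the serialized layout is assumed.
    session_id: str
    event: str
    payload: dict[str, Any] = field(default_factory=dict)
    ts: float = field(default_factory=time.time)


class MiniJsonlLogger:
    """Hypothetical append-only logger: one file and one lock per session."""

    def __init__(self, log_dir: str) -> None:
        self._log_dir = log_dir
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()  # guards creation of per-session locks
        os.makedirs(log_dir, exist_ok=True)

    def _lock_for(self, session_id: str) -> threading.Lock:
        with self._registry_lock:
            return self._locks.setdefault(session_id, threading.Lock())

    def log(self, ev: LogEvent) -> None:
        line = json.dumps(asdict(ev), separators=(",", ":")) + "\n"
        path = os.path.join(self._log_dir, f"{ev.session_id}.jsonl")
        # The per-session lock keeps concurrent writers from interleaving lines.
        with self._lock_for(ev.session_id):
            with open(path, "a", encoding="utf-8") as f:
                f.write(line)
```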

fastvideo.entrypoints.streaming.session_store

Session state store for the FastVideo streaming server.

The streaming server keeps continuation state (decoded frames + audio latents from the previous segment) server-side, so the client doesn't have to re-upload multi-megabyte tensors with every WebSocket message. Two operations are needed:

  • snapshot(session_id) -> ContinuationState — serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
  • hydrate(state) -> session_id — load a previously serialized state into a new session (for resume-after-disconnect flows).

The store is an ABC with an InMemorySessionStore default; Redis or other backends can drop in without touching the pipeline.

Large tensor payloads (video frames, audio latents) are kept out of the JSON payload via an accompanying BlobStore. Both stores share a process today; they are separate types so that a future implementation can put blobs on S3 while keeping session metadata in Redis.

Classes

fastvideo.entrypoints.streaming.session_store.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A ContinuationState payload can reference large tensors stored in a BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Methods:
fastvideo.entrypoints.streaming.session_store.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.session_store.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory BlobStore for single-process servers.

No eviction policy: callers are responsible for calling drop() when a blob's owning state is replaced or a session ends. A Redis- or filesystem-backed BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
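The docstring above anticipates a filesystem-backed replacement. A rough sketch of one, assuming one file per blob under a root directory and hex ids generated by the store itself; `FileBlobStore` is hypothetical and ignores `mime` for brevity. In fastvideo it would subclass `BlobStore`; here it only matches the documented method signatures and contract (`KeyError` from `get` on a missing id, `drop` as a no-op) so the example runs standalone.

```python
from __future__ import annotations

import os
import tempfile
import uuid
from pathlib import Path

_HEX = set("0123456789abcdef")


class FileBlobStore:
    """Hypothetical one-file-per-blob store with the BlobStore method signatures."""

    def __init__(self, root: str | os.PathLike[str]) -> None:
        self._root = Path(root)
        self._root.mkdir(parents=True, exist_ok=True)

    def _path(self, blob_id: str) -> Path:
        # Ids are uuid4 hex strings; rejecting anything else blocks path traversal.
        if not blob_id or not set(blob_id) <= _HEX:
            raise KeyError(blob_id)
        return self._root / blob_id

    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
        # mime is accepted for interface compatibility but not stored in this sketch.
        blob_id = uuid.uuid4().hex
        fd, tmp = tempfile.mkstemp(dir=self._root)
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, self._path(blob_id))  # atomic rename on POSIX
        return blob_id

    def get(self, blob_id: str) -> bytes:
        try:
            return self._path(blob_id).read_bytes()
        except FileNotFoundError:
            raise KeyError(blob_id) from None  # documented: KeyError if absent

    def drop(self, blob_id: str) -> None:
        try:
            self._path(blob_id).unlink()
        except (KeyError, FileNotFoundError):
            pass  # documented: missing ids are a no-op


with tempfile.TemporaryDirectory() as root:
    store = FileBlobStore(root)
    blob_id = store.put(b"\x00" * 1024)
    assert store.get(blob_id) == b"\x00" * 1024
```

Writing to a temporary file and then calling `os.replace` means a concurrent `get` on a POSIX filesystem sees either no file or the complete blob, never a partial write.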
fastvideo.entrypoints.streaming.session_store.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction, TTL, or bounded capacity: sessions only leave via drop(). The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
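Because neither store evicts anything, the code that replaces a session's state also has to release blobs that only the old state referenced. A minimal sketch of that bookkeeping, assuming a caller-supplied `blob_ids_of` function can list the blob ids inside a state (ContinuationState's actual fields are not documented here); `replace_state`, `FakeSessions`, and `FakeBlobs` are hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable, Protocol


class SessionStoreLike(Protocol):
    def snapshot(self, session_id: str) -> Any: ...
    def store(self, session_id: str, state: Any) -> None: ...


class BlobStoreLike(Protocol):
    def drop(self, blob_id: str) -> None: ...


def replace_state(
    sessions: SessionStoreLike,
    blobs: BlobStoreLike,
    session_id: str,
    new_state: Any,
    blob_ids_of: Callable[[Any], Iterable[str]],
) -> None:
    """Store new_state, then drop blobs referenced only by the previous state."""
    old_state = sessions.snapshot(session_id)
    sessions.store(session_id, new_state)  # swap first, so the stored state keeps its blobs
    if old_state is None:
        return
    still_used = set(blob_ids_of(new_state))
    for blob_id in set(blob_ids_of(old_state)) - still_used:
        blobs.drop(blob_id)


# Tiny fakes for the demo; real code would pass the fastvideo stores.
class FakeSessions:
    def __init__(self) -> None:
        self.data: dict[str, Any] = {}

    def snapshot(self, session_id: str) -> Any:
        return self.data.get(session_id)

    def store(self, session_id: str, state: Any) -> None:
        self.data[session_id] = state


class FakeBlobs:
    def __init__(self) -> None:
        self.dropped: list[str] = []

    def drop(self, blob_id: str) -> None:
        self.dropped.append(blob_id)
```

Storing the new state before dropping means the stored state never points at a deleted blob, and the set difference avoids dropping a blob that both states share.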
fastvideo.entrypoints.streaming.session_store.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls store() after each segment and snapshot() when a client explicitly asks for an exportable state handle.

Methods:
fastvideo.entrypoints.streaming.session_store.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.session_store.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""

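The hydrate semantics are the least obvious part of this interface. A minimal sketch of an in-memory implementation, assuming states can be stored as opaque objects; `DictSessionStore` is hypothetical and only mirrors the documented method signatures, and the `str(uuid.uuid4())` id format is an assumption (the docs only say UUID4).

```python
from __future__ import annotations

import threading
import uuid
from typing import Any

# Stand-in for fastvideo's ContinuationState; the sketch never inspects it.
ContinuationState = Any


class DictSessionStore:
    """Hypothetical in-memory store exposing the documented SessionStore methods."""

    def __init__(self) -> None:
        self._sessions: dict[str, ContinuationState] = {}
        self._lock = threading.Lock()

    def store(self, session_id: str, state: ContinuationState) -> None:
        with self._lock:
            self._sessions[session_id] = state  # replaces any prior value

    def snapshot(self, session_id: str) -> ContinuationState | None:
        with self._lock:
            return self._sessions.get(session_id)

    def hydrate(self, state: ContinuationState, *, session_id: str | None = None) -> str:
        # Fresh UUID4 when no id is given; a supplied id is used verbatim.
        sid = session_id if session_id is not None else str(uuid.uuid4())
        self.store(sid, state)  # overwrites any prior state at that id
        return sid

    def drop(self, session_id: str) -> None:
        with self._lock:
            self._sessions.pop(session_id, None)  # missing ids are a no-op
```

A resume-after-disconnect flow would call snapshot() on the old session, ship the state to the client or another server, and pass it to hydrate() there.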
fastvideo.entrypoints.streaming.stream

fMP4 stream encoder used by the streaming server.

The client's Media Source Extensions player needs a continuous fMP4 byte stream: first an initialization segment (ftyp + moov), then one or more media segments (moof + mdat). We pipe raw RGB frames into an ffmpeg subprocess configured for fragmented output via -movflags empty_moov+default_base_moof+frag_keyframe+faststart and stream the bytes back out.

Classes

fastvideo.entrypoints.streaming.stream.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.
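A client or test harness can check what a chunk contains by walking its top-level ISO-BMFF boxes: each box starts with a 4-byte big-endian size and a 4-byte type. In the encode source shown further down, chunk boundaries come from fixed-size pipe reads rather than box boundaries, so a consumer that inspects boxes should buffer across chunks. `iter_boxes` and `starts_with_init` are hypothetical helpers.

```python
from __future__ import annotations

import struct
from typing import Iterator


def iter_boxes(data: bytes) -> Iterator[tuple[str, int]]:
    """Yield (box_type, box_size) for each complete top-level ISO-BMFF box."""
    offset = 0
    while offset + 8 <= len(data):
        size, raw_type = struct.unpack_from(">I4s", data, offset)
        header = 8
        if size == 1:
            # 64-bit "largesize" follows the type field.
            if offset + 16 > len(data):
                return
            (size,) = struct.unpack_from(">Q", data, offset + 8)
            header = 16
        elif size == 0:
            # Size 0 means the box runs to the end of the data.
            size = len(data) - offset
        if size < header or offset + size > len(data):
            return  # malformed or truncated box: wait for more bytes
        yield raw_type.decode("latin-1"), size
        offset += size


def starts_with_init(chunk: bytes) -> bool:
    """True if the chunk begins with an ftyp box, i.e. the start of an init segment."""
    return len(chunk) >= 8 and chunk[4:8] == b"ftyp"
```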

fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder at each ltx2_segment_start boundary, so each segment becomes one media fragment that the client can append independently.

Example:

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                               segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
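The per-frame write helper (`_write_frame`) is also not shown. A hedged sketch of the checks such a helper could make before writing to ffmpeg's stdin, assuming frames are `(height, width, 3)` RGB arrays matching the constructor's `width` and `height`, and that float frames are scaled to [0, 1]; `write_rgb_frame` is hypothetical.

```python
import io
from typing import BinaryIO

import numpy as np


def write_rgb_frame(stream: BinaryIO, frame: np.ndarray, *, width: int, height: int) -> None:
    """Validate one RGB frame and write it as raw rgb24 bytes (hypothetical helper)."""
    expected = (height, width, 3)  # rows, columns, RGB channels
    if frame.shape != expected:
        raise ValueError(f"expected frame shape {expected}, got {frame.shape}")
    if np.issubdtype(frame.dtype, np.floating):
        # Assumption: float frames are in [0, 1]; clip, scale, and round to bytes.
        frame = (np.clip(frame, 0.0, 1.0) * 255.0).round().astype(np.uint8)
    elif frame.dtype != np.uint8:
        raise TypeError(f"unsupported frame dtype {frame.dtype}")
    # tobytes() emits row-major order with interleaved R, G, B values per pixel.
    stream.write(frame.tobytes())


buf = io.BytesIO()
write_rgb_frame(buf, np.zeros((576, 1024, 3), dtype=np.uint8), width=1024, height=576)
```

Validating the shape up front matters because rawvideo input has no header: a frame of the wrong size would silently shift every following frame.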
Methods:
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task
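Running the writer as a separate task, as encode does, avoids a classic pipe deadlock: if every frame were written before any output is read, the child's stdout pipe would eventually fill, the child would stop reading stdin, and both sides would block. The sketch below reproduces the same writer-task plus reader-loop pattern with a small Python child that copies stdin to stdout, standing in for ffmpeg; `pump` and `close_quietly` are hypothetical names.

```python
from __future__ import annotations

import asyncio
import contextlib
import subprocess
import sys
from typing import AsyncIterator, BinaryIO

# Child process that copies stdin to stdout; it stands in for ffmpeg here.
ECHO_CMD = [
    sys.executable,
    "-c",
    "import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)",
]


def close_quietly(stream: BinaryIO) -> None:
    with contextlib.suppress(BrokenPipeError):
        stream.close()  # flushes buffered bytes, then sends EOF to the child


async def pump(payloads: list[bytes], chunk_size: int = 4096) -> AsyncIterator[bytes]:
    proc = subprocess.Popen(ECHO_CMD, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    assert proc.stdin is not None and proc.stdout is not None
    stdin, stdout = proc.stdin, proc.stdout
    loop = asyncio.get_running_loop()

    async def writer() -> None:
        try:
            for data in payloads:
                # Blocking pipe writes run in a worker thread, off the event loop.
                await loop.run_in_executor(None, stdin.write, data)
        finally:
            # Closing may block while flushing, so it also runs off-loop.
            await loop.run_in_executor(None, close_quietly, stdin)

    writer_task = asyncio.create_task(writer())
    try:
        # Read concurrently with the writer so neither pipe fills up and stalls.
        while True:
            data = await loop.run_in_executor(None, stdout.read, chunk_size)
            if not data:
                break  # EOF: the child has closed its stdout
            yield data
    finally:
        await writer_task
        stdout.close()
        proc.wait()


async def main() -> bytes:
    # Each payload is larger than a typical 64 KiB pipe buffer.
    payloads = [bytes([i]) * 100_000 for i in range(4)]
    return b"".join([chunk async for chunk in pump(payloads)])


out = asyncio.run(main())
```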

fastvideo.entrypoints.streaming.worker

Per-GPU worker subprocess entry point for SubprocessGpuPool.

The pool manages binding, lifecycle, and message dispatch in the parent process. The worker constructs its VideoGenerator from a typed GeneratorConfig, runs the two-segment warmup so that the compiled graphs for both the initial segment and the continuation branch are warm, and then loops on the job queue.
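The parent side of this protocol (SubprocessGpuPool) is not documented on this page. The sketch below is a thread-based approximation, using `queue.Queue` and `threading.Event` instead of multiprocessing, that mirrors the message shapes visible in worker_main: a `"ready"` message after startup, then `"result"` or `"error"` per job, with `None` as the stop sentinel. `fake_worker`, `run_jobs`, and `generate` are hypothetical.

```python
from __future__ import annotations

import queue
import threading
from typing import Any, Callable


def fake_worker(
    worker_id: str,
    generate: Callable[[Any], Any],
    job_queue: queue.Queue,
    result_queue: queue.Queue,
    shutdown_event: threading.Event,
) -> None:
    """Thread stand-in for worker_main's loop, using the same message shapes."""
    result_queue.put({"kind": "ready", "worker_id": worker_id})
    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue  # poll again so shutdown_event is noticed
        if item is None:  # stop sentinel
            break
        job_id = item["job_id"]
        try:
            result = generate(item["request"])
            result_queue.put({"kind": "result", "job_id": job_id, "result": result})
        except Exception as exc:
            result_queue.put({"kind": "error", "job_id": job_id, "error": repr(exc)})


def run_jobs(requests: dict[str, Any], generate: Callable[[Any], Any]) -> dict[str, dict]:
    """Parent side: wait for "ready", submit every job, collect replies by job_id."""
    jobs: queue.Queue = queue.Queue()
    results: queue.Queue = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(target=fake_worker, args=("w0", generate, jobs, results, stop))
    worker.start()
    first = results.get(timeout=10)
    if first["kind"] != "ready":
        raise RuntimeError(f"worker failed to start: {first}")
    for job_id, request in requests.items():
        jobs.put({"job_id": job_id, "request": request})
    replies: dict[str, dict] = {}
    while len(replies) < len(requests):
        msg = results.get(timeout=10)
        replies[msg["job_id"]] = msg
    jobs.put(None)  # ask the worker loop to exit
    worker.join()
    return replies


def generate(request: int) -> int:
    if request < 0:
        raise ValueError("negative request")
    return request * 2


replies = run_jobs({"a": 1, "b": -1}, generate)
```

In the real worker_main a startup failure sends an `"error"` message with no `job_id`, which is why the parent should inspect the first message before dispatching jobs.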

Functions:

fastvideo.entrypoints.streaming.worker.worker_main
worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child process spawned by SubprocessGpuPool. Blocking VideoGenerator construction and generation happen here, not in the parent's event loop.

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
    import os

    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            break
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })