Skip to content

streaming

Classes

fastvideo.entrypoints.streaming.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A :class:ContinuationState payload can reference large tensors stored in a :class:BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Functions

fastvideo.entrypoints.streaming.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Delete the blob stored under ``blob_id``.

    Implementations must treat an unknown ``blob_id`` as a no-op rather
    than raising.
    """
    ...
fastvideo.entrypoints.streaming.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Return the bytes previously stored under ``blob_id``.

    Implementations raise ``KeyError`` when no blob with that id exists.
    """
    ...
fastvideo.entrypoints.streaming.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Persist ``data`` and hand back the id used to fetch it later.

    ``mime`` labels the payload's content type and defaults to
    ``"application/octet-stream"``.
    """
    ...

fastvideo.entrypoints.streaming.FragmentedMP4Chunk dataclass

FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.

fastvideo.entrypoints.streaming.FragmentedMP4Encoder

FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder per ``ltx2_segment_start`` boundary so each segment becomes one media fragment the client can append independently.

Example::

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    """Configure an encoder for a single segment; ffmpeg is not started here."""
    # Geometry / timing of the frames that will be fed to ffmpeg.
    self.width, self.height, self.fps = width, height, fps
    self.segment_idx = segment_idx
    # An absent (or empty) stream id is replaced by a random hex id.
    self.stream_id = stream_id if stream_id else uuid.uuid4().hex

    # ffmpeg invocation knobs. ``extra_args`` is copied so later mutation
    # of the caller's list cannot change this encoder's command line.
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args) if extra_args else []

    # Subprocess handle, populated on the first ``encode`` call, and the
    # flag recording whether the "init" chunk has been yielded yet.
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False

Functions

fastvideo.entrypoints.streaming.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear.

    ffmpeg is spawned on first use (via ``_spawn``). A background task
    writes ``frames`` to its stdin while this generator reads its stdout.
    Both blocking pipe calls run in the loop's default executor, so the
    event loop itself is not blocked. Once ``frames`` is exhausted the
    writer closes stdin, and the generator ends when stdout reaches EOF.

    Args:
        frames: A list, or an async iterator, of frames. Each frame is
            passed to ``_write_frame``. It is presumably an RGB ``uint8``
            array of shape ``(height, width, 3)`` -- TODO confirm against
            ``_write_frame`` / ``_spawn``.

    Yields:
        :class:`FragmentedMP4Chunk` values of at most 64 KiB each. The
        first chunk this encoder instance ever yields is tagged
        ``"init"``; every later chunk is tagged ``"media"``.

    NOTE(review): the ``"init"`` tag is assigned by read order, not by
    parsing MP4 boxes. The first chunk may therefore also contain media
    bytes, or only part of the init segment if that segment exceeds
    64 KiB.

    NOTE(review): the ``finally`` awaits the writer task even when the
    consumer stops early or this generator is cancelled. Nothing drains
    stdout at that point, so if ffmpeg blocks on a full stdout pipe the
    writer may never finish. Confirm that the owning ``async with`` /
    close path terminates the subprocess in that case.
    """
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        # Push every frame into ffmpeg's stdin, then close stdin so ffmpeg
        # sees end of input.
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            # ffmpeg may already have exited; a broken pipe while closing
            # is deliberately ignored.
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        # NOTE(review): if ``proc.stdout`` is a buffered reader (Popen's
        # default buffering), ``read(n)`` waits for ``n`` bytes or EOF;
        # ``read1`` would return data as soon as ffmpeg emits it. Confirm
        # which buffering ``_spawn`` uses.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                # EOF: ffmpeg closed its stdout.
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        # Awaiting the task re-raises any exception the writer hit (e.g.
        # a failed write in ``_write_frame``) once reading stops.
        await writer_task

fastvideo.entrypoints.streaming.InMemoryBlobStore

InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory :class:BlobStore for single-process servers.

No eviction policy — callers are responsible for calling :meth:drop when a blob's owning state is replaced or a session ends. A redis- or filesystem-backed :class:BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    # The class is documented as thread-safe; presumably its methods hold
    # this lock while touching ``_blobs``.
    self._lock = threading.Lock()
    # blob id -> stored record. No eviction: entries leave only when a
    # caller drops them.
    self._blobs: dict[str, _BlobRecord] = {}

fastvideo.entrypoints.streaming.InMemorySessionStore

InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory :class:SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via :meth:drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any :class:BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    # The class is documented as thread-safe; presumably its methods hold
    # this lock while reading or mutating ``_sessions``.
    self._lock = threading.Lock()
    # session id -> latest ContinuationState; entries leave only via ``drop``.
    self._sessions: dict[str, ContinuationState] = {}

fastvideo.entrypoints.streaming.Session dataclass

Session(id: str = <factory: hex id>, state: SessionState = INITIALIZING, created_at: float = <factory: time.monotonic>, last_activity: float = <factory: time.monotonic>, client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = <factory: list>, segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = <factory: dict>)

Functions

fastvideo.entrypoints.streaming.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises :class:InvalidSessionTransition on illegal moves. A transition to the current state is always accepted (for every state, not only ACTIVE), so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Advance the session to ``target``, rejecting illegal edges.

    A move is accepted when ``target`` is listed in
    ``_VALID_TRANSITIONS`` for the current state, or when ``target`` is
    the current state itself. Self-loops are accepted for every state,
    which lets the server re-assert ``ACTIVE`` after each segment without
    special casing. On success ``last_activity`` is refreshed from
    ``time.monotonic()``.

    Raises:
        InvalidSessionTransition: if the edge is not permitted. Neither
            ``state`` nor ``last_activity`` is modified in that case.
    """
    current = self.state
    if target is not current and target not in _VALID_TRANSITIONS.get(current, frozenset()):
        raise InvalidSessionTransition(f"{current.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()

fastvideo.entrypoints.streaming.SessionManager

SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    # Registry of tracked sessions, keyed by session id.
    self._sessions: dict[str, Session] = {}
    # Budgets: concurrent-session limit, per-session segment cap, and the
    # idle timeout consulted by ``reap_timed_out``.
    self._max_sessions = max_sessions
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds

Functions

fastvideo.entrypoints.streaming.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead

fastvideo.entrypoints.streaming.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls :meth:store after each segment and :meth:snapshot when a client explicitly asks for an exportable state handle.

Functions

fastvideo.entrypoints.streaming.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Discard the state kept for ``session_id``.

    Dropping an id the store does not know must be a no-op.
    """
    ...
fastvideo.entrypoints.streaming.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Seed a session with ``state`` and return that session's id.

    With ``session_id=None`` the store mints a fresh UUID4 id. With an
    explicit ``session_id`` that id is used unchanged, and any state
    already stored under it is overwritten.
    """
    ...
fastvideo.entrypoints.streaming.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Look up the latest state stored for ``session_id``.

    Returns ``None`` when the store holds nothing for that id.
    """
    ...
fastvideo.entrypoints.streaming.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Save ``state`` under ``session_id``, overwriting any earlier value."""
    ...

Functions

fastvideo.entrypoints.streaming.build_app

build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto,
    *,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Args:
        serve_config: Serve configuration; its ``streaming`` block must be
            set.
        generator: Generator stored on the shared :class:`ServerState`
            and handed to every session handler.
        session_store: Continuation-state store. ``None`` (the default)
            selects a fresh :class:`InMemorySessionStore`. Any other value
            is used as-is, even if it is falsy (e.g. an empty store that
            defines ``__len__``).

    Returns:
        The app, exposing ``GET /health`` and the ``/v1/stream``
        WebSocket. Its :class:`ServerState` is also published as
        ``app.state.server_state``.

    Raises:
        ValueError: if ``serve_config.streaming`` is ``None``.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    # Explicit None check: ``session_store or InMemorySessionStore()`` would
    # silently swap out a caller-supplied store whose truthiness is False.
    if session_store is None:
        session_store = InMemorySessionStore()
    state = ServerState(
        serve_config=serve_config,
        generator=generator,
        sessions=sessions,
        session_store=session_store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        # Liveness probe: reports ``len(state.sessions)`` and the
        # configured stream mode.
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": state.serve_config.streaming.stream_mode,
        })

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        # Each WebSocket connection owns exactly one session.
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            # Admission failed (capacity, auth, ...): report it, then close
            # with a "try again later" code.
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            # Best effort: the current state may have no edge to ERROR.
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            # Runs on every exit path, including client disconnects.
            _cleanup_session(session, state)

    app.state.server_state = state
    return app

fastvideo.entrypoints.streaming.run_server

run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(
    serve_config: ServeConfig,
    *,
    generator: _GeneratorProto | None = None,
    session_store: SessionStore | None = None,
) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn on
    ``serve_config.server.host`` / ``serve_config.server.port``.

    Args:
        serve_config: Serve configuration; ``streaming`` must be set.
        generator: Pre-built generator. When given, no model is loaded.
        session_store: Forwarded to :func:`build_app`. ``None`` keeps the
            default in-memory store, matching the previous behavior.

    Raises:
        ValueError: if ``serve_config.streaming`` is ``None``. This is
            checked before any generator is loaded.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator, session_store=session_store)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

Modules

fastvideo.entrypoints.streaming.protocol

JSON WebSocket protocol schemas for the streaming server.

Every control message shares the envelope {"type": <str>, ...}. Pydantic models live here so the server can parse / validate incoming frames and emit well-typed outgoing frames without hand-rolled dicts.

The message catalogue matches the contract in docs/design/server_contracts/streaming.md; additions must land in both places in the same PR.

Classes

fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot

Bases: BaseModel

Attributes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot.state instance-attribute
state: dict[str, Any]

{kind, payload} dict matching :class:fastvideo.api.ContinuationState.

fastvideo.entrypoints.streaming.protocol.MediaInit

Bases: BaseModel

Descriptor for the fMP4 initialization segment that follows.

fastvideo.entrypoints.streaming.protocol.SegmentPromptSource

Bases: BaseModel

Request a new segment using a specific prompt.

fastvideo.entrypoints.streaming.protocol.SessionInitV2

Bases: BaseModel

Opening frame the client sends after the WebSocket handshake.

Attributes
fastvideo.entrypoints.streaming.protocol.SessionInitV2.continuation_state class-attribute instance-attribute
continuation_state: dict[str, Any] | None = None

Optional {kind, payload} dict; hydrated into :class:fastvideo.api.ContinuationState server-side.

fastvideo.entrypoints.streaming.protocol.SnapshotState

Bases: BaseModel

Request the current ContinuationState for export.

Functions

fastvideo.entrypoints.streaming.protocol.parse_client_message
parse_client_message(raw: dict[str, Any]) -> ClientMessage

Parse an incoming WebSocket dict into a typed client message.

Unknown type values raise :class:pydantic.ValidationError; the server handler turns that into an error frame with code="invalid_message".

Source code in fastvideo/entrypoints/streaming/protocol.py
# Lazily-built, process-wide ``TypeAdapter(ClientMessage)``; see
# ``_client_message_adapter``.
_CLIENT_MESSAGE_ADAPTER: Any = None


def _client_message_adapter() -> Any:
    """Return the shared ``TypeAdapter(ClientMessage)``, building it once.

    Constructing a pydantic ``TypeAdapter`` compiles a validation schema,
    and pydantic recommends reusing adapters rather than rebuilding them
    per call. This function runs once per incoming WebSocket frame, so the
    adapter is cached after first use. Two threads racing on first use
    would at worst build two equivalent adapters.
    """
    global _CLIENT_MESSAGE_ADAPTER
    if _CLIENT_MESSAGE_ADAPTER is None:
        from pydantic import TypeAdapter

        _CLIENT_MESSAGE_ADAPTER = TypeAdapter(ClientMessage)
    return _CLIENT_MESSAGE_ADAPTER


def parse_client_message(raw: dict[str, Any]) -> ClientMessage:
    """Parse an incoming WebSocket dict into a typed client message.

    Unknown ``type`` values raise :class:`pydantic.ValidationError`; the
    server handler turns that into an ``error`` frame with
    ``code="invalid_message"``.

    Args:
        raw: The decoded JSON object received from the client.

    Returns:
        The validated message model selected by the ``type`` field.
    """
    return _client_message_adapter().validate_python(raw)

fastvideo.entrypoints.streaming.server

Single-generator FastAPI + WebSocket streaming server.

Classes

Functions

fastvideo.entrypoints.streaming.server.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto,
    *,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Args:
        serve_config: Serve configuration; its ``streaming`` block must be
            set.
        generator: Generator stored on the shared :class:`ServerState`
            and handed to every session handler.
        session_store: Continuation-state store. ``None`` (the default)
            selects a fresh :class:`InMemorySessionStore`. Any other value
            is used as-is, even if it is falsy (e.g. an empty store that
            defines ``__len__``).

    Returns:
        The app, exposing ``GET /health`` and the ``/v1/stream``
        WebSocket. Its :class:`ServerState` is also published as
        ``app.state.server_state``.

    Raises:
        ValueError: if ``serve_config.streaming`` is ``None``.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    # Explicit None check: ``session_store or InMemorySessionStore()`` would
    # silently swap out a caller-supplied store whose truthiness is False.
    if session_store is None:
        session_store = InMemorySessionStore()
    state = ServerState(
        serve_config=serve_config,
        generator=generator,
        sessions=sessions,
        session_store=session_store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        # Liveness probe: reports ``len(state.sessions)`` and the
        # configured stream mode.
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": state.serve_config.streaming.stream_mode,
        })

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        # Each WebSocket connection owns exactly one session.
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            # Admission failed (capacity, auth, ...): report it, then close
            # with a "try again later" code.
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            # Best effort: the current state may have no edge to ERROR.
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            # Runs on every exit path, including client disconnects.
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.server.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(
    serve_config: ServeConfig,
    *,
    generator: _GeneratorProto | None = None,
    session_store: SessionStore | None = None,
) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn on
    ``serve_config.server.host`` / ``serve_config.server.port``.

    Args:
        serve_config: Serve configuration; ``streaming`` must be set.
        generator: Pre-built generator. When given, no model is loaded.
        session_store: Forwarded to :func:`build_app`. ``None`` keeps the
            default in-memory store, matching the previous behavior.

    Raises:
        ValueError: if ``serve_config.streaming`` is ``None``. This is
            checked before any generator is loaded.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator, session_store=session_store)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

fastvideo.entrypoints.streaming.session

Per-connection session lifecycle for the streaming server.

Each WebSocket opens exactly one :class:Session. :class:SessionManager enforces the generation_segment_cap and session_timeout_seconds budgets from :class:fastvideo.api.StreamingConfig.

Classes

fastvideo.entrypoints.streaming.session.InvalidSessionTransition

Bases: RuntimeError

Raised when a session is asked to transition along an illegal edge.

fastvideo.entrypoints.streaming.session.Session dataclass
Session(id: str = <factory: hex id>, state: SessionState = INITIALIZING, created_at: float = <factory: time.monotonic>, last_activity: float = <factory: time.monotonic>, client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = <factory: list>, segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = <factory: dict>)
Functions
fastvideo.entrypoints.streaming.session.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises :class:InvalidSessionTransition on illegal moves. A transition to the current state is always accepted (for every state, not only ACTIVE), so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Advance the session to ``target``, rejecting illegal edges.

    A move is accepted when ``target`` is listed in
    ``_VALID_TRANSITIONS`` for the current state, or when ``target`` is
    the current state itself. Self-loops are accepted for every state,
    which lets the server re-assert ``ACTIVE`` after each segment without
    special casing. On success ``last_activity`` is refreshed from
    ``time.monotonic()``.

    Raises:
        InvalidSessionTransition: if the edge is not permitted. Neither
            ``state`` nor ``last_activity`` is modified in that case.
    """
    current = self.state
    if target is not current and target not in _VALID_TRANSITIONS.get(current, frozenset()):
        raise InvalidSessionTransition(f"{current.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
fastvideo.entrypoints.streaming.session.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    # Registry of tracked sessions, keyed by session id.
    self._sessions: dict[str, Session] = {}
    # Budgets: concurrent-session limit, per-session segment cap, and the
    # idle timeout consulted by ``reap_timed_out``.
    self._max_sessions = max_sessions
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
Functions
fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
fastvideo.entrypoints.streaming.session.SessionRejected

Bases: RuntimeError

Raised when session creation fails (queue full, auth, etc.).

fastvideo.entrypoints.streaming.session.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.session_init_image

Persist the initial-image blob attached to a streaming session.

Classes

fastvideo.entrypoints.streaming.session_init_image.SessionInitImage dataclass
SessionInitImage(path: str, display_name: str, mime: str)

Location of the persisted init image.

Callers pass path to InputConfig.image_path; display_name is only used for logs.

Functions

fastvideo.entrypoints.streaming.session_init_image.persist_session_init_image
persist_session_init_image(payload: Any, *, output_dir: str | None = None) -> SessionInitImage | None

Decode a client init-image blob and persist it to disk.

payload shape (matches the internal UI protocol)::

{
    "mime": "image/png",
    "name": "ref.png",
    "data": "<base64 bytes>",
}

Returns None when payload is falsy (no init image). Raises :class:ValueError on schema / size / decode errors so the caller can surface a user-facing error frame.

Source code in fastvideo/entrypoints/streaming/session_init_image.py
def persist_session_init_image(
    payload: Any,
    *,
    output_dir: str | None = None,
) -> SessionInitImage | None:
    """Decode a client init-image blob and persist it to disk.

    ``payload`` shape (matches the internal UI protocol)::

        {
            "mime": "image/png",
            "name": "ref.png",
            "data": "<base64 bytes>",
        }

    Args:
        payload: Untrusted value from the client. Any falsy value means
            "no init image".
        output_dir: Directory for the temporary file. ``None`` lets
            :func:`tempfile.mkstemp` pick its default temp directory.

    Returns:
        ``None`` when ``payload`` is falsy; otherwise a
        :class:`SessionInitImage` describing the newly written file. This
        function never deletes that file.

    Raises:
        ValueError: on schema / size / decode errors so the caller can
            surface a user-facing ``error`` frame.
    """
    if not payload:
        return None
    if not isinstance(payload, dict):
        raise ValueError("session init image must be an object")

    mime = payload.get("mime")
    # ``mime`` is client-controlled and may be unhashable (e.g. a JSON
    # list). Check the type first so the membership test cannot raise
    # TypeError and break the ValueError-only contract.
    if not isinstance(mime, str) or mime not in _ACCEPTED_MIMES:
        raise ValueError(f"session init image mime {mime!r} is not one of "
                         f"{sorted(_ACCEPTED_MIMES)}")
    data_b64 = payload.get("data")
    if not isinstance(data_b64, str):
        raise ValueError("session init image data must be a base64 string")
    # Bound the encoded length before decoding, so an oversized string is
    # never materialised as bytes. Base64 of N bytes is 4 * ceil(N / 3)
    # characters, and ``validate=True`` allows at most two trailing "=".
    # A longer string therefore either decodes past the limit or is
    # invalid base64.
    max_encoded_len = 4 * ((_MAX_IMAGE_BYTES + 2) // 3) + 2
    if len(data_b64) > max_encoded_len:
        raise ValueError(f"session init image data is {len(data_b64)} base64 characters; "
                         f"limit is {_MAX_IMAGE_BYTES} bytes")
    try:
        data = base64.b64decode(data_b64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError(f"session init image data is not valid base64: {exc}") from exc
    if len(data) > _MAX_IMAGE_BYTES:
        raise ValueError(f"session init image is {len(data)} bytes; limit is "
                         f"{_MAX_IMAGE_BYTES}")
    if len(data) == 0:
        raise ValueError("session init image data is empty")

    ext = _ACCEPTED_MIMES[mime]
    display_name = _sanitize_display_name(payload.get("name")) or f"init{ext}"
    fd, path = tempfile.mkstemp(prefix="fastvideo-init-", suffix=ext, dir=output_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
    except Exception:
        # Do not leave a partially written temp file behind.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)
        raise
    return SessionInitImage(path=path, display_name=display_name, mime=mime)

fastvideo.entrypoints.streaming.session_store

Session state store for the FastVideo streaming server.

The streaming server keeps continuation state (decoded frames + audio latents from the previous segment) server-side so the client doesn't re-upload multi-megabyte tensors each WebSocket message. Two operations are needed:

  • snapshot(session_id) -> ContinuationState — serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
  • hydrate(state) -> session_id — load a previously serialized state into a new session (for resume-after-disconnect flows).

The store is an ABC with an :class:InMemorySessionStore default; Redis or other backends can drop in without touching the pipeline.

Large tensor payloads (video frames, audio latents) are kept out of the JSON payload via an accompanying :class:BlobStore. Both stores share a process today; they are separate types so that a future implementation can put blobs on S3 while keeping session metadata in Redis.

Classes

fastvideo.entrypoints.streaming.session_store.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A :class:ContinuationState payload can reference large tensors stored in a :class:BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Functions
fastvideo.entrypoints.streaming.session_store.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove the blob stored under ``blob_id``.

    Missing ids are a no-op: dropping an unknown or already-dropped id
    must not raise.

    Args:
        blob_id: An id previously returned by :meth:`put`.
    """
fastvideo.entrypoints.streaming.session_store.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent.

    Args:
        blob_id: An id previously returned by :meth:`put`.

    Returns:
        The bytes that were stored under ``blob_id``.

    Raises:
        KeyError: If no blob is stored under ``blob_id`` (never stored,
            or already removed via :meth:`drop`).
    """
fastvideo.entrypoints.streaming.session_store.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval.

    Args:
        data: Raw bytes to store (e.g. a serialized tensor that would be
            too large to inline in a JSON payload).
        mime: MIME type describing ``data``; defaults to the generic
            ``"application/octet-stream"``.

    Returns:
        An id that can later be passed to :meth:`get` and :meth:`drop`.
    """
fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory :class:BlobStore for single-process servers.

No eviction policy — callers are responsible for calling :meth:drop when a blob's owning state is replaced or a session ends. A Redis- or filesystem-backed :class:BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    """Create an empty blob table guarded by a single lock."""
    # One lock serialises every access to the blob table below.
    self._lock = threading.Lock()
    # blob id -> stored record. There is no eviction: entries leave only
    # when a caller invokes ``drop``.
    self._blobs: dict[str, _BlobRecord] = dict()
fastvideo.entrypoints.streaming.session_store.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory :class:SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via :meth:drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any :class:BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    """Create an empty session table guarded by a single lock."""
    # The class advertises thread safety; this lock guards ``_sessions``.
    self._lock = threading.Lock()
    # session id -> latest ContinuationState. There is no eviction or TTL:
    # entries leave only through ``drop``.
    self._sessions: dict[str, ContinuationState] = dict()
fastvideo.entrypoints.streaming.session_store.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls :meth:store after each segment and :meth:snapshot when a client explicitly asks for an exportable state handle.

Functions
fastvideo.entrypoints.streaming.session_store.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op.

    Args:
        session_id: Id of the session whose state should be discarded.
            Unknown ids must not raise.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.

    Intended for resume-after-disconnect flows, where ``state`` is a
    previously serialized state (e.g. one returned by :meth:`snapshot`).

    Args:
        state: The continuation state to install.
        session_id: Id to install the state under, or ``None`` to have
            the store allocate one.

    Returns:
        The session id the state was installed under.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``).

    The result is meant to be exportable (e.g. over HTTP) or migratable
    to another server, where it can be installed with :meth:`hydrate`.

    Args:
        session_id: Id of the session to read.

    Returns:
        The stored :class:`ContinuationState`, or ``None`` when the store
        holds no state for ``session_id``.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value.

    The streaming server calls this after each generated segment.

    Args:
        session_id: Id of the session being updated.
        state: The new continuation state for that session.
    """

fastvideo.entrypoints.streaming.stream

fMP4 stream encoder used by the streaming server.

The client's Media Source Extensions player needs a continuous fMP4 byte stream: first an initialization segment (ftyp + moov), then one or more media segments (moof + mdat). We pipe raw RGB frames into an ffmpeg subprocess configured for fragmented output via -movflags empty_moov+default_base_moof+frag_keyframe+faststart and stream the bytes back out.

Classes

fastvideo.entrypoints.streaming.stream.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.

fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder per :class:`ltx2_segment_start`` boundary so each segment becomes one media fragment the client can append independently.

Example::

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
Functions
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear.

    Spawns ffmpeg on first use. A background task writes ``frames`` to its
    stdin while this generator streams its stdout back.

    The encoder's first chunk (``kind="init"``) holds exactly the bytes up
    to the end of the ``moov`` box, i.e. the initialization segment. Every
    later chunk is ``kind="media"``. If ffmpeg's output does not parse as
    top-level MP4 boxes, or ends before ``moov`` is complete, whatever has
    been buffered so far is emitted as the ``"init"`` chunk instead.

    Args:
        frames: A list or async iterator of frames, each handed to
            ``_write_frame``.

    Yields:
        :class:`FragmentedMP4Chunk` objects tagged with this encoder's
        ``stream_id`` and ``segment_idx``.

    Raises:
        Any exception raised while writing frames. It is re-raised after
        ffmpeg's output has been drained.
    """
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            # Closing stdin signals end of input so ffmpeg flushes and exits.
            # If ffmpeg already died, the flush on close hits a broken pipe.
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    def _chunk(kind: Literal["init", "media"], data: bytes | bytearray) -> FragmentedMP4Chunk:
        # Wrap a byte run, copying it to immutable bytes.
        return FragmentedMP4Chunk(
            kind=kind,
            data=bytes(data),
            stream_id=self.stream_id,
            segment_idx=self.segment_idx,
        )

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # BufferedReader.read(n) keeps reading until it has n bytes or hits
        # EOF, which would hold finished fragments back. read1(n) returns as
        # soon as any bytes are available. Unbuffered pipes only have read().
        read = getattr(reader, "read1", reader.read)
        chunk_size = 64 * 1024
        # stdout bytes held back until the init segment (through moov) is complete.
        pending = bytearray()
        while True:
            data = await loop.run_in_executor(None, read, chunk_size)
            if not data:
                break
            if self._init_emitted:
                yield _chunk("media", data)
                continue
            pending += data
            try:
                init_end = _mp4_init_segment_end(pending)
            except ValueError:
                # Not a well-formed fMP4 prefix: fall back to emitting
                # everything buffered so far as the init chunk.
                init_end = len(pending)
            if init_end is None:
                continue  # moov not complete yet; keep buffering
            self._init_emitted = True
            yield _chunk("init", pending[:init_end])
            if init_end < len(pending):
                yield _chunk("media", pending[init_end:])
            pending.clear()
        if pending:
            # ffmpeg exited before a complete moov box; emit what we have.
            self._init_emitted = True
            yield _chunk("init", pending)
    finally:
        await writer_task


def _mp4_init_segment_end(buf: bytes | bytearray) -> int | None:
    """Return the offset just past the top-level ``moov`` box in ``buf``.

    Walks ISO-BMFF box headers from offset 0. Each box starts with a 32-bit
    big-endian size (which includes the header) and a 4-byte type. A size
    of 1 means a 64-bit size follows the type. Boxes before ``moov``
    (normally ``ftyp``) are skipped.

    Returns:
        The end offset of the ``moov`` box, or ``None`` if ``buf`` ends
        before that box is complete.

    Raises:
        ValueError: If a box header is malformed (including size 0, i.e.
            "extends to end of file"), or if a ``moof``/``mdat`` box appears
            before ``moov``. Either means ``buf`` does not start with an fMP4
            initialization segment.
    """
    offset = 0
    while True:
        if len(buf) - offset < 8:
            return None
        size = int.from_bytes(buf[offset:offset + 4], "big")
        box_type = bytes(buf[offset + 4:offset + 8])
        header_len = 8
        if size == 1:
            if len(buf) - offset < 16:
                return None
            size = int.from_bytes(buf[offset + 8:offset + 16], "big")
            header_len = 16
        if size < header_len:
            raise ValueError(f"malformed MP4 box header at offset {offset}")
        if box_type in (b"moof", b"mdat"):
            raise ValueError(f"{box_type!r} box found before moov")
        end = offset + size
        if box_type == b"moov":
            return end if end <= len(buf) else None
        offset = end