streaming
¶
Classes¶
fastvideo.entrypoints.streaming.BlobStore
¶
Bases: ABC
Opaque byte-blob storage keyed by id.
A :class:ContinuationState payload can reference large tensors
stored in a :class:BlobStore rather than inlining them, so the
JSON payload stays small when the state travels over the wire.
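A minimal sketch of the blob-store idea, assuming a put/get/drop surface (only drop is named in these docs; the other method names and signatures are illustrative, not the real ABC):

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class BlobStoreSketch(ABC):
    """Illustrative byte-blob interface keyed by id."""

    @abstractmethod
    def put(self, blob_id: str, data: bytes) -> None: ...

    @abstractmethod
    def get(self, blob_id: str) -> bytes | None: ...

    @abstractmethod
    def drop(self, blob_id: str) -> None: ...


class DictBlobStore(BlobStoreSketch):
    """Dict-backed implementation for illustration only."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put(self, blob_id: str, data: bytes) -> None:
        self._blobs[blob_id] = data

    def get(self, blob_id: str) -> bytes | None:
        return self._blobs.get(blob_id)

    def drop(self, blob_id: str) -> None:
        self._blobs.pop(blob_id, None)
```

With this split, a ContinuationState JSON payload carries only blob-id strings; the multi-megabyte tensors stay in the store.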
fastvideo.entrypoints.streaming.FragmentedMP4Chunk
dataclass
¶
A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.
kind identifies whether the chunk is the init segment (must be
fed into the client's SourceBuffer first) or a media fragment.
fastvideo.entrypoints.streaming.FragmentedMP4Encoder
¶
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)
Stream RGB frames in, fMP4 chunks out.
One encoder covers one segment. The server creates a new encoder per ltx2_segment_start boundary so each segment becomes one media fragment the client can append independently.
Example::

    encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                   segment_idx=0)
    async with encoder:
        async for chunk in encoder.encode(frames):
            await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
Functions¶
fastvideo.entrypoints.streaming.FragmentedMP4Encoder.encode
async
¶
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]
Feed frames into ffmpeg and yield fMP4 chunks as they appear.
Source code in fastvideo/entrypoints/streaming/stream.py
fastvideo.entrypoints.streaming.InMemoryBlobStore
¶
Bases: BlobStore
Thread-safe in-memory :class:BlobStore for single-process servers.
No eviction policy — callers are responsible for calling
:meth:drop when a blob's owning state is replaced or a session
ends. A Redis- or filesystem-backed :class:BlobStore should
replace this when the streaming server lands as a real service
(PR 7.5+).
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.InMemorySessionStore
¶
Bases: SessionStore
Thread-safe in-memory :class:SessionStore.
Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.
No eviction / TTL / bounded capacity — sessions only leave via
:meth:drop. The live streaming server (PR 7.5+) is responsible
for bounding growth and for dropping any :class:BlobStore blobs
referenced by a state when that state is replaced or a session
ends; this class does not know about blobs.
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.Session
dataclass
¶
Session(
    id: str = (lambda: hex)(),
    state: SessionState = INITIALIZING,
    created_at: float = monotonic(),
    last_activity: float = monotonic(),
    client_id: str | None = None,
    preset: str | None = None,
    preset_label: str | None = None,
    curated_prompts: list[str] = list(),
    segment_idx: int = 0,
    enhancement_enabled: bool = False,
    auto_extension_enabled: bool = False,
    loop_generation_enabled: bool = False,
    single_clip_mode: bool = False,
    generation_paused: bool = False,
    stream_mode: str = 'av_fmp4',
    gpu_id: int | None = None,
    continuation_state: ContinuationState | None = None,
    metadata: dict[str, Any] = dict(),
)
Functions¶
fastvideo.entrypoints.streaming.Session.transition
¶
transition(target: SessionState) -> None
Move to target if the edge is allowed.
Raises :class:InvalidSessionTransition on illegal moves. The
self-loop on ACTIVE is legal so the server can re-assert
ACTIVE on segment completion without special casing.
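The edge-check pattern can be sketched as follows. The state names beyond INITIALIZING and ACTIVE, and the edge set itself, are assumptions for illustration; the real enum and transition table live in the session module:

```python
from enum import Enum, auto


class State(Enum):
    # Illustrative subset of the real SessionState members.
    INITIALIZING = auto()
    ACTIVE = auto()
    CLOSED = auto()


class InvalidSessionTransition(RuntimeError):
    pass


# Hypothetical edge set. Note the legal ACTIVE -> ACTIVE self-loop,
# which lets the server re-assert ACTIVE on segment completion
# without special casing.
_EDGES = {
    (State.INITIALIZING, State.ACTIVE),
    (State.ACTIVE, State.ACTIVE),
    (State.ACTIVE, State.CLOSED),
}


def transition(current: State, target: State) -> State:
    """Return target if (current, target) is an allowed edge."""
    if (current, target) not in _EDGES:
        raise InvalidSessionTransition(f"{current.name} -> {target.name}")
    return target
```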
Source code in fastvideo/entrypoints/streaming/session.py
fastvideo.entrypoints.streaming.SessionManager
¶
Registers sessions and enforces per-server session limits.
Source code in fastvideo/entrypoints/streaming/session.py
Functions¶
fastvideo.entrypoints.streaming.SessionManager.reap_timed_out
¶
Return the ids of sessions that have exceeded the idle timeout.
The caller is responsible for actually closing them — this
method only identifies dead sessions so the server can emit
session_timeout frames before dropping the WebSocket.
TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.
Source code in fastvideo/entrypoints/streaming/session.py
fastvideo.entrypoints.streaming.SessionState
¶
Bases: Enum
State-machine positions for a streaming session.
Transitions are server-owned. See
docs/design/server_contracts/streaming.md for the full diagram.
fastvideo.entrypoints.streaming.SessionStore
¶
Bases: ABC
Keyed store for per-session continuation state.
Implementations own the session-id → state mapping. The streaming
server calls :meth:store after each segment and :meth:snapshot
when a client explicitly asks for an exportable state handle.
Functions¶
fastvideo.entrypoints.streaming.SessionStore.hydrate
abstractmethod
¶
Install state as the starting point for a session.
When session_id is None the store allocates a fresh id
(UUID4); when provided the store uses it verbatim, overwriting
any prior state at that id.
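The hydrate contract described above can be sketched like this, with a plain dict standing in for ContinuationState (the argument order and keyword name are assumptions from the docstring, not the real signature):

```python
from __future__ import annotations

import uuid


class InMemoryStoreSketch:
    """Illustrative session-id -> state mapping."""

    def __init__(self) -> None:
        self._states: dict[str, dict] = {}

    def hydrate(self, state: dict, session_id: str | None = None) -> str:
        # None -> allocate a fresh UUID4 id; otherwise use the given
        # id verbatim, overwriting any prior state stored under it.
        sid = session_id if session_id is not None else str(uuid.uuid4())
        self._states[sid] = state
        return sid

    def snapshot(self, session_id: str) -> dict | None:
        return self._states.get(session_id)
```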
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.SessionStore.snapshot
abstractmethod
¶
snapshot(session_id: str) -> ContinuationState | None
Functions¶
fastvideo.entrypoints.streaming.build_app
¶
build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI
Build the FastAPI app used by :func:run_server.
Exposed so tests can drive the WebSocket endpoint in-process via
starlette.testclient.TestClient(app).websocket_connect(...).
Source code in fastvideo/entrypoints/streaming/server.py
fastvideo.entrypoints.streaming.run_server
¶
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None
Launch the streaming server.
Boots a :class:fastvideo.VideoGenerator from
serve_config.generator unless generator is provided, then
serves build_app(...) via uvicorn.
Source code in fastvideo/entrypoints/streaming/server.py
Modules¶
fastvideo.entrypoints.streaming.protocol
¶
JSON WebSocket protocol schemas for the streaming server.
Every control message shares the envelope {"type": <str>, ...}.
Pydantic models live here so the server can parse / validate incoming
frames and emit well-typed outgoing frames without hand-rolled dicts.
The message catalogue matches the contract in
docs/design/server_contracts/streaming.md; additions must land in
both places in the same PR.
Classes¶
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot
¶
fastvideo.entrypoints.streaming.protocol.MediaInit
¶
Bases: BaseModel
Descriptor for the fMP4 initialization segment that follows.
fastvideo.entrypoints.streaming.protocol.SegmentPromptSource
¶
Bases: BaseModel
Request a new segment using a specific prompt.
fastvideo.entrypoints.streaming.protocol.SessionInitV2
¶
Bases: BaseModel
Opening frame the client sends after the WebSocket handshake.
fastvideo.entrypoints.streaming.protocol.SnapshotState
¶
Bases: BaseModel
Request the current ContinuationState for export.
Functions¶
fastvideo.entrypoints.streaming.protocol.parse_client_message
¶
Parse an incoming WebSocket dict into a typed client message.
Unknown type values raise :class:pydantic.ValidationError; the
server handler turns that into an error frame with
code="invalid_message".
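The envelope dispatch can be sketched as below. The real parser is built on pydantic models and raises pydantic.ValidationError; this plain-dict version only shows the shape of the dispatch, and the message-type names are an illustrative subset, not the catalogue:

```python
class InvalidMessage(ValueError):
    """Stand-in for pydantic.ValidationError in this sketch."""


# Illustrative subset of the message catalogue.
_KNOWN_TYPES = {"session_init", "snapshot_state"}


def parse_client_message(frame: dict) -> dict:
    """Validate the {"type": <str>, ...} envelope of a client frame.

    Unknown types raise; the server handler would turn that into an
    error frame with code="invalid_message".
    """
    msg_type = frame.get("type")
    if msg_type not in _KNOWN_TYPES:
        raise InvalidMessage(f"unknown message type: {msg_type!r}")
    return frame
```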
Source code in fastvideo/entrypoints/streaming/protocol.py
fastvideo.entrypoints.streaming.server
¶
Single-generator FastAPI + WebSocket streaming server.
Classes¶
Functions¶
fastvideo.entrypoints.streaming.server.build_app
¶
build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI
Build the FastAPI app used by :func:run_server.
Exposed so tests can drive the WebSocket endpoint in-process via
starlette.testclient.TestClient(app).websocket_connect(...).
Source code in fastvideo/entrypoints/streaming/server.py
fastvideo.entrypoints.streaming.server.run_server
¶
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None
Launch the streaming server.
Boots a :class:fastvideo.VideoGenerator from
serve_config.generator unless generator is provided, then
serves build_app(...) via uvicorn.
Source code in fastvideo/entrypoints/streaming/server.py
fastvideo.entrypoints.streaming.session
¶
Per-connection session lifecycle for the streaming server.
Each WebSocket opens exactly one :class:Session. :class:SessionManager
enforces the generation_segment_cap and session_timeout_seconds
budgets from :class:fastvideo.api.StreamingConfig.
Classes¶
fastvideo.entrypoints.streaming.session.InvalidSessionTransition
¶
Bases: RuntimeError
Raised when a session is asked to transition along an illegal edge.
fastvideo.entrypoints.streaming.session.Session
dataclass
¶
Session(
    id: str = (lambda: hex)(),
    state: SessionState = INITIALIZING,
    created_at: float = monotonic(),
    last_activity: float = monotonic(),
    client_id: str | None = None,
    preset: str | None = None,
    preset_label: str | None = None,
    curated_prompts: list[str] = list(),
    segment_idx: int = 0,
    enhancement_enabled: bool = False,
    auto_extension_enabled: bool = False,
    loop_generation_enabled: bool = False,
    single_clip_mode: bool = False,
    generation_paused: bool = False,
    stream_mode: str = 'av_fmp4',
    gpu_id: int | None = None,
    continuation_state: ContinuationState | None = None,
    metadata: dict[str, Any] = dict(),
)
Functions¶
fastvideo.entrypoints.streaming.session.Session.transition
¶
transition(target: SessionState) -> None
Move to target if the edge is allowed.
Raises :class:InvalidSessionTransition on illegal moves. The
self-loop on ACTIVE is legal so the server can re-assert
ACTIVE on segment completion without special casing.
Source code in fastvideo/entrypoints/streaming/session.py
fastvideo.entrypoints.streaming.session.SessionManager
¶
Registers sessions and enforces per-server session limits.
Source code in fastvideo/entrypoints/streaming/session.py
Functions¶
fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
¶
Return the ids of sessions that have exceeded the idle timeout.
The caller is responsible for actually closing them — this
method only identifies dead sessions so the server can emit
session_timeout frames before dropping the WebSocket.
TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.
Source code in fastvideo/entrypoints/streaming/session.py
fastvideo.entrypoints.streaming.session.SessionRejected
¶
Bases: RuntimeError
Raised when session creation fails (queue full, auth, etc.).
fastvideo.entrypoints.streaming.session_init_image
¶
Persist the initial-image blob attached to a streaming session.
Classes¶
fastvideo.entrypoints.streaming.session_init_image.SessionInitImage
dataclass
¶
Location of the persisted init image.
Callers pass path to InputConfig.image_path; display_name
is only used for logs.
Functions¶
fastvideo.entrypoints.streaming.session_init_image.persist_session_init_image
¶
persist_session_init_image(payload: Any, *, output_dir: str | None = None) -> SessionInitImage | None
Decode a client init-image blob and persist it to disk.
payload shape (matches the internal UI protocol)::

    {
        "mime": "image/png",
        "name": "ref.png",
        "data": "<base64 bytes>",
    }
Returns None when payload is falsy (no init image). Raises
:class:ValueError on schema / size / decode errors so the caller
can surface a user-facing error frame.
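A sketch of building that payload and the decode step, assuming a base64 body and a size limit (the limit value and helper name here are hypothetical; only the payload shape comes from the docstring):

```python
import base64

# Not a real image -- just bytes with a PNG magic prefix for the demo.
png_bytes = b"\x89PNG\r\n\x1a\n" + b"\x00" * 8

payload = {
    "mime": "image/png",
    "name": "ref.png",
    "data": base64.b64encode(png_bytes).decode("ascii"),
}


def decode_init_image(payload: dict, max_bytes: int = 10 * 1024 * 1024) -> bytes:
    """Decode the base64 body, raising ValueError on schema/size errors."""
    if not payload.get("data"):
        raise ValueError("init image payload missing 'data'")
    raw = base64.b64decode(payload["data"], validate=True)
    if len(raw) > max_bytes:
        raise ValueError("init image too large")
    return raw
```

The real function additionally persists the bytes to disk and returns a SessionInitImage whose path feeds InputConfig.image_path.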
Source code in fastvideo/entrypoints/streaming/session_init_image.py
fastvideo.entrypoints.streaming.session_store
¶
Session state store for the FastVideo streaming server.
The streaming server keeps continuation state (decoded frames + audio latents from the previous segment) server-side so the client doesn't re-upload multi-megabyte tensors on each WebSocket message. Two operations are needed:
- snapshot(session_id) -> ContinuationState — serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
- hydrate(state) -> session_id — load a previously serialized state into a new session (for resume-after-disconnect flows).
The store is an ABC with an :class:InMemorySessionStore default; Redis
or other backends can drop in without touching the pipeline.
Large tensor payloads (video frames, audio latents) are kept out of the
JSON payload via an accompanying :class:BlobStore. Both stores share a
process today; they are separate types so that a future implementation
can put blobs on S3 while keeping session metadata in Redis.
Classes¶
fastvideo.entrypoints.streaming.session_store.BlobStore
¶
Bases: ABC
Opaque byte-blob storage keyed by id.
A :class:ContinuationState payload can reference large tensors
stored in a :class:BlobStore rather than inlining them, so the
JSON payload stays small when the state travels over the wire.
fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore
¶
Bases: BlobStore
Thread-safe in-memory :class:BlobStore for single-process servers.
No eviction policy — callers are responsible for calling
:meth:drop when a blob's owning state is replaced or a session
ends. A Redis- or filesystem-backed :class:BlobStore should
replace this when the streaming server lands as a real service
(PR 7.5+).
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.session_store.InMemorySessionStore
¶
Bases: SessionStore
Thread-safe in-memory :class:SessionStore.
Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.
No eviction / TTL / bounded capacity — sessions only leave via
:meth:drop. The live streaming server (PR 7.5+) is responsible
for bounding growth and for dropping any :class:BlobStore blobs
referenced by a state when that state is replaced or a session
ends; this class does not know about blobs.
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.session_store.SessionStore
¶
Bases: ABC
Keyed store for per-session continuation state.
Implementations own the session-id → state mapping. The streaming
server calls :meth:store after each segment and :meth:snapshot
when a client explicitly asks for an exportable state handle.
Functions¶
fastvideo.entrypoints.streaming.session_store.SessionStore.drop
abstractmethod
¶
drop(session_id: str) -> None
fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate
abstractmethod
¶
Install state as the starting point for a session.
When session_id is None the store allocates a fresh id
(UUID4); when provided the store uses it verbatim, overwriting
any prior state at that id.
Source code in fastvideo/entrypoints/streaming/session_store.py
fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot
abstractmethod
¶
snapshot(session_id: str) -> ContinuationState | None
fastvideo.entrypoints.streaming.stream
¶
fMP4 stream encoder used by the streaming server.
The client's Media Source Extensions player needs a continuous fMP4
byte stream: first an initialization segment (ftyp + moov),
then one or more media segments (moof + mdat). We pipe raw
RGB frames into an ffmpeg subprocess configured for fragmented output
via -movflags empty_moov+default_base_moof+frag_keyframe+faststart
and stream the bytes back out.
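A sketch of the ffmpeg argv this implies. The -movflags value is taken verbatim from the docstring; the rawvideo input options, codec choice, and pipe plumbing are assumptions about how such an encoder is typically wired:

```python
def build_ffmpeg_args(width: int, height: int, fps: int,
                      ffmpeg_path: str = "ffmpeg",
                      preset: str = "ultrafast",
                      pixel_format_out: str = "yuv420p") -> list[str]:
    """Assemble an illustrative fragmented-MP4 ffmpeg command line."""
    return [
        ffmpeg_path,
        # raw RGB frames arrive on stdin
        "-f", "rawvideo", "-pix_fmt", "rgb24",
        "-s", f"{width}x{height}", "-r", str(fps), "-i", "pipe:0",
        # encode and fragment: init segment (ftyp + moov) first,
        # then moof + mdat media segments
        "-c:v", "libx264", "-preset", preset,
        "-pix_fmt", pixel_format_out,
        "-movflags", "empty_moov+default_base_moof+frag_keyframe+faststart",
        "-f", "mp4", "pipe:1",
    ]
```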
Classes¶
fastvideo.entrypoints.streaming.stream.FragmentedMP4Chunk
dataclass
¶
A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.
kind identifies whether the chunk is the init segment (must be
fed into the client's SourceBuffer first) or a media fragment.
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder
¶
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)
Stream RGB frames in, fMP4 chunks out.
One encoder covers one segment. The server creates a new encoder per ltx2_segment_start boundary so each segment becomes one media fragment the client can append independently.
Example::

    encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                   segment_idx=0)
    async with encoder:
        async for chunk in encoder.encode(frames):
            await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
Functions¶
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder.encode
async
¶
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]
Feed frames into ffmpeg and yield fMP4 chunks as they appear.