
continuation

Typed continuation state for the LTX-2 streaming pipeline.

Segment N+1 conditions on segment N's trailing decoded frames and denoised audio latents. The streaming runtime used to hold this state as per-worker globals; lifting it into a typed, JSON-serializable object lets clients snapshot, migrate, or round-trip it through an HTTP/RPC boundary. The envelope ContinuationState(kind, payload) is the shared public API; the typed class here owns the LTX-2 payload shape.
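The envelope itself is small enough to sketch. A minimal stand-in for ContinuationState, assuming only the fields named on this page (kind and a JSON-serializable payload dict), round-trips through JSON like this:

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ContinuationState:
    """Toy stand-in for the public envelope: a kind tag plus a payload dict."""
    kind: str
    payload: dict[str, Any] = field(default_factory=dict)


# 'ltx2.v1' and schema_version come straight from the constants documented below.
state = ContinuationState(kind="ltx2.v1", payload={"schema_version": 1})

# Snapshot across an HTTP/RPC boundary: the payload is plain JSON.
wire = json.dumps({"kind": state.kind, "payload": state.payload})
restored = json.loads(wire)
assert restored["kind"] == "ltx2.v1"
```

The real class lives elsewhere in fastvideo; this sketch only illustrates why a typed, serializable envelope beats per-worker globals for snapshot and migration.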

Serialization contract:

  • Video frames → PNG bytes + base64, or a BlobStore id.
  • Audio latents → a self-describing safetensors blob + base64, or a BlobStore id. safetensors preserves bfloat16, which a raw-numpy round-trip cannot.
  • The returned payload is always a plain JSON-serializable dict.
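The inline half of this contract can be demonstrated with the standard library alone (the `frames_b64` key here is illustrative, not the actual payload schema):

```python
import base64
import json

# Stand-in bytes for one PNG-encoded frame (hypothetical content).
frame_png = b"\x89PNG\r\n\x1a\n...frame bytes..."

# Sender side: bytes -> base64 -> plain JSON-serializable dict.
payload = {"video": {"frames_b64": [base64.b64encode(frame_png).decode("ascii")]}}
wire = json.dumps(payload)  # must succeed: the payload is a plain dict

# Receiver side reverses the encoding losslessly.
decoded = base64.b64decode(json.loads(wire)["video"]["frames_b64"][0])
assert decoded == frame_png
```

base64 inflates the data by ~33%, which is one reason large tensors are pushed to the blob store instead (see the threshold below).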

Attributes

fastvideo.pipelines.basic.ltx2.continuation.DEFAULT_INLINE_THRESHOLD_BYTES module-attribute

DEFAULT_INLINE_THRESHOLD_BYTES = 2 * 1024 * 1024

Tensors larger than this go to the blob store (if available). 2 MiB is below typical single-JSON-message limits (Dynamo: 4 MiB, Postgres TOAST: 1 GiB) and well above per-frame PNG payloads (~200 KiB at 512x512).
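A minimal sketch of the inline-vs-blob decision, with a hypothetical in-memory stand-in for BlobStore (the real store's `put` signature may differ):

```python
import base64
import uuid


class InMemoryBlobStore:
    """Toy BlobStore stand-in: content-addressed put/get by opaque id."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put(self, data: bytes) -> str:
        blob_id = uuid.uuid4().hex
        self._blobs[blob_id] = data
        return blob_id

    def get(self, blob_id: str) -> bytes:
        return self._blobs[blob_id]


def place(data: bytes, blob_store=None, threshold=2 * 1024 * 1024) -> dict:
    """Large payloads go to the blob store (when available); the rest inline."""
    if blob_store is not None and len(data) > threshold:
        return {"blob_id": blob_store.put(data)}
    return {"b64": base64.b64encode(data).decode("ascii")}


store = InMemoryBlobStore()
small = place(b"x" * 100, blob_store=store)            # stays inline
large = place(b"x" * (3 * 1024 * 1024), blob_store=store)  # exceeds 2 MiB
assert "b64" in small and "blob_id" in large
```

With no blob store at all, everything falls back to inline base64, which matches the behavior described for to_continuation_state below.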

fastvideo.pipelines.basic.ltx2.continuation.LTX2_CONTINUATION_KIND module-attribute

LTX2_CONTINUATION_KIND = 'ltx2.v1'

Public ContinuationState.kind for LTX-2 payloads.

fastvideo.pipelines.basic.ltx2.continuation.LTX2_CONTINUATION_SCHEMA_VERSION module-attribute

LTX2_CONTINUATION_SCHEMA_VERSION = 1

Payload schema version carried inside payload.schema_version.

Classes

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState dataclass

LTX2ContinuationState(segment_index: int = 0, video_frames: list[ndarray] | None = None, video_frames_blob_id: str | None = None, video_conditioning_frame_idx: int = 0, video_conditioning_strength: float = 1.0, audio_latents: Tensor | None = None, audio_latents_blob_id: str | None = None, audio_sample_rate: int | None = None, audio_conditioning_num_frames: int = 0, audio_conditioning_strength: float = 1.0, video_position_offset_sec: float = 0.0, metadata: dict[str, Any] = dict())

Typed LTX-2 continuation state carried between streaming segments.

video_frames holds the trailing decoded RGB frames (uint8 HxWx3) from segment N, used to condition segment N+1 via the VAE encode path. audio_latents is the cached denoised audio latent tensor of shape [B, C, T, mel] that segment N+1 copies into the overlap region of its clean-latent conditioning.

Most fields map 1:1 onto the internal gpu_pool's per-worker state; the only new concept is the *_blob_id fields, which allow large tensors to live outside the JSON payload. See module docstring.

Attributes

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_conditioning_num_frames class-attribute instance-attribute
audio_conditioning_num_frames: int = 0

Number of trailing audio frames that carry over as clean context into segment N+1.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_conditioning_strength class-attribute instance-attribute
audio_conditioning_strength: float = 1.0

Clean-latent mask value applied to the overlap region; 0.0 keeps the cached audio entirely, 1.0 renoises from scratch.
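If the mask acts as a linear blend over the overlap region (an assumption; the docstring only pins down the two endpoints), the per-element update could look like:

```python
def blend_overlap(cached: float, fresh_noise: float, strength: float) -> float:
    """Hypothetical clean-latent mask blend over the audio overlap region.

    strength == 0.0 keeps the cached latent entirely;
    strength == 1.0 renoises from scratch (endpoints per the docstring).
    """
    return (1.0 - strength) * cached + strength * fresh_noise


assert blend_overlap(0.5, 2.0, 0.0) == 0.5  # keep cached audio entirely
assert blend_overlap(0.5, 2.0, 1.0) == 2.0  # renoise from scratch
```

Only the endpoint semantics are documented; intermediate strengths may be handled differently inside the pipeline.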

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_latents class-attribute instance-attribute
audio_latents: Tensor | None = None

Denoised audio latent tensor of shape [B, C, T, mel]. None when the state is blob-backed or unset.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_latents_blob_id class-attribute instance-attribute
audio_latents_blob_id: str | None = None

Blob store id when audio latents live outside the payload.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_sample_rate class-attribute instance-attribute
audio_sample_rate: int | None = None

Sample rate for the audio side (e.g. 24000).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.metadata class-attribute instance-attribute
metadata: dict[str, Any] = field(default_factory=dict)

Opaque metadata bag for forward-compat fields that don't need their own typed slot yet (e.g. custom knob experiments).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.segment_index class-attribute instance-attribute
segment_index: int = 0

Index of the just-completed segment. Segment 0 has no history; state returned after segment 0 carries segment_index=0 and the caller uses segment_index + 1 as the next segment number.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_conditioning_frame_idx class-attribute instance-attribute
video_conditioning_frame_idx: int = 0

Target frame index inside the next segment that the trailing frames align with (matches the LTX-2 ltx2_video_conditions tuple's frame_idx slot).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_conditioning_strength class-attribute instance-attribute
video_conditioning_strength: float = 1.0

Conditioning strength in [0, 1]. Matches the ltx2_video_conditions tuple's strength slot.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_frames class-attribute instance-attribute
video_frames: list[ndarray] | None = None

Trailing decoded frames, each an RGB uint8 np.ndarray shaped (H, W, 3). None when the state is blob-backed or unset.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_frames_blob_id class-attribute instance-attribute
video_frames_blob_id: str | None = None

Blob store id when the frames live outside the payload.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_position_offset_sec class-attribute instance-attribute
video_position_offset_sec: float = 0.0

Seconds by which video RoPE is shifted forward so the audio prefix can sit at t >= 0 when audio conditioning is longer than video conditioning.

Functions

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.from_continuation_state classmethod
from_continuation_state(state: ContinuationState, *, blob_store: BlobStore | None = None) -> LTX2ContinuationState

Rebuild a typed state from a public ContinuationState.

Raises ValueError when the kind doesn't match or the schema version is unsupported.

Source code in fastvideo/pipelines/basic/ltx2/continuation.py
@classmethod
def from_continuation_state(
    cls,
    state: ContinuationState,
    *,
    blob_store: BlobStore | None = None,
) -> LTX2ContinuationState:
    """Rebuild a typed state from a public :class:`ContinuationState`.

    Raises :class:`ValueError` when the kind doesn't match or the
    schema version is unsupported.
    """
    if state.kind != LTX2_CONTINUATION_KIND:
        raise ValueError(f"Expected ContinuationState.kind={LTX2_CONTINUATION_KIND!r}, "
                         f"got {state.kind!r}")
    payload = state.payload or {}
    version = int(payload.get("schema_version", LTX2_CONTINUATION_SCHEMA_VERSION))
    if version != LTX2_CONTINUATION_SCHEMA_VERSION:
        raise ValueError(f"Unsupported LTX-2 continuation schema_version={version}; "
                         f"this build expects {LTX2_CONTINUATION_SCHEMA_VERSION}")

    out = cls(
        segment_index=int(payload.get("segment_index", 0)),
        video_conditioning_frame_idx=int(payload.get("video_conditioning_frame_idx", 0)),
        video_conditioning_strength=float(payload.get("video_conditioning_strength", 1.0)),
        audio_sample_rate=(int(payload["audio_sample_rate"]) if "audio_sample_rate" in payload else None),
        audio_conditioning_num_frames=int(payload.get("audio_conditioning_num_frames", 0)),
        audio_conditioning_strength=float(payload.get("audio_conditioning_strength", 1.0)),
        video_position_offset_sec=float(payload.get("video_position_offset_sec", 0.0)),
        metadata=dict(payload.get("metadata") or {}),
    )

    video = payload.get("video")
    if isinstance(video, Mapping):
        cls._decode_video_frames(out, video, blob_store=blob_store)

    audio = payload.get("audio")
    if isinstance(audio, Mapping):
        cls._decode_audio_latents(out, audio, blob_store=blob_store)

    return out
fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.to_continuation_state
to_continuation_state(*, blob_store: BlobStore | None = None, inline_threshold_bytes: int = DEFAULT_INLINE_THRESHOLD_BYTES) -> ContinuationState

Serialize into a public ContinuationState.

When blob_store is given, tensors larger than inline_threshold_bytes are stored via BlobStore.put and referenced by id; otherwise all data is base64-encoded inline. The payload is always a plain JSON-serializable dict.

Source code in fastvideo/pipelines/basic/ltx2/continuation.py
def to_continuation_state(
    self,
    *,
    blob_store: BlobStore | None = None,
    inline_threshold_bytes: int = DEFAULT_INLINE_THRESHOLD_BYTES,
) -> ContinuationState:
    """Serialize into a public :class:`ContinuationState`.

    When ``blob_store`` is given, tensors larger than
    ``inline_threshold_bytes`` are stored via
    :meth:`BlobStore.put` and referenced by id; otherwise all data
    is base64-encoded inline. The payload is always a plain
    JSON-serializable dict.
    """
    payload: dict[str, Any] = {
        "schema_version": LTX2_CONTINUATION_SCHEMA_VERSION,
        "segment_index": int(self.segment_index),
        "video_conditioning_frame_idx": int(self.video_conditioning_frame_idx),
        "video_conditioning_strength": float(self.video_conditioning_strength),
        "audio_conditioning_num_frames": int(self.audio_conditioning_num_frames),
        "audio_conditioning_strength": float(self.audio_conditioning_strength),
        "video_position_offset_sec": float(self.video_position_offset_sec),
        "metadata": dict(self.metadata),
    }
    if self.audio_sample_rate is not None:
        payload["audio_sample_rate"] = int(self.audio_sample_rate)

    video_payload = self._encode_video_frames(
        blob_store=blob_store,
        inline_threshold_bytes=inline_threshold_bytes,
    )
    if video_payload is not None:
        payload["video"] = video_payload

    audio_payload = self._encode_audio_latents(
        blob_store=blob_store,
        inline_threshold_bytes=inline_threshold_bytes,
    )
    if audio_payload is not None:
        payload["audio"] = audio_payload

    return ContinuationState(
        kind=LTX2_CONTINUATION_KIND,
        payload=payload,
    )

Functions