ltx2

Modules

fastvideo.pipelines.basic.ltx2.continuation

Typed continuation state for the LTX-2 streaming pipeline.

Segment N+1 conditions on segment N's trailing decoded frames and denoised audio latents. The streaming runtime used to hold this state as per-worker globals; lifting it into a typed, JSON-serializable object lets clients snapshot, migrate, or round-trip it through an HTTP/RPC boundary. The envelope ContinuationState(kind, payload) is the shared public API; the typed class here owns the LTX-2 payload shape.

Serialization contract:

  • Video frames → PNG bytes + base64, or a :class:BlobStore id.
  • Audio latents → a self-describing safetensors blob + base64, or a :class:BlobStore id. safetensors preserves bfloat16, which a raw-numpy round-trip cannot.
  • The returned payload is always a plain JSON-serializable dict.
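
A minimal round-trip sketch of this contract. The frame count, latent shape, and sample rate below are illustrative placeholders, not values taken from the pipeline:

import numpy as np
import torch

from fastvideo.pipelines.basic.ltx2.continuation import (
    LTX2_CONTINUATION_KIND,
    LTX2ContinuationState,
)

# Pretend these came out of segment N: trailing RGB frames plus the denoised
# audio latents for the overlap region (shapes are made up for the example).
frames = [np.zeros((512, 512, 3), dtype=np.uint8) for _ in range(8)]
audio = torch.zeros(1, 8, 32, 16, dtype=torch.bfloat16)  # [B, C, T, mel]

state = LTX2ContinuationState(
    segment_index=0,
    video_frames=frames,
    audio_latents=audio,
    audio_sample_rate=24000,
    audio_conditioning_num_frames=16,
)

# With no blob store, everything is inlined as base64 in a plain JSON dict.
envelope = state.to_continuation_state()
assert envelope.kind == LTX2_CONTINUATION_KIND

# The consumer rebuilds the typed state from the public envelope.
restored = LTX2ContinuationState.from_continuation_state(envelope)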

Attributes

fastvideo.pipelines.basic.ltx2.continuation.DEFAULT_INLINE_THRESHOLD_BYTES module-attribute
DEFAULT_INLINE_THRESHOLD_BYTES = 2 * 1024 * 1024

Tensors larger than this go to the blob store (if available). 2 MiB is below typical single-JSON-message limits (Dynamo: 4 MiB, Postgres TOAST: 1 GiB) and well above per-frame PNG payloads (~200 KiB at 512x512).
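
As an illustration of the size check this threshold feeds (the actual comparison lives in the private encode helpers, so this is only a sketch):

import torch

from fastvideo.pipelines.basic.ltx2.continuation import DEFAULT_INLINE_THRESHOLD_BYTES

# A bf16 audio latent of shape [1, 8, 512, 16] is 65,536 elements * 2 bytes
# = 128 KiB, well under 2 MiB, so it would stay inline as base64.
latent = torch.zeros(1, 8, 512, 16, dtype=torch.bfloat16)
size_bytes = latent.numel() * latent.element_size()
print(size_bytes <= DEFAULT_INLINE_THRESHOLD_BYTES)  # True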

fastvideo.pipelines.basic.ltx2.continuation.LTX2_CONTINUATION_KIND module-attribute
LTX2_CONTINUATION_KIND = 'ltx2.v1'

Public ContinuationState.kind for LTX-2 payloads.

fastvideo.pipelines.basic.ltx2.continuation.LTX2_CONTINUATION_SCHEMA_VERSION module-attribute
LTX2_CONTINUATION_SCHEMA_VERSION = 1

Payload schema version carried inside payload.schema_version.

Classes

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState dataclass
LTX2ContinuationState(segment_index: int = 0, video_frames: list[ndarray] | None = None, video_frames_blob_id: str | None = None, video_conditioning_frame_idx: int = 0, video_conditioning_strength: float = 1.0, audio_latents: Tensor | None = None, audio_latents_blob_id: str | None = None, audio_sample_rate: int | None = None, audio_conditioning_num_frames: int = 0, audio_conditioning_strength: float = 1.0, video_position_offset_sec: float = 0.0, metadata: dict[str, Any] = dict())

Typed LTX-2 continuation state carried between streaming segments.

video_frames holds trailing decoded RGB frames (uint8 HxWx3) from segment N, used to condition segment N+1 via the VAE encode path. audio_latents is the cached denoised audio latent tensor of shape [B, C, T, mel] that segment N+1 copies into the overlap region of its clean-latent conditioning.

Most fields map 1:1 onto the internal gpu_pool's per-worker state; the only new additions are the *_blob_id fields, which let large tensors live outside the JSON payload. See the module docstring.

Attributes
fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_conditioning_num_frames class-attribute instance-attribute
audio_conditioning_num_frames: int = 0

Number of trailing audio frames that carry over as clean context into segment N+1.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_conditioning_strength class-attribute instance-attribute
audio_conditioning_strength: float = 1.0

Clean-latent mask value applied to the overlap region; 0.0 keeps the cached audio entirely, 1.0 renoises from scratch.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_latents class-attribute instance-attribute
audio_latents: Tensor | None = None

Denoised audio latent tensor of shape [B, C, T, mel]. None when the state is blob-backed or unset.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_latents_blob_id class-attribute instance-attribute
audio_latents_blob_id: str | None = None

Blob store id when audio latents live outside the payload.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.audio_sample_rate class-attribute instance-attribute
audio_sample_rate: int | None = None

Sample rate for the audio side (e.g. 24000).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.metadata class-attribute instance-attribute
metadata: dict[str, Any] = field(default_factory=dict)

Opaque metadata bag for forward-compat fields that don't need their own typed slot yet (e.g. custom knob experiments).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.segment_index class-attribute instance-attribute
segment_index: int = 0

Index of the just-completed segment. Segment 0 has no history; state returned after segment 0 carries segment_index=0 and the caller uses segment_index + 1 as the next segment number.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_conditioning_frame_idx class-attribute instance-attribute
video_conditioning_frame_idx: int = 0

Target frame index inside the next segment that the trailing frames align with (matches the LTX-2 ltx2_video_conditions tuple's frame_idx slot).

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_conditioning_strength class-attribute instance-attribute
video_conditioning_strength: float = 1.0

Conditioning strength in [0, 1]. Matches the ltx2_video_conditions tuple's strength slot.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_frames class-attribute instance-attribute
video_frames: list[ndarray] | None = None

Trailing decoded frames, each an RGB uint8 np.ndarray shaped (H, W, 3). None when the state is blob-backed or unset.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_frames_blob_id class-attribute instance-attribute
video_frames_blob_id: str | None = None

Blob store id when the frames live outside the payload.

fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.video_position_offset_sec class-attribute instance-attribute
video_position_offset_sec: float = 0.0

Seconds by which video RoPE is shifted forward so the audio prefix can sit at t >= 0 when audio conditioning is longer than video conditioning.

Functions
fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.from_continuation_state classmethod
from_continuation_state(state: ContinuationState, *, blob_store: BlobStore | None = None) -> LTX2ContinuationState

Rebuild a typed state from a public :class:ContinuationState.

Raises :class:ValueError when the kind doesn't match or the schema version is unsupported.

Source code in fastvideo/pipelines/basic/ltx2/continuation.py
@classmethod
def from_continuation_state(
    cls,
    state: ContinuationState,
    *,
    blob_store: BlobStore | None = None,
) -> LTX2ContinuationState:
    """Rebuild a typed state from a public :class:`ContinuationState`.

    Raises :class:`ValueError` when the kind doesn't match or the
    schema version is unsupported.
    """
    if state.kind != LTX2_CONTINUATION_KIND:
        raise ValueError(f"Expected ContinuationState.kind={LTX2_CONTINUATION_KIND!r}, "
                         f"got {state.kind!r}")
    payload = state.payload or {}
    version = int(payload.get("schema_version", LTX2_CONTINUATION_SCHEMA_VERSION))
    if version != LTX2_CONTINUATION_SCHEMA_VERSION:
        raise ValueError(f"Unsupported LTX-2 continuation schema_version={version}; "
                         f"this build expects {LTX2_CONTINUATION_SCHEMA_VERSION}")

    out = cls(
        segment_index=int(payload.get("segment_index", 0)),
        video_conditioning_frame_idx=int(payload.get("video_conditioning_frame_idx", 0)),
        video_conditioning_strength=float(payload.get("video_conditioning_strength", 1.0)),
        audio_sample_rate=(int(payload["audio_sample_rate"]) if "audio_sample_rate" in payload else None),
        audio_conditioning_num_frames=int(payload.get("audio_conditioning_num_frames", 0)),
        audio_conditioning_strength=float(payload.get("audio_conditioning_strength", 1.0)),
        video_position_offset_sec=float(payload.get("video_position_offset_sec", 0.0)),
        metadata=dict(payload.get("metadata") or {}),
    )

    video = payload.get("video")
    if isinstance(video, Mapping):
        cls._decode_video_frames(out, video, blob_store=blob_store)

    audio = payload.get("audio")
    if isinstance(audio, Mapping):
        cls._decode_audio_latents(out, audio, blob_store=blob_store)

    return out
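
A small caller-side sketch of how the kind check can be used to dispatch before rebuilding; ContinuationState is the shared public envelope, whose import path is not shown in this module:

from fastvideo.pipelines.basic.ltx2.continuation import (
    LTX2_CONTINUATION_KIND,
    LTX2ContinuationState,
)

def maybe_rebuild(state: "ContinuationState") -> LTX2ContinuationState | None:
    """Rebuild typed state only when the envelope advertises the LTX-2 kind."""
    if state.kind != LTX2_CONTINUATION_KIND:
        return None  # some other pipeline's continuation payload
    # Still raises ValueError if the payload's schema_version is unsupported.
    return LTX2ContinuationState.from_continuation_state(state)
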
fastvideo.pipelines.basic.ltx2.continuation.LTX2ContinuationState.to_continuation_state
to_continuation_state(*, blob_store: BlobStore | None = None, inline_threshold_bytes: int = DEFAULT_INLINE_THRESHOLD_BYTES) -> ContinuationState

Serialize into a public :class:ContinuationState.

When blob_store is given, tensors larger than inline_threshold_bytes are stored via :meth:BlobStore.put and referenced by id; otherwise all data is base64-encoded inline. The payload is always a plain JSON-serializable dict.

Source code in fastvideo/pipelines/basic/ltx2/continuation.py
def to_continuation_state(
    self,
    *,
    blob_store: BlobStore | None = None,
    inline_threshold_bytes: int = DEFAULT_INLINE_THRESHOLD_BYTES,
) -> ContinuationState:
    """Serialize into a public :class:`ContinuationState`.

    When ``blob_store`` is given, tensors larger than
    ``inline_threshold_bytes`` are stored via
    :meth:`BlobStore.put` and referenced by id; otherwise all data
    is base64-encoded inline. The payload is always a plain
    JSON-serializable dict.
    """
    payload: dict[str, Any] = {
        "schema_version": LTX2_CONTINUATION_SCHEMA_VERSION,
        "segment_index": int(self.segment_index),
        "video_conditioning_frame_idx": int(self.video_conditioning_frame_idx),
        "video_conditioning_strength": float(self.video_conditioning_strength),
        "audio_conditioning_num_frames": int(self.audio_conditioning_num_frames),
        "audio_conditioning_strength": float(self.audio_conditioning_strength),
        "video_position_offset_sec": float(self.video_position_offset_sec),
        "metadata": dict(self.metadata),
    }
    if self.audio_sample_rate is not None:
        payload["audio_sample_rate"] = int(self.audio_sample_rate)

    video_payload = self._encode_video_frames(
        blob_store=blob_store,
        inline_threshold_bytes=inline_threshold_bytes,
    )
    if video_payload is not None:
        payload["video"] = video_payload

    audio_payload = self._encode_audio_latents(
        blob_store=blob_store,
        inline_threshold_bytes=inline_threshold_bytes,
    )
    if audio_payload is not None:
        payload["audio"] = audio_payload

    return ContinuationState(
        kind=LTX2_CONTINUATION_KIND,
        payload=payload,
    )
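
A hedged sketch of the blob-backed path: store stands in for any BlobStore implementation (its construction is outside this module), state is an LTX2ContinuationState built elsewhere, and the threshold value is illustrative:

import json

envelope = state.to_continuation_state(
    blob_store=store,
    inline_threshold_bytes=512 * 1024,  # inline anything under 512 KiB
)

# The payload is a plain dict either way, so it survives a JSON hop.
wire = json.dumps(envelope.payload).encode("utf-8")

# The consumer must supply the same (or an equivalent) store to resolve blob ids.
restored = LTX2ContinuationState.from_continuation_state(envelope, blob_store=store)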

Functions

fastvideo.pipelines.basic.ltx2.ltx2_pipeline

LTX-2 text-to-video pipeline.

Classes

Functions

fastvideo.pipelines.basic.ltx2.pipeline_configs

Classes

fastvideo.pipelines.basic.ltx2.pipeline_configs.LTX2T2VConfig dataclass
LTX2T2VConfig(model_path: str = '', pipeline_config_path: str | None = None, embedded_cfg_scale: float = 6.0, flow_shift: float | None = None, flow_shift_sr: float | None = None, disable_autocast: bool = False, is_causal: bool = False, dit_config: DiTConfig = LTX2VideoConfig(), dit_precision: str = 'bf16', upsampler_config: UpsamplerConfig = UpsamplerConfig(), upsampler_precision: str = 'fp32', vae_config: VAEConfig = LTX2VAEConfig(), vae_precision: str = 'bf16', vae_tiling: bool = True, vae_sp: bool = False, image_encoder_config: EncoderConfig = EncoderConfig(), image_encoder_precision: str = 'fp32', text_encoder_configs: tuple[EncoderConfig, ...] = (lambda: (LTX2GemmaConfig(),))(), text_encoder_precisions: tuple[str, ...] = (lambda: ('bf16',))(), preprocess_text_funcs: tuple[Callable[[str], str], ...] = (lambda: (preprocess_text,))(), postprocess_text_funcs: tuple[Callable[[BaseEncoderOutput], Tensor], ...] = (lambda: (ltx2_postprocess_text,))(), dmd_denoising_steps: list[int] | None = None, ti2v_task: bool = False, boundary_ratio: float | None = None, audio_decoder_config: ModelConfig = LTX2AudioDecoderConfig(), vocoder_config: ModelConfig = LTX2VocoderConfig(), audio_decoder_precision: str = 'bf16', vocoder_precision: str = 'bf16')

Bases: PipelineConfig

Configuration for LTX-2 T2V pipeline.
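
A minimal construction sketch; the checkpoint path is a placeholder, and every keyword below is a field from the signature above, shown with or near its documented default:

from fastvideo.pipelines.basic.ltx2.pipeline_configs import LTX2T2VConfig

config = LTX2T2VConfig(
    model_path="/path/to/ltx2-checkpoint",  # placeholder path
    dit_precision="bf16",
    vae_precision="bf16",
    vae_tiling=True,            # tile VAE decode to bound peak memory
    embedded_cfg_scale=6.0,
)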

fastvideo.pipelines.basic.ltx2.presets

LTX2 model family pipeline presets.

Classes

fastvideo.pipelines.basic.ltx2.stage_overrides

Typed override surfaces for the LTX-2 two-stage refine flow.

  • preset_overrides.refine — init-time knobs (see :class:LTX2RefinePresetOverride).
  • stage_overrides.refine — per-request knobs (see :class:LTX2RefineStageOverride).

Asset paths live on :class:~fastvideo.api.schema.ComponentConfig (upsampler_weights and lora_path).

Classes

fastvideo.pipelines.basic.ltx2.stage_overrides.LTX2RefinePresetOverride dataclass
LTX2RefinePresetOverride(enabled: bool | None = None, add_noise: bool | None = None)

Init-time refine wiring under preset_overrides.refine.

fastvideo.pipelines.basic.ltx2.stage_overrides.LTX2RefineStageOverride dataclass
LTX2RefineStageOverride(num_inference_steps: int | None = None, guidance_scale: float | None = None, image_crf: int | None = None, video_position_offset_sec: float | None = None)

Per-request refine tuning under stage_overrides.refine.

Functions

fastvideo.pipelines.basic.ltx2.stage_overrides.refine_override_to_dict
refine_override_to_dict(override: LTX2RefinePresetOverride | LTX2RefineStageOverride) -> dict[str, Any]

Serialise a refine override, dropping None entries so only user-set fields reach preset_overrides.refine or stage_overrides.refine.

Source code in fastvideo/pipelines/basic/ltx2/stage_overrides.py
def refine_override_to_dict(override: LTX2RefinePresetOverride | LTX2RefineStageOverride) -> dict[str, Any]:
    """Serialise a refine override, dropping ``None`` entries so only
    user-set fields reach ``preset_overrides.refine`` or
    ``stage_overrides.refine``."""
    return {k: v for k, v in asdict(override).items() if v is not None}
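
A small usage sketch grounded in the dataclass fields above; the resulting dicts are what end up under preset_overrides.refine and stage_overrides.refine:

from fastvideo.pipelines.basic.ltx2.stage_overrides import (
    LTX2RefinePresetOverride,
    LTX2RefineStageOverride,
    refine_override_to_dict,
)

# None-valued fields are dropped, so only explicitly-set knobs are sent.
preset_refine = refine_override_to_dict(LTX2RefinePresetOverride(enabled=True))
# -> {"enabled": True}

stage_refine = refine_override_to_dict(
    LTX2RefineStageOverride(num_inference_steps=8, guidance_scale=3.0))
# -> {"num_inference_steps": 8, "guidance_scale": 3.0}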

fastvideo.pipelines.basic.ltx2.stages

LTX-2 family pipeline stages.

Classes

fastvideo.pipelines.basic.ltx2.stages.LTX2AudioDecodingStage
LTX2AudioDecodingStage(audio_decoder, vocoder)

Bases: PipelineStage

Decode LTX-2 audio latents into a waveform.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_audio_decoding.py
def __init__(self, audio_decoder, vocoder) -> None:
    super().__init__()
    self.audio_decoder = audio_decoder
    self.vocoder = vocoder
fastvideo.pipelines.basic.ltx2.stages.LTX2DenoisingStage
LTX2DenoisingStage(transformer)

Bases: PipelineStage

Run the LTX-2 denoising loop over the sigma schedule.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_denoising.py
def __init__(self, transformer) -> None:
    super().__init__()
    self.transformer = transformer
fastvideo.pipelines.basic.ltx2.stages.LTX2LatentPreparationStage
LTX2LatentPreparationStage(transformer)

Bases: PipelineStage

Prepare initial LTX-2 latents without relying on a diffusers scheduler.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_latent_preparation.py
def __init__(self, transformer) -> None:
    super().__init__()
    self.transformer = transformer
fastvideo.pipelines.basic.ltx2.stages.LTX2TextEncodingStage
LTX2TextEncodingStage(text_encoders, tokenizers)

Bases: TextEncodingStage

LTX2 text encoding stage with sequence parallelism support.

When SP is enabled (sp_world_size > 1), only rank 0 runs the text encoder and broadcasts embeddings to other ranks. This avoids I/O contention from all ranks loading the Gemma model simultaneously, which can cause text encoding to take 100+ seconds instead of ~5 seconds.

Source code in fastvideo/pipelines/stages/text_encoding.py
def __init__(self, text_encoders, tokenizers) -> None:
    """
    Initialize the prompt encoding stage.

    Args:
        text_encoders: Text encoder models used to embed prompts.
        tokenizers: Tokenizers paired one-to-one with the text encoders.
    """
    super().__init__()
    self.tokenizers = tokenizers
    self.text_encoders = text_encoders
    self._last_audio_embeds: list[torch.Tensor] | None = None

Modules

fastvideo.pipelines.basic.ltx2.stages.ltx2_audio_decoding

Audio decoding stage for LTX-2 pipelines.

Classes
fastvideo.pipelines.basic.ltx2.stages.ltx2_audio_decoding.LTX2AudioDecodingStage
LTX2AudioDecodingStage(audio_decoder, vocoder)

Bases: PipelineStage

Decode LTX-2 audio latents into a waveform.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_audio_decoding.py
def __init__(self, audio_decoder, vocoder) -> None:
    super().__init__()
    self.audio_decoder = audio_decoder
    self.vocoder = vocoder
Functions
fastvideo.pipelines.basic.ltx2.stages.ltx2_denoising

LTX-2 denoising stage using the native sigma schedule.

Classes
fastvideo.pipelines.basic.ltx2.stages.ltx2_denoising.LTX2DenoisingStage
LTX2DenoisingStage(transformer)

Bases: PipelineStage

Run the LTX-2 denoising loop over the sigma schedule.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_denoising.py
def __init__(self, transformer) -> None:
    super().__init__()
    self.transformer = transformer
Functions
fastvideo.pipelines.basic.ltx2.stages.ltx2_latent_preparation

Latent preparation stage for LTX-2 pipelines.

Classes
fastvideo.pipelines.basic.ltx2.stages.ltx2_latent_preparation.LTX2LatentPreparationStage
LTX2LatentPreparationStage(transformer)

Bases: PipelineStage

Prepare initial LTX-2 latents without relying on a diffusers scheduler.

Source code in fastvideo/pipelines/basic/ltx2/stages/ltx2_latent_preparation.py
def __init__(self, transformer) -> None:
    super().__init__()
    self.transformer = transformer
Functions
fastvideo.pipelines.basic.ltx2.stages.ltx2_text_encoding

LTX2-specific text encoding stage with sequence parallelism broadcast support.

When running with sequence parallelism (SP), the Gemma text encoder is only executed on rank 0, and the embeddings are broadcast to all other ranks. This avoids I/O contention from all ranks loading the Gemma model simultaneously.
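
A minimal sketch of that rank-0 encode-and-broadcast pattern over the default process group; it is not the stage's actual code, and encode_fn stands in for the stage's Gemma encoder call:

import torch.distributed as dist

def encode_with_broadcast(encode_fn, prompt):
    """Rank 0 encodes; every other rank receives the same embeddings."""
    obj = [encode_fn(prompt)] if dist.get_rank() == 0 else [None]
    # broadcast_object_list pickles the result on rank 0 and unpickles it on
    # the other ranks, so they never load or run the text encoder themselves.
    dist.broadcast_object_list(obj, src=0)
    return obj[0]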

Classes
fastvideo.pipelines.basic.ltx2.stages.ltx2_text_encoding.LTX2TextEncodingStage
LTX2TextEncodingStage(text_encoders, tokenizers)

Bases: TextEncodingStage

LTX2 text encoding stage with sequence parallelism support.

When SP is enabled (sp_world_size > 1), only rank 0 runs the text encoder and broadcasts embeddings to other ranks. This avoids I/O contention from all ranks loading the Gemma model simultaneously, which can cause text encoding to take 100+ seconds instead of ~5 seconds.

Source code in fastvideo/pipelines/stages/text_encoding.py
def __init__(self, text_encoders, tokenizers) -> None:
    """
    Initialize the prompt encoding stage.

    Args:
        text_encoders: Text encoder models used to embed prompts.
        tokenizers: Tokenizers paired one-to-one with the text encoders.
    """
    super().__init__()
    self.tokenizers = tokenizers
    self.text_encoders = text_encoders
    self._last_audio_embeds: list[torch.Tensor] | None = None
Functions