Skip to content

entrypoints

Modules

fastvideo.entrypoints.cli

Modules

fastvideo.entrypoints.cli.bench

Runs benchmark against a running FastVideo OpenAI-compatible server.

Example usage

fastvideo bench --dataset vbench --num-prompts 20 --port 8000

Classes
fastvideo.entrypoints.cli.bench.BenchSubcommand
BenchSubcommand()

Bases: CLISubcommand

The bench subcommand — runs serving benchmarks.

Source code in fastvideo/entrypoints/cli/bench.py
def __init__(self) -> None:
    self.name = "bench"
    super().__init__()
Functions
fastvideo.entrypoints.cli.bench_serving

Benchmark online serving for diffusion models (Image/Video Generation).

Example usage
launch a server and benchmark on it
T2V or T2I or any other multimodal generation model

fastvideo serve --config serve.yaml

benchmark it and make sure the port is the same as the server's port

fastvideo bench --dataset vbench --num-prompts 20 --port 8000

Classes
fastvideo.entrypoints.cli.bench_serving.VBenchDataset
VBenchDataset(args, api_url: str, model: str)

Bases: BaseDataset

Dataset loader for VBench prompts. Supports t2v, i2v.

Source code in fastvideo/entrypoints/cli/bench_serving.py
def __init__(self, args, api_url: str, model: str):
    super().__init__(args, api_url, model)
    self.cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "fastvideo")
    self.items = self._load_data()
Functions
fastvideo.entrypoints.cli.cli_types
Classes
fastvideo.entrypoints.cli.cli_types.CLISubcommand

Base class for CLI subcommands

Functions
fastvideo.entrypoints.cli.cli_types.CLISubcommand.cmd
cmd(args: Namespace) -> None

Execute the command with the given arguments

Source code in fastvideo/entrypoints/cli/cli_types.py
def cmd(self, args: argparse.Namespace) -> None:
    """Execute the command with the given arguments"""
    raise NotImplementedError
fastvideo.entrypoints.cli.cli_types.CLISubcommand.subparser_init
subparser_init(subparsers: _SubParsersAction) -> FlexibleArgumentParser

Initialize the subparser for this command

Source code in fastvideo/entrypoints/cli/cli_types.py
def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgumentParser:
    """Initialize the subparser for this command"""
    raise NotImplementedError
fastvideo.entrypoints.cli.cli_types.CLISubcommand.validate
validate(args: Namespace) -> None

Validate the arguments for this command

Source code in fastvideo/entrypoints/cli/cli_types.py
def validate(self, args: argparse.Namespace) -> None:
    """Validate the arguments for this command"""
    pass
fastvideo.entrypoints.cli.generate
Classes
fastvideo.entrypoints.cli.generate.GenerateSubcommand
GenerateSubcommand()

Bases: CLISubcommand

The generate subcommand for the FastVideo CLI

Source code in fastvideo/entrypoints/cli/generate.py
def __init__(self) -> None:
    self.name = "generate"
    super().__init__()
Functions
fastvideo.entrypoints.cli.generate.GenerateSubcommand.validate
validate(args: Namespace) -> None

Validate the arguments for this command

Source code in fastvideo/entrypoints/cli/generate.py
def validate(self, args: argparse.Namespace) -> None:
    """Validate the arguments for this command"""
    if not args.config:
        raise ValueError("fastvideo generate requires --config PATH; use a nested "
                         "run config plus optional dotted overrides")
    if not os.path.exists(args.config):
        raise ValueError(f"Config file not found: {args.config}")
    setattr(
        args,
        _VALIDATED_RUN_CONFIG_ATTR,
        build_generate_run_config(
            args,
            overrides=getattr(args, "_unknown", None),
        ),
    )
Functions
fastvideo.entrypoints.cli.main
Classes
Functions
fastvideo.entrypoints.cli.main.cmd_init
cmd_init() -> list[CLISubcommand]

Initialize all commands from separate modules

Source code in fastvideo/entrypoints/cli/main.py
def cmd_init() -> list[CLISubcommand]:
    """Initialize all commands from separate modules"""
    commands = []
    commands.extend(generate_cmd_init())
    commands.extend(serve_cmd_init())
    commands.extend(bench_cmd_init())
    return commands
fastvideo.entrypoints.cli.serve
Classes
fastvideo.entrypoints.cli.serve.ServeSubcommand
ServeSubcommand()

Bases: CLISubcommand

Starts an OpenAI-compatible API server.

Source code in fastvideo/entrypoints/cli/serve.py
def __init__(self) -> None:
    self.name = "serve"
    super().__init__()
Functions
fastvideo.entrypoints.cli.utils
Functions
fastvideo.entrypoints.cli.utils.launch_distributed
launch_distributed(num_gpus: int, args: list[str], master_port: int | None = None) -> int

Launch a distributed job with the given arguments

Parameters:

Name Type Description Default
num_gpus int

Number of GPUs to use

required
args list[str]

Arguments to pass to v1_fastvideo_inference.py (defaults to sys.argv[1:])

required
master_port int | None

Port for the master process (default: random)

None
Source code in fastvideo/entrypoints/cli/utils.py
def launch_distributed(num_gpus: int, args: list[str], master_port: int | None = None) -> int:
    """
    Launch a distributed job with the given arguments

    Args:
        num_gpus: Number of GPUs to use
        args: Arguments to pass to v1_fastvideo_inference.py (defaults to sys.argv[1:])
        master_port: Port for the master process (default: random)
    """

    current_env = os.environ.copy()
    python_executable = sys.executable
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../.."))
    main_script = os.path.join(project_root, "fastvideo/sample/v1_fastvideo_inference.py")

    cmd = [python_executable, "-m", "torch.distributed.run", f"--nproc_per_node={num_gpus}"]

    if master_port is not None:
        cmd.append(f"--master_port={master_port}")

    cmd.append(main_script)
    cmd.extend(args)

    logger.info("Running inference with %d GPU(s)", num_gpus)
    logger.info("Launching command: %s", " ".join(cmd))

    current_env["PYTHONIOENCODING"] = "utf-8"
    process = subprocess.Popen(cmd,
                               env=current_env,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True,
                               bufsize=1,
                               encoding='utf-8',
                               errors='replace')

    if process.stdout:
        for line in iter(process.stdout.readline, ''):
            print(line.strip())

    return process.wait()

fastvideo.entrypoints.openai

Modules

fastvideo.entrypoints.openai.api_server
Classes
Functions
fastvideo.entrypoints.openai.api_server.create_app
create_app(fastvideo_args: FastVideoArgs, output_dir: str = DEFAULT_OUTPUT_DIR, default_request: GenerationRequest | None = None) -> FastAPI

Build the FastAPI application with all routers mounted

Source code in fastvideo/entrypoints/openai/api_server.py
def create_app(
    fastvideo_args: FastVideoArgs,
    output_dir: str = DEFAULT_OUTPUT_DIR,
    default_request: GenerationRequest | None = None,
) -> FastAPI:
    """Build the FastAPI application with all routers mounted"""

    app = FastAPI(
        title="FastVideo OpenAI-Compatible API",
        version="0.1.0",
        lifespan=lifespan,
    )
    app.state.fastvideo_args = fastvideo_args
    app.state.output_dir = output_dir
    app.state.default_request = default_request

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Import and mount routers
    from fastvideo.entrypoints.openai.common_api import router as common_router
    from fastvideo.entrypoints.openai.image_api import router as image_router
    from fastvideo.entrypoints.openai.video_api import router as video_router

    app.include_router(common_router)
    app.include_router(video_router)
    app.include_router(image_router)

    @app.get("/health")
    async def health():
        return {"status": "ok"}

    return app
fastvideo.entrypoints.openai.api_server.lifespan async
lifespan(app: FastAPI) -> AsyncIterator[None]

Load model on startup, clean up on shutdown

Source code in fastvideo/entrypoints/openai/api_server.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Load model on startup, clean up on shutdown"""
    args: FastVideoArgs = app.state.fastvideo_args
    output_dir: str = app.state.output_dir
    default_request: GenerationRequest | None = getattr(app.state, "default_request", None)

    logger.info("Loading model from %s ...", args.model_path)
    generator = VideoGenerator.from_fastvideo_args(args)
    logger.info("Model loaded successfully.")

    set_state(generator, args, output_dir, default_request=default_request)

    yield  # server is running

    logger.info("Shutting down — releasing model resources ...")
    generator.shutdown()
    clear_state()
    logger.info("Shutdown complete.")
fastvideo.entrypoints.openai.api_server.run_server
run_server(fastvideo_args: FastVideoArgs, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, output_dir: str = DEFAULT_OUTPUT_DIR, default_request: GenerationRequest | None = None)

Create the app and run it with uvicorn

Source code in fastvideo/entrypoints/openai/api_server.py
def run_server(
    fastvideo_args: FastVideoArgs,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    output_dir: str = DEFAULT_OUTPUT_DIR,
    default_request: GenerationRequest | None = None,
):
    """Create the app and run it with uvicorn"""
    if default_request is not None:
        _validate_default_request_against_preset(default_request, fastvideo_args.model_path)

    app = create_app(
        fastvideo_args,
        output_dir=output_dir,
        default_request=default_request,
    )

    logger.info("Starting FastVideo server on %s:%d", host, port)
    logger.info("Model: %s", fastvideo_args.model_path)

    uvicorn.run(
        app,
        host=host,
        port=port,
        log_level="info",
        timeout_keep_alive=300,
    )
fastvideo.entrypoints.openai.common_api
Classes
fastvideo.entrypoints.openai.common_api.ModelCard

Bases: BaseModel

OpenAI-compatible model card

Functions
fastvideo.entrypoints.openai.common_api.available_models async
available_models()

Show available models

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/models", response_class=ORJSONResponse)
async def available_models():
    """Show available models"""
    args = get_server_args()
    card = ModelCard(id=args.model_path, root=args.model_path)
    return {"object": "list", "data": [card.model_dump()]}
fastvideo.entrypoints.openai.common_api.model_info async
model_info()

Get basic model information

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/model_info")
async def model_info():
    """Get basic model information"""
    args = get_server_args()
    return {"model_path": args.model_path}
fastvideo.entrypoints.openai.common_api.retrieve_model async
retrieve_model(model: str)

Retrieve a model by name

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/models/{model:path}", response_class=ORJSONResponse)
async def retrieve_model(model: str):
    """Retrieve a model by name"""
    args = get_server_args()
    if model != args.model_path:
        return ORJSONResponse(
            status_code=404,
            content={
                "error": {
                    "message": f"The model '{model}' does not exist",
                    "type": "invalid_request_error",
                    "param": "model",
                    "code": "model_not_found",
                }
            },
        )
    card = ModelCard(id=model, root=model)
    return card.model_dump()
fastvideo.entrypoints.openai.image_api
Functions
fastvideo.entrypoints.openai.protocol
Functions
fastvideo.entrypoints.openai.protocol.generate_request_id
generate_request_id() -> str

Generate a unique request ID

Source code in fastvideo/entrypoints/openai/protocol.py
def generate_request_id() -> str:
    """Generate a unique request ID"""
    return uuid.uuid4().hex
fastvideo.entrypoints.openai.state

Global server state shared across API modules.

Keeping state in a dedicated module prevents the classic 'main vs package module' duplication that occurs when api_server.py is run with python -m. All modules that need the generator or server args should import from here.

Classes
Functions
fastvideo.entrypoints.openai.state.clear_state
clear_state() -> None

Clear server state on shutdown.

Source code in fastvideo/entrypoints/openai/state.py
def clear_state() -> None:
    """Clear server state on shutdown."""
    global _generator, _fastvideo_args, _default_request
    _generator = None
    _fastvideo_args = None
    _default_request = None
fastvideo.entrypoints.openai.state.get_default_request
get_default_request() -> GenerationRequest | None

Return the ServeConfig.default_request set at startup, if any.

Source code in fastvideo/entrypoints/openai/state.py
def get_default_request() -> GenerationRequest | None:
    """Return the ServeConfig.default_request set at startup, if any."""
    return _default_request
fastvideo.entrypoints.openai.state.get_generator
get_generator() -> VideoGenerator

Return the global VideoGenerator instance (set during startup).

Source code in fastvideo/entrypoints/openai/state.py
def get_generator() -> VideoGenerator:
    """Return the global VideoGenerator instance (set during startup)."""
    assert _generator is not None, "Server not initialized — generator is None"
    return _generator
fastvideo.entrypoints.openai.state.get_output_dir
get_output_dir() -> str

Return the configured output directory.

Source code in fastvideo/entrypoints/openai/state.py
def get_output_dir() -> str:
    """Return the configured output directory."""
    return _output_dir
fastvideo.entrypoints.openai.state.get_server_args
get_server_args() -> FastVideoArgs

Return the global FastVideoArgs (set during startup).

Source code in fastvideo/entrypoints/openai/state.py
def get_server_args() -> FastVideoArgs:
    """Return the global FastVideoArgs (set during startup)."""
    assert _fastvideo_args is not None, "Server not initialized — args is None"
    return _fastvideo_args
fastvideo.entrypoints.openai.state.set_state
set_state(generator: VideoGenerator, fastvideo_args: FastVideoArgs, output_dir: str, default_request: GenerationRequest | None = None) -> None

Set all server state at once (called from lifespan).

Source code in fastvideo/entrypoints/openai/state.py
def set_state(
    generator: VideoGenerator,
    fastvideo_args: FastVideoArgs,
    output_dir: str,
    default_request: GenerationRequest | None = None,
) -> None:
    """Set all server state at once (called from lifespan)."""
    global _generator, _fastvideo_args, _output_dir, _default_request
    _generator = generator
    _fastvideo_args = fastvideo_args
    _output_dir = output_dir
    _default_request = default_request
fastvideo.entrypoints.openai.stores
Classes
fastvideo.entrypoints.openai.stores.AsyncDictStore
AsyncDictStore()

A small async-safe in-memory key-value store for dict items.

This encapsulates the usual pattern of a module-level dict guarded by an asyncio.Lock and provides simple CRUD methods that are safe to call concurrently from FastAPI request handlers and background tasks.

Source code in fastvideo/entrypoints/openai/stores.py
def __init__(self) -> None:
    self._items: dict[str, dict[str, Any]] = {}
    self._lock = asyncio.Lock()
fastvideo.entrypoints.openai.utils
Functions
fastvideo.entrypoints.openai.utils.choose_image_ext
choose_image_ext(output_format: str | None, background: str | None) -> str

Pick a file extension for image outputs

Source code in fastvideo/entrypoints/openai/utils.py
def choose_image_ext(output_format: str | None, background: str | None) -> str:
    """Pick a file extension for image outputs"""
    fmt = (output_format or "").lower()
    if fmt in {"png", "webp", "jpeg", "jpg"}:
        return "jpg" if fmt == "jpeg" else fmt
    if (background or "auto").lower() == "transparent":
        return "png"
    return "jpg"
fastvideo.entrypoints.openai.utils.merge_image_input_list
merge_image_input_list(*inputs: list | Any | None) -> list

Merge multiple image input sources into a single flat list

Source code in fastvideo/entrypoints/openai/utils.py
def merge_image_input_list(*inputs: list | Any | None) -> list:
    """Merge multiple image input sources into a single flat list"""
    result = []
    for input_item in inputs:
        if input_item is not None:
            if isinstance(input_item, list):
                result.extend(input_item)
            else:
                result.append(input_item)
    return result
fastvideo.entrypoints.openai.utils.parse_size
parse_size(size: str) -> tuple[int, int] | tuple[None, None]

Parse a 'WIDTHxHEIGHT' string into (width, height)

Source code in fastvideo/entrypoints/openai/utils.py
def parse_size(size: str) -> tuple[int, int] | tuple[None, None]:
    """Parse a 'WIDTHxHEIGHT' string into (width, height)"""
    try:
        parts = size.lower().replace(" ", "").split("x")
        if len(parts) != 2:
            raise ValueError
        w, h = int(parts[0]), int(parts[1])
        return w, h
    except Exception:
        return None, None
fastvideo.entrypoints.openai.utils.save_image_to_path async
save_image_to_path(image: UploadFile | str, target_path: str) -> str

Save an uploaded file or download from URL to target_path

Source code in fastvideo/entrypoints/openai/utils.py
async def save_image_to_path(image: UploadFile | str, target_path: str) -> str:
    """Save an uploaded file or download from URL to *target_path*"""
    input_path = await _maybe_url_image(image, target_path)
    if input_path is None:
        input_path = await _save_upload_to_path(image, target_path)
    return input_path
fastvideo.entrypoints.openai.video_api
Functions

fastvideo.entrypoints.streaming

Classes

fastvideo.entrypoints.streaming.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A :class:ContinuationState payload can reference large tensors stored in a :class:BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Functions
fastvideo.entrypoints.streaming.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
fastvideo.entrypoints.streaming.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.

fastvideo.entrypoints.streaming.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder per ``ltx2_segment_start`` boundary so each segment becomes one media fragment the client can append independently.

Example::

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
Functions
fastvideo.entrypoints.streaming.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task
fastvideo.entrypoints.streaming.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory :class:BlobStore for single-process servers.

No eviction policy — callers are responsible for calling :meth:drop when a blob's owning state is replaced or a session ends. A redis- or filesystem-backed :class:BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory :class:SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via :meth:drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any :class:BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.Session dataclass
Session(id: str = (lambda: uuid4().hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())
Functions
fastvideo.entrypoints.streaming.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises :class:InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
fastvideo.entrypoints.streaming.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}
Functions
fastvideo.entrypoints.streaming.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
fastvideo.entrypoints.streaming.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls :meth:store after each segment and :meth:snapshot when a client explicitly asks for an exportable state handle.

Functions
fastvideo.entrypoints.streaming.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""

Functions

fastvideo.entrypoints.streaming.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto,
    *,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        generator=generator,
        sessions=sessions,
        session_store=session_store or InMemorySessionStore(),
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": state.serve_config.streaming.stream_mode,
        })

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

Modules

fastvideo.entrypoints.streaming.protocol

JSON WebSocket protocol schemas for the streaming server.

Every control message shares the envelope {"type": <str>, ...}. Pydantic models live here so the server can parse / validate incoming frames and emit well-typed outgoing frames without hand-rolled dicts.

The message catalogue matches the contract in docs/design/server_contracts/streaming.md; additions must land in both places in the same PR.

Classes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot

Bases: BaseModel

Attributes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot.state instance-attribute
state: dict[str, Any]

{kind, payload} dict matching :class:fastvideo.api.ContinuationState.

fastvideo.entrypoints.streaming.protocol.MediaInit

Bases: BaseModel

Descriptor for the fMP4 initialization segment that follows.

fastvideo.entrypoints.streaming.protocol.SegmentPromptSource

Bases: BaseModel

Request a new segment using a specific prompt.

fastvideo.entrypoints.streaming.protocol.SessionInitV2

Bases: BaseModel

Opening frame the client sends after the WebSocket handshake.

Attributes
fastvideo.entrypoints.streaming.protocol.SessionInitV2.continuation_state class-attribute instance-attribute
continuation_state: dict[str, Any] | None = None

Optional {kind, payload} dict; hydrated into :class:fastvideo.api.ContinuationState server-side.

fastvideo.entrypoints.streaming.protocol.SnapshotState

Bases: BaseModel

Request the current ContinuationState for export.

Functions
fastvideo.entrypoints.streaming.protocol.parse_client_message
parse_client_message(raw: dict[str, Any]) -> ClientMessage

Parse an incoming WebSocket dict into a typed client message.

Unknown type values raise :class:pydantic.ValidationError; the server handler turns that into an error frame with code="invalid_message".

Source code in fastvideo/entrypoints/streaming/protocol.py
def parse_client_message(raw: dict[str, Any]) -> ClientMessage:
    """Parse an incoming WebSocket dict into a typed client message.

    Unknown ``type`` values raise :class:`pydantic.ValidationError`; the
    server handler turns that into an ``error`` frame with
    ``code="invalid_message"``.
    """
    from pydantic import TypeAdapter

    return TypeAdapter(ClientMessage).validate_python(raw)
fastvideo.entrypoints.streaming.server

Single-generator FastAPI + WebSocket streaming server.

Classes
Functions
fastvideo.entrypoints.streaming.server.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto, *, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto,
    *,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        generator=generator,
        sessions=sessions,
        session_store=session_store or InMemorySessionStore(),
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": state.serve_config.streaming.stream_mode,
        })

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.server.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )
fastvideo.entrypoints.streaming.session

Per-connection session lifecycle for the streaming server.

Each WebSocket opens exactly one :class:Session. :class:SessionManager enforces the generation_segment_cap and session_timeout_seconds budgets from :class:fastvideo.api.StreamingConfig.

Classes
fastvideo.entrypoints.streaming.session.InvalidSessionTransition

Bases: RuntimeError

Raised when a session is asked to transition along an illegal edge.

fastvideo.entrypoints.streaming.session.Session dataclass
Session(id: str = (lambda: uuid4().hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())
Functions
fastvideo.entrypoints.streaming.session.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises :class:InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
fastvideo.entrypoints.streaming.session.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}
Functions
fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
fastvideo.entrypoints.streaming.session.SessionRejected

Bases: RuntimeError

Raised when session creation fails (queue full, auth, etc.).

fastvideo.entrypoints.streaming.session.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.session_init_image

Persist the initial-image blob attached to a streaming session.

Classes
fastvideo.entrypoints.streaming.session_init_image.SessionInitImage dataclass
SessionInitImage(path: str, display_name: str, mime: str)

Location of the persisted init image.

Callers pass path to InputConfig.image_path; display_name is only used for logs.

Functions
fastvideo.entrypoints.streaming.session_init_image.persist_session_init_image
persist_session_init_image(payload: Any, *, output_dir: str | None = None) -> SessionInitImage | None

Decode a client init-image blob and persist it to disk.

payload shape (matches the internal UI protocol)::

{
    "mime": "image/png",
    "name": "ref.png",
    "data": "<base64 bytes>",
}

Returns None when payload is falsy (no init image). Raises :class:ValueError on schema / size / decode errors so the caller can surface a user-facing error frame.

Source code in fastvideo/entrypoints/streaming/session_init_image.py
def persist_session_init_image(
    payload: Any,
    *,
    output_dir: str | None = None,
) -> SessionInitImage | None:
    """Decode a client init-image blob and persist it to disk.

    ``payload`` shape (matches the internal UI protocol)::

        {
            "mime": "image/png",
            "name": "ref.png",
            "data": "<base64 bytes>",
        }

    Returns ``None`` when ``payload`` is falsy (no init image). Raises
    :class:`ValueError` on schema / size / decode errors so the caller
    can surface a user-facing ``error`` frame.
    """
    if not payload:
        return None
    if not isinstance(payload, dict):
        raise ValueError("session init image must be an object")

    mime = payload.get("mime")
    if mime not in _ACCEPTED_MIMES:
        raise ValueError(f"session init image mime {mime!r} is not one of "
                         f"{sorted(_ACCEPTED_MIMES)}")
    data_b64 = payload.get("data")
    if not isinstance(data_b64, str):
        raise ValueError("session init image data must be a base64 string")
    try:
        data = base64.b64decode(data_b64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError(f"session init image data is not valid base64: {exc}") from exc
    if len(data) > _MAX_IMAGE_BYTES:
        raise ValueError(f"session init image is {len(data)} bytes; limit is "
                         f"{_MAX_IMAGE_BYTES}")
    if len(data) == 0:
        raise ValueError("session init image data is empty")

    ext = _ACCEPTED_MIMES[mime]
    display_name = _sanitize_display_name(payload.get("name")) or f"init{ext}"
    fd, path = tempfile.mkstemp(prefix="fastvideo-init-", suffix=ext, dir=output_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
    except Exception:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)
        raise
    return SessionInitImage(path=path, display_name=display_name, mime=mime)
fastvideo.entrypoints.streaming.session_store

Session state store for the FastVideo streaming server.

The streaming server keeps continuation state (decoded frames + audio latents from the previous segment) server-side so the client doesn't re-upload multi-megabyte tensors each WebSocket message. Two operations are needed:

  • snapshot(session_id) -> ContinuationState — serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
  • hydrate(state) -> session_id — load a previously serialized state into a new session (for resume-after-disconnect flows).

The store is an ABC with an :class:InMemorySessionStore default; Redis or other backends can drop in without touching the pipeline.

Large tensor payloads (video frames, audio latents) are kept out of the JSON payload via an accompanying :class:BlobStore. Both stores share a process today; they are separate types so that a future implementation can put blobs on S3 while keeping session metadata in Redis.

Classes
fastvideo.entrypoints.streaming.session_store.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A :class:ContinuationState payload can reference large tensors stored in a :class:BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Functions
fastvideo.entrypoints.streaming.session_store.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.session_store.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory :class:BlobStore for single-process servers.

No eviction policy — callers are responsible for calling :meth:drop when a blob's owning state is replaced or a session ends. A redis- or filesystem-backed :class:BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.session_store.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory :class:SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via :meth:drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any :class:BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.session_store.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls :meth:store after each segment and :meth:snapshot when a client explicitly asks for an exportable state handle.

Functions
fastvideo.entrypoints.streaming.session_store.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.session_store.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""
fastvideo.entrypoints.streaming.stream

fMP4 stream encoder used by the streaming server.

The client's Media Source Extensions player needs a continuous fMP4 byte stream: first an initialization segment (ftyp + moov), then one or more media segments (moof + mdat). We pipe raw RGB frames into an ffmpeg subprocess configured for fragmented output via -movflags empty_moov+default_base_moof+frag_keyframe+faststart and stream the bytes back out.

Classes
fastvideo.entrypoints.streaming.stream.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.

fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder per ``ltx2_segment_start`` boundary so each segment becomes one media fragment the client can append independently.

Example::

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
Functions
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task

fastvideo.entrypoints.streaming_generator

Classes

fastvideo.entrypoints.streaming_generator.StreamingVideoGenerator
StreamingVideoGenerator(fastvideo_args: FastVideoArgs, executor_class: type[Executor], log_stats: bool, use_queue_mode: bool = True)

Bases: VideoGenerator

This class extends VideoGenerator with streaming capabilities, allowing incremental video generation with step-by-step control.

Source code in fastvideo/entrypoints/streaming_generator.py
def __init__(self,
             fastvideo_args: FastVideoArgs,
             executor_class: type[Executor],
             log_stats: bool,
             use_queue_mode: bool = True):
    super().__init__(fastvideo_args, executor_class, log_stats)
    self.accumulated_frames: list[np.ndarray] = []
    self.sampling_param: SamplingParam | None = None
    self.batch: ForwardBatch | None = None
    self._use_queue_mode = use_queue_mode and isinstance(self.executor, MultiprocExecutor)
    self.writer: IncrementalVideoWriter | None = None
    self.block_dir: str | None = None
    self.block_idx: int = 0

Functions

fastvideo.entrypoints.video_generator

VideoGenerator module for FastVideo.

This module provides a consolidated interface for generating videos using diffusion models.

Classes

fastvideo.entrypoints.video_generator.VideoGenerator
VideoGenerator(fastvideo_args: FastVideoArgs, executor_class: type[Executor], log_stats: bool, *, log_queue=None)

A unified class for generating videos using diffusion models.

This class provides a simple interface for video generation with rich customization options, similar to popular frameworks like HF Diffusers.

Initialize the video generator.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

The inference arguments

required
executor_class type[Executor]

The executor class to use for inference

required
log_stats bool

Whether to log statistics

required
log_queue

Optional multiprocessing.Queue to forward worker logs to

None
Source code in fastvideo/entrypoints/video_generator.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    executor_class: type[Executor],
    log_stats: bool,
    *,
    log_queue=None,
):
    """
    Initialize the video generator.

    Args:
        fastvideo_args: The inference arguments
        executor_class: The executor class to use for inference
        log_stats: Whether to log statistics
        log_queue: Optional multiprocessing.Queue to forward worker logs to
    """
    self.config: GeneratorConfig | None = None
    self.fastvideo_args = fastvideo_args
    self.executor = executor_class(fastvideo_args, log_queue=log_queue)
Functions
fastvideo.entrypoints.video_generator.VideoGenerator.from_fastvideo_args classmethod
from_fastvideo_args(fastvideo_args: FastVideoArgs, *, log_queue=None) -> VideoGenerator

Create a video generator with the specified arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

The inference arguments

required
log_queue

Optional multiprocessing.Queue to forward worker logs to

None

Returns:

Type Description
VideoGenerator

The created video generator

Source code in fastvideo/entrypoints/video_generator.py
@classmethod
def from_fastvideo_args(
    cls,
    fastvideo_args: FastVideoArgs,
    *,
    log_queue=None,
) -> "VideoGenerator":
    """
    Build a video generator directly from inference arguments.

    Args:
        fastvideo_args: The inference arguments
        log_queue: Optional multiprocessing.Queue to forward worker logs to

    Returns:
        The created video generator
    """
    # Resolve the executor implementation appropriate for these args and
    # delegate everything else to the regular constructor.
    return cls(
        fastvideo_args=fastvideo_args,
        executor_class=Executor.get_class(fastvideo_args),
        log_stats=False,  # TODO: implement stats logging
        log_queue=log_queue,
    )
fastvideo.entrypoints.video_generator.VideoGenerator.from_pretrained classmethod
from_pretrained(model_path: str | GeneratorConfig | Mapping[str, Any] | None = None, **kwargs) -> VideoGenerator

Create a video generator from a pretrained model.

Parameters:

Name Type Description Default
model_path str | GeneratorConfig | Mapping[str, Any] | None

Path or identifier for the pretrained model

None
pipeline_config

Pipeline config to use for inference (supplied via **kwargs, not a named parameter)

optional
**kwargs

Additional arguments to customize model loading, set any FastVideoArgs or PipelineConfig attributes here.

{}

Returns:

Type Description
VideoGenerator

The created video generator

Priority level: Default pipeline config < User's pipeline config < User's kwargs

Stable convenience kwargs remain supported here for common engine and offload settings. Advanced model- or pipeline-specific options should move to VideoGenerator.from_config(...).

Source code in fastvideo/entrypoints/video_generator.py
@classmethod
def from_pretrained(
    cls,
    model_path: str | GeneratorConfig | Mapping[str, Any] | None = None,
    **kwargs,
) -> "VideoGenerator":
    """
    Create a video generator from a pretrained model.

    Args:
        model_path: Path or identifier for the pretrained model, or a typed
            config (GeneratorConfig / mapping) describing the whole run.
        **kwargs: Additional arguments to customize model loading; any
            FastVideoArgs or PipelineConfig attribute may be set here. May
            also carry ``config`` (a typed config, mutually exclusive with
            ``model_path``) and ``log_queue``.

    Returns:
        The created video generator

    Priority level: Default pipeline config < User's pipeline config < User's kwargs

    Stable convenience kwargs remain supported here for common engine and
    offload settings. Advanced model- or pipeline-specific options should
    move to VideoGenerator.from_config(...).
    """
    log_queue = kwargs.pop("log_queue", None)
    explicit_config = kwargs.pop("config", None)

    # Mode 1: an explicit typed config was supplied via the `config` kwarg.
    if explicit_config is not None:
        if model_path is not None:
            raise TypeError("Pass either model_path or config to from_pretrained, not both")
        if kwargs:
            unexpected = ", ".join(sorted(kwargs))
            raise TypeError(f"Unexpected keyword arguments with config: {unexpected}")
        return cls.from_config(explicit_config, log_queue=log_queue)

    # Mode 2: the positional argument itself is already a typed config.
    if isinstance(model_path, GeneratorConfig | Mapping):
        if kwargs:
            unexpected = ", ".join(sorted(kwargs))
            raise TypeError(f"Unexpected keyword arguments with typed config: {unexpected}")
        return cls.from_config(model_path, log_queue=log_queue)

    # Mode 3: legacy path — a plain model path plus convenience kwargs.
    if model_path is None:
        raise TypeError("model_path or config is required")

    legacy_only_kwargs = sorted(set(kwargs) - _FROM_PRETRAINED_CONVENIENCE_KWARGS)
    if legacy_only_kwargs:
        # Nudge advanced configuration toward the typed from_config API.
        warnings.warn(
            "VideoGenerator.from_pretrained(...) received legacy-only kwargs "
            f"({', '.join(legacy_only_kwargs)}); prefer VideoGenerator.from_config(...) "
            "for advanced configuration.",
            DeprecationWarning,
            stacklevel=2,
        )
    return cls.from_config(
        legacy_from_pretrained_to_config(model_path, kwargs),
        log_queue=log_queue,
    )
fastvideo.entrypoints.video_generator.VideoGenerator.generate
generate(request: GenerationRequest | Mapping[str, Any], *, log_queue=None) -> GenerationResult | list[GenerationResult]

Generate video or image outputs from a typed inference request.

Parameters:

Name Type Description Default
request GenerationRequest | Mapping[str, Any]

A GenerationRequest instance or a mapping that can be parsed into one. This is the primary public inference entrypoint for the typed API.

required
log_queue

Optional multiprocessing.Queue to forward worker logs to during this request.

None

Returns:

Type Description
GenerationResult | list[GenerationResult]

A GenerationResult for single-request generation, or a list of GenerationResult objects when the request expands into multiple prompts.

Source code in fastvideo/entrypoints/video_generator.py
def generate(
    self,
    request: GenerationRequest | Mapping[str, Any],
    *,
    log_queue=None,
) -> GenerationResult | list[GenerationResult]:
    """
    Generate video or image outputs from a typed inference request.

    Args:
        request: A `GenerationRequest` instance or a mapping that can be
            parsed into one. This is the primary public inference
            entrypoint for the typed API.
        log_queue: Optional multiprocessing.Queue to forward worker logs to
            during this request.

    Returns:
        A `GenerationResult` for single-request generation, or a list of
        `GenerationResult` objects when the request expands into multiple
        prompts.
    """
    normalized_request = normalize_generation_request(request)
    # Compare against None explicitly: the contract is "None means no
    # forwarding", and queue-like sinks have no meaningful truth value, so
    # truthiness would misbehave on any falsy-but-real sink object.
    if log_queue is not None:
        self.executor.set_log_queue(log_queue)

    try:
        return self._generate_request_impl(normalized_request)
    finally:
        # Always restore the executor's logging state, even when generation
        # raises, so this call's sink does not leak into later requests.
        if log_queue is not None:
            self.executor.clear_log_queue()
fastvideo.entrypoints.video_generator.VideoGenerator.generate_video
generate_video(prompt: str | None = None, sampling_param: SamplingParam | None = None, mouse_cond: Tensor | None = None, keyboard_cond: Tensor | None = None, grid_sizes: tuple[int, int, int] | list[int] | Tensor | None = None, **kwargs) -> dict[str, Any] | list[dict[str, Any]]

Generate a video based on the given prompt.

Parameters:

Name Type Description Default
prompt str | None

The prompt to use for generation (optional if prompt_txt is provided)

None
negative_prompt

The negative prompt to use (overrides the one in fastvideo_args)

required
output_path

Path to save the video (overrides the one in fastvideo_args)

required
prompt_path

Path to prompt file

required
save_video

Whether to save the video to disk

required
return_frames

Whether to include raw frames in the result dict

required
num_inference_steps

Number of denoising steps (overrides fastvideo_args)

required
guidance_scale

Classifier-free guidance scale (overrides fastvideo_args)

required
num_frames

Number of frames to generate (overrides fastvideo_args)

required
height

Height of generated video (overrides fastvideo_args)

required
width

Width of generated video (overrides fastvideo_args)

required
fps

Frames per second for saved video (overrides fastvideo_args)

required
seed

Random seed for generation (overrides fastvideo_args)

required
callback

Callback function called after each step

required
callback_steps

Number of steps between each callback

required

Returns:

Type Description
dict[str, Any] | list[dict[str, Any]]

A metadata dictionary for single-prompt generation, or a list of metadata dictionaries for prompt-file batch generation.

Source code in fastvideo/entrypoints/video_generator.py
def generate_video(
    self,
    prompt: str | None = None,
    sampling_param: SamplingParam | None = None,
    # Action control inputs (Matrix-Game)
    mouse_cond: torch.Tensor | None = None,
    keyboard_cond: torch.Tensor | None = None,
    grid_sizes: tuple[int, int, int] | list[int] | torch.Tensor
    | None = None,
    **kwargs,
) -> dict[str, Any] | list[dict[str, Any]]:
    """
    Deprecated generation entrypoint; prefer ``generate(request=...)``.

    Args:
        prompt: The prompt to use for generation (optional if a prompt file
            is supplied through kwargs).
        sampling_param: Optional sampling parameters overriding defaults.
        mouse_cond: Optional mouse action-conditioning tensor (Matrix-Game).
        keyboard_cond: Optional keyboard action-conditioning tensor
            (Matrix-Game).
        grid_sizes: Optional grid sizes for action conditioning.
        **kwargs: Per-call overrides such as negative_prompt, output_path,
            prompt_path, save_video, return_frames, num_inference_steps,
            guidance_scale, num_frames, height, width, fps, seed, callback,
            callback_steps, plus an optional ``log_queue`` used to forward
            worker logs during this call.

    Returns:
        A metadata dictionary for single-prompt generation, or a list of
        metadata dictionaries for prompt-file batch generation.
    """
    warnings.warn(
        "VideoGenerator.generate_video(...) is deprecated; use "
        "VideoGenerator.generate(request=...) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    # ``log_queue`` is routed to the executor, not the pipeline kwargs.
    log_queue = kwargs.pop("log_queue", None)
    if log_queue:
        self.executor.set_log_queue(log_queue)

    try:
        return self._generate_video_impl(
            prompt=prompt,
            sampling_param=sampling_param,
            mouse_cond=mouse_cond,
            keyboard_cond=keyboard_cond,
            grid_sizes=grid_sizes,
            **kwargs,
        )
    finally:
        # Undo the per-call log sink even if generation failed.
        if log_queue:
            self.executor.clear_log_queue()
fastvideo.entrypoints.video_generator.VideoGenerator.shutdown
shutdown()

Shutdown the video generator.

Source code in fastvideo/entrypoints/video_generator.py
def shutdown(self):
    """
    Shut down the video generator and release its executor.

    Safe to call more than once: after the first call the executor
    reference has been dropped, and subsequent calls are no-ops instead
    of raising AttributeError.
    """
    executor = getattr(self, "executor", None)
    if executor is None:
        # Already shut down (or never fully constructed); nothing to do.
        return
    executor.shutdown()
    # Drop the reference so the executor (and its workers) can be
    # garbage-collected and repeated shutdown() calls stay harmless.
    del self.executor
fastvideo.entrypoints.video_generator.VideoGenerator.unmerge_lora_weights
unmerge_lora_weights() -> None

Use unmerged weights for inference to produce videos that align with validation videos generated during training.

Source code in fastvideo/entrypoints/video_generator.py
def unmerge_lora_weights(self) -> None:
    """
    Switch inference to unmerged weights so generated videos align with
    the validation videos produced during training.
    """
    # Pure delegation: the executor performs the actual weight handling.
    self.executor.unmerge_lora_weights()

Functions