entrypoints

Modules

fastvideo.entrypoints.cli

Modules

fastvideo.entrypoints.cli.bench

Runs a benchmark against a running FastVideo OpenAI-compatible server.

Example usage

fastvideo bench --dataset vbench --num-prompts 20 --port 8000

Classes
fastvideo.entrypoints.cli.bench.BenchSubcommand
BenchSubcommand()

Bases: CLISubcommand

The bench subcommand — runs serving benchmarks.

Source code in fastvideo/entrypoints/cli/bench.py
def __init__(self) -> None:
    self.name = "bench"
    super().__init__()
Functions:
fastvideo.entrypoints.cli.bench_serving

Benchmark online serving for diffusion models (Image/Video Generation).

Example usage
Launch a server for a T2V, T2I, or other multimodal generation model:

fastvideo serve --config serve.yaml

Then benchmark it, making sure --port matches the server's port:

fastvideo bench --dataset vbench --num-prompts 20 --port 8000

Classes
fastvideo.entrypoints.cli.bench_serving.VBenchDataset
VBenchDataset(args, api_url: str, model: str)

Bases: BaseDataset

Dataset loader for VBench prompts. Supports t2v, i2v.

Source code in fastvideo/entrypoints/cli/bench_serving.py
def __init__(self, args, api_url: str, model: str):
    super().__init__(args, api_url, model)
    self.cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "fastvideo")
    self.items = self._load_data()
Functions:
fastvideo.entrypoints.cli.cli_types
Classes
fastvideo.entrypoints.cli.cli_types.CLISubcommand

Base class for CLI subcommands

Methods:
fastvideo.entrypoints.cli.cli_types.CLISubcommand.cmd
cmd(args: Namespace) -> None

Execute the command with the given arguments

Source code in fastvideo/entrypoints/cli/cli_types.py
def cmd(self, args: argparse.Namespace) -> None:
    """Execute the command with the given arguments"""
    raise NotImplementedError
fastvideo.entrypoints.cli.cli_types.CLISubcommand.subparser_init
subparser_init(subparsers: _SubParsersAction) -> FlexibleArgumentParser

Initialize the subparser for this command

Source code in fastvideo/entrypoints/cli/cli_types.py
def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgumentParser:
    """Initialize the subparser for this command"""
    raise NotImplementedError
fastvideo.entrypoints.cli.cli_types.CLISubcommand.validate
validate(args: Namespace) -> None

Validate the arguments for this command

Source code in fastvideo/entrypoints/cli/cli_types.py
def validate(self, args: argparse.Namespace) -> None:
    """Validate the arguments for this command"""
    pass
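
A minimal sketch of how a subcommand might implement this interface. To stay self-contained it defines a stand-in base class and uses argparse.ArgumentParser in place of FlexibleArgumentParser. The hello subcommand, its --name flag, and the run_cli dispatch helper are hypothetical and not part of FastVideo.

```python
import argparse


class CLISubcommandSketch:
    """Stand-in that mirrors the three documented methods (not the real base class)."""

    name: str = ""

    def cmd(self, args: argparse.Namespace) -> None:
        raise NotImplementedError

    def validate(self, args: argparse.Namespace) -> None:
        pass  # validation is optional, as in the documented base class

    def subparser_init(self, subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser:
        raise NotImplementedError


class HelloSubcommand(CLISubcommandSketch):
    """Hypothetical subcommand used only for illustration."""

    name = "hello"

    def subparser_init(self, subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser:
        parser = subparsers.add_parser(self.name, help="Print a greeting")
        parser.add_argument("--name", default="")
        # Remember which subcommand object owns this parser so we can dispatch later.
        parser.set_defaults(dispatch=self)
        return parser

    def validate(self, args: argparse.Namespace) -> None:
        if not args.name:
            raise ValueError("hello requires --name")

    def cmd(self, args: argparse.Namespace) -> None:
        print(f"hello, {args.name}")


def run_cli(argv: list) -> None:
    """Hypothetical top-level wiring: register, parse, validate, then execute."""
    parser = argparse.ArgumentParser(prog="sketch")
    subparsers = parser.add_subparsers(dest="subcommand", required=True)
    HelloSubcommand().subparser_init(subparsers)
    args = parser.parse_args(argv)
    args.dispatch.validate(args)
    args.dispatch.cmd(args)
```

Calling run_cli(["hello", "--name", "fox"]) validates the arguments first and then runs cmd, which is the order the method names suggest; the real CLI may wire this differently.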
fastvideo.entrypoints.cli.eval

fastvideo eval CLI: list registered eval metrics and run them against a set of videos.

This is a thin wrapper around fastvideo.eval. Heavy lifting (metric loading, GPU handling, batching) lives in fastvideo.eval.create_evaluator.

Examples:

fastvideo eval list
fastvideo eval list --group vbench
fastvideo eval run --videos path/to/videos/*.mp4 \
    --metrics common.ssim --reference path/to/refs/
fastvideo eval run --videos clip.mp4 --metrics vbench.aesthetic_quality \
    --output scores.json
Classes
fastvideo.entrypoints.cli.eval.EvalSubcommand
EvalSubcommand()

Bases: CLISubcommand

The eval subcommand — entry point for the eval suite.

Source code in fastvideo/entrypoints/cli/eval.py
def __init__(self) -> None:
    self.name = "eval"
    super().__init__()
Functions:
fastvideo.entrypoints.cli.generate
Classes
fastvideo.entrypoints.cli.generate.GenerateSubcommand
GenerateSubcommand()

Bases: CLISubcommand

The generate subcommand for the FastVideo CLI

Source code in fastvideo/entrypoints/cli/generate.py
def __init__(self) -> None:
    self.name = "generate"
    super().__init__()
Methods:
fastvideo.entrypoints.cli.generate.GenerateSubcommand.validate
validate(args: Namespace) -> None

Validate the arguments for this command

Source code in fastvideo/entrypoints/cli/generate.py
def validate(self, args: argparse.Namespace) -> None:
    """Validate the arguments for this command"""
    if not args.config:
        raise ValueError("fastvideo generate requires --config PATH; use a nested "
                         "run config plus optional dotted overrides")
    if not os.path.exists(args.config):
        raise ValueError(f"Config file not found: {args.config}")
    setattr(
        args,
        _VALIDATED_RUN_CONFIG_ATTR,
        build_generate_run_config(
            args,
            overrides=getattr(args, "_unknown", None),
        ),
    )
Functions:
fastvideo.entrypoints.cli.main
Classes
Functions:
fastvideo.entrypoints.cli.main.cmd_init
cmd_init() -> list[CLISubcommand]

Initialize all commands from separate modules

Source code in fastvideo/entrypoints/cli/main.py
def cmd_init() -> list[CLISubcommand]:
    """Initialize all commands from separate modules"""
    commands = []
    commands.extend(generate_cmd_init())
    commands.extend(serve_cmd_init())
    commands.extend(router_serve_cmd_init())
    commands.extend(bench_cmd_init())
    commands.extend(eval_cmd_init())
    return commands
fastvideo.entrypoints.cli.router_serve

fastvideo router-serve CLI subcommand.

Launches the streaming router from a YAML config. Separate from fastvideo serve because the router is an orthogonal process: it fronts one or more running servers rather than hosting a generator itself.

Classes
fastvideo.entrypoints.cli.router_serve.RouterServeSubcommand
RouterServeSubcommand()

Bases: CLISubcommand

Start the multi-replica WebSocket router.

Source code in fastvideo/entrypoints/cli/router_serve.py
def __init__(self) -> None:
    self.name = "router-serve"
    super().__init__()
Functions:
fastvideo.entrypoints.cli.serve
Classes
fastvideo.entrypoints.cli.serve.ServeSubcommand
ServeSubcommand()

Bases: CLISubcommand

Starts an OpenAI-compatible API server.

Source code in fastvideo/entrypoints/cli/serve.py
def __init__(self) -> None:
    self.name = "serve"
    super().__init__()
Functions:
fastvideo.entrypoints.cli.utils
Functions:
fastvideo.entrypoints.cli.utils.launch_distributed
launch_distributed(num_gpus: int, args: list[str], master_port: int | None = None) -> int

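Launch a distributed job through torch.distributed.run, echo its combined stdout/stderr line by line, and return the process exit code.

Parameters:

num_gpus (int, required): Number of GPUs to use; passed as --nproc_per_node.
args (list[str], required): Arguments to pass to v1_fastvideo_inference.py. The signature defines no default, so the caller supplies the list explicitly (for example sys.argv[1:]).
master_port (int | None, default None): Port for the master process. When None, --master_port is omitted and the port choice is left to torch.distributed.run.
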
Source code in fastvideo/entrypoints/cli/utils.py
def launch_distributed(num_gpus: int, args: list[str], master_port: int | None = None) -> int:
    """
    Launch a distributed job with the given arguments

    Args:
        num_gpus: Number of GPUs to use
        args: Arguments to pass to v1_fastvideo_inference.py (defaults to sys.argv[1:])
        master_port: Port for the master process (default: random)
    """

    current_env = os.environ.copy()
    python_executable = sys.executable
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../.."))
    main_script = os.path.join(project_root, "fastvideo/sample/v1_fastvideo_inference.py")

    cmd = [python_executable, "-m", "torch.distributed.run", f"--nproc_per_node={num_gpus}"]

    if master_port is not None:
        cmd.append(f"--master_port={master_port}")

    cmd.append(main_script)
    cmd.extend(args)

    logger.info("Running inference with %d GPU(s)", num_gpus)
    logger.info("Launching command: %s", " ".join(cmd))

    current_env["PYTHONIOENCODING"] = "utf-8"
    process = subprocess.Popen(cmd,
                               env=current_env,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True,
                               bufsize=1,
                               encoding='utf-8',
                               errors='replace')

    if process.stdout:
        for line in iter(process.stdout.readline, ''):
            print(line.strip())

    return process.wait()
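
The command assembly in launch_distributed can be isolated as a pure function, which makes it easy to see what is executed for a given set of inputs. This is a sketch that mirrors the listing above; build_launch_cmd and the example script path and arguments are hypothetical, and nothing is launched.

```python
import sys
from typing import List, Optional


def build_launch_cmd(
    num_gpus: int,
    main_script: str,
    args: List[str],
    master_port: Optional[int] = None,
) -> List[str]:
    """Build the torch.distributed.run command line without executing it."""
    cmd = [sys.executable, "-m", "torch.distributed.run", f"--nproc_per_node={num_gpus}"]
    # The port flag is only added when a port was given explicitly.
    if master_port is not None:
        cmd.append(f"--master_port={master_port}")
    cmd.append(main_script)
    cmd.extend(args)
    return cmd


if __name__ == "__main__":
    # Hypothetical script path and arguments, for illustration only.
    print(" ".join(build_launch_cmd(2, "v1_fastvideo_inference.py", ["--config", "run.yaml"], 29511)))
```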

fastvideo.entrypoints.openai

Modules

fastvideo.entrypoints.openai.api_server
Classes
Functions:
fastvideo.entrypoints.openai.api_server.create_app
create_app(fastvideo_args: FastVideoArgs, output_dir: str = DEFAULT_OUTPUT_DIR, default_request: GenerationRequest | None = None) -> FastAPI

Build the FastAPI application with all routers mounted

Source code in fastvideo/entrypoints/openai/api_server.py
def create_app(
    fastvideo_args: FastVideoArgs,
    output_dir: str = DEFAULT_OUTPUT_DIR,
    default_request: GenerationRequest | None = None,
) -> FastAPI:
    """Build the FastAPI application with all routers mounted"""

    app = FastAPI(
        title="FastVideo OpenAI-Compatible API",
        version="0.1.0",
        lifespan=lifespan,
    )
    app.state.fastvideo_args = fastvideo_args
    app.state.output_dir = output_dir
    app.state.default_request = default_request

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Import and mount routers
    from fastvideo.entrypoints.openai.common_api import router as common_router
    from fastvideo.entrypoints.openai.image_api import router as image_router
    from fastvideo.entrypoints.openai.video_api import router as video_router

    app.include_router(common_router)
    app.include_router(video_router)
    app.include_router(image_router)

    @app.get("/health")
    async def health():
        return {"status": "ok"}

    return app
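
create_app registers GET /health, which returns {"status": "ok"}. A client-side readiness probe might look like the sketch below. wait_until_healthy, its parameters, and the injectable fetch hook are hypothetical helpers, not part of FastVideo; the base URL is an assumption that depends on the host and port passed to run_server.

```python
import json
import time
import urllib.request
from typing import Callable


def _http_get(url: str) -> bytes:
    """Fetch a URL with the standard library and return the raw body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read()


def wait_until_healthy(
    base_url: str,
    timeout_s: float = 60.0,
    interval_s: float = 1.0,
    fetch: Callable[[str], bytes] = _http_get,
) -> bool:
    """Poll <base_url>/health until it reports {"status": "ok"} or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if json.loads(fetch(f"{base_url}/health")).get("status") == "ok":
                return True
        except (OSError, ValueError, AttributeError):
            # Connection refused, HTTP error, or an unexpected body: keep polling.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


if __name__ == "__main__":
    # Assumed address; adjust to the host/port the server was started with.
    print(wait_until_healthy("http://localhost:8000", timeout_s=5.0))
```

The fetch parameter exists only so the polling logic can be exercised without a running server.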
fastvideo.entrypoints.openai.api_server.lifespan async
lifespan(app: FastAPI) -> AsyncIterator[None]

Load model on startup, clean up on shutdown

Source code in fastvideo/entrypoints/openai/api_server.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Load model on startup, clean up on shutdown"""
    args: FastVideoArgs = app.state.fastvideo_args
    output_dir: str = app.state.output_dir
    default_request: GenerationRequest | None = getattr(app.state, "default_request", None)

    logger.info("Loading model from %s ...", args.model_path)
    generator = VideoGenerator.from_fastvideo_args(args)
    logger.info("Model loaded successfully.")

    set_state(generator, args, output_dir, default_request=default_request)

    yield  # server is running

    logger.info("Shutting down — releasing model resources ...")
    generator.shutdown()
    clear_state()
    logger.info("Shutdown complete.")
fastvideo.entrypoints.openai.api_server.run_server
run_server(fastvideo_args: FastVideoArgs, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, output_dir: str = DEFAULT_OUTPUT_DIR, default_request: GenerationRequest | None = None)

Create the app and run it with uvicorn

Source code in fastvideo/entrypoints/openai/api_server.py
def run_server(
    fastvideo_args: FastVideoArgs,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    output_dir: str = DEFAULT_OUTPUT_DIR,
    default_request: GenerationRequest | None = None,
):
    """Create the app and run it with uvicorn"""
    if default_request is not None:
        _validate_default_request_against_preset(default_request, fastvideo_args.model_path)

    app = create_app(
        fastvideo_args,
        output_dir=output_dir,
        default_request=default_request,
    )

    logger.info("Starting FastVideo server on %s:%d", host, port)
    logger.info("Model: %s", fastvideo_args.model_path)

    uvicorn.run(
        app,
        host=host,
        port=port,
        log_level="info",
        timeout_keep_alive=300,
    )
fastvideo.entrypoints.openai.common_api
Classes
fastvideo.entrypoints.openai.common_api.ModelCard

Bases: BaseModel

OpenAI-compatible model card

Functions:
fastvideo.entrypoints.openai.common_api.available_models async
available_models()

Show available models

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/models", response_class=ORJSONResponse)
async def available_models():
    """Show available models"""
    args = get_server_args()
    card = ModelCard(id=args.model_path, root=args.model_path)
    return {"object": "list", "data": [card.model_dump()]}
fastvideo.entrypoints.openai.common_api.model_info async
model_info()

Get basic model information

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/model_info")
async def model_info():
    """Get basic model information"""
    args = get_server_args()
    return {"model_path": args.model_path}
fastvideo.entrypoints.openai.common_api.retrieve_model async
retrieve_model(model: str)

Retrieve a model by name

Source code in fastvideo/entrypoints/openai/common_api.py
@router.get("/models/{model:path}", response_class=ORJSONResponse)
async def retrieve_model(model: str):
    """Retrieve a model by name"""
    args = get_server_args()
    if model != args.model_path:
        return ORJSONResponse(
            status_code=404,
            content={
                "error": {
                    "message": f"The model '{model}' does not exist",
                    "type": "invalid_request_error",
                    "param": "model",
                    "code": "model_not_found",
                }
            },
        )
    card = ModelCard(id=model, root=model)
    return card.model_dump()
fastvideo.entrypoints.openai.image_api
Functions:
fastvideo.entrypoints.openai.protocol
Functions:
fastvideo.entrypoints.openai.protocol.generate_request_id
generate_request_id() -> str

Generate a unique request ID

Source code in fastvideo/entrypoints/openai/protocol.py
def generate_request_id() -> str:
    """Generate a unique request ID"""
    return uuid.uuid4().hex
fastvideo.entrypoints.openai.state

Global server state shared across API modules.

Keeping state in a dedicated module prevents the classic 'main vs package module' duplication that occurs when api_server.py is run with python -m. All modules that need the generator or server args should import from here.

Classes
Functions:
fastvideo.entrypoints.openai.state.clear_state
clear_state() -> None

Clear server state on shutdown.

Source code in fastvideo/entrypoints/openai/state.py
def clear_state() -> None:
    """Clear server state on shutdown."""
    global _generator, _fastvideo_args, _default_request
    _generator = None
    _fastvideo_args = None
    _default_request = None
fastvideo.entrypoints.openai.state.get_default_request
get_default_request() -> GenerationRequest | None

Return the ServeConfig.default_request set at startup, if any.

Source code in fastvideo/entrypoints/openai/state.py
def get_default_request() -> GenerationRequest | None:
    """Return the ServeConfig.default_request set at startup, if any."""
    return _default_request
fastvideo.entrypoints.openai.state.get_generator
get_generator() -> VideoGenerator

Return the global VideoGenerator instance (set during startup).

Source code in fastvideo/entrypoints/openai/state.py
def get_generator() -> VideoGenerator:
    """Return the global VideoGenerator instance (set during startup)."""
    assert _generator is not None, "Server not initialized — generator is None"
    return _generator
fastvideo.entrypoints.openai.state.get_output_dir
get_output_dir() -> str

Return the configured output directory.

Source code in fastvideo/entrypoints/openai/state.py
def get_output_dir() -> str:
    """Return the configured output directory."""
    return _output_dir
fastvideo.entrypoints.openai.state.get_server_args
get_server_args() -> FastVideoArgs

Return the global FastVideoArgs (set during startup).

Source code in fastvideo/entrypoints/openai/state.py
def get_server_args() -> FastVideoArgs:
    """Return the global FastVideoArgs (set during startup)."""
    assert _fastvideo_args is not None, "Server not initialized — args is None"
    return _fastvideo_args
fastvideo.entrypoints.openai.state.set_state
set_state(generator: VideoGenerator, fastvideo_args: FastVideoArgs, output_dir: str, default_request: GenerationRequest | None = None) -> None

Set all server state at once (called from lifespan).

Source code in fastvideo/entrypoints/openai/state.py
def set_state(
    generator: VideoGenerator,
    fastvideo_args: FastVideoArgs,
    output_dir: str,
    default_request: GenerationRequest | None = None,
) -> None:
    """Set all server state at once (called from lifespan)."""
    global _generator, _fastvideo_args, _output_dir, _default_request
    _generator = generator
    _fastvideo_args = fastvideo_args
    _output_dir = output_dir
    _default_request = default_request
fastvideo.entrypoints.openai.stores
Classes
fastvideo.entrypoints.openai.stores.AsyncDictStore
AsyncDictStore()

A small async-safe in-memory key-value store for dict items.

This encapsulates the usual pattern of a module-level dict guarded by an asyncio.Lock and provides simple CRUD methods that are safe to call concurrently from FastAPI request handlers and background tasks.

Source code in fastvideo/entrypoints/openai/stores.py
def __init__(self) -> None:
    self._items: dict[str, dict[str, Any]] = {}
    self._lock = asyncio.Lock()
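
The pattern described above, a dict guarded by an asyncio.Lock, can be sketched as follows. Only the constructor is taken from the listing; the method names upsert, get, update_fields, and pop are assumptions chosen for illustration and may not match the real class.

```python
import asyncio
from typing import Any, Dict, Optional


class AsyncDictStoreSketch:
    """Illustrative async-safe key-value store; method names are hypothetical."""

    def __init__(self) -> None:
        self._items: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()

    async def upsert(self, key: str, value: Dict[str, Any]) -> None:
        async with self._lock:
            self._items[key] = dict(value)  # store a copy, not the caller's dict

    async def get(self, key: str) -> Optional[Dict[str, Any]]:
        async with self._lock:
            item = self._items.get(key)
            return dict(item) if item is not None else None

    async def update_fields(self, key: str, **fields: Any) -> bool:
        """Merge fields into an existing item; return False if the key is unknown."""
        async with self._lock:
            if key not in self._items:
                return False
            self._items[key].update(fields)
            return True

    async def pop(self, key: str) -> Optional[Dict[str, Any]]:
        async with self._lock:
            return self._items.pop(key, None)


async def demo() -> Optional[Dict[str, Any]]:
    store = AsyncDictStoreSketch()
    await store.upsert("job-1", {"status": "queued"})
    # A request handler and a background task can update the same item safely.
    await asyncio.gather(
        store.update_fields("job-1", status="running"),
        store.update_fields("job-1", progress=0.5),
    )
    return await store.get("job-1")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Returning copies from get keeps callers from mutating stored items outside the lock; whether the real store does this is not stated in the source.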
fastvideo.entrypoints.openai.utils
Functions:
fastvideo.entrypoints.openai.utils.choose_image_ext
choose_image_ext(output_format: str | None, background: str | None) -> str

Pick a file extension for image outputs

Source code in fastvideo/entrypoints/openai/utils.py
def choose_image_ext(output_format: str | None, background: str | None) -> str:
    """Pick a file extension for image outputs"""
    fmt = (output_format or "").lower()
    if fmt in {"png", "webp", "jpeg", "jpg"}:
        return "jpg" if fmt == "jpeg" else fmt
    if (background or "auto").lower() == "transparent":
        return "png"
    return "jpg"
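
A few calls make the precedence rules concrete: an explicit, supported output_format wins, a transparent background falls back to png, and everything else becomes jpg. The function body is copied from the listing above so the snippet runs on its own; the annotations use Optional only for wider interpreter compatibility.

```python
from typing import Optional


def choose_image_ext(output_format: Optional[str], background: Optional[str]) -> str:
    """Pick a file extension for image outputs"""
    fmt = (output_format or "").lower()
    if fmt in {"png", "webp", "jpeg", "jpg"}:
        return "jpg" if fmt == "jpeg" else fmt
    if (background or "auto").lower() == "transparent":
        return "png"
    return "jpg"


print(choose_image_ext("JPEG", None))           # jpg  (case-insensitive, jpeg is normalized)
print(choose_image_ext("webp", "transparent"))  # webp (explicit format takes precedence)
print(choose_image_ext(None, "transparent"))    # png  (transparency needs an alpha channel)
print(choose_image_ext("gif", None))            # jpg  (unsupported formats fall through)
print(choose_image_ext(None, None))             # jpg  (default)
```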
fastvideo.entrypoints.openai.utils.merge_image_input_list
merge_image_input_list(*inputs: list | Any | None) -> list

Merge multiple image input sources into a single flat list

Source code in fastvideo/entrypoints/openai/utils.py
def merge_image_input_list(*inputs: list | Any | None) -> list:
    """Merge multiple image input sources into a single flat list"""
    result = []
    for input_item in inputs:
        if input_item is not None:
            if isinstance(input_item, list):
                result.extend(input_item)
            else:
                result.append(input_item)
    return result
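
The flattening is one level deep and only applies to list arguments, which the following calls illustrate. The function is copied from the listing above so the snippet is self-contained; the file names are placeholders.

```python
from typing import Any, List


def merge_image_input_list(*inputs: Any) -> List[Any]:
    """Merge multiple image input sources into a single flat list"""
    result = []
    for input_item in inputs:
        if input_item is not None:
            if isinstance(input_item, list):
                result.extend(input_item)
            else:
                result.append(input_item)
    return result


print(merge_image_input_list(["a.png", "b.png"], None, "c.png"))  # ['a.png', 'b.png', 'c.png']
print(merge_image_input_list(None, None))                         # []
print(merge_image_input_list(("a.png", "b.png")))                 # [('a.png', 'b.png')]  tuples are not flattened
```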
fastvideo.entrypoints.openai.utils.parse_size
parse_size(size: str) -> tuple[int, int] | tuple[None, None]

Parse a 'WIDTHxHEIGHT' string into (width, height)

Source code in fastvideo/entrypoints/openai/utils.py
def parse_size(size: str) -> tuple[int, int] | tuple[None, None]:
    """Parse a 'WIDTHxHEIGHT' string into (width, height)"""
    try:
        parts = size.lower().replace(" ", "").split("x")
        if len(parts) != 2:
            raise ValueError
        w, h = int(parts[0]), int(parts[1])
        return w, h
    except Exception:
        return None, None
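
Because every failure is reported as (None, None) rather than an exception, callers need to check the result before using it. The calls below show which inputs parse and which do not. The function body is copied from the listing above so the snippet runs standalone, with typing-module annotations substituted for compatibility.

```python
from typing import Optional, Tuple


def parse_size(size: str) -> Tuple[Optional[int], Optional[int]]:
    """Parse a 'WIDTHxHEIGHT' string into (width, height)"""
    try:
        parts = size.lower().replace(" ", "").split("x")
        if len(parts) != 2:
            raise ValueError
        w, h = int(parts[0]), int(parts[1])
        return w, h
    except Exception:
        return None, None


print(parse_size("1024x576"))      # (1024, 576)
print(parse_size(" 1280 X 720 "))  # (1280, 720)  spaces and case are ignored
print(parse_size("1024"))          # (None, None) missing height
print(parse_size("1024x"))         # (None, None) empty height
print(parse_size("1x2x3"))         # (None, None) too many parts

width, height = parse_size("wide")
if width is None:
    print("invalid size, falling back to a caller-chosen default")
```

Note that the function does not range-check the values, so a string such as "0x0" still parses; any such validation is left to the caller.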
fastvideo.entrypoints.openai.utils.save_image_to_path async
save_image_to_path(image: UploadFile | str, target_path: str) -> str

Save an uploaded file or download from URL to target_path

Source code in fastvideo/entrypoints/openai/utils.py
async def save_image_to_path(image: UploadFile | str, target_path: str) -> str:
    """Save an uploaded file or download from URL to *target_path*"""
    input_path = await _maybe_url_image(image, target_path)
    if input_path is None:
        input_path = await _save_upload_to_path(image, target_path)
    return input_path
fastvideo.entrypoints.openai.video_api
Functions:

fastvideo.entrypoints.streaming

Classes

fastvideo.entrypoints.streaming.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A ContinuationState payload can reference large tensors stored in a BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.
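
The abstract methods of this class (put, get, drop) define a small contract: put returns an id, get raises KeyError for unknown ids, and drop silently ignores them. A minimal in-memory sketch of that contract follows. BlobStoreSketch and DictBlobStore are hypothetical stand-ins written for this example, and using uuid4 hex strings as blob ids is an assumption; the library's own InMemoryBlobStore may differ.

```python
import threading
import uuid
from abc import ABC, abstractmethod
from typing import Dict, Tuple


class BlobStoreSketch(ABC):
    """Local stand-in with the same three abstract methods as BlobStore."""

    @abstractmethod
    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str: ...

    @abstractmethod
    def get(self, blob_id: str) -> bytes: ...

    @abstractmethod
    def drop(self, blob_id: str) -> None: ...


class DictBlobStore(BlobStoreSketch):
    """Hypothetical dict-backed implementation guarded by a threading.Lock."""

    def __init__(self) -> None:
        self._blobs: Dict[str, Tuple[bytes, str]] = {}
        self._lock = threading.Lock()

    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
        blob_id = uuid.uuid4().hex  # assumption: ids are random hex strings
        with self._lock:
            self._blobs[blob_id] = (bytes(data), mime)
        return blob_id

    def get(self, blob_id: str) -> bytes:
        with self._lock:
            return self._blobs[blob_id][0]  # raises KeyError if absent

    def drop(self, blob_id: str) -> None:
        with self._lock:
            self._blobs.pop(blob_id, None)  # missing ids are a no-op


store = DictBlobStore()
blob_id = store.put(b"\x00\x01\x02")
# A JSON payload can carry the short id instead of the raw bytes.
payload = {"latents_blob": blob_id}
assert store.get(payload["latents_blob"]) == b"\x00\x01\x02"
store.drop(blob_id)
store.drop(blob_id)  # second drop does nothing
```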

Methods:
fastvideo.entrypoints.streaming.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
fastvideo.entrypoints.streaming.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.
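
The ordering requirement can be checked on the receiving side before bytes are handed to a player. The sketch below uses a local dataclass with the same four fields as FragmentedMP4Chunk; ChunkSketch and assemble_segment are hypothetical names, and rejecting a media chunk that arrives before any init chunk is this example's policy rather than documented library behavior.

```python
from dataclasses import dataclass
from typing import Iterable, Literal


@dataclass
class ChunkSketch:
    """Local mirror of the documented fields, for illustration only."""

    kind: Literal["init", "media"]
    data: bytes
    stream_id: str
    segment_idx: int


def assemble_segment(chunks: Iterable[ChunkSketch]) -> bytes:
    """Concatenate chunk payloads, insisting that an init chunk comes first."""
    out = bytearray()
    seen_init = False
    for chunk in chunks:
        if chunk.kind == "init":
            seen_init = True
        elif not seen_init:
            raise ValueError("media chunk arrived before the init chunk")
        out += chunk.data
    return bytes(out)


chunks = [
    ChunkSketch("init", b"INIT", "stream-a", 0),
    ChunkSketch("media", b"-frag1", "stream-a", 0),
    ChunkSketch("media", b"-frag2", "stream-a", 0),
]
print(assemble_segment(chunks))  # b'INIT-frag1-frag2'
```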

fastvideo.entrypoints.streaming.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder per ltx2_segment_start boundary so each segment becomes one media fragment the client can append independently.

Example:

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
Methods:
fastvideo.entrypoints.streaming.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task
fastvideo.entrypoints.streaming.GpuPool

Bases: ABC

Abstract GPU pool.

acquire binds a session to a worker and holds that binding across segments so continuation state can stay hot. run submits a single GenerationRequest for a bound session.

Acquire / release are independent of run — a session can run many segments on one acquired worker, and must release on disconnect.
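
The acquire / run / release lifecycle can be sketched with a toy pool. Only the three operation names and the existence of a timeout error come from this page; the signatures, the timeout_s parameter, the worker ids, and the classes ToyPool and PoolAcquireTimeoutSketch are assumptions made for illustration.

```python
import asyncio
from typing import Dict, List


class PoolAcquireTimeoutSketch(RuntimeError):
    """Stand-in for the documented PoolAcquireTimeout."""


class ToyPool:
    """Hypothetical pool: one session is bound to one worker until released."""

    def __init__(self, worker_ids: List[str]) -> None:
        self._free = list(worker_ids)
        self._bound: Dict[str, str] = {}
        self._cond = asyncio.Condition()

    async def acquire(self, session_id: str, timeout_s: float = 1.0) -> str:
        async def _wait_for_worker() -> str:
            async with self._cond:
                if session_id in self._bound:  # binding is kept across segments
                    return self._bound[session_id]
                await self._cond.wait_for(lambda: bool(self._free))
                worker = self._free.pop()
                self._bound[session_id] = worker
                return worker

        try:
            return await asyncio.wait_for(_wait_for_worker(), timeout_s)
        except asyncio.TimeoutError:
            raise PoolAcquireTimeoutSketch(f"no free worker for {session_id}") from None

    async def run(self, session_id: str, request: str) -> str:
        worker = self._bound[session_id]  # KeyError if the session never acquired
        await asyncio.sleep(0)  # placeholder for real generation work
        return f"{worker} handled {request}"

    async def release(self, session_id: str) -> None:
        async with self._cond:
            worker = self._bound.pop(session_id, None)
            if worker is not None:
                self._free.append(worker)
                self._cond.notify()


async def demo() -> List[str]:
    pool = ToyPool(["worker-0"])
    await pool.acquire("session-1")
    try:
        # Many segments run on the same acquired worker.
        return [await pool.run("session-1", f"segment-{i}") for i in range(2)]
    finally:
        await pool.release("session-1")  # always release on disconnect


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Wrapping the segment loop in try/finally is one way to honor the "must release on disconnect" rule stated above.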

fastvideo.entrypoints.streaming.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory BlobStore for single-process servers.

No eviction policy — callers are responsible for calling drop when a blob's owning state is replaced or a session ends. A redis- or filesystem-backed BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.InProcessGpuPool
InProcessGpuPool(generator: _GeneratorLike, *, gpu_id: int = 0, session_store: SessionStore | None = None)

Bases: GpuPool

Single-process pool backed by one _GeneratorLike.

This is what PR 7.5's server uses by default; PR 7.6 adds the real SubprocessGpuPool alternative but keeps this one for tests and small deployments.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator: _GeneratorLike,
    *,
    gpu_id: int = 0,
    session_store: SessionStore | None = None,
) -> None:
    self._generator = generator
    self._gpu_id = gpu_id
    self._worker_id = f"inproc-{uuid.uuid4().hex[:6]}"
    self._session_store = session_store or InMemorySessionStore()
    self._active: dict[str, PoolAssignment] = {}
    self._lock = asyncio.Lock()
    self._gen_lock = asyncio.Lock()
fastvideo.entrypoints.streaming.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.

fastvideo.entrypoints.streaming.MockGenerator dataclass
MockGenerator(sleep_ms: float = 0.0)

Generator stand-in that returns synthetic gradient frames.

Each call produces one segment worth of frames whose pixels vary by a constant derived from the request seed and segment index. Latency is configurable via sleep_ms so the caller can exercise slow-generate scenarios without spinning a GPU.

fastvideo.entrypoints.streaming.PoolAcquireTimeout

Bases: RuntimeError

Raised when acquire times out waiting for a free worker.

fastvideo.entrypoints.streaming.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list, with structured fallback and hot-reloadable system prompts.

Usage:

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
Methods:
fastvideo.entrypoints.streaming.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; any negative priority, including the default -1, appends it as a fallback.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
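
The effect of priority on ordering is easiest to see on a plain list. This sketch applies the same branch logic as the listing above to strings instead of provider objects; the register helper and the provider names are placeholders.

```python
from typing import List


def register(providers: List[str], provider: str, *, priority: int = -1) -> None:
    """Same branching as register_provider, applied to a plain list."""
    if priority < 0:
        providers.append(provider)
    else:
        providers.insert(priority, provider)


providers = ["cerebras", "groq"]
register(providers, "fallback")              # default -1: appended at the end
register(providers, "primary", priority=0)   # becomes the first provider tried
register(providers, "second", priority=1)    # inserted right after the primary
register(providers, "far", priority=99)      # list.insert clamps, so this lands last
print(providers)
# ['primary', 'second', 'cerebras', 'groq', 'fallback', 'far']
```

An index beyond the end of the list is not an error, because list.insert clamps it to the list length.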
fastvideo.entrypoints.streaming.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")
fastvideo.entrypoints.streaming.PromptSafetyFilter
PromptSafetyFilter(*, classifier_path: str | None, enabled: bool = True, block_threshold: float = 0.5)

Minimal fastText-backed prompt safety filter.

Loads the classifier lazily on first use so the streaming server can construct the filter eagerly at startup without paying the model-load cost when safety is disabled.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def __init__(
    self,
    *,
    classifier_path: str | None,
    enabled: bool = True,
    block_threshold: float = 0.5,
) -> None:
    self._classifier_path = classifier_path
    self._enabled = enabled
    self._block_threshold = block_threshold
    self._model: Any | None = None
    self._load_attempted = False
    self._load_lock = threading.Lock()
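
The constructor fields (_model, _load_attempted, _load_lock) suggest a load-once pattern guarded by a lock. One way such lazy loading could work is sketched below. LazyModel and its loader callback are hypothetical, and treating a failed load as "no model" without retrying is an assumption, not documented behavior of PromptSafetyFilter.

```python
import threading
from typing import Any, Callable, Optional


class LazyModel:
    """Load an expensive resource on first use, at most once, even across threads."""

    def __init__(self, loader: Callable[[], Any], enabled: bool = True) -> None:
        self._loader = loader
        self._enabled = enabled
        self._model: Optional[Any] = None
        self._load_attempted = False
        self._load_lock = threading.Lock()

    def get(self) -> Optional[Any]:
        if not self._enabled:
            return None  # disabled: never pay the load cost
        if self._load_attempted:
            return self._model  # fast path once a load has been tried
        with self._load_lock:
            # Re-check under the lock: another thread may have loaded meanwhile.
            if not self._load_attempted:
                try:
                    self._model = self._loader()
                except Exception:
                    self._model = None  # assumption: failures are not retried
                finally:
                    self._load_attempted = True
            return self._model


load_count = 0


def fake_loader() -> str:
    global load_count
    load_count += 1
    return "classifier"


lazy = LazyModel(fake_loader)
print(load_count)              # 0  (constructing the wrapper loads nothing)
print(lazy.get(), load_count)  # classifier 1
print(lazy.get(), load_count)  # classifier 1  (cached)
```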
fastvideo.entrypoints.streaming.SafetyDecision

Bases: Enum

Attributes
fastvideo.entrypoints.streaming.SafetyDecision.UNAVAILABLE class-attribute instance-attribute
UNAVAILABLE = 'unavailable'

Returned when the classifier can't run (not configured, fastText missing). Safety is opt-in; the server treats UNAVAILABLE as ALLOW but logs it so operators know the filter is off.

fastvideo.entrypoints.streaming.Session dataclass
Session(id: str = (lambda: hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())
Methods:
fastvideo.entrypoints.streaming.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special-casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
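
A self-contained sketch of the same table-driven check is shown below. The `State` enum, the `VALID` edge table, `InvalidTransition`, and `MiniSession` are hypothetical stand-ins; the real `_VALID_TRANSITIONS` table is not shown in this reference, so the edges here are assumptions chosen only to make the example run.

```python
import enum
import time
from dataclasses import dataclass, field


class State(enum.Enum):
    INITIALIZING = "initializing"
    ACTIVE = "active"
    COMPLETE = "complete"
    ERROR = "error"


# Assumed edge table for illustration; terminal states have no outgoing edges.
VALID = {
    State.INITIALIZING: frozenset({State.ACTIVE, State.ERROR}),
    State.ACTIVE: frozenset({State.COMPLETE, State.ERROR}),
}


class InvalidTransition(RuntimeError):
    """Raised when the requested edge is not in the table."""


@dataclass
class MiniSession:
    state: State = State.INITIALIZING
    last_activity: float = field(default_factory=time.monotonic)

    def transition(self, target: State) -> None:
        allowed = VALID.get(self.state, frozenset())
        # Same shape as the check above: listed edges and self-loops pass.
        if target not in allowed and target is not self.state:
            raise InvalidTransition(f"{self.state.value} -> {target.value}")
        self.state = target
        self.last_activity = time.monotonic()
```

Note that a check of this shape accepts a self-loop from any state, not only from ACTIVE, because the `target is not self.state` clause does not look at which state is current.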
fastvideo.entrypoints.streaming.SessionLogEvent dataclass
SessionLogEvent(session_id: str, event: str, payload: dict[str, Any] = dict(), ts: float = time())

One line in the session JSONL file.

fastvideo.entrypoints.streaming.SessionLogger
SessionLogger(log_dir: str | None)

Append-only JSONL logger keyed by session id.

Thread-safe: the server may write to the same session from several places at once, such as the fMP4 encoder thread and the control-frame handler.

Source code in fastvideo/entrypoints/streaming/session_logger.py
def __init__(self, log_dir: str | None) -> None:
    self._log_dir = log_dir
    self._files: dict[str, TextIO] = {}
    self._locks: dict[str, threading.Lock] = {}
    self._registry_lock = threading.Lock()
    self._ensure_dir()
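
The locking layout in this constructor (one registry lock guarding a dictionary of per-session locks) can be sketched as below. `MiniJsonlLogger` and its `log` method are hypothetical, and the one-file-per-session naming is an assumption. Unlike the real class, which keeps file handles in `_files`, this simplified version reopens the file on every write.

```python
import json
import os
import threading
import time
from typing import Any, Optional


class MiniJsonlLogger:
    """Hypothetical append-only JSONL logger with one lock per session."""

    def __init__(self, log_dir: str) -> None:
        self._log_dir = log_dir
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()
        os.makedirs(log_dir, exist_ok=True)

    def _lock_for(self, session_id: str) -> threading.Lock:
        # The registry lock only protects the dict of locks, so writers
        # for different sessions do not block each other while writing.
        with self._registry_lock:
            return self._locks.setdefault(session_id, threading.Lock())

    def log(self, session_id: str, event: str, payload: Optional[dict[str, Any]] = None) -> None:
        record = {
            "session_id": session_id,
            "event": event,
            "payload": payload or {},
            "ts": time.time(),
        }
        path = os.path.join(self._log_dir, f"{session_id}.jsonl")
        # Hold the session lock for the whole append so lines never interleave.
        with self._lock_for(session_id), open(path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record) + "\n")
```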
fastvideo.entrypoints.streaming.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}
Methods:
fastvideo.entrypoints.streaming.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
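
The TODO above mentions a background driver that does not exist yet. One possible shape for such a driver is sketched here, purely as an assumption: `reaper_loop`, `on_timeout`, `interval_seconds`, and `stop` are hypothetical names, and the only thing taken from the source is that the manager exposes `reap_timed_out()` returning a list of session ids.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def reaper_loop(
    manager: Any,
    on_timeout: Callable[[str], Awaitable[None]],
    *,
    interval_seconds: float,
    stop: asyncio.Event,
) -> None:
    """Periodically ask the manager for idle sessions and hand each to a callback."""
    while not stop.is_set():
        # reap_timed_out only identifies sessions; closing them is the
        # callback's job (for example, sending a timeout frame first).
        for session_id in manager.reap_timed_out():
            await on_timeout(session_id)
        try:
            # Sleep until the next sweep, but wake early if asked to stop.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass
```

Waiting on the stop event instead of calling `asyncio.sleep` lets shutdown interrupt the loop without waiting out a full interval.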
fastvideo.entrypoints.streaming.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls store after each segment and snapshot when a client explicitly asks for an exportable state handle.
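
A minimal dict-backed implementation of this contract might look like the sketch below. `DictSessionStore` is a hypothetical name, `Any` stands in for ContinuationState, and the class deliberately does not subclass the real ABC so that the example runs without importing fastvideo. It follows the four method contracts documented in this section.

```python
import threading
import uuid
from typing import Any, Optional


class DictSessionStore:
    """Hypothetical in-memory store; not the library's own implementation."""

    def __init__(self) -> None:
        self._states: dict[str, Any] = {}
        self._lock = threading.Lock()

    def store(self, session_id: str, state: Any) -> None:
        # Replaces any prior value for this session.
        with self._lock:
            self._states[session_id] = state

    def snapshot(self, session_id: str) -> Optional[Any]:
        # Returns None for unknown ids.
        with self._lock:
            return self._states.get(session_id)

    def hydrate(self, state: Any, *, session_id: Optional[str] = None) -> str:
        # Allocate a fresh UUID4 when no id is given; otherwise overwrite in place.
        sid = session_id if session_id is not None else str(uuid.uuid4())
        with self._lock:
            self._states[sid] = state
        return sid

    def drop(self, session_id: str) -> None:
        # Missing ids are a no-op.
        with self._lock:
            self._states.pop(session_id, None)
```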

Methods:
fastvideo.entrypoints.streaming.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""
fastvideo.entrypoints.streaming.SubprocessGpuPool
SubprocessGpuPool(generator_config: GeneratorConfig, *, pool_config: GpuPoolConfig, warmup_config: WarmupConfig | None = None, session_store: SessionStore | None = None, worker_factory: WorkerFactory | None = None)

Bases: GpuPool

One multiprocessing.Process per GPU.

Each worker boots fastvideo.VideoGenerator from a typed GeneratorConfig inside the child process (post-CUDA_VISIBLE_DEVICES setup) and consumes jobs from a multiprocessing Queue.

This is the production shape: the parent process stays CPU-only, and GPU state never crosses process boundaries. Continuation state is serialized through SessionStore for cross-GPU handoff.

PR 7.6 ships this as an opt-in; PR 7.5's in-process pool remains the default until nightly runs validate the subprocess path.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator_config: GeneratorConfig,
    *,
    pool_config: GpuPoolConfig,
    warmup_config: WarmupConfig | None = None,
    session_store: SessionStore | None = None,
    worker_factory: WorkerFactory | None = None,
) -> None:
    self._generator_config = generator_config
    self._pool_config = pool_config
    self._warmup_config = warmup_config or WarmupConfig()
    self._session_store = session_store or InMemorySessionStore()
    self._worker_factory = worker_factory or _default_worker_factory
    self._workers: list[_WorkerHandle] = []
    self._available: asyncio.Queue[int] = asyncio.Queue()
    self._assignments: dict[str, PoolAssignment] = {}
    self._worker_by_id: dict[str, _WorkerHandle] = {}
    self._pending: dict[str, _PendingJob] = {}
    self._lock = asyncio.Lock()
    self._result_reader_tasks: list[asyncio.Task] = []
Methods:
fastvideo.entrypoints.streaming.SubprocessGpuPool.start async
start() -> None

Spawn worker processes and wait for each to report ready.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
async def start(self) -> None:
    """Spawn worker processes and wait for each to report ready."""
    num_workers = self._pool_config.num_workers or 1
    for gpu_id in range(num_workers):
        handle = self._worker_factory(
            gpu_id=gpu_id,
            generator_config=self._generator_config,
            warmup_config=self._warmup_config,
        )
        self._workers.append(handle)
        self._worker_by_id[handle.worker_id] = handle

    # Wait for each worker's ready event in a thread to avoid
    # blocking the event loop.
    loop = asyncio.get_running_loop()
    await asyncio.gather(*[
        loop.run_in_executor(None, handle.ready.wait, self._warmup_config.timeout_seconds)
        for handle in self._workers
    ])

    # Start background result readers — one task per worker
    # drains its result queue and resolves futures in _pending.
    for handle in self._workers:
        task = asyncio.create_task(self._drain_results(handle))
        self._result_reader_tasks.append(task)

    # Only admit workers that successfully booted. Anything that
    # failed boot (timeout, crash, error sentinel) stays out of the
    # available queue so we never assign a session to it.
    for idx, handle in enumerate(self._workers):
        if handle.boot_ok.is_set():
            await self._available.put(idx)
        else:
            logger.error(
                "pool: worker %s failed to boot; skipping",
                handle.worker_id,
            )
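
The "wait in a thread" step in start can be isolated into a small runnable sketch. `wait_all_ready` is a hypothetical helper, and `threading.Event` stands in for whatever event type the worker handles actually carry; the technique shown is the same `loop.run_in_executor` call around a blocking `wait(timeout)`.

```python
import asyncio
import threading


async def wait_all_ready(events: list[threading.Event], timeout_seconds: float) -> list[bool]:
    """Wait on blocking events concurrently without stalling the event loop."""
    loop = asyncio.get_running_loop()
    # Event.wait blocks, so each call runs in the default thread pool.
    # It returns True if the event was set and False if the timeout elapsed.
    return await asyncio.gather(*[
        loop.run_in_executor(None, event.wait, timeout_seconds)
        for event in events
    ])
```

Because `wait` reports a timeout through its return value instead of raising, a caller still needs a separate success signal per worker, which is the role `boot_ok` plays in the listing above.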

Functions:

fastvideo.entrypoints.streaming.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto | None = None, *, pool: GpuPool | None = None, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Exactly one of generator (backed by InProcessGpuPool) or pool (for the subprocess-backed production shape) must be given.

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto | None = None,
    *,
    pool: GpuPool | None = None,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Exactly one of ``generator`` (backed by :class:`InProcessGpuPool`)
    or ``pool`` (for the subprocess-backed production shape) must be
    given.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")
    streaming = serve_config.streaming
    if (generator is None) == (pool is None):
        raise ValueError("build_app requires exactly one of `generator` or `pool`")

    store = session_store or InMemorySessionStore()
    if pool is None:
        assert generator is not None
        pool = InProcessGpuPool(generator, session_store=store)

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        pool=pool,
        sessions=sessions,
        session_store=store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": streaming.stream_mode,
        })

    app.include_router(build_health_router(pool))

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            with contextlib.suppress(Exception):
                await state.pool.release(session.id)
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.build_health_router
build_health_router(pool: PoolRef = None) -> APIRouter

Build a router exposing streaming liveness/readiness endpoints.

pool may be either a concrete pool or a zero-argument callable that returns the current pool. The callable form lets product servers keep their own lifespan-managed runtime singleton without adding public global state.

Source code in fastvideo/entrypoints/streaming/health.py
def build_health_router(pool: PoolRef = None) -> APIRouter:
    """Build a router exposing streaming liveness/readiness endpoints.

    ``pool`` may be either a concrete pool or a zero-argument callable that
    returns the current pool. The callable form lets product servers keep their
    own lifespan-managed runtime singleton without adding public global state.
    """
    router = APIRouter()

    @router.get("/health")
    @router.get("/healthz")
    async def get_healthz() -> dict[str, Any]:
        """Liveness probe for process-level health."""
        return {
            "status": "ok",
            "service": SERVICE_NAME,
            "ts": _utc_now_iso(),
        }

    @router.get("/readyz")
    async def get_readyz() -> dict[str, Any]:
        """Readiness probe for router/load-balancer health checks."""
        status_payload = await get_pool_status(pool)
        ready_workers = _ready_worker_count(status_payload)
        return {
            "status": "ready" if ready_workers > 0 else "warming",
            "service": SERVICE_NAME,
            "ready_gpu_workers": ready_workers,
            "total_gpus": _as_int(status_payload.get("total_gpus")),
            "available_gpus": _as_int(status_payload.get("available_gpus")),
            "warmup_successful_gpus": _as_int(status_payload.get("warmup_successful_gpus")),
            "warmup_failed_gpus": _as_int(status_payload.get("warmup_failed_gpus")),
            "queue_size": _as_int(status_payload.get("queue_size")),
            "ts": _utc_now_iso(),
        }

    @router.get("/status")
    async def get_status() -> dict[str, Any]:
        """Get the current status of the GPU pool."""
        return await get_pool_status(pool)

    return router
fastvideo.entrypoints.streaming.build_mock_app
build_mock_app(*, sleep_ms: float = 0.0)

Build a FastAPI app backed by MockGenerator.

Source code in fastvideo/entrypoints/streaming/mock_server.py
def build_mock_app(*, sleep_ms: float = 0.0):
    """Build a FastAPI app backed by :class:`MockGenerator`."""
    serve_config = ServeConfig(
        generator=GeneratorConfig(model_path="/models/mock"),
        streaming=StreamingConfig(
            session_timeout_seconds=120,
            generation_segment_cap=6,
        ),
    )
    serve_config.default_request.sampling = SamplingConfig(
        num_frames=24,
        height=256,
        width=256,
        fps=24,
        num_inference_steps=1,
    )
    return build_app(serve_config, MockGenerator(sleep_ms=sleep_ms))
fastvideo.entrypoints.streaming.get_pool_status async
get_pool_status(pool: PoolRef = None) -> dict[str, Any]

Return the generic GPU pool status payload used by /status.

Source code in fastvideo/entrypoints/streaming/health.py
async def get_pool_status(pool: PoolRef = None) -> dict[str, Any]:
    """Return the generic GPU pool status payload used by ``/status``."""
    resolved = _resolve_pool(pool)
    if resolved is None:
        return _zero_pool_status()
    if _has_get_status(resolved):
        return dict(resolved.get_status())
    if _has_health(resolved):
        return _status_from_health(resolved.health())
    return _zero_pool_status()
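
The resolution and fallback order in get_pool_status relies on private helpers that are not listed here. The sketch below guesses at one way they could behave: `resolve`, `zero_status`, and `pool_status` are hypothetical names, the keys in the zero payload are taken from the `/readyz` handler, and the rule for telling a provider callable from a pool object is an assumption. The `health()` branch is omitted for brevity.

```python
from typing import Any


def zero_status() -> dict[str, Any]:
    # Assumed shape: a few of the keys the readiness handler reads.
    return {"total_gpus": 0, "available_gpus": 0, "queue_size": 0}


def resolve(pool: Any) -> Any:
    # Assumption: a callable without get_status is a zero-argument provider.
    if pool is not None and callable(pool) and not hasattr(pool, "get_status"):
        return pool()
    return pool


def pool_status(pool: Any = None) -> dict[str, Any]:
    resolved = resolve(pool)
    if resolved is None:
        return zero_status()
    get_status = getattr(resolved, "get_status", None)
    if callable(get_status):
        # Copy so callers cannot mutate the pool's own dict.
        return dict(get_status())
    return zero_status()
```

Passing a callable defers the lookup until each request, so a server can swap or lazily create its pool during lifespan startup without rebuilding the router.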
fastvideo.entrypoints.streaming.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )

Modules

fastvideo.entrypoints.streaming.gpu_pool

GPU pool manager for the streaming server.

Replaces the single-generator path in PR 7.5 with a typed pool abstraction. Three classes ship here: two implementations and the abstract interface they share.

  • InProcessGpuPool: one in-process VideoGenerator; used by tests and single-GPU dev deployments.
  • SubprocessGpuPool: one multiprocessing.Process per GPU, each running worker_main against a GeneratorConfig. Jobs are dispatched via multiprocessing.Queue.
  • GpuPool (abstract): the interface both implement.

Session-to-GPU binding lives in the pool so continuation state stays on the GPU that generated the previous segment (matching the internal gpu_pool.py's per-GPU cache behavior). Cross-GPU handoff is supported via SessionStore snapshot + hydrate, which serializes the state before the migration and rehydrates it on the new worker.

Typed config: workers start from a GeneratorConfig (no flat LTX-2 kwargs), satisfying the PR 6 + PR 7 contracts that the public surface doesn't reintroduce the legacy kwarg bag.

Classes
fastvideo.entrypoints.streaming.gpu_pool.GpuPool

Bases: ABC

Abstract GPU pool.

acquire binds a session to a worker and holds that binding across segments so continuation state can stay hot. run submits a single GenerationRequest for a bound session.

Acquire / release are independent of run — a session can run many segments on one acquired worker, and must release on disconnect.
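
The acquire / run / release lifecycle can be illustrated with a toy pool. `ToyPool` is hypothetical and far simpler than either shipped implementation: workers are plain integers, `run` just formats a string, and the method signatures are assumptions, since the abstract class's exact signatures are not listed here.

```python
import asyncio


class ToyPool:
    """Hypothetical pool showing sticky session-to-worker binding."""

    def __init__(self, num_workers: int) -> None:
        self._available: asyncio.Queue[int] = asyncio.Queue()
        for idx in range(num_workers):
            self._available.put_nowait(idx)
        self._assignments: dict[str, int] = {}

    async def acquire(self, session_id: str, timeout: float = 1.0) -> int:
        # A session that is already bound keeps its worker.
        if session_id in self._assignments:
            return self._assignments[session_id]
        worker = await asyncio.wait_for(self._available.get(), timeout)
        self._assignments[session_id] = worker
        return worker

    async def run(self, session_id: str, request: str) -> str:
        # run never picks a worker; it uses the binding made by acquire.
        worker = self._assignments[session_id]
        return f"worker{worker}:{request}"

    async def release(self, session_id: str) -> None:
        # Releasing an unknown session is a no-op.
        worker = self._assignments.pop(session_id, None)
        if worker is not None:
            self._available.put_nowait(worker)
```

A session acquires once, runs any number of segments on the same worker, and only a release returns that worker to the free queue for the next session.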

fastvideo.entrypoints.streaming.gpu_pool.InProcessGpuPool
InProcessGpuPool(generator: _GeneratorLike, *, gpu_id: int = 0, session_store: SessionStore | None = None)

Bases: GpuPool

Single-process pool backed by one _GeneratorLike.

This is what PR 7.5's server uses by default; PR 7.6 adds the real SubprocessGpuPool alternative but keeps this one for tests and small deployments.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator: _GeneratorLike,
    *,
    gpu_id: int = 0,
    session_store: SessionStore | None = None,
) -> None:
    self._generator = generator
    self._gpu_id = gpu_id
    self._worker_id = f"inproc-{uuid.uuid4().hex[:6]}"
    self._session_store = session_store or InMemorySessionStore()
    self._active: dict[str, PoolAssignment] = {}
    self._lock = asyncio.Lock()
    self._gen_lock = asyncio.Lock()
fastvideo.entrypoints.streaming.gpu_pool.PoolAcquireTimeout

Bases: RuntimeError

Raised when acquire times out waiting for a free worker.

fastvideo.entrypoints.streaming.gpu_pool.PoolAssignment dataclass
PoolAssignment(gpu_id: int, worker_id: str, pinned_at: float = monotonic())

The worker a session is currently bound to.

fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool
SubprocessGpuPool(generator_config: GeneratorConfig, *, pool_config: GpuPoolConfig, warmup_config: WarmupConfig | None = None, session_store: SessionStore | None = None, worker_factory: WorkerFactory | None = None)

Bases: GpuPool

One multiprocessing.Process per GPU.

Each worker boots fastvideo.VideoGenerator from a typed GeneratorConfig inside the child process (post-CUDA_VISIBLE_DEVICES setup) and consumes jobs from a multiprocessing Queue.

This is the production shape: the parent process stays CPU-only, and GPU state never crosses process boundaries. Continuation state is serialized through SessionStore for cross-GPU handoff.

PR 7.6 ships this as an opt-in; PR 7.5's in-process pool remains the default until nightly runs validate the subprocess path.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator_config: GeneratorConfig,
    *,
    pool_config: GpuPoolConfig,
    warmup_config: WarmupConfig | None = None,
    session_store: SessionStore | None = None,
    worker_factory: WorkerFactory | None = None,
) -> None:
    self._generator_config = generator_config
    self._pool_config = pool_config
    self._warmup_config = warmup_config or WarmupConfig()
    self._session_store = session_store or InMemorySessionStore()
    self._worker_factory = worker_factory or _default_worker_factory
    self._workers: list[_WorkerHandle] = []
    self._available: asyncio.Queue[int] = asyncio.Queue()
    self._assignments: dict[str, PoolAssignment] = {}
    self._worker_by_id: dict[str, _WorkerHandle] = {}
    self._pending: dict[str, _PendingJob] = {}
    self._lock = asyncio.Lock()
    self._result_reader_tasks: list[asyncio.Task] = []
Methods:
fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool.start async
start() -> None

Spawn worker processes and wait for each to report ready.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
async def start(self) -> None:
    """Spawn worker processes and wait for each to report ready."""
    num_workers = self._pool_config.num_workers or 1
    for gpu_id in range(num_workers):
        handle = self._worker_factory(
            gpu_id=gpu_id,
            generator_config=self._generator_config,
            warmup_config=self._warmup_config,
        )
        self._workers.append(handle)
        self._worker_by_id[handle.worker_id] = handle

    # Wait for each worker's ready event in a thread to avoid
    # blocking the event loop.
    loop = asyncio.get_running_loop()
    await asyncio.gather(*[
        loop.run_in_executor(None, handle.ready.wait, self._warmup_config.timeout_seconds)
        for handle in self._workers
    ])

    # Start background result readers — one task per worker
    # drains its result queue and resolves futures in _pending.
    for handle in self._workers:
        task = asyncio.create_task(self._drain_results(handle))
        self._result_reader_tasks.append(task)

    # Only admit workers that successfully booted. Anything that
    # failed boot (timeout, crash, error sentinel) stays out of the
    # available queue so we never assign a session to it.
    for idx, handle in enumerate(self._workers):
        if handle.boot_ok.is_set():
            await self._available.put(idx)
        else:
            logger.error(
                "pool: worker %s failed to boot; skipping",
                handle.worker_id,
            )
Functions:
fastvideo.entrypoints.streaming.gpu_pool.worker_main
worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child spawned by SubprocessGpuPool. Blocking VideoGenerator construction + generation happens here, not in the parent's event loop.

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
    import os

    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            break
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })
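
The job loop in worker_main (poll with a timeout, stop on a `None` sentinel, report results and errors as tagged dicts) can be exercised without GPUs or child processes. In this sketch `worker_loop` and `handler` are hypothetical, and a thread with `queue.Queue` stands in for the subprocess and its multiprocessing queues.

```python
import queue
import threading
from typing import Any, Callable


def worker_loop(
    job_queue: "queue.Queue[Any]",
    result_queue: "queue.Queue[Any]",
    shutdown_event: threading.Event,
    handler: Callable[[Any], Any],
) -> None:
    """Consume jobs until shutdown is signalled or a None sentinel arrives."""
    result_queue.put({"kind": "ready"})
    while not shutdown_event.is_set():
        try:
            # A short timeout lets the loop notice shutdown_event promptly.
            item = job_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        if item is None:
            break
        job_id = item["job_id"]
        try:
            result_queue.put({"kind": "result", "job_id": job_id, "result": handler(item["request"])})
        except Exception as exc:
            # Errors are reported per job so one bad request does not kill the worker.
            result_queue.put({"kind": "error", "job_id": job_id, "error": repr(exc)})
```

Tagging every message with `kind` and `job_id` is what lets a reader on the other side match each result or error to the job that is waiting for it.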
fastvideo.entrypoints.streaming.health

Health, readiness, and status routes for streaming servers.

Functions:
fastvideo.entrypoints.streaming.health.build_health_router
build_health_router(pool: PoolRef = None) -> APIRouter

Build a router exposing streaming liveness/readiness endpoints.

pool may be either a concrete pool or a zero-argument callable that returns the current pool. The callable form lets product servers keep their own lifespan-managed runtime singleton without adding public global state.

Source code in fastvideo/entrypoints/streaming/health.py
def build_health_router(pool: PoolRef = None) -> APIRouter:
    """Build a router exposing streaming liveness/readiness endpoints.

    ``pool`` may be either a concrete pool or a zero-argument callable that
    returns the current pool. The callable form lets product servers keep their
    own lifespan-managed runtime singleton without adding public global state.
    """
    router = APIRouter()

    @router.get("/health")
    @router.get("/healthz")
    async def get_healthz() -> dict[str, Any]:
        """Liveness probe for process-level health."""
        return {
            "status": "ok",
            "service": SERVICE_NAME,
            "ts": _utc_now_iso(),
        }

    @router.get("/readyz")
    async def get_readyz() -> dict[str, Any]:
        """Readiness probe for router/load-balancer health checks."""
        status_payload = await get_pool_status(pool)
        ready_workers = _ready_worker_count(status_payload)
        return {
            "status": "ready" if ready_workers > 0 else "warming",
            "service": SERVICE_NAME,
            "ready_gpu_workers": ready_workers,
            "total_gpus": _as_int(status_payload.get("total_gpus")),
            "available_gpus": _as_int(status_payload.get("available_gpus")),
            "warmup_successful_gpus": _as_int(status_payload.get("warmup_successful_gpus")),
            "warmup_failed_gpus": _as_int(status_payload.get("warmup_failed_gpus")),
            "queue_size": _as_int(status_payload.get("queue_size")),
            "ts": _utc_now_iso(),
        }

    @router.get("/status")
    async def get_status() -> dict[str, Any]:
        """Get the current status of the GPU pool."""
        return await get_pool_status(pool)

    return router
fastvideo.entrypoints.streaming.health.get_pool_status async
get_pool_status(pool: PoolRef = None) -> dict[str, Any]

Return the generic GPU pool status payload used by /status.

Source code in fastvideo/entrypoints/streaming/health.py
async def get_pool_status(pool: PoolRef = None) -> dict[str, Any]:
    """Return the generic GPU pool status payload used by ``/status``."""
    resolved = _resolve_pool(pool)
    if resolved is None:
        return _zero_pool_status()
    if _has_get_status(resolved):
        return dict(resolved.get_status())
    if _has_health(resolved):
        return _status_from_health(resolved.health())
    return _zero_pool_status()
fastvideo.entrypoints.streaming.mock_server

Mock streaming server — a frontend dev aid.

Boots the same FastAPI app the real streaming server uses, but backs it with InProcessGpuPool wrapping a synthetic generator that emits pre-baked RGB frames. No GPU or model weights required.

Use cases:

  • Frontend development without a real model loaded.
  • Integration tests that exercise the WS protocol end-to-end.
  • Reproducing protocol bugs locally.

Launch: python -m fastvideo.entrypoints.streaming.mock_server.

Classes
fastvideo.entrypoints.streaming.mock_server.MockGenerator dataclass
MockGenerator(sleep_ms: float = 0.0)

Generator stand-in that returns synthetic gradient frames.

Each call produces one segment's worth of frames whose pixels vary by a constant derived from the request seed and segment index. Latency is configurable via sleep_ms so the caller can exercise slow-generate scenarios without spinning up a GPU.

Functions:
fastvideo.entrypoints.streaming.mock_server.build_mock_app
build_mock_app(*, sleep_ms: float = 0.0)

Build a FastAPI app backed by MockGenerator.

Source code in fastvideo/entrypoints/streaming/mock_server.py
def build_mock_app(*, sleep_ms: float = 0.0):
    """Build a FastAPI app backed by :class:`MockGenerator`."""
    serve_config = ServeConfig(
        generator=GeneratorConfig(model_path="/models/mock"),
        streaming=StreamingConfig(
            session_timeout_seconds=120,
            generation_segment_cap=6,
        ),
    )
    serve_config.default_request.sampling = SamplingConfig(
        num_frames=24,
        height=256,
        width=256,
        fps=24,
        num_inference_steps=1,
    )
    return build_app(serve_config, MockGenerator(sleep_ms=sleep_ms))
fastvideo.entrypoints.streaming.prompt

Prompt pipeline for the streaming server.

  • providers: LLM backend abstraction + built-in adapters
  • enhancer: provider-agnostic enhance / auto-extend / rewrite operations on top of the provider layer

All of this is optional; the streaming server runs fine without it (PR 7.5's skeleton never invokes the enhancer). When the operator enables ServeConfig.streaming.prompt.enabled, the server routes each session_init_v2 curated prompt through enhance before the first segment.

Classes
fastvideo.entrypoints.streaming.prompt.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.
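
A minimal sketch of the `asyncio.to_thread` wrapping mentioned above. The method name `complete` and the `BlockingProvider` class are hypothetical; the actual LLMProvider protocol may name and type its method differently.

```python
import asyncio
import time


class BlockingProvider:
    """Hypothetical provider built on a blocking client library."""

    name = "blocking-demo"

    def _complete_sync(self, prompt: str) -> str:
        time.sleep(0.01)  # stands in for a blocking HTTP call
        return prompt.upper()

    async def complete(self, prompt: str) -> str:
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._complete_sync, prompt)


async def main() -> str:
    return await BlockingProvider().complete("a fox in snow")


result = asyncio.run(main())
```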

fastvideo.entrypoints.streaming.prompt.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
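
The role of `retryable` in fallback can be sketched as follows. The two exception classes mirror the source above; `complete_with_fallback` and the three demo providers are assumptions for illustration and not the enhancer's actual code.

```python
import asyncio


class LLMProviderError(RuntimeError):
    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable


class LLMTimeoutError(LLMProviderError):
    def __init__(self, message: str) -> None:
        super().__init__(message, retryable=True)


async def complete_with_fallback(providers, prompt: str):
    """Try providers in priority order; move on only after retryable errors."""
    last_error = None
    for index, provider in enumerate(providers):
        try:
            content = await provider(prompt)
            return content, index > 0  # (content, fallback_used)
        except LLMProviderError as exc:
            if not exc.retryable:
                raise  # non-retryable: stop instead of falling back
            last_error = exc
    raise LLMProviderError("all providers failed", retryable=False) from last_error


# Hypothetical providers, modelled as plain async callables.
async def flaky(prompt: str) -> str:
    raise LLMTimeoutError("timed out")


async def healthy(prompt: str) -> str:
    return f"enhanced: {prompt}"


async def rejected(prompt: str) -> str:
    raise LLMProviderError("401 unauthorized", retryable=False)


outcome = asyncio.run(complete_with_fallback([flaky, healthy], "fox"))
```

A timeout on the first provider falls through to the second, while a non-retryable error propagates immediately.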
fastvideo.entrypoints.streaming.prompt.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

Usage::

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
Methods:
fastvideo.entrypoints.streaming.prompt.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; priority=-1 (default) appends as a fallback.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
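
To make the priority semantics concrete, here is a small sketch that applies the same branch to a plain list. The `register` helper and the provider names are placeholders for this example.

```python
def register(providers: list, provider, *, priority: int = -1) -> None:
    # Same branch structure as register_provider above.
    if priority < 0:
        providers.append(provider)
    else:
        providers.insert(priority, provider)


providers = ["cerebras", "groq"]
register(providers, "backup")            # default: appended as the last fallback
register(providers, "fast", priority=0)  # becomes the primary
register(providers, "mid", priority=2)   # lands at index 2
```

Any negative priority appends. That differs from calling `list.insert(-1, x)` directly, which would place the item before the last element.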
fastvideo.entrypoints.streaming.prompt.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")
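
One way a hot-reloadable loader could work is sketched below. The file layout (one `<operation>.txt` per operation), the `DEFAULTS` table, and `load_system_prompts` are all assumptions; the source does not show `_load_system_prompts`.

```python
from __future__ import annotations

import tempfile
from pathlib import Path

# Hypothetical built-in prompts, used when no file overrides them.
DEFAULTS = {
    "enhance": "Polish the prompt.",
    "auto_extend": "Continue the scene.",
    "rewrite": "Offer alternatives.",
}


def load_system_prompts(prompt_dir: str | None) -> dict[str, str]:
    """Start from the defaults and overlay any <operation>.txt files found."""
    prompts = dict(DEFAULTS)
    if prompt_dir is None:
        return prompts
    for operation in DEFAULTS:
        path = Path(prompt_dir) / f"{operation}.txt"
        if path.is_file():
            prompts[operation] = path.read_text(encoding="utf-8").strip()
    return prompts


with tempfile.TemporaryDirectory() as tmp:
    before = load_system_prompts(tmp)
    # An operator edits a template on disk, then triggers a reload.
    (Path(tmp) / "enhance.txt").write_text("Add cinematic detail.\n", encoding="utf-8")
    after = load_system_prompts(tmp)
```

Because a reload simply re-runs the loader and swaps the resulting dict, edited templates take effect without a restart.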
Modules
fastvideo.entrypoints.streaming.prompt.enhancer

Provider-agnostic prompt orchestration for the streaming server.

Three operations the streaming server needs:

  • enhance — polish a user prompt (add cinematic detail, fix syntax)
  • auto_extend — generate a follow-on prompt for loop generation
  • rewrite — rewrite a seed prompt for a user-directed rewrite flow

All three share the same orchestration: pick a provider in priority order, submit an LLMRequest, fall back to the next provider on retryable errors, and surface a structured :class:LLMResponse back to the caller.

System prompts are loaded from system_prompt_dir on construction and can be hot-reloaded via :meth:PromptEnhancer.reload_system_prompts. The streaming server's management endpoint calls that method in response to a rewrite_seed_prompts_started frame.

Classes
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

Usage::

enhancer = PromptEnhancer(
    providers=[CerebrasProvider(), GroqProvider()],
    model="gpt-oss-120b",
    system_prompt_dir="/etc/fastvideo/prompts",
)
response = await enhancer.enhance("a fox running through snow")
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()
Methods:
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

Insert an additional provider. priority=0 makes it primary; priority=-1 (default) appends as a fallback.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")
Functions:
fastvideo.entrypoints.streaming.prompt.providers

LLM provider implementations used by the prompt enhancer.

Classes
fastvideo.entrypoints.streaming.prompt.providers.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

Cerebras inference adapter.

api_key falls back to CEREBRAS_API_KEY when unset.

fastvideo.entrypoints.streaming.prompt.providers.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

Groq inference adapter.

Identical wire format to :class:CerebrasProvider; both go through :func:complete_openai_compatible. The two providers differ only in base URL, env var, and model id conventions.

fastvideo.entrypoints.streaming.prompt.providers.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.

fastvideo.entrypoints.streaming.prompt.providers.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.providers.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
Modules
fastvideo.entrypoints.streaming.prompt.providers.base

LLM provider protocol + DTOs used by the prompt enhancer.

Third-party users add a new provider by implementing :class:LLMProvider and registering it with a prompt enhancer instance. The shipped providers live in sibling modules (cerebras.py, groq.py) and each is ~100-200 LOC — the provider layer is intentionally thin so the enhancer stays provider-agnostic.

Classes
fastvideo.entrypoints.streaming.prompt.providers.base.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.

fastvideo.entrypoints.streaming.prompt.providers.base.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
fastvideo.entrypoints.streaming.prompt.providers.base.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)
fastvideo.entrypoints.streaming.prompt.providers.cerebras

Cerebras LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.cerebras.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

Cerebras inference adapter.

api_key falls back to CEREBRAS_API_KEY when unset.

Functions:
fastvideo.entrypoints.streaming.prompt.providers.groq

Groq LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.groq.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

Groq inference adapter.

Identical wire format to :class:CerebrasProvider; both go through :func:complete_openai_compatible. The two providers differ only in base URL, env var, and model id conventions.

Functions:
fastvideo.entrypoints.streaming.prompt.rewrite

Rewrite payload builder.

The UI's "rewrite seed prompts" flow asks the enhancer to produce a batch of alternative prompts given one seed. This module packages the seed + options into the payload the enhancer expects and unpacks the response back into a typed :class:RewriteResult.

Separating this from :mod:enhancer keeps the enhancer provider-agnostic; anything UI-specific (how many alternatives to request, how to split the response, temperature) lives here.

Classes
fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions dataclass
RewriteOptions(count: int = 3, temperature: float | None = None)
Attributes
fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions.count class-attribute instance-attribute
count: int = 3

Number of alternative prompts to request.

Functions:
fastvideo.entrypoints.streaming.prompt.rewrite.build_rewrite async
build_rewrite(enhancer: PromptEnhancer, seed_prompt: str, *, options: RewriteOptions | None = None) -> RewriteResult

Run a rewrite op through the enhancer and return a typed result.

Source code in fastvideo/entrypoints/streaming/prompt/rewrite.py
async def build_rewrite(
    enhancer: PromptEnhancer,
    seed_prompt: str,
    *,
    options: RewriteOptions | None = None,
) -> RewriteResult:
    """Run a rewrite op through the enhancer and return a typed result."""
    if not seed_prompt.strip():
        raise ValueError("rewrite seed prompt must be non-empty")
    options = options or RewriteOptions()
    response = await enhancer.rewrite(seed_prompt)
    alternatives = _split_response(response.content, limit=options.count)
    return RewriteResult(
        seed_prompt=seed_prompt,
        alternatives=alternatives,
        provider=response.provider,
        model=response.model,
        latency_ms=response.latency_ms,
        fallback_used=response.fallback_used,
    )
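
The source for `_split_response` is not shown here. A plausible sketch, assuming the model returns one alternative per line with optional list markers, might look like this (`split_response` is a hypothetical stand-in):

```python
import re


def split_response(content: str, *, limit: int) -> list[str]:
    """Split an LLM reply into at most `limit` non-empty alternatives."""
    alternatives = []
    for line in content.splitlines():
        # Drop leading list markers such as "1.", "2)", "-", "*".
        text = re.sub(r"^\s*(?:\d+[.)]|[-*•])\s*", "", line).strip()
        if text:
            alternatives.append(text)
    return alternatives[:limit]


raw = (
    "1. A fox sprinting through deep snow\n"
    "2) A red fox leaping over drifts\n"
    "\n"
    "- A fox at dusk\n"
    "* One alternative too many"
)
alternatives = split_response(raw, limit=3)
```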
fastvideo.entrypoints.streaming.prompt.safety

Optional prompt safety filter.

Uses a fastText classifier to score prompts against a banned-content rubric. Only loaded when ServeConfig.streaming.safety.enabled is True and fastText is installed — users who don't need it see no runtime cost.

Install: pip install fastvideo[prompt-safety] (ships fasttext as an optional extra) or install fasttext directly.

Classes
fastvideo.entrypoints.streaming.prompt.safety.PromptSafetyFilter
PromptSafetyFilter(*, classifier_path: str | None, enabled: bool = True, block_threshold: float = 0.5)

Minimal fastText-backed prompt safety filter.

Loads the classifier lazily on first use so the streaming server can construct the filter eagerly at startup without paying the model-load cost when safety is disabled.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def __init__(
    self,
    *,
    classifier_path: str | None,
    enabled: bool = True,
    block_threshold: float = 0.5,
) -> None:
    self._classifier_path = classifier_path
    self._enabled = enabled
    self._block_threshold = block_threshold
    self._model: Any | None = None
    self._load_attempted = False
    self._load_lock = threading.Lock()
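
The `_model`, `_load_attempted`, and `_load_lock` fields suggest a load-once pattern. The sketch below shows one way such lazy loading could be written; `LazyModel` and its `loader` callable are hypothetical, and the real filter's loading logic may differ.

```python
import threading


class LazyModel:
    """Load a model at most once, on first use, even with concurrent callers."""

    def __init__(self, loader) -> None:
        self._loader = loader
        self._model = None
        self._load_attempted = False
        self._load_lock = threading.Lock()

    def get(self):
        if self._load_attempted:            # fast path, no lock needed
            return self._model
        with self._load_lock:
            if not self._load_attempted:    # re-check under the lock
                try:
                    self._model = self._loader()
                except Exception:
                    self._model = None      # stay unavailable; do not retry
                self._load_attempted = True
        return self._model


calls = []


def loader():
    calls.append(1)  # record every real load
    return "model"


lazy = LazyModel(loader)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Constructing the object costs nothing; only the first `get()` pays for loading, and a failed load is remembered rather than retried on every request.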
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision

Bases: Enum

Attributes
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision.UNAVAILABLE class-attribute instance-attribute
UNAVAILABLE = 'unavailable'

Returned when the classifier can't run (not configured, fastText missing). Safety is opt-in; the server treats UNAVAILABLE as ALLOW but logs it so operators know the filter is off.

Functions:
fastvideo.entrypoints.streaming.prompt.safety.first_blocked
first_blocked(filter_: PromptSafetyFilter, prompts: list[str]) -> SafetyResult | None

Return the first prompt the filter blocks, or None.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def first_blocked(
    filter_: PromptSafetyFilter,
    prompts: list[str],
) -> SafetyResult | None:
    """Return the first prompt the filter blocks, or ``None``."""
    for prompt in prompts:
        result = filter_.classify(prompt)
        if result.decision is SafetyDecision.BLOCK:
            return result
    return None
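
A usage sketch for `first_blocked` with a stub filter. `KeywordFilter`, the `SafetyResult` fields, and the `ALLOW`/`BLOCK` enum values are assumptions made for this example; only `UNAVAILABLE = 'unavailable'` and the `classify` call appear in the source.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class SafetyDecision(Enum):
    ALLOW = "allow"              # value assumed
    BLOCK = "block"              # value assumed
    UNAVAILABLE = "unavailable"


@dataclass
class SafetyResult:              # hypothetical fields
    prompt: str
    decision: SafetyDecision
    score: float


class KeywordFilter:
    """Stub exposing the classify(prompt) method that first_blocked relies on."""

    def __init__(self, banned: set[str], block_threshold: float = 0.5) -> None:
        self._banned = banned
        self._block_threshold = block_threshold

    def classify(self, prompt: str) -> SafetyResult:
        score = 1.0 if any(word in prompt.lower() for word in self._banned) else 0.0
        blocked = score >= self._block_threshold
        decision = SafetyDecision.BLOCK if blocked else SafetyDecision.ALLOW
        return SafetyResult(prompt, decision, score)


def first_blocked(filter_, prompts: list[str]) -> SafetyResult | None:
    for prompt in prompts:
        result = filter_.classify(prompt)
        if result.decision is SafetyDecision.BLOCK:
            return result
    return None


hit = first_blocked(KeywordFilter({"gore"}), ["a fox", "gore scene", "more gore"])
```

Scanning stops at the first blocked prompt, so later prompts in the batch are never classified.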
fastvideo.entrypoints.streaming.protocol

JSON WebSocket protocol schemas for the streaming server.

Every control message shares the envelope {"type": <str>, ...}. Pydantic models live here so the server can parse / validate incoming frames and emit well-typed outgoing frames without hand-rolled dicts.

The message catalogue matches the contract in docs/design/server_contracts/streaming.md; additions must land in both places in the same PR.

Classes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot

Bases: BaseModel

Attributes
fastvideo.entrypoints.streaming.protocol.ContinuationStateSnapshot.state instance-attribute
state: dict[str, Any]

{kind, payload} dict matching :class:fastvideo.api.ContinuationState.

fastvideo.entrypoints.streaming.protocol.MediaInit

Bases: BaseModel

Descriptor for the fMP4 initialization segment that follows.

fastvideo.entrypoints.streaming.protocol.SegmentPromptSource

Bases: BaseModel

Request a new segment using a specific prompt.

fastvideo.entrypoints.streaming.protocol.SessionInitV2

Bases: BaseModel

Opening frame the client sends after the WebSocket handshake.

Attributes
fastvideo.entrypoints.streaming.protocol.SessionInitV2.continuation_state class-attribute instance-attribute
continuation_state: dict[str, Any] | None = None

Optional {kind, payload} dict; hydrated into :class:fastvideo.api.ContinuationState server-side.

fastvideo.entrypoints.streaming.protocol.SnapshotState

Bases: BaseModel

Request the current ContinuationState for export.

Functions:
fastvideo.entrypoints.streaming.protocol.parse_client_message
parse_client_message(raw: dict[str, Any]) -> ClientMessage

Parse an incoming WebSocket dict into a typed client message.

Unknown type values raise :class:pydantic.ValidationError; the server handler turns that into an error frame with code="invalid_message".

Source code in fastvideo/entrypoints/streaming/protocol.py
def parse_client_message(raw: dict[str, Any]) -> ClientMessage:
    """Parse an incoming WebSocket dict into a typed client message.

    Unknown ``type`` values raise :class:`pydantic.ValidationError`; the
    server handler turns that into an ``error`` frame with
    ``code="invalid_message"``.
    """
    from pydantic import TypeAdapter

    return TypeAdapter(ClientMessage).validate_python(raw)
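
The envelope-dispatch idea can be illustrated without Pydantic. The sketch below swaps in plain dataclasses and a lookup table keyed on `type`; the `Ping` and `SetPrompt` messages are hypothetical and not part of the real message catalogue.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Ping:                      # hypothetical message
    type: str


@dataclass
class SetPrompt:                 # hypothetical message
    type: str
    prompt: str


MESSAGE_TYPES = {"ping": Ping, "set_prompt": SetPrompt}


def parse_message(raw: dict[str, Any]):
    """Pick the message class from the envelope's `type` field."""
    msg_type = raw.get("type")
    cls = MESSAGE_TYPES.get(msg_type) if isinstance(msg_type, str) else None
    if cls is None:
        raise ValueError(f"unknown message type: {msg_type!r}")
    try:
        return cls(**raw)
    except TypeError as exc:     # missing or unexpected fields
        raise ValueError(str(exc)) from exc


def handle(raw: dict[str, Any]):
    """Turn validation failures into an error frame, as the handler is described to do."""
    try:
        return parse_message(raw)
    except ValueError as exc:
        return {"type": "error", "code": "invalid_message", "message": str(exc)}


ok = handle({"type": "set_prompt", "prompt": "a fox"})
bad = handle({"type": "nope"})
```

Pydantic's `TypeAdapter` does the same job for a union of models, with field-level validation on top.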
fastvideo.entrypoints.streaming.router

Multi-replica load balancer + WebSocket proxy for the streaming server.

Sits in front of one-or-more streaming-server replicas and forwards WebSocket sessions to a healthy primary, with failover to secondaries. Kept in-repo under fastvideo/entrypoints/streaming/router/ per the PR plan's default; the alternative (separate package) is an open question deferred to review.

Classes
fastvideo.entrypoints.streaming.router.ReplicaRegistry
ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → :class:Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def __init__(self, replicas: list[ReplicaEndpoint]) -> None:
    if not replicas:
        raise ValueError("ReplicaRegistry requires at least one replica")
    self._replicas: dict[str, Replica] = {endpoint.url: Replica(endpoint=endpoint) for endpoint in replicas}
    self._lock = asyncio.Lock()
Methods:
fastvideo.entrypoints.streaming.router.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def select(self) -> Replica | None:
    """Pick the best healthy replica.

    Priority order:

    1. The first healthy primary (insertion order).
    2. The first healthy non-primary (insertion order).
    3. ``None`` when nothing is healthy.

    This MVP picks the first match within each tier; it does NOT
    load-balance across multiple healthy replicas of the same tier.
    Round-robin and weighted distribution are deferred until a real
    N-way active deployment exists.
    """
    healthy_primaries = [r for r in self._replicas.values() if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in self._replicas.values() if r.is_healthy]
    if healthy:
        return healthy[0]
    return None
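
The tiered selection can be exercised in isolation with a simplified stand-in for Replica. In this sketch the `Replica` dataclass and the free function `select` are assumptions; the real class carries an endpoint and live health state.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Replica:                   # simplified stand-in
    url: str
    primary: bool = False
    is_healthy: bool = True


def select(replicas: list[Replica]) -> Replica | None:
    healthy_primaries = [r for r in replicas if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in replicas if r.is_healthy]
    return healthy[0] if healthy else None


a = Replica("http://streamer-a:8000", primary=True)
b = Replica("http://streamer-b:8000")
c = Replica("http://streamer-c:8000")
replicas = [b, a, c]             # insertion order matters within a tier

picked = select(replicas)        # the primary wins although it is listed second
```

Marking `a` unhealthy makes `select` return `b`, the first healthy non-primary in insertion order; with every replica unhealthy it returns `None`.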
fastvideo.entrypoints.streaming.router.RouterConfig dataclass
RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example::

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.
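
A trimmed sketch of what load-time validation in `__post_init__` could look like. Field names follow the signature above, but the error messages, the reduced `ReplicaEndpoint`, and the `is_valid` helper are assumptions for this example.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ReplicaEndpoint:           # reduced to the fields used here
    url: str
    primary: bool = False


@dataclass
class RouterConfig:
    replicas: list[ReplicaEndpoint] = field(default_factory=list)
    health_check_interval_seconds: float = 5.0
    health_check_timeout_seconds: float = 2.0
    failure_threshold: int = 3
    recovery_threshold: int = 2

    def __post_init__(self) -> None:
        if not self.replicas:
            raise ValueError("at least one replica is required")
        if self.health_check_interval_seconds <= 0 or self.health_check_timeout_seconds <= 0:
            raise ValueError("intervals and timeouts must be positive")
        if self.failure_threshold < 1 or self.recovery_threshold < 1:
            raise ValueError("thresholds must be >= 1")
        for replica in self.replicas:
            if not replica.url.startswith(("http://", "https://")):
                raise ValueError(f"replica URL must be http(s): {replica.url}")
        if sum(r.primary for r in self.replicas) > 1:
            raise ValueError("at most one replica may be primary")


def is_valid(**kwargs) -> bool:
    """Helper for the demo: True when the config constructs without error."""
    try:
        RouterConfig(**kwargs)
        return True
    except ValueError:
        return False


good = is_valid(replicas=[ReplicaEndpoint("http://streamer-a:8000", primary=True),
                          ReplicaEndpoint("http://streamer-b:8000")])
```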

Functions:
fastvideo.entrypoints.streaming.router.build_router_app
build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    ``registry`` can be injected for tests; defaults to one built from
    ``config.replicas``.
    """
    registry = registry or ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            state.stop_event.set()
            if state.health_task is not None:
                with contextlib.suppress(asyncio.CancelledError):
                    await state.health_task

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            await websocket.send_json({
                "type": "error",
                "code": "gpu_unavailable",
                "message": "router: no healthy replica available",
                "retryable": True,
            })
            await websocket.close(code=1013, reason="no_healthy_replica")
            return

        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            with contextlib.suppress(RuntimeError):
                await websocket.send_json({
                    "type": "error",
                    "code": "worker_failed",
                    "message": f"router bridge failed: {exc}",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError):
                await websocket.close(code=1011)

    app.state.router_state = state
    return app
Modules
fastvideo.entrypoints.streaming.router.config

Typed router configuration.

Classes
fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint dataclass
ReplicaEndpoint(url: str, name: str | None = None, primary: bool = False, weight: float = 1.0)

One backend replica the router can route to.

Attributes
fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.primary class-attribute instance-attribute
primary: bool = False

True = prefer this replica over others in steady state.

fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.url instance-attribute
url: str

HTTP base URL, e.g. http://host:8000. WebSocket URL is derived automatically by replacing the scheme.
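
The scheme swap can be sketched with the standard library. `websocket_url_for` is a hypothetical helper (the router's own `_websocket_url_for` is not shown), and appending `/v1/stream` is an assumption based on the path the router and backends expose.

```python
from urllib.parse import urlsplit, urlunsplit


def websocket_url_for(base_url: str, path: str = "/v1/stream") -> str:
    """Map http -> ws and https -> wss, then append the stream path."""
    parts = urlsplit(base_url)
    # Raises KeyError for any scheme other than http or https.
    scheme = {"http": "ws", "https": "wss"}[parts.scheme]
    return urlunsplit((scheme, parts.netloc, parts.path.rstrip("/") + path, "", ""))


plain = websocket_url_for("http://host:8000")
secure = websocket_url_for("https://host/")
```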

fastvideo.entrypoints.streaming.router.config.RouterConfig dataclass
RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example::

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.

fastvideo.entrypoints.streaming.router.main

Router FastAPI entry point.

Exposes the same /v1/stream WebSocket path the backend servers do, accepts a client, picks a healthy replica from the registry, and proxies frames bidirectionally.

PR 7.9 ships the minimum-viable shape: explicit replica list, single primary, JSON + binary passthrough in both directions, and a /status endpoint for operators. Sticky-session routing (so a reconnect lands on the same backend) is left for a follow-up.

Functions:
fastvideo.entrypoints.streaming.router.main.build_router_app
build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    ``registry`` can be injected for tests; defaults to one built from
    ``config.replicas``.
    """
    registry = registry or ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            state.stop_event.set()
            if state.health_task is not None:
                with contextlib.suppress(asyncio.CancelledError):
                    await state.health_task

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            await websocket.send_json({
                "type": "error",
                "code": "gpu_unavailable",
                "message": "router: no healthy replica available",
                "retryable": True,
            })
            await websocket.close(code=1013, reason="no_healthy_replica")
            return

        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            with contextlib.suppress(RuntimeError):
                await websocket.send_json({
                    "type": "error",
                    "code": "worker_failed",
                    "message": f"router bridge failed: {exc}",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError):
                await websocket.close(code=1011)

    app.state.router_state = state
    return app
fastvideo.entrypoints.streaming.router.registry

Replica registry + health-check loop.

The registry tracks the set of known backend replicas and their live health. The router consults it for "pick a backend for this session" decisions and a background task updates it from periodic HTTP probes.

State machine per replica::

HEALTHY ──(N consecutive failures)──▶ UNHEALTHY
   ▲                                     │
   └──────(M consecutive successes)──────┘

Where N = :attr:RouterConfig.failure_threshold and M = :attr:RouterConfig.recovery_threshold.
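
The state machine above can be sketched as a small transition function. The `Status`, `Health`, and `record` names are assumptions for this example, as is starting in HEALTHY; only the threshold semantics come from the diagram.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass
class Health:
    status: Status = Status.HEALTHY   # initial state assumed
    consecutive_failures: int = 0
    consecutive_successes: int = 0


def record(health: Health, ok: bool, *, failure_threshold: int = 3,
           recovery_threshold: int = 2) -> Status:
    """Apply one probe result and return the resulting status."""
    if ok:
        health.consecutive_successes += 1
        health.consecutive_failures = 0
        if (health.status is Status.UNHEALTHY
                and health.consecutive_successes >= recovery_threshold):
            health.status = Status.HEALTHY
    else:
        health.consecutive_failures += 1
        health.consecutive_successes = 0
        if (health.status is Status.HEALTHY
                and health.consecutive_failures >= failure_threshold):
            health.status = Status.UNHEALTHY
    return health.status


health = Health()
probes = [False, False, True, False, False, False, True, True]
trace = [record(health, ok).name for ok in probes]
```

A single success resets the failure streak, so two failures followed by a success leave the replica healthy; only three failures in a row flip it.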

Attributes
fastvideo.entrypoints.streaming.router.registry.HttpProbe module-attribute
HttpProbe = Any

Structural alias for health-probe callables. Concrete signature is async def __call__(url: str, *, timeout: float) -> tuple[float, str | None]; typing.Callable cannot express keyword-only parameters, so duck-typing is the pragmatic compromise.
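
For readers who want a checkable type, a `typing.Protocol` with `__call__` can express the keyword-only `timeout` parameter. This is an alternative sketch, not what the module does; `HttpProbeProtocol`, `fake_probe`, and the reading of the tuple as (latency in ms, error message) are assumptions.

```python
from __future__ import annotations

import asyncio
from typing import Protocol


class HttpProbeProtocol(Protocol):
    async def __call__(self, url: str, *, timeout: float) -> tuple[float, str | None]:
        ...


async def fake_probe(url: str, *, timeout: float) -> tuple[float, str | None]:
    """Test double: answers without touching the network."""
    if "down" in url:
        return timeout * 1000.0, "connection refused"
    return 1.5, None


async def check(probe: HttpProbeProtocol, url: str) -> bool:
    _latency_ms, error = await probe(url, timeout=2.0)
    return error is None


up = asyncio.run(check(fake_probe, "http://streamer-a:8000"))
down = asyncio.run(check(fake_probe, "http://streamer-down:8000"))
```

Injecting a probe like `fake_probe` is the kind of substitution the `http_get` parameter of `run_health_check_loop` appears designed to allow.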

Classes
fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry
ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → :class:Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def __init__(self, replicas: list[ReplicaEndpoint]) -> None:
    if not replicas:
        raise ValueError("ReplicaRegistry requires at least one replica")
    self._replicas: dict[str, Replica] = {endpoint.url: Replica(endpoint=endpoint) for endpoint in replicas}
    self._lock = asyncio.Lock()
Methods:
fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def select(self) -> Replica | None:
    """Pick the best healthy replica.

    Priority order:

    1. The first healthy primary (insertion order).
    2. The first healthy non-primary (insertion order).
    3. ``None`` when nothing is healthy.

    This MVP picks the first match within each tier; it does NOT
    load-balance across multiple healthy replicas of the same tier.
    Round-robin and weighted distribution are deferred until a real
    N-way active deployment exists.
    """
    healthy_primaries = [r for r in self._replicas.values() if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in self._replicas.values() if r.is_healthy]
    if healthy:
        return healthy[0]
    return None
Functions:
fastvideo.entrypoints.streaming.router.registry.run_health_check_loop async
run_health_check_loop(registry: ReplicaRegistry, config: RouterConfig, *, stop_event: Event, http_get: HttpProbe | None = None) -> None

Poll all replicas' health endpoints in parallel on a fixed interval.

http_get is pluggable so unit tests can inject a deterministic probe without hitting the network. The default builds a single httpx.AsyncClient shared across the loop's lifetime so the common case (steady polling against a stable replica set) reuses TCP/TLS connections instead of paying handshake cost per probe.

Probes within one polling cycle run concurrently via asyncio.gather so a slow replica doesn't push the cycle past health_check_interval_seconds.

Source code in fastvideo/entrypoints/streaming/router/registry.py
async def run_health_check_loop(
    registry: ReplicaRegistry,
    config: RouterConfig,
    *,
    stop_event: asyncio.Event,
    http_get: HttpProbe | None = None,
) -> None:
    """Poll all replicas' health endpoints in parallel on a fixed interval.

    ``http_get`` is pluggable so unit tests can inject a deterministic
    probe without hitting the network. The default builds a single
    ``httpx.AsyncClient`` shared across the loop's lifetime so the
    common case (steady polling against a stable replica set) reuses
    TCP/TLS connections instead of paying handshake cost per probe.

    Probes within one polling cycle run concurrently via ``asyncio.gather``
    so a slow replica doesn't push the cycle past
    ``health_check_interval_seconds``.
    """
    if http_get is not None:
        await _run_loop(registry, config, stop_event, http_get)
        return
    async with _build_default_probe(config) as probe:
        await _run_loop(registry, config, stop_event, probe)
fastvideo.entrypoints.streaming.server

Single-generator FastAPI + WebSocket streaming server.

Functions:
fastvideo.entrypoints.streaming.server.build_app
build_app(serve_config: ServeConfig, generator: _GeneratorProto | None = None, *, pool: GpuPool | None = None, session_store: SessionStore | None = None) -> FastAPI

Build the FastAPI app used by :func:run_server.

Exposed so tests can drive the WebSocket endpoint in-process via starlette.testclient.TestClient(app).websocket_connect(...).

Exactly one of generator (backed by :class:InProcessGpuPool) or pool (for the subprocess-backed production shape) must be given.

Source code in fastvideo/entrypoints/streaming/server.py
def build_app(
    serve_config: ServeConfig,
    generator: _GeneratorProto | None = None,
    *,
    pool: GpuPool | None = None,
    session_store: SessionStore | None = None,
) -> FastAPI:
    """Build the FastAPI app used by :func:`run_server`.

    Exposed so tests can drive the WebSocket endpoint in-process via
    ``starlette.testclient.TestClient(app).websocket_connect(...)``.

    Exactly one of ``generator`` (backed by :class:`InProcessGpuPool`)
    or ``pool`` (for the subprocess-backed production shape) must be
    given.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming "
                         "server; got None. Add a `streaming:` block to your serve config.")
    streaming = serve_config.streaming
    if (generator is None) == (pool is None):
        raise ValueError("build_app requires exactly one of `generator` or `pool`")

    store = session_store or InMemorySessionStore()
    if pool is None:
        assert generator is not None
        pool = InProcessGpuPool(generator, session_store=store)

    sessions = SessionManager(
        segment_cap=serve_config.streaming.generation_segment_cap,
        session_timeout_seconds=serve_config.streaming.session_timeout_seconds,
    )
    state = ServerState(
        serve_config=serve_config,
        pool=pool,
        sessions=sessions,
        session_store=store,
    )

    app = FastAPI(title="FastVideo Streaming")

    @app.get("/health")
    async def _health() -> JSONResponse:
        return JSONResponse({
            "status": "ok",
            "sessions": len(state.sessions),
            "stream_mode": streaming.stream_mode,
        })

    app.include_router(build_health_router(pool))

    @app.websocket("/v1/stream")
    async def _stream(websocket: WebSocket) -> None:
        await websocket.accept()
        try:
            session = state.sessions.create()
        except SessionRejected as exc:
            await _send_error(websocket, "session_rejected", str(exc), retryable=False)
            await websocket.close(code=_WS_CLOSE_TRY_AGAIN_LATER, reason="session_rejected")
            return

        try:
            await _handle_session(websocket, session, state)
        except WebSocketDisconnect:
            logger.info("session %s: client disconnected", session.id[:8])
        except Exception:  # pragma: no cover - defensive catch-all
            logger.exception("session %s: unhandled error", session.id[:8])
            with contextlib.suppress(InvalidSessionTransition):
                session.transition(SessionState.ERROR)
        finally:
            with contextlib.suppress(Exception):
                await state.pool.release(session.id)
            _cleanup_session(session, state)

    app.state.server_state = state
    return app
fastvideo.entrypoints.streaming.server.run_server
run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None

Launch the streaming server.

Boots a :class:fastvideo.VideoGenerator from serve_config.generator unless generator is provided, then serves build_app(...) via uvicorn.

Source code in fastvideo/entrypoints/streaming/server.py
def run_server(serve_config: ServeConfig, *, generator: _GeneratorProto | None = None) -> None:
    """Launch the streaming server.

    Boots a :class:`fastvideo.VideoGenerator` from
    ``serve_config.generator`` unless ``generator`` is provided, then
    serves ``build_app(...)`` via uvicorn.
    """
    if serve_config.streaming is None:
        raise ValueError("ServeConfig.streaming must be set to launch the streaming server; "
                         "got None. Add a `streaming:` block to your serve config.")

    import uvicorn

    if generator is None:
        from fastvideo import VideoGenerator  # lazy to avoid boot cost

        generator = VideoGenerator.from_pretrained(config=serve_config.generator)
    app = build_app(serve_config, generator)
    uvicorn.run(
        app,
        host=serve_config.server.host,
        port=serve_config.server.port,
    )
fastvideo.entrypoints.streaming.session

Per-connection session lifecycle for the streaming server.

Each WebSocket opens exactly one :class:Session. :class:SessionManager enforces the generation_segment_cap and session_timeout_seconds budgets from :class:fastvideo.api.StreamingConfig.

Classes
fastvideo.entrypoints.streaming.session.InvalidSessionTransition

Bases: RuntimeError

Raised when a session is asked to transition along an illegal edge.

fastvideo.entrypoints.streaming.session.Session dataclass
Session(id: str = (lambda: hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())
Methods:
fastvideo.entrypoints.streaming.session.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

Raises :class:InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special casing.

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
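
The edge check can be tried in isolation with a toy state machine. The states and edge table below are hypothetical, since the real `_VALID_TRANSITIONS` table is not shown on this page. Only the membership-plus-identity test is taken from the source above.

```python
import enum


class ToyState(enum.Enum):
    INITIALIZING = "initializing"
    ACTIVE = "active"
    COMPLETE = "complete"


# Hypothetical edge table; terminal states simply have no entry.
TOY_TRANSITIONS: dict[ToyState, frozenset[ToyState]] = {
    ToyState.INITIALIZING: frozenset({ToyState.ACTIVE}),
    ToyState.ACTIVE: frozenset({ToyState.COMPLETE}),
}


class InvalidToyTransition(RuntimeError):
    """Raised when a move is not in the edge table and is not a self-loop."""


def transition(current: ToyState, target: ToyState) -> ToyState:
    """Return ``target`` if the edge is allowed, otherwise raise."""
    allowed = TOY_TRANSITIONS.get(current, frozenset())
    if target not in allowed and target is not current:
        raise InvalidToyTransition(f"{current.value} -> {target.value} is not valid")
    return target


def is_rejected(current: ToyState, target: ToyState) -> bool:
    try:
        transition(current, target)
    except InvalidToyTransition:
        return True
    return False
```

Note that the `target is not current` clause admits a self-loop on every state, so in this sketch `COMPLETE -> COMPLETE` passes just as `ACTIVE -> ACTIVE` does.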
fastvideo.entrypoints.streaming.session.SessionManager
SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}
Methods:
fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them — this method only identifies dead sessions so the server can emit session_timeout frames before dropping the WebSocket.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
fastvideo.entrypoints.streaming.session.SessionRejected

Bases: RuntimeError

Raised when session creation fails (queue full, auth, etc.).

fastvideo.entrypoints.streaming.session.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.

fastvideo.entrypoints.streaming.session_init_image

Persist the initial-image blob attached to a streaming session.

Classes
fastvideo.entrypoints.streaming.session_init_image.SessionInitImage dataclass
SessionInitImage(path: str, display_name: str, mime: str)

Location of the persisted init image.

Callers pass path to InputConfig.image_path; display_name is only used for logs.

Functions:
fastvideo.entrypoints.streaming.session_init_image.persist_session_init_image
persist_session_init_image(payload: Any, *, output_dir: str | None = None) -> SessionInitImage | None

Decode a client init-image blob and persist it to disk.

payload shape (matches the internal UI protocol):

{
    "mime": "image/png",
    "name": "ref.png",
    "data": "<base64 bytes>",
}

Returns None when payload is falsy (no init image). Raises :class:ValueError on schema / size / decode errors so the caller can surface a user-facing error frame.

Source code in fastvideo/entrypoints/streaming/session_init_image.py
def persist_session_init_image(
    payload: Any,
    *,
    output_dir: str | None = None,
) -> SessionInitImage | None:
    """Decode a client init-image blob and persist it to disk.

    ``payload`` shape (matches the internal UI protocol)::

        {
            "mime": "image/png",
            "name": "ref.png",
            "data": "<base64 bytes>",
        }

    Returns ``None`` when ``payload`` is falsy (no init image). Raises
    :class:`ValueError` on schema / size / decode errors so the caller
    can surface a user-facing ``error`` frame.
    """
    if not payload:
        return None
    if not isinstance(payload, dict):
        raise ValueError("session init image must be an object")

    mime = payload.get("mime")
    if mime not in _ACCEPTED_MIMES:
        raise ValueError(f"session init image mime {mime!r} is not one of "
                         f"{sorted(_ACCEPTED_MIMES)}")
    data_b64 = payload.get("data")
    if not isinstance(data_b64, str):
        raise ValueError("session init image data must be a base64 string")
    try:
        data = base64.b64decode(data_b64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError(f"session init image data is not valid base64: {exc}") from exc
    if len(data) > _MAX_IMAGE_BYTES:
        raise ValueError(f"session init image is {len(data)} bytes; limit is "
                         f"{_MAX_IMAGE_BYTES}")
    if len(data) == 0:
        raise ValueError("session init image data is empty")

    ext = _ACCEPTED_MIMES[mime]
    display_name = _sanitize_display_name(payload.get("name")) or f"init{ext}"
    fd, path = tempfile.mkstemp(prefix="fastvideo-init-", suffix=ext, dir=output_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
    except Exception:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)
        raise
    return SessionInitImage(path=path, display_name=display_name, mime=mime)
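
A client-side sketch of building a payload in the documented shape, plus the effect of `validate=True` during decoding. The eight-byte PNG signature is a placeholder for real image data, and `is_strict_base64` is a hypothetical helper written for this example.

```python
import base64
import binascii

# First eight bytes of every PNG file; a placeholder for real image bytes.
PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

payload = {
    "mime": "image/png",
    "name": "ref.png",
    "data": base64.b64encode(PNG_SIGNATURE).decode("ascii"),
}

# Round trip: strict decoding returns the original bytes.
decoded = base64.b64decode(payload["data"], validate=True)


def is_strict_base64(text: str) -> bool:
    """Return True if ``text`` decodes with validate=True."""
    try:
        base64.b64decode(text, validate=True)
    except (binascii.Error, ValueError):
        return False
    return True
```

Without `validate=True`, `base64.b64decode` silently discards characters outside the base64 alphabet, so malformed input could slip through as a shorter blob instead of raising.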
fastvideo.entrypoints.streaming.session_logger

Per-session JSONL event logger.

Each session gets its own JSONL file under the configured log root so post-hoc analytics (enhancer latency, GPU assignment, segment timings) can be recovered without a tracing backend. The internal UI uses this format; keeping the same shape makes log tooling portable.

Classes
fastvideo.entrypoints.streaming.session_logger.SessionLogEvent dataclass
SessionLogEvent(session_id: str, event: str, payload: dict[str, Any] = dict(), ts: float = time())

One line in the session JSONL file.

fastvideo.entrypoints.streaming.session_logger.SessionLogger
SessionLogger(log_dir: str | None)

Append-only JSONL logger keyed by session id.

Thread-safe; the server may write to the same session's log from more than one thread or asyncio task (for example the fMP4 encoder thread and the control-frame handler).

Source code in fastvideo/entrypoints/streaming/session_logger.py
def __init__(self, log_dir: str | None) -> None:
    self._log_dir = log_dir
    self._files: dict[str, TextIO] = {}
    self._locks: dict[str, threading.Lock] = {}
    self._registry_lock = threading.Lock()
    self._ensure_dir()
fastvideo.entrypoints.streaming.session_store

Session state store for the FastVideo streaming server.

The streaming server keeps continuation state (decoded frames + audio latents from the previous segment) server-side so the client doesn't re-upload multi-megabyte tensors with each WebSocket message. Two operations are needed:

  • snapshot(session_id) -> ContinuationState — serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
  • hydrate(state) -> session_id — load a previously serialized state into a new session (for resume-after-disconnect flows).

The store is an ABC with an :class:InMemorySessionStore default; Redis or other backends can drop in without touching the pipeline.

Large tensor payloads (video frames, audio latents) are kept out of the JSON payload via an accompanying :class:BlobStore. Both stores share a process today; they are separate types so that a future implementation can put blobs on S3 while keeping session metadata in Redis.

Classes
fastvideo.entrypoints.streaming.session_store.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A :class:ContinuationState payload can reference large tensors stored in a :class:BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.

Methods:
fastvideo.entrypoints.streaming.session_store.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""
fastvideo.entrypoints.streaming.session_store.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore
InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory :class:BlobStore for single-process servers.

No eviction policy: callers are responsible for calling :meth:drop when a blob's owning state is replaced or a session ends. A Redis- or filesystem-backed :class:BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
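
The put / get / drop contract can be sketched with a locked dictionary. `ToyBlobStore` is a hypothetical illustration of that contract, not the class documented here. Using a UUID hex string as the blob id is an assumption.

```python
import json
import threading
import uuid


class ToyBlobStore:
    """Hypothetical in-memory blob store following the documented contract."""

    def __init__(self) -> None:
        self._blobs: dict[str, tuple[bytes, str]] = {}
        self._lock = threading.Lock()

    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
        blob_id = uuid.uuid4().hex
        with self._lock:
            self._blobs[blob_id] = (bytes(data), mime)
        return blob_id

    def get(self, blob_id: str) -> bytes:
        with self._lock:
            return self._blobs[blob_id][0]  # raises KeyError if absent

    def drop(self, blob_id: str) -> None:
        with self._lock:
            self._blobs.pop(blob_id, None)  # missing ids are a no-op


blobs = ToyBlobStore()
frame_bytes = bytes(range(16))
blob_id = blobs.put(frame_bytes)

# The JSON state carries only the small id, not the tensor bytes.
state_json = json.dumps({"segment_idx": 3, "frames_blob": blob_id})
restored = blobs.get(json.loads(state_json)["frames_blob"])

blobs.drop(blob_id)
blobs.drop(blob_id)  # second drop is harmless
```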
fastvideo.entrypoints.streaming.session_store.InMemorySessionStore
InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory :class:SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction / TTL / bounded capacity — sessions only leave via :meth:drop. The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any :class:BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
fastvideo.entrypoints.streaming.session_store.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls :meth:store after each segment and :meth:snapshot when a client explicitly asks for an exportable state handle.

Methods:
fastvideo.entrypoints.streaming.session_store.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""
fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """
fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""
fastvideo.entrypoints.streaming.session_store.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""
fastvideo.entrypoints.streaming.stream

fMP4 stream encoder used by the streaming server.

The client's Media Source Extensions player needs a continuous fMP4 byte stream: first an initialization segment (ftyp + moov), then one or more media segments (moof + mdat). We pipe raw RGB frames into an ffmpeg subprocess configured for fragmented output via -movflags empty_moov+default_base_moof+frag_keyframe+faststart and stream the bytes back out.
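
An ffmpeg command line for this kind of pipe-in, pipe-out fragmented encode might be assembled as follows. This is a sketch under assumptions: `build_ffmpeg_argv` is a hypothetical helper, the `libx264` codec and `rgb24` input format are guesses, and the encoder's real spawn logic is not shown on this page. Only the `-movflags` value is taken from the text above.

```python
def build_ffmpeg_argv(
    width: int,
    height: int,
    fps: int,
    *,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
) -> list[str]:
    """Build an argv that reads raw RGB on stdin and writes fMP4 to stdout."""
    return [
        ffmpeg_path,
        "-loglevel", "error",
        # Input: headerless frames, so size, rate and pixel layout are explicit.
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", f"{width}x{height}",
        "-r", str(fps),
        "-i", "pipe:0",
        # Output: H.264 (assumed codec) in a fragmented MP4 container.
        "-c:v", "libx264",
        "-preset", preset,
        "-pix_fmt", pixel_format_out,
        "-movflags", "empty_moov+default_base_moof+frag_keyframe+faststart",
        "-f", "mp4",
        "pipe:1",
    ]


argv = build_ffmpeg_argv(1024, 576, 24)
```

Passing the result to `subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)` would give a process that accepts `width * height * 3` bytes per frame on stdin.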

Classes
fastvideo.entrypoints.streaming.stream.FragmentedMP4Chunk dataclass
FragmentedMP4Chunk(kind: Literal['init', 'media'], data: bytes, stream_id: str, segment_idx: int)

A single fMP4 byte chunk emitted by :class:FragmentedMP4Encoder.

kind identifies whether the chunk is the init segment (must be fed into the client's SourceBuffer first) or a media fragment.

fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder
FragmentedMP4Encoder(*, width: int, height: int, fps: int, segment_idx: int, stream_id: str | None = None, ffmpeg_path: str = 'ffmpeg', preset: str = 'ultrafast', pixel_format_out: str = 'yuv420p', extra_args: list[str] | None = None)

Stream RGB frames in, fMP4 chunks out.

One encoder covers one segment. The server creates a new encoder at each ltx2_segment_start boundary so each segment becomes one media fragment the client can append independently.

Example:

encoder = FragmentedMP4Encoder(width=1024, height=576, fps=24,
                                segment_idx=0)
async with encoder:
    async for chunk in encoder.encode(frames):
        await websocket.send_bytes(chunk.data)
Source code in fastvideo/entrypoints/streaming/stream.py
def __init__(
    self,
    *,
    width: int,
    height: int,
    fps: int,
    segment_idx: int,
    stream_id: str | None = None,
    ffmpeg_path: str = "ffmpeg",
    preset: str = "ultrafast",
    pixel_format_out: str = "yuv420p",
    extra_args: list[str] | None = None,
) -> None:
    self.width = width
    self.height = height
    self.fps = fps
    self.segment_idx = segment_idx
    self.stream_id = stream_id or uuid.uuid4().hex
    self._ffmpeg_path = ffmpeg_path
    self._preset = preset
    self._pixel_format_out = pixel_format_out
    self._extra_args = list(extra_args or [])
    self._proc: subprocess.Popen | None = None
    self._init_emitted = False
Methods:
fastvideo.entrypoints.streaming.stream.FragmentedMP4Encoder.encode async
encode(frames: list[ndarray] | AsyncIterator[ndarray]) -> AsyncIterator[FragmentedMP4Chunk]

Feed frames into ffmpeg and yield fMP4 chunks as they appear.

Source code in fastvideo/entrypoints/streaming/stream.py
async def encode(
    self,
    frames: list[np.ndarray] | AsyncIterator[np.ndarray],
) -> AsyncIterator[FragmentedMP4Chunk]:
    """Feed frames into ffmpeg and yield fMP4 chunks as they appear."""
    if self._proc is None:
        self._spawn()
    assert self._proc is not None and self._proc.stdin is not None
    proc = self._proc

    loop = asyncio.get_running_loop()

    async def _writer() -> None:
        try:
            if hasattr(frames, "__aiter__"):
                async for frame in frames:  # type: ignore[union-attr]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
            else:
                for frame in frames:  # type: ignore[assignment]
                    await loop.run_in_executor(None, _write_frame, proc.stdin, frame)
        finally:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()

    writer_task = asyncio.create_task(_writer())
    try:
        reader = proc.stdout
        assert reader is not None
        # Read in reasonably-sized chunks; MSE tolerates any size
        # but we don't want to starve the event loop.
        chunk_size = 64 * 1024
        while True:
            data = await loop.run_in_executor(None, reader.read, chunk_size)
            if not data:
                break
            kind: Literal["init", "media"] = "init" if not self._init_emitted else "media"
            self._init_emitted = True
            yield FragmentedMP4Chunk(
                kind=kind,
                data=bytes(data),
                stream_id=self.stream_id,
                segment_idx=self.segment_idx,
            )
    finally:
        await writer_task
fastvideo.entrypoints.streaming.worker

Per-GPU worker subprocess entry for :class:SubprocessGpuPool.

The pool manages binding, lifecycle, and message dispatch in the parent process. The worker constructs its :class:VideoGenerator from a typed :class:GeneratorConfig, runs the two-segment warmup so both initial-segment and continuation-branch compile graphs are hot, and then loops on the job queue.

Functions:
fastvideo.entrypoints.streaming.worker.worker_main
worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child spawned by SubprocessGpuPool. Blocking VideoGenerator construction + generation happens here, not in the parent's event loop.

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
    import os

    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            break
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })
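
The job-loop protocol (a ready message, a `None` sentinel, and `result` / `error` replies keyed by `job_id`) can be exercised without GPUs. The sketch below substitutes a thread and `queue.Queue` for the subprocess and `mp.Queue`, and upper-casing a string for generation. `toy_worker` and the worker id are hypothetical.

```python
import queue
import threading


def toy_worker(
    job_queue: queue.Queue,
    result_queue: queue.Queue,
    shutdown_event: threading.Event,
) -> None:
    """Hypothetical worker loop with the same message shapes as worker_main."""
    result_queue.put({"kind": "ready", "worker_id": "toy-0"})
    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # wake up periodically to re-check the shutdown event
        if item is None:  # sentinel: stop cleanly
            break
        job_id = item["job_id"]
        try:
            result = item["request"].upper()  # placeholder for generation
            result_queue.put({"kind": "result", "job_id": job_id, "result": result})
        except Exception as exc:
            result_queue.put({"kind": "error", "job_id": job_id, "error": repr(exc)})


jobs: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
stop = threading.Event()

thread = threading.Thread(target=toy_worker, args=(jobs, results, stop))
thread.start()
jobs.put({"job_id": "j1", "request": "a cat surfing"})
jobs.put({"job_id": "j2", "request": 42})  # int has no .upper(), so this fails
jobs.put(None)
thread.join(timeout=5)

messages = []
while not results.empty():
    messages.append(results.get())
```

A failing job produces an `error` message and the loop continues, so one bad request does not take the worker down.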

fastvideo.entrypoints.streaming_generator

Classes

fastvideo.entrypoints.streaming_generator.StreamingVideoGenerator
StreamingVideoGenerator(fastvideo_args: FastVideoArgs, executor_class: type[Executor], log_stats: bool, use_queue_mode: bool = True)

Bases: VideoGenerator

This class extends VideoGenerator with streaming capabilities, allowing incremental video generation with step-by-step control.

Source code in fastvideo/entrypoints/streaming_generator.py
def __init__(self,
             fastvideo_args: FastVideoArgs,
             executor_class: type[Executor],
             log_stats: bool,
             use_queue_mode: bool = True):
    super().__init__(fastvideo_args, executor_class, log_stats)
    self.accumulated_frames: list[np.ndarray] = []
    self.sampling_param: SamplingParam | None = None
    self.batch: ForwardBatch | None = None
    self._use_queue_mode = use_queue_mode and isinstance(self.executor, MultiprocExecutor)
    self.writer: IncrementalVideoWriter | None = None
    self.block_dir: str | None = None
    self.block_idx: int = 0

fastvideo.entrypoints.video_generator

VideoGenerator module for FastVideo.

This module provides a consolidated interface for generating videos using diffusion models.

Classes

fastvideo.entrypoints.video_generator.VideoGenerator
VideoGenerator(fastvideo_args: FastVideoArgs, executor_class: type[Executor], log_stats: bool, *, log_queue=None)

A unified class for generating videos using diffusion models.

This class provides a simple interface for video generation with rich customization options, similar to popular frameworks like HF Diffusers.

Initialize the video generator.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

The inference arguments

required
executor_class type[Executor]

The executor class to use for inference

required
log_stats bool

Whether to log statistics

required
log_queue

Optional multiprocessing.Queue to forward worker logs to

None
Source code in fastvideo/entrypoints/video_generator.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    executor_class: type[Executor],
    log_stats: bool,
    *,
    log_queue=None,
):
    """
    Initialize the video generator.

    Args:
        fastvideo_args: The inference arguments
        executor_class: The executor class to use for inference
        log_stats: Whether to log statistics
        log_queue: Optional multiprocessing.Queue to forward worker logs to
    """
    self.config: GeneratorConfig | None = None
    self.fastvideo_args = fastvideo_args
    self.executor = executor_class(fastvideo_args, log_queue=log_queue)
Methods:
fastvideo.entrypoints.video_generator.VideoGenerator.default_health_check_request staticmethod
default_health_check_request() -> GenerationRequest

Return the minimal typed request Dynamo uses for probes.

256x256, 8 frames, 1 inference step -- fast enough to be a viable liveness check, non-trivial enough to exercise the DiT -> VAE -> decode path. Consumers adapt this shape to their transport's health-check payload (see docs/design/server_contracts/dynamo.md).
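
As a rough illustration of adapting this shape to a transport payload, the sketch below mirrors the documented probe values in a plain dataclass and serializes them to JSON. `HealthCheckSampling` and `to_probe_payload` are hypothetical stand-ins, not FastVideo or Dynamo APIs, and the JSON layout is assumed; the actual contract is described in docs/design/server_contracts/dynamo.md.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class HealthCheckSampling:
    """Hypothetical stand-in mirroring the probe's documented sampling values."""
    num_frames: int = 8
    height: int = 256
    width: int = 256
    fps: int = 24
    num_inference_steps: int = 1
    guidance_scale: float = 1.0


def to_probe_payload(prompt: str, sampling: HealthCheckSampling) -> str:
    """Serialize the probe request to JSON (assumed payload layout)."""
    return json.dumps({"prompt": prompt, "sampling": asdict(sampling)}, sort_keys=True)


payload = to_probe_payload("health check", HealthCheckSampling())
```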

Source code in fastvideo/entrypoints/video_generator.py
@staticmethod
def default_health_check_request() -> GenerationRequest:
    """Return the minimal typed request Dynamo uses for probes.

    256x256, 8 frames, 1 inference step -- fast enough to be a
    viable liveness check, non-trivial enough to exercise the
    DiT -> VAE -> decode path. Consumers adapt this shape to their
    transport's health-check payload (see
    ``docs/design/server_contracts/dynamo.md``).
    """
    return GenerationRequest(
        prompt="health check",
        inputs=InputConfig(),
        sampling=SamplingConfig(
            num_frames=8,
            height=256,
            width=256,
            fps=24,
            num_inference_steps=1,
            guidance_scale=1.0,
        ),
        output=OutputConfig(save_video=False, return_frames=False),
    )
fastvideo.entrypoints.video_generator.VideoGenerator.from_fastvideo_args classmethod
from_fastvideo_args(fastvideo_args: FastVideoArgs, *, log_queue=None) -> VideoGenerator

Create a video generator with the specified arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

The inference arguments

required
log_queue

Optional multiprocessing.Queue to forward worker logs to

None

Returns:

Type Description
VideoGenerator

The created video generator

Source code in fastvideo/entrypoints/video_generator.py
@classmethod
def from_fastvideo_args(
    cls,
    fastvideo_args: FastVideoArgs,
    *,
    log_queue=None,
) -> "VideoGenerator":
    """
    Create a video generator with the specified arguments.

    Args:
        fastvideo_args: The inference arguments
        log_queue: Optional multiprocessing.Queue to forward worker logs to

    Returns:
        The created video generator
    """
    # Initialize distributed environment if needed
    # initialize_distributed_and_parallelism(fastvideo_args)

    executor_class = Executor.get_class(fastvideo_args)
    return cls(
        fastvideo_args=fastvideo_args,
        executor_class=executor_class,
        log_stats=False,  # TODO: implement
        log_queue=log_queue,
    )
fastvideo.entrypoints.video_generator.VideoGenerator.from_pretrained classmethod
from_pretrained(model_path: str | GeneratorConfig | Mapping[str, Any] | None = None, **kwargs) -> VideoGenerator

Create a video generator from a pretrained model.

Parameters:

Name Type Description Default
model_path str | GeneratorConfig | Mapping[str, Any] | None

Path or identifier for the pretrained model

None
pipeline_config

Pipeline config to use for inference. It is not a named parameter in the signature, so it can only arrive through **kwargs.

required
**kwargs

Additional arguments to customize model loading; set any FastVideoArgs or PipelineConfig attributes here.

{}

Returns:

Type Description
VideoGenerator

The created video generator

Priority, lowest to highest: default pipeline config < user's pipeline config < user's kwargs

Stable convenience kwargs remain supported here for common engine and offload settings. Advanced model- or pipeline-specific options should move to VideoGenerator.from_config(...).
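
The priority order above can be pictured as a layered merge in which later layers win on conflicting keys. This is a minimal sketch with plain dictionaries, not FastVideo's actual merge code; `resolve_settings` and the example values are hypothetical.

```python
from typing import Any


def resolve_settings(
    defaults: dict[str, Any],
    user_config: dict[str, Any],
    user_kwargs: dict[str, Any],
) -> dict[str, Any]:
    """Merge three layers; later layers win on key conflicts."""
    return {**defaults, **user_config, **user_kwargs}


resolved = resolve_settings(
    defaults={"num_inference_steps": 50, "guidance_scale": 5.0},  # illustrative values
    user_config={"guidance_scale": 7.5},
    user_kwargs={"num_inference_steps": 4},
)
```

Here the kwarg overrides the default step count, while the user's config overrides the default guidance scale.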

Source code in fastvideo/entrypoints/video_generator.py
@classmethod
def from_pretrained(
    cls,
    model_path: str | GeneratorConfig | Mapping[str, Any] | None = None,
    **kwargs,
) -> "VideoGenerator":
    """
    Create a video generator from a pretrained model.

    Args:
        model_path: Path or identifier for the pretrained model
        pipeline_config: Pipeline config to use for inference
        **kwargs: Additional arguments to customize model loading, set any FastVideoArgs or PipelineConfig attributes here.

    Returns:
        The created video generator

    Priority level: Default pipeline config < User's pipeline config < User's kwargs

    Stable convenience kwargs remain supported here for common engine and
    offload settings. Advanced model- or pipeline-specific options should
    move to VideoGenerator.from_config(...).
    """
    log_queue = kwargs.pop("log_queue", None)
    if kwargs.pop("nvfp4_fa4", False):
        import os
        os.environ["FASTVIDEO_NVFP4_FA4"] = "1"
        os.environ.setdefault("CUTE_DSL_ENABLE_TVM_FFI", "1")
    typed_config = kwargs.pop("config", None)
    if typed_config is not None:
        if model_path is not None:
            raise TypeError("Pass either model_path or config to from_pretrained, not both")
        if kwargs:
            unexpected = ", ".join(sorted(kwargs))
            raise TypeError(f"Unexpected keyword arguments with config: {unexpected}")
        return cls.from_config(typed_config, log_queue=log_queue)

    if isinstance(model_path, GeneratorConfig | Mapping):
        if kwargs:
            unexpected = ", ".join(sorted(kwargs))
            raise TypeError(f"Unexpected keyword arguments with typed config: {unexpected}")
        return cls.from_config(model_path, log_queue=log_queue)

    if model_path is None:
        raise TypeError("model_path or config is required")

    legacy_only_kwargs = sorted(set(kwargs) - _FROM_PRETRAINED_CONVENIENCE_KWARGS)
    if legacy_only_kwargs:
        warnings.warn(
            "VideoGenerator.from_pretrained(...) received legacy-only kwargs "
            f"({', '.join(legacy_only_kwargs)}); prefer VideoGenerator.from_config(...) "
            "for advanced configuration.",
            DeprecationWarning,
            stacklevel=2,
        )
    return cls.from_config(
        legacy_from_pretrained_to_config(model_path, kwargs),
        log_queue=log_queue,
    )
fastvideo.entrypoints.video_generator.VideoGenerator.generate
generate(request: GenerationRequest | Mapping[str, Any], *, log_queue=None) -> GenerationResult | list[GenerationResult]

Generate video or image outputs from a typed inference request.

Parameters:

Name Type Description Default
request GenerationRequest | Mapping[str, Any]

A GenerationRequest instance or a mapping that can be parsed into one. This is the primary public inference entrypoint for the typed API.

required
log_queue

Optional multiprocessing.Queue to forward worker logs to during this request.

None

Returns:

Type Description
GenerationResult | list[GenerationResult]

A GenerationResult for single-request generation, or a list of GenerationResult objects when the request expands into multiple prompts.
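
Because the return type is a union, callers that want uniform handling may find it convenient to normalize it to a list. The sketch below shows one way to do that; `FakeResult` is a hypothetical stand-in for GenerationResult (whose fields are not shown here) and `as_result_list` is not part of FastVideo.

```python
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Hypothetical stand-in for GenerationResult."""
    prompt: str


def as_result_list(result):
    """Return a list whether `result` is a single object or already a list."""
    return result if isinstance(result, list) else [result]


single = as_result_list(FakeResult("a cat"))
batch = as_result_list([FakeResult("a cat"), FakeResult("a dog")])
```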

Source code in fastvideo/entrypoints/video_generator.py
def generate(
    self,
    request: GenerationRequest | Mapping[str, Any],
    *,
    log_queue=None,
) -> GenerationResult | list[GenerationResult]:
    """
    Generate video or image outputs from a typed inference request.

    Args:
        request: A `GenerationRequest` instance or a mapping that can be
            parsed into one. This is the primary public inference
            entrypoint for the typed API.
        log_queue: Optional multiprocessing.Queue to forward worker logs to
            during this request.

    Returns:
        A `GenerationResult` for single-request generation, or a list of
        `GenerationResult` objects when the request expands into multiple
        prompts.
    """
    normalized_request = normalize_generation_request(request)
    if log_queue:
        self.executor.set_log_queue(log_queue)

    try:
        return self._generate_request_impl(normalized_request)
    finally:
        if log_queue:
            self.executor.clear_log_queue()
fastvideo.entrypoints.video_generator.VideoGenerator.generate_async async
generate_async(request: GenerationRequest | Mapping[str, Any], *, log_queue=None)

Async generation that yields typed VideoEvent objects.

Three consumers share this substrate:

  • Streaming server (fastvideo.entrypoints.streaming): pipes VideoPartialEvent frames into fMP4.
  • Stateless OpenAI server: ignores progress events, forwards VideoFinalEvent as the HTTP response body.
  • Dynamo native backend (components/src/dynamo/fastvideo/): wraps each event as an NvVideosResponse chunk.

The aggregated code path shipped here yields a single VideoProgressEvent at start and one VideoFinalEvent at end (one per sub-result when the request expands into multiple prompts). Future work will thread per-step progress events through the pipeline's denoise loop so streaming consumers don't have to wait on a fully materialized final result.
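
The event flow described above can be sketched with a small async generator and a consumer that keeps only final events, as a stateless server might. The `ProgressEvent` and `FinalEvent` classes, the `payload` field, and `blocking_generate` are hypothetical stand-ins for illustration; only `asyncio.to_thread` and the progress-then-final ordering are taken from the source.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ProgressEvent:
    """Hypothetical stand-in for VideoProgressEvent."""
    step: int
    total_steps: int
    stage: str


@dataclass
class FinalEvent:
    """Hypothetical stand-in for VideoFinalEvent; the payload field is assumed."""
    payload: bytes


def blocking_generate(num_steps: int) -> bytes:
    """Placeholder for the blocking generation call."""
    return b"video-bytes"


async def generate_events(num_steps: int):
    # One progress event up front, mirroring the aggregated code path.
    yield ProgressEvent(step=0, total_steps=max(1, num_steps), stage="denoise")
    # Run the blocking work in a worker thread so the event loop stays free.
    payload = await asyncio.to_thread(blocking_generate, num_steps)
    yield FinalEvent(payload=payload)


async def collect_finals(num_steps: int) -> list[FinalEvent]:
    """Consume the stream and keep only final events."""
    finals = []
    async for event in generate_events(num_steps):
        if isinstance(event, FinalEvent):
            finals.append(event)
    return finals


finals = asyncio.run(collect_finals(num_steps=0))
```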

Source code in fastvideo/entrypoints/video_generator.py
async def generate_async(
    self,
    request: GenerationRequest | Mapping[str, Any],
    *,
    log_queue=None,
):
    """Async generation that yields typed :class:`VideoEvent`s.

    Three consumers share this substrate:

    * Streaming server (:mod:`fastvideo.entrypoints.streaming`) —
      pipes :class:`VideoPartialEvent` frames into fMP4.
    * Stateless OpenAI server — ignores progress events, forwards
      :class:`VideoFinalEvent` as the HTTP response body.
    * Dynamo native backend
      (``components/src/dynamo/fastvideo/``) — wraps each event as
      an ``NvVideosResponse`` chunk.

    The aggregated code path shipped here yields a single
    :class:`VideoProgressEvent` at start and one
    :class:`VideoFinalEvent` at end. Future work will thread
    per-step progress events through the pipeline's denoise loop
    so streaming consumers don't have to wait on a materialized
    final.
    """
    import asyncio

    normalized = normalize_generation_request(request)
    total_steps = max(1, normalized.sampling.num_inference_steps)
    yield VideoProgressEvent(step=0, total_steps=total_steps, stage="denoise")

    if log_queue:
        self.executor.set_log_queue(log_queue)
    try:
        result = await asyncio.to_thread(self._generate_request_impl, normalized)
    finally:
        if log_queue:
            self.executor.clear_log_queue()

    if isinstance(result, list):
        # Prompt-batch expansion — emit one Final per sub-result.
        for sub in result:
            yield await asyncio.to_thread(_final_event_from_result, sub)
        return
    yield await asyncio.to_thread(_final_event_from_result, result)
fastvideo.entrypoints.video_generator.VideoGenerator.generate_video
generate_video(prompt: str | None = None, sampling_param: SamplingParam | None = None, mouse_cond: Tensor | None = None, keyboard_cond: Tensor | None = None, grid_sizes: tuple[int, int, int] | list[int] | Tensor | None = None, **kwargs) -> dict[str, Any] | list[dict[str, Any]]

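Generate a video based on the given prompt. This method is deprecated: it emits a DeprecationWarning, and VideoGenerator.generate(request=...) is the replacement. Apart from prompt, the options listed below are not named parameters in the signature; they are passed through **kwargs.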

Parameters:

Name Type Description Default
prompt str | None

The prompt to use for generation (optional if prompt_path is provided)

None
negative_prompt

The negative prompt to use (overrides the one in fastvideo_args)

required
output_path

Path to save the video (overrides the one in fastvideo_args)

required
prompt_path

Path to prompt file

required
save_video

Whether to save the video to disk

required
return_frames

Whether to include raw frames in the result dict

required
num_inference_steps

Number of denoising steps (overrides fastvideo_args)

required
guidance_scale

Classifier-free guidance scale (overrides fastvideo_args)

required
num_frames

Number of frames to generate (overrides fastvideo_args)

required
height

Height of generated video (overrides fastvideo_args)

required
width

Width of generated video (overrides fastvideo_args)

required
fps

Frames per second for saved video (overrides fastvideo_args)

required
seed

Random seed for generation (overrides fastvideo_args)

required
callback

Callback function called after each step

required
callback_steps

Number of steps between each callback

required

Returns:

Type Description
dict[str, Any] | list[dict[str, Any]]

A metadata dictionary for single-prompt generation, or a list of metadata dictionaries for prompt-file batch generation.
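
Since the source below points callers to generate(request=...), one way to picture a migration is to group the flat legacy kwargs into a nested request mapping. This is only a sketch: the grouping follows the fields used in default_health_check_request above, but treating them as keys of a plain mapping is an assumption, and `legacy_kwargs_to_request` is a hypothetical helper rather than a FastVideo function.

```python
from typing import Any

# Field grouping follows default_health_check_request; using these names as
# plain mapping keys is an assumption made for illustration.
_SAMPLING_KEYS = ("num_frames", "height", "width", "fps",
                  "num_inference_steps", "guidance_scale")
_OUTPUT_KEYS = ("save_video", "return_frames")


def legacy_kwargs_to_request(prompt: str, **kwargs: Any) -> dict[str, Any]:
    """Group flat legacy kwargs into a nested request mapping (hypothetical)."""
    request: dict[str, Any] = {"prompt": prompt}
    sampling = {k: kwargs.pop(k) for k in _SAMPLING_KEYS if k in kwargs}
    output = {k: kwargs.pop(k) for k in _OUTPUT_KEYS if k in kwargs}
    if sampling:
        request["sampling"] = sampling
    if output:
        request["output"] = output
    if kwargs:
        # Anything left is outside the fields this sketch knows about.
        raise TypeError(f"Unmapped legacy kwargs: {', '.join(sorted(kwargs))}")
    return request


request = legacy_kwargs_to_request("a red fox", num_frames=8, height=256, save_video=False)
```

Raising on leftover kwargs keeps the sketch from silently dropping options it does not know how to place.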

Source code in fastvideo/entrypoints/video_generator.py
def generate_video(
    self,
    prompt: str | None = None,
    sampling_param: SamplingParam | None = None,
    # Action control inputs (Matrix-Game)
    mouse_cond: torch.Tensor | None = None,
    keyboard_cond: torch.Tensor | None = None,
    grid_sizes: tuple[int, int, int] | list[int] | torch.Tensor
    | None = None,
    **kwargs,
) -> dict[str, Any] | list[dict[str, Any]]:
    """
    Generate a video based on the given prompt.

    Args:
        prompt: The prompt to use for generation (optional if prompt_txt is provided)
        negative_prompt: The negative prompt to use (overrides the one in fastvideo_args)
        output_path: Path to save the video (overrides the one in fastvideo_args)
        prompt_path: Path to prompt file
        save_video: Whether to save the video to disk
        return_frames: Whether to include raw frames in the result dict
        num_inference_steps: Number of denoising steps (overrides fastvideo_args)
        guidance_scale: Classifier-free guidance scale (overrides fastvideo_args)
        num_frames: Number of frames to generate (overrides fastvideo_args)
        height: Height of generated video (overrides fastvideo_args)
        width: Width of generated video (overrides fastvideo_args)
        fps: Frames per second for saved video (overrides fastvideo_args)
        seed: Random seed for generation (overrides fastvideo_args)
        callback: Callback function called after each step
        callback_steps: Number of steps between each callback

    Returns:
        A metadata dictionary for single-prompt generation, or a list of
        metadata dictionaries for prompt-file batch generation.
    """
    log_queue = kwargs.pop("log_queue", None)
    warnings.warn(
        "VideoGenerator.generate_video(...) is deprecated; use "
        "VideoGenerator.generate(request=...) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    if log_queue:
        self.executor.set_log_queue(log_queue)

    try:
        extra_overrides: dict[str, Any] = {}
        for _ek in _BATCH_EXTRA_PASSTHROUGH_KEYS:
            if _ek in kwargs:
                extra_overrides[_ek] = kwargs.pop(_ek)

        request = legacy_generate_call_to_request(
            prompt,
            sampling_param,
            mouse_cond=mouse_cond,
            keyboard_cond=keyboard_cond,
            grid_sizes=grid_sizes,
            legacy_kwargs=kwargs,
        )

        fastvideo_args = self.fastvideo_args
        pipeline_overrides = request_to_pipeline_overrides(request)
        if pipeline_overrides:
            fastvideo_args = deepcopy(self.fastvideo_args)
            for key, value in pipeline_overrides.items():
                if not hasattr(fastvideo_args.pipeline_config, key):
                    raise ValueError(f"Request field {key!r} is not supported by pipeline config overrides")
                setattr(fastvideo_args.pipeline_config, key, deepcopy(value))

        resolved_sampling_param = request_to_sampling_param(
            request,
            model_path=self.fastvideo_args.model_path,
        )
        return self._generate_video_impl(
            prompt=request.prompt,
            sampling_param=resolved_sampling_param,
            fastvideo_args=fastvideo_args,
            **extra_overrides,
        )
    finally:
        if log_queue:
            self.executor.clear_log_queue()
fastvideo.entrypoints.video_generator.VideoGenerator.shutdown
shutdown()

Shut down the video generator.

Source code in fastvideo/entrypoints/video_generator.py
def shutdown(self):
    """
    Shut down the video generator.
    """
    self.executor.shutdown()
    del self.executor
fastvideo.entrypoints.video_generator.VideoGenerator.unmerge_lora_weights
unmerge_lora_weights() -> None

Use unmerged weights for inference to produce videos that align with validation videos generated during training.

Source code in fastvideo/entrypoints/video_generator.py
def unmerge_lora_weights(self) -> None:
    """
    Use unmerged weights for inference to produce videos that align with 
    validation videos generated during training.
    """
    self.executor.unmerge_lora_weights()

Functions: