
gpu_pool

GPU pool manager for the streaming server.

Replaces the single-generator path from PR 7.5 with a typed pool abstraction. Three classes ship here (two concrete pools and the abstract interface they share):

  • :class:InProcessGpuPool: one in-process VideoGenerator; used by tests and single-GPU dev deployments.
  • :class:SubprocessGpuPool: one multiprocessing.Process per GPU, each running :func:worker_main against a GeneratorConfig. Jobs are dispatched via multiprocessing.Queue.
  • :class:GpuPool (abstract): the interface both implement.

Session-to-GPU binding lives in the pool so continuation state stays on the GPU that generated the previous segment (matching the internal gpu_pool.py's per-GPU cache behavior). Cross-GPU handoff is supported via :class:SessionStore snapshot + hydrate, which serializes the state before the migration and rehydrates it on the new worker.
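The binding and handoff described above can be sketched with toy classes. This is a minimal illustration, not the module's implementation: ToySessionStore, ToyBindingTable, and every method name on them are hypothetical stand-ins, and continuation state is modeled as plain bytes.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToySessionStore:
    """Hypothetical stand-in for SessionStore: keeps serialized snapshots."""

    _snapshots: dict[str, bytes] = field(default_factory=dict)

    def snapshot(self, session_id: str, state: bytes) -> None:
        self._snapshots[session_id] = state

    def hydrate(self, session_id: str) -> bytes | None:
        return self._snapshots.get(session_id)


class ToyBindingTable:
    """Tracks which GPU each session is pinned to (illustrative only)."""

    def __init__(self, store: ToySessionStore) -> None:
        self._store = store
        self._gpu_of: dict[str, int] = {}
        # One continuation cache per GPU, keyed by session id.
        self._cache: dict[int, dict[str, bytes]] = {}

    def bind(self, session_id: str, gpu_id: int) -> None:
        self._gpu_of[session_id] = gpu_id
        self._cache.setdefault(gpu_id, {})

    def save_state(self, session_id: str, state: bytes) -> None:
        # State always lands on the GPU the session is bound to.
        self._cache[self._gpu_of[session_id]][session_id] = state

    def migrate(self, session_id: str, new_gpu: int) -> None:
        # Snapshot on the old GPU, rebind, then hydrate on the new one.
        old_gpu = self._gpu_of[session_id]
        state = self._cache[old_gpu].pop(session_id, None)
        if state is not None:
            self._store.snapshot(session_id, state)
        self.bind(session_id, new_gpu)
        restored = self._store.hydrate(session_id)
        if restored is not None:
            self._cache[new_gpu][session_id] = restored

    def state_on(self, gpu_id: int, session_id: str) -> bytes | None:
        return self._cache.get(gpu_id, {}).get(session_id)
```

In this sketch the state only leaves a GPU's cache through the store, which mirrors the idea that continuation state is serialized before a migration instead of being shared between workers.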

Typed config: workers start from a :class:GeneratorConfig (no flat LTX-2 kwargs), satisfying the PR 6 + PR 7 contracts that the public surface doesn't reintroduce the legacy kwarg bag.

Classes

fastvideo.entrypoints.streaming.gpu_pool.GpuPool

Bases: ABC

Abstract GPU pool.

acquire binds a session to a worker and holds that binding across segments so continuation state can stay hot. run submits a single GenerationRequest for a bound session.

acquire and release are independent of run: a session can run many segments on one acquired worker, and it must call release when the client disconnects.
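The acquire / run / release lifecycle might look like the sketch below. The method signatures are assumptions made for illustration (the real GpuPool signatures are not shown on this page), and ToyGpuPool, EchoPool, and stream_session are hypothetical names.

```python
from __future__ import annotations

import abc
import asyncio


class ToyGpuPool(abc.ABC):
    """Hypothetical interface with the three operations described above."""

    @abc.abstractmethod
    async def acquire(self, session_id: str) -> str: ...

    @abc.abstractmethod
    async def run(self, session_id: str, request: str) -> str: ...

    @abc.abstractmethod
    async def release(self, session_id: str) -> None: ...


class EchoPool(ToyGpuPool):
    """Trivial pool with a single fake worker, for demonstration only."""

    def __init__(self) -> None:
        self.bound: dict[str, str] = {}

    async def acquire(self, session_id: str) -> str:
        self.bound[session_id] = "worker-0"
        return "worker-0"

    async def run(self, session_id: str, request: str) -> str:
        if session_id not in self.bound:
            raise RuntimeError("session is not bound; call acquire first")
        return f"{self.bound[session_id]}:{request}"

    async def release(self, session_id: str) -> None:
        self.bound.pop(session_id, None)


async def stream_session(pool: ToyGpuPool, session_id: str, requests: list[str]) -> list[str]:
    # Bind once, run many segments, and always release on the way out.
    await pool.acquire(session_id)
    try:
        return [await pool.run(session_id, r) for r in requests]
    finally:
        await pool.release(session_id)
```

Putting release in a finally block is one way to make sure a disconnect or an exception mid-stream does not leave a worker pinned to a dead session.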

fastvideo.entrypoints.streaming.gpu_pool.InProcessGpuPool

InProcessGpuPool(generator: _GeneratorLike, *, gpu_id: int = 0, session_store: SessionStore | None = None)

Bases: GpuPool

Single-process pool backed by one :class:_GeneratorLike.

This is what PR 7.5's server uses by default; PR 7.6 adds the real SubprocessGpuPool alternative but keeps this one for tests and small deployments.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator: _GeneratorLike,
    *,
    gpu_id: int = 0,
    session_store: SessionStore | None = None,
) -> None:
    self._generator = generator
    self._gpu_id = gpu_id
    self._worker_id = f"inproc-{uuid.uuid4().hex[:6]}"
    self._session_store = session_store or InMemorySessionStore()
    self._active: dict[str, PoolAssignment] = {}
    self._lock = asyncio.Lock()
    self._gen_lock = asyncio.Lock()
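The constructor above creates two locks. One plausible reading, and it is only an assumption since the methods that use them are not shown here, is that _gen_lock serializes calls into the single generator while blocking generation runs off the event loop. A minimal sketch of that pattern, with hypothetical names ToyGenerator and run_serialized:

```python
import asyncio
import time


class ToyGenerator:
    """Fake blocking generator that records peak concurrency."""

    def __init__(self) -> None:
        self.active = 0
        self.max_active = 0

    def generate(self, request: str) -> str:
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # stand-in for blocking GPU work
        self.active -= 1
        return f"video:{request}"


async def run_serialized(generator: ToyGenerator, gen_lock: asyncio.Lock, request: str) -> str:
    # Only one generate call at a time; the blocking call runs in a
    # worker thread so the event loop stays responsive.
    async with gen_lock:
        return await asyncio.to_thread(generator.generate, request)


async def demo() -> tuple[list[str], int]:
    gen = ToyGenerator()
    lock = asyncio.Lock()
    results = await asyncio.gather(
        *[run_serialized(gen, lock, f"r{i}") for i in range(3)]
    )
    return list(results), gen.max_active
```

With the lock in place, max_active stays at 1 even though three requests are submitted concurrently.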

fastvideo.entrypoints.streaming.gpu_pool.PoolAcquireTimeout

Bases: RuntimeError

Raised when acquire times out waiting for a free worker.
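A timeout of this kind can be built from asyncio.wait_for around a queue of free worker indices. The sketch below assumes that shape; ToyAcquireTimeout and acquire_worker are hypothetical names, not the module's API.

```python
import asyncio


class ToyAcquireTimeout(RuntimeError):
    """Stand-in for PoolAcquireTimeout."""


async def acquire_worker(available: asyncio.Queue, timeout: float) -> int:
    # Wait for a free worker index, converting the asyncio timeout
    # into a pool-specific error.
    try:
        return await asyncio.wait_for(available.get(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise ToyAcquireTimeout(f"no free worker within {timeout}s") from exc


async def demo() -> tuple[int, bool]:
    available: asyncio.Queue = asyncio.Queue()
    await available.put(0)
    first = await acquire_worker(available, timeout=0.05)
    try:
        # The only worker is taken, so this second acquire times out.
        await acquire_worker(available, timeout=0.05)
    except ToyAcquireTimeout:
        timed_out = True
    else:
        timed_out = False
    return first, timed_out
```

Raising a dedicated RuntimeError subclass lets a server map pool exhaustion to its own response instead of treating it as a generic timeout.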

fastvideo.entrypoints.streaming.gpu_pool.PoolAssignment dataclass

PoolAssignment(gpu_id: int, worker_id: str, pinned_at: float = monotonic())

The worker a session is currently bound to.
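The rendered signature shows pinned_at defaulting to monotonic(). A dataclass with that shape would normally use a default_factory so the timestamp is taken per instance; whether the real class does so is an assumption. ToyAssignment and held_for below are illustrative names.

```python
import time
from dataclasses import dataclass, field


@dataclass
class ToyAssignment:
    gpu_id: int
    worker_id: str
    # default_factory runs at construction time, once per instance.
    pinned_at: float = field(default_factory=time.monotonic)

    def held_for(self) -> float:
        """Seconds since the session was pinned to this worker."""
        return time.monotonic() - self.pinned_at
```

A monotonic clock is a reasonable choice for measuring how long a binding has been held, because it is unaffected by wall-clock adjustments.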

fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool

SubprocessGpuPool(generator_config: GeneratorConfig, *, pool_config: GpuPoolConfig, warmup_config: WarmupConfig | None = None, session_store: SessionStore | None = None, worker_factory: WorkerFactory | None = None)

Bases: GpuPool

One multiprocessing.Process per GPU.

Each worker boots :class:fastvideo.VideoGenerator from a typed :class:GeneratorConfig inside the child process (after CUDA_VISIBLE_DEVICES is set) and consumes jobs from a multiprocessing.Queue.

This is the production shape: the parent process stays CPU-only, and GPU state never crosses process boundaries. Continuation state is serialized through :class:SessionStore for cross-GPU handoff.

PR 7.6 ships this as an opt-in; PR 7.5's in-process pool remains the default until nightly runs validate the subprocess path.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
def __init__(
    self,
    generator_config: GeneratorConfig,
    *,
    pool_config: GpuPoolConfig,
    warmup_config: WarmupConfig | None = None,
    session_store: SessionStore | None = None,
    worker_factory: WorkerFactory | None = None,
) -> None:
    self._generator_config = generator_config
    self._pool_config = pool_config
    self._warmup_config = warmup_config or WarmupConfig()
    self._session_store = session_store or InMemorySessionStore()
    self._worker_factory = worker_factory or _default_worker_factory
    self._workers: list[_WorkerHandle] = []
    self._available: asyncio.Queue[int] = asyncio.Queue()
    self._assignments: dict[str, PoolAssignment] = {}
    self._worker_by_id: dict[str, _WorkerHandle] = {}
    self._pending: dict[str, _PendingJob] = {}
    self._lock = asyncio.Lock()
    self._result_reader_tasks: list[asyncio.Task] = []

Functions

fastvideo.entrypoints.streaming.gpu_pool.SubprocessGpuPool.start async
start() -> None

Spawn worker processes and wait for each to report ready.

Source code in fastvideo/entrypoints/streaming/gpu_pool.py
async def start(self) -> None:
    """Spawn worker processes and wait for each to report ready."""
    num_workers = self._pool_config.num_workers or 1
    for gpu_id in range(num_workers):
        handle = self._worker_factory(
            gpu_id=gpu_id,
            generator_config=self._generator_config,
            warmup_config=self._warmup_config,
        )
        self._workers.append(handle)
        self._worker_by_id[handle.worker_id] = handle

    # Wait for each worker's ready event in a thread to avoid
    # blocking the event loop.
    loop = asyncio.get_running_loop()
    await asyncio.gather(*[
        loop.run_in_executor(None, handle.ready.wait, self._warmup_config.timeout_seconds)
        for handle in self._workers
    ])

    # Start background result readers — one task per worker
    # drains its result queue and resolves futures in _pending.
    for handle in self._workers:
        task = asyncio.create_task(self._drain_results(handle))
        self._result_reader_tasks.append(task)

    # Only admit workers that successfully booted. Anything that
    # failed boot (timeout, crash, error sentinel) stays out of the
    # available queue so we never assign a session to it.
    for idx, handle in enumerate(self._workers):
        if handle.boot_ok.is_set():
            await self._available.put(idx)
        else:
            logger.error(
                "pool: worker %s failed to boot; skipping",
                handle.worker_id,
            )
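The wait step in start() relies on running a blocking Event.wait in the default executor. The sketch below isolates that pattern, using threading.Event as a stand-in for whatever event type the worker handles actually carry; wait_all_ready is a hypothetical helper.

```python
import asyncio
import threading


async def wait_all_ready(events: list[threading.Event], timeout: float) -> list[bool]:
    # Event.wait blocks, so each wait runs in the default thread pool.
    # It returns True if the event was set and False on timeout.
    loop = asyncio.get_running_loop()
    return await asyncio.gather(*[
        loop.run_in_executor(None, ev.wait, timeout) for ev in events
    ])


async def demo() -> list[bool]:
    ok, stuck = threading.Event(), threading.Event()
    # Simulate one worker that finishes booting shortly after start,
    # and one that never reports ready.
    threading.Timer(0.01, ok.set).start()
    return await wait_all_ready([ok, stuck], timeout=0.2)
```

Because the waits run concurrently, the total wait is bounded by roughly one timeout, not one timeout per worker.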

Functions

fastvideo.entrypoints.streaming.gpu_pool.worker_main

worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child spawned by SubprocessGpuPool. Blocking VideoGenerator construction + generation happens here, not in the parent's event loop.

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
    import os

    # Restrict this process to one GPU before anything initializes CUDA.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

    while not shutdown_event.is_set():
        try:
            # Poll with a timeout so shutdown_event is re-checked regularly.
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            # None is the shutdown sentinel.
            break
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })
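The job loop's message protocol can be exercised without GPUs or subprocesses. The sketch below reuses the same message shapes ("result" and "error" keyed by job_id, with None as the sentinel) but swaps in a thread and queue.Queue so it runs anywhere; toy_worker and run_demo are hypothetical names, and the generate callable stands in for VideoGenerator.generate.

```python
import queue
import threading


def toy_worker(job_queue, result_queue, shutdown_event, generate):
    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        if item is None:  # shutdown sentinel
            break
        job_id = item["job_id"]
        try:
            result = generate(item["request"])
            result_queue.put({"kind": "result", "job_id": job_id, "result": result})
        except Exception as exc:
            # A failed job is reported; the loop keeps serving later jobs.
            result_queue.put({"kind": "error", "job_id": job_id, "error": repr(exc)})


def run_demo():
    jobs, results, stop = queue.Queue(), queue.Queue(), threading.Event()

    def generate(request):
        if request == "bad":
            raise ValueError("bad request")
        return request.upper()

    worker = threading.Thread(target=toy_worker, args=(jobs, results, stop, generate))
    worker.start()
    jobs.put({"job_id": "j1", "request": "hello"})
    jobs.put({"job_id": "j2", "request": "bad"})
    jobs.put(None)
    worker.join(timeout=5)
    return [results.get_nowait() for _ in range(2)]
```

Tagging every message with its job_id is what lets a parent-side reader match results to pending futures, which is the role the source comments assign to the per-worker result readers.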