worker

Per-GPU worker subprocess entry point for SubprocessGpuPool.

The pool manages binding, lifecycle, and message dispatch in the parent process. The worker constructs its VideoGenerator from a typed GeneratorConfig, runs the two-segment warmup (when warmup is enabled) so that both the initial-segment and continuation-branch compile graphs are hot, and then loops on the job queue.
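
As a rough illustration of the parent side, the sketch below shows one way a pool could launch such a worker with the standard `multiprocessing` module. It is not the SubprocessGpuPool implementation: the `build_worker_kwargs` and `launch_worker` helpers, the `spawn` start method, the `gpu{N}` worker id format, and the use of a multiprocessing `Event` for `shutdown_event` are all assumptions made for the example.

```python
import multiprocessing as mp


def build_worker_kwargs(gpu_id, generator_config, warmup_config,
                        job_queue, result_queue, shutdown_event):
    """Collect the keyword-only arguments that worker_main expects."""
    return {
        "gpu_id": gpu_id,
        "worker_id": f"gpu{gpu_id}",  # hypothetical id format
        "generator_config": generator_config,
        "warmup_config": warmup_config,
        "job_queue": job_queue,
        "result_queue": result_queue,
        "shutdown_event": shutdown_event,
    }


def launch_worker(target, gpu_id, generator_config, warmup_config):
    """Start one worker process; `target` would be worker_main."""
    # "spawn" gives the child a fresh interpreter, a common choice when
    # the child initializes CUDA (assumption: the real pool may differ).
    ctx = mp.get_context("spawn")
    job_queue, result_queue = ctx.Queue(), ctx.Queue()
    shutdown_event = ctx.Event()
    kwargs = build_worker_kwargs(gpu_id, generator_config, warmup_config,
                                 job_queue, result_queue, shutdown_event)
    # worker_main is keyword-only, so pass kwargs= rather than args=.
    process = ctx.Process(target=target, kwargs=kwargs, daemon=True)
    process.start()
    return process, job_queue, result_queue, shutdown_event
```

With the `spawn` start method, `launch_worker` should be called from code guarded by `if __name__ == "__main__":`, and `target` must be an importable module-level function.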

Functions

fastvideo.entrypoints.streaming.worker.worker_main

worker_main(*, gpu_id: int, worker_id: str, generator_config: GeneratorConfig, warmup_config: WarmupConfig, job_queue: Queue, result_queue: Queue, shutdown_event: Any) -> None

Per-worker subprocess entry.

Runs inside the child process spawned by SubprocessGpuPool. Blocking VideoGenerator construction and generation happen here, not in the parent's event loop.
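
Because the queues themselves are blocking, a parent that runs an asyncio event loop needs some bridge when it reads worker messages. One common approach, sketched here with the standard library only, is to push the blocking `get` onto a thread with `asyncio.to_thread` (Python 3.9+). The `next_message` helper is hypothetical, and how SubprocessGpuPool actually does this is not shown on this page.

```python
import asyncio
import queue


async def next_message(result_queue, timeout=0.5):
    """Await one worker message without blocking the event loop.

    Returns None if nothing arrived within `timeout` seconds.
    """
    def _blocking_get():
        try:
            return result_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    # The blocking get runs on a helper thread; the loop stays responsive.
    return await asyncio.to_thread(_blocking_get)


async def demo():
    # queue.Queue stands in for the multiprocessing queue in this demo.
    results = queue.Queue()
    results.put({"kind": "ready", "worker_id": "w0"})
    first = await next_message(results)
    second = await next_message(results, timeout=0.05)
    return first, second
```

Calling `asyncio.run(demo())` returns the queued `ready` message first and then `None`, since the second read times out on an empty queue.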

Source code in fastvideo/entrypoints/streaming/worker.py
def worker_main(
    *,
    gpu_id: int,
    worker_id: str,
    generator_config: GeneratorConfig,
    warmup_config: WarmupConfig,
    job_queue: mp.Queue,
    result_queue: mp.Queue,
    shutdown_event: Any,
) -> None:  # pragma: no cover - exercised via integration only
    """Per-worker subprocess entry.

    Runs inside the child spawned by ``SubprocessGpuPool``. Blocking
    ``VideoGenerator`` construction + generation happens here, not in
    the parent's event loop.
    """
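    import os

    # Pin this process to its assigned GPU. Set before importing
    # fastvideo so CUDA initializes with only that device visible.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)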
    try:
        from fastvideo import VideoGenerator

        generator = VideoGenerator.from_pretrained(config=generator_config)
        if warmup_config.enabled:
            _warmup_worker(generator, warmup_config)
        result_queue.put({"kind": "ready", "worker_id": worker_id})
    except Exception as exc:
        result_queue.put({"kind": "error", "error": repr(exc)})
        return

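    # Poll with a short timeout so shutdown_event is rechecked even
    # when no jobs arrive.
    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        # A None item is the sentinel that ends the loop.
        if item is None:
            break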
        job_id = item["job_id"]
        request = item["request"]
        try:
            result = generator.generate(request)
            result_queue.put({
                "kind": "result",
                "job_id": job_id,
                "result": result,
            })
        except Exception as exc:
            result_queue.put({
                "kind": "error",
                "job_id": job_id,
                "error": repr(exc),
            })
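
To see the message protocol end to end without a GPU, the following sketch runs the same loop shape on a thread with a fake generator and then groups the resulting messages by `kind`. `FakeGenerator`, `toy_worker`, `route`, and `run_demo` are illustrative names that do not exist in fastvideo, and `queue.Queue` and `threading.Event` stand in for their multiprocessing counterparts.

```python
import queue
import threading


class FakeGenerator:
    """Hypothetical stand-in for VideoGenerator."""

    def generate(self, request):
        if request == "boom":
            raise ValueError("bad request")
        return f"video for {request!r}"


def toy_worker(worker_id, job_queue, result_queue, shutdown_event):
    # Same shape as worker_main: announce readiness, then drain jobs.
    generator = FakeGenerator()
    result_queue.put({"kind": "ready", "worker_id": worker_id})
    while not shutdown_event.is_set():
        try:
            item = job_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        if item is None:  # sentinel: stop the loop
            break
        job_id = item["job_id"]
        try:
            result = generator.generate(item["request"])
            result_queue.put({"kind": "result", "job_id": job_id, "result": result})
        except Exception as exc:
            result_queue.put({"kind": "error", "job_id": job_id, "error": repr(exc)})


def route(messages):
    """Group messages the way a parent-side dispatcher might."""
    ready, done, failed = [], {}, {}
    for msg in messages:
        if msg["kind"] == "ready":
            ready.append(msg["worker_id"])
        elif msg["kind"] == "result":
            done[msg["job_id"]] = msg["result"]
        elif msg["kind"] == "error":
            # Startup errors in the listing above carry no job_id.
            failed[msg.get("job_id")] = msg["error"]
    return ready, done, failed


def run_demo():
    jobs, results, stop = queue.Queue(), queue.Queue(), threading.Event()
    worker = threading.Thread(target=toy_worker, args=("w0", jobs, results, stop))
    worker.start()
    jobs.put({"job_id": 1, "request": "a cat"})
    jobs.put({"job_id": 2, "request": "boom"})
    jobs.put(None)
    worker.join(timeout=5)
    messages = []
    while not results.empty():
        messages.append(results.get())
    return messages
```

Running `route(run_demo())` yields one ready worker, one completed job, and one failed job. Note that a failed job does not end the loop: the worker reports the error and moves on to the next item. A dispatcher also has to tolerate `error` messages without a `job_id`, because the startup failure path in `worker_main` sends only `kind` and `error`.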