
worker

EvalWorker: a single-GPU bag of metric replicas.

One EvalWorker runs per GPU. It holds an instance of every requested metric on its own device and scores one sample at a time. The worker never sees more than one evaluate(...) invocation at a time: the parent Evaluator ensures this by handing the worker to one thread at a time. Per-sample scoring keeps no state across calls; only set metrics (described below) accumulate state between calls.

This mirrors FastVideo's Worker layer under VideoGenerator, but runs in-process (threads, not processes). Eval metrics are independent and need no NCCL, tensor-parallel (TP), sequence-parallel (SP), or other distributed initialization, so process isolation would be unnecessary overhead.
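The "one thread at a time" handoff described above can be sketched with a queue of idle workers. This is only an illustration of the idea, not the Evaluator's actual implementation: `StubWorker`, `run_pool`, and the `sample_id` key are hypothetical names introduced here.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class StubWorker:
    """Hypothetical stand-in for EvalWorker; it only echoes its device."""

    def __init__(self, device: str) -> None:
        self.device = device

    def evaluate(self, **sample) -> dict:
        return {"device": self.device, "sample_id": sample["sample_id"]}


def run_pool(devices: list, samples: list) -> list:
    # Idle workers wait in a queue. A thread must check one out before
    # calling evaluate, so no worker ever serves two calls at once.
    idle = queue.Queue()
    for d in devices:
        idle.put(StubWorker(d))

    def score(sample: dict) -> dict:
        worker = idle.get()  # blocks until some worker is free
        try:
            return worker.evaluate(**sample)
        finally:
            idle.put(worker)  # hand the worker back to the pool

    with ThreadPoolExecutor(max_workers=len(devices)) as pool:
        # map() returns results in input order, whichever worker ran them.
        return list(pool.map(score, samples))


results = run_pool(["cuda:0", "cuda:1"], [{"sample_id": i} for i in range(6)])
```

Because everything lives in one process, the workers share memory and need no inter-process setup, which is the trade-off the paragraph above argues for.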

Classes

fastvideo.eval.worker.EvalWorker

EvalWorker(metric_names: list[str], device: str, *, compile: bool = False, pre_upload: bool = True, skip_missing_deps: bool = False)

Owns metric replicas on one device. Single-GPU, single-sample.

Per-sample metrics (is_set_metric=False) return one MetricResult per evaluate call. Set metrics (is_set_metric=True) accumulate state on the worker's own instance and contribute nothing to the per-sample return; the Evaluator finalizes them after the pool drains.
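The two metric kinds can be contrasted with a pair of toy classes. This is a minimal sketch: `StubPerSample` and `StubSetMetric` are hypothetical, and the `finalize` method name is an assumption, since the source only says the Evaluator "finalizes" set metrics.

```python
from dataclasses import dataclass, field


@dataclass
class StubPerSample:
    """Hypothetical per-sample metric: compute() returns a score directly."""
    is_set_metric: bool = False

    def compute(self, sample: dict) -> float:
        return sum(sample["video"]) / len(sample["video"])


@dataclass
class StubSetMetric:
    """Hypothetical set metric: accumulates now, produces one score later."""
    is_set_metric: bool = True
    seen: list = field(default_factory=list)

    def accumulate(self, sample: dict) -> None:
        self.seen.extend(sample["video"])

    def finalize(self) -> float:  # assumed name, not confirmed by the source
        return sum(self.seen) / len(self.seen)

    def reset(self) -> None:
        self.seen.clear()


per_sample, set_metric = StubPerSample(), StubSetMetric()
scores = []
for video in ([1.0, 3.0], [5.0, 7.0]):
    sample = {"video": video}
    scores.append(per_sample.compute(sample))  # one result per call
    set_metric.accumulate(sample)              # no per-call result
corpus_score = set_metric.finalize()           # one result for the whole set
```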

Source code in fastvideo/eval/worker.py
def __init__(self,
             metric_names: list[str],
             device: str,
             *,
             compile: bool = False,
             pre_upload: bool = True,
             skip_missing_deps: bool = False) -> None:
    self._names = list(metric_names)
    self._device = device
    self._compile = compile
    self._pre_upload = pre_upload
    self._skip_missing_deps = skip_missing_deps
    self._metrics: dict[str, BaseMetric] = {}
    self._unloaded = False
    self._load()

Attributes

fastvideo.eval.worker.EvalWorker.metric_names property
metric_names: list[str]

Names of the metrics this worker owns, in load order.

Functions

fastvideo.eval.worker.EvalWorker.evaluate
evaluate(*, metrics: list[str] | None = None, **kwargs) -> dict[str, MetricResult]

Score one already-decoded sample.

Inputs (video / reference paths) are decoded and normalized upstream by the VideoPool. The worker handles the optional pre-upload to its device and dispatches to each metric's compute or accumulate.

Samples tagged role="reference" are corpus context for set metrics only. Per-sample metrics skip them, so a mixed Evaluator (e.g. FVD + LPIPS + vbench) doesn't produce spurious per-sample scores on the reference half of the corpus.

The metrics argument (when not None) further restricts dispatch to that subset of registered names. It is used by the per-call Evaluator.evaluate(metrics=...) filter.
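A small worked example of these dispatch rules, using toy metrics in place of real ones. `Mean`, `Count`, and `dispatch` are hypothetical names; the loop follows the rules stated above but omits device upload and error handling.

```python
from typing import Optional


class Mean:
    """Toy per-sample metric."""
    is_set_metric = False

    def compute(self, sample: dict) -> float:
        return sum(sample["video"]) / len(sample["video"])


class Count:
    """Toy set metric that only counts the samples it has seen."""
    is_set_metric = True

    def __init__(self) -> None:
        self.n = 0

    def accumulate(self, sample: dict) -> None:
        self.n += 1


def dispatch(metrics: dict, sample: dict, only: Optional[list] = None) -> dict:
    is_ref = sample.get("role") == "reference"
    allowed = set(only) if only is not None else None
    results = {}
    for name, m in metrics.items():
        if allowed is not None and name not in allowed:
            continue                      # filtered out for this call
        if m.is_set_metric:
            m.accumulate(sample)          # set metrics see every sample
        elif not is_ref:
            results[name] = m.compute(sample)  # per-sample metrics skip references
    return results


metrics = {"mean": Mean(), "count": Count()}
gen = dispatch(metrics, {"video": [2.0, 4.0]})
ref = dispatch(metrics, {"video": [9.0], "role": "reference"})
filtered = dispatch(metrics, {"video": [1.0]}, only=["mean"])
```

Here the reference sample yields an empty result dict but still reaches the set metric, while the filtered call bypasses the set metric entirely.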

Source code in fastvideo/eval/worker.py
def evaluate(self, *, metrics: list[str] | None = None, **kwargs) -> dict[str, MetricResult]:
    """Score one already-decoded sample.

    Inputs (``video`` / ``reference`` paths) are decoded and
    normalized upstream by the :class:`VideoPool`. The worker
    handles the optional pre-upload to its device and dispatches to
    each metric's ``compute`` or ``accumulate``.

    Samples tagged ``role="reference"`` are corpus context for set
    metrics only — per-sample metrics skip them so a mixed
    Evaluator (e.g. FVD + LPIPS + vbench) doesn't produce spurious
    per-sample scores on the reference half of the corpus.

    ``metrics`` (when not ``None``) further restricts dispatch to
    that subset of registered names — used by the per-call
    ``Evaluator.evaluate(metrics=...)`` filter.
    """
    if self._unloaded:
        raise RuntimeError("EvalWorker was unloaded; call reload() before evaluating.")

    sample = dict(kwargs)
    # Unwrap ``Video`` → its ``.frames`` tensor for per-sample metric
    # consumers (PSNR/SSIM/LPIPS/optical_flow). The pool decodes Video
    # instances but leaves the wrapper in place so source-aware metrics
    # (audio.imagebind_score) can still read ``Video.source``; those
    # metrics should additionally pass ``sample["video_path"]`` if they
    # need the path. Per-sample tensor consumers get a tensor either way.
    for _k in ("video", "reference"):
        _v = sample.get(_k)
        if isinstance(_v, Video):
            sample[_k] = _v.frames
    if self._pre_upload and self._any_needs_gpu:
        sample["video"] = _to_device(sample.get("video"), self._device)
        if "reference" in sample:
            sample["reference"] = _to_device(sample["reference"], self._device)

    is_ref = sample.get("role") == "reference"
    filter_set: set[str] | None = set(metrics) if metrics is not None else None
    results: dict[str, MetricResult] = {}
    broken: list[str] = []
    for name, m in self._metrics.items():
        if filter_set is not None and name not in filter_set:
            continue
        try:
            if m.is_set_metric:
                m.accumulate(sample)
            elif not is_ref:
                results[name] = m.compute(sample)
        except (ImportError, ModuleNotFoundError, FileNotFoundError) as e:
            # In skip_missing_deps mode, compute/accumulate-time failures
            # whose root cause is a missing dependency (lazy import of a
            # lib like torchcodec) or a missing resource (model checkpoint
            # not on disk) drop the metric for the remainder of this
            # Evaluator instead of bringing the whole run down. Strict
            # mode re-raises. Programmer bugs, OOM, and other runtime
            # failures are intentionally NOT caught here — they surface.
            if not self._skip_missing_deps:
                raise
            import logging
            logging.getLogger(__name__).exception("eval: dropping %s after %s: %s", name, type(e).__name__, e)
            broken.append(name)
    for n in broken:
        self._metrics.pop(n, None)
    return results
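
The skip_missing_deps behavior described in the comments above can be exercised with a toy metric that fails the way a missing optional library would. This sketch simplifies the loop to per-sample metrics only; `NeedsMissingLib`, `Healthy`, and the standalone `evaluate` function are hypothetical.

```python
import logging

log = logging.getLogger("eval_sketch")


class NeedsMissingLib:
    is_set_metric = False

    def compute(self, sample: dict) -> float:
        # Simulates a lazy import that fails at compute time.
        raise ImportError("optional dependency is not installed")


class Healthy:
    is_set_metric = False

    def compute(self, sample: dict) -> float:
        return 1.0


def evaluate(metrics: dict, sample: dict, skip_missing_deps: bool) -> dict:
    results, broken = {}, []
    for name, m in metrics.items():
        try:
            results[name] = m.compute(sample)
        except (ImportError, FileNotFoundError) as e:
            if not skip_missing_deps:
                raise                     # strict mode: fail the run
            log.warning("dropping %s after %s: %s", name, type(e).__name__, e)
            broken.append(name)
    for n in broken:
        metrics.pop(n, None)              # dropped for all later calls too
    return results


lenient = {"broken": NeedsMissingLib(), "ok": Healthy()}
first = evaluate(lenient, {}, skip_missing_deps=True)

strict_raised = False
try:
    evaluate({"broken": NeedsMissingLib()}, {}, skip_missing_deps=False)
except ImportError:
    strict_raised = True
```

The sketch catches only ImportError and FileNotFoundError because ModuleNotFoundError is a subclass of ImportError in Python, so listing it separately does not change which exceptions are caught.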

fastvideo.eval.worker.EvalWorker.release_cuda_memory
release_cuda_memory() -> None

Free CUDA caches without dropping models.

Source code in fastvideo/eval/worker.py
def release_cuda_memory(self) -> None:
    """Free CUDA caches without dropping models."""
    clear_cache()
    if torch.cuda.is_available():
        with contextlib.suppress(Exception):
            torch.cuda.ipc_collect()

fastvideo.eval.worker.EvalWorker.reload
reload() -> None

Rebuild metrics dropped by unload().

Source code in fastvideo/eval/worker.py
def reload(self) -> None:
    """Rebuild metrics dropped by :meth:`unload`."""
    if self._unloaded:
        self._load()

fastvideo.eval.worker.EvalWorker.reset_set_metrics
reset_set_metrics() -> None

Clear accumulator state on every set metric.

Source code in fastvideo/eval/worker.py
def reset_set_metrics(self) -> None:
    """Clear accumulator state on every set metric."""
    for m in self._metrics.values():
        if m.is_set_metric:
            m.reset()

fastvideo.eval.worker.EvalWorker.set_metrics
set_metrics() -> dict[str, BaseMetric]

Return {name: instance} for set metrics on this worker.

Source code in fastvideo/eval/worker.py
def set_metrics(self) -> dict[str, BaseMetric]:
    """Return ``{name: instance}`` for set metrics on this worker."""
    return {n: m for n, m in self._metrics.items() if m.is_set_metric}

fastvideo.eval.worker.EvalWorker.unload
unload() -> None

Drop metric refs so models become GC-able. Reverse with reload().

Source code in fastvideo/eval/worker.py
def unload(self) -> None:
    """Drop metric refs so models become GC-able. Reverse with reload()."""
    self._metrics = {}
    self._unloaded = True
    self.release_cuda_memory()
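
The unload / reload lifecycle can be sketched with a stripped-down stand-in. `LifecycleSketch` is hypothetical, and its `_load` body is an assumption: the real `_load` is not shown on this page, so the sketch only assumes that it rebuilds the metric instances and clears the unloaded flag.

```python
class LifecycleSketch:
    """Hypothetical stand-in that mirrors only the unload/reload bookkeeping."""

    def __init__(self, names: list) -> None:
        self._names = list(names)
        self._metrics = {}
        self._unloaded = False
        self._load()

    def _load(self) -> None:
        # Assumption: the real _load builds metric instances and clears the flag.
        self._metrics = {n: object() for n in self._names}
        self._unloaded = False

    def unload(self) -> None:
        self._metrics = {}        # drop references so models can be collected
        self._unloaded = True

    def reload(self) -> None:
        if self._unloaded:        # no-op on a worker that is still loaded
            self._load()

    def evaluate(self) -> list:
        if self._unloaded:
            raise RuntimeError("worker was unloaded; call reload() first")
        return sorted(self._metrics)


w = LifecycleSketch(["psnr", "ssim"])
w.unload()
error = None
try:
    w.evaluate()
except RuntimeError as e:
    error = str(e)
w.reload()
names = w.evaluate()
```

Since unload drops the metric instances themselves, any state a set metric had accumulated on them goes with them, so it seems safest to finalize set metrics before unloading.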

Functions