EvalWorker: a single-GPU bag of metric replicas.
One EvalWorker per GPU. Holds an instance of every requested metric on
its own device and scores one sample at a time. Per-sample scoring keeps no
state between calls (set metrics are the exception: they accumulate on the
worker), and the worker never sees more than one evaluate(...) invocation
at a time (the parent :class:`Evaluator` ensures this by handing the worker
to one thread at a time).
This mirrors FastVideo's Worker layer under :class:`VideoGenerator`,
but in-process (threads, not processes) — eval metrics are independent
and need no NCCL / TP / SP / distributed init, so process isolation is
unnecessary overhead.
Classes
fastvideo.eval.worker.EvalWorker
EvalWorker(metric_names: list[str], device: str, *, compile: bool = False, pre_upload: bool = True, skip_missing_deps: bool = False)
Owns metric replicas on one device. Single-GPU, single-sample.
Per-sample metrics (is_set_metric=False) return one
:class:`MetricResult` per evaluate call. Set metrics
(is_set_metric=True) accumulate state on the worker's own
instance and contribute nothing to the per-sample return — the
Evaluator finalizes them after the pool drains.
Source code in fastvideo/eval/worker.py
def __init__(
    self,
    metric_names: list[str],
    device: str,
    *,
    compile: bool = False,
    pre_upload: bool = True,
    skip_missing_deps: bool = False,
) -> None:
    """Record the worker configuration, then load its metrics.

    Args:
        metric_names: Names of the metrics this worker should own. The
            sequence is copied, so the caller's list is never aliased.
        device: Device string the worker is bound to.
        compile: Stored as ``_compile``. NOTE(review): presumably consumed
            by ``_load`` to build compiled metrics — confirm there.
        pre_upload: When true, ``evaluate`` moves the sample's ``video`` /
            ``reference`` entries to ``device`` before dispatching (only
            if some metric needs the GPU).
        skip_missing_deps: When true, ``evaluate`` drops a metric that
            fails with a missing import or missing file instead of
            re-raising.
    """
    # Configuration, kept verbatim from the arguments.
    self._device = device
    self._compile = compile
    self._pre_upload = pre_upload
    self._skip_missing_deps = skip_missing_deps
    self._names = [*metric_names]

    # Mutable state: starts empty and "loaded"; ``_load`` runs last so it
    # sees every attribute assigned above.
    self._unloaded = False
    self._metrics: dict[str, BaseMetric] = {}
    self._load()
Attributes
fastvideo.eval.worker.EvalWorker.metric_names
property
Names of the metrics this worker owns, in load order.
Functions
fastvideo.eval.worker.EvalWorker.evaluate
Score one already-decoded sample.
Inputs (``video`` / ``reference`` paths) are decoded and
normalized upstream by the :class:`VideoPool`. The worker
handles the optional pre-upload to its device and dispatches to
each metric's ``compute`` or ``accumulate``.
Samples tagged role="reference" are corpus context for set
metrics only — per-sample metrics skip them so a mixed
Evaluator (e.g. FVD + LPIPS + vbench) doesn't produce spurious
per-sample scores on the reference half of the corpus.
``metrics`` (when not ``None``) further restricts dispatch to
that subset of registered names — used by the per-call
``Evaluator.evaluate(metrics=...)`` filter.
Source code in fastvideo/eval/worker.py
def evaluate(self, *, metrics: list[str] | None = None, **kwargs) -> dict[str, MetricResult]:
    """Run this worker's metrics on a single, already-decoded sample.

    The keyword arguments form the sample dict handed to every metric.
    ``Video`` wrappers under ``"video"`` / ``"reference"`` are replaced
    by their ``.frames``, and both entries are optionally moved to the
    worker's device before dispatch.

    Set metrics (``is_set_metric`` true) get ``accumulate(sample)`` and
    add nothing to the return value. Per-sample metrics get
    ``compute(sample)``, except for samples tagged ``role="reference"``,
    which only feed set metrics.

    Args:
        metrics: Optional allow-list of metric names. When not ``None``,
            only owned metrics whose name is in the list are dispatched;
            names this worker does not own are ignored.
        **kwargs: Fields of the sample. This method itself reads
            ``"video"``, ``"reference"`` and ``"role"``.

    Returns:
        ``{name: result}`` for every per-sample metric that ran.

    Raises:
        RuntimeError: If the worker has been unloaded.
        ImportError, FileNotFoundError: Re-raised from a metric when
            ``skip_missing_deps`` is off. When it is on, the failing
            metric is logged and removed from this worker instead.
    """
    if self._unloaded:
        raise RuntimeError("EvalWorker was unloaded; call reload() before evaluating.")

    # Work on a private copy so the unwrapping / upload below never touches
    # the caller's mapping.
    payload = dict(kwargs)

    # Metrics receive the frames rather than the ``Video`` wrapper for both
    # the candidate and the reference entry.
    for key in ("video", "reference"):
        candidate = payload.get(key)
        if isinstance(candidate, Video):
            payload[key] = candidate.frames

    if self._pre_upload and self._any_needs_gpu:
        payload["video"] = _to_device(payload.get("video"), self._device)
        if "reference" in payload:
            payload["reference"] = _to_device(payload["reference"], self._device)

    reference_sample = payload.get("role") == "reference"
    wanted = None if metrics is None else set(metrics)

    scores: dict[str, MetricResult] = {}
    dropped: list[str] = []
    for metric_name, metric in self._metrics.items():
        if not (wanted is None or metric_name in wanted):
            continue
        try:
            if metric.is_set_metric:
                metric.accumulate(payload)
            elif not reference_sample:
                scores[metric_name] = metric.compute(payload)
        except (ImportError, ModuleNotFoundError, FileNotFoundError) as err:
            # Only missing-dependency / missing-resource failures are
            # tolerated, and only in skip_missing_deps mode. Anything else
            # (bugs, OOM, ...) is deliberately left to propagate.
            if not self._skip_missing_deps:
                raise
            import logging
            log = logging.getLogger(__name__)
            log.exception("eval: dropping %s after %s: %s", metric_name, type(err).__name__, err)
            dropped.append(metric_name)

    # Removal is deferred until the loop ends so the dict is never mutated
    # while it is being iterated.
    for metric_name in dropped:
        self._metrics.pop(metric_name, None)
    return scores
fastvideo.eval.worker.EvalWorker.release_cuda_memory
release_cuda_memory() -> None
Free CUDA caches without dropping models.
Source code in fastvideo/eval/worker.py
def release_cuda_memory(self) -> None:
    """Release cached CUDA memory while leaving the loaded metrics alone.

    Calls the module's ``clear_cache`` helper unconditionally, then, when
    CUDA is available, asks PyTorch to collect IPC-released memory.
    """
    clear_cache()
    if not torch.cuda.is_available():
        return
    # Best-effort: a failure to collect IPC memory must not abort the caller.
    with contextlib.suppress(Exception):
        torch.cuda.ipc_collect()
fastvideo.eval.worker.EvalWorker.reload
Rebuild metrics dropped by :meth:`unload`.
Source code in fastvideo/eval/worker.py
def reload(self) -> None:
    """Rebuild metrics dropped by :meth:`unload`.

    Does nothing when the worker is not currently unloaded. Otherwise it
    runs ``_load()`` and then marks the worker as loaded, so ``evaluate``
    stops raising its "was unloaded" error.
    """
    if not self._unloaded:
        return
    self._load()
    # NOTE(review): ``_load`` is not visible from here, so it may or may not
    # reset this flag itself. Clearing it explicitly makes the
    # unload()/reload() round trip self-contained. If ``_load`` raises, the
    # flag stays set and the worker remains unusable, which is the safe state.
    self._unloaded = False
fastvideo.eval.worker.EvalWorker.reset_set_metrics
reset_set_metrics() -> None
Clear accumulator state on every set metric.
Source code in fastvideo/eval/worker.py
def reset_set_metrics(self) -> None:
    """Call ``reset()`` on every set metric this worker owns.

    Per-sample metrics (``is_set_metric`` false) are left untouched.
    """
    for metric in self._metrics.values():
        if not metric.is_set_metric:
            continue
        metric.reset()
fastvideo.eval.worker.EvalWorker.set_metrics
Return {name: instance} for set metrics on this worker.
Source code in fastvideo/eval/worker.py
def set_metrics(self) -> dict[str, BaseMetric]:
    """Collect the set metrics owned by this worker.

    Returns:
        A new ``{name: instance}`` dict holding only the metrics whose
        ``is_set_metric`` is true, in the worker's own metric order. The
        instances are the worker's live objects, not copies.
    """
    selected: dict[str, BaseMetric] = {}
    for metric_name, metric in self._metrics.items():
        if metric.is_set_metric:
            selected[metric_name] = metric
    return selected
fastvideo.eval.worker.EvalWorker.unload
Drop metric refs so models become GC-able. Reverse with reload().
Source code in fastvideo/eval/worker.py
def unload(self) -> None:
    """Release this worker's metrics; undo with ``reload()``.

    Rebinds ``_metrics`` to a fresh empty dict (the old dict object is not
    cleared in place), flags the worker as unloaded so ``evaluate``
    refuses to run, and finally releases cached CUDA memory.
    """
    self._unloaded = True
    self._metrics = {}
    # Runs last so the references dropped above are already gone when the
    # caches are released.
    self.release_cuda_memory()
Functions