pool

Async path-to-tensor prefetcher for the Evaluator.

Hides video-decode latency behind metric compute by running a small thread pool of decoders that fill a bounded queue. The Evaluator owns one VideoPool per evaluate(samples=...) call; workers consume from it via VideoPool.get. Decode order is non-deterministic, so each yielded item carries its original input index, which lets consumers write results back into a list in input order.

Pool sizing: max_size = prefetch_factor * num_workers.
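
As a rough illustration of the sizing rule, the sketch below computes the queue capacity for a given worker count. The helper name pool_max_size is hypothetical, and where the Evaluator performs this multiplication is not shown on this page; the clamp to at least 1 mirrors the max(max_size, 1) in VideoPool.__init__.

```python
def pool_max_size(prefetch_factor: int, num_workers: int) -> int:
    """Queue capacity under the sizing rule, clamped to at least 1.

    Hypothetical helper: it only restates max_size = prefetch_factor * num_workers.
    """
    return max(prefetch_factor * num_workers, 1)


# With 4 workers and 2 prefetched items per worker, up to 8 decoded
# samples can wait in the ready queue at once.
capacity = pool_max_size(prefetch_factor=2, num_workers=4)
print(capacity)  # 8
```

A larger prefetch_factor trades memory (more decoded tensors held at once) for a lower chance that a worker finds the queue empty.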

Classes

fastvideo.eval.pool.VideoPool

VideoPool(samples: list[dict], *, loader_threads: int = 1, max_size: int = 4)

Bounded prefetch queue feeding decoded samples to consumers.

Use as a context manager so loader threads are always cleaned up:

with VideoPool(samples, loader_threads=1, max_size=4) as pool:
    while True:
        item = pool.get()
        if item is None:
            break
        idx, decoded = item
        results[idx] = worker.evaluate(**decoded)

Source code in fastvideo/eval/pool.py
def __init__(
    self,
    samples: list[dict],
    *,
    loader_threads: int = 1,
    max_size: int = 4,
) -> None:
    if loader_threads < 1:
        raise ValueError("loader_threads must be >= 1")
    self._samples = samples
    self._loader_threads_n = loader_threads
    self._max_size = max(max_size, 1)

    self._task_q: queue.Queue = queue.Queue()
    self._ready_q: queue.Queue = queue.Queue(maxsize=self._max_size)
    self._loaders: list[threading.Thread] = []
    self._stop = threading.Event()

    self._consumed = 0
    self._consume_lock = threading.Lock()

Functions

fastvideo.eval.pool.VideoPool.get
get() -> tuple[int, dict] | None

Pop the next decoded (idx, sample).

Returns None when all input samples have been consumed. Polls in 0.1 s slices so that extra consumer threads (when len(samples) < num_workers) wake up periodically, re-check _consumed, and exit cleanly; without the poll, a blocking _ready_q.get(timeout=None) would hang forever because the loaders have already finished. Any exception caught in a loader thread is re-raised on the consumer's stack so callers don't hang on a dead loader. Thread-safe: multiple consumer threads may share one pool.
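
The re-raise behaviour can be sketched in isolation. This is a minimal illustration, not the library's implementation: DecodeFailure is a hypothetical stand-in for _DecodeError (whose definition is not shown on this page), and loader and get_next are made-up names. The idea is that the loader thread catches the exception, ships it through the queue as a payload, and the consumer raises it on its own stack.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class DecodeFailure:
    """Hypothetical wrapper carrying an exception across threads."""
    exc: BaseException


def loader(paths, ready_q):
    """Loader thread body: enqueue a payload or a wrapped failure per path."""
    for idx, path in enumerate(paths):
        try:
            # Stand-in for real decoding: reject anything that is not .mp4.
            if not path.endswith(".mp4"):
                raise ValueError(f"unsupported file: {path}")
            payload = {"video": path}
        except Exception as exc:
            payload = DecodeFailure(exc)
        ready_q.put((idx, payload))


def get_next(ready_q):
    """Consumer side: unwrap the payload, re-raising loader failures here."""
    idx, payload = ready_q.get(timeout=1.0)
    if isinstance(payload, DecodeFailure):
        raise payload.exc
    return idx, payload


def demo():
    ready_q = queue.Queue(maxsize=4)
    t = threading.Thread(target=loader, args=(["a.mp4", "b.txt"], ready_q))
    t.start()
    t.join()
    first = get_next(ready_q)
    try:
        get_next(ready_q)
    except ValueError as exc:
        # The loader's exception surfaces in the consumer thread.
        return first, str(exc)
    return first, None
```

Because the failure travels through the same queue as normal items, the consumer never waits on an item that will not arrive.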

Source code in fastvideo/eval/pool.py
def get(self) -> tuple[int, dict] | None:
    """Pop the next decoded ``(idx, sample)``.

    Returns ``None`` when all input samples have been consumed.
    Polls in 0.1 s slices so extra consumer threads (when
    ``len(samples) < num_workers``) wake up periodically to
    re-check ``_consumed`` and exit cleanly — without the poll,
    a blocking ``_ready_q.get(timeout=None)`` deadlocks because
    the loaders are already done. Re-raises any exception caught
    in a loader thread on the consumer's stack so callers don't
    hang on a dead loader. Thread-safe: multiple consumer threads
    may share one pool.
    """
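    while True:
        # Exit once every input sample has been handed to some consumer.
        with self._consume_lock:
            if self._consumed >= len(self._samples):
                return None
        try:
            # Short timeout so idle consumers loop back and re-check the
            # exit condition instead of blocking forever.
            item = self._ready_q.get(timeout=0.1)
        except queue.Empty:
            continue
        with self._consume_lock:
            self._consumed += 1
        idx, payload = item
        # A loader failure travels through the queue; surface it here.
        if isinstance(payload, _DecodeError):
            raise payload.exc
        return item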
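
To see why the poll matters when there are more consumers than samples, the sketch below reproduces the same loop in a self-contained stand-in. TinyPool and run_consumers are hypothetical names, and the "decode" step is a placeholder; only the structure of get() follows the source above.

```python
import queue
import threading


class TinyPool:
    """Hypothetical stand-in that mimics the polling get() shown above."""

    def __init__(self, samples, max_size=4):
        self._samples = samples
        self._ready_q = queue.Queue(maxsize=max(max_size, 1))
        self._consumed = 0
        self._consume_lock = threading.Lock()
        # One loader thread; "decoding" here just wraps the sample.
        self._loader = threading.Thread(target=self._load, daemon=True)
        self._loader.start()

    def _load(self):
        for idx, sample in enumerate(self._samples):
            self._ready_q.put((idx, {"video": sample}))

    def get(self):
        while True:
            with self._consume_lock:
                if self._consumed >= len(self._samples):
                    return None
            try:
                item = self._ready_q.get(timeout=0.1)
            except queue.Empty:
                continue  # wake up and re-check the exit condition
            with self._consume_lock:
                self._consumed += 1
            return item


def run_consumers(samples, num_consumers):
    """Run several consumers against one pool; return results and exit status."""
    pool = TinyPool(samples)
    results = [None] * len(samples)

    def consume():
        while True:
            item = pool.get()
            if item is None:
                break
            idx, decoded = item
            # Write back by original index, so input order is preserved.
            results[idx] = decoded["video"].upper()

    threads = [threading.Thread(target=consume) for _ in range(num_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=5)
    return results, all(not t.is_alive() for t in threads)


# Four consumers, two samples: the two surplus consumers time out on the
# empty queue, see that everything was consumed, and return.
results, all_exited = run_consumers(["a.mp4", "b.mp4"], num_consumers=4)
```

With a blocking get and no timeout, the two surplus consumers in this example would wait on an empty queue indefinitely, since no loader remains to enqueue anything.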