
registry

Replica registry + health-check loop.

The registry tracks the set of known backend replicas and their live health. The router consults it to pick a backend for each session, and a background task updates it from periodic HTTP probes.

State machine per replica:

HEALTHY ──(N consecutive failures)──▶ UNHEALTHY
   ▲                                     │
   └──────(M consecutive successes)──────┘

Where N = RouterConfig.failure_threshold and M = RouterConfig.recovery_threshold.
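The transition rule in the diagram can be sketched as a pair of streak counters. This is an illustration only, not the module's implementation: the class name HealthTracker and its fields are hypothetical, and the sketch assumes that a success resets the failure streak and a failure resets the success streak.

```python
from dataclasses import dataclass


@dataclass
class HealthTracker:
    """Hypothetical stand-in for the per-replica health state."""

    failure_threshold: int   # N in the diagram
    recovery_threshold: int  # M in the diagram
    healthy: bool = True
    failures: int = 0        # current run of consecutive failures
    successes: int = 0       # current run of consecutive successes

    def record(self, ok: bool) -> bool:
        """Feed one probe result and return the resulting health flag."""
        if ok:
            self.successes += 1
            self.failures = 0  # assumption: a success breaks the failure streak
            if not self.healthy and self.successes >= self.recovery_threshold:
                self.healthy = True
        else:
            self.failures += 1
            self.successes = 0  # assumption: a failure breaks the success streak
            if self.healthy and self.failures >= self.failure_threshold:
                self.healthy = False
        return self.healthy


tracker = HealthTracker(failure_threshold=3, recovery_threshold=2)
outcomes = (False, False, True, False, False, False, True, True)
states = [tracker.record(ok) for ok in outcomes]
```

With N = 3 and M = 2, the two early failures do not flip the state because the success in between resets the streak. Only the later run of three failures marks the replica unhealthy, and it recovers after two consecutive successes.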

Attributes

fastvideo.entrypoints.streaming.router.registry.HttpProbe module-attribute

HttpProbe = Any

Alias for health-probe callables, typed as Any. The expected signature is async def __call__(url: str, *, timeout: float) -> tuple[float, str | None]; typing.Callable cannot express keyword-only parameters, so the alias falls back to duck typing as a pragmatic compromise.

Classes

fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry

ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def __init__(self, replicas: list[ReplicaEndpoint]) -> None:
    if not replicas:
        raise ValueError("ReplicaRegistry requires at least one replica")
    self._replicas: dict[str, Replica] = {endpoint.url: Replica(endpoint=endpoint) for endpoint in replicas}
    self._lock = asyncio.Lock()

Functions

fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def select(self) -> Replica | None:
    """Pick the best healthy replica.

    Priority order:

    1. The first healthy primary (insertion order).
    2. The first healthy non-primary (insertion order).
    3. ``None`` when nothing is healthy.

    This MVP picks the first match within each tier; it does NOT
    load-balance across multiple healthy replicas of the same tier.
    Round-robin and weighted distribution are deferred until a real
    N-way active deployment exists.
    """
    healthy_primaries = [r for r in self._replicas.values() if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in self._replicas.values() if r.is_healthy]
    if healthy:
        return healthy[0]
    return None

Functions

fastvideo.entrypoints.streaming.router.registry.run_health_check_loop async

run_health_check_loop(registry: ReplicaRegistry, config: RouterConfig, *, stop_event: Event, http_get: HttpProbe | None = None) -> None

Poll all replicas' health endpoints in parallel on a fixed interval.

http_get is pluggable so unit tests can inject a deterministic probe without hitting the network. The default builds a single httpx.AsyncClient shared across the loop's lifetime so the common case (steady polling against a stable replica set) reuses TCP/TLS connections instead of paying handshake cost per probe.

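Probes within one polling cycle run concurrently via asyncio.gather, so a cycle lasts roughly as long as its slowest probe rather than the sum of all probes. This keeps a single slow replica from pushing the cycle past health_check_interval_seconds.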

Source code in fastvideo/entrypoints/streaming/router/registry.py
async def run_health_check_loop(
    registry: ReplicaRegistry,
    config: RouterConfig,
    *,
    stop_event: asyncio.Event,
    http_get: HttpProbe | None = None,
) -> None:
    """Poll all replicas' health endpoints in parallel on a fixed interval.

    ``http_get`` is pluggable so unit tests can inject a deterministic
    probe without hitting the network. The default builds a single
    ``httpx.AsyncClient`` shared across the loop's lifetime so the
    common case (steady polling against a stable replica set) reuses
    TCP/TLS connections instead of paying handshake cost per probe.

    Probes within one polling cycle run concurrently via ``asyncio.gather``
    so a slow replica doesn't push the cycle past
    ``health_check_interval_seconds``.
    """
    if http_get is not None:
        await _run_loop(registry, config, stop_event, http_get)
        return
    async with _build_default_probe(config) as probe:
        await _run_loop(registry, config, stop_event, probe)
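The private _run_loop helper is not shown on this page. One common way to combine a fixed interval with a stop_event is sketched below. It is an assumption about the general pattern, not a copy of the module's code: poll_until_stopped and tick are hypothetical names.

```python
import asyncio


async def poll_until_stopped(tick, interval: float, stop_event: asyncio.Event) -> int:
    """Run tick() once per interval until stop_event is set; return the cycle count."""
    cycles = 0
    while not stop_event.is_set():
        await tick()
        cycles += 1
        try:
            # Sleep for one interval, but wake immediately if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request; start the next cycle
    return cycles


async def main() -> int:
    stop_event = asyncio.Event()
    seen = 0

    async def tick() -> None:
        # Stand-in for one polling cycle; requests shutdown on the third call.
        nonlocal seen
        seen += 1
        if seen == 3:
            stop_event.set()

    return await poll_until_stopped(tick, interval=0.01, stop_event=stop_event)


cycles = asyncio.run(main())
```

Waiting on the event with a timeout, instead of calling asyncio.sleep, lets shutdown take effect without waiting out the remainder of the interval.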