router

Multi-replica load balancer + WebSocket proxy for the streaming server.

Sits in front of one or more streaming-server replicas and forwards each WebSocket session to a healthy primary, failing over to a secondary when the primary is unhealthy. The code is kept in-repo under fastvideo/entrypoints/streaming/router/, which is the PR plan's default; whether to split it into a separate package is an open question deferred to review.

Classes

fastvideo.entrypoints.streaming.router.ReplicaRegistry

ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def __init__(self, replicas: list[ReplicaEndpoint]) -> None:
    if not replicas:
        raise ValueError("ReplicaRegistry requires at least one replica")
    self._replicas: dict[str, Replica] = {endpoint.url: Replica(endpoint=endpoint) for endpoint in replicas}
    self._lock = asyncio.Lock()

Functions

fastvideo.entrypoints.streaming.router.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Source code in fastvideo/entrypoints/streaming/router/registry.py
def select(self) -> Replica | None:
    """Pick the best healthy replica.

    Priority order:

    1. The first healthy primary (insertion order).
    2. The first healthy non-primary (insertion order).
    3. ``None`` when nothing is healthy.

    This MVP picks the first match within each tier; it does NOT
    load-balance across multiple healthy replicas of the same tier.
    Round-robin and weighted distribution are deferred until a real
    N-way active deployment exists.
    """
    healthy_primaries = [r for r in self._replicas.values() if r.primary and r.is_healthy]
    if healthy_primaries:
        return healthy_primaries[0]
    healthy = [r for r in self._replicas.values() if r.is_healthy]
    if healthy:
        return healthy[0]
    return None
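
The tiering above can be exercised without the real classes. The following is a minimal sketch, assuming a hypothetical `FakeReplica` stand-in that exposes only the `primary` and `is_healthy` fields the listing reads; it is not the package's `Replica` type.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeReplica:
    """Hypothetical stand-in exposing only the fields select() reads."""
    url: str
    primary: bool = False
    is_healthy: bool = True


def select(replicas: List[FakeReplica]) -> Optional[FakeReplica]:
    """Same tiering as the listing above: healthy primary, then any healthy."""
    for replica in replicas:
        if replica.primary and replica.is_healthy:
            return replica
    for replica in replicas:
        if replica.is_healthy:
            return replica
    return None


a = FakeReplica("http://streamer-a:8000", primary=True)
b = FakeReplica("http://streamer-b:8000")
c = FakeReplica("http://streamer-c:8000")

steady = select([a, b, c])    # the primary wins while it is healthy
a.is_healthy = False
failover = select([a, b, c])  # first healthy secondary in insertion order
b.is_healthy = False
c.is_healthy = False
nothing = select([a, b, c])   # None: the router would answer gpu_unavailable
```

Note that `c` is never chosen while `b` is healthy, which is the "no load balancing within a tier" behavior the docstring describes.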

fastvideo.entrypoints.streaming.router.RouterConfig dataclass

RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example:

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.
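
The checks listed above could be sketched roughly as follows. `EndpointSketch` and `ConfigSketch` are hypothetical stand-ins and the error messages are invented for illustration; only the set of rejected conditions comes from the description above.

```python
from dataclasses import dataclass, field
from urllib.parse import urlparse


@dataclass
class EndpointSketch:
    """Hypothetical stand-in for ReplicaEndpoint (url and primary only)."""
    url: str
    primary: bool = False


@dataclass
class ConfigSketch:
    """Hypothetical stand-in mirroring the validated RouterConfig fields."""
    replicas: list = field(default_factory=list)
    health_check_interval_seconds: float = 5.0
    health_check_timeout_seconds: float = 2.0
    failure_threshold: int = 3
    recovery_threshold: int = 2

    def __post_init__(self) -> None:
        if not self.replicas:
            raise ValueError("at least one replica is required")
        if self.health_check_interval_seconds <= 0 or self.health_check_timeout_seconds <= 0:
            raise ValueError("intervals and timeouts must be positive")
        if self.failure_threshold < 1 or self.recovery_threshold < 1:
            raise ValueError("thresholds must be >= 1")
        for endpoint in self.replicas:
            if urlparse(endpoint.url).scheme not in ("http", "https"):
                raise ValueError(f"replica URL must be http(s): {endpoint.url}")
        if sum(1 for endpoint in self.replicas if endpoint.primary) > 1:
            raise ValueError("at most one replica may be primary")


def try_build(**kwargs) -> str:
    """Return 'ok' or the ValueError message, for illustration only."""
    try:
        ConfigSketch(**kwargs)
    except ValueError as exc:
        return str(exc)
    return "ok"


good = try_build(replicas=[
    EndpointSketch("http://streamer-a:8000", primary=True),
    EndpointSketch("http://streamer-b:8000"),
])
bad_scheme = try_build(replicas=[EndpointSketch("ws://streamer-a:8000")])
```

Because the checks run in the constructor, a bad config fails as soon as the object is built, before any server or health loop starts.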

Functions

fastvideo.entrypoints.streaming.router.build_router_app

build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

Source code in fastvideo/entrypoints/streaming/router/main.py
def build_router_app(
    config: RouterConfig,
    *,
    registry: ReplicaRegistry | None = None,
) -> FastAPI:
    """Build the router FastAPI app.

    ``registry`` can be injected for tests; defaults to one built from
    ``config.replicas``.
    """
    registry = registry or ReplicaRegistry(config.replicas)
    state = _RouterState(
        config=config,
        registry=registry,
        stop_event=asyncio.Event(),
    )

    @contextlib.asynccontextmanager
    async def _lifespan(_app: FastAPI):
        state.health_task = asyncio.create_task(
            run_health_check_loop(
                registry=state.registry,
                config=state.config,
                stop_event=state.stop_event,
            ))
        try:
            yield
        finally:
            state.stop_event.set()
            if state.health_task is not None:
                with contextlib.suppress(asyncio.CancelledError):
                    await state.health_task

    app = FastAPI(title="FastVideo Streaming Router", lifespan=_lifespan)

    @app.get("/status")
    async def _status() -> JSONResponse:
        return JSONResponse({
            "replicas": [{
                "url": r.url,
                "primary": r.primary,
                "status": r.health.status.value,
                "last_ok_at": r.health.last_ok_at,
                "last_latency_ms": r.health.last_latency_ms,
                "consecutive_failures": r.health.consecutive_failures,
            } for r in state.registry.all()],
        })

    @app.websocket("/v1/stream")
    async def _proxy(websocket: WebSocket) -> None:
        await websocket.accept()
        replica = state.registry.select()
        if replica is None:
            await websocket.send_json({
                "type": "error",
                "code": "gpu_unavailable",
                "message": "router: no healthy replica available",
                "retryable": True,
            })
            await websocket.close(code=1013, reason="no_healthy_replica")
            return

        ws_url = _websocket_url_for(replica.url)
        try:
            await _bridge_session(websocket, ws_url)
        except WebSocketDisconnect:
            logger.info("router: client disconnected")
        except Exception as exc:
            logger.exception("router: bridge failed: %s", exc)
            with contextlib.suppress(RuntimeError):
                await websocket.send_json({
                    "type": "error",
                    "code": "worker_failed",
                    "message": f"router bridge failed: {exc}",
                    "retryable": True,
                })
            with contextlib.suppress(RuntimeError):
                await websocket.close(code=1011)

    app.state.router_state = state
    return app
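
On the client side, the two error frames sent by `_proxy` share the shape `{"type", "code", "message", "retryable"}`. The sketch below shows one way a client might decide whether to reconnect; the helper names are hypothetical, and reconnecting on `retryable` is an assumed client policy, not something the router enforces.

```python
import json
from typing import Optional


def parse_router_error(raw: str) -> Optional[dict]:
    """Return the error frame as a dict, or None for any other message."""
    try:
        frame = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if isinstance(frame, dict) and frame.get("type") == "error":
        return frame
    return None


def should_reconnect(raw: str) -> bool:
    """True when the frame is an error the router marked as retryable."""
    frame = parse_router_error(raw)
    return bool(frame and frame.get("retryable"))


# Frame copied from the no-healthy-replica branch in the listing above.
unavailable = json.dumps({
    "type": "error",
    "code": "gpu_unavailable",
    "message": "router: no healthy replica available",
    "retryable": True,
})
retry = should_reconnect(unavailable)
ignore = should_reconnect(json.dumps({"type": "frame", "seq": 1}))  # made-up non-error message
```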

Modules

fastvideo.entrypoints.streaming.router.config

Typed router configuration.

Classes

fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint dataclass
ReplicaEndpoint(url: str, name: str | None = None, primary: bool = False, weight: float = 1.0)

One backend replica the router can route to.

Attributes
fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.primary class-attribute instance-attribute
primary: bool = False

True = prefer this replica over others in steady state.

fastvideo.entrypoints.streaming.router.config.ReplicaEndpoint.url instance-attribute
url: str

HTTP base URL, e.g. http://host:8000. WebSocket URL is derived automatically by replacing the scheme.
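
A scheme swap of this kind might look like the sketch below. The function name is illustrative and is not the package's `_websocket_url_for` helper, whose body is not shown here; in particular, whether that helper also appends a path such as /v1/stream is not stated, so this sketch only replaces the scheme.

```python
from urllib.parse import urlsplit, urlunsplit

# http maps to ws, https maps to wss; anything else is rejected.
_SCHEME_MAP = {"http": "ws", "https": "wss"}


def websocket_url_for(http_url: str) -> str:
    """Return http_url with its scheme replaced by the WebSocket equivalent."""
    parts = urlsplit(http_url)
    try:
        ws_scheme = _SCHEME_MAP[parts.scheme]
    except KeyError:
        raise ValueError(f"expected an http(s) URL, got {http_url!r}") from None
    return urlunsplit(parts._replace(scheme=ws_scheme))


plain = websocket_url_for("http://host:8000")
secure = websocket_url_for("https://streamer-a:8443/base")
```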

fastvideo.entrypoints.streaming.router.config.RouterConfig dataclass
RouterConfig(host: str = '0.0.0.0', port: int = 9000, replicas: list[ReplicaEndpoint] = list(), health_check_path: str = '/health', health_check_interval_seconds: float = 5.0, health_check_timeout_seconds: float = 2.0, failure_threshold: int = 3, recovery_threshold: int = 2)

Typed router config loaded from a YAML file.

Example:

router:
  host: 0.0.0.0
  port: 9000
  replicas:
    - url: http://streamer-a:8000
      primary: true
    - url: http://streamer-b:8000
  health_check:
    path: /health
    interval_seconds: 5
    failure_threshold: 3

Validation runs in __post_init__: empty replicas, non-positive intervals/timeouts, thresholds < 1, non-http(s) URLs, and more than one primary all raise ValueError so misconfigurations surface at load time rather than as confusing runtime failures.

fastvideo.entrypoints.streaming.router.main

Router FastAPI entry point.

Exposes the same /v1/stream WebSocket path the backend servers do, accepts a client, picks a healthy replica from the registry, and proxies frames bidirectionally.

PR 7.9 ships the minimum-viable shape: explicit replica list, single primary, JSON + binary passthrough in both directions, and a /status endpoint for operators. Sticky-session routing (so a reconnect lands on the same backend) is left for a follow-up.

Functions

fastvideo.entrypoints.streaming.router.main.build_router_app
build_router_app(config: RouterConfig, *, registry: ReplicaRegistry | None = None) -> FastAPI

Build the router FastAPI app.

registry can be injected for tests; defaults to one built from config.replicas.

fastvideo.entrypoints.streaming.router.registry

Replica registry + health-check loop.

The registry tracks the set of known backend replicas and their live health. The router consults it for "pick a backend for this session" decisions and a background task updates it from periodic HTTP probes.

State machine per replica:

HEALTHY ──(N consecutive failures)──▶ UNHEALTHY
   ▲                                     │
   └──────(M consecutive successes)──────┘

Where N = RouterConfig.failure_threshold and M = RouterConfig.recovery_threshold.
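
The transitions in the diagram can be sketched as a small counter-based tracker. This is an illustration under assumptions: the `HealthSketch` class, the status string values, and the choice to start in HEALTHY are all hypothetical; only the two thresholds and the two states come from the description above.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    HEALTHY = "healthy"      # string values are assumed for illustration
    UNHEALTHY = "unhealthy"


@dataclass
class HealthSketch:
    failure_threshold: int = 3    # N in the diagram
    recovery_threshold: int = 2   # M in the diagram
    status: Status = Status.HEALTHY  # assumed initial state
    consecutive_failures: int = 0
    consecutive_successes: int = 0

    def record(self, ok: bool) -> Status:
        """Feed one probe outcome and return the resulting status."""
        if ok:
            self.consecutive_failures = 0
            self.consecutive_successes += 1
            if (self.status is Status.UNHEALTHY
                    and self.consecutive_successes >= self.recovery_threshold):
                self.status = Status.HEALTHY
        else:
            self.consecutive_successes = 0
            self.consecutive_failures += 1
            if (self.status is Status.HEALTHY
                    and self.consecutive_failures >= self.failure_threshold):
                self.status = Status.UNHEALTHY
        return self.status


health = HealthSketch()
outcomes = (False, False, True, False, False, False, True, True)
history = [health.record(ok).value for ok in outcomes]
```

In this run the first two failures are reset by a success, so the replica only flips to unhealthy after three failures in a row, and it flips back after two successes in a row.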

Attributes

fastvideo.entrypoints.streaming.router.registry.HttpProbe module-attribute
HttpProbe = Any

Alias for health-probe callables, typed as Any. The expected signature is async def __call__(url: str, *, timeout: float) -> tuple[float, str | None]. typing.Callable cannot express keyword-only parameters, so the alias relies on duck typing as a pragmatic compromise.
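
A test double matching this signature could be as small as the sketch below. The factory name and canned values are hypothetical. The meaning of the returned tuple is also an assumption (latency in milliseconds, then an error message or None), as is passing the full health URL rather than the base URL.

```python
import asyncio
from typing import Optional, Set, Tuple


def make_fake_probe(down: Set[str]):
    """Build a probe with the documented signature that returns canned results."""

    async def probe(url: str, *, timeout: float) -> Tuple[float, Optional[str]]:
        # Assumed tuple meaning: (latency in ms, error message or None).
        if url in down:
            return 0.0, "connection refused"
        return 1.5, None

    return probe


probe = make_fake_probe(down={"http://streamer-b:8000/health"})
ok_result = asyncio.run(probe("http://streamer-a:8000/health", timeout=2.0))
bad_result = asyncio.run(probe("http://streamer-b:8000/health", timeout=2.0))
```

If stricter typing is ever wanted, a typing.Protocol with a `__call__` method can describe keyword-only parameters, at the cost of an extra class definition.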

Classes

fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry
ReplicaRegistry(replicas: list[ReplicaEndpoint])

Stateful map of replica URL → Replica.

Selection favors primary replicas when healthy; otherwise the first healthy non-primary is returned. When none are healthy, the registry returns None so the router can reject incoming sessions with gpu_unavailable.

Functions
fastvideo.entrypoints.streaming.router.registry.ReplicaRegistry.select
select() -> Replica | None

Pick the best healthy replica.

Priority order:

  1. The first healthy primary (insertion order).
  2. The first healthy non-primary (insertion order).
  3. None when nothing is healthy.

This MVP picks the first match within each tier; it does NOT load-balance across multiple healthy replicas of the same tier. Round-robin and weighted distribution are deferred until a real N-way active deployment exists.

Functions

fastvideo.entrypoints.streaming.router.registry.run_health_check_loop async
run_health_check_loop(registry: ReplicaRegistry, config: RouterConfig, *, stop_event: Event, http_get: HttpProbe | None = None) -> None

Poll all replicas' health endpoints in parallel on a fixed interval.

http_get is pluggable so unit tests can inject a deterministic probe without hitting the network. The default builds a single httpx.AsyncClient shared across the loop's lifetime so the common case (steady polling against a stable replica set) reuses TCP/TLS connections instead of paying handshake cost per probe.

Probes within one polling cycle run concurrently via asyncio.gather so a slow replica doesn't push the cycle past health_check_interval_seconds.

Source code in fastvideo/entrypoints/streaming/router/registry.py
async def run_health_check_loop(
    registry: ReplicaRegistry,
    config: RouterConfig,
    *,
    stop_event: asyncio.Event,
    http_get: HttpProbe | None = None,
) -> None:
    """Poll all replicas' health endpoints in parallel on a fixed interval.

    ``http_get`` is pluggable so unit tests can inject a deterministic
    probe without hitting the network. The default builds a single
    ``httpx.AsyncClient`` shared across the loop's lifetime so the
    common case (steady polling against a stable replica set) reuses
    TCP/TLS connections instead of paying handshake cost per probe.

    Probes within one polling cycle run concurrently via ``asyncio.gather``
    so a slow replica doesn't push the cycle past
    ``health_check_interval_seconds``.
    """
    if http_get is not None:
        await _run_loop(registry, config, stop_event, http_get)
        return
    async with _build_default_probe(config) as probe:
        await _run_loop(registry, config, stop_event, probe)
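
The body of `_run_loop` is not shown above. As a rough sketch of the pattern the docstring describes (concurrent probes per cycle, fixed interval, prompt shutdown on `stop_event`), something like the following would work. The names `poll_once`, `poll_loop`, and `on_cycle` are hypothetical, and the probe's tuple is assumed to be (latency, error or None).

```python
import asyncio


async def poll_once(urls, probe, timeout):
    """Probe every URL concurrently; return {url: error message or None}."""
    results = await asyncio.gather(
        *(probe(url, timeout=timeout) for url in urls),
        return_exceptions=True,  # one failing probe must not abort the cycle
    )
    outcome = {}
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            outcome[url] = repr(result)
        else:
            _latency_ms, error = result
            outcome[url] = error
    return outcome


async def poll_loop(urls, probe, *, interval, timeout, stop_event, on_cycle):
    """Run polling cycles until stop_event is set."""
    while not stop_event.is_set():
        on_cycle(await poll_once(urls, probe, timeout))
        try:
            # Sleep for one interval, but wake immediately on shutdown.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo():
    stop_event = asyncio.Event()
    cycles = []

    async def probe(url, *, timeout):
        await asyncio.sleep(0.01)  # simulated network delay
        return 10.0, (None if url.endswith("a") else "HTTP 503")

    def on_cycle(outcome):
        cycles.append(outcome)
        if len(cycles) == 2:
            stop_event.set()  # stop after two cycles for the demo

    await poll_loop(["http://a", "http://b"], probe, interval=0.01,
                    timeout=1.0, stop_event=stop_event, on_cycle=on_cycle)
    return cycles


cycles = asyncio.run(demo())
```

Waiting on `stop_event.wait()` with a timeout, rather than calling `asyncio.sleep(interval)`, lets shutdown proceed without waiting out the remainder of the interval.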