Build a router exposing streaming liveness/readiness endpoints.
`pool` may be either a concrete pool or a zero-argument callable that
returns the current pool. The callable form lets product servers keep their
own lifespan-managed runtime singleton without adding public global state.
Source code in fastvideo/entrypoints/streaming/health.py
def build_health_router(pool: PoolRef = None) -> APIRouter:
    """Create an ``APIRouter`` carrying the streaming health-check routes.

    Four GET routes are registered on a fresh router:

    * ``/healthz`` and ``/health`` -- liveness; a fixed ``"ok"`` payload
      that never consults the pool.
    * ``/readyz`` -- readiness; a summary derived from the pool status.
    * ``/status`` -- the pool status payload, returned unmodified.

    ``pool`` is handed to ``get_pool_status`` as-is on every request. It may
    be a concrete pool or a zero-argument callable returning the current
    pool; the callable form lets a product server keep its own
    lifespan-managed runtime singleton instead of exposing global state.
    NOTE(review): how the default ``None`` is treated is decided inside
    ``get_pool_status`` -- confirm there.

    Returns:
        The router with the routes above attached.
    """
    # Handler names and handler docstrings are kept verbatim: FastAPI
    # publishes them in the generated OpenAPI schema.
    router = APIRouter()

    # Decorators apply bottom-up, so "/healthz" is registered before
    # "/health"; both paths share the same handler.
    @router.get("/health")
    @router.get("/healthz")
    async def get_healthz() -> dict[str, Any]:
        """Liveness probe for process-level health."""
        return dict(status="ok", service=SERVICE_NAME, ts=_utc_now_iso())

    @router.get("/readyz")
    async def get_readyz() -> dict[str, Any]:
        """Readiness probe for router/load-balancer health checks."""
        snapshot = await get_pool_status(pool)
        n_ready = _ready_worker_count(snapshot)

        # At least one ready worker is enough to report "ready".
        if n_ready > 0:
            state = "ready"
        else:
            state = "warming"

        body: dict[str, Any] = {
            "status": state,
            "service": SERVICE_NAME,
            "ready_gpu_workers": n_ready,
        }
        # Counters copied from the pool status, each passed through
        # ``_as_int``; insertion order fixes the order of the response keys.
        for counter in (
            "total_gpus",
            "available_gpus",
            "warmup_successful_gpus",
            "warmup_failed_gpus",
            "queue_size",
        ):
            body[counter] = _as_int(snapshot.get(counter))
        # Timestamp is taken last, after the counters have been read.
        body["ts"] = _utc_now_iso()
        return body

    @router.get("/status")
    async def get_status() -> dict[str, Any]:
        """Get the current status of the GPU pool."""
        snapshot = await get_pool_status(pool)
        return snapshot

    return router
|