session

Per-connection session lifecycle for the streaming server.

Each WebSocket opens exactly one Session. SessionManager enforces the generation_segment_cap and session_timeout_seconds budgets from fastvideo.api.StreamingConfig.

Classes

fastvideo.entrypoints.streaming.session.InvalidSessionTransition

Bases: RuntimeError

Raised when a session is asked to transition along an illegal edge.

fastvideo.entrypoints.streaming.session.Session dataclass

Session(id: str = (lambda: hex)(), state: SessionState = INITIALIZING, created_at: float = monotonic(), last_activity: float = monotonic(), client_id: str | None = None, preset: str | None = None, preset_label: str | None = None, curated_prompts: list[str] = list(), segment_idx: int = 0, enhancement_enabled: bool = False, auto_extension_enabled: bool = False, loop_generation_enabled: bool = False, single_clip_mode: bool = False, generation_paused: bool = False, stream_mode: str = 'av_fmp4', gpu_id: int | None = None, continuation_state: ContinuationState | None = None, metadata: dict[str, Any] = dict())

Functions

fastvideo.entrypoints.streaming.session.Session.transition
transition(target: SessionState) -> None

Move to target if the edge is allowed.

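Raises InvalidSessionTransition on illegal moves. The self-loop on ACTIVE is legal so the server can re-assert ACTIVE on segment completion without special-casing. As the source shows, the check exempts any transition whose target equals the current state, so a self-loop never raises from any state, not only ACTIVE.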

Source code in fastvideo/entrypoints/streaming/session.py
def transition(self, target: SessionState) -> None:
    """Move to ``target`` if the edge is allowed.

    Raises :class:`InvalidSessionTransition` on illegal moves. The
    self-loop on ``ACTIVE`` is legal so the server can re-assert
    ACTIVE on segment completion without special casing.
    """
    allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
    if target not in allowed and target is not self.state:
        raise InvalidSessionTransition(f"{self.state.value} -> {target.value} is not a valid "
                                       f"session transition")
    self.state = target
    self.last_activity = time.monotonic()
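To see the edge check in action, here is a minimal, self-contained sketch of the same pattern. The state names come from this page, but the enum string values, the contents of the edge table, and the `try_transition` helper are assumptions made for illustration; the real `_VALID_TRANSITIONS` may allow different edges.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import Enum


class SessionState(Enum):
    # Member names appear on this page; the string values are assumed.
    INITIALIZING = "initializing"
    ACTIVE = "active"
    COMPLETE = "complete"
    ERROR = "error"


# Hypothetical edge table, for illustration only.
_VALID_TRANSITIONS: dict[SessionState, frozenset[SessionState]] = {
    SessionState.INITIALIZING: frozenset({SessionState.ACTIVE, SessionState.ERROR}),
    SessionState.ACTIVE: frozenset({SessionState.COMPLETE, SessionState.ERROR}),
}


class InvalidSessionTransition(RuntimeError):
    """Raised when a session is asked to transition along an illegal edge."""


@dataclass
class Session:
    state: SessionState = SessionState.INITIALIZING
    last_activity: float = field(default_factory=time.monotonic)

    def transition(self, target: SessionState) -> None:
        # States without an entry in the table have no outgoing edges.
        allowed = _VALID_TRANSITIONS.get(self.state, frozenset())
        # A target equal to the current state is always accepted.
        if target not in allowed and target is not self.state:
            raise InvalidSessionTransition(
                f"{self.state.value} -> {target.value} is not a valid session transition")
        self.state = target
        self.last_activity = time.monotonic()


def try_transition(session: Session, target: SessionState) -> bool:
    """Hypothetical helper: report whether the transition was accepted."""
    try:
        session.transition(target)
    except InvalidSessionTransition:
        return False
    return True


session = Session()
results = [
    try_transition(session, SessionState.ACTIVE),    # INITIALIZING -> ACTIVE
    try_transition(session, SessionState.ACTIVE),    # self-loop on ACTIVE
    try_transition(session, SessionState.COMPLETE),  # ACTIVE -> COMPLETE
    try_transition(session, SessionState.ACTIVE),    # COMPLETE has no edges here
]
```

Under this assumed table the first three calls succeed and the last one is rejected, leaving the session in `COMPLETE`.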

fastvideo.entrypoints.streaming.session.SessionManager

SessionManager(*, segment_cap: int, session_timeout_seconds: int, max_sessions: int = 1)

Registers sessions and enforces per-server session limits.

Source code in fastvideo/entrypoints/streaming/session.py
def __init__(
    self,
    *,
    segment_cap: int,
    session_timeout_seconds: int,
    max_sessions: int = 1,
) -> None:
    self._segment_cap = segment_cap
    self._session_timeout_seconds = session_timeout_seconds
    self._max_sessions = max_sessions
    self._sessions: dict[str, Session] = {}

Functions

fastvideo.entrypoints.streaming.session.SessionManager.reap_timed_out
reap_timed_out(now: float | None = None) -> list[str]

Return the ids of sessions that have exceeded the idle timeout.

The caller is responsible for actually closing them; this method only identifies dead sessions so that the server can emit session_timeout frames before dropping the WebSocket. Sessions already in COMPLETE, ERROR, TIMEOUT, or REJECTED are skipped.

TODO: unused until a background driver calls it. Per-connection idle enforcement currently happens via asyncio.wait_for on receive_json; this helper catches sessions stuck before any receive (e.g. future QUEUED state) and is expected to be wired into the GPU-pool reaper.
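The per-connection idle enforcement mentioned in the TODO can be sketched roughly as follows. This is not the server's actual receive loop: `FakeWebSocket`, `serve`, and the `{"type": ...}` message shape are hypothetical stand-ins, and only `asyncio.wait_for` wrapping a `receive_json()` call is taken from the text.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in for a WebSocket that exposes receive_json()."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def receive_json(self):
        if not self._messages:
            # Simulate an idle client that never sends another frame.
            await asyncio.sleep(3600)
        return self._messages.pop(0)


async def serve(ws, idle_timeout: float) -> list[str]:
    """Receive frames until the client stays silent longer than idle_timeout."""
    events: list[str] = []
    while True:
        try:
            # wait_for cancels the pending receive once the timeout elapses.
            msg = await asyncio.wait_for(ws.receive_json(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            # A real server would emit a session_timeout frame and close here.
            events.append("session_timeout")
            return events
        events.append(msg["type"])


ws = FakeWebSocket([{"type": "start"}, {"type": "prompt"}])
events = asyncio.run(serve(ws, idle_timeout=0.05))
```

A loop like this only notices idleness while it is waiting on a receive, which is why a separate reaper is still useful for sessions that never reach that point.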

Source code in fastvideo/entrypoints/streaming/session.py
def reap_timed_out(self, now: float | None = None) -> list[str]:
    """Return the ids of sessions that have exceeded the idle timeout.

    The caller is responsible for actually closing them — this
    method only *identifies* dead sessions so the server can emit
    ``session_timeout`` frames before dropping the WebSocket.

    TODO: unused until a background driver calls it. Per-connection
    idle enforcement currently happens via asyncio.wait_for on
    receive_json; this helper catches sessions stuck before any
    receive (e.g. future QUEUED state) and is expected to be wired
    into the GPU-pool reaper.
    """
    now = now if now is not None else time.monotonic()
    dead: list[str] = []
    for sid, session in self._sessions.items():
        if session.state in {
                SessionState.COMPLETE,
                SessionState.ERROR,
                SessionState.TIMEOUT,
                SessionState.REJECTED,
        }:
            continue
        if now - session.last_activity > self._session_timeout_seconds:
            dead.append(sid)
    return dead
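Because `reap_timed_out` accepts an explicit `now`, a driver can be exercised deterministically. The sketch below shows one possible shape for such a driver; `StubManager`, `reap_once`, and `close_session` are hypothetical names, and the stub only mimics the idle comparison rather than the full terminal-state filter.

```python
from __future__ import annotations

import asyncio
import time
from typing import Awaitable, Callable


class StubManager:
    """Hypothetical stand-in that mimics only the idle-time comparison."""

    def __init__(self, last_activity: dict[str, float], timeout: float) -> None:
        self._last_activity = dict(last_activity)
        self._timeout = timeout

    def reap_timed_out(self, now: float | None = None) -> list[str]:
        now = now if now is not None else time.monotonic()
        return [
            sid for sid, seen in self._last_activity.items()
            if now - seen > self._timeout
        ]


async def reap_once(
    manager: StubManager,
    close_session: Callable[[str], Awaitable[None]],
    now: float | None = None,
) -> list[str]:
    """Identify idle sessions, then let the caller-supplied hook close them."""
    dead = manager.reap_timed_out(now)
    for sid in dead:
        await close_session(sid)
    return dead


closed: list[str] = []


async def close_session(sid: str) -> None:
    # A real server would send a session_timeout frame and drop the socket.
    closed.append(sid)


# Session "a" was last active at t=0, session "b" at t=90; timeout is 60 s.
manager = StubManager({"a": 0.0, "b": 90.0}, timeout=60)
dead = asyncio.run(reap_once(manager, close_session, now=100.0))
```

With `now=100.0`, only session `"a"` has been idle for more than 60 seconds, so it is the only id handed to the close hook. A background task could call `reap_once` on a fixed interval without passing `now`.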

fastvideo.entrypoints.streaming.session.SessionRejected

Bases: RuntimeError

Raised when session creation fails (queue full, auth, etc.).

fastvideo.entrypoints.streaming.session.SessionState

Bases: Enum

State-machine positions for a streaming session.

Transitions are server-owned. See docs/design/server_contracts/streaming.md for the full diagram.