session_store

Session state store for the FastVideo streaming server.

The streaming server keeps continuation state (decoded frames and audio latents from the previous segment) server-side so the client does not have to re-upload multi-megabyte tensors with each WebSocket message. Two operations are needed:

  • snapshot(session_id) -> ContinuationState | None: serialize the current state so it can be exported (e.g. over HTTP) or migrated to a different server.
  • hydrate(state) -> session_id: load a previously serialized state into a new session (for resume-after-disconnect flows).

The store is an ABC with InMemorySessionStore as the default implementation; Redis or other backends can drop in without touching the pipeline.

Large tensor payloads (video frames, audio latents) are kept out of the JSON payload via an accompanying BlobStore. Both stores currently live in the same process; they are separate types so that a future implementation can put blobs on S3 while keeping session metadata in Redis.
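The export-and-resume flow described above can be sketched with plain dictionaries. This is only an illustration under stated assumptions: `DemoState` and its fields are hypothetical stand-ins for ContinuationState (whose real fields are not shown on this page), the two helper functions merely mimic the store methods, and returning None for an unknown id is assumed.

```python
from __future__ import annotations

import json
import uuid
from dataclasses import asdict, dataclass


@dataclass
class DemoState:
    """Hypothetical stand-in for ContinuationState (real fields not shown here)."""
    segment_index: int
    frames_blob_id: str  # a reference into a blob store, not the tensor itself


def snapshot(sessions: dict[str, DemoState], session_id: str) -> DemoState | None:
    # Return the current state, or None if the id is unknown (assumed behaviour).
    return sessions.get(session_id)


def hydrate(sessions: dict[str, DemoState], state: DemoState) -> str:
    # Install the state under a freshly allocated id.
    session_id = str(uuid.uuid4())
    sessions[session_id] = state
    return session_id


# Server A holds a live session.
server_a: dict[str, DemoState] = {"sess-1": DemoState(3, "blob-abc")}

# Export: take a snapshot, then serialize it to JSON for transport.
exported = snapshot(server_a, "sess-1")
wire_payload = json.dumps(asdict(exported))

# Server B resumes from the payload under a new session id.
server_b: dict[str, DemoState] = {}
new_id = hydrate(server_b, DemoState(**json.loads(wire_payload)))
```

The payload carries only small metadata and a blob id, which is what makes it cheap to move between servers.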

Classes

fastvideo.entrypoints.streaming.session_store.BlobStore

Bases: ABC

Opaque byte-blob storage keyed by id.

A ContinuationState payload can reference large tensors stored in a BlobStore rather than inlining them, so the JSON payload stays small when the state travels over the wire.
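To make the size argument concrete, the sketch below compares a payload that inlines tensor bytes with one that carries only a blob id. The `blobs` dictionary, the `put` helper, the field names, and the hex encoding are all assumptions chosen for illustration; they are not the library's wire format.

```python
from __future__ import annotations

import json
import uuid

# Hypothetical in-process blob table standing in for a BlobStore.
blobs: dict[str, bytes] = {}


def put(data: bytes) -> str:
    """Store bytes and return an opaque id (the uuid4 hex format is an assumption)."""
    blob_id = uuid.uuid4().hex
    blobs[blob_id] = data
    return blob_id


# Pretend these are encoded video frames from the previous segment.
frames = bytes(2 * 1024 * 1024)

# Inlined: the tensor bytes travel inside the JSON payload (hex-encoded here).
inlined_payload = json.dumps({"segment_index": 3, "frames": frames.hex()})

# Referenced: only the blob id travels; the bytes stay in the blob store.
referenced_payload = json.dumps({"segment_index": 3, "frames_blob_id": put(frames)})

print(len(inlined_payload), len(referenced_payload))
```

The referenced payload stays at a few dozen characters regardless of how large the frames are.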

Functions

fastvideo.entrypoints.streaming.session_store.BlobStore.drop abstractmethod
drop(blob_id: str) -> None

Remove a blob. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, blob_id: str) -> None:
    """Remove a blob. Missing ids are a no-op."""

fastvideo.entrypoints.streaming.session_store.BlobStore.get abstractmethod
get(blob_id: str) -> bytes

Load a previously stored blob. Raises KeyError if absent.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def get(self, blob_id: str) -> bytes:
    """Load a previously stored blob. Raises ``KeyError`` if absent."""

fastvideo.entrypoints.streaming.session_store.BlobStore.put abstractmethod
put(data: bytes, *, mime: str = 'application/octet-stream') -> str

Store data and return a blob id for later retrieval.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
    """Store ``data`` and return a blob id for later retrieval."""
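The three contracts above (put returns an id, get raises KeyError when the id is absent, drop ignores missing ids) might be satisfied by a backend as small as the following. The abstract class is re-declared locally so the sketch runs on its own; `DictBlobStore` is a hypothetical name, the uuid4-hex id format is an assumption, and this is not the library's InMemoryBlobStore.

```python
from __future__ import annotations

import uuid
from abc import ABC, abstractmethod


class BlobStore(ABC):
    """Local copy of the interface above so the sketch runs standalone."""

    @abstractmethod
    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str: ...

    @abstractmethod
    def get(self, blob_id: str) -> bytes: ...

    @abstractmethod
    def drop(self, blob_id: str) -> None: ...


class DictBlobStore(BlobStore):
    """Hypothetical single-threaded backend used only for illustration."""

    def __init__(self) -> None:
        self._blobs: dict[str, tuple[bytes, str]] = {}

    def put(self, data: bytes, *, mime: str = "application/octet-stream") -> str:
        blob_id = uuid.uuid4().hex  # id format is an assumption
        self._blobs[blob_id] = (data, mime)
        return blob_id

    def get(self, blob_id: str) -> bytes:
        return self._blobs[blob_id][0]  # KeyError propagates if the id is absent

    def drop(self, blob_id: str) -> None:
        self._blobs.pop(blob_id, None)  # missing ids are a no-op


store = DictBlobStore()
blob_id = store.put(b"\x00\x01", mime="application/x-demo")
payload = store.get(blob_id)
store.drop(blob_id)
store.drop(blob_id)  # a second drop is silently ignored
```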

fastvideo.entrypoints.streaming.session_store.InMemoryBlobStore

InMemoryBlobStore()

Bases: BlobStore

Thread-safe in-memory BlobStore for single-process servers.

No eviction policy: callers are responsible for calling drop() when a blob's owning state is replaced or a session ends. A Redis- or filesystem-backed BlobStore should replace this when the streaming server lands as a real service (PR 7.5+).

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._blobs: dict[str, _BlobRecord] = {}
    self._lock = threading.Lock()
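Because nothing is evicted automatically, whoever replaces a state has to release the blob that state owned. One way a caller might do this is sketched below; the two dictionaries stand in for a blob store and a session store, and `replace_state`, `end_session`, and the `frames_blob_id` field are hypothetical names, not part of the module.

```python
from __future__ import annotations

import uuid

blobs: dict[str, bytes] = {}    # stand-in for a BlobStore
sessions: dict[str, dict] = {}  # stand-in for a SessionStore


def put_blob(data: bytes) -> str:
    blob_id = uuid.uuid4().hex
    blobs[blob_id] = data
    return blob_id


def replace_state(session_id: str, frames: bytes) -> None:
    """Store a new state and drop the blob owned by the state it replaces."""
    previous = sessions.get(session_id)
    sessions[session_id] = {"frames_blob_id": put_blob(frames)}
    if previous is not None:
        blobs.pop(previous["frames_blob_id"], None)


def end_session(session_id: str) -> None:
    """Forget the session and release its blob."""
    state = sessions.pop(session_id, None)
    if state is not None:
        blobs.pop(state["frames_blob_id"], None)


replace_state("sess-1", b"segment-0")
replace_state("sess-1", b"segment-1")  # the segment-0 blob is dropped here
blobs_while_live = len(blobs)
end_session("sess-1")
```

Without the two `pop` calls on `blobs`, every segment would leave an orphaned blob behind.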

fastvideo.entrypoints.streaming.session_store.InMemorySessionStore

InMemorySessionStore()

Bases: SessionStore

Thread-safe in-memory SessionStore.

Default implementation used by single-process deployments; a future Redis-backed store can be dropped in without changes to the server.

No eviction, TTL, or capacity bound: sessions leave only via drop(). The live streaming server (PR 7.5+) is responsible for bounding growth and for dropping any BlobStore blobs referenced by a state when that state is replaced or a session ends; this class does not know about blobs.

Source code in fastvideo/entrypoints/streaming/session_store.py
def __init__(self) -> None:
    self._sessions: dict[str, ContinuationState] = {}
    self._lock = threading.Lock()
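Given the `_sessions` dictionary and `_lock` shown in the constructor, the remaining methods plausibly wrap each dictionary access in the lock. The class below is a hypothetical sketch of that pattern, not the actual implementation: `LockedSessionStore` is an invented name, states are plain dicts, and `__len__` is added only so the example can report how many sessions remain.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor


class LockedSessionStore:
    """Hypothetical lock-guarded store; states are plain dicts here."""

    def __init__(self) -> None:
        self._sessions: dict[str, dict] = {}
        self._lock = threading.Lock()

    def store(self, session_id: str, state: dict) -> None:
        with self._lock:
            self._sessions[session_id] = state

    def snapshot(self, session_id: str) -> dict | None:
        with self._lock:
            return self._sessions.get(session_id)

    def drop(self, session_id: str) -> None:
        with self._lock:
            self._sessions.pop(session_id, None)

    def __len__(self) -> int:
        with self._lock:
            return len(self._sessions)


store = LockedSessionStore()


def run_session(i: int) -> None:
    sid = f"sess-{i}"
    store.store(sid, {"segment_index": i})
    if i % 2:  # odd-numbered sessions end; even-numbered ones are never dropped
        store.drop(sid)


with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(run_session, range(100)))

remaining = len(store)
```

The even-numbered sessions stay resident until something calls drop on them, which is the unbounded growth the docstring warns about.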

fastvideo.entrypoints.streaming.session_store.SessionStore

Bases: ABC

Keyed store for per-session continuation state.

Implementations own the session-id → state mapping. The streaming server calls store() after each segment and snapshot() when a client explicitly asks for an exportable state handle.

Functions

fastvideo.entrypoints.streaming.session_store.SessionStore.drop abstractmethod
drop(session_id: str) -> None

Forget a session. Missing ids are a no-op.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def drop(self, session_id: str) -> None:
    """Forget a session. Missing ids are a no-op."""

fastvideo.entrypoints.streaming.session_store.SessionStore.hydrate abstractmethod
hydrate(state: ContinuationState, *, session_id: str | None = None) -> str

Install state as the starting point for a session.

When session_id is None the store allocates a fresh id (UUID4); when provided the store uses it verbatim, overwriting any prior state at that id.
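The two branches described above can be illustrated with a free function over a dictionary. This is a sketch of the documented semantics only: states are plain dicts, and the canonical string form of the UUID4 is an assumption about how a concrete store formats its ids.

```python
from __future__ import annotations

import uuid


def hydrate(sessions: dict[str, dict], state: dict, *, session_id: str | None = None) -> str:
    """Install state; allocate a UUID4 id unless the caller supplies one."""
    if session_id is None:
        session_id = str(uuid.uuid4())  # string form is an assumption
    sessions[session_id] = state  # overwrites any prior state at that id
    return session_id


sessions: dict[str, dict] = {}

fresh_id = hydrate(sessions, {"segment_index": 0})
pinned_id = hydrate(sessions, {"segment_index": 5}, session_id="sess-42")
hydrate(sessions, {"segment_index": 9}, session_id="sess-42")  # replaces segment 5
```

Supplying the id explicitly suits resume-after-disconnect flows where the client already holds its session id; omitting it suits importing a state onto a server that has never seen the session.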

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def hydrate(
    self,
    state: ContinuationState,
    *,
    session_id: str | None = None,
) -> str:
    """Install ``state`` as the starting point for a session.

    When ``session_id`` is ``None`` the store allocates a fresh id
    (UUID4); when provided the store uses it verbatim, overwriting
    any prior state at that id.
    """

fastvideo.entrypoints.streaming.session_store.SessionStore.snapshot abstractmethod
snapshot(session_id: str) -> ContinuationState | None

Return the current state for session_id (or None).

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def snapshot(self, session_id: str) -> ContinuationState | None:
    """Return the current state for ``session_id`` (or ``None``)."""

fastvideo.entrypoints.streaming.session_store.SessionStore.store abstractmethod
store(session_id: str, state: ContinuationState) -> None

Persist state for session_id, replacing any prior value.

Source code in fastvideo/entrypoints/streaming/session_store.py
@abstractmethod
def store(self, session_id: str, state: ContinuationState) -> None:
    """Persist ``state`` for ``session_id``, replacing any prior value."""
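Taken together, store and snapshot suggest a write-often, read-rarely pattern: the server persists state after every segment and reads it back only on an export request. The loop below sketches that pattern with a dictionary in place of a real store; `generate_segment` is a hypothetical placeholder for the pipeline, and returning None for an unknown id is an assumption about when snapshot yields None.

```python
from __future__ import annotations

sessions: dict[str, dict] = {}  # stand-in for a SessionStore


def store(session_id: str, state: dict) -> None:
    sessions[session_id] = state  # replaces any prior value


def snapshot(session_id: str) -> dict | None:
    return sessions.get(session_id)  # None for unknown ids (assumed)


def generate_segment(index: int) -> dict:
    """Hypothetical placeholder for the pipeline; returns continuation state."""
    return {"segment_index": index, "frames_blob_id": f"blob-{index}"}


# The server persists state after every segment...
for index in range(3):
    store("sess-1", generate_segment(index))

# ...and reads it back only when the client asks for an exportable handle.
latest = snapshot("sess-1")
unknown = snapshot("sess-404")
```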