prompt

Prompt pipeline for the streaming server.

  • :mod:providers — LLM backend abstraction + built-in adapters
  • :mod:enhancer — provider-agnostic enhance / auto-extend / rewrite operations on top of the provider layer

All of this is optional; the streaming server runs without it (PR 7.5's skeleton never invokes the enhancer). When the operator sets ServeConfig.streaming.prompt.enabled, the server routes each curated prompt from session_init_v2 through enhance before the first segment is generated.

Classes

fastvideo.entrypoints.streaming.prompt.LLMProvider

Bases: Protocol

Provider interface every LLM adapter implements.

Providers are async-first because every built-in implementation talks to an HTTP API. Synchronous providers can wrap their call in asyncio.to_thread internally.
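A minimal sketch of the to_thread pattern for a synchronous backend. The LLMProvider protocol's method names and its request/response types are not shown on this page, so the AsyncTextProvider protocol, its single async complete(prompt) method, and blocking_sdk_call are hypothetical stand-ins.

```python
import asyncio
import time
from typing import Protocol


class AsyncTextProvider(Protocol):
    """Hypothetical stand-in for LLMProvider: one async completion method."""

    name: str

    async def complete(self, prompt: str) -> str: ...


def blocking_sdk_call(prompt: str) -> str:
    """Placeholder for a synchronous SDK call that blocks its thread."""
    time.sleep(0.01)
    return prompt.upper()


class SyncWrappedProvider:
    """Adapts a blocking client to the async interface via asyncio.to_thread."""

    name = "sync-wrapped"

    async def complete(self, prompt: str) -> str:
        # Run the blocking call on the default thread pool so the event loop
        # keeps serving other sessions while this request is in flight.
        return await asyncio.to_thread(blocking_sdk_call, prompt)


async def main() -> None:
    provider: AsyncTextProvider = SyncWrappedProvider()
    print(await provider.complete("a fox running through snow"))


if __name__ == "__main__":
    asyncio.run(main())
```

Because to_thread hands the call to the loop's default executor, a slow synchronous SDK delays only its own request rather than every session on the same event loop.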

fastvideo.entrypoints.streaming.prompt.LLMProviderError

LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

Raised when an LLM provider fails a request.

retryable controls whether the enhancer falls back to the next provider. It is settable per-instance so the same exception type can describe retryable transport errors (5xx, 429) and non-retryable client errors (4xx auth/bad-request) without forcing a separate subclass for every status family.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str, *, retryable: bool = True) -> None:
    super().__init__(message)
    self.retryable = retryable
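The per-instance flag can be illustrated with a status-mapping helper of the kind an HTTP adapter might use. error_for_status is hypothetical (it is not part of the documented API), and the exact split below is an assumption that follows the description: 429 and 5xx are retryable, other 4xx codes are not. The exception class is a local copy of the one shown above so the sketch runs standalone.

```python
class LLMProviderError(RuntimeError):
    """Local copy of the documented exception so this sketch is self-contained."""

    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable


def error_for_status(status: int, detail: str = "") -> LLMProviderError:
    """Map an HTTP error status to an LLMProviderError (hypothetical helper).

    429 and 5xx are treated as transient, so the enhancer may try the next
    provider; other codes (bad key, malformed request) are non-retryable.
    """
    retryable = status == 429 or 500 <= status < 600
    message = f"HTTP {status}" + (f": {detail}" if detail else "")
    return LLMProviderError(message, retryable=retryable)
```

One exception type with a flag keeps call sites simple: an adapter raises error_for_status(resp.status, body) and the enhancer only needs to inspect exc.retryable.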

fastvideo.entrypoints.streaming.prompt.LLMTimeoutError

LLMTimeoutError(message: str)

Bases: LLMProviderError

Raised when an LLM provider times out — always retryable.

Source code in fastvideo/entrypoints/streaming/prompt/providers/base.py
def __init__(self, message: str) -> None:
    super().__init__(message, retryable=True)

fastvideo.entrypoints.streaming.prompt.PromptEnhancer

PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

Orchestrates prompt operations across a priority-ordered provider list with structured fallback + hot-reloadable system prompts.

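Usage:

import asyncio
from fastvideo.entrypoints.streaming.prompt import PromptEnhancer
from fastvideo.entrypoints.streaming.prompt.providers import CerebrasProvider, GroqProvider

async def main() -> None:
    enhancer = PromptEnhancer(
        providers=[CerebrasProvider(), GroqProvider()],  # Cerebras is primary, Groq the fallback
        model="gpt-oss-120b",
        system_prompt_dir="/etc/fastvideo/prompts",
    )
    response = await enhancer.enhance("a fox running through snow")
    print(response.content)

asyncio.run(main())
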
Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def __init__(
    self,
    *,
    providers: Sequence[LLMProvider],
    model: str,
    timeout_ms: int = 20000,
    temperature: float = 0.7,
    max_tokens: int | None = 256,
    system_prompt_dir: str | None = None,
) -> None:
    if not providers:
        raise ValueError("PromptEnhancer requires at least one LLMProvider")
    self._providers = list(providers)
    self._model = model
    self._timeout_ms = timeout_ms
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._system_prompt_dir = system_prompt_dir
    self._system_prompts = self._load_system_prompts()

Functions

fastvideo.entrypoints.streaming.prompt.PromptEnhancer.register_provider
register_provider(provider: LLMProvider, *, priority: int = -1) -> None

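Insert an additional provider. priority=0 makes it primary; priority=-1 (the default) appends it as the last fallback. In general, any non-negative priority is used as a list index (values past the end append) and any negative value appends.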

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def register_provider(self, provider: LLMProvider, *, priority: int = -1) -> None:
    """Insert an additional provider. ``priority=0`` makes it primary;
    ``priority=-1`` (default) appends as a fallback."""
    if priority < 0:
        self._providers.append(provider)
    else:
        self._providers.insert(priority, provider)
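The ordering rule in the source above can be replayed on plain provider names to see the resulting fallback chain. insert_provider is a hypothetical stand-in that copies the two branches of register_provider; it operates on strings instead of provider objects.

```python
def insert_provider(providers: list[str], provider: str, priority: int = -1) -> None:
    """Copy of register_provider's ordering rule, applied to plain names."""
    if priority < 0:
        providers.append(provider)  # any negative priority appends (last fallback)
    else:
        # list.insert treats priority as an index; values past the end append.
        providers.insert(priority, provider)


chain = ["cerebras", "groq"]
insert_provider(chain, "local", priority=0)   # becomes the primary
insert_provider(chain, "backup")              # default -1 appends as last fallback
insert_provider(chain, "second", priority=1)  # slots in right after the primary
print(chain)  # ['local', 'second', 'cerebras', 'groq', 'backup']
```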
fastvideo.entrypoints.streaming.prompt.PromptEnhancer.reload_system_prompts
reload_system_prompts() -> None

Re-read the system prompt files from system_prompt_dir.

The streaming server exposes this via a management endpoint so operators can iterate on prompt templates without restarting workers.

Source code in fastvideo/entrypoints/streaming/prompt/enhancer.py
def reload_system_prompts(self) -> None:
    """Re-read the system prompt files from ``system_prompt_dir``.

    The streaming server exposes this via a management endpoint so
    operators can iterate on prompt templates without restarting
    workers.
    """
    self._system_prompts = self._load_system_prompts()
    logger.info("prompt enhancer: reloaded system prompts from %s", self._system_prompt_dir or "defaults")

Modules

fastvideo.entrypoints.streaming.prompt.enhancer

Provider-agnostic prompt orchestration for the streaming server.

Three operations the streaming server needs:

  • enhance — polish a user prompt (add cinematic detail, fix syntax)
  • auto_extend — generate a follow-on prompt for loop generation
  • rewrite — rewrite a seed prompt for a user-directed rewrite flow

All three share the same orchestration: pick a provider in priority order, submit an LLMRequest, fall back to the next provider on retryable errors, and surface a structured :class:LLMResponse back to the caller.
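The shared orchestration (priority order, fallback on retryable errors, a structured result) reduces to a short loop. This is a sketch under assumptions, not the enhancer's code: providers are modeled as (name, coroutine function) pairs instead of LLMProvider objects, ProviderError and Result are local stand-ins for LLMProviderError and LLMResponse, and asyncio.wait_for stands in for however the enhancer actually enforces timeout_ms.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


class ProviderError(RuntimeError):
    """Mirrors LLMProviderError: `retryable` decides whether to fall back."""

    def __init__(self, message: str, *, retryable: bool = True) -> None:
        super().__init__(message)
        self.retryable = retryable


@dataclass
class Result:
    content: str
    provider: str
    fallback_used: bool


# A provider here is just (name, async callable); the real LLMProvider differs.
Provider = tuple[str, Callable[[str], Awaitable[str]]]


async def run_with_fallback(providers: list[Provider], prompt: str, timeout_s: float) -> Result:
    last_error: ProviderError | None = None
    for index, (name, call) in enumerate(providers):
        try:
            content = await asyncio.wait_for(call(prompt), timeout=timeout_s)
        except asyncio.TimeoutError:
            # Timeouts are always retryable, like LLMTimeoutError.
            last_error = ProviderError(f"{name} timed out", retryable=True)
            continue
        except ProviderError as exc:
            if not exc.retryable:
                raise  # client errors (bad key, bad request) surface immediately
            last_error = exc
            continue
        return Result(content=content, provider=name, fallback_used=index > 0)
    raise last_error or ProviderError("no providers configured", retryable=False)
```

Raising non-retryable errors at once matches the documented role of retryable: only transient failures move on to the next provider, and fallback_used records whether the primary was skipped.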

System prompts are loaded from system_prompt_dir on construction and can be hot-reloaded via :meth:PromptEnhancer.reload_system_prompts. The streaming server's management endpoint calls that method in response to a rewrite_seed_prompts_started frame.

Classes

fastvideo.entrypoints.streaming.prompt.enhancer.PromptEnhancer
PromptEnhancer(*, providers: Sequence[LLMProvider], model: str, timeout_ms: int = 20000, temperature: float = 0.7, max_tokens: int | None = 256, system_prompt_dir: str | None = None)

fastvideo.entrypoints.streaming.prompt.providers

LLM provider implementations used by the prompt enhancer.

Classes

fastvideo.entrypoints.streaming.prompt.providers.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

Cerebras inference adapter.

api_key falls back to the CEREBRAS_API_KEY environment variable when unset.

fastvideo.entrypoints.streaming.prompt.providers.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

Groq inference adapter.

Identical wire format to :class:CerebrasProvider; both go through :func:complete_openai_compatible. The two providers differ only in base URL, env var, and model id conventions.

fastvideo.entrypoints.streaming.prompt.providers.LLMProvider

Bases: Protocol

fastvideo.entrypoints.streaming.prompt.providers.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

fastvideo.entrypoints.streaming.prompt.providers.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

Modules

fastvideo.entrypoints.streaming.prompt.providers.base

LLM provider protocol + DTOs used by the prompt enhancer.

Third-party users add a new provider by implementing :class:LLMProvider and registering it with a prompt enhancer instance. The shipped providers live in sibling modules (cerebras.py, groq.py) and each is ~100-200 LOC — the provider layer is intentionally thin so the enhancer stays provider-agnostic.

Classes
fastvideo.entrypoints.streaming.prompt.providers.base.LLMProvider

Bases: Protocol

fastvideo.entrypoints.streaming.prompt.providers.base.LLMProviderError
LLMProviderError(message: str, *, retryable: bool = True)

Bases: RuntimeError

fastvideo.entrypoints.streaming.prompt.providers.base.LLMTimeoutError
LLMTimeoutError(message: str)

Bases: LLMProviderError

fastvideo.entrypoints.streaming.prompt.providers.cerebras

Cerebras LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.cerebras.CerebrasProvider dataclass
CerebrasProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'cerebras')

fastvideo.entrypoints.streaming.prompt.providers.groq

Groq LLM provider (OpenAI-compatible chat endpoint).

Classes
fastvideo.entrypoints.streaming.prompt.providers.groq.GroqProvider dataclass
GroqProvider(api_key: str | None = None, base_url: str = _DEFAULT_BASE_URL, name: str = 'groq')

fastvideo.entrypoints.streaming.prompt.rewrite

Rewrite payload builder.

The UI's "rewrite seed prompts" flow asks the enhancer to produce a batch of alternative prompts given one seed. This module packages the seed + options into the payload the enhancer expects and unpacks the response back into a typed :class:RewriteResult.

Separating this from :mod:enhancer keeps the enhancer provider-agnostic; anything UI-specific (how many alternatives to request, how to split the response, temperature) lives here.

Classes

fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions dataclass
RewriteOptions(count: int = 3, temperature: float | None = None)
Attributes
fastvideo.entrypoints.streaming.prompt.rewrite.RewriteOptions.count class-attribute instance-attribute
count: int = 3

Number of alternative prompts to request.

Functions

fastvideo.entrypoints.streaming.prompt.rewrite.build_rewrite async
build_rewrite(enhancer: PromptEnhancer, seed_prompt: str, *, options: RewriteOptions | None = None) -> RewriteResult

Run a rewrite op through the enhancer and return a typed result.

Source code in fastvideo/entrypoints/streaming/prompt/rewrite.py
async def build_rewrite(
    enhancer: PromptEnhancer,
    seed_prompt: str,
    *,
    options: RewriteOptions | None = None,
) -> RewriteResult:
    """Run a rewrite op through the enhancer and return a typed result."""
    if not seed_prompt.strip():
        raise ValueError("rewrite seed prompt must be non-empty")
    options = options or RewriteOptions()
    response = await enhancer.rewrite(seed_prompt)
    alternatives = _split_response(response.content, limit=options.count)
    return RewriteResult(
        seed_prompt=seed_prompt,
        alternatives=alternatives,
        provider=response.provider,
        model=response.model,
        latency_ms=response.latency_ms,
        fallback_used=response.fallback_used,
    )
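_split_response itself is not shown on this page. As a hypothetical illustration of what splitting the response can involve, the sketch below assumes the model returns one alternative per line, optionally prefixed with a list marker, and keeps at most limit non-empty entries; split_alternatives and the marker regex are assumptions, and the real helper may expect a different format.

```python
import re

# Leading list markers such as "1.", "2)", "-", "*" or "•", followed by whitespace.
_MARKER = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s+")


def split_alternatives(content: str, *, limit: int) -> list[str]:
    """Hypothetical stand-in for _split_response: one alternative per line."""
    cleaned = (_MARKER.sub("", line).strip() for line in content.splitlines())
    # Drop blank lines, then cap the result at `limit` (RewriteOptions.count).
    return [text for text in cleaned if text][: max(limit, 0)]
```

Requiring whitespace after the marker keeps prompts that merely start with a number, such as "2.5 seconds of fog", intact.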

fastvideo.entrypoints.streaming.prompt.safety

Optional prompt safety filter.

Uses a fastText classifier to score prompts against a banned-content rubric. Only loaded when ServeConfig.streaming.safety.enabled is True and fastText is installed — users who don't need it see no runtime cost.

Install: pip install "fastvideo[prompt-safety]" (the prompt-safety extra pulls in fasttext), or install fasttext directly.

Classes

fastvideo.entrypoints.streaming.prompt.safety.PromptSafetyFilter
PromptSafetyFilter(*, classifier_path: str | None, enabled: bool = True, block_threshold: float = 0.5)

Minimal fastText-backed prompt safety filter.

Loads the classifier lazily on first use so the streaming server can construct the filter eagerly at startup without paying the model-load cost when safety is disabled.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def __init__(
    self,
    *,
    classifier_path: str | None,
    enabled: bool = True,
    block_threshold: float = 0.5,
) -> None:
    self._classifier_path = classifier_path
    self._enabled = enabled
    self._block_threshold = block_threshold
    self._model: Any | None = None
    self._load_attempted = False
    self._load_lock = threading.Lock()
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision

Bases: Enum

Attributes
fastvideo.entrypoints.streaming.prompt.safety.SafetyDecision.UNAVAILABLE class-attribute instance-attribute
UNAVAILABLE = 'unavailable'

Returned when the classifier can't run (not configured, fastText missing). Safety is opt-in; the server treats UNAVAILABLE as ALLOW but logs it so operators know the filter is off.

Functions

fastvideo.entrypoints.streaming.prompt.safety.first_blocked
first_blocked(filter_: PromptSafetyFilter, prompts: list[str]) -> SafetyResult | None

Return the first prompt the filter blocks, or None.

Source code in fastvideo/entrypoints/streaming/prompt/safety.py
def first_blocked(
    filter_: PromptSafetyFilter,
    prompts: list[str],
) -> SafetyResult | None:
    """Return the first prompt the filter blocks, or ``None``."""
    for prompt in prompts:
        result = filter_.classify(prompt)
        if result.decision is SafetyDecision.BLOCK:
            return result
    return None