MagiHuman base text-to-AV pipeline.
Top-level composition for the daVinci-MagiHuman base model. Wires:
InputValidationStage -> TextEncodingStage (T5-Gemma)
-> MagiHumanLatentPreparationStage
-> MagiHumanDenoisingStage
-> DecodingStage (Wan 2.2 TI2V-5B VAE decode for video)
-> MagiHumanAudioDecodingStage (Stable Audio Open 1.0 VAE decode)
The base checkpoint is a joint audio-visual generator; both the video
and audio paths run in the denoising loop and both are decoded.
load_modules is overridden so the four cross-variant shared components
(text_encoder, tokenizer, audio_vae, video vae) lazy-load from their
canonical upstream HF repos at first build time instead of being
bundled inside every converted MagiHuman variant. This keeps each
variant's converted repo at ~5-30 GB (transformer + scheduler +
model_index.json) instead of ~30-55 GB, and lets all variants share
the same ~25 GB of cached upstream weights.
Classes
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanI2VPipeline
Bases: MagiHumanPipeline
MagiHuman text+image-to-AV pipeline using the T2V DiT weights.
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanPipeline
Bases: ComposedPipelineBase
Base MagiHuman text-to-AV pipeline (no LoRA, no distill, no SR).
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
Functions
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanPipeline.load_modules
Load the variant-specific transformer + scheduler from the
converted MagiHuman repo and lazy-load the four cross-variant
shared components from their canonical upstream HF repos:
* text_encoder, tokenizer -> google/t5gemma-9b-9b-ul2
(gated, requires HF token with accepted terms of use)
* audio_vae -> stabilityai/stable-audio-open-1.0 (gated)
* vae -> Wan-AI/Wan2.2-TI2V-5B-Diffusers
Backwards-compatible with bundled converted repos: if any of
these subfolders is present locally and listed in
model_index.json, the standard component loader picks it up
via super(). Otherwise the loader is told to skip the entry and
we lazy-load it here.
Source code in fastvideo/pipelines/basic/magi_human/magi_human_pipeline.py
def load_modules(
    self,
    fastvideo_args: FastVideoArgs,
    loaded_modules: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Load the variant-specific transformer + scheduler from the
    converted MagiHuman repo and lazy-load the four cross-variant
    shared components from their canonical upstream HF repos:

    * text_encoder, tokenizer -> ``google/t5gemma-9b-9b-ul2``
      (gated, requires HF token with accepted terms of use)
    * audio_vae -> ``stabilityai/stable-audio-open-1.0`` (gated)
    * vae -> ``Wan-AI/Wan2.2-TI2V-5B-Diffusers``

    Backwards-compatible with bundled converted repos: if any of
    these subfolders is present locally and listed in
    ``model_index.json``, the standard component loader picks it up
    via super(). Otherwise the loader is told to skip the entry and
    we lazy-load it here.

    Args:
        fastvideo_args: Passed through to the parent loader and to the
            video VAE loader.
        loaded_modules: Optional caller-provided modules. An entry for
            one of the four shared keys is used instead of lazy-loading
            that component.

    Returns:
        The module mapping produced by the parent loader, completed with
        ``text_encoder``, ``tokenizer``, ``audio_vae`` and ``vae``.
    """
    # T5-Gemma is gated: expose `HF_API_KEY` as `HF_TOKEN` if needed.
    _ensure_hf_token_env()

    # Resolve to a local cache path so we can inspect
    # model_index.json before invoking super(). `maybe_download_model`
    # is idempotent for local paths; super() repeats the call cheaply
    # via `_load_config`.
    local_path = maybe_download_model(self.model_path)

    # Identify which cross-variant shared keys are bundled in the
    # converted repo (declared in model_index.json with a non-null
    # spec) versus absent (the umbrella scheme). Bundled keys stay
    # in `required_config_modules` and are loaded normally by super()
    # from `<model_path>/<key>/`. Absent keys are temporarily
    # dropped so super() does not fail the "every required entry
    # must appear in model_index.json" check, then lazy-loaded
    # below.
    model_index: dict[str, Any] = {}
    index_path = Path(local_path) / "model_index.json"
    try:
        # JSON is UTF-8 by specification; do not depend on the platform
        # default encoding.
        with open(index_path, encoding="utf-8") as f:
            parsed_index = json.load(f)
    except FileNotFoundError:
        # Still best-effort: every shared key is treated as not bundled.
        logger.debug("No model_index.json found at %s", index_path)
    except json.JSONDecodeError as err:
        # Still best-effort, but a corrupt index should not go unnoticed.
        logger.warning("Ignoring unparsable %s: %s", index_path, err)
    else:
        # Only a JSON object can be queried per key; anything else is
        # treated like an empty index.
        if isinstance(parsed_index, dict):
            model_index = parsed_index

    def _is_bundled(key: str) -> bool:
        """Return True if `key` has a non-null spec in model_index.json."""
        spec = model_index.get(key)
        return (isinstance(spec, list | tuple) and len(spec) >= 1 and spec[0] is not None)

    deferred = [
        key for key in ("text_encoder", "tokenizer", "audio_vae", "vae")
        if key in self.required_config_modules and not _is_bundled(key)
    ]
    # Remember where each deferred key sat so it can be put back in place
    # afterwards instead of being appended at the end of the list.
    original_index = {key: self.required_config_modules.index(key) for key in deferred}
    for key in deferred:
        self.required_config_modules.remove(key)

    try:
        modules = super().load_modules(fastvideo_args, loaded_modules)
    finally:
        # Re-insert in ascending original position so that earlier
        # insertions do not shift the slots of later ones.
        for key in sorted(deferred, key=original_index.__getitem__):
            if key not in self.required_config_modules:
                self.required_config_modules.insert(original_index[key], key)

    # For each lazy-load key, prefer whatever super() already loaded
    # (a bundled subfolder, or a caller-provided override merged in
    # via `loaded_modules`). Fall back to the caller-provided
    # `loaded_modules` entry for keys absent from model_index.json
    # (super() never iterates those). Otherwise lazy-load from the
    # canonical upstream HF repo.
    def _resolve(key: str) -> bool:
        """Return True if `modules[key]` is already populated."""
        if modules.get(key) is not None:
            return True
        if loaded_modules and key in loaded_modules:
            modules[key] = loaded_modules[key]
            return True
        return False

    if not _resolve("text_encoder"):
        logger.info("Building T5-Gemma text encoder (lazy-load from %s)", _T5GEMMA_HF_ID)
        enc_config = T5GemmaEncoderConfig()
        enc_config.arch_config.t5gemma_model_path = _T5GEMMA_HF_ID
        modules["text_encoder"] = T5GemmaEncoderModel(enc_config)

    if not _resolve("tokenizer"):
        logger.info("Loading T5-Gemma tokenizer from %s", _T5GEMMA_HF_ID)
        modules["tokenizer"] = AutoTokenizer.from_pretrained(_T5GEMMA_HF_ID)

    if not _resolve("audio_vae"):
        logger.info(
            "Building Stable Audio Open 1.0 VAE (lazy-load from %s) — "
            "requires HF terms accepted for gated repo",
            _SA_AUDIO_HF_ID,
        )
        audio_config = OobleckVAEConfig()
        audio_config.pretrained_path = _SA_AUDIO_HF_ID
        modules["audio_vae"] = SAAudioVAEModel(audio_config)

    if not _resolve("vae"):
        modules["vae"] = self._load_video_vae(fastvideo_args)

    return modules
|
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSR1080pI2VPipeline
Bases: MagiHumanSRI2VPipeline
Two-stage MagiHuman base + SR-1080p text+image-to-AV pipeline.
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSR1080pPipeline
Bases: MagiHumanSRPipeline
Two-stage MagiHuman base + SR-1080p text-to-AV pipeline.
The stage chain is identical to SR-540p. The paired pipeline config enables
block-sparse local-window attention on 32 SR-DiT layers and requests the
1080p latent target.
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSRI2VPipeline
Bases: MagiHumanSRPipeline
Two-stage MagiHuman base + SR-540p text+image-to-AV pipeline.
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSRPipeline
Bases: MagiHumanPipeline
Two-stage MagiHuman base + SR-540p text-to-AV pipeline.
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Set up the pipeline and load its modules.

    Once construction finishes the pipeline should be ready to use. The
    pipeline is intended to be stateless and must not hold batch state.
    """
    # Plain bookkeeping attributes; none of these depend on one another.
    self.model_path: str = model_path
    self.fastvideo_args = fastvideo_args
    self._trace_mgr = None
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._stages: list[PipelineStage] = []

    # An explicit argument replaces whatever the subclass declared; with
    # neither in place the pipeline cannot be built.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size,
        fastvideo_args.sp_size,
    )

    # Torch profiler. Enabled and configured through env vars:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler_controller = controller
    self.profiler = controller.profiler
    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a dedicated profiler region.
    logger.info("Loading pipeline modules...")
    with controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
|
Functions