magi_human_pipeline

MagiHuman base text-to-AV pipeline.

Top-level composition for the daVinci-MagiHuman base model. Wires:

InputValidationStage -> TextEncodingStage (T5-Gemma)
    -> MagiHumanLatentPreparationStage
    -> MagiHumanDenoisingStage
    -> DecodingStage (Wan 2.2 TI2V-5B VAE decode for video)
    -> MagiHumanAudioDecodingStage (Stable Audio Open 1.0 VAE decode)

The base checkpoint is a joint audio-visual generator; both the video and audio paths run in the denoising loop and both are decoded.
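
As a rough illustration of the chain above, the sketch below threads a batch dictionary through six stand-in functions in the same order. It is only a toy model: the function names, the `run_chain` helper, and the string placeholders are hypothetical and do not reflect FastVideo's actual `PipelineStage` interface.

```python
from typing import Callable

Batch = dict[str, object]
Stage = Callable[[Batch], Batch]


def validate_input(batch: Batch) -> Batch:
    # Reject empty prompts before any expensive work happens.
    if not batch.get("prompt"):
        raise ValueError("prompt must be a non-empty string")
    return batch


def encode_text(batch: Batch) -> Batch:
    return {**batch, "prompt_embeds": f"embeds({batch['prompt']})"}


def prepare_latents(batch: Batch) -> Batch:
    # Both modalities get their own latent, since the model is joint audio-visual.
    return {**batch, "video_latents": "video_noise", "audio_latents": "audio_noise"}


def denoise(batch: Batch) -> Batch:
    # One loop updates the video and audio latents together.
    return {
        **batch,
        "video_latents": f"denoised({batch['video_latents']})",
        "audio_latents": f"denoised({batch['audio_latents']})",
    }


def decode_video(batch: Batch) -> Batch:
    return {**batch, "video": f"frames({batch['video_latents']})"}


def decode_audio(batch: Batch) -> Batch:
    return {**batch, "audio": f"waveform({batch['audio_latents']})"}


# Hypothetical stand-ins, listed in the order the pipeline wires its stages.
STAGES: list[Stage] = [
    validate_input,
    encode_text,
    prepare_latents,
    denoise,
    decode_video,
    decode_audio,
]


def run_chain(prompt: str) -> Batch:
    batch: Batch = {"prompt": prompt}
    for stage in STAGES:
        batch = stage(batch)
    return batch


result = run_chain("a person greeting the camera")
print(result["video"])
print(result["audio"])
```

Each stand-in returns a new dictionary rather than mutating its input, which mirrors the stated goal that the pipeline itself holds no batch state.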

load_modules is overridden so that the four cross-variant shared components (text_encoder, tokenizer, audio_vae, and the video vae) are lazy-loaded from their canonical upstream HF repos the first time the pipeline is built, rather than being bundled inside every converted MagiHuman variant. Each variant's converted repo then holds only the transformer, scheduler, and model_index.json (~5-30 GB instead of ~30-55 GB), and all variants share the same ~25 GB of cached upstream weights.
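
To make the saving concrete, here is a back-of-the-envelope sketch. The three per-variant sizes are made-up values chosen from the ~5-30 GB range quoted above, and `SHARED_GB` uses the ~25 GB figure; real numbers depend on the checkpoints involved.

```python
# Hypothetical sizes (GB) of the variant-specific part of three converted repos.
VARIANT_GB = [5, 15, 30]
# Approximate size of the shared upstream weights, per the figure quoted above.
SHARED_GB = 25

# Bundled layout: every variant carries its own copy of the shared weights.
bundled_total = sum(size + SHARED_GB for size in VARIANT_GB)

# Lazy-load layout: the shared weights are cached once and reused.
lazy_total = sum(VARIANT_GB) + SHARED_GB

saved = bundled_total - lazy_total
print(bundled_total, lazy_total, saved)
```

Under these assumptions the saving is one copy of the shared weights for every variant beyond the first, that is `(len(VARIANT_GB) - 1) * SHARED_GB`.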

Classes

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanI2VPipeline

MagiHumanI2VPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: MagiHumanPipeline

MagiHuman text+image-to-AV pipeline using the T2V DiT weights.

Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(self,
             model_path: str,
             fastvideo_args: FastVideoArgs | TrainingArgs,
             required_config_modules: list[str] | None = None,
             loaded_modules: dict[str, torch.nn.Module] | None = None):
    """
    Initialize the pipeline. After __init__, the pipeline should be ready to
    use. The pipeline should be stateless and not hold any batch state.
    """
    self.fastvideo_args = fastvideo_args

    self.model_path: str = model_path
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}
    self._trace_mgr = None

    if required_config_modules is not None:
        self._required_config_modules = required_config_modules

    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(fastvideo_args.tp_size, fastvideo_args.sp_size)

    # Torch profiler. Enabled and configured through env vars:
    # FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    trace_dir = envs.FASTVIDEO_TORCH_PROFILER_DIR
    self.profiler_controller = get_or_create_profiler(trace_dir)
    self.profiler = self.profiler_controller.profiler

    self.local_rank = get_world_group().local_rank

    # Load modules directly in initialization
    logger.info("Loading pipeline modules...")
    with self.profiler_controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanPipeline

MagiHumanPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: ComposedPipelineBase

Base MagiHuman text-to-AV pipeline (no LoRA, no distill, no SR).

Functions

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanPipeline.load_modules
load_modules(fastvideo_args: FastVideoArgs, loaded_modules: dict[str, Any] | None = None) -> dict[str, Any]

Load the variant-specific transformer + scheduler from the converted MagiHuman repo and lazy-load the four cross-variant shared components from their canonical upstream HF repos:

* text_encoder, tokenizer -> google/t5gemma-9b-9b-ul2 (gated, requires HF token with accepted terms of use)
* audio_vae -> stabilityai/stable-audio-open-1.0 (gated)
* vae -> Wan-AI/Wan2.2-TI2V-5B-Diffusers

Backwards-compatible with bundled converted repos: if any of these subfolders is present locally and listed in model_index.json, the standard component loader picks it up via super(). Otherwise the loader is told to skip the entry and we lazy-load it here.
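
The bundled-versus-absent decision can be illustrated in isolation. The sketch below applies the same test as the method's `_is_bundled` helper (a list or tuple entry whose first element is not null) to an example `model_index.json` payload. The payload itself, including its class names, is invented for illustration, and `split_shared_keys` is a hypothetical helper. The real method additionally checks that the key is in `required_config_modules`.

```python
from typing import Any

SHARED_KEYS = ("text_encoder", "tokenizer", "audio_vae", "vae")


def split_shared_keys(model_index: dict[str, Any]) -> tuple[list[str], list[str]]:
    """Return (bundled, deferred) shared keys for a parsed model_index.json."""
    bundled: list[str] = []
    deferred: list[str] = []
    for key in SHARED_KEYS:
        spec = model_index.get(key)
        is_bundled = (
            isinstance(spec, (list, tuple)) and len(spec) >= 1 and spec[0] is not None
        )
        (bundled if is_bundled else deferred).append(key)
    return bundled, deferred


# Invented example: only the video VAE ships inside the converted repo.
example_index = {
    "_class_name": "MagiHumanPipeline",
    "transformer": ["diffusers", "ExampleTransformer"],
    "scheduler": ["diffusers", "ExampleScheduler"],
    "vae": ["diffusers", "ExampleVideoVAE"],
    "tokenizer": [None, None],  # declared, but with a null spec
}

bundled, deferred = split_shared_keys(example_index)
print("bundled:", bundled)
print("deferred:", deferred)
```

Keys that are missing entirely and keys declared with a null spec both end up deferred, which is why the method can handle old bundled repos and slimmer ones with the same code path.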

Source code in fastvideo/pipelines/basic/magi_human/magi_human_pipeline.py
def load_modules(
    self,
    fastvideo_args: FastVideoArgs,
    loaded_modules: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Load the variant-specific transformer + scheduler from the
    converted MagiHuman repo and lazy-load the four cross-variant
    shared components from their canonical upstream HF repos:

      * text_encoder, tokenizer -> ``google/t5gemma-9b-9b-ul2``
        (gated, requires HF token with accepted terms of use)
      * audio_vae -> ``stabilityai/stable-audio-open-1.0`` (gated)
      * vae -> ``Wan-AI/Wan2.2-TI2V-5B-Diffusers``

    Backwards-compatible with bundled converted repos: if any of
    these subfolders is present locally and listed in
    ``model_index.json``, the standard component loader picks it up
    via super(). Otherwise the loader is told to skip the entry and
    we lazy-load it here.
    """
    # T5-Gemma is gated: expose `HF_API_KEY` as `HF_TOKEN` if needed.
    _ensure_hf_token_env()

    # Resolve to a local cache path so we can inspect
    # model_index.json before invoking super(). `maybe_download_model`
    # is idempotent for local paths; super() repeats the call cheaply
    # via `_load_config`.
    local_path = maybe_download_model(self.model_path)

    # Identify which cross-variant shared keys are bundled in the
    # converted repo (declared in model_index.json with a non-null
    # spec) versus absent (the umbrella scheme). Bundled keys stay
    # in `required_config_modules` and are loaded normally by super()
    # from `<model_path>/<key>/`. Absent keys are temporarily
    # dropped so super() does not fail the "every required entry
    # must appear in model_index.json" check, then lazy-loaded
    # below.
    model_index: dict[str, Any] = {}
    try:
        with open(Path(local_path) / "model_index.json") as f:
            model_index = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        pass

    def _is_bundled(key: str) -> bool:
        spec = model_index.get(key)
        return (isinstance(spec, list | tuple) and len(spec) >= 1 and spec[0] is not None)

    deferred = []
    for key in ("text_encoder", "tokenizer", "audio_vae", "vae"):
        if key in self.required_config_modules and not _is_bundled(key):
            self.required_config_modules.remove(key)
            deferred.append(key)

    try:
        modules = super().load_modules(fastvideo_args, loaded_modules)
    finally:
        for key in deferred:
            if key not in self.required_config_modules:
                self.required_config_modules.append(key)

    # For each lazy-load key, prefer whatever super() already loaded
    # (a bundled subfolder, or a caller-provided override merged in
    # via `loaded_modules`). Fall back to the caller-provided
    # `loaded_modules` entry for keys absent from model_index.json
    # (super() never iterates those). Otherwise lazy-load from the
    # canonical upstream HF repo.
    def _resolve(key: str) -> bool:
        """Return True if `modules[key]` is already populated."""
        if modules.get(key) is not None:
            return True
        if loaded_modules and key in loaded_modules:
            modules[key] = loaded_modules[key]
            return True
        return False

    if not _resolve("text_encoder"):
        logger.info("Building T5-Gemma text encoder (lazy-load from %s)", _T5GEMMA_HF_ID)
        enc_config = T5GemmaEncoderConfig()
        enc_config.arch_config.t5gemma_model_path = _T5GEMMA_HF_ID
        modules["text_encoder"] = T5GemmaEncoderModel(enc_config)

    if not _resolve("tokenizer"):
        logger.info("Loading T5-Gemma tokenizer from %s", _T5GEMMA_HF_ID)
        modules["tokenizer"] = AutoTokenizer.from_pretrained(_T5GEMMA_HF_ID)

    if not _resolve("audio_vae"):
        logger.info(
            "Building Stable Audio Open 1.0 VAE (lazy-load from %s) — "
            "requires HF terms accepted for gated repo",
            _SA_AUDIO_HF_ID,
        )
        audio_config = OobleckVAEConfig()
        audio_config.pretrained_path = _SA_AUDIO_HF_ID
        modules["audio_vae"] = SAAudioVAEModel(audio_config)

    if not _resolve("vae"):
        modules["vae"] = self._load_video_vae(fastvideo_args)

    return modules

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSR1080pI2VPipeline

MagiHumanSR1080pI2VPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: MagiHumanSRI2VPipeline

Two-stage MagiHuman base + SR-1080p text+image-to-AV pipeline.

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSR1080pPipeline

MagiHumanSR1080pPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: MagiHumanSRPipeline

Two-stage MagiHuman base + SR-1080p text-to-AV pipeline.

The stage chain is identical to SR-540p. The paired pipeline config enables block-sparse local-window attention on 32 SR-DiT layers and requests the 1080p latent target.
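
For readers unfamiliar with the term, the sketch below shows the general idea of a block-sparse local-window mask in one dimension: tokens are grouped into blocks, and a query block attends only to key blocks within a fixed distance. This is a generic illustration, not the pattern the 1080p config actually uses. The block count, the window size, and the 1D layout are all assumptions made here.

```python
def local_window_block_mask(num_blocks: int, window: int) -> list[list[bool]]:
    """mask[q][k] is True when query block q may attend to key block k."""
    return [
        [abs(q - k) <= window for k in range(num_blocks)]
        for q in range(num_blocks)
    ]


def kept_fraction(mask: list[list[bool]]) -> float:
    """Fraction of block pairs that are actually computed."""
    total = sum(len(row) for row in mask)
    kept = sum(sum(row) for row in mask)
    return kept / total


# Hypothetical sizes: 6 blocks, each attending to itself and one neighbour per side.
mask = local_window_block_mask(num_blocks=6, window=1)
for row in mask:
    print("".join("x" if allowed else "." for allowed in row))
print(f"kept fraction: {kept_fraction(mask):.2f}")
```

The kept fraction shrinks as the number of blocks grows while the window stays fixed, which is the usual motivation for applying this kind of attention at high resolutions.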

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSRI2VPipeline

MagiHumanSRI2VPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: MagiHumanSRPipeline

Two-stage MagiHuman base + SR-540p text+image-to-AV pipeline.

fastvideo.pipelines.basic.magi_human.magi_human_pipeline.MagiHumanSRPipeline

MagiHumanSRPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: MagiHumanPipeline

Two-stage MagiHuman base + SR-540p text-to-AV pipeline.

Functions