Skip to content

entrypoint

Modules

fastvideo.train.entrypoint.dcp_to_diffusers

Convert a DCP training checkpoint to a diffusers-style model directory.

Works on a single GPU regardless of how many GPUs were used for training (DCP handles resharding automatically).

Usage (no torchrun needed)::

python -m fastvideo.train.entrypoint.dcp_to_diffusers \
    --checkpoint /path/to/checkpoint-1000 \
    --output-dir /path/to/diffusers_output

Or with torchrun (also fine)::

torchrun --nproc_per_node=1 \
    -m fastvideo.train.entrypoint.dcp_to_diffusers \
    --checkpoint ... --output-dir ...

The checkpoint must contain metadata.json (written by CheckpointManager). If the checkpoint predates metadata support, pass --config explicitly to provide the training YAML.

Functions

fastvideo.train.entrypoint.dcp_to_diffusers.convert
convert(*, checkpoint_dir: str, output_dir: str, config_path: str | None = None, role: str = 'student', overwrite: bool = False) -> str

Load a DCP checkpoint and export as a diffusers model.

Returns the path to the exported model directory.

Source code in fastvideo/train/entrypoint/dcp_to_diffusers.py
def convert(
    *,
    checkpoint_dir: str,
    output_dir: str,
    config_path: str | None = None,
    role: str = "student",
    overwrite: bool = False,
) -> str:
    """Load a DCP checkpoint and export as a diffusers model.

    Args:
        checkpoint_dir: Path to the DCP training checkpoint
            (e.g. ``checkpoint-1000``).
        config_path: Optional training YAML; when omitted, the config is
            recovered from the checkpoint's ``metadata.json``.
        output_dir: Destination directory for the diffusers export.
        role: Which role model to export (defaults to ``"student"``).
        overwrite: Whether an existing output directory may be replaced.

    Returns:
        The path to the exported model directory.
    """
    _ensure_distributed()

    import torch.distributed.checkpoint as dcp

    from fastvideo.distributed import (
        maybe_init_distributed_environment_and_model_parallel, )
    from fastvideo.train.utils.builder import build_from_config
    from fastvideo.train.utils.checkpoint import (
        CheckpointManager,
        _resolve_resume_checkpoint,
    )
    from fastvideo.train.utils.config import (
        RunConfig,
        load_run_config,
    )

    # Locate the checkpoint and verify its dcp/ payload exists.
    ckpt_root = _resolve_resume_checkpoint(
        checkpoint_dir,
        output_dir=checkpoint_dir,
    )
    shard_dir = ckpt_root / "dcp"
    if not shard_dir.is_dir():
        raise FileNotFoundError(f"Missing dcp/ under {ckpt_root}")

    # Recover the run config: an explicit YAML wins; otherwise read it
    # back from the checkpoint's metadata.json.
    run_cfg: RunConfig
    if config_path is None:
        metadata = CheckpointManager.load_metadata(ckpt_root)
        embedded = metadata.get("config")
        if embedded is None:
            raise ValueError("Checkpoint metadata.json does not "
                             "contain 'config'. Pass --config "
                             "explicitly.")
        run_cfg = _run_config_from_raw(embedded)
    else:
        run_cfg = load_run_config(config_path)

    train_cfg = run_cfg.training

    # A single GPU suffices for conversion — DCP reshards on load.
    maybe_init_distributed_environment_and_model_parallel(
        tp_size=1,
        sp_size=1,
    )

    # Force a single-GPU layout so model construction matches this process.
    dist_cfg = train_cfg.distributed
    dist_cfg.tp_size = 1
    dist_cfg.sp_size = 1
    dist_cfg.num_gpus = 1
    dist_cfg.hsdp_replicate_dim = 1
    dist_cfg.hsdp_shard_dim = 1

    # Build the method (loads pretrained weights + FSDP wrapping).
    _, method, _, _ = build_from_config(run_cfg)

    # Pull the sharded checkpoint weights into the live model.
    states = method.checkpoint_state()
    logger.info(
        "Loading DCP checkpoint from %s",
        ckpt_root,
    )
    dcp.load(states, checkpoint_id=str(shard_dir))

    # Export the requested role in diffusers layout.
    target = method._role_models[role]
    base_model_path = str(train_cfg.model_path)
    if not base_model_path:
        raise ValueError("Cannot determine base_model_path from "
                         "config. Ensure models.student.init_from "
                         "is set.")

    logger.info(
        "Exporting role=%s to %s (base=%s)",
        role,
        output_dir,
        base_model_path,
    )
    exported = _save_role_pretrained(
        role=role,
        base_model_path=base_model_path,
        output_dir=output_dir,
        overwrite=overwrite,
        model=target,
    )
    logger.info("Export complete: %s", exported)
    return exported

fastvideo.train.entrypoint.misc

Modules

fastvideo.train.entrypoint.misc.wan_ode_init_conversion

Convert Self-Forcing ode_init.pt to HuggingFace diffusers format.

The official ode_init.pt from https://huggingface.co/gdhe17/Self-Forcing/resolve/main/checkpoints/ode_init.pt stores weights under {"generator": {<original_wan_keys>}}.

This script converts those keys to diffusers WanTransformer3DModel format, verifies them against a reference model, and saves a complete diffusers-compatible model directory (transformer + scheduler + vae + text_encoder + tokenizer).

Usage

python -m fastvideo.train.entrypoint.misc.wan_ode_init_conversion \
    --input /path/to/ode_init.pt \
    --output /path/to/WanOdeInit \
    --base-model Wan-AI/Wan2.1-T2V-1.3B-Diffusers

Functions
fastvideo.train.entrypoint.misc.wan_ode_init_conversion.convert_state_dict
convert_state_dict(orig_sd: dict[str, Tensor]) -> dict[str, Tensor]

Convert an entire original-Wan state dict.

Source code in fastvideo/train/entrypoint/misc/wan_ode_init_conversion.py
def convert_state_dict(orig_sd: dict[str, torch.Tensor], ) -> dict[str, torch.Tensor]:
    """Convert an entire original-Wan state dict.

    Every key is rewritten to its diffusers equivalent via ``_convert_key``;
    tensor values pass through untouched.
    """
    converted: dict[str, torch.Tensor] = {}
    for key, tensor in orig_sd.items():
        converted[_convert_key(key)] = tensor
    return converted

fastvideo.train.entrypoint.train

YAML-only training entrypoint.

Usage::

torchrun --nproc_per_node=<N> -m fastvideo.train.entrypoint.train \
    --config path/to/run.yaml

Any unknown --dotted.key value arguments are applied as overrides to the YAML config before parsing. For example::

torchrun --nproc_per_node=8 -m fastvideo.train.entrypoint.train \
    --config path/to/run.yaml \
    --training.distributed.num_gpus 8 \
    --training.optimizer.learning_rate 1e-5

Functions

fastvideo.train.entrypoint.train.run_training_from_config
run_training_from_config(config_path: str, *, dry_run: bool = False, overrides: list[str] | None = None) -> None

YAML-only training entrypoint (schema v2).

Source code in fastvideo/train/entrypoint/train.py
def run_training_from_config(
    config_path: str,
    *,
    dry_run: bool = False,
    overrides: list[str] | None = None,
) -> None:
    """YAML-only training entrypoint (schema v2).

    Args:
        config_path: Path to the run YAML.
        dry_run: Stop after config parsing and component construction.
        overrides: Extra ``--dotted.key value`` overrides applied to the
            YAML before parsing.
    """

    from fastvideo.distributed import (
        maybe_init_distributed_environment_and_model_parallel, )
    from fastvideo.train import Trainer
    from fastvideo.train.utils.builder import build_from_config
    from fastvideo.train.utils.checkpoint import (
        CheckpointConfig,
        CheckpointManager,
    )
    from fastvideo.train.utils.config import load_run_config

    # Deterministic cuDNN for reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    run_cfg = load_run_config(config_path, overrides=overrides)
    train_cfg = run_cfg.training

    # VSA requires the sparse-attention backend; only set it when sparsity
    # is configured and the user has not already chosen a backend.
    if train_cfg.vsa_sparsity > 0.0:
        os.environ.setdefault(
            "FASTVIDEO_ATTENTION_BACKEND",
            "VIDEO_SPARSE_ATTN",
        )

    maybe_init_distributed_environment_and_model_parallel(
        train_cfg.distributed.tp_size,
        train_cfg.distributed.sp_size,
    )

    _, method, dataloader, start_step = build_from_config(run_cfg)

    if dry_run:
        logger.info("Dry-run: config parsed and "
                    "build_from_config succeeded.")
        return

    trainer = Trainer(
        train_cfg,
        config=run_cfg.resolved_config(),
        callback_configs=run_cfg.callbacks,
    )

    # Keep the exact YAML alongside the run's tracker artifacts
    # (e.g. W&B Files).
    trainer.tracker.log_file(
        os.path.abspath(os.path.expanduser(config_path)),
        name="run.yaml",
    )

    ckpt_settings = train_cfg.checkpoint
    checkpoint_manager = CheckpointManager(
        method=method,
        dataloader=dataloader,
        output_dir=ckpt_settings.output_dir,
        config=CheckpointConfig(
            save_steps=int(ckpt_settings.training_state_checkpointing_steps or 0),
            keep_last=int(ckpt_settings.checkpoints_total_limit or 0),
        ),
        callbacks=trainer.callbacks,
        raw_config=run_cfg.raw,
    )

    trainer.run(
        method,
        dataloader=dataloader,
        max_steps=train_cfg.loop.max_train_steps,
        start_step=start_step,
        checkpoint_manager=checkpoint_manager,
    )