Convert a DCP (torch.distributed.checkpoint) training checkpoint to a
diffusers-style model directory. Works on a single GPU regardless of how many
GPUs were used for training, since DCP reshards the saved state automatically
on load.
Usage (no torchrun needed)::

    python -m fastvideo.train.entrypoint.dcp_to_diffusers \
        --checkpoint /path/to/checkpoint-1000 \
        --output-dir /path/to/diffusers_output

Or with torchrun (also fine)::

    torchrun --nproc_per_node=1 -m fastvideo.train.entrypoint.dcp_to_diffusers \
        --checkpoint ... --output-dir ...
The checkpoint must contain metadata.json (written by CheckpointManager). If
the checkpoint predates metadata support, pass --config explicitly to provide
the training YAML.
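For such older checkpoints, the invocation gains one flag; all paths here are
illustrative::

    python -m fastvideo.train.entrypoint.dcp_to_diffusers \
        --checkpoint /path/to/checkpoint-1000 \
        --config /path/to/training_config.yaml \
        --output-dir /path/to/diffusers_output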
Functions
fastvideo.train.entrypoint.dcp_to_diffusers.convert
convert(*, checkpoint_dir: str, output_dir: str, config_path: str | None = None, role: str = 'student', overwrite: bool = False) -> str
Load a DCP checkpoint and export as a diffusers model.
Returns the path to the exported model directory.
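Called from Python, the keyword-only arguments mirror the CLI flags. A minimal
sketch (paths are illustrative)::

    from fastvideo.train.entrypoint.dcp_to_diffusers import convert

    out_dir = convert(
        checkpoint_dir="/path/to/checkpoint-1000",
        output_dir="/path/to/diffusers_output",
        role="student",    # which role to export; "student" is the default
        overwrite=False,   # forwarded to the export step
    )
    print(out_dir)  # path to the exported model directory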
Source code in fastvideo/train/entrypoint/dcp_to_diffusers.py
def convert(
    *,
    checkpoint_dir: str,
    output_dir: str,
    config_path: str | None = None,
    role: str = "student",
    overwrite: bool = False,
) -> str:
    """Load a DCP checkpoint and export as a diffusers model.

    Returns the path to the exported model directory.
    """
    _ensure_distributed()
    from fastvideo.distributed import (
        maybe_init_distributed_environment_and_model_parallel,
    )
    from fastvideo.train.utils.builder import build_from_config
    from fastvideo.train.utils.checkpoint import (
        CheckpointManager,
        _resolve_resume_checkpoint,
    )
    from fastvideo.train.utils.config import (
        RunConfig,
        load_run_config,
    )

    import torch.distributed.checkpoint as dcp

    # -- Resolve checkpoint directory --
    resolved = _resolve_resume_checkpoint(
        checkpoint_dir,
        output_dir=checkpoint_dir,
    )
    dcp_dir = resolved / "dcp"
    if not dcp_dir.is_dir():
        raise FileNotFoundError(f"Missing dcp/ under {resolved}")

    # -- Obtain config --
    cfg: RunConfig
    if config_path is not None:
        cfg = load_run_config(config_path)
    else:
        metadata = CheckpointManager.load_metadata(resolved)
        raw_config = metadata.get("config")
        if raw_config is None:
            raise ValueError("Checkpoint metadata.json does not "
                             "contain 'config'. Pass --config "
                             "explicitly.")
        cfg = _run_config_from_raw(raw_config)
    tc = cfg.training

    # -- Init distributed (1 GPU is enough; DCP reshards) --
    maybe_init_distributed_environment_and_model_parallel(
        tp_size=1,
        sp_size=1,
    )
    # Override distributed config so model loading uses 1 GPU.
    tc.distributed.tp_size = 1
    tc.distributed.sp_size = 1
    tc.distributed.num_gpus = 1
    tc.distributed.hsdp_replicate_dim = 1
    tc.distributed.hsdp_shard_dim = 1

    # -- Build model (loads pretrained weights + FSDP) --
    _, method, _, _ = build_from_config(cfg)

    # -- Load DCP weights into the model --
    # dcp.load fills these state containers in place, resharding as needed.
    states = method.checkpoint_state()
    logger.info("Loading DCP checkpoint from %s", resolved)
    dcp.load(states, checkpoint_id=str(dcp_dir))

    # -- Export to diffusers format --
    # Select the trained module for the requested role.
    model = method._role_models[role]
    base_model_path = str(tc.model_path)
    if not base_model_path:
        raise ValueError("Cannot determine base_model_path from "
                         "config. Ensure models.student.init_from "
                         "is set.")
    logger.info(
        "Exporting role=%s to %s (base=%s)",
        role,
        output_dir,
        base_model_path,
    )
    result = _save_role_pretrained(
        role=role,
        base_model_path=base_model_path,
        output_dir=output_dir,
        overwrite=overwrite,
        model=model,
    )
    logger.info("Export complete: %s", result)
    return result
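The resharded load itself relies only on public torch.distributed.checkpoint
APIs. A standalone sketch of the same pattern, with a toy module in place of
the fastvideo model and assuming the checkpoint was saved under a top-level
"model" key (not fastvideo code)::

    import os
    import torch
    import torch.distributed as dist
    import torch.distributed.checkpoint as dcp

    # A single-rank process group is enough; DCP reshards on load.
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group("gloo", rank=0, world_size=1)

    model = torch.nn.Linear(8, 8)            # stand-in for the real model
    states = {"model": model.state_dict()}   # analogous to method.checkpoint_state()
    dcp.load(states, checkpoint_id="/path/to/checkpoint-1000/dcp")  # in-place load
    model.load_state_dict(states["model"])

    dist.destroy_process_group()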