Skip to content

hunyuan15_2sr_pipeline

Hunyuan video diffusion pipeline implementation.

This module contains an implementation of the Hunyuan video diffusion pipeline using the modular pipeline architecture.

Classes

fastvideo.pipelines.basic.hunyuan15.hunyuan15_2sr_pipeline.HunyuanVideo152SRPipeline

HunyuanVideo152SRPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: ComposedPipelineBase

Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(self,
             model_path: str,
             fastvideo_args: FastVideoArgs | TrainingArgs,
             required_config_modules: list[str] | None = None,
             loaded_modules: dict[str, torch.nn.Module] | None = None):
    """
    Initialize the pipeline. After __init__, the pipeline should be ready to
    use. The pipeline should be stateless and not hold any batch state.
    """
    self.fastvideo_args = fastvideo_args

    self.model_path: str = model_path
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}

    if required_config_modules is not None:
        self._required_config_modules = required_config_modules

    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(fastvideo_args.tp_size, fastvideo_args.sp_size)

    # Torch profiler. Enabled and configured through env vars:
    # FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    trace_dir = envs.FASTVIDEO_TORCH_PROFILER_DIR
    self.profiler_controller = get_or_create_profiler(trace_dir)
    self.profiler = self.profiler_controller.profiler

    self.local_rank = get_world_group().local_rank

    # Load modules directly in initialization
    logger.info("Loading pipeline modules...")
    with self.profiler_controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)

Functions

fastvideo.pipelines.basic.hunyuan15.hunyuan15_2sr_pipeline.HunyuanVideo152SRPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/hunyuan15/hunyuan15_2sr_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage", stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage_primary",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder"),
                                      self.get_module("text_encoder_2")],
                       tokenizers=[self.get_module("tokenizer"),
                                   self.get_module("tokenizer_2")],
                   ))

    self.add_stage(stage_name="conditioning_stage", stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(scheduler=self.get_module("scheduler"),
                                                transformer=self.get_module("transformer")))

    self.add_stage(stage_name="image_encoding_stage",
                   stage=Hy15ImageEncodingStage(image_encoder=None, image_processor=None))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(transformer=self.get_module("transformer"),
                                        scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="sr_720p_latent_preparation_stage",
                   stage=LatentPreparationStage(scheduler=self.get_module("scheduler"),
                                                transformer=self.get_module("transformer_2")))

    self.add_stage(stage_name="sr_720p_denoising_stage",
                   stage=SRDenoisingStage(transformer=self.get_module("transformer_2"),
                                          scheduler=self.get_module("scheduler"),
                                          upsampler=self.get_module("upsampler")))

    self.add_stage(stage_name="sr_1080p_latent_preparation_stage",
                   stage=LatentPreparationStage(scheduler=self.get_module("scheduler"),
                                                transformer=self.get_module("transformer_3")))

    self.add_stage(stage_name="sr_1080p_denoising_stage",
                   stage=SRDenoisingStage(transformer=self.get_module("transformer_3"),
                                          scheduler=self.get_module("scheduler"),
                                          upsampler=self.get_module("upsampler_2")))

    self.add_stage(stage_name="decoding_stage", stage=DecodingStage(vae=self.get_module("vae")))
fastvideo.pipelines.basic.hunyuan15.hunyuan15_2sr_pipeline.HunyuanVideo152SRPipeline.forward
forward(batch: ForwardBatch, fastvideo_args: FastVideoArgs) -> ForwardBatch

Generate a video or image using the pipeline.

Parameters:

Name Type Description Default
batch ForwardBatch

The batch to generate from.

required
fastvideo_args FastVideoArgs

The inference arguments.

required

Returns: ForwardBatch: The batch with the generated video or image.

Source code in fastvideo/pipelines/basic/hunyuan15/hunyuan15_2sr_pipeline.py
@torch.no_grad()
def forward(
    self,
    batch: ForwardBatch,
    fastvideo_args: FastVideoArgs,
) -> ForwardBatch:
    """
    Generate a video or image using the pipeline.

    Args:
        batch: The batch to generate from.
        fastvideo_args: The inference arguments.
    Returns:
        ForwardBatch: The batch with the generated video or image.
    """
    if not self.post_init_called:
        self.post_init()

    self.get_module("transformer").to(get_local_torch_device())
    # Execute each stage
    logger.info("Running pipeline stages: %s", self._stage_name_mapping.keys())
    # logger.info("Batch: %s", batch)
    batch = self.input_validation_stage(batch, fastvideo_args)
    batch = self.prompt_encoding_stage_primary(batch, fastvideo_args)
    batch = self.conditioning_stage(batch, fastvideo_args)
    batch = self.timestep_preparation_stage(batch, fastvideo_args)
    batch = self.latent_preparation_stage(batch, fastvideo_args)
    batch = self.image_encoding_stage(batch, fastvideo_args)
    batch = self.denoising_stage(batch, fastvideo_args)
    self.get_module("transformer").to("cpu")

    # 720p SR
    self.get_module("transformer_2").to(get_local_torch_device())
    batch.lq_latents = batch.latents
    batch.latents = None
    batch.height = 720
    batch.width = 1280
    batch.num_inference_steps_sr = 6
    batch = self.sr_720p_latent_preparation_stage(batch, fastvideo_args)
    batch = self.image_encoding_stage(batch, fastvideo_args)
    batch = self.sr_720p_denoising_stage(batch, fastvideo_args)
    self.get_module("transformer_2").to("cpu")

    # 1080p SR
    self.get_module("transformer_3").to(get_local_torch_device())
    batch.lq_latents = batch.latents
    batch.latents = None
    batch.height = 1072
    batch.width = 1920
    batch.num_inference_steps_sr = 8
    batch = self.sr_1080p_latent_preparation_stage(batch, fastvideo_args)
    batch = self.image_encoding_stage(batch, fastvideo_args)
    batch = self.sr_1080p_denoising_stage(batch, fastvideo_args)
    self.get_module("transformer_3").to("cpu")

    start_time = time.time()
    batch = self.decoding_stage(batch, fastvideo_args)
    end_time = time.time()
    logger.info("Decoding time: %s seconds", end_time - start_time)

    # Return the output
    return batch

Functions