Bases: ComposedPipelineBase
GEN3C Video Generation Pipeline.
This pipeline extends Cosmos with 3D cache support for camera-controlled
video generation. When an input image is provided, it runs the full
3D cache conditioning pipeline (depth estimation -> point cloud ->
camera trajectory -> forward warping -> VAE encoding).
Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(
    self,
    model_path: str,
    fastvideo_args: FastVideoArgs | TrainingArgs,
    required_config_modules: list[str] | None = None,
    loaded_modules: dict[str, torch.nn.Module] | None = None,
):
    """
    Construct the pipeline and load its modules.

    Once construction finishes the pipeline is expected to be usable
    immediately. It is meant to be stateless: no per-batch state should be
    kept on the instance.

    Args:
        model_path: Stored as ``self.model_path``.
        fastvideo_args: Stored as ``self.fastvideo_args``. Its ``tp_size``
            and ``sp_size`` are passed to the distributed-environment
            initializer, and the object itself is forwarded to
            ``load_modules``.
        required_config_modules: When not ``None``, replaces
            ``self._required_config_modules`` for this instance.
        loaded_modules: Forwarded unchanged to ``load_modules``.

    Raises:
        NotImplementedError: If ``_required_config_modules`` is still
            ``None`` after the optional override has been applied.
    """
    self.fastvideo_args = fastvideo_args
    self.model_path: str = model_path

    # Stage bookkeeping starts out empty.
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}

    # An explicit argument takes precedence over whatever value is already
    # reachable through ``self._required_config_modules``.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules
    if self._required_config_modules is None:
        raise NotImplementedError("Subclass must set _required_config_modules")

    tensor_parallel_size = fastvideo_args.tp_size
    sequence_parallel_size = fastvideo_args.sp_size
    maybe_init_distributed_environment_and_model_parallel(tensor_parallel_size, sequence_parallel_size)

    # The torch profiler is driven by an environment variable:
    #   FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    self.profiler_controller = get_or_create_profiler(envs.FASTVIDEO_TORCH_PROFILER_DIR)
    self.profiler = self.profiler_controller.profiler

    self.local_rank = get_world_group().local_rank

    # Modules are loaded eagerly, inside a named profiler region.
    logger.info("Loading pipeline modules...")
    loading_region = self.profiler_controller.region("profiler_region_model_loading")
    with loading_region:
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
Functions
fastvideo.pipelines.basic.gen3c.gen3c_pipeline.Gen3CPipeline.create_pipeline_stages
Set up pipeline stages with proper dependency injection.
Source code in fastvideo/pipelines/basic/gen3c/gen3c_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Register the GEN3C stages, injecting the modules each one needs.

    Stages are added in this order: CFG policy, input validation, prompt
    encoding, conditioning, timestep preparation, latent preparation,
    denoising, decoding. Modules are looked up by name via
    ``self.get_module`` at the moment each stage is built.

    Args:
        fastvideo_args: Accepted as part of the method signature; this
            method body does not read it.
    """
    cfg_policy = Gen3CCFGPolicyStage()
    self.add_stage(stage_name="cfg_policy_stage", stage=cfg_policy)

    input_validation = InputValidationStage()
    self.add_stage(stage_name="input_validation_stage", stage=input_validation)

    prompt_encoding = TextEncodingStage(
        text_encoders=[self.get_module("text_encoder")],
        tokenizers=[self.get_module("tokenizer")],
    )
    self.add_stage(stage_name="prompt_encoding_stage", stage=prompt_encoding)

    conditioning = Gen3CConditioningStage(vae=self.get_module("vae"))
    self.add_stage(stage_name="conditioning_stage", stage=conditioning)

    timestep_preparation = TimestepPreparationStage(scheduler=self.get_module("scheduler"))
    self.add_stage(stage_name="timestep_preparation_stage", stage=timestep_preparation)

    latent_preparation = Gen3CLatentPreparationStage(
        scheduler=self.get_module("scheduler"),
        transformer=self.get_module("transformer"),
        vae=self.get_module("vae"),
    )
    self.add_stage(stage_name="latent_preparation_stage", stage=latent_preparation)

    denoising = Gen3CDenoisingStage(
        transformer=self.get_module("transformer"),
        scheduler=self.get_module("scheduler"),
    )
    self.add_stage(stage_name="denoising_stage", stage=denoising)

    decoding = DecodingStage(vae=self.get_module("vae"))
    self.add_stage(stage_name="decoding_stage", stage=decoding)