CogView3---CogView-3Plus-微调代码源码解析-四-

news/2024/10/23 9:24:56

CogView3 & CogView-3Plus 微调代码源码解析(四)

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\sampling_utils.py

# 导入数学库以进行数学运算
import math
# 导入 PyTorch 库以进行张量操作
import torch
# 从 SciPy 库导入积分函数
from scipy import integrate# 从上级目录的 util 模块导入 append_dims 函数
from ...util import append_dims# 定义一个不进行动态阈值处理的类
class NoDynamicThresholding:# 定义类的调用方法def __call__(self, uncond, cond, scale, **kwargs):# 返回无条件和有条件的加权差值return uncond + scale * (cond - uncond)# 定义一个重新缩放阈值处理的类
class RescaleThresholding:# 初始化类,设置 phi 的默认值def __init__(self, phi=0.7):# 保存 phi 参数self.phi = phi# 定义类的调用方法def __call__(self, uncond, cond, scale, **kwargs):# 计算去噪后的配置denoised_cfg = uncond + scale * (cond - uncond)# 计算条件和去噪配置的标准差sigma_pos, sigma_cfg = cond.std(), denoised_cfg.std()# 计算缩放因子factor = self.phi * sigma_pos / sigma_cfg + (1 - self.phi)# 根据因子调整去噪结果denoised_final = denoised_cfg * factor# 返回最终去噪结果return denoised_final# 定义一个动态阈值处理的类
class DynamicThresholding:# 定义可选模式的列表Modes = ["Constant", "Linear Up", "Linear Down", "Half Cosine Up", "Half Cosine Down", "Power Up", "Power Down", "Cosine Down","Cosine Up"]# 初始化类并设置参数def __init__(self, interpret_mode, scale_min = 3,mimic_interpret_mode = 'Constant',mimic_scale = 3, mimic_scale_min = 3, threshold_percentile = 1.0,phi = 1.0,separate_feature_channels = True,measure = 'AD',scaling_startpoint = 'ZERO',):# 验证解释模式是否在可选模式中assert interpret_mode in self.Modes# 验证模仿解释模式是否在可选模式中assert mimic_interpret_mode in self.Modes# 验证测量方法是否合法assert measure in ['AD', 'STD']# 验证缩放起点是否合法assert scaling_startpoint in ['ZERO', 'MEAN']# 保存各种初始化参数self.mode = interpret_modeself.mimic_mode = mimic_interpret_modeself.scale_min = scale_minself.mimic_scale = mimic_scaleself.mimic_scale_min = mimic_scale_minself.threshold_percentile = threshold_percentileself.phi = phiself.separate_feature_channels = separate_feature_channelsself.measure = measureself.scaling_startpoint = scaling_startpoint# 定义解释缩放的方法def interpret_scale(self, mode, scale, scale_min, step, num_steps):"""num_steps = 50step from 0 to 50."""# 将缩放值减去最小缩放值scale -= scale_min# 计算当前步骤的比例frac = step / num_steps# 根据模式调整缩放值if mode == 'Constant':passelif mode == "Linear Up":scale *= fracelif mode == "Linear Down":scale *= 1.0 - fracelif mode == "Half Cosine Up":scale *= 1.0 - math.cos(frac)elif mode == "Half Cosine Down":scale *= math.cos(frac)elif mode == "Cosine Down":scale *= math.cos(frac * 1.5707)elif mode == "Cosine Up":scale *= 1.0 - math.cos(frac * 1.5707)elif mode == "Power Up":scale *= math.pow(frac, 2.0)elif mode == "Power Down":scale *= 1.0 - math.pow(frac, 2.0)# 将调整后的缩放值加回最小缩放值scale += scale_min# 返回最终的缩放值return scale# 定义调用方法,接受无条件和条件输入,以及缩放和步骤参数def __call__(self, uncond, cond, scale, step, num_steps):# 根据当前模式解释缩放参数,计算 cfg_scalecfg_scale = self.interpret_scale(self.mode, scale, self.scale_min, step, num_steps)# 根据模拟模式解释缩放参数,计算 mimic_cfg_scalemimic_cfg_scale = self.interpret_scale(self.mimic_mode, self.mimic_scale, self.mimic_scale_min, step, num_steps)# 计算 x,作为无条件输入和条件输入之间的线性插值x = uncond + cfg_scale*(cond - uncond)# 计算 mimic_x,作为无条件输入和条件输入之间的线性插值,使用 mimic_cfg_scalemimic_x = uncond + mimic_cfg_scale*(cond - uncond)  # 将 x 展平,以便于后续操作x_flattened = x.flatten(2)# 将 mimic_x 展平,以便于后续操作mimic_x_flattened = mimic_x.flatten(2)# 根据缩放起始点的选择,计算均值并中心化if self.scaling_startpoint == 'MEAN':# 计算 x 的均值,保留维度x_means = x_flattened.mean(dim=2, keepdim = True)# 计算 mimic_x 的均值,保留维度mimic_x_means = mimic_x_flattened.mean(dim=2, keepdim = True)# 通过均值中心化 xx_centered = x_flattened - x_means# 通过均值中心化 mimic_xmimic_x_centered = mimic_x_flattened - mimic_x_meanselse:# 如果不使用均值中心化,直接赋值x_centered = x_flattenedmimic_x_centered = mimic_x_flattened# 根据是否分开特征通道的选项,计算尺度参考if self.separate_feature_channels:# 如果测量方式为绝对差异if self.measure == 'AD':# 计算 x 的绝对差异的分位数作为尺度参考x_scaleref = torch.quantile(x_centered.abs(), self.threshold_percentile, dim=2, keepdim = True)# 计算 mimic_x 的绝对差异的最大值作为尺度参考mimic_x_scaleref = mimic_x_centered.abs().max(dim=2, keepdim = True).values# 如果测量方式为标准差elif self.measure == 'STD':# 计算 x 的标准差作为尺度参考x_scaleref = x_centered.std(dim=2, keepdim = True)# 计算 mimic_x 的标准差作为尺度参考mimic_x_scaleref = mimic_x_centered.std(dim=2, keepdim = True)else:# 如果不分开特征通道if self.measure == 'AD':# 计算 x 的绝对差异的分位数作为尺度参考x_scaleref = torch.quantile(x_centered.abs(), self.threshold_percentile)# 计算 mimic_x 的绝对差异的最大值作为尺度参考mimic_x_scaleref = mimic_x_centered.abs().max()# 如果测量方式为标准差elif self.measure == 'STD':# 计算 x 的标准差作为尺度参考x_scaleref = x_centered.std()# 计算 mimic_x 的标准差作为尺度参考mimic_x_scaleref = mimic_x_centered.std()# 根据测量方式调整 x 的尺度if self.measure == 'AD':# 计算 x_scaleref 和 mimic_x_scaleref 的最大值max_scaleref = torch.maximum(x_scaleref, mimic_x_scaleref)# 限制 x_centered 的值在 [-max_scaleref, max_scaleref] 范围内x_clamped = x_centered.clamp(-max_scaleref, max_scaleref)# 重新归一化 xx_renormed = x_clamped * (mimic_x_scaleref / max_scaleref)elif self.measure == 'STD':# 重新归一化 xx_renormed = x_centered * (mimic_x_scaleref / x_scaleref)# 根据缩放起始点选择调整 x_dyn 的值if self.scaling_startpoint == 'MEAN':# 将均值与重新归一化的结果相加x_dyn = x_means + x_renormedelse:# 直接使用重新归一化的结果x_dyn = x_renormed# 反展平 x_dyn,恢复原始形状x_dyn = x_dyn.unflatten(2, x.shape[2:])# 返回加权和结果return self.phi*x_dyn + (1-self.phi)*x
# 定义一个线性多步系数函数,接受阶数、时间点和索引等参数
def linear_multistep_coeff(order, t, i, j, epsrel=1e-4):# 检查阶数是否超出当前步骤的限制,超出则抛出错误if order - 1 > i:raise ValueError(f"Order {order} too high for step {i}")# 定义内部函数,用于计算特定tau下的乘积def fn(tau):prod = 1.0  # 初始化乘积为1# 遍历从0到阶数的每个kfor k in range(order):# 跳过与j相等的kif j == k:continue# 计算乘积,基于给定的tau和时间点prod *= (tau - t[i - k]) / (t[i - j] - t[i - k])return prod  # 返回计算得到的乘积# 使用数值积分计算fn在时间点[i]到[i+1]之间的积分,返回积分值return integrate.quad(fn, t[i], t[i + 1], epsrel=epsrel)[0]# 定义一个函数,用于计算祖先步骤的sigma值
def get_ancestral_step(sigma_from, sigma_to, eta=1.0):# 如果eta为0,则返回sigma_to和0.0if not eta:return sigma_to, 0.0# 计算sigma_up,限制为sigma_to与特定表达式的较小值sigma_up = torch.minimum(sigma_to,eta* (sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2) ** 0.5,)# 计算sigma_down,基于sigma_to和sigma_upsigma_down = (sigma_to**2 - sigma_up**2) ** 0.5# 返回计算得到的sigma_down和sigma_upreturn sigma_down, sigma_up# 定义一个函数,将去噪后的图像与输入图像进行处理,返回标准化结果
def to_d(x, sigma, denoised):# 计算去噪后的结果与输入的差异,并除以sigma的扩展维度return (x - denoised) / append_dims(sigma, x.ndim)# 定义一个函数,将sigma转换为负对数形式
def to_neg_log_sigma(sigma):# 计算sigma的对数并取负值return sigma.log().neg()# 定义一个函数,将负对数sigma转换为sigma
def to_sigma(neg_log_sigma):# 取负值并计算指数,得到sigmareturn neg_log_sigma.neg().exp()

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\sigma_sampling.py

# 导入 PyTorch 库
import torch# 从相对路径导入 default 和 instantiate_from_config 函数
from ...util import default, instantiate_from_config# 定义 EDMSampling 类
class EDMSampling:# 初始化方法,设置均值和标准差def __init__(self, p_mean=-1.2, p_std=1.2):# 保存均值到实例变量self.p_mean = p_mean# 保存标准差到实例变量self.p_std = p_std# 定义调用方法,允许类实例像函数一样被调用def __call__(self, n_samples, rand=None):# 计算对数标准差,根据随机数生成 log_sigmalog_sigma = self.p_mean + self.p_std * default(rand, torch.randn((n_samples,)))# 返回 log_sigma 的指数值return log_sigma.exp()# 定义 DiscreteSampling 类
class DiscreteSampling:# 初始化方法,设置离散化配置和其他参数def __init__(self, discretization_config, num_idx, do_append_zero=False, flip=True, low_bound=0, up_bound=1):# 保存索引数量到实例变量self.num_idx = num_idx# 根据配置实例化 sigma 对象self.sigmas = instantiate_from_config(discretization_config)(num_idx, do_append_zero=do_append_zero, flip=flip)# 计算并保存下界和上界self.low_bound = int(low_bound * num_idx)self.up_bound = int(up_bound * num_idx)# 打印采样范围print(f'sigma sampling from {self.low_bound} to {self.up_bound}')# 将索引转换为 sigma 值def idx_to_sigma(self, idx):# 返回对应索引的 sigmareturn self.sigmas[idx]# 定义调用方法def __call__(self, n_samples, rand=None, return_idx=False):# 生成随机索引,如果没有提供随机数则使用默认随机数idx = default(rand,torch.randint(self.low_bound, self.up_bound, (n_samples,)),)# 根据 return_idx 参数决定返回 sigma 值或索引if return_idx:return self.idx_to_sigma(idx), idxelse:return self.idx_to_sigma(idx)# 定义 PartialDiscreteSampling 类
class PartialDiscreteSampling:# 初始化方法,设置完整和部分索引数量def __init__(self, discretization_config, total_num_idx, partial_num_idx, do_append_zero=False, flip=True):# 保存总索引数量到实例变量self.total_num_idx = total_num_idx# 保存部分索引数量到实例变量self.partial_num_idx = partial_num_idx# 根据配置实例化 sigma 对象self.sigmas = instantiate_from_config(discretization_config)(total_num_idx, do_append_zero=do_append_zero, flip=flip)# 将索引转换为 sigma 值def idx_to_sigma(self, idx):# 返回对应索引的 sigmareturn self.sigmas[idx]# 定义调用方法def __call__(self, n_samples, rand=None):# 生成随机索引,根据部分索引数量限制随机范围idx = default(rand,# torch.randint(self.total_num_idx-self.partial_num_idx, self.total_num_idx, (n_samples,)),torch.randint(0, self.partial_num_idx, (n_samples,)),)# 返回对应的 sigma 值return self.idx_to_sigma(idx)

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\util.py

"""
引用自
https://github.com/openai/improved-diffusion/blob/main/improved_diffusion/gaussian_diffusion.py
和
https://github.com/lucidrains/denoising-diffusion-pytorch/blob/7706bdfc6f527f58d33f84b7b522e61e6e3164b3/denoising_diffusion_pytorch/denoising_diffusion_pytorch.py
和
https://github.com/openai/guided-diffusion/blob/0ba878e517b276c45d1195eb29f6f5f72659a05b/guided_diffusion/nn.py感谢!
"""# 导入数学库以执行数学运算
import math
# 从 typing 模块导入可选类型
from typing import Optional# 导入 PyTorch 库
import torch
# 导入 PyTorch 的神经网络模块
import torch.nn as nn
# 从 einops 库导入 rearrange 和 repeat 函数,用于张量操作
from einops import rearrange, repeat# 创建一个 beta 调度函数
def make_beta_schedule(schedule,  # 调度类型n_timestep,  # 时间步数linear_start=1e-4,  # 线性调度的起始值linear_end=2e-2,  # 线性调度的结束值
):# 如果调度类型为线性if schedule == "linear":# 生成从起始值到结束值的线性空间并平方betas = (torch.linspace(linear_start**0.5, linear_end**0.5, n_timestep, dtype=torch.float64)** 2)# 返回生成的 beta 值作为 NumPy 数组return betas.numpy()# 将张量中的值提取到指定形状的输出张量中
def extract_into_tensor(a, t, x_shape):# 解构 t 的形状,获取批大小b, *_ = t.shape# 从 a 中根据 t 的索引提取值out = a.gather(-1, t)# 将输出调整为指定的形状return out.reshape(b, *((1,) * (len(x_shape) - 1)))# 混合检查点功能的定义
def mixed_checkpoint(func, inputs: dict, params, flag):"""评估函数而不缓存中间激活,从而减少内存使用,但在反向传播中增加计算量。与原始的检查点函数不同,它也可以处理非张量输入。:param func: 要评估的函数。:param inputs: 传递给 `func` 的参数字典。:param params: `func` 依赖但不明确作为参数传递的参数序列。:param flag: 如果为 False,则禁用梯度检查点。"""# 如果标志为真if flag:# 获取所有张量键tensor_keys = [key for key in inputs if isinstance(inputs[key], torch.Tensor)]# 获取所有张量输入tensor_inputs = [inputs[key] for key in inputs if isinstance(inputs[key], torch.Tensor)]# 获取所有非张量键non_tensor_keys = [key for key in inputs if not isinstance(inputs[key], torch.Tensor)]# 获取所有非张量输入non_tensor_inputs = [inputs[key] for key in inputs if not isinstance(inputs[key], torch.Tensor)]# 将所有输入和参数组合成元组args = tuple(tensor_inputs) + tuple(non_tensor_inputs) + tuple(params)# 调用混合检查点功能并返回结果return MixedCheckpointFunction.apply(func,len(tensor_inputs),len(non_tensor_inputs),tensor_keys,non_tensor_keys,*args,)else:# 直接调用函数并返回结果return func(**inputs)# 定义混合检查点功能的类
class MixedCheckpointFunction(torch.autograd.Function):@staticmethod# 定义前向传播方法def forward(ctx,  # 上下文对象run_function,  # 要运行的函数length_tensors,  # 张量的长度length_non_tensors,  # 非张量的长度tensor_keys,  # 张量的键non_tensor_keys,  # 非张量的键*args,  # 其他参数):# 设置结束张量的数量ctx.end_tensors = length_tensors# 设置结束非张量的数量ctx.end_non_tensors = length_tensors + length_non_tensors# 初始化 GPU 自动混合精度参数ctx.gpu_autocast_kwargs = {"enabled": torch.is_autocast_enabled(),  # 检查自动混合精度是否启用"dtype": torch.get_autocast_gpu_dtype(),  # 获取当前自动混合精度数据类型"cache_enabled": torch.is_autocast_cache_enabled(),  # 检查缓存是否启用}# 确保张量键和非张量键的数量与传入长度一致assert (len(tensor_keys) == length_tensorsand len(non_tensor_keys) == length_non_tensors)# 将输入张量映射到字典,键为 tensor_keys,值为相应的 argsctx.input_tensors = {key: val for (key, val) in zip(tensor_keys, list(args[: ctx.end_tensors]))}# 将输入非张量映射到字典,键为 non_tensor_keys,值为相应的 argsctx.input_non_tensors = {key: valfor (key, val) in zip(non_tensor_keys, list(args[ctx.end_tensors : ctx.end_non_tensors]))}# 保存运行函数ctx.run_function = run_function# 获取剩余输入参数ctx.input_params = list(args[ctx.end_non_tensors :])# 在不计算梯度的上下文中运行with torch.no_grad():# 调用运行函数并传入输入张量和非张量output_tensors = ctx.run_function(**ctx.input_tensors, **ctx.input_non_tensors)# 返回输出张量return output_tensors@staticmethoddef backward(ctx, *output_grads):# 将输入张量中的所有张量设为需要梯度ctx.input_tensors = {key: ctx.input_tensors[key].detach().requires_grad_(True)for key in ctx.input_tensors}# 启用梯度计算并设置自动混合精度上下文with torch.enable_grad(), torch.cuda.amp.autocast(**ctx.gpu_autocast_kwargs):# 创建输入张量的浅拷贝以避免原地修改shallow_copies = {key: ctx.input_tensors[key].view_as(ctx.input_tensors[key])for key in ctx.input_tensors}# shallow_copies.update(additional_args)# 调用运行函数并传入浅拷贝和非张量输入output_tensors = ctx.run_function(**shallow_copies, **ctx.input_non_tensors)# 计算输出张量相对于输入张量和参数的梯度input_grads = torch.autograd.grad(output_tensors,list(ctx.input_tensors.values()) + ctx.input_params,output_grads,allow_unused=True,)# 删除上下文中的输入张量和参数del ctx.input_tensorsdel ctx.input_paramsdel output_tensors# 返回梯度和填充的 None 值return ((None, None, None, None, None)+ input_grads[: ctx.end_tensors]+ (None,) * (ctx.end_non_tensors - ctx.end_tensors)+ input_grads[ctx.end_tensors :])
# 定义一个检查点函数,用于评估给定的函数,降低内存使用,同时增加计算开销
def checkpoint(func, inputs, params, flag):"""评估一个函数,避免缓存中间激活,减少内存使用,但在反向传播时增加计算量。:param func: 要评估的函数。:param inputs: 传递给 `func` 的参数序列。:param params: `func` 依赖的参数序列,但并不显式作为参数接受。:param flag: 如果为 False,禁用梯度检查点。"""# 如果 flag 为 True,启用梯度检查点if flag:# 将输入参数和依赖参数组合成一个元组args = tuple(inputs) + tuple(params)# 应用检查点函数,返回计算结果return CheckpointFunction.apply(func, len(inputs), *args)else:# 如果 flag 为 False,直接调用 func 函数return func(*inputs)# 定义检查点函数的类,继承自 torch.autograd.Function
class CheckpointFunction(torch.autograd.Function):# 定义前向传播的静态方法@staticmethoddef forward(ctx, run_function, length, *args):# 将运行的函数保存到上下文中ctx.run_function = run_function# 保存输入张量(前 length 个参数)ctx.input_tensors = list(args[:length])# 保存输入参数(后面的参数)ctx.input_params = list(args[length:])# 获取当前的 GPU 自动混合精度设置ctx.gpu_autocast_kwargs = {"enabled": torch.is_autocast_enabled(),"dtype": torch.get_autocast_gpu_dtype(),"cache_enabled": torch.is_autocast_cache_enabled(),}# 在不计算梯度的情况下运行函数with torch.no_grad():output_tensors = ctx.run_function(*ctx.input_tensors)# 返回输出张量return output_tensors# 定义反向传播的静态方法@staticmethoddef backward(ctx, *output_grads):# 将输入张量分离,并设置为需要梯度ctx.input_tensors = [x.detach().requires_grad_(True) for x in ctx.input_tensors]# 启用梯度计算并设置自动混合精度with torch.enable_grad(), torch.cuda.amp.autocast(**ctx.gpu_autocast_kwargs):# 解决一个 bug,确保运行函数的第一个操作不会修改分离的张量存储shallow_copies = [x.view_as(x) for x in ctx.input_tensors]# 使用浅拷贝运行函数以获取输出张量output_tensors = ctx.run_function(*shallow_copies)# 计算输入梯度input_grads = torch.autograd.grad(output_tensors,ctx.input_tensors + ctx.input_params,output_grads,allow_unused=True,)# 删除上下文中的输入张量del ctx.input_tensors# 删除上下文中的输入参数del ctx.input_params# 删除输出张量del output_tensors# 返回 None 和输入梯度return (None, None) + input_grads# 定义时间步嵌入的函数
def timestep_embedding(timesteps, dim, max_period=10000, repeat_only=False, dtype=torch.float32):"""创建正弦时间步嵌入。:param timesteps: 一个 1-D 张量,包含每个批次元素的 N 个索引。这些索引可以是小数。:param dim: 输出的维度。:param max_period: 控制嵌入的最小频率。:return: 一个形状为 [N x dim] 的位置嵌入张量。"""# 如果不只是重复时序if not repeat_only:# 计算半个维度,用于频率计算half = dim // 2# 生成频率数组,基于最大周期和半个维度freqs = torch.exp(-math.log(max_period)  # 计算最大周期的对数* torch.arange(start=0, end=half, dtype=torch.float32)  # 生成从0到half的浮点数数组/ half  # 归一化,使频率在0到1之间).to(device=timesteps.device)  # 将频率数组移动到与timesteps相同的设备# 计算时序与频率的乘积,准备嵌入向量args = timesteps[:, None].float() * freqs[None]# 通过计算余弦和正弦生成嵌入向量embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)# 如果维度是奇数,添加额外的零向量if dim % 2:embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1  # 在最后一维追加零向量)# 如果是重复时序,生成与时序相同的嵌入else:embedding = repeat(timesteps, "b -> b d", d=dim)  # 根据时序生成重复的嵌入向量# 返回嵌入向量并设置数据类型return embedding.to(dtype)  # 将嵌入向量转换为指定的数据类型
# 将模块的参数归零并返回该模块
def zero_module(module):"""Zero out the parameters of a module and return it."""# 遍历模块的所有参数for p in module.parameters():# 分离参数并将其值归零p.detach().zero_()# 返回修改后的模块return module# 对模块的参数进行缩放并返回该模块
def scale_module(module, scale):"""Scale the parameters of a module and return it."""# 遍历模块的所有参数for p in module.parameters():# 分离参数并按比例缩放p.detach().mul_(scale)# 返回修改后的模块return module# 计算张量中所有非批次维度的平均值
def mean_flat(tensor):"""Take the mean over all non-batch dimensions."""# 计算并返回张量在非批次维度上的平均值return tensor.mean(dim=list(range(1, len(tensor.shape))))# 创建标准化层
def normalization(channels):"""Make a standard normalization layer.:param channels: number of input channels.:return: an nn.Module for normalization."""# 返回一个具有指定输入通道数的GroupNorm32标准化层return GroupNorm32(32, channels)# SiLU激活函数类,兼容PyTorch 1.5
class SiLU(nn.Module):# 定义前向传播方法def forward(self, x):# 返回输入与其Sigmoid值的乘积return x * torch.sigmoid(x)# 自定义GroupNorm类,继承自nn.GroupNorm
class GroupNorm32(nn.GroupNorm):# 定义前向传播方法def forward(self, x):# 调用父类的前向方法并返回与输入相同的数据类型return super().forward(x).type(x.dtype)# 创建1D、2D或3D卷积模块
def conv_nd(dims, *args, **kwargs):"""Create a 1D, 2D, or 3D convolution module."""# 根据维度选择对应的卷积层if dims == 1:return nn.Conv1d(*args, **kwargs)elif dims == 2:return nn.Conv2d(*args, **kwargs)elif dims == 3:return nn.Conv3d(*args, **kwargs)# 如果维度不支持,抛出错误raise ValueError(f"unsupported dimensions: {dims}")# 创建线性模块
def linear(*args, **kwargs):"""Create a linear module."""# 返回一个线性层return nn.Linear(*args, **kwargs)# 创建1D、2D或3D平均池化模块
def avg_pool_nd(dims, *args, **kwargs):"""Create a 1D, 2D, or 3D average pooling module."""# 根据维度选择对应的平均池化层if dims == 1:return nn.AvgPool1d(*args, **kwargs)elif dims == 2:return nn.AvgPool2d(*args, **kwargs)elif dims == 3:return nn.AvgPool3d(*args, **kwargs)# 如果维度不支持,抛出错误raise ValueError(f"unsupported dimensions: {dims}")# AlphaBlender类,用于合并不同策略
class AlphaBlender(nn.Module):# 支持的合并策略strategies = ["learned", "fixed", "learned_with_images"]# 初始化方法def __init__(self,alpha: float,merge_strategy: str = "learned_with_images",rearrange_pattern: str = "b t -> (b t) 1 1",):super().__init__()  # 调用父类初始化# 保存合并策略和重排模式self.merge_strategy = merge_strategyself.rearrange_pattern = rearrange_pattern# 确保合并策略是支持的选项之一assert (merge_strategy in self.strategies), f"merge_strategy needs to be in {self.strategies}"# 根据合并策略注册混合因子if self.merge_strategy == "fixed":# 注册固定混合因子self.register_buffer("mix_factor", torch.Tensor([alpha]))elif (self.merge_strategy == "learned"or self.merge_strategy == "learned_with_images"):# 注册可学习的混合因子self.register_parameter("mix_factor", torch.nn.Parameter(torch.Tensor([alpha])))else:# 抛出不支持的合并策略错误raise ValueError(f"unknown merge strategy {self.merge_strategy}")# 定义获取 alpha 值的函数,接受一个图像指示器作为输入,返回一个张量def get_alpha(self, image_only_indicator: torch.Tensor) -> torch.Tensor:# 根据合并策略选择 alpha 值if self.merge_strategy == "fixed":# 如果策略是固定,则 alpha 等于混合因子alpha = self.mix_factorelif self.merge_strategy == "learned":# 如果策略是学习的,则通过 sigmoid 函数计算 alphaalpha = torch.sigmoid(self.mix_factor)elif self.merge_strategy == "learned_with_images":# 如果策略是基于图像学习的,确保提供了图像指示器assert image_only_indicator is not None, "need image_only_indicator ..."# 根据图像指示器的布尔值,决定 alpha 的值alpha = torch.where(image_only_indicator.bool(),# 如果为真,则 alpha 为全 1 的张量torch.ones(1, 1, device=image_only_indicator.device),# 否则通过 sigmoid 函数处理混合因子,并调整维度rearrange(torch.sigmoid(self.mix_factor), "... -> ... 1"),)# 根据给定的重排模式调整 alpha 的形状alpha = rearrange(alpha, self.rearrange_pattern)else:# 如果没有匹配的合并策略,抛出未实现的错误raise NotImplementedError# 返回计算得到的 alpha 值return alpha# 定义前向传播的函数,接受空间和时间的输入张量,返回一个张量def forward(self,x_spatial: torch.Tensor,x_temporal: torch.Tensor,image_only_indicator: Optional[torch.Tensor] = None,) -> torch.Tensor:# 调用 get_alpha 函数获取 alpha 值alpha = self.get_alpha(image_only_indicator)# 根据 alpha 值和输入张量进行加权求和x = (# x_spatial 乘以 alpha,转换为相同的数据类型alpha.to(x_spatial.dtype) * x_spatial+ # (1.0 - alpha) 乘以 x_temporal,转换为相同的数据类型(1.0 - alpha).to(x_spatial.dtype) * x_temporal)# 返回加权后的结果张量return x

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\wrappers.py

# 导入 PyTorch 库及其神经网络模块
import torch
import torch.nn as nn
# 导入版本控制模块
from packaging import version# 定义一个字符串,表示 OpenAI Wrapper 的路径
OPENAIUNETWRAPPER = "sgm.modules.diffusionmodules.wrappers.OpenAIWrapper"# 定义一个身份包装器类,继承自 nn.Module
class IdentityWrapper(nn.Module):# 初始化方法,接受扩散模型、是否编译模型的标志和数据类型def __init__(self, diffusion_model, compile_model: bool = False, dtype: torch.dtype = torch.float32):# 调用父类的初始化方法super().__init__()# 根据 PyTorch 版本决定是否编译模型compile = (torch.compileif (version.parse(torch.__version__) >= version.parse("2.0.0"))  # 检查 PyTorch 版本and compile_model  # 仅当 compile_model 为 True 时else lambda x: x  # 否则返回原始模型)# 对扩散模型进行编译self.diffusion_model = compile(diffusion_model)# 保存数据类型self.dtype = dtype# 前向传播方法,接受任意数量的位置和关键字参数def forward(self, *args, **kwargs):# 调用扩散模型并返回结果return self.diffusion_model(*args, **kwargs)# 定义 OpenAI Wrapper 类,继承自 IdentityWrapper
class OpenAIWrapper(IdentityWrapper):# 重写前向传播方法,接受输入张量、时间步、上下文字典和其他参数def forward(self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs) -> torch.Tensor:# 将上下文字典中的每个值转换为指定的数据类型for key in c:c[key] = c[key].to(self.dtype)# 检查输入张量的形状是否为 3 维if len(x.shape) == 3:# 在最后一个维度拼接上下文中的 "concat" 数据x = torch.cat((x, c.get("concat", torch.Tensor([]).type_as(x))), dim=-1)else:# 在第一个维度拼接上下文中的 "concat" 数据x = torch.cat((x, c.get("concat", torch.Tensor([]).type_as(x))), dim=1)# 调用扩散模型进行前向传播,并返回结果return self.diffusion_model(x,  # 输入张量timesteps=t,  # 时间步context=c.get("crossattn", None),  # 上下文中的交叉注意力y=c.get("vector", None),  # 上下文中的向量**kwargs,  # 其他参数)

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\__init__.py

# 从当前包导入去噪声器类
from .denoiser import Denoiser
# 从当前包导入离散化类
from .discretizer import Discretization
# 从当前包导入标准扩散损失类
from .loss import StandardDiffusionLoss
# 从当前包导入解码器、编码器和模型类
from .model import Decoder, Encoder, Model
# 从当前包导入 UNet 模型类
from .openaimodel import UNetModel
# 从当前包导入基础扩散采样器类
from .sampling import BaseDiffusionSampler
# 从当前包导入 OpenAI 封装器类
from .wrappers import OpenAIWrapper

.\cogview3-finetune\sat\sgm\modules\distributions\distributions.py

# 导入 NumPy 库,通常用于数值计算
import numpy as np
# 导入 PyTorch 库,主要用于深度学习计算
import torch# 定义抽象分布类,继承自基础类
class AbstractDistribution:# 抽象方法,样本生成def sample(self):# 抛出未实现错误,子类需实现此方法raise NotImplementedError()# 抽象方法,返回分布的众数def mode(self):# 抛出未实现错误,子类需实现此方法raise NotImplementedError()# 定义 Dirac 分布类,继承自抽象分布类
class DiracDistribution(AbstractDistribution):# 初始化方法,接收一个值作为分布的值def __init__(self, value):# 将传入的值存储为实例变量self.value = value# 实现样本生成方法def sample(self):# 返回 Dirac 分布的值return self.value# 实现众数方法def mode(self):# 返回 Dirac 分布的值return self.value# 定义对角高斯分布类
class DiagonalGaussianDistribution(object):# 初始化方法,接收参数和一个决定是否确定性的标志def __init__(self, parameters, deterministic=False):# 将参数存储为实例变量self.parameters = parameters# 将参数分割成均值和对数方差self.mean, self.logvar = torch.chunk(parameters, 2, dim=1)# 限制对数方差在 -30 到 20 之间self.logvar = torch.clamp(self.logvar, -30.0, 20.0)# 存储确定性标志self.deterministic = deterministic# 计算标准差self.std = torch.exp(0.5 * self.logvar)# 计算方差self.var = torch.exp(self.logvar)# 如果是确定性,方差和标准差设为零if self.deterministic:self.var = self.std = torch.zeros_like(self.mean).to(device=self.parameters.device)# 实现样本生成方法def sample(self):# x = self.mean + self.std * torch.randn(self.mean.shape).to(#     device=self.parameters.device# )# 生成样本,遵循均值和标准差的分布x = self.mean + self.std * torch.randn_like(self.mean).to(device=self.parameters.device)# 返回生成的样本return x# 实现 KL 散度计算方法def kl(self, other=None):# 如果是确定性,返回 0.0 的张量if self.deterministic:return torch.Tensor([0.0])else:# 如果没有其他分布,计算与标准正态分布的 KL 散度if other is None:return 0.5 * torch.sum(torch.pow(self.mean, 2) + self.var - 1.0 - self.logvar,dim=[1, 2, 3],)else:# 计算与另一分布的 KL 散度return 0.5 * torch.sum(torch.pow(self.mean - other.mean, 2) / other.var+ self.var / other.var- 1.0- self.logvar+ other.logvar,dim=[1, 2, 3],)# 实现负对数似然计算方法def nll(self, sample, dims=[1, 2, 3]):# 如果是确定性,返回 0.0 的张量if self.deterministic:return torch.Tensor([0.0])# 计算 2π 的对数值logtwopi = np.log(2.0 * np.pi)# 计算负对数似然并返回return 0.5 * torch.sum(logtwopi + self.logvar + torch.pow(sample - self.mean, 2) / self.var,dim=dims,)# 实现众数计算方法def mode(self):# 返回均值作为众数return self.mean# 定义计算两个高斯分布之间 KL 散度的函数
def normal_kl(mean1, logvar1, mean2, logvar2):"""来源: https://github.com/openai/guided-diffusion/blob/27c20a8fab9cb472df5d6bdd6c8d11c8f430b924/guided_diffusion/losses.py#L12计算两个高斯分布之间的 KL 散度。形状会自动广播,因此批次可以与标量等进行比较,适用于其他用例。"""# 初始化张量变量tensor = None# 遍历四个输入对象,找到第一个张量for obj in (mean1, logvar1, mean2, logvar2):if isinstance(obj, torch.Tensor):tensor = objbreak# 确保至少一个输入是张量assert tensor is not None, "at least one argument must be a Tensor"# 强制方差为张量类型。广播帮助将标量转换为# 张量,但对 torch.exp() 不起作用。# 将 logvar1 和 logvar2 进行处理,确保它们都是张量类型logvar1, logvar2 = [# 如果 x 是张量,保持不变;否则,将 x 转换为张量并移动到指定设备x if isinstance(x, torch.Tensor) else torch.tensor(x).to(tensor)for x in (logvar1, logvar2)  # 遍历 logvar1 和 logvar2]# 计算并返回一个值,公式由多个部分组成return 0.5 * (# -1.0 表示计算的偏移量-1.0+ logvar2  # 加上 logvar2 的值- logvar1  # 减去 logvar1 的值+ torch.exp(logvar1 - logvar2)  # 加上 logvar1 和 logvar2 之差的指数+ ((mean1 - mean2) ** 2) * torch.exp(-logvar2)  # 加上均值差的平方乘以 logvar2 的负指数)

.\cogview3-finetune\sat\sgm\modules\distributions\__init__.py

# 代码段为空,无法进行注释

.\cogview3-finetune\sat\sgm\modules\ema.py

# 导入 PyTorch 库
import torch
# 从 PyTorch 导入神经网络模块
from torch import nn# 定义一个继承自 nn.Module 的类 LitEma
class LitEma(nn.Module):# 初始化函数,接收模型、衰减率和是否使用更新计数def __init__(self, model, decay=0.9999, use_num_upates=True):# 调用父类初始化super().__init__()# 检查衰减率是否在 0 到 1 之间if decay < 0.0 or decay > 1.0:raise ValueError("Decay must be between 0 and 1")# 初始化模型参数名到阴影参数名的映射self.m_name2s_name = {}# 注册衰减率的缓冲区self.register_buffer("decay", torch.tensor(decay, dtype=torch.float32))# 根据是否使用更新计数注册更新次数的缓冲区self.register_buffer("num_updates",torch.tensor(0, dtype=torch.int)if use_num_upateselse torch.tensor(-1, dtype=torch.int),)# 遍历模型的命名参数for name, p in model.named_parameters():# 如果参数需要梯度更新if p.requires_grad:# 移除参数名中的 '.' 字符s_name = name.replace(".", "")# 更新模型参数名到阴影参数名的映射self.m_name2s_name.update({name: s_name})# 注册参数的缓冲区self.register_buffer(s_name, p.clone().detach().data)# 初始化收集的参数列表self.collected_params = []# 重置更新计数的方法def reset_num_updates(self):# 删除 num_updates 的缓冲区del self.num_updates# 注册更新计数的缓冲区,初始化为 0self.register_buffer("num_updates", torch.tensor(0, dtype=torch.int))# 前向传播方法,接收模型作为输入def forward(self, model):# 获取当前衰减率decay = self.decay# 如果更新计数有效if self.num_updates >= 0:# 增加更新计数self.num_updates += 1# 计算新的衰减率decay = min(self.decay, (1 + self.num_updates) / (10 + self.num_updates))# 计算 1 减去衰减率one_minus_decay = 1.0 - decay# 在不跟踪梯度的情况下执行操作with torch.no_grad():# 获取模型的命名参数字典m_param = dict(model.named_parameters())# 获取阴影参数的命名缓冲区字典shadow_params = dict(self.named_buffers())# 遍历模型参数字典for key in m_param:# 如果参数需要梯度更新if m_param[key].requires_grad:# 获取对应的阴影参数名sname = self.m_name2s_name[key]# 将阴影参数转换为与模型参数相同的数据类型shadow_params[sname] = shadow_params[sname].type_as(m_param[key])# 更新阴影参数的值shadow_params[sname].sub_(one_minus_decay * (shadow_params[sname] - m_param[key]))else:# 确保此参数不在映射中assert not key in self.m_name2s_name# 将阴影参数复制到模型参数的方法def copy_to(self, model):# 获取模型的命名参数字典m_param = dict(model.named_parameters())# 获取阴影参数的命名缓冲区字典shadow_params = dict(self.named_buffers())# 遍历模型参数字典for key in m_param:# 如果参数需要梯度更新if m_param[key].requires_grad:# 复制阴影参数的数据到模型参数m_param[key].data.copy_(shadow_params[self.m_name2s_name[key]].data)else:# 确保此参数不在映射中assert not key in self.m_name2s_name# 存储当前参数以供稍后恢复的方法def store(self, parameters):"""保存当前参数以供稍后恢复。参数:parameters: 可迭代的 `torch.nn.Parameter`;需要临时存储的参数。"""# 克隆参数并存储在收集的参数列表中self.collected_params = [param.clone() for param in parameters]# 定义一个恢复方法,用于恢复存储的参数def restore(self, parameters):"""恢复通过 `store` 方法存储的参数。这对于在不影响原始优化过程的情况下使用 EMA 参数验证模型很有用。在调用 `copy_to` 方法之前存储参数。验证(或保存模型)后,使用此方法恢复先前的参数。Args:parameters: 可迭代的 `torch.nn.Parameter`;需要用存储的参数更新的参数。"""# 遍历已收集的参数和输入参数,成对处理for c_param, param in zip(self.collected_params, parameters):# 将已收集参数的数据复制到输入参数的数据中param.data.copy_(c_param.data)

.\cogview3-finetune\sat\sgm\modules\encoders\modules.py

# 导入数学库,用于数学计算
import math
# 从上下文管理库导入 nullcontext,用于创建一个不执行任何操作的上下文管理器
from contextlib import nullcontext
# 从 functools 导入 partial,用于创建偏函数
from functools import partial
# 从 typing 导入各种类型注解,用于类型检查
from typing import Dict, List, Optional, Tuple, Union# 导入 kornia 库,用于计算机视觉的操作
import kornia
# 导入 numpy 库,用于数组和数值计算
import numpy as np
# 导入 open_clip 库,用于处理 CLIP 模型
import open_clip
# 导入 PyTorch 库,深度学习框架
import torch
# 导入 PyTorch 的分布式模块
import torch.distributed
# 导入 PyTorch 的神经网络模块
import torch.nn as nn
# 从 einops 导入 rearrange 和 repeat,用于重排和复制张量
from einops import rearrange, repeat
# 从 omegaconf 导入 ListConfig,用于处理配置文件
from omegaconf import ListConfig
# 从 torch.utils.checkpoint 导入 checkpoint,用于节省内存的检查点机制
from torch.utils.checkpoint import checkpoint
# 从 transformers 导入各种模型和分词器
from transformers import (ByT5Tokenizer,  # 导入 ByT5 的分词器CLIPTextModel,  # 导入 CLIP 的文本模型CLIPTokenizer,  # 导入 CLIP 的分词器T5EncoderModel,  # 导入 T5 的编码器模型T5Tokenizer,  # 导入 T5 的分词器AutoModel,  # 导入自动模型类,用于加载预训练模型AutoTokenizer  # 导入自动分词器类,用于加载预训练分词器
)# 从模块中导入正则化器、编码器和时间步等工具
from ...modules.autoencoding.regularizers import DiagonalGaussianRegularizer
from ...modules.diffusionmodules.model import Encoder
from ...modules.diffusionmodules.openaimodel import Timestep
from ...modules.diffusionmodules.util import extract_into_tensor, make_beta_schedule
from ...modules.distributions.distributions import DiagonalGaussianDistribution
from ...util import (append_dims,  # 导入函数,用于向张量追加维度autocast,  # 导入函数,用于自动混合精度训练count_params,  # 导入函数,用于计算模型参数数量default,  # 导入函数,用于获取默认值disabled_train,  # 导入函数,用于禁用训练模式expand_dims_like,  # 导入函数,用于扩展张量维度以匹配另一个张量instantiate_from_config,  # 导入函数,从配置实例化对象
)# 定义一个抽象的嵌入模型类,继承自 nn.Module
class AbstractEmbModel(nn.Module):# 初始化方法def __init__(self):super().__init__()  # 调用父类构造函数self._is_trainable = None  # 初始化可训练标志self._ucg_rate = None  # 初始化 UCG 率self._input_key = None  # 初始化输入键# 定义 is_trainable 属性的 getter 方法@propertydef is_trainable(self) -> bool:return self._is_trainable  # 返回可训练标志# 定义 ucg_rate 属性的 getter 方法@propertydef ucg_rate(self) -> Union[float, torch.Tensor]:return self._ucg_rate  # 返回 UCG 率# 定义 input_key 属性的 getter 方法@propertydef input_key(self) -> str:return self._input_key  # 返回输入键# 定义 is_trainable 属性的 setter 方法@is_trainable.setterdef is_trainable(self, value: bool):self._is_trainable = value  # 设置可训练标志# 定义 ucg_rate 属性的 setter 方法@ucg_rate.setterdef ucg_rate(self, value: Union[float, torch.Tensor]):self._ucg_rate = value  # 设置 UCG 率# 定义 input_key 属性的 setter 方法@input_key.setterdef input_key(self, value: str):self._input_key = value  # 设置输入键# 定义 is_trainable 属性的 deleter 方法@is_trainable.deleterdef is_trainable(self):del self._is_trainable  # 删除可训练标志# 定义 ucg_rate 属性的 deleter 方法@ucg_rate.deleterdef ucg_rate(self):del self._ucg_rate  # 删除 UCG 率# 定义 input_key 属性的 deleter 方法@input_key.deleterdef input_key(self):del self._input_key  # 删除输入键# 定义通用条件器类,继承自 nn.Module
class GeneralConditioner(nn.Module):# 输出维度到键的映射OUTPUT_DIM2KEYS = {2: "vector", 3: "crossattn", 4: "concat", 5: "concat"}# 键到拼接维度的映射KEY2CATDIM = {"vector": 1, "crossattn": 2, "concat": 1}# 初始化函数,接收嵌入模型配置及其他参数def __init__(self, emb_models: Union[List, ListConfig], cor_embs=[], cor_p=[]):# 调用父类的初始化方法super().__init__()# 存储嵌入模型的列表embedders = []# 遍历嵌入模型配置for n, embconfig in enumerate(emb_models):# 从配置中实例化嵌入模型embedder = instantiate_from_config(embconfig)# 确保嵌入模型继承自 AbstractEmbModelassert isinstance(embedder, AbstractEmbModel), f"embedder model {embedder.__class__.__name__} has to inherit from AbstractEmbModel"# 获取是否可训练的标志,默认为 Falseembedder.is_trainable = embconfig.get("is_trainable", False)# 获取 UCG 比率,默认为 0.0embedder.ucg_rate = embconfig.get("ucg_rate", 0.0)# 如果不可训练,禁用训练if not embedder.is_trainable:embedder.train = disabled_train# 将模型参数的梯度要求设为 Falsefor param in embedder.parameters():param.requires_grad = False# 将模型设置为评估模式embedder.eval()# print(#     f"Initialized embedder #{n}: {embedder.__class__.__name__} "#     f"with {count_params(embedder, False)} params. Trainable: {embedder.is_trainable}"# )# 检查是否有单一输入键if "input_key" in embconfig:embedder.input_key = embconfig["input_key"]# 检查是否有多个输入键elif "input_keys" in embconfig:embedder.input_keys = embconfig["input_keys"]# 如果没有输入键,则引发异常else:raise KeyError(f"need either 'input_key' or 'input_keys' for embedder {embedder.__class__.__name__}")# 获取遗留 UCG 值embedder.legacy_ucg_val = embconfig.get("legacy_ucg_value", None)# 如果遗留 UCG 值存在,初始化随机状态if embedder.legacy_ucg_val is not None:embedder.ucg_prng = np.random.RandomState()# 将嵌入模型添加到列表中embedders.append(embedder)# 将嵌入模型列表存储为模块列表self.embedders = nn.ModuleList(embedders)# 如果存在条件嵌入,确保条件概率长度匹配if len(cor_embs) > 0:assert len(cor_p) == 2**len(cor_embs)# 存储条件嵌入和概率self.cor_embs = cor_embsself.cor_p = cor_p# 根据嵌入模型和批量数据获取 UCG 值(可能)def possibly_get_ucg_val(self, embedder: AbstractEmbModel, batch: Dict) -> Dict:# 确保遗留 UCG 值存在assert embedder.legacy_ucg_val is not None# 获取 UCG 比率p = embedder.ucg_rate# 获取遗留 UCG 值val = embedder.legacy_ucg_val# 遍历批量数据的输入键for i in range(len(batch[embedder.input_key])):# 根据概率选择是否替换值if embedder.ucg_prng.choice(2, p=[1 - p, p]):batch[embedder.input_key][i] = val# 返回更新后的批量数据return batch# 根据嵌入模型和条件获取 UCG 值(必定)def surely_get_ucg_val(self, embedder: AbstractEmbModel, batch: Dict, cond_or_not) -> Dict:# 确保遗留 UCG 值存在assert embedder.legacy_ucg_val is not None# 获取遗留 UCG 值val = embedder.legacy_ucg_val# 遍历批量数据的输入键for i in range(len(batch[embedder.input_key])):# 如果条件满足,替换值if cond_or_not[i]:batch[embedder.input_key][i] = val# 返回更新后的批量数据return batch# 定义获取单个嵌入的方法def get_single_embedding(self, embedder, batch, output, cond_or_not: Optional[np.ndarray] = None, force_zero_embeddings: Optional[List] = None):# 根据嵌入器是否可训练选择上下文管理器embedding_context = nullcontext if embedder.is_trainable else torch.no_grad# 使用选定的上下文管理器with embedding_context():# 检查嵌入器是否有输入键,并且输入键不为 Noneif hasattr(embedder, "input_key") and (embedder.input_key is not None):# 如果嵌入器的 legacy_ucg_val 不为 Noneif embedder.legacy_ucg_val is not None:# 如果条件不为 Noneif cond_or_not is None:# 可能获取 ucg_val 的值batch = self.possibly_get_ucg_val(embedder, batch)else:# 确定获取 ucg_val 的值batch = self.surely_get_ucg_val(embedder, batch, cond_or_not)# 从批次中获取嵌入输出emb_out = embedder(batch[embedder.input_key])# 如果嵌入器有多个输入键elif hasattr(embedder, "input_keys"):# 从批次中解包并获取嵌入输出emb_out = embedder(*[batch[k] for k in embedder.input_keys])# 确保嵌入输出是张量、列表或元组assert isinstance(emb_out, (torch.Tensor, list, tuple)), f"encoder outputs must be tensors or a sequence, but got {type(emb_out)}"# 如果嵌入输出不是列表或元组,将其转为列表if not isinstance(emb_out, (list, tuple)):emb_out = [emb_out]    # 遍历嵌入输出for emb in emb_out:# 获取输出键out_key = self.OUTPUT_DIM2KEYS[emb.dim()]# 如果嵌入器的 ucg_rate 大于 0 且 legacy_ucg_val 为 Noneif embedder.ucg_rate > 0.0 and embedder.legacy_ucg_val is None:# 如果条件不为 Noneif cond_or_not is None:# 扩展嵌入维度并应用伯努利分布emb = (expand_dims_like(torch.bernoulli((1.0 - embedder.ucg_rate)* torch.ones(emb.shape[0], device=emb.device)),emb,)* emb)else:# 根据条件扩展嵌入维度emb = (expand_dims_like(torch.tensor(1-cond_or_not, dtype=emb.dtype, device=emb.device),emb,)* emb)# 如果嵌入器有输入键且在强制零嵌入列表中if (hasattr(embedder, "input_key")and embedder.input_key in force_zero_embeddings):# 将嵌入设置为全零emb = torch.zeros_like(emb)# 如果输出中已有该键if out_key in output:# 将新的嵌入与已有输出拼接output[out_key] = torch.cat((output[out_key], emb), self.KEY2CATDIM[out_key])else:# 否则,直接赋值新的嵌入output[out_key] = emb# 返回更新后的输出return output# 定义前向传播的方法def forward(self, batch: Dict, force_zero_embeddings: Optional[List] = None) -> Dict:  # 定义函数的返回类型为字典output = dict()  # 初始化一个空字典用于存储输出结果if force_zero_embeddings is None:  # 检查是否提供强制零嵌入参数force_zero_embeddings = []  # 如果没有,初始化为空列表if len(self.cor_embs) > 0:  # 如果相关嵌入存在batch_size = len(batch[list(batch.keys())[0]])  # 获取批次中第一个键的大小rand_idx = np.random.choice(len(self.cor_p), size=(batch_size,), p=self.cor_p)  # 根据相关概率随机选择索引for emb_idx in self.cor_embs:  # 遍历相关嵌入索引cond_or_not = rand_idx % 2  # 计算条件标志(0或1)rand_idx //= 2  # 更新随机索引embedder = self.embedders[emb_idx]  # 获取对应的嵌入器output = self.get_single_embedding(self.embedders[emb_idx], batch, output=output, cond_or_not=cond_or_not, force_zero_embeddings=force_zero_embeddings)  # 获取单个嵌入并更新输出for i, embedder in enumerate(self.embedders):  # 遍历所有嵌入器及其索引if i in self.cor_embs:  # 如果索引在相关嵌入中,则跳过continue  # 继续下一个循环output = self.get_single_embedding(embedder, batch, output=output, force_zero_embeddings=force_zero_embeddings)  # 获取单个嵌入并更新输出return output  # 返回最终的输出字典def get_unconditional_conditioning(  # 定义获取无条件调节的函数self, batch_c, batch_uc=None, force_uc_zero_embeddings=None  # 输入批次及可选参数):if force_uc_zero_embeddings is None:  # 检查强制无条件零嵌入参数force_uc_zero_embeddings = []  # 如果没有,初始化为空列表ucg_rates = list()  # 初始化列表用于存储原有的无条件生成率for embedder in self.embedders:  # 遍历所有嵌入器ucg_rates.append(embedder.ucg_rate)  # 保存当前的无条件生成率embedder.ucg_rate = 0.0  # 将无条件生成率设置为0cor_embs = self.cor_embs  # 保存当前相关嵌入cor_p = self.cor_p  # 保存当前相关概率self.cor_embs = []  # 清空相关嵌入self.cor_p = []  # 清空相关概率c = self(batch_c)  # 计算输入批次的输出uc = self(batch_c if batch_uc is None else batch_uc, force_uc_zero_embeddings)  # 计算无条件输出for embedder, rate in zip(self.embedders, ucg_rates):  # 恢复每个嵌入器的无条件生成率embedder.ucg_rate = rate  # 将原有的无条件生成率重新赋值self.cor_embs = cor_embs  # 恢复相关嵌入self.cor_p = cor_p  # 恢复相关概率return c, uc  # 返回有条件和无条件的输出
# 定义一个名为 InceptionV3 的类,继承自 nn.Module
class InceptionV3(nn.Module):"""对 https://github.com/mseitzer/pytorch-fid 的 Inception 端口进行包装,并在末尾增加一个 squeeze 操作"""# 初始化函数,接受 normalize_input 参数和其他可选参数def __init__(self, normalize_input=False, **kwargs):# 调用父类的初始化函数super().__init__()# 从 pytorch_fid 导入 inception 模块from pytorch_fid import inception# 设置输入调整标志为 Truekwargs["resize_input"] = True# 创建 InceptionV3 模型实例,并传入参数self.model = inception.InceptionV3(normalize_input=normalize_input, **kwargs)# 前向传播函数,接受输入张量 inpdef forward(self, inp):# 对输入进行尺寸调整(已注释)# inp = kornia.geometry.resize(inp, (299, 299),#                              interpolation='bicubic',#                              align_corners=False,#                              antialias=True)# 将输入值限制在 -1 到 1 之间(已注释)# inp = inp.clamp(min=-1, max=1)# 使用模型对输入进行处理,获得输出outp = self.model(inp)# 如果输出只有一个元素,去掉维度并返回if len(outp) == 1:return outp[0].squeeze()# 返回原始输出return outp# 定义一个名为 IdentityEncoder 的类,继承自 AbstractEmbModel
class IdentityEncoder(AbstractEmbModel):# 编码函数,直接返回输入def encode(self, x):return x# 前向传播函数,直接返回输入def forward(self, x):return x# 定义一个名为 ClassEmbedder 的类,继承自 AbstractEmbModel
class ClassEmbedder(AbstractEmbModel):# 初始化函数,接受嵌入维度、类数和是否添加序列维度的参数def __init__(self, embed_dim, n_classes=1000, add_sequence_dim=False):# 调用父类的初始化函数super().__init__()# 创建嵌入层,映射类到嵌入维度self.embedding = nn.Embedding(n_classes, embed_dim)# 保存类数和是否添加序列维度的标志self.n_classes = n_classesself.add_sequence_dim = add_sequence_dim# 前向传播函数,接受类的输入def forward(self, c):# 获取类的嵌入表示c = self.embedding(c)# 如果需要,添加序列维度if self.add_sequence_dim:c = c[:, None, :]# 返回嵌入表示return c# 获取无条件的条件信息def get_unconditional_conditioning(self, bs, device="cuda"):uc_class = (self.n_classes - 1)  # 1000 类 --> 0 ... 999,额外的类用于无条件生成# 创建一个全为 uc_class 的张量uc = torch.ones((bs,), device=device) * uc_class# 将类信息包装成字典uc = {self.key: uc.long()}# 返回字典return uc# 定义一个名为 ClassEmbedderForMultiCond 的类,继承自 ClassEmbedder
class ClassEmbedderForMultiCond(ClassEmbedder):# 前向传播函数,接受批量数据、键和是否禁用丢弃的标志def forward(self, batch, key=None, disable_dropout=False):# 将输出初始化为输入批次out = batch# 如果未提供键,使用默认键key = default(key, self.key)# 检查批次中的值是否为列表islist = isinstance(batch[key], list)# 如果是列表,则取第一个元素if islist:batch[key] = batch[key][0]# 调用父类的前向传播c_out = super().forward(batch, key, disable_dropout)# 根据是否为列表,更新输出out[key] = [c_out] if islist else c_out# 返回更新后的输出return out# 定义一个名为 FrozenT5Embedder 的类,继承自 AbstractEmbModel
class FrozenT5Embedder(AbstractEmbModel):"""使用 T5 转换器编码器进行文本处理"""# 初始化函数,接受模型目录、设备、最大长度、是否冻结和缓存目录的参数def __init__(self,model_dir="google/t5-v1_1-xxl",device="cuda",max_length=77,freeze=True,cache_dir=None,):# 调用父类的初始化函数super().__init__()# 如果模型目录不是默认的,加载相应的分词器和模型if model_dir is not "google/t5-v1_1-xxl":self.tokenizer = T5Tokenizer.from_pretrained(model_dir)self.transformer = T5EncoderModel.from_pretrained(model_dir)else:# 否则,使用缓存目录加载分词器和模型self.tokenizer = T5Tokenizer.from_pretrained(model_dir, cache_dir=cache_dir)self.transformer = T5EncoderModel.from_pretrained(model_dir, cache_dir=cache_dir)# 保存设备信息self.device = device# 保存最大长度self.max_length = max_length# 如果需要冻结,调用冻结函数if freeze:self.freeze()# 定义冻结模型参数的方法def freeze(self):# 将转换器设置为评估模式,以禁用训练时的行为(如 Dropout)self.transformer = self.transformer.eval()# 遍历所有模型参数for param in self.parameters():# 禁用参数的梯度计算,以减少内存使用和提高推理速度param.requires_grad = False# @autocast  # 可选装饰器,用于自动混合精度def forward(self, text):# 使用分词器对输入文本进行编码,返回编码后的批次信息batch_encoding = self.tokenizer(text,# 截断超出最大长度的文本truncation=True,# 设置最大长度max_length=self.max_length,# 返回编码后的文本长度return_length=True,# 不返回溢出的令牌return_overflowing_tokens=False,# 填充到最大长度padding="max_length",# 返回 PyTorch 张量return_tensors="pt",)# 将输入ID移动到指定的设备(CPU或GPU)tokens = batch_encoding["input_ids"].to(self.device)# 使用上下文管理器禁用混合精度计算with torch.autocast("cuda", enabled=False):# 将令牌输入到转换器中,获取输出outputs = self.transformer(input_ids=tokens)# 获取最后一个隐藏状态,作为编码的表示z = outputs.last_hidden_state# 返回编码结果return z# 定义编码文本的方法def encode(self, text):# 调用当前对象的 forward 方法进行编码return self(text)
# 定义一个名为 FrozenByT5Embedder 的类,继承自 AbstractEmbModel
class FrozenByT5Embedder(AbstractEmbModel):"""使用 ByT5 转换器编码器处理文本,具备字符意识。"""# 初始化方法,设置模型的版本、设备、最大长度和是否冻结参数def __init__(self, version="google/byt5-base", device="cuda", max_length=77, freeze=True):  # 其他可用版本为 google/t5-v1_1-xl 和 google/t5-v1_1-xxl# 调用父类构造函数super().__init__()# 加载预训练的 ByT5 分词器self.tokenizer = ByT5Tokenizer.from_pretrained(version)# 加载预训练的 T5 编码器模型self.transformer = T5EncoderModel.from_pretrained(version)# 设置设备类型(如 CUDA)self.device = device# 设置输入文本的最大长度self.max_length = max_length# 如果需要冻结参数,则调用冻结方法if freeze:self.freeze()# 冻结模型的参数,以避免训练时更新def freeze(self):# 将变换器设置为评估模式self.transformer = self.transformer.eval()# 遍历所有参数并设置为不可更新for param in self.parameters():param.requires_grad = False# 定义前向传播方法def forward(self, text):# 对输入文本进行编码,返回批次编码batch_encoding = self.tokenizer(text,truncation=True,max_length=self.max_length,return_length=True,return_overflowing_tokens=False,padding="max_length",return_tensors="pt",)# 将输入的 token 移动到指定设备tokens = batch_encoding["input_ids"].to(self.device)# 在不启用自动混合精度的情况下进行前向传播with torch.autocast("cuda", enabled=False):# 获取模型的输出outputs = self.transformer(input_ids=tokens)# 取出最后一层的隐藏状态z = outputs.last_hidden_state# 返回最后的隐藏状态return z# 定义编码方法,直接调用前向传播def encode(self, text):return self(text)# 定义一个名为 FrozenCLIPEmbedder 的类,继承自 AbstractEmbModel
class FrozenCLIPEmbedder(AbstractEmbModel):"""使用 CLIP 转换器编码器处理文本(来自 huggingface)"""# 定义可用的层类型LAYERS = ["last", "pooled", "hidden"]# 初始化方法,设置模型的版本、设备、最大长度、冻结状态和层类型def __init__(self,version="openai/clip-vit-large-patch14",device="cuda",max_length=77,freeze=True,layer="last",layer_idx=None,always_return_pooled=False,):  # clip-vit-base-patch32# 调用父类构造函数super().__init__()# 确保层类型在可用层中assert layer in self.LAYERS# 加载预训练的 CLIP 分词器self.tokenizer = CLIPTokenizer.from_pretrained(version)# 加载预训练的 CLIP 文本模型self.transformer = CLIPTextModel.from_pretrained(version)# 设置设备类型(如 CUDA)self.device = device# 设置输入文本的最大长度self.max_length = max_length# 如果需要冻结参数,则调用冻结方法if freeze:self.freeze()# 设置所用层的类型self.layer = layer# 设置所用层的索引self.layer_idx = layer_idx# 设置是否总是返回池化结果self.return_pooled = always_return_pooled# 如果层为隐藏层,确保层索引有效if layer == "hidden":assert layer_idx is not Noneassert 0 <= abs(layer_idx) <= 12# 冻结模型的参数,以避免训练时更新def freeze(self):# 将变换器设置为评估模式self.transformer = self.transformer.eval()# 遍历所有参数并设置为不可更新for param in self.parameters():param.requires_grad = False# 这里缺少方法体,可能是注释或未完成代码@autocast# 定义前向传播函数,接收文本输入def forward(self, text):# 对输入文本进行编码,生成批量编码,设置各种参数以控制编码行为batch_encoding = self.tokenizer(text,truncation=True,  # 超出最大长度时截断文本max_length=self.max_length,  # 最大长度限制return_length=True,  # 返回编码后每个文本的长度return_overflowing_tokens=False,  # 不返回溢出的标记padding="max_length",  # 填充到最大长度return_tensors="pt",  # 返回 PyTorch 张量格式)# 获取编码后的输入标记,并将其移动到指定设备上tokens = batch_encoding["input_ids"].to(self.device)# 使用 transformer 模型进行前向传播,获取输出outputs = self.transformer(input_ids=tokens, output_hidden_states=self.layer == "hidden"  # 根据条件决定是否返回隐藏状态)# 根据层级选择相应的输出if self.layer == "last":# 选择最后一层的隐藏状态z = outputs.last_hidden_stateelif self.layer == "pooled":# 选择池化后的输出,并增加一个维度z = outputs.pooler_output[:, None, :]else:# 选择指定索引的隐藏状态z = outputs.hidden_states[self.layer_idx]# 根据是否需要池化输出,返回相应结果if self.return_pooled:return z, outputs.pooler_output  # 返回输出和池化结果return z  # 返回仅隐藏状态# 定义编码函数,简化调用前向传播def encode(self, text):# 调用前向传播函数并返回结果return self(text)
# 定义一个名为 FrozenOpenCLIPEmbedder2 的类,继承自 AbstractEmbModel
class FrozenOpenCLIPEmbedder2(AbstractEmbModel):"""使用 OpenCLIP 变换器编码器进行文本处理"""# 定义可用的层名称LAYERS = ["pooled", "last", "penultimate"]# 初始化方法,设置模型的基本参数def __init__(self,arch="ViT-H-14",  # 模型架构version="laion2b_s32b_b79k",  # 版本信息device="cuda",  # 设备类型max_length=77,  # 最大输入长度freeze=True,  # 是否冻结模型参数layer="last",  # 选择的层always_return_pooled=False,  # 是否始终返回池化结果legacy=True,  # 是否使用遗留模式):super().__init__()  # 调用父类构造函数assert layer in self.LAYERS  # 确保指定的层有效# 创建模型和转换器,并将其移动到 CPUmodel, _, _ = open_clip.create_model_and_transforms(arch,device=torch.device("cpu"),)del model.visual  # 删除视觉部分self.model = model  # 保存模型self.device = device  # 设置设备self.max_length = max_length  # 设置最大长度self.return_pooled = always_return_pooled  # 设置是否返回池化if freeze:  # 如果需要冻结模型self.freeze()  # 调用冻结方法self.layer = layer  # 设置层# 根据选择的层更新层索引if self.layer == "last":self.layer_idx = 0elif self.layer == "penultimate":self.layer_idx = 1else:raise NotImplementedError()  # 不支持的层选择self.legacy = legacy  # 设置遗留模式# 冻结模型参数的方法def freeze(self):self.model = self.model.eval()  # 设置模型为评估模式for param in self.parameters():  # 遍历所有参数param.requires_grad = False  # 禁止梯度更新# 前向传播的方法,处理输入文本@autocastdef forward(self, text):tokens = open_clip.tokenize(text)  # 将文本转换为标记z = self.encode_with_transformer(tokens.to(self.device))  # 编码处理# 根据条件返回不同结果if not self.return_pooled and self.legacy:return zif self.return_pooled:assert not self.legacy  # 确保不在遗留模式下return z[self.layer], z["pooled"]  # 返回选定层和池化结果return z[self.layer]  # 返回选定层结果# 使用变换器进行编码的方法def encode_with_transformer(self, text):x = self.model.token_embedding(text)  # 获取标记嵌入x = x + self.model.positional_embedding  # 加入位置嵌入x = x.permute(1, 0, 2)  # 转换维度顺序x = self.text_transformer_forward(x, attn_mask=self.model.attn_mask)  # 通过变换器前向传播if self.legacy:  # 如果在遗留模式x = x[self.layer]  # 获取选定层结果x = self.model.ln_final(x)  # 最终归一化return x  # 返回结果else:# x 为字典,将保持为字典o = x["last"]  # 获取最后一层输出o = self.model.ln_final(o)  # 最终归一化pooled = self.pool(o, text)  # 进行池化处理x["pooled"] = pooled  # 将池化结果存入字典return x  # 返回字典# 池化处理的方法def pool(self, x, text):# 从 eot 嵌入中获取特征(eot_token 为每个序列中的最大值)x = (x[torch.arange(x.shape[0]), text.argmax(dim=-1)]  # 获取 eot 嵌入@ self.model.text_projection  # 应用文本投影)return x  # 返回池化结果# 定义文本转换器的前向传播方法,接受输入张量 x 和可选的注意力掩码def text_transformer_forward(self, x: torch.Tensor, attn_mask=None):# 创建一个空字典以存储输出outputs = {}# 遍历转换器的每个残差块for i, r in enumerate(self.model.transformer.resblocks):# 如果是最后一个残差块,将输入张量进行维度变换并存储if i == len(self.model.transformer.resblocks) - 1:outputs["penultimate"] = x.permute(1, 0, 2)  # 将维度从 LND 转换为 NLD# 如果启用了梯度检查点并且不是脚本模式if (self.model.transformer.grad_checkpointingand not torch.jit.is_scripting()):# 使用检查点技术进行前向传播以节省内存x = checkpoint(r, x, attn_mask)else:# 正常执行残差块的前向传播x = r(x, attn_mask=attn_mask)# 将最后输出张量的维度进行转换并存储outputs["last"] = x.permute(1, 0, 2)  # 将维度从 LND 转换为 NLD# 返回包含倒数第二层和最后一层输出的字典return outputs# 定义编码方法,接受文本输入def encode(self, text):# 调用文本转换器的前向传播方法并返回结果return self(text)
# 定义一个名为 FrozenOpenCLIPEmbedder 的类,继承自 AbstractEmbModel
class FrozenOpenCLIPEmbedder(AbstractEmbModel):# 定义一个类属性 LAYERS,包含模型中可用的层LAYERS = [# "pooled",  # 注释掉的层选项"last",  # 最后一个层"penultimate",  # 倒数第二个层]# 初始化方法,用于设置实例的基本参数def __init__(self,arch="ViT-H-14",  # 模型架构version="laion2b_s32b_b79k",  # 预训练模型版本device="cuda",  # 设备类型,默认为 GPUmax_length=77,  # 最大输入文本长度freeze=True,  # 是否冻结模型参数layer="last",  # 选择使用的层):# 调用父类的初始化方法super().__init__()# 确保选择的层在可用层中assert layer in self.LAYERS# 创建模型及其转换,使用指定的架构和设备model, _, _ = open_clip.create_model_and_transforms(arch, device=torch.device("cpu"), pretrained=version)# 删除视觉部分以冻结模型del model.visual# 将模型赋值给实例变量self.model = model# 设置设备属性self.device = device# 设置最大长度属性self.max_length = max_length# 如果需要冻结,则调用冻结方法if freeze:self.freeze()# 设置所选层self.layer = layer# 根据所选层设置层索引if self.layer == "last":self.layer_idx = 0elif self.layer == "penultimate":self.layer_idx = 1else:# 如果层不在可用选项中,则抛出异常raise NotImplementedError()# 冻结模型参数的方法def freeze(self):# 将模型设置为评估模式self.model = self.model.eval()# 将所有参数的 requires_grad 属性设置为 False,停止梯度计算for param in self.parameters():param.requires_grad = False# 前向传播方法,接受文本输入def forward(self, text):# 对文本进行分词处理tokens = open_clip.tokenize(text)# 使用变换器进行编码,并将结果传送到设备z = self.encode_with_transformer(tokens.to(self.device))# 返回编码结果return z# 使用变换器编码文本的方法def encode_with_transformer(self, text):# 获取文本的嵌入表示,形状为 [batch_size, n_ctx, d_model]x = self.model.token_embedding(text)  # 加上位置嵌入x = x + self.model.positional_embedding# 重新排列维度,将形状从 NLD 转换为 LNDx = x.permute(1, 0, 2)  # 执行变换器前向传播x = self.text_transformer_forward(x, attn_mask=self.model.attn_mask)# 重新排列维度回到 NLDx = x.permute(1, 0, 2)  # 通过最终层归一化x = self.model.ln_final(x)# 返回处理后的结果return x# 执行文本变换器前向传播的方法def text_transformer_forward(self, x: torch.Tensor, attn_mask=None):# 遍历变换器的残差块for i, r in enumerate(self.model.transformer.resblocks):# 如果达到所需层索引则停止if i == len(self.model.transformer.resblocks) - self.layer_idx:break# 检查是否使用梯度检查点if (self.model.transformer.grad_checkpointingand not torch.jit.is_scripting()):# 使用检查点方式更新输入x = checkpoint(r, x, attn_mask)else:# 否则直接通过残差块处理输入x = r(x, attn_mask=attn_mask)# 返回变换后的结果return x# 编码文本的简化方法,直接调用前向传播def encode(self, text):return self(text)# 定义一个名为 FrozenOpenCLIPImageEmbedder 的类,继承自 AbstractEmbModel
class FrozenOpenCLIPImageEmbedder(AbstractEmbModel):"""使用 OpenCLIP 视觉变换器编码器处理图像"""# 初始化方法,用于设置实例的基本参数def __init__(self,arch="ViT-H-14",  # 模型架构version="laion2b_s32b_b79k",  # 预训练模型版本device="cuda",  # 设备类型,默认为 GPUmax_length=77,  # 最大输入图像长度freeze=True,  # 是否冻结模型参数antialias=True,  # 是否使用抗锯齿ucg_rate=0.0,  # 用户定义的裁剪率unsqueeze_dim=False,  # 是否增加维度repeat_to_max_len=False,  # 是否重复到最大长度num_image_crops=0,  # 图像裁剪数量output_tokens=False,  # 是否输出 tokens):# 调用父类的初始化方法super().__init__()# 创建模型和转换器,使用指定的架构、设备和预训练版本model, _, _ = open_clip.create_model_and_transforms(arch,device=torch.device("cpu"),  # 使用 CPU 作为计算设备pretrained=version,  # 指定预训练版本)# 删除模型中的变换器部分del model.transformer# 将创建的模型赋值给实例变量self.model = model# 设置最大图像裁剪数量self.max_crops = num_image_crops# 检查是否需要填充到最大长度self.pad_to_max_len = self.max_crops > 0# 检查是否需要重复到最大长度self.repeat_to_max_len = repeat_to_max_len and (not self.pad_to_max_len)# 设置设备类型self.device = device# 设置最大长度self.max_length = max_length# 如果需要冻结模型参数,则调用冻结方法if freeze:self.freeze()# 设置抗锯齿参数self.antialias = antialias# 注册均值张量作为缓冲区,设置为非持久性self.register_buffer("mean", torch.Tensor([0.48145466, 0.4578275, 0.40821073]), persistent=False)# 注册标准差张量作为缓冲区,设置为非持久性self.register_buffer("std", torch.Tensor([0.26862954, 0.26130258, 0.27577711]), persistent=False)# 设置 UCG 速率self.ucg_rate = ucg_rate# 设置需要扩展的维度self.unsqueeze_dim = unsqueeze_dim# 存储的批次初始化为 Noneself.stored_batch = None# 设置视觉模型的输出标记self.model.visual.output_tokens = output_tokens# 保存输出标记的状态self.output_tokens = output_tokensdef preprocess(self, x):# 将输入归一化到 [0,1] 范围x = kornia.geometry.resize(x,(224, 224),  # 将图像大小调整为 224x224interpolation="bicubic",  # 使用双三次插值align_corners=True,  # 对齐角点antialias=self.antialias,  # 使用抗锯齿)# 将图像数据从 [-1,1] 范围转换到 [0,1]x = (x + 1.0) / 2.0# 根据 CLIP 模型的均值和标准差重新归一化图像x = kornia.enhance.normalize(x, self.mean, self.std)# 返回处理后的图像return xdef freeze(self):# 将模型设置为评估模式self.model = self.model.eval()# 禁用所有参数的梯度计算for param in self.parameters():param.requires_grad = False@autocast  # 启用自动混合精度计算# 前向传播方法,处理输入图像并返回特征或标记def forward(self, image, no_dropout=False):# 使用视觉变换器对输入图像进行编码,得到特征 zz = self.encode_with_vision_transformer(image)# 初始化 tokens 为 Nonetokens = None# 如果输出标记为真,分离特征和标记if self.output_tokens:z, tokens = z[0], z[1]# 将特征 z 转换为与图像相同的数据类型z = z.to(image.dtype)# 如果 ucg_rate 大于 0,且没有进行 dropout,且没有最大裁剪if self.ucg_rate > 0.0 and not no_dropout and not (self.max_crops > 0):# 根据 Bernoulli 分布随机丢弃特征z = (torch.bernoulli((1.0 - self.ucg_rate) * torch.ones(z.shape[0], device=z.device))[:, None]* z)# 如果 tokens 不为 None,应用相同的丢弃逻辑if tokens is not None:tokens = (expand_dims_like(torch.bernoulli((1.0 - self.ucg_rate)* torch.ones(tokens.shape[0], device=tokens.device)),tokens,)* tokens)# 如果需要扩展维度,将特征 z 变为三维if self.unsqueeze_dim:z = z[:, None, :]# 如果输出标记为真,检查标记和特征的重复与填充条件if self.output_tokens:assert not self.repeat_to_max_lenassert not self.pad_to_max_len# 返回标记和特征return tokens, z# 如果需要重复到最大长度if self.repeat_to_max_len:# 将二维特征扩展为三维if z.dim() == 2:z_ = z[:, None, :]else:z_ = z# 返回重复的特征return repeat(z_, "b 1 d -> b n d", n=self.max_length), z# 如果需要填充到最大长度elif self.pad_to_max_len:# 确保特征是三维的assert z.dim() == 3# 在特征后面填充零z_pad = torch.cat((z,torch.zeros(z.shape[0],self.max_length - z.shape[1],z.shape[2],device=z.device,),),1,)# 返回填充后的特征和第一个时间步的特征return z_pad, z_pad[:, 0, ...]# 默认返回特征 zreturn z# 使用视觉变换器对图像进行编码def encode_with_vision_transformer(self, img):# 如果最大裁剪数大于0,则对图像进行裁剪预处理# if self.max_crops > 0:#    img = self.preprocess_by_cropping(img)# 检查图像维度是否为5if img.dim() == 5:# 确保最大裁剪数与图像的第二维度匹配assert self.max_crops == img.shape[1]# 重排图像维度,将其从 (b n) c h w 变为 (b n) c h wimg = rearrange(img, "b n c h w -> (b n) c h w")# 对图像进行预处理img = self.preprocess(img)# 如果不需要输出tokensif not self.output_tokens:# 确保模型不输出tokensassert not self.model.visual.output_tokens# 将图像传入模型进行处理x = self.model.visual(img)tokens = Noneelse:# 确保模型输出tokensassert self.model.visual.output_tokens# 将图像传入模型并获取输出和tokensx, tokens = self.model.visual(img)# 如果最大裁剪数大于0if self.max_crops > 0:# 重排输出,将其从 (b n) d 变为 b n dx = rearrange(x, "(b n) d -> b n d", n=self.max_crops)# 在序列轴上进行drop out,控制一定比例的输出x = (torch.bernoulli((1.0 - self.ucg_rate)* torch.ones(x.shape[0], x.shape[1], 1, device=x.device))* x)# 如果tokens不为Noneif tokens is not None:# 重排tokens,将其从 (b n) t d 变为 b t (n d)tokens = rearrange(tokens, "(b n) t d -> b t (n d)", n=self.max_crops)# 输出实验性提示信息print(f"You are running very experimental token-concat in {self.__class__.__name__}. "f"Check what you are doing, and then remove this message.")# 如果需要输出tokens,则返回if self.output_tokens:return x, tokens# 返回处理后的图像return x# 对输入文本进行编码def encode(self, text):# 调用自身对文本进行处理return self(text)
# 定义一个继承自 AbstractEmbModel 的类,名为 FrozenCLIPT5Encoder
class FrozenCLIPT5Encoder(AbstractEmbModel):# 构造函数,初始化模型的参数def __init__(self,clip_version="openai/clip-vit-large-patch14",  # CLIP 模型的版本t5_version="google/t5-v1_1-xl",  # T5 模型的版本device="cuda",  # 指定使用的设备clip_max_length=77,  # CLIP 模型的最大输入长度t5_max_length=77,  # T5 模型的最大输入长度):super().__init__()  # 调用父类的构造函数# 创建 CLIP 嵌入模型实例self.clip_encoder = FrozenCLIPEmbedder(clip_version, device, max_length=clip_max_length)# 创建 T5 嵌入模型实例self.t5_encoder = FrozenT5Embedder(t5_version, device, max_length=t5_max_length)# 打印 CLIP 和 T5 模型的参数数量print(f"{self.clip_encoder.__class__.__name__} has {count_params(self.clip_encoder) * 1.e-6:.2f} M parameters, "f"{self.t5_encoder.__class__.__name__} comes with {count_params(self.t5_encoder) * 1.e-6:.2f} M params.")# 定义编码函数,调用前向传播def encode(self, text):return self(text)# 定义前向传播函数def forward(self, text):# 使用 CLIP 编码器对文本进行编码clip_z = self.clip_encoder.encode(text)# 使用 T5 编码器对文本进行编码t5_z = self.t5_encoder.encode(text)# 返回 CLIP 和 T5 的编码结果return [clip_z, t5_z]# 定义一个继承自 nn.Module 的类,名为 SpatialRescaler
class SpatialRescaler(nn.Module):# 构造函数,初始化空间重缩放器的参数def __init__(self,n_stages=1,  # 重缩放的阶段数method="bilinear",  # 插值方法multiplier=0.5,  # 缩放因子in_channels=3,  # 输入通道数out_channels=None,  # 输出通道数bias=False,  # 是否使用偏置wrap_video=False,  # 是否处理视频数据kernel_size=1,  # 卷积核大小remap_output=False,  # 是否重映射输出通道):super().__init__()  # 调用父类的构造函数self.n_stages = n_stages  # 保存阶段数assert self.n_stages >= 0  # 确保阶段数非负# 验证插值方法是否在支持的范围内assert method in ["nearest","linear","bilinear","trilinear","bicubic","area",]self.multiplier = multiplier  # 保存缩放因子# 创建部分应用的插值函数self.interpolator = partial(torch.nn.functional.interpolate, mode=method)# 判断是否需要重映射输出通道self.remap_output = out_channels is not None or remap_output# 如果需要重映射输出通道,创建卷积层if self.remap_output:print(f"Spatial Rescaler mapping from {in_channels} to {out_channels} channels after resizing.")self.channel_mapper = nn.Conv2d(in_channels,out_channels,kernel_size=kernel_size,bias=bias,padding=kernel_size // 2,)self.wrap_video = wrap_video  # 保存是否处理视频的标志# 定义前向传播函数def forward(self, x):# 如果处理视频数据且输入是五维张量,进行维度调整if self.wrap_video and x.ndim == 5:B, C, T, H, W = x.shape  # 解包维度x = rearrange(x, "b c t h w -> b t c h w")  # 调整维度顺序x = rearrange(x, "b t c h w -> (b t) c h w")  # 合并批次和时间维度# 进行指定阶段的重缩放操作for stage in range(self.n_stages):x = self.interpolator(x, scale_factor=self.multiplier)# 如果处理视频数据,恢复维度顺序if self.wrap_video:x = rearrange(x, "(b t) c h w -> b t c h w", b=B, t=T, c=C)  # 恢复维度x = rearrange(x, "b t c h w -> b c t h w")  # 再次调整维度# 如果需要重映射输出,应用卷积层if self.remap_output:x = self.channel_mapper(x)return x  # 返回处理后的张量# 定义编码函数,调用前向传播def encode(self, x):return self(x)# 定义一个继承自 nn.Module 的类,名为 LowScaleEncoder
class LowScaleEncoder(nn.Module):# 构造函数,初始化低缩放编码器的参数def __init__(self,model_config,  # 模型配置linear_start,  # 线性起始值linear_end,  # 线性结束值timesteps=1000,  # 时间步数max_noise_level=250,  # 最大噪声水平output_size=64,  # 输出大小scale_factor=1.0,  # 缩放因子# 定义一个类,继承自父类def __init__(self, max_noise_level, model_config, timesteps, linear_start, linear_end, output_size, scale_factor):# 调用父类的初始化方法super().__init__()# 设置最大噪声级别self.max_noise_level = max_noise_level# 根据配置实例化模型self.model = instantiate_from_config(model_config)# 注册一个调度表,用于控制噪声的变化self.augmentation_schedule = self.register_schedule(timesteps=timesteps, linear_start=linear_start, linear_end=linear_end)# 设置输出大小self.out_size = output_size# 设置缩放因子self.scale_factor = scale_factor# 注册一个调度表,用于控制噪声的变化def register_schedule(self, beta_schedule, timesteps, linear_start, linear_end, cosine_s):# 根据给定的参数生成 beta 调度表betas = make_beta_schedule(beta_schedule,timesteps,linear_start=linear_start,linear_end=linear_end,cosine_s=cosine_s,)# 根据 betas 计算 alphasalphas = 1.0 - betas# 计算 alphas 的累积乘积alphas_cumprod = np.cumprod(alphas, axis=0)# 计算 alphas 的累积乘积的前一个值alphas_cumprod_prev = np.append(1.0, alphas_cumprod[:-1])# 获取 betas 的形状(timesteps,) = betas.shape# 将 timesteps 转换为整数self.num_timesteps = int(timesteps)# 设置线性起始值self.linear_start = linear_start# 设置线性结束值self.linear_end = linear_end# 判断 alphas_cumprod 的形状是否与 num_timesteps 相同assert (alphas_cumprod.shape[0] == self.num_timesteps), "alphas have to be defined for each timestep"# 创建一个偏函数,用于将数组转换为 torch.tensorto_torch = partial(torch.tensor, dtype=torch.float32)# 注册缓冲区,存储 betasself.register_buffer("betas", to_torch(betas))# 注册缓冲区,存储 alphas_cumprodself.register_buffer("alphas_cumprod", to_torch(alphas_cumprod))# 注册缓冲区,存储 alphas_cumprod_prevself.register_buffer("alphas_cumprod_prev", to_torch(alphas_cumprod_prev))# 计算扩散 q(x_t | x_{t-1}) 和其他参数# 注册缓冲区,存储 sqrt_alphas_cumprodself.register_buffer("sqrt_alphas_cumprod", to_torch(np.sqrt(alphas_cumprod)))# 注册缓冲区,存储 sqrt_one_minus_alphas_cumprodself.register_buffer("sqrt_one_minus_alphas_cumprod", to_torch(np.sqrt(1.0 - alphas_cumprod)))# 注册缓冲区,存储 log_one_minus_alphas_cumprodself.register_buffer("log_one_minus_alphas_cumprod", to_torch(np.log(1.0 - alphas_cumprod)))# 注册缓冲区,存储 sqrt_recip_alphas_cumprodself.register_buffer("sqrt_recip_alphas_cumprod", to_torch(np.sqrt(1.0 / alphas_cumprod)))# 注册缓冲区,存储 sqrt_recipm1_alphas_cumprodself.register_buffer("sqrt_recipm1_alphas_cumprod", to_torch(np.sqrt(1.0 / alphas_cumprod - 1)))# 从初始值 x_start 和时间步 t 生成噪声样本def q_sample(self, x_start, t, noise):# 如果没有传入噪声,则生成一个与 x_start 形状相同的随机噪声noise = default(noise, lambda: torch.randn_like(x_start))# 根据噪声和调度表生成噪声样本return (extract_into_tensor(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start+ extract_into_tensor(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape)* noise)# 定义前向传播函数,接收输入 xdef forward(self, x):# 使用模型对输入 x 进行编码,得到潜在表示 zz = self.model.encode(x)# 检查 z 是否为对角高斯分布类型if isinstance(z, DiagonalGaussianDistribution):# 从高斯分布中采样,更新 zz = z.sample()# 将 z 乘以缩放因子,调整其大小z = z * self.scale_factor# 随机生成噪声水平,范围从 0 到 max_noise_level,形状与批大小相同noise_level = torch.randint(0, self.max_noise_level, (x.shape[0],), device=x.device).long()# 对 z 应用 q_sample 函数,根据噪声水平生成样本z = self.q_sample(z, noise_level)# 如果指定了输出大小,则调整 z 的尺寸if self.out_size is not None:z = torch.nn.functional.interpolate(z, size=self.out_size, mode="nearest")# z = z.repeat_interleave(2, -2).repeat_interleave(2, -1)  # 注释掉的代码:可能用于调整 z 的形状# 返回处理后的 z 和噪声水平return z, noise_level# 定义解码函数,接收潜在表示 zdef decode(self, z):# 将 z 除以缩放因子,恢复其原始尺度z = z / self.scale_factor# 使用模型对 z 进行解码,返回解码后的结果return self.model.decode(z)
# 定义一个多维时间步嵌入模型类,继承自抽象嵌入模型
class ConcatTimestepEmbedderND(AbstractEmbModel):"""嵌入每个维度并独立拼接它们"""# 初始化方法,接受输出维度参数def __init__(self, outdim):# 调用父类的初始化方法super().__init__()# 创建时间步嵌入对象self.timestep = Timestep(outdim)# 保存输出维度self.outdim = outdim# 前向传播方法,处理输入数据def forward(self, x):# 如果输入是1维,则增加一个维度if x.ndim == 1:x = x[:, None]# 确保输入为2维assert len(x.shape) == 2# 获取批大小和维度数量b, dims = x.shape[0], x.shape[1]# 重排输入数据为一维x = rearrange(x, "b d -> (b d)")# 获取时间步嵌入emb = self.timestep(x)# 重排嵌入为批大小和输出维度格式emb = rearrange(emb, "(b d) d2 -> b (d d2)", b=b, d=dims, d2=self.outdim)# 返回最终的嵌入return emb# 定义一个高斯编码器类,继承自编码器和抽象嵌入模型
class GaussianEncoder(Encoder, AbstractEmbModel):# 初始化方法,接受权重和是否扁平化输出参数def __init__(self, weight: float = 1.0, flatten_output: bool = True, *args, **kwargs):# 调用父类的初始化方法super().__init__(*args, **kwargs)# 创建对角高斯正则化器self.posterior = DiagonalGaussianRegularizer()# 保存权重self.weight = weight# 保存是否扁平化输出标志self.flatten_output = flatten_output# 前向传播方法,处理输入数据def forward(self, x) -> Tuple[Dict, torch.Tensor]:# 调用父类的前向传播,获取潜变量z = super().forward(x)# 通过正则化器处理潜变量z, log = self.posterior(z)# 记录损失和权重log["loss"] = log["kl_loss"]log["weight"] = self.weight# 如果需要,扁平化输出if self.flatten_output:z = rearrange(z, "b c h w -> b (h w ) c")# 返回日志和潜变量return log, z# 定义一个冻结的 OpenCLIP 图像预测嵌入模型类
class FrozenOpenCLIPImagePredictionEmbedder(AbstractEmbModel):# 初始化方法,接受配置、条件帧数和副本数def __init__(self,open_clip_embedding_config: Dict,n_cond_frames: int,n_copies: int,):# 调用父类的初始化方法super().__init__()# 保存条件帧数self.n_cond_frames = n_cond_frames# 保存副本数self.n_copies = n_copies# 实例化 OpenCLIP 嵌入对象self.open_clip = instantiate_from_config(open_clip_embedding_config)# 前向传播方法,处理视频输入def forward(self, vid):# 通过 OpenCLIP 嵌入视频数据vid = self.open_clip(vid)# 重排视频数据为批大小和时间步格式vid = rearrange(vid, "(b t) d -> b t d", t=self.n_cond_frames)# 重复视频数据以匹配副本数vid = repeat(vid, "b t d -> (b s) t d", s=self.n_copies)# 返回处理后的视频数据return vid# 定义一个原始图像嵌入模型类,继承自抽象嵌入模型
class RawImageEmbedder(AbstractEmbModel):"""在 Instructpix2pix 中将原始图像作为条件"""# 前向传播方法,直接返回输入图像def forward(self, image):return image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/74923.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

CogView3---CogView-3Plus-微调代码源码解析-三-

CogView3 & CogView-3Plus 微调代码源码解析(三) .\cogview3-finetune\sat\sgm\modules\diffusionmodules\guiders.py # 导入 logging 模块,用于记录日志信息 import logging # 从 abc 模块导入 ABC 类和 abstractmethod 装饰器,用于定义抽象基类和抽象方法 from abc i…

CogView3---CogView-3Plus-微调代码源码解析-二-

CogView3 & CogView-3Plus 微调代码源码解析(二) .\cogview3-finetune\sat\sgm\models\__init__.py # 从同一模块导入 AutoencodingEngine 类,用于后续的自动编码器操作 from .autoencoder import AutoencodingEngine# 注释文本(可能是无关信息或标识符) #XuDwndGaCFo…

券后价复杂根源和解法

券后价领域划分不清楚 券后价在电商系统中是个很奇怪的存在 无论是按商品领域还是营销领域划分,它都不合适归类到这两者中间。结果就是券后价是个很不理想的拆分逻辑。 券后价可以理解是商品的价格属性,这个属性是由营销来计算控制。领域划分可以理解为商品领域,营销做计算!…

营销领域分析

用户与商品的连接用户购买商品是整个商业的基本盘。用户与商品是多对多关系,在这个基础之上就可衍生出许多行为。可以跟据商品的属性又可以设计各种运营方式。 用几个条件来归类交易产生的条件条件 人 物when 人什么时候需要商品 商品什么时候被需要why 人为什么需要商品 商品…

PbootCMS授权码可以更换域名吗? 授权码丢失怎么办?

授权码可以更换域名吗?不可以:授权码是绑定特定域名的,如果需要更换域名,建议重新获取新的授权码。授权码丢失怎么办?重新获取:如果授权码丢失,可以重新访问授权页面,输入相同的域名再次获取授权码。扫码添加技术【解决问题】专注中小企业网站建设、网站安全12年。熟悉…

PbootCMS授权码是否可以用于不同域名的子域名?是否可以用于不同域名的子目录?

授权码是否可以用于不同域名的子域名?不可以:授权码是绑定特定域名的,不支持不同域名的子域名。例如,sub1.example.com 和 sub2.anotherdomain.com 需要分别获取授权码。18. 授权码是否可以用于不同域名的子目录?不可以:授权码是绑定特定域名的,不支持不同域名的子目录。…

PbootCMS验证码不显示或显示不清楚怎么办

验证码不显示或显示不清楚问题描述:后台登录时验证码不显示或显示不清楚。 解决方案:避免使用中文路径:确保所有文件和目录名称均为英文或数字。 切换PHP版本:推荐使用PHP 7.3、7.2、5.6版本。 检查文件权限:确保验证码相关文件和目录具有适当的读写权限(通常为755或644)…

值得信赖的FTP替代方案有哪些,一文带你详细了解!

FTP(文件传输协议)因其传输速度慢、安全隐患、管理复杂性、稳定性不足以及审计难题等缺陷,使得企业在寻找更高效的替代方案时显得尤为迫切。 FTP替代方案有哪些,简单了解看下吧: 1、SFTP:SFTP是建立在SSH(Secure Shell)协议之上的文件传输协议,提供了数据传输的加密和…