CogView3---CogView-3Plus-微调代码源码解析-二-

news/2024/10/23 9:24:54

CogView3 & CogView-3Plus 微调代码源码解析(二)

.\cogview3-finetune\sat\sgm\models\__init__.py

# 从同一模块导入 AutoencodingEngine 类,用于后续的自动编码器操作
from .autoencoder import AutoencodingEngine# 注释文本(可能是无关信息或标识符)
#XuDwndGaCFo

.\cogview3-finetune\sat\sgm\modules\attention.py

# 导入数学库
import math
# 从 inspect 模块导入 isfunction 函数,用于检查对象是否为函数
from inspect import isfunction
# 导入 Any 和 Optional 类型
from typing import Any, Optional# 导入 PyTorch 库
import torch
# 导入 PyTorch 的功能性模块
import torch.nn.functional as F
# 从 einops 库导入 rearrange 和 repeat 函数
from einops import rearrange, repeat
# 导入版本管理工具
from packaging import version
# 导入 PyTorch 的神经网络模块
from torch import nn# 检查 PyTorch 版本是否大于或等于 2.0.0
if version.parse(torch.__version__) >= version.parse("2.0.0"):# 设置 SDP_IS_AVAILABLE 为 True,表示 SDP 后端可用SDP_IS_AVAILABLE = True# 从 PyTorch 导入 SDPBackend 和 sdp_kernelfrom torch.backends.cuda import SDPBackend, sdp_kernel# 定义后端映射字典,根据不同的后端配置相应的选项BACKEND_MAP = {SDPBackend.MATH: {"enable_math": True,"enable_flash": False,"enable_mem_efficient": False,},SDPBackend.FLASH_ATTENTION: {"enable_math": False,"enable_flash": True,"enable_mem_efficient": False,},SDPBackend.EFFICIENT_ATTENTION: {"enable_math": False,"enable_flash": False,"enable_mem_efficient": True,},None: {"enable_math": True, "enable_flash": True, "enable_mem_efficient": True},}
else:# 从上下文管理库导入 nullcontextfrom contextlib import nullcontext# 设置 SDP_IS_AVAILABLE 为 False,表示 SDP 后端不可用SDP_IS_AVAILABLE = False# 将 sdp_kernel 设置为 nullcontextsdp_kernel = nullcontext# 打印提示信息,告知用户当前 PyTorch 版本不支持 SDP 后端print(f"No SDP backend available, likely because you are running in pytorch versions < 2.0. In fact, "f"you are using PyTorch {torch.__version__}. You might want to consider upgrading.")# 尝试导入 xformers 和 xformers.ops
try:import xformersimport xformers.ops# 如果导入成功,设置 XFORMERS_IS_AVAILABLE 为 TrueXFORMERS_IS_AVAILABLE = True
# 如果导入失败,设置 XFORMERS_IS_AVAILABLE 为 False,并打印提示信息
except:XFORMERS_IS_AVAILABLE = Falseprint("no module 'xformers'. Processing without...")# 从 diffusionmodules.util 模块导入 checkpoint 函数
from .diffusionmodules.util import checkpoint# 定义 exists 函数,检查输入值是否存在
def exists(val):return val is not None# 定义 uniq 函数,返回数组中唯一元素的键
def uniq(arr):return {el: True for el in arr}.keys()# 定义 default 函数,如果 val 存在则返回它,否则返回默认值
def default(val, d):if exists(val):return valreturn d() if isfunction(d) else d# 定义 max_neg_value 函数,返回给定张量类型的最大负值
def max_neg_value(t):return -torch.finfo(t.dtype).max# 定义 init_ 函数,初始化张量
def init_(tensor):# 获取张量的最后一维的大小dim = tensor.shape[-1]# 计算标准差std = 1 / math.sqrt(dim)# 在区间 [-std, std] 内均匀初始化张量tensor.uniform_(-std, std)return tensor# 定义 GEGLU 类,继承自 nn.Module
class GEGLU(nn.Module):# 初始化方法,设置输入和输出维度def __init__(self, dim_in, dim_out):super().__init__()# 创建一个线性投影层,将输入维度映射到两倍的输出维度self.proj = nn.Linear(dim_in, dim_out * 2)# 前向传播方法def forward(self, x):# 将输入通过投影层,分割为 x 和 gatex, gate = self.proj(x).chunk(2, dim=-1)# 返回 x 与 gate 的 GELU 激活后的乘积return x * F.gelu(gate)# 定义 FeedForward 类,继承自 nn.Module
class FeedForward(nn.Module):# 初始化方法,设置维度、输出维度、乘法因子、是否使用 GEGLU 和 dropout 率def __init__(self, dim, dim_out=None, mult=4, glu=False, dropout=0.0):super().__init__()# 计算内部维度inner_dim = int(dim * mult)# 如果 dim_out 未定义,使用 dim 作为默认值dim_out = default(dim_out, dim)# 根据是否使用 GEGLU 创建输入投影层project_in = (nn.Sequential(nn.Linear(dim, inner_dim), nn.GELU())if not gluelse GEGLU(dim, inner_dim))# 定义网络结构self.net = nn.Sequential(project_in, nn.Dropout(dropout), nn.Linear(inner_dim, dim_out))# 前向传播方法def forward(self, x):# 通过网络结构处理输入return self.net(x)# 定义 zero_module 函数,清零模块的参数并返回该模块
def zero_module(module):"""Zero out the parameters of a module and return it."""# 遍历模块的所有参数for p in module.parameters():# 将参数的梯度断开并清零p.detach().zero_()return module# 定义 Normalize 函数,接收输入通道数
def Normalize(in_channels):# 返回一个 GroupNorm 实例,用于对输入进行分组归一化return torch.nn.GroupNorm(# 设置分组数量为 32num_groups=32, # 设置输入通道数量num_channels=in_channels, # 设置一个小的 epsilon 值以避免除零错误eps=1e-6, # 设定 affine 为 True,以便进行可学习的仿射变换affine=True)
# 定义线性注意力机制的类,继承自 nn.Module
class LinearAttention(nn.Module):# 初始化方法,设置参数维度和头数def __init__(self, dim, heads=4, dim_head=32):# 调用父类构造函数super().__init__()# 设置头数self.heads = heads# 计算隐藏维度hidden_dim = dim_head * heads# 定义输入到 QKV 的卷积层,输出通道为 hidden_dim 的三倍self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)# 定义输出卷积层,输出通道为原始维度self.to_out = nn.Conv2d(hidden_dim, dim, 1)# 前向传播方法def forward(self, x):# 获取输入的批次大小、通道数、高度和宽度b, c, h, w = x.shape# 将输入通过 QKV 卷积层qkv = self.to_qkv(x)# 将 QKV 的输出重排以分开 Q、K、Vq, k, v = rearrange(qkv, "b (qkv heads c) h w -> qkv b heads c (h w)", heads=self.heads, qkv=3)# 对 K 进行 softmax 归一化k = k.softmax(dim=-1)# 计算上下文向量,通过爱因斯坦求和约定context = torch.einsum("bhdn,bhen->bhde", k, v)# 使用上下文向量和 Q 计算输出out = torch.einsum("bhde,bhdn->bhen", context, q)# 重排输出以恢复到原始形状out = rearrange(out, "b heads c (h w) -> b (heads c) h w", heads=self.heads, h=h, w=w)# 返回最终输出return self.to_out(out)# 定义空间自注意力机制的类,继承自 nn.Module
class SpatialSelfAttention(nn.Module):# 初始化方法,设置输入通道def __init__(self, in_channels):# 调用父类构造函数super().__init__()# 存储输入通道数self.in_channels = in_channels# 创建归一化层self.norm = Normalize(in_channels)# 创建 Q、K、V 的卷积层,输出通道与输入通道相同self.q = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)self.k = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)self.v = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)# 创建输出的卷积层self.proj_out = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)# 前向传播方法def forward(self, x):# 初始化 h_ 为输入 xh_ = x# 对 h_ 进行归一化处理h_ = self.norm(h_)# 计算 Q、K、Vq = self.q(h_)k = self.k(h_)v = self.v(h_)# 计算注意力权重b, c, h, w = q.shape# 将 Q 重排为 (batch_size, h*w, c) 的形状q = rearrange(q, "b c h w -> b (h w) c")# 将 K 重排为 (batch_size, c, h*w) 的形状k = rearrange(k, "b c h w -> b c (h w)")# 计算 Q 和 K 的点积以获得权重w_ = torch.einsum("bij,bjk->bik", q, k)# 对权重进行缩放w_ = w_ * (int(c) ** (-0.5))# 对权重进行 softmax 归一化w_ = torch.nn.functional.softmax(w_, dim=2)# 处理 Vv = rearrange(v, "b c h w -> b c (h w)")# 重排权重 w_ 为 (batch_size, h*w, h*w) 的形状w_ = rearrange(w_, "b i j -> b j i")# 计算 h_,通过 V 和权重相乘h_ = torch.einsum("bij,bjk->bik", v, w_)# 将 h_ 重排回原始形状h_ = rearrange(h_, "b c (h w) -> b c h w", h=h)# 通过 proj_out 进行最终的线性变换h_ = self.proj_out(h_)# 返回输入与处理后的 h_ 的和return x + h_# 定义交叉注意力机制的类,继承自 nn.Module
class CrossAttention(nn.Module):# 初始化方法,设置查询、上下文维度和其他参数def __init__(self,query_dim,context_dim=None,heads=8,dim_head=64,dropout=0.0,backend=None,):# 调用父类构造函数super().__init__()# 计算内部维度inner_dim = dim_head * heads# 如果没有提供上下文维度,默认使用查询维度context_dim = default(context_dim, query_dim)# 设置缩放因子self.scale = dim_head**-0.5# 设置头数self.heads = heads# 定义 Q、K、V 的线性变换self.to_q = nn.Linear(query_dim, inner_dim, bias=False)self.to_k = nn.Linear(context_dim, inner_dim, bias=False)self.to_v = nn.Linear(context_dim, inner_dim, bias=False)# 定义输出层,包括线性变换和 dropoutself.to_out = nn.Sequential(nn.Linear(inner_dim, query_dim), nn.Dropout(dropout))# 存储后端self.backend = backend# 定义前向传播函数,接收输入数据及相关参数def forward(self,x,context=None,mask=None,additional_tokens=None,n_times_crossframe_attn_in_self=0,):# 获取注意力头的数量h = self.heads# 如果有额外的 tokensif additional_tokens is not None:# 获取输出序列开始时的掩码 token 数量n_tokens_to_mask = additional_tokens.shape[1]# 将额外的 token 添加到输入数据前x = torch.cat([additional_tokens, x], dim=1)# 通过线性变换生成查询向量q = self.to_q(x)# 使用默认值或输入作为上下文context = default(context, x)# 通过线性变换生成键向量k = self.to_k(context)# 通过线性变换生成值向量v = self.to_v(context)# 如果需要进行跨帧注意力if n_times_crossframe_attn_in_self:# 验证输入批次大小可以被跨帧次数整除assert x.shape[0] % n_times_crossframe_attn_in_self == 0# 计算每次跨帧的批次大小n_cp = x.shape[0] // n_times_crossframe_attn_in_self# 重复键向量以适应跨帧注意力k = repeat(k[::n_times_crossframe_attn_in_self], "b ... -> (b n) ...", n=n_cp)# 重复值向量以适应跨帧注意力v = repeat(v[::n_times_crossframe_attn_in_self], "b ... -> (b n) ...", n=n_cp)# 将查询、键、值向量重排为适合多头注意力的形状q, k, v = map(lambda t: rearrange(t, "b n (h d) -> b h n d", h=h), (q, k, v))## old"""# 计算查询与键之间的相似度,并进行缩放sim = einsum('b i d, b j d -> b i j', q, k) * self.scale# 删除查询和键以节省内存del q, k# 如果存在掩码if exists(mask):# 将掩码重排为适合的形状mask = rearrange(mask, 'b ... -> b (...)')# 获取相似度的最大负值max_neg_value = -torch.finfo(sim.dtype).max# 重复掩码以适应多头mask = repeat(mask, 'b j -> (b h) () j', h=h)# 使用掩码填充相似度矩阵sim.masked_fill_(~mask, max_neg_value)# 应用 softmax 计算注意力权重sim = sim.softmax(dim=-1)# 使用注意力权重加权值向量,生成输出out = einsum('b i j, b j d -> b i d', sim, v)"""## new# 使用指定的后端进行缩放点积注意力计算with sdp_kernel(**BACKEND_MAP[self.backend]):# print("dispatching into backend", self.backend, "q/k/v shape: ", q.shape, k.shape, v.shape)out = F.scaled_dot_product_attention(q, k, v, attn_mask=mask)  # 默认缩放因子为 dim_head ** -0.5# 删除查询、键和值向量以释放内存del q, k, v# 将输出重排为适合最终输出的形状out = rearrange(out, "b h n d -> b n (h d)", h=h)# 如果有额外的 tokensif additional_tokens is not None:# 移除额外的 tokenout = out[:, n_tokens_to_mask:]# 返回最终的输出结果return self.to_out(out)
# 定义一个内存高效的交叉注意力模块,继承自 nn.Module
class MemoryEfficientCrossAttention(nn.Module):# 初始化方法,接受查询维度、上下文维度、头数、每个头的维度和丢弃率等参数def __init__(self, query_dim, context_dim=None, heads=8, dim_head=64, dropout=0.0, **kwargs):# 调用父类的初始化方法super().__init__()# 计算每个头的内部维度,等于头数乘以每个头的维度inner_dim = dim_head * heads# 如果上下文维度未提供,则将其设置为查询维度context_dim = default(context_dim, query_dim)# 保存头数和每个头的维度到实例变量self.heads = headsself.dim_head = dim_head# 创建线性层,将查询输入转换为内部维度,不使用偏置self.to_q = nn.Linear(query_dim, inner_dim, bias=False)# 创建线性层,将上下文输入转换为内部维度,不使用偏置self.to_k = nn.Linear(context_dim, inner_dim, bias=False)# 创建线性层,将上下文输入转换为内部维度,不使用偏置self.to_v = nn.Linear(context_dim, inner_dim, bias=False)# 创建一个顺序容器,包含将内部维度转换回查询维度的线性层和丢弃层self.to_out = nn.Sequential(nn.Linear(inner_dim, query_dim), nn.Dropout(dropout))# 初始化注意力操作为 Noneself.attention_op: Optional[Any] = None# 定义前向传播方法,接受输入、上下文、掩码和其他可选参数def forward(self,x,context=None,mask=None,additional_tokens=None,n_times_crossframe_attn_in_self=0,):# 检查是否提供了额外的令牌if additional_tokens is not None:# 获取输出序列开头的被遮掩令牌数量n_tokens_to_mask = additional_tokens.shape[1]# 将额外的令牌与当前输入合并x = torch.cat([additional_tokens, x], dim=1)# 将输入转换为查询向量q = self.to_q(x)# 使用默认值或输入作为上下文context = default(context, x)# 将上下文转换为键向量k = self.to_k(context)# 将上下文转换为值向量v = self.to_v(context)# 检查是否需要进行跨帧注意力的重新编程if n_times_crossframe_attn_in_self:# 进行跨帧注意力的重新编程,参考 https://arxiv.org/abs/2303.13439assert x.shape[0] % n_times_crossframe_attn_in_self == 0# k的维度处理,使用每n_times_crossframe_attn_in_self帧的一个k = repeat(k[::n_times_crossframe_attn_in_self],"b ... -> (b n) ...",n=n_times_crossframe_attn_in_self,)# v的维度处理,使用每n_times_crossframe_attn_in_self帧的一个v = repeat(v[::n_times_crossframe_attn_in_self],"b ... -> (b n) ...",n=n_times_crossframe_attn_in_self,)# 获取批次大小和特征维度b, _, _ = q.shape# 对 q, k, v 进行维度调整和重塑q, k, v = map(lambda t: t.unsqueeze(3)  # 在最后一维添加一个新维度.reshape(b, t.shape[1], self.heads, self.dim_head)  # 重塑为(batch, seq_len, heads, dim_head).permute(0, 2, 1, 3)  # 重新排列维度为(batch, heads, seq_len, dim_head).reshape(b * self.heads, t.shape[1], self.dim_head)  # 再次重塑为(batch * heads, seq_len, dim_head).contiguous(),  # 确保内存连续性(q, k, v),  # 对 q, k, v 进行相同处理)# 实际计算注意力,这个过程是最不可或缺的out = xformers.ops.memory_efficient_attention(q, k, v, attn_bias=None, op=self.attention_op)# TODO: 将这个直接用作注意力操作中的偏置if exists(mask):# 如果存在遮掩,抛出未实现异常raise NotImplementedError# 对输出进行维度调整,适配最终输出形状out = (out.unsqueeze(0)  # 在最前面添加一个新维度.reshape(b, self.heads, out.shape[1], self.dim_head)  # 重塑为(batch, heads, seq_len, dim_head).permute(0, 2, 1, 3)  # 重新排列维度为(batch, seq_len, heads, dim_head).reshape(b, out.shape[1], self.heads * self.dim_head)  # 再次重塑为(batch, seq_len, heads * dim_head))# 如果有额外的令牌,则移除它们if additional_tokens is not None:out = out[:, n_tokens_to_mask:]  # 切除被遮掩的令牌部分# 将输出转换为最终的输出格式return self.to_out(out)
# 定义基本的变换器模块类,继承自 nn.Module
class BasicTransformerBlock(nn.Module):# 定义可用的注意力模式,映射到对应的类ATTENTION_MODES = {"softmax": CrossAttention,  # 普通注意力"softmax-xformers": MemoryEfficientCrossAttention,  # 记忆高效注意力}# 初始化方法,设置模型的参数def __init__(self,dim,  # 输入的维度n_heads,  # 注意力头的数量d_head,  # 每个注意力头的维度dropout=0.0,  # dropout 比例context_dim=None,  # 上下文的维度gated_ff=True,  # 是否使用门控前馈网络checkpoint=True,  # 是否启用检查点disable_self_attn=False,  # 是否禁用自注意力attn_mode="softmax",  # 注意力模式,默认为 softmaxsdp_backend=None,  # 后端配置):# 调用父类初始化方法super().__init__()# 检查给定的注意力模式是否有效assert attn_mode in self.ATTENTION_MODES# 如果使用的注意力模式不支持,回退到默认模式if attn_mode != "softmax" and not XFORMERS_IS_AVAILABLE:print(f"Attention mode '{attn_mode}' is not available. Falling back to native attention. "f"This is not a problem in Pytorch >= 2.0. FYI, you are running with PyTorch version {torch.__version__}")attn_mode = "softmax"  # 回退到 softmax 模式# 如果使用普通注意力且不支持,给出提示并调整模式elif attn_mode == "softmax" and not SDP_IS_AVAILABLE:print("We do not support vanilla attention anymore, as it is too expensive. Sorry.")# 确保已安装 xformersif not XFORMERS_IS_AVAILABLE:assert (False), "Please install xformers via e.g. 'pip install xformers==0.0.16'"else:print("Falling back to xformers efficient attention.")attn_mode = "softmax-xformers"  # 使用 xformers 模式# 根据最终的注意力模式选择对应的类attn_cls = self.ATTENTION_MODES[attn_mode]# 检查 PyTorch 版本是否支持指定的后端if version.parse(torch.__version__) >= version.parse("2.0.0"):assert sdp_backend is None or isinstance(sdp_backend, SDPBackend)else:assert sdp_backend is None  # 对于旧版本,后端必须为 Noneself.disable_self_attn = disable_self_attn  # 保存禁用自注意力的设置# 初始化第一个注意力层self.attn1 = attn_cls(query_dim=dim,  # 查询的维度heads=n_heads,  # 注意力头数量dim_head=d_head,  # 每个头的维度dropout=dropout,  # dropout 比例context_dim=context_dim if self.disable_self_attn else None,  # 上下文维度backend=sdp_backend,  # 后端配置)  # 如果未禁用自注意力,则为自注意力层# 初始化前馈网络self.ff = FeedForward(dim, dropout=dropout, glu=gated_ff)  # 前馈网络配置# 初始化第二个注意力层self.attn2 = attn_cls(query_dim=dim,  # 查询的维度context_dim=context_dim,  # 上下文维度heads=n_heads,  # 注意力头数量dim_head=d_head,  # 每个头的维度dropout=dropout,  # dropout 比例backend=sdp_backend,  # 后端配置)  # 如果上下文维度为 None,则为自注意力层# 初始化层归一化层self.norm1 = nn.LayerNorm(dim)  # 第一个归一化层self.norm2 = nn.LayerNorm(dim)  # 第二个归一化层self.norm3 = nn.LayerNorm(dim)  # 第三个归一化层self.checkpoint = checkpoint  # 保存检查点设置# 如果启用检查点,输出相关信息(代码暂时注释掉)# if self.checkpoint:#     print(f"{self.__class__.__name__} is using checkpointing")# 前向传播方法,定义输入和上下文的处理def forward(self, x,  # 输入数据context=None,  # 上下文数据additional_tokens=None,  # 额外的 tokenn_times_crossframe_attn_in_self=0  # 跨帧自注意力的次数):# 创建一个字典 kwargs,初始包含键 "x" 和参数 x 的值kwargs = {"x": x}# 如果 context 参数不为 None,则将其添加到 kwargs 字典中if context is not None:kwargs.update({"context": context})# 如果 additional_tokens 参数不为 None,则将其添加到 kwargs 字典中if additional_tokens is not None:kwargs.update({"additional_tokens": additional_tokens})# 如果 n_times_crossframe_attn_in_self 为真,则将其添加到 kwargs 字典中if n_times_crossframe_attn_in_self:kwargs.update({"n_times_crossframe_attn_in_self": n_times_crossframe_attn_in_self})# 返回调用 checkpoint 函数,传入 _forward 方法和相关参数return checkpoint(self._forward, (x, context), self.parameters(), self.checkpoint)def _forward(# 定义 _forward 方法,接收 x、context、additional_tokens 和 n_times_crossframe_attn_in_self 参数self, x, context=None, additional_tokens=None, n_times_crossframe_attn_in_self=0):# 对 x 进行规范化,然后通过 attn1 方法进行自注意力计算,并根据条件选择 context 和其他参数x = (self.attn1(self.norm1(x),context=context if self.disable_self_attn else None,additional_tokens=additional_tokens,n_times_crossframe_attn_in_self=n_times_crossframe_attn_in_selfif not self.disable_self_attnelse 0,)+ x  # 将 self.attn1 的输出与原始 x 相加)# 继续对 x 进行规范化,通过 attn2 方法进行自注意力计算x = (self.attn2(self.norm2(x), context=context, additional_tokens=additional_tokens)+ x  # 将 self.attn2 的输出与当前 x 相加)# 对 x 进行规范化,然后通过前馈网络 ff 处理,再与原始 x 相加x = self.ff(self.norm3(x)) + x# 返回处理后的 xreturn x
# 定义基本的单层变换器块类,继承自 nn.Module
class BasicTransformerSingleLayerBlock(nn.Module):# 定义不同的注意力模式及其对应的类ATTENTION_MODES = {"softmax": CrossAttention,  # 标准注意力"softmax-xformers": MemoryEfficientCrossAttention  # 针对 A100s 的优化版本,速度可能略慢# (todo 可能依赖于 head_dim,需检查,对于 dim!=[16,32,64,128] 时退回到半优化内核)}# 初始化方法,设置基本参数def __init__(self,dim,  # 特征维度n_heads,  # 注意力头数d_head,  # 每个注意力头的维度dropout=0.0,  # 丢弃率context_dim=None,  # 上下文维度(可选)gated_ff=True,  # 是否使用门控前馈网络checkpoint=True,  # 是否启用检查点attn_mode="softmax",  # 注意力模式):# 调用父类构造函数super().__init__()# 确保所选的注意力模式在定义的模式中assert attn_mode in self.ATTENTION_MODES# 获取对应的注意力类attn_cls = self.ATTENTION_MODES[attn_mode]# 初始化注意力层self.attn1 = attn_cls(query_dim=dim,  # 查询维度heads=n_heads,  # 头数dim_head=d_head,  # 每头的维度dropout=dropout,  # 丢弃率context_dim=context_dim,  # 上下文维度)# 初始化前馈网络self.ff = FeedForward(dim, dropout=dropout, glu=gated_ff)# 初始化层归一化self.norm1 = nn.LayerNorm(dim)self.norm2 = nn.LayerNorm(dim)# 设置检查点标志self.checkpoint = checkpoint# 前向传播方法def forward(self, x, context=None):# 使用检查点机制来进行前向传播return checkpoint(self._forward, (x, context), self.parameters(), self.checkpoint)# 实际的前向传播实现def _forward(self, x, context=None):# 通过注意力层进行处理并添加残差连接x = self.attn1(self.norm1(x), context=context) + x# 通过前馈网络处理并添加残差连接x = self.ff(self.norm2(x)) + x# 返回处理后的结果return x# 定义空间变换器类,继承自 nn.Module
class SpatialTransformer(nn.Module):"""适用于图像数据的变换器块。首先,将输入(即嵌入)投影并重塑为 b, t, d 形状。然后应用标准的变换器操作。最后,重塑为图像。新增:使用线性层提高效率,而不是 1x1 卷积。"""# 初始化方法,设置变换器块的基本参数def __init__(self,in_channels,  # 输入通道数n_heads,  # 注意力头数d_head,  # 每个头的维度depth=1,  # 变换器块的深度dropout=0.0,  # 丢弃率context_dim=None,  # 上下文维度(可选)disable_self_attn=False,  # 是否禁用自注意力use_linear=False,  # 是否使用线性层attn_type="softmax",  # 注意力类型use_checkpoint=True,  # 是否使用检查点# sdp_backend=SDPBackend.FLASH_ATTENTION  # 可选的 SDP 后端sdp_backend=None,  # SDP 后端设置(默认值为 None)# 初始化类,调用父类构造函数):super().__init__()# 打印构造信息(已注释)# print(#     f"constructing {self.__class__.__name__} of depth {depth} w/ {in_channels} channels and {n_heads} heads"# )# 从 omegaconf 导入 ListConfig 类from omegaconf import ListConfig# 如果 context_dim 存在且不是列表或 ListConfig 类型if exists(context_dim) and not isinstance(context_dim, (list, ListConfig)):# 将 context_dim 转换为列表context_dim = [context_dim]# 如果 context_dim 存在且是列表if exists(context_dim) and isinstance(context_dim, list):# 检查 depth 是否与 context_dim 的长度匹配if depth != len(context_dim):# 打印警告信息(已注释)# print(#     f"WARNING: {self.__class__.__name__}: Found context dims {context_dim} of depth {len(context_dim)}, "#     f"which does not match the specified 'depth' of {depth}. Setting context_dim to {depth * [context_dim[0]]} now."# )# 确保所有 context_dim 元素相同assert all(map(lambda x: x == context_dim[0], context_dim)), "need homogenous context_dim to match depth automatically"# 如果不一致,设置 context_dim 为相同值的列表context_dim = depth * [context_dim[0]]# 如果 context_dim 为 Noneelif context_dim is None:# 创建与 depth 长度相同的 None 列表context_dim = [None] * depth# 保存输入通道数self.in_channels = in_channels# 计算内部维度inner_dim = n_heads * d_head# 归一化层self.norm = Normalize(in_channels)# 如果不使用线性层if not use_linear:# 使用卷积层进行输入投影self.proj_in = nn.Conv2d(in_channels, inner_dim, kernel_size=1, stride=1, padding=0)else:# 使用线性层进行输入投影self.proj_in = nn.Linear(in_channels, inner_dim)# 创建变压器模块列表self.transformer_blocks = nn.ModuleList([BasicTransformerBlock(inner_dim,n_heads,d_head,dropout=dropout,context_dim=context_dim[d],disable_self_attn=disable_self_attn,attn_mode=attn_type,checkpoint=use_checkpoint,sdp_backend=sdp_backend,)# 遍历深度范围,生成多个变压器块for d in range(depth)])# 如果不使用线性层if not use_linear:# 使用零初始化卷积层进行输出投影self.proj_out = zero_module(nn.Conv2d(inner_dim, in_channels, kernel_size=1, stride=1, padding=0))else:# 使用零初始化线性层进行输出投影(已注释)# self.proj_out = zero_module(nn.Linear(in_channels, inner_dim))self.proj_out = zero_module(nn.Linear(inner_dim, in_channels))# 保存是否使用线性层的标志self.use_linear = use_linear# 定义前向传播函数,接收输入 x 和可选的上下文 contextdef forward(self, x, context=None):# 注意:如果没有提供上下文,交叉注意力默认为自注意力if not isinstance(context, list):# 将上下文包装为列表,方便后续处理context = [context]# 获取输入张量的形状:批量大小 b,通道数 c,高 h,宽 wb, c, h, w = x.shape# 保存输入张量的原始值以便后续使用x_in = x# 对输入进行归一化处理x = self.norm(x)# 如果不使用线性变换,则进行投影变换if not self.use_linear:x = self.proj_in(x)# 重新排列张量的维度,将其从 (b, c, h, w) 变为 (b, h*w, c)x = rearrange(x, "b c h w -> b (h w) c").contiguous()# 如果使用线性变换,则再次进行投影变换if self.use_linear:x = self.proj_in(x)# 遍历所有的变换块for i, block in enumerate(self.transformer_blocks):# 如果不是第一个块且上下文长度为1,则使用同一个上下文if i > 0 and len(context) == 1:i = 0  # 每个块使用相同的上下文# 将输入传入当前变换块,并使用相应的上下文x = block(x, context=context[i])# 如果使用线性变换,则进行输出投影变换if self.use_linear:x = self.proj_out(x)# 重新排列张量的维度,将其从 (b, h*w, c) 变为 (b, c, h, w)x = rearrange(x, "b (h w) c -> b c h w", h=h, w=w).contiguous()# 如果不使用线性变换,则进行输出投影变换if not self.use_linear:x = self.proj_out(x)# 返回处理后的张量与原始输入的和return x + x_in

.\cogview3-finetune\sat\sgm\modules\autoencoding\losses\__init__.py

# 导入类型提示 Any 和 Union
from typing import Any, Union# 导入 PyTorch 及其神经网络模块
import torch
import torch.nn as nn
# 从 einops 导入 rearrange 函数
from einops import rearrange# 导入自定义工具函数和类
from ....util import default, instantiate_from_config
# 从 lpips 库导入 LPIPS 损失类
from ..lpips.loss.lpips import LPIPS
# 从 lpips 模型导入 NLayerDiscriminator 和权重初始化函数
from ..lpips.model.model import NLayerDiscriminator, weights_init
# 从 vqperceptual 模块导入两种损失函数
from ..lpips.vqperceptual import hinge_d_loss, vanilla_d_loss# 定义 adopt_weight 函数,调整权重值
def adopt_weight(weight, global_step, threshold=0, value=0.0):# 如果全局步数小于阈值,则将权重设为给定值if global_step < threshold:weight = value# 返回调整后的权重return weight# 定义 LatentLPIPS 类,继承自 nn.Module
class LatentLPIPS(nn.Module):# 初始化方法,设置相关参数def __init__(self,decoder_config,perceptual_weight=1.0,latent_weight=1.0,scale_input_to_tgt_size=False,scale_tgt_to_input_size=False,perceptual_weight_on_inputs=0.0,):# 调用父类构造方法super().__init__()# 设置输入大小缩放标志self.scale_input_to_tgt_size = scale_input_to_tgt_sizeself.scale_tgt_to_input_size = scale_tgt_to_input_size# 初始化解码器self.init_decoder(decoder_config)# 初始化感知损失模型,并设置为评估模式self.perceptual_loss = LPIPS().eval()# 设置感知损失和潜在损失的权重self.perceptual_weight = perceptual_weightself.latent_weight = latent_weight# 设置对输入的感知权重self.perceptual_weight_on_inputs = perceptual_weight_on_inputs# 定义初始化解码器的方法def init_decoder(self, config):# 从配置实例化解码器self.decoder = instantiate_from_config(config)# 如果解码器有 encoder 属性,则删除该属性if hasattr(self.decoder, "encoder"):del self.decoder.encoder# 定义前向传播函数,接收潜在输入、潜在预测、图像输入和数据集切分信息def forward(self, latent_inputs, latent_predictions, image_inputs, split="train"):# 初始化一个字典用于记录日志信息log = dict()# 计算潜在输入与潜在预测之间的均方差损失loss = (latent_inputs - latent_predictions) ** 2# 将均方差损失的平均值添加到日志中,使用切分名称作为键log[f"{split}/latent_l2_loss"] = loss.mean().detach()# 初始化图像重建变量image_reconstructions = None# 如果感知损失权重大于0,则进行感知损失的计算if self.perceptual_weight > 0.0:# 解码潜在预测生成图像重建image_reconstructions = self.decoder.decode(latent_predictions)# 解码潜在输入生成目标图像image_targets = self.decoder.decode(latent_inputs)# 计算感知损失perceptual_loss = self.perceptual_loss(image_targets.contiguous(), image_reconstructions.contiguous())# 综合潜在损失和感知损失,更新总损失loss = (self.latent_weight * loss.mean()+ self.perceptual_weight * perceptual_loss.mean())# 将感知损失的平均值添加到日志中log[f"{split}/perceptual_loss"] = perceptual_loss.mean().detach()# 如果感知权重在输入上大于0,则进行相应的处理if self.perceptual_weight_on_inputs > 0.0:# 如果重建图像为空,则解码潜在预测生成图像重建image_reconstructions = default(image_reconstructions, self.decoder.decode(latent_predictions))# 如果需要将输入图像缩放到目标图像大小if self.scale_input_to_tgt_size:image_inputs = torch.nn.functional.interpolate(image_inputs,image_reconstructions.shape[2:],mode="bicubic",  # 使用双三次插值法antialias=True,  # 使用抗锯齿)# 如果需要将目标图像缩放到输入图像大小elif self.scale_tgt_to_input_size:image_reconstructions = torch.nn.functional.interpolate(image_reconstructions,image_inputs.shape[2:],mode="bicubic",  # 使用双三次插值法antialias=True,  # 使用抗锯齿)# 计算与输入图像的感知损失perceptual_loss2 = self.perceptual_loss(image_inputs.contiguous(), image_reconstructions.contiguous())# 更新总损失,加入输入的感知损失loss = loss + self.perceptual_weight_on_inputs * perceptual_loss2.mean()# 将输入的感知损失的平均值添加到日志中log[f"{split}/perceptual_loss_on_inputs"] = perceptual_loss2.mean().detach()# 返回总损失和日志信息return loss, log
# 定义一个带有判别器的通用 LPIPS 类,继承自 nn.Module
class GeneralLPIPSWithDiscriminator(nn.Module):# 初始化方法,接收多个参数以配置模型def __init__(self,disc_start: int,  # 判别器开始训练的迭代次数logvar_init: float = 0.0,  # 日志方差的初始值pixelloss_weight=1.0,  # 像素损失的权重disc_num_layers: int = 3,  # 判别器的层数disc_in_channels: int = 3,  # 判别器输入的通道数disc_factor: float = 1.0,  # 判别器的缩放因子disc_weight: float = 1.0,  # 判别器损失的权重perceptual_weight: float = 1.0,  # 感知损失的权重disc_loss: str = "hinge",  # 判别器使用的损失类型scale_input_to_tgt_size: bool = False,  # 是否将输入缩放到目标大小dims: int = 2,  # 数据的维度learn_logvar: bool = False,  # 是否学习日志方差regularization_weights: Union[None, dict] = None,  # 正则化权重):# 调用父类的初始化方法super().__init__()self.dims = dims  # 保存维度信息# 如果维度大于2,打印警告信息if self.dims > 2:print(f"running with dims={dims}. This means that for perceptual loss calculation, "f"the LPIPS loss will be applied to each frame independently. ")self.scale_input_to_tgt_size = scale_input_to_tgt_size  # 保存输入缩放标志# 确保判别器损失类型为 hinge 或 vanillaassert disc_loss in ["hinge", "vanilla"]self.pixel_weight = pixelloss_weight  # 保存像素损失权重self.perceptual_loss = LPIPS().eval()  # 初始化 LPIPS 感知损失并设置为评估模式self.perceptual_weight = perceptual_weight  # 保存感知损失权重# 输出日志方差,作为可学习的参数self.logvar = nn.Parameter(torch.ones(size=()) * logvar_init)self.learn_logvar = learn_logvar  # 保存是否学习日志方差的标志# 初始化 NLayerDiscriminator 作为判别器self.discriminator = NLayerDiscriminator(input_nc=disc_in_channels, n_layers=disc_num_layers, use_actnorm=False).apply(weights_init)  # 应用权重初始化self.discriminator_iter_start = disc_start  # 保存判别器开始训练的迭代次数# 根据损失类型选择合适的损失函数self.disc_loss = hinge_d_loss if disc_loss == "hinge" else vanilla_d_lossself.disc_factor = disc_factor  # 保存判别器缩放因子self.discriminator_weight = disc_weight  # 保存判别器损失权重self.regularization_weights = default(regularization_weights, {})  # 设置正则化权重,默认为空字典# 获取可训练的参数def get_trainable_parameters(self) -> Any:return self.discriminator.parameters()  # 返回判别器的参数# 获取可训练的自编码器参数def get_trainable_autoencoder_parameters(self) -> Any:if self.learn_logvar:  # 如果学习日志方差yield self.logvar  # 生成日志方差yield from ()  # 返回空生成器# 计算自适应权重def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer=None):if last_layer is not None:  # 如果提供了最后一层# 计算负对数似然损失的梯度nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0]# 计算生成器损失的梯度g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0]else:# 如果没有提供最后一层,使用类属性中的最后一层nll_grads = torch.autograd.grad(nll_loss, self.last_layer[0], retain_graph=True)[0]g_grads = torch.autograd.grad(g_loss, self.last_layer[0], retain_graph=True)[0]# 计算判别器权重d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4)d_weight = torch.clamp(d_weight, 0.0, 1e4).detach()  # 限制权重范围并分离计算图d_weight = d_weight * self.discriminator_weight  # 应用判别器权重return d_weight  # 返回计算得到的权重# 前向传播方法def forward(self,regularization_log,  # 正则化日志inputs,  # 输入数据reconstructions,  # 重建数据optimizer_idx,  # 优化器索引global_step,  # 全局步数last_layer=None,  # 最后一层(可选)split="train",  # 数据集划分(训练或验证)weights=None,  # 权重(可选)

.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\loss\lpips.py

# 从 https://github.com/richzhang/PerceptualSimilarity/tree/master/models 中剥离的版本
"""Stripped version of https://github.com/richzhang/PerceptualSimilarity/tree/master/models"""# 从 collections 模块导入 namedtuple
from collections import namedtuple# 导入 PyTorch 和神经网络模块
import torch
import torch.nn as nn
# 从 torchvision 导入模型
from torchvision import models# 从上层目录的 util 模块导入获取检查点路径的函数
from ..util import get_ckpt_path# 定义 LPIPS 类,继承自 nn.Module
class LPIPS(nn.Module):# 学习的感知度量def __init__(self, use_dropout=True):# 调用父类构造函数super().__init__()# 初始化缩放层self.scaling_layer = ScalingLayer()# 定义特征通道数量,针对 VGG16 特征self.chns = [64, 128, 256, 512, 512]  # vg16 features# 加载预训练的 VGG16 模型,不计算其梯度self.net = vgg16(pretrained=True, requires_grad=False)# 为每个通道初始化线性层self.lin0 = NetLinLayer(self.chns[0], use_dropout=use_dropout)self.lin1 = NetLinLayer(self.chns[1], use_dropout=use_dropout)self.lin2 = NetLinLayer(self.chns[2], use_dropout=use_dropout)self.lin3 = NetLinLayer(self.chns[3], use_dropout=use_dropout)self.lin4 = NetLinLayer(self.chns[4], use_dropout=use_dropout)# 从预训练模型中加载权重self.load_from_pretrained()# 禁用所有参数的梯度更新for param in self.parameters():param.requires_grad = False# 从预训练模型加载权重的方法def load_from_pretrained(self, name="vgg_lpips"):# 获取检查点文件的路径ckpt = get_ckpt_path(name, "sgm/modules/autoencoding/lpips/loss")# 加载权重到当前模型中,严格匹配状态字典self.load_state_dict(torch.load(ckpt, map_location=torch.device("cpu")), strict=False)# 打印加载的模型路径print("loaded pretrained LPIPS loss from {}".format(ckpt))# 类方法,用于从预训练模型创建实例@classmethoddef from_pretrained(cls, name="vgg_lpips"):# 检查模型名称是否有效if name != "vgg_lpips":raise NotImplementedError# 创建当前类的实例model = cls()# 获取检查点路径ckpt = get_ckpt_path(name)# 加载权重到模型中model.load_state_dict(torch.load(ckpt, map_location=torch.device("cpu")), strict=False)# 返回模型实例return model# 前向传播方法def forward(self, input, target):# 对输入和目标进行缩放处理in0_input, in1_input = (self.scaling_layer(input), self.scaling_layer(target))# 通过网络提取特征outs0, outs1 = self.net(in0_input), self.net(in1_input)# 初始化特征和差异字典feats0, feats1, diffs = {}, {}, {}# 收集线性层lins = [self.lin0, self.lin1, self.lin2, self.lin3, self.lin4]# 遍历特征通道for kk in range(len(self.chns)):# 标准化特征并计算差异feats0[kk], feats1[kk] = normalize_tensor(outs0[kk]), normalize_tensor(outs1[kk])diffs[kk] = (feats0[kk] - feats1[kk]) ** 2# 计算每个通道的结果res = [spatial_average(lins[kk].model(diffs[kk]), keepdim=True)for kk in range(len(self.chns))]# 初始化最终值val = res[0]# 累加结果for l in range(1, len(self.chns)):val += res[l]# 返回最终的值return val# 定义缩放层类,继承自 nn.Module
class ScalingLayer(nn.Module):# 构造函数def __init__(self):# 调用父类构造函数super(ScalingLayer, self).__init__()# 注册偏移量的缓冲区self.register_buffer("shift", torch.Tensor([-0.030, -0.088, -0.188])[None, :, None, None])# 注册缩放因子的缓冲区self.register_buffer("scale", torch.Tensor([0.458, 0.448, 0.450])[None, :, None, None])# 前向传播方法def forward(self, inp):# 进行缩放操作return (inp - self.shift) / self.scale# 定义线性层类,继承自 nn.Module
class NetLinLayer(nn.Module):"""一个单独的线性层,执行 1x1 卷积"""# 初始化网络层,接收输入通道数、输出通道数和是否使用 dropoutdef __init__(self, chn_in, chn_out=1, use_dropout=False):# 调用父类的初始化方法super(NetLinLayer, self).__init__()# 根据是否使用 dropout 创建层列表layers = ([nn.Dropout(),  # 添加 dropout 层]if (use_dropout)  # 如果使用 dropoutelse []  # 否则为空列表)# 添加卷积层到层列表,输入通道为 chn_in,输出通道为 chn_out,卷积核大小为 1layers += [nn.Conv2d(chn_in, chn_out, 1, stride=1, padding=0, bias=False),]# 将层列表包装成一个顺序容器,便于按顺序调用各层self.model = nn.Sequential(*layers)
# 定义一个 VGG16 类,继承自 PyTorch 的 Module 类
class vgg16(torch.nn.Module):# 初始化函数,接受是否需要梯度和是否使用预训练模型的参数def __init__(self, requires_grad=False, pretrained=True):# 调用父类的初始化函数super(vgg16, self).__init__()# 获取预训练的 VGG16 特征提取层vgg_pretrained_features = models.vgg16(pretrained=pretrained).features# 定义五个序列层self.slice1 = torch.nn.Sequential()self.slice2 = torch.nn.Sequential()self.slice3 = torch.nn.Sequential()self.slice4 = torch.nn.Sequential()self.slice5 = torch.nn.Sequential()# 设置切片的数量为 5self.N_slices = 5# 将前4层添加到 slice1 中for x in range(4):self.slice1.add_module(str(x), vgg_pretrained_features[x])# 将第 4 到 8 层添加到 slice2 中for x in range(4, 9):self.slice2.add_module(str(x), vgg_pretrained_features[x])# 将第 9 到 15 层添加到 slice3 中for x in range(9, 16):self.slice3.add_module(str(x), vgg_pretrained_features[x])# 将第 16 到 22 层添加到 slice4 中for x in range(16, 23):self.slice4.add_module(str(x), vgg_pretrained_features[x])# 将第 23 到 29 层添加到 slice5 中for x in range(23, 30):self.slice5.add_module(str(x), vgg_pretrained_features[x])# 如果不需要梯度,则冻结所有参数if not requires_grad:for param in self.parameters():param.requires_grad = False# 定义前向传播函数def forward(self, X):# 将输入通过 slice1 层h = self.slice1(X)# 保存第一层的输出h_relu1_2 = h# 将输出通过 slice2 层h = self.slice2(h)# 保存第二层的输出h_relu2_2 = h# 将输出通过 slice3 层h = self.slice3(h)# 保存第三层的输出h_relu3_3 = h# 将输出通过 slice4 层h = self.slice4(h)# 保存第四层的输出h_relu4_3 = h# 将输出通过 slice5 层h = self.slice5(h)# 保存第五层的输出h_relu5_3 = h# 创建一个命名元组来存储不同层的输出vgg_outputs = namedtuple("VggOutputs", ["relu1_2", "relu2_2", "relu3_3", "relu4_3", "relu5_3"])# 将各层输出组合成一个元组out = vgg_outputs(h_relu1_2, h_relu2_2, h_relu3_3, h_relu4_3, h_relu5_3)# 返回元组return out# 定义一个函数,用于规范化张量
def normalize_tensor(x, eps=1e-10):# 计算张量的 L2 范数norm_factor = torch.sqrt(torch.sum(x**2, dim=1, keepdim=True))# 返回规范化后的张量,避免除以零return x / (norm_factor + eps)# 定义一个空间平均函数
def spatial_average(x, keepdim=True):# 在高和宽维度上计算平均值return x.mean([2, 3], keepdim=keepdim)

.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\loss\__init__.py


.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\model\model.py

# 导入 functools 模块,用于函数工具
import functools# 导入 nn 模块,用于构建神经网络
import torch.nn as nn# 从上级目录的 util 模块导入 ActNorm
from ..util import ActNorm# 定义权重初始化函数
def weights_init(m):# 获取模块的类名classname = m.__class__.__name__# 如果类名包含 "Conv",则初始化卷积层的权重if classname.find("Conv") != -1:nn.init.normal_(m.weight.data, 0.0, 0.02)# 如果类名包含 "BatchNorm",则初始化批归一化层的权重和偏置elif classname.find("BatchNorm") != -1:nn.init.normal_(m.weight.data, 1.0, 0.02)nn.init.constant_(m.bias.data, 0)# 定义 NLayerDiscriminator 类,继承自 nn.Module
class NLayerDiscriminator(nn.Module):"""定义一个 PatchGAN 判别器,如 Pix2Pix 所示--> 参见 https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py"""# 构造函数,初始化参数def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):"""构造一个 PatchGAN 判别器参数:input_nc (int)  -- 输入图像的通道数ndf (int)       -- 最后一个卷积层中的滤波器数量n_layers (int)  -- 判别器中的卷积层数量norm_layer      -- 归一化层"""# 调用父类的构造函数super(NLayerDiscriminator, self).__init__()# 根据是否使用 ActNorm 来选择归一化层if not use_actnorm:norm_layer = nn.BatchNorm2delse:norm_layer = ActNorm# 如果归一化层是 functools.partial 类型,判断是否使用偏置if (type(norm_layer) == functools.partial):  # 不需要使用偏置,因为 BatchNorm2d 有仿射参数use_bias = norm_layer.func != nn.BatchNorm2delse:use_bias = norm_layer != nn.BatchNorm2dkw = 4  # 卷积核的大小padw = 1  # 卷积层的填充# 初始化序列,添加第一个卷积层和 LeakyReLU 激活函数sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),nn.LeakyReLU(0.2, True),]nf_mult = 1  # 当前滤波器倍数nf_mult_prev = 1  # 上一层的滤波器倍数# 逐渐增加滤波器数量,构建后续的卷积层for n in range(1, n_layers):  # 逐渐增加滤波器数量nf_mult_prev = nf_multnf_mult = min(2**n, 8)  # 滤波器倍数最大为 8sequence += [nn.Conv2d(ndf * nf_mult_prev,  # 输入通道数ndf * nf_mult,  # 输出通道数kernel_size=kw,  # 卷积核大小stride=2,  # 步幅padding=padw,  # 填充bias=use_bias,  # 是否使用偏置),norm_layer(ndf * nf_mult),  # 添加归一化层nn.LeakyReLU(0.2, True),  # 添加 LeakyReLU 激活函数]nf_mult_prev = nf_mult  # 更新上一个滤波器倍数nf_mult = min(2**n_layers, 8)  # 计算最终滤波器倍数sequence += [nn.Conv2d(ndf * nf_mult_prev,  # 输入通道数ndf * nf_mult,  # 输出通道数kernel_size=kw,  # 卷积核大小stride=1,  # 步幅padding=padw,  # 填充bias=use_bias,  # 是否使用偏置),norm_layer(ndf * nf_mult),  # 添加归一化层nn.LeakyReLU(0.2, True),  # 添加 LeakyReLU 激活函数]sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]  # 输出 1 通道的预测图# 将所有层组合成一个序列self.main = nn.Sequential(*sequence)# 定义前向传播函数def forward(self, input):"""标准前向传播。"""return self.main(input)  # 将输入传入序列并返回输出

.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\model\__init__.py


.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\util.py

# 导入所需的库
import hashlib  # 导入 hashlib 库用于计算文件的 MD5 哈希
import os  # 导入 os 库用于文件和目录操作import requests  # 导入 requests 库用于发送 HTTP 请求
import torch  # 导入 PyTorch 库用于深度学习
import torch.nn as nn  # 导入 nn 模块用于构建神经网络
from tqdm import tqdm  # 导入 tqdm 库用于显示进度条# 定义模型 URL 映射字典
URL_MAP = {"vgg_lpips": "https://heibox.uni-heidelberg.de/f/607503859c864bc1b30b/?dl=1"}# 定义模型检查点文件名映射字典
CKPT_MAP = {"vgg_lpips": "vgg.pth"}# 定义模型 MD5 哈希值映射字典
MD5_MAP = {"vgg_lpips": "d507d7349b931f0638a25a48a722f98a"}# 定义下载函数
def download(url, local_path, chunk_size=1024):# 创建本地路径的父目录,如果不存在则创建os.makedirs(os.path.split(local_path)[0], exist_ok=True)# 发送 GET 请求以流式下载文件with requests.get(url, stream=True) as r:# 获取响应头中的内容长度total_size = int(r.headers.get("content-length", 0))# 使用 tqdm 显示下载进度条with tqdm(total=total_size, unit="B", unit_scale=True) as pbar:# 以二进制写入模式打开本地文件with open(local_path, "wb") as f:# 分块读取响应内容for data in r.iter_content(chunk_size=chunk_size):# 如果读取到数据,则写入文件if data:f.write(data)  # 写入数据到文件pbar.update(chunk_size)  # 更新进度条# 定义 MD5 哈希函数
def md5_hash(path):# 以二进制读取模式打开指定路径的文件with open(path, "rb") as f:content = f.read()  # 读取文件内容# 返回文件内容的 MD5 哈希值return hashlib.md5(content).hexdigest()# 定义获取检查点路径的函数
def get_ckpt_path(name, root, check=False):# 确保给定的模型名称在 URL 映射中assert name in URL_MAP# 组合根目录和检查点文件名,形成完整路径path = os.path.join(root, CKPT_MAP[name])# 检查文件是否存在或是否需要重新下载if not os.path.exists(path) or (check and not md5_hash(path) == MD5_MAP[name]):# 打印下载信息print("Downloading {} model from {} to {}".format(name, URL_MAP[name], path))download(URL_MAP[name], path)  # 下载文件md5 = md5_hash(path)  # 计算下载文件的 MD5 哈希值# 确保下载的文件 MD5 值与预期匹配assert md5 == MD5_MAP[name], md5return path  # 返回检查点路径# 定义一个自定义的神经网络模块
class ActNorm(nn.Module):# 构造函数,初始化参数def __init__(self, num_features, logdet=False, affine=True, allow_reverse_init=False):assert affine  # 确保启用仿射变换super().__init__()  # 调用父类构造函数self.logdet = logdet  # 保存 logdet 标志# 定义可学习的均值参数self.loc = nn.Parameter(torch.zeros(1, num_features, 1, 1))# 定义可学习的缩放参数self.scale = nn.Parameter(torch.ones(1, num_features, 1, 1))self.allow_reverse_init = allow_reverse_init  # 保存是否允许反向初始化标志# 注册一个缓冲区,用于记录初始化状态self.register_buffer("initialized", torch.tensor(0, dtype=torch.uint8))# 定义初始化函数def initialize(self, input):with torch.no_grad():  # 在不计算梯度的上下文中# 将输入张量重排列并展平flatten = input.permute(1, 0, 2, 3).contiguous().view(input.shape[1], -1)# 计算展平后的均值mean = (flatten.mean(1).unsqueeze(1).unsqueeze(2).unsqueeze(3).permute(1, 0, 2, 3))# 计算展平后的标准差std = (flatten.std(1).unsqueeze(1).unsqueeze(2).unsqueeze(3).permute(1, 0, 2, 3))# 将均值复制到 loc 参数self.loc.data.copy_(-mean)# 将标准差的倒数复制到 scale 参数self.scale.data.copy_(1 / (std + 1e-6))# 定义前向传播函数,接受输入和反向标志def forward(self, input, reverse=False):# 如果反向标志为真,调用反向函数处理输入if reverse:return self.reverse(input)# 检查输入的形状是否为二维if len(input.shape) == 2:# 将二维输入扩展为四维,增加两个新的维度input = input[:, :, None, None]squeeze = True  # 标记为需要压缩else:squeeze = False  # 不需要压缩# 解包输入的高度和宽度_, _, height, width = input.shape# 如果处于训练状态且尚未初始化if self.training and self.initialized.item() == 0:# 初始化参数self.initialize(input)# 标记为已初始化self.initialized.fill_(1)# 根据比例因子和位移量调整输入h = self.scale * (input + self.loc)# 如果需要压缩,移除最后两个维度if squeeze:h = h.squeeze(-1).squeeze(-1)# 如果需要计算对数行列式if self.logdet:# 计算比例因子的绝对值的对数log_abs = torch.log(torch.abs(self.scale))# 计算对数行列式的值logdet = height * width * torch.sum(log_abs)# 生成与批量大小相同的对数行列式张量logdet = logdet * torch.ones(input.shape[0]).to(input)# 返回调整后的输出和对数行列式return h, logdet# 返回调整后的输出return h# 定义反向传播函数,接受输出def reverse(self, output):# 如果处于训练状态且尚未初始化if self.training and self.initialized.item() == 0:# 如果不允许在反向方向初始化,则抛出错误if not self.allow_reverse_init:raise RuntimeError("Initializing ActNorm in reverse direction is ""disabled by default. Use allow_reverse_init=True to enable.")else:# 初始化参数self.initialize(output)# 标记为已初始化self.initialized.fill_(1)# 检查输出的形状是否为二维if len(output.shape) == 2:# 将二维输出扩展为四维,增加两个新的维度output = output[:, :, None, None]squeeze = True  # 标记为需要压缩else:squeeze = False  # 不需要压缩# 根据比例因子和位移量调整输出h = output / self.scale - self.loc# 如果需要压缩,移除最后两个维度if squeeze:h = h.squeeze(-1).squeeze(-1)# 返回调整后的输出return h

.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\vqperceptual.py

# 导入 PyTorch 库
import torch
# 导入 PyTorch 中的功能性模块
import torch.nn.functional as F# 定义对抗训练中的判别器损失函数(hinge 损失)
def hinge_d_loss(logits_real, logits_fake):# 计算真实样本的损失,使用 ReLU 激活函数loss_real = torch.mean(F.relu(1.0 - logits_real))# 计算假样本的损失,使用 ReLU 激活函数loss_fake = torch.mean(F.relu(1.0 + logits_fake))# 计算总的判别器损失,取真实和假样本损失的平均值d_loss = 0.5 * (loss_real + loss_fake)# 返回判别器损失return d_loss# 定义对抗训练中的另一种判别器损失函数(vanilla 损失)
def vanilla_d_loss(logits_real, logits_fake):# 计算总的判别器损失,使用 softplus 函数d_loss = 0.5 * (torch.mean(torch.nn.functional.softplus(-logits_real))  # 计算真实样本的 softplus 损失+ torch.mean(torch.nn.functional.softplus(logits_fake))   # 计算假样本的 softplus 损失)# 返回判别器损失return d_loss

.\cogview3-finetune\sat\sgm\modules\autoencoding\lpips\__init__.py

请提供需要添加注释的代码片段。

.\cogview3-finetune\sat\sgm\modules\autoencoding\regularizers\__init__.py

# 导入抽象方法的库
from abc import abstractmethod
# 导入任意类型和元组类型
from typing import Any, Tuple# 导入 PyTorch 库
import torch
# 导入神经网络模块
import torch.nn as nn
# 导入功能模块
import torch.nn.functional as F# 导入对角高斯分布类
from ....modules.distributions.distributions import DiagonalGaussianDistribution# 定义抽象正则化器类,继承自 nn.Module
class AbstractRegularizer(nn.Module):# 初始化方法def __init__(self):super().__init__()  # 调用父类构造方法# 前向传播方法,接受一个张量,返回一个张量和字典def forward(self, z: torch.Tensor) -> Tuple[torch.Tensor, dict]:raise NotImplementedError()  # 抛出未实现错误# 抽象方法,获取可训练的参数@abstractmethoddef get_trainable_parameters(self) -> Any:raise NotImplementedError()  # 抛出未实现错误# 定义对角高斯正则化器类,继承自 AbstractRegularizer
class DiagonalGaussianRegularizer(AbstractRegularizer):# 初始化方法,接受一个布尔参数,默认值为 Truedef __init__(self, sample: bool = True):super().__init__()  # 调用父类构造方法self.sample = sample  # 存储是否采样的参数# 获取可训练参数的方法,返回一个生成器def get_trainable_parameters(self) -> Any:yield from ()  # 生成器为空,表示没有可训练参数# 前向传播方法,接受一个张量,返回一个张量和字典def forward(self, z: torch.Tensor) -> Tuple[torch.Tensor, dict]:log = dict()  # 创建一个空字典,用于记录日志posterior = DiagonalGaussianDistribution(z)  # 创建对角高斯分布实例if self.sample:  # 如果需要采样z = posterior.sample()  # 从后验分布中采样else:  # 如果不需要采样z = posterior.mode()  # 取后验分布的众数kl_loss = posterior.kl()  # 计算 Kullback-Leibler 散度损失kl_loss = torch.sum(kl_loss) / kl_loss.shape[0]  # 计算均值损失log["kl_loss"] = kl_loss  # 将 KL 损失记录到日志中return z, log  # 返回采样或众数以及日志# 定义测量困惑度的函数,接受预测的索引和质心数量
def measure_perplexity(predicted_indices, num_centroids):# src: https://github.com/karpathy/deep-vector-quantization/blob/main/model.py# 评估聚类的困惑度。当困惑度等于 num_embeddings 时,所有聚类被完全均匀使用encodings = (F.one_hot(predicted_indices, num_centroids).float().reshape(-1, num_centroids)  # 将预测索引转换为独热编码)avg_probs = encodings.mean(0)  # 计算每个质心的平均概率perplexity = (-(avg_probs * torch.log(avg_probs + 1e-10)).sum()).exp()  # 计算困惑度cluster_use = torch.sum(avg_probs > 0)  # 计算被使用的聚类数量return perplexity, cluster_use  # 返回困惑度和聚类使用情况

.\cogview3-finetune\sat\sgm\modules\autoencoding\__init__.py

请提供需要注释的代码,我将帮助你添加详细的注释。

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\denoiser.py

# 导入所需的类型定义,便于后续使用
from typing import Dict, Union# 导入 PyTorch 及其神经网络模块
import torch
import torch.nn as nn# 从上层目录导入实用工具函数
from ...util import append_dims, instantiate_from_config# 定义去噪器类,继承自 nn.Module
class Denoiser(nn.Module):# 初始化方法,接受加权配置和缩放配置def __init__(self, weighting_config, scaling_config):# 调用父类构造函数super().__init__()# 根据加权配置实例化加权模块self.weighting = instantiate_from_config(weighting_config)# 根据缩放配置实例化缩放模块self.scaling = instantiate_from_config(scaling_config)# 可能对 sigma 进行量化的占位符方法def possibly_quantize_sigma(self, sigma):return sigma  # 返回未修改的 sigma# 可能对噪声 c_noise 进行量化的占位符方法def possibly_quantize_c_noise(self, c_noise):return c_noise  # 返回未修改的 c_noise# 计算加权后的 sigma 值def w(self, sigma):return self.weighting(sigma)  # 返回加权后的 sigma# 前向传播方法,定义模型如何处理输入def forward(self,network: nn.Module,  # 网络模型input: torch.Tensor,  # 输入张量sigma: torch.Tensor,  # sigma 张量cond: Dict,  # 条件字典**additional_model_inputs,  # 其他模型输入) -> torch.Tensor:# 可能对 sigma 进行量化sigma = self.possibly_quantize_sigma(sigma)# 获取 sigma 的形状sigma_shape = sigma.shape# 调整 sigma 的维度,以匹配输入的维度sigma = append_dims(sigma, input.ndim)# 使用缩放模块处理 sigma 和额外模型输入,获取多个输出c_skip, c_out, c_in, c_noise = self.scaling(sigma, **additional_model_inputs)# 可能对噪声 c_noise 进行量化,并调整形状c_noise = self.possibly_quantize_c_noise(c_noise.reshape(sigma_shape))# 返回经过网络处理的结果,结合输入和噪声return (network(input * c_in, c_noise, cond, **additional_model_inputs) * c_out+ input * c_skip)# 定义离散去噪器类,继承自 Denoiser
class DiscreteDenoiser(Denoiser):# 初始化方法,接受多个配置参数def __init__(self,weighting_config,  # 加权配置scaling_config,  # 缩放配置num_idx,  # 索引数量discretization_config,  # 离散化配置do_append_zero=False,  # 是否附加零quantize_c_noise=True,  # 是否量化 c_noiseflip=True,  # 是否翻转):# 调用父类构造函数super().__init__(weighting_config, scaling_config)# 根据离散化配置实例化 sigmasigmas = instantiate_from_config(discretization_config)(num_idx, do_append_zero=do_append_zero, flip=flip)# 保存 sigma 值self.sigmas = sigmas# self.register_buffer("sigmas", sigmas)  # (可选)注册一个持久化的缓冲区# 设置是否量化 c_noiseself.quantize_c_noise = quantize_c_noise# 将 sigma 转换为索引的方法def sigma_to_idx(self, sigma):# 计算 sigma 与 sigma 列表的距离dists = sigma - self.sigmas.to(sigma.device)[:, None]# 返回距离最小的索引return dists.abs().argmin(dim=0).view(sigma.shape)# 将索引转换为 sigma 的方法def idx_to_sigma(self, idx):return self.sigmas.to(idx.device)[idx]  # 根据索引返回对应的 sigma# 可能对 sigma 进行量化的重写方法def possibly_quantize_sigma(self, sigma):return self.idx_to_sigma(self.sigma_to_idx(sigma))  # 返回量化后的 sigma# 可能对噪声 c_noise 进行量化的重写方法def possibly_quantize_c_noise(self, c_noise):if self.quantize_c_noise:  # 如果选择量化 c_noisereturn self.sigma_to_idx(c_noise)  # 返回量化后的索引else:return c_noise  # 返回未修改的 c_noise

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\denoiser_scaling.py

# 从 abc 模块导入抽象基类和抽象方法
from abc import ABC, abstractmethod
# 从 typing 模块导入任意类型和元组
from typing import Any, Tuple# 导入 PyTorch 库
import torch# 定义一个抽象基类 DenoiserScaling
class DenoiserScaling(ABC):# 定义一个抽象方法 __call__@abstractmethoddef __call__(self, sigma: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 抽象方法没有具体实现pass# 定义 EDMScaling 类
class EDMScaling:# 初始化方法,接受一个 sigma 数据,默认值为 0.5def __init__(self, sigma_data: float = 0.5):# 将 sigma_data 保存为实例变量self.sigma_data = sigma_data# 定义一个可调用方法 __call__def __call__(self, sigma: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 计算 c_skip 的值c_skip = self.sigma_data**2 / (sigma**2 + self.sigma_data**2)# 计算 c_out 的值c_out = sigma * self.sigma_data / (sigma**2 + self.sigma_data**2) ** 0.5# 计算 c_in 的值c_in = 1 / (sigma**2 + self.sigma_data**2) ** 0.5# 计算 c_noise 的值c_noise = 0.25 * sigma.log()# 返回四个计算结果return c_skip, c_out, c_in, c_noise# 定义 EpsScaling 类
class EpsScaling:# 定义一个可调用方法 __call__def __call__(self, sigma: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 创建与 sigma 相同形状的全 1 张量作为 c_skipc_skip = torch.ones_like(sigma, device=sigma.device)# 计算 c_out 的值为 -sigmac_out = -sigma# 计算 c_in 的值c_in = 1 / (sigma**2 + 1.0) ** 0.5# 复制 sigma 作为 c_noisec_noise = sigma.clone()# 返回四个计算结果return c_skip, c_out, c_in, c_noise# 定义 VScaling 类
class VScaling:# 定义一个可调用方法 __call__def __call__(self, sigma: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 计算 c_skip 的值c_skip = 1.0 / (sigma**2 + 1.0)# 计算 c_out 的值c_out = -sigma / (sigma**2 + 1.0) ** 0.5# 计算 c_in 的值c_in = 1.0 / (sigma**2 + 1.0) ** 0.5# 复制 sigma 作为 c_noisec_noise = sigma.clone()# 返回四个计算结果return c_skip, c_out, c_in, c_noise# 定义 VScalingWithEDMcNoise 类,继承自 DenoiserScaling
class VScalingWithEDMcNoise(DenoiserScaling):# 定义一个可调用方法 __call__def __call__(self, sigma: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 计算 c_skip 的值c_skip = 1.0 / (sigma**2 + 1.0)# 计算 c_out 的值c_out = -sigma / (sigma**2 + 1.0) ** 0.5# 计算 c_in 的值c_in = 1.0 / (sigma**2 + 1.0) ** 0.5# 计算 c_noise 的值c_noise = 0.25 * sigma.log()# 返回四个计算结果return c_skip, c_out, c_in, c_noise# 定义 ZeroSNRScaling 类,类似于 VScaling
class ZeroSNRScaling: # similar to VScaling# 定义一个可调用方法 __call__def __call__(self, alphas_cumprod_sqrt: torch.Tensor, **additional_model_inputs) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:# 将 alphas_cumprod_sqrt 作为 c_skipc_skip = alphas_cumprod_sqrt# 计算 c_out 的值c_out = - (1 - alphas_cumprod_sqrt**2) ** 0.5# 创建与 alphas_cumprod_sqrt 相同形状的全 1 张量作为 c_inc_in = torch.ones_like(alphas_cumprod_sqrt, device=alphas_cumprod_sqrt.device)# 复制额外输入的 'idx' 作为 c_noisec_noise = additional_model_inputs['idx'].clone()# 返回四个计算结果return c_skip, c_out, c_in, c_noise

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\denoiser_weighting.py

# 导入 PyTorch 库
import torch# 定义一个单位加权类
class UnitWeighting:# 定义可调用方法,接收参数 sigmadef __call__(self, sigma):# 返回与 sigma 形状相同的全 1 张量,设备与 sigma 相同return torch.ones_like(sigma, device=sigma.device)# 定义 EDM 加权类
class EDMWeighting:# 初始化方法,设置 sigma_data 的默认值为 0.5def __init__(self, sigma_data=0.5):# 将传入的 sigma_data 存储为实例变量self.sigma_data = sigma_data# 定义可调用方法,接收参数 sigmadef __call__(self, sigma):# 根据公式计算加权值并返回return (sigma**2 + self.sigma_data**2) / (sigma * self.sigma_data) ** 2# 定义 V 加权类,继承自 EDMWeighting
class VWeighting(EDMWeighting):# 初始化方法def __init__(self):# 调用父类初始化方法,设置 sigma_data 为 1.0super().__init__(sigma_data=1.0)# 定义 Eps 加权类
class EpsWeighting:# 定义可调用方法,接收参数 sigmadef __call__(self, sigma):# 返回 sigma 的 -2 次幂return sigma**-2.0

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\discretizer.py

# 从 abc 模块导入抽象方法,用于定义抽象基类
from abc import abstractmethod
# 从 functools 模块导入 partial,用于创建偏函数
from functools import partial# 导入 numpy 库,主要用于数值计算
import numpy as np
# 导入 torch 库,主要用于深度学习和张量操作
import torch# 从自定义模块中导入 make_beta_schedule 函数,用于生成 beta 时间表
from ...modules.diffusionmodules.util import make_beta_schedule
# 从自定义模块中导入 append_zero 函数,用于处理数组
from ...util import append_zero# 定义函数 generate_roughly_equally_spaced_steps,用于生成大致均匀间隔的步骤
def generate_roughly_equally_spaced_steps(num_substeps: int, max_step: int
) -> np.ndarray:# 生成从 max_step-1 到 0 的均匀间隔数组,包含 num_substeps 个元素,并将结果反转return np.linspace(max_step - 1, 0, num_substeps, endpoint=False).astype(int)[::-1]# 定义函数 sub_generate_roughly_equally_spaced_steps,用于生成两个子步骤的均匀间隔
def sub_generate_roughly_equally_spaced_steps(num_substeps_1: int, num_substeps_2: int, max_step: int
) -> np.ndarray:# 生成第二组子步骤的均匀间隔substeps_2 = np.linspace(max_step - 1, 0, num_substeps_2, endpoint=False).astype(int)[::-1]# 生成第一组子步骤的均匀间隔substeps_1 = np.linspace(num_substeps_2 - 1, 0, num_substeps_1, endpoint=False).astype(int)[::-1]# 返回根据第一组子步骤索引获取第二组子步骤的数组return substeps_2[substeps_1]# 定义离散化的抽象基类 Discretization
class Discretization:# 定义可调用方法,接受 n、do_append_zero、device 和 flip 参数def __call__(self, n, do_append_zero=True, device="cpu", flip=False):# 调用 get_sigmas 方法获取 sigma 值sigmas = self.get_sigmas(n, device=device)# 如果 do_append_zero 为真,向 sigmas 添加零sigmas = append_zero(sigmas) if do_append_zero else sigmas# 根据 flip 参数决定是否反转 sigmasreturn sigmas if not flip else torch.flip(sigmas, (0,))# 定义抽象方法 get_sigmas,必须在子类中实现@abstractmethoddef get_sigmas(self, n, device):pass# 定义 EDMDiscretization 类,继承自 Discretization
class EDMDiscretization(Discretization):# 初始化方法,设置 sigma 的最小值、最大值和 rho 值def __init__(self, sigma_min=0.002, sigma_max=80.0, rho=7.0):self.sigma_min = sigma_minself.sigma_max = sigma_maxself.rho = rho# 实现抽象方法 get_sigmas,计算 sigma 值def get_sigmas(self, n, device="cpu"):# 生成从 0 到 1 的等间隔张量ramp = torch.linspace(0, 1, n, device=device)# 计算最小和最大 rho 的倒数min_inv_rho = self.sigma_min ** (1 / self.rho)max_inv_rho = self.sigma_max ** (1 / self.rho)# 根据公式计算 sigmassigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** self.rho# 返回计算得到的 sigmasreturn sigmas# 定义 LegacyDDPMDiscretization 类,继承自 Discretization
class LegacyDDPMDiscretization(Discretization):# 初始化方法,设置线性开始、结束和时间步数def __init__(self,linear_start=0.00085,linear_end=0.0120,num_timesteps=1000,):# 调用父类初始化方法super().__init__()self.num_timesteps = num_timesteps# 使用 make_beta_schedule 生成 beta 时间表betas = make_beta_schedule("linear", num_timesteps, linear_start=linear_start, linear_end=linear_end)# 计算 alphasalphas = 1.0 - betas# 计算累积的 alphasself.alphas_cumprod = np.cumprod(alphas, axis=0)# 创建将 numpy 数组转换为 torch 张量的偏函数self.to_torch = partial(torch.tensor, dtype=torch.float32)# 实现抽象方法 get_sigmas,根据 n 计算 sigma 值def get_sigmas(self, n, device="cpu"):# 如果 n 小于总时间步数if n < self.num_timesteps:# 生成大致均匀间隔的时间步timesteps = generate_roughly_equally_spaced_steps(n, self.num_timesteps)# 获取对应的累积 alphasalphas_cumprod = self.alphas_cumprod[timesteps]# 如果 n 等于总时间步数elif n == self.num_timesteps:alphas_cumprod = self.alphas_cumprod# 如果 n 大于总时间步数,抛出异常else:raise ValueError# 创建将 numpy 数组转换为 torch 张量的偏函数,指定设备to_torch = partial(torch.tensor, dtype=torch.float32, device=device)# 计算 sigmas 值sigmas = to_torch((1 - alphas_cumprod) / alphas_cumprod) ** 0.5# 返回反转的 sigmasreturn torch.flip(sigmas, (0,)) # sigma_t: 14.4 -> 0.029# 定义 ZeroSNRDDPMDiscretization 类,继承自 Discretization
class ZeroSNRDDPMDiscretization(Discretization):# 初始化方法,设置线性开始、结束、时间步数和 shift_scaledef __init__(self,linear_start=0.00085,linear_end=0.0120,num_timesteps=1000,shift_scale=1.,# 初始化父类):super().__init__()# 设置时间步数self.num_timesteps = num_timesteps# 生成线性调度的 beta 值betas = make_beta_schedule("linear", num_timesteps, linear_start=linear_start, linear_end=linear_end)# 计算 alpha 值alphas = 1.0 - betas# 计算累积的 alpha 值self.alphas_cumprod = np.cumprod(alphas, axis=0)# 将数据转换为 torch 张量的函数self.to_torch = partial(torch.tensor, dtype=torch.float32)# 对累积的 alpha 值进行缩放self.alphas_cumprod = self.alphas_cumprod / (shift_scale + (1-shift_scale) * self.alphas_cumprod)# 获取 sigma 值的方法def get_sigmas(self, n, device="cpu", return_idx=False):# 判断请求的时间步数是否小于总时间步数if n < self.num_timesteps:# 生成等间距的时间步timesteps = generate_roughly_equally_spaced_steps(n, self.num_timesteps)# 获取对应的累积 alpha 值alphas_cumprod = self.alphas_cumprod[timesteps]# 判断请求的时间步数是否等于总时间步数elif n == self.num_timesteps:alphas_cumprod = self.alphas_cumprod# 如果超出范围,抛出异常else:raise ValueError# 将 alpha 值转换为 torch 张量to_torch = partial(torch.tensor, dtype=torch.float32, device=device)alphas_cumprod = to_torch(alphas_cumprod)# 计算累积 alpha 值的平方根alphas_cumprod_sqrt = alphas_cumprod.sqrt()# 克隆初始和最终的 alpha 值alphas_cumprod_sqrt_0 = alphas_cumprod_sqrt[0].clone()alphas_cumprod_sqrt_T = alphas_cumprod_sqrt[-1].clone()# 对平方根进行归一化处理alphas_cumprod_sqrt -= alphas_cumprod_sqrt_Talphas_cumprod_sqrt *= alphas_cumprod_sqrt_0 / (alphas_cumprod_sqrt_0 - alphas_cumprod_sqrt_T)# 根据返回标志返回结果if return_idx:return torch.flip(alphas_cumprod_sqrt, (0,)), timestepselse:# 返回反转的平方根 alpha 值return torch.flip(alphas_cumprod_sqrt, (0,)) # sqrt(alpha_t): 0 -> 0.99# 使对象可调用的方法def __call__(self, n, do_append_zero=True, device="cpu", flip=False, return_idx=False):# 根据返回标志调用获取 sigma 值的方法if return_idx:sigmas, idx = self.get_sigmas(n, device=device, return_idx=True)sigmas = append_zero(sigmas) if do_append_zero else sigmas# 根据 flip 标志返回结果return (sigmas, idx) if not flip else (torch.flip(sigmas, (0,)), torch.flip(idx, (0,)))else:# 获取 sigma 值并处理sigmas = self.get_sigmas(n, device=device)sigmas = append_zero(sigmas) if do_append_zero else sigmas# 根据 flip 标志返回结果return sigmas if not flip else torch.flip(sigmas, (0,))

.\cogview3-finetune\sat\sgm\modules\diffusionmodules\dit.py

# 从 omegaconf 导入 DictConfig 类,用于配置管理
from omegaconf import DictConfig
# 从 functools 导入 partial 函数,用于偏函数应用
from functools import partial
# 从 einops 导入 rearrange 函数,用于重排张量
from einops import rearrange
# 导入 numpy 库,用于数值计算
import numpy as np# 导入 PyTorch 库
import torch
# 从 torch 导入 nn 模块,包含神经网络构建相关的工具
from torch import nn
# 导入 PyTorch 的分布式训练模块
import torch.distributed# 从 sat.model.base_model 导入 BaseModel 类,作为模型基类
from sat.model.base_model import BaseModel
# 从 sat.model.mixins 导入 BaseMixin 类,用于混入模型功能
from sat.model.mixins import BaseMixin
# 从 sat.ops.layernorm 导入 LayerNorm 类,用于层归一化
from sat.ops.layernorm import LayerNorm
# 从 sat.transformer_defaults 导入默认的 hooks 和注意力函数
from sat.transformer_defaults import HOOKS_DEFAULT, attention_fn_default
# 从 sat.mpu.utils 导入用于张量分割的工具
from sat.mpu.utils import split_tensor_along_last_dim
# 从 sgm.util 导入一些工具函数
from sgm.util import (disabled_train,          # 禁用训练的装饰器instantiate_from_config, # 从配置实例化对象
)# 从 sgm.modules.diffusionmodules.openaimodel 导入时间步类
from sgm.modules.diffusionmodules.openaimodel import Timestep
# 从 sgm.modules.diffusionmodules.util 导入卷积、线性层和时间步嵌入等工具
from sgm.modules.diffusionmodules.util import (conv_nd,                # 多维卷积函数linear,                 # 线性变换函数timestep_embedding,     # 时间步嵌入函数
)# 定义调制函数,接受输入张量、偏移量和缩放因子
def modulate(x, shift, scale):# 根据缩放因子和偏移量对输入张量进行调制return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)# 定义解patch化函数,将补丁张量恢复为图像张量
def unpatchify(x, channels, patch_size, height, width):# 使用 rearrange 函数将补丁张量重排为图像张量x = rearrange(x,"b (h w) (c p1 p2) -> b c (h p1) (w p2)",  # 指定输入和输出的张量维度h=height // patch_size,  # 计算行数w=width // patch_size,    # 计算列数p1=patch_size,            # 每个补丁的高度p2=patch_size,            # 每个补丁的宽度)# 返回解patch化后的图像张量return x# 定义图像补丁嵌入混入类,继承自 BaseMixin
class ImagePatchEmbeddingMixin(BaseMixin):# 初始化函数,设置输入通道、隐藏层大小、补丁大小等属性def __init__(self,in_channels,hidden_size,patch_size,text_hidden_size=None,do_rearrange=True,):# 调用父类初始化super().__init__()# 设置输入通道数self.in_channels = in_channels# 设置隐藏层大小self.hidden_size = hidden_size# 设置补丁大小self.patch_size = patch_size# 设置文本隐藏层大小(如果有的话)self.text_hidden_size = text_hidden_size# 设置是否重排张量的标志self.do_rearrange = do_rearrange# 初始化线性层,将补丁的通道数映射到隐藏层大小self.proj = nn.Linear(in_channels * patch_size ** 2, hidden_size)# 如果提供了文本隐藏层大小,则初始化相应的线性层if text_hidden_size is not None:self.text_proj = nn.Linear(text_hidden_size, hidden_size)# 定义词嵌入前向传播函数,接受输入ID、图像和编码器输出def word_embedding_forward(self, input_ids, images, encoder_outputs, **kwargs):# images: B x C x H x W,表示批量图像的形状# 如果需要重排图像张量if self.do_rearrange:# 使用 rearrange 函数将图像重排为补丁格式patches_images = rearrange(images, "b c (h p1) (w p2) -> b (h w) (c p1 p2)",  # 指定输入和输出的张量维度p1=self.patch_size,  # 每个补丁的高度p2=self.patch_size,  # 每个补丁的宽度)else:# 否则直接使用原始图像张量patches_images = images# 通过线性层对补丁图像进行映射emb = self.proj(patches_images)# 如果有文本隐藏层大小if self.text_hidden_size is not None:# 对编码器输出进行线性映射text_emb = self.text_proj(encoder_outputs)# 将文本嵌入与图像嵌入在维度1上进行连接emb = torch.cat([text_emb, emb], dim=1)# 返回最终的嵌入结果return emb# 定义重新初始化函数def reinit(self, parent_model=None):# 获取线性层的权重w = self.proj.weight.data# 使用 Xavier 均匀分布初始化权重nn.init.xavier_uniform_(self.proj.weight)# 将偏置初始化为零nn.init.constant_(self.proj.bias, 0)# 删除 transformer 的词嵌入del self.transformer.word_embeddings# 定义获取 2D 正弦余弦位置嵌入的函数
def get_2d_sincos_pos_embed(embed_dim, grid_height, grid_width, cls_token=False, extra_tokens=0):"""grid_size: int of the grid height and widthreturn:pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token)"""# 创建一个表示网格高度的数组grid_h = np.arange(grid_height, dtype=np.float32)# 创建一个表示网格宽度的数组grid_w = np.arange(grid_width, dtype=np.float32)# 生成网格坐标,宽度优先grid = np.meshgrid(grid_w, grid_h)  # here w goes first# 将网格数组堆叠到一个新的维度grid = np.stack(grid, axis=0)# 将网格重塑为 [2, 1, grid_height, grid_width] 形状grid = grid.reshape([2, 1, grid_height, grid_width])# 从网格生成二维正弦余弦位置嵌入pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)# 如果需要分类标记且额外的标记数大于0,则进行处理if cls_token and extra_tokens > 0:# 在位置嵌入前添加额外的零嵌入,形成新的位置嵌入pos_embed = np.concatenate([np.zeros([extra_tokens, embed_dim]), pos_embed], axis=0)# 返回最终的位置嵌入return pos_embed
# 从网格生成二维正弦余弦位置嵌入
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):# 确保嵌入维度是偶数assert embed_dim % 2 == 0# 使用一半的维度编码网格高度emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0])  # (H*W, D/2)# 使用一半的维度编码网格宽度emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1])  # (H*W, D/2)# 将高度和宽度的嵌入合并emb = np.concatenate([emb_h, emb_w], axis=1)  # (H*W, D)# 返回最终的嵌入return emb# 从网格生成一维正弦余弦位置嵌入
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):"""embed_dim: 每个位置的输出维度pos: 需要编码的位置列表:大小 (M,)out: (M, D)"""# 确保嵌入维度是偶数assert embed_dim % 2 == 0# 生成 omega 数组用于位置编码omega = np.arange(embed_dim // 2, dtype=np.float64)omega /= embed_dim / 2.0# 计算频率,得到 (D/2,)omega = 1.0 / 10000 ** omega  # (D/2,)# 将位置调整为一维pos = pos.reshape(-1)  # (M,)# 计算位置与频率的外积out = np.einsum("m,d->md", pos, omega)  # (M, D/2), outer product# 计算正弦和余弦嵌入emb_sin = np.sin(out)  # (M, D/2)emb_cos = np.cos(out)  # (M, D/2)# 将正弦和余弦嵌入合并emb = np.concatenate([emb_sin, emb_cos], axis=1)  # (M, D)# 返回最终的嵌入return emb# 位置嵌入混合类
class PositionEmbeddingMixin(BaseMixin):def __init__(self,max_height,max_width,hidden_size,text_length=0,block_size=16,**kwargs,):# 初始化父类super().__init__()# 设置最大高度self.max_height = max_height# 设置最大宽度self.max_width = max_width# 设置隐藏层大小self.hidden_size = hidden_size# 设置文本长度self.text_length = text_length# 设置块大小self.block_size = block_size# 初始化图像位置嵌入参数self.image_pos_embedding = nn.Parameter(torch.zeros(self.max_height, self.max_width, hidden_size), requires_grad=False)# 前向传播计算位置嵌入def position_embedding_forward(self, position_ids, target_size, **kwargs):ret = []# 遍历目标大小for h, w in target_size:# 将高度和宽度除以块大小h, w = h // self.block_size, w // self.block_size# 获取图像位置嵌入并重塑image_pos_embed = self.image_pos_embedding[:h, :w].reshape(h * w, -1)# 连接文本嵌入与图像嵌入pos_embed = torch.cat([torch.zeros((self.text_length, self.hidden_size),dtype=image_pos_embed.dtype,device=image_pos_embed.device,),image_pos_embed,],dim=0,)# 添加到结果列表中ret.append(pos_embed[None, ...])# 合并所有位置嵌入return torch.cat(ret, dim=0)# 重新初始化位置嵌入def reinit(self, parent_model=None):# 删除当前位置嵌入del self.transformer.position_embeddings# 获取新的二维正弦余弦位置嵌入pos_embed = get_2d_sincos_pos_embed(self.image_pos_embedding.shape[-1], self.max_height, self.max_width)# 重塑位置嵌入为二维形状pos_embed = pos_embed.reshape(self.max_height, self.max_width, -1)# 复制新的位置嵌入数据self.image_pos_embedding.data.copy_(torch.from_numpy(pos_embed).float())# 最终层混合类
class FinalLayerMixin(BaseMixin):def __init__(self,hidden_size,time_embed_dim,patch_size,block_size,out_channels,elementwise_affine=False,eps=1e-6,do_unpatchify=True,):# 调用父类构造函数进行初始化super().__init__()# 设置隐藏层大小self.hidden_size = hidden_size# 设置每个补丁的大小self.patch_size = patch_size# 设置块的大小self.block_size = block_size# 设置输出通道数self.out_channels = out_channels# 确定是否进行去补丁处理self.do_unpatchify = do_unpatchify# 初始化最终的层归一化,带有可学习的参数self.norm_final = nn.LayerNorm(hidden_size,elementwise_affine=elementwise_affine,eps=eps,)# 创建一个包含SiLU激活和线性层的序列self.adaln = nn.Sequential(nn.SiLU(),nn.Linear(time_embed_dim, 2 * hidden_size),)# 初始化线性层以将隐藏状态映射到输出通道self.linear = nn.Linear(hidden_size, out_channels * patch_size ** 2)def final_forward(self, logits, emb, text_length, target_size=None, **kwargs):# 截取logits以获取文本长度后的部分x = logits[:, text_length:]# 使用adaln模块对嵌入进行变换并拆分为偏移和缩放shift, scale = self.adaln(emb).chunk(2, dim=1)# 对x进行归一化并应用偏移和缩放x = modulate(self.norm_final(x), shift, scale)# 通过线性层获得最终输出x = self.linear(x)# 如果需要去补丁处理if self.do_unpatchify:# 从目标大小中提取高度和宽度target_height, target_width = target_size[0]# 断言目标大小必须能被块大小整除assert (target_height % self.block_size == 0 and target_width % self.block_size == 0), "target size must be divisible by block size"# 计算输出高度和宽度out_height, out_width = (target_height // self.block_size * self.patch_size,target_width // self.block_size * self.patch_size,)# 进行去补丁处理,恢复原图x = unpatchify(x, channels=self.out_channels, patch_size=self.patch_size, height=out_height, width=out_width)# 返回最终输出return xdef reinit(self, parent_model=None):# 使用Xavier均匀分布初始化线性层权重nn.init.xavier_uniform_(self.linear.weight)# 将线性层偏置初始化为0nn.init.constant_(self.linear.bias, 0)
# 定义一个混合类 AdalnAttentionMixin,继承自 BaseMixin
class AdalnAttentionMixin(BaseMixin):# 初始化函数,接受多个参数以设置模型的各项属性def __init__(self,hidden_size,  # 隐藏层大小num_layers,  # 层数time_embed_dim,  # 时间嵌入维度qk_ln=True,  # 是否使用查询和键的层归一化hidden_size_head=None,  # 头部的隐藏层大小elementwise_affine=False,  # 是否使用逐元素仿射变换eps=1e-6,  # 用于层归一化的平滑项):# 调用父类的初始化方法super().__init__()# 创建一个包含多个顺序模块的模块列表,每个模块由 SiLU 激活函数和线性层组成self.adaln_modules = nn.ModuleList([nn.Sequential(nn.SiLU(), nn.Linear(time_embed_dim, 12 * hidden_size)) for _ in range(num_layers)])# 记录是否使用查询和键的层归一化self.qk_ln = qk_ln# 如果使用层归一化,则为查询和键分别创建模块列表if qk_ln:# 创建用于查询的层归一化模块列表self.query_layernorms = nn.ModuleList([LayerNorm(hidden_size_head, elementwise_affine=elementwise_affine, eps=eps)for _ in range(num_layers)  # 为每一层创建一个层归一化模块])# 创建用于键的层归一化模块列表self.key_layernorms = nn.ModuleList([LayerNorm(hidden_size_head, elementwise_affine=elementwise_affine, eps=eps)for _ in range(num_layers)  # 为每一层创建一个层归一化模块])# 定义前向传播的方法,接收隐藏状态、掩码、文本长度等参数def layer_forward(self,hidden_states,  # 当前层的隐藏状态mask,  # 掩码,用于忽略特定位置text_length,  # 文本的长度layer_id,  # 当前层的索引emb,  # 嵌入表示*args,  # 可变参数**kwargs,  # 关键字参数# 定义一个方法的结尾,接受必要的参数):# 获取指定层的 Transformer 层layer = self.transformer.layers[layer_id]# 获取与该层对应的自适应层归一化模块adaln_module = self.adaln_modules[layer_id]# 从自适应层归一化模块中处理输入并将结果分块为 12 个部分(shift_msa_img,scale_msa_img,gate_msa_img,shift_mlp_img,scale_mlp_img,gate_mlp_img,shift_msa_txt,scale_msa_txt,gate_msa_txt,shift_mlp_txt,scale_mlp_txt,gate_mlp_txt,) = adaln_module(emb).chunk(12, dim=1)# 扩展门控张量的维度,以便后续处理gate_msa_img, gate_mlp_img, gate_msa_txt, gate_mlp_txt = (gate_msa_img.unsqueeze(1),gate_mlp_img.unsqueeze(1),gate_msa_txt.unsqueeze(1),gate_mlp_txt.unsqueeze(1),)# 对输入的隐藏状态进行层归一化处理attention_input = layer.input_layernorm(hidden_states)# 对文本输入进行调制,应用相应的偏移和缩放text_attention_input = modulate(attention_input[:, :text_length], shift_msa_txt, scale_msa_txt)# 对图像输入进行调制,应用相应的偏移和缩放image_attention_input = modulate(attention_input[:, text_length:], shift_msa_img, scale_msa_img)# 将文本和图像的注意力输入合并attention_input = torch.cat((text_attention_input, image_attention_input), dim=1)# 计算注意力输出,应用遮罩和层 IDattention_output = layer.attention(attention_input, mask, layer_id=layer_id, **kwargs)# 如果层归一化顺序是 "sandwich",则应用第三次层归一化if self.transformer.layernorm_order == "sandwich":attention_output = layer.third_layernorm(attention_output)# 将隐藏状态分为文本和图像部分text_hidden_states, image_hidden_states = hidden_states[:, :text_length], hidden_states[:, text_length:]# 将注意力输出分为文本和图像部分text_attention_output, image_attention_output = (attention_output[:, :text_length],attention_output[:, text_length:],)# 更新文本隐藏状态,加上文本注意力输出的加权text_hidden_states = text_hidden_states + gate_msa_txt * text_attention_output# 更新图像隐藏状态,加上图像注意力输出的加权image_hidden_states = image_hidden_states + gate_msa_img * image_attention_output# 合并更新后的隐藏状态hidden_states = torch.cat((text_hidden_states, image_hidden_states), dim=1)# 对合并后的隐藏状态进行后注意力层归一化mlp_input = layer.post_attention_layernorm(hidden_states)# 对文本输入进行调制,应用相应的偏移和缩放text_mlp_input = modulate(mlp_input[:, :text_length], shift_mlp_txt, scale_mlp_txt)# 对图像输入进行调制,应用相应的偏移和缩放image_mlp_input = modulate(mlp_input[:, text_length:], shift_mlp_img, scale_mlp_img)# 将文本和图像的 MLP 输入合并mlp_input = torch.cat((text_mlp_input, image_mlp_input), dim=1)# 计算 MLP 输出,应用层 IDmlp_output = layer.mlp(mlp_input, layer_id=layer_id, **kwargs)# 如果层归一化顺序是 "sandwich",则应用第四次层归一化if self.transformer.layernorm_order == "sandwich":mlp_output = layer.fourth_layernorm(mlp_output)# 将隐藏状态分为文本和图像部分text_hidden_states, image_hidden_states = hidden_states[:, :text_length], hidden_states[:, text_length:]# 将 MLP 输出分为文本和图像部分text_mlp_output, image_mlp_output = mlp_output[:, :text_length], mlp_output[:, text_length:]# 更新文本隐藏状态,加上文本 MLP 输出的加权text_hidden_states = text_hidden_states + gate_mlp_txt * text_mlp_output# 更新图像隐藏状态,加上图像 MLP 输出的加权image_hidden_states = image_hidden_states + gate_mlp_img * image_mlp_output# 合并更新后的隐藏状态hidden_states = torch.cat((text_hidden_states, image_hidden_states), dim=1)# 返回最终的隐藏状态return hidden_states# 定义注意力前向传播函数,接收隐藏状态、掩码、层ID及其他参数def attention_forward(self, hidden_states, mask, layer_id, **kwargs):# 获取指定层的注意力模块attention = self.transformer.layers[layer_id].attention# 默认的注意力计算函数attention_fn = attention_fn_default# 如果注意力模块有自定义的注意力函数,则使用它if "attention_fn" in attention.hooks:attention_fn = attention.hooks["attention_fn"]# 通过隐藏状态计算查询、键、值qkv = attention.query_key_value(hidden_states)# 将查询、键、值沿最后一个维度分离mixed_query_layer, mixed_key_layer, mixed_value_layer = split_tensor_along_last_dim(qkv, 3)# 根据训练状态选择是否应用 dropoutdropout_fn = attention.attention_dropout if self.training else None# 转置查询、键、值以便于后续计算query_layer = attention._transpose_for_scores(mixed_query_layer)key_layer = attention._transpose_for_scores(mixed_key_layer)value_layer = attention._transpose_for_scores(mixed_value_layer)# 如果使用层归一化,应用于查询和键if self.qk_ln:query_layernorm = self.query_layernorms[layer_id]key_layernorm = self.key_layernorms[layer_id]query_layer = query_layernorm(query_layer)key_layer = key_layernorm(key_layer)# 计算上下文层,使用指定的注意力函数context_layer = attention_fn(query_layer, key_layer, value_layer, mask, dropout_fn, **kwargs)# 调整上下文层的维度顺序context_layer = context_layer.permute(0, 2, 1, 3).contiguous()# 创建新的上下文层形状,适配后续计算new_context_layer_shape = context_layer.size()[:-2] + (attention.hidden_size_per_partition,)# 重新调整上下文层的形状context_layer = context_layer.view(*new_context_layer_shape)# 通过全连接层计算输出output = attention.dense(context_layer)# 如果处于训练状态,应用输出的 dropoutif self.training:output = attention.output_dropout(output)# 返回最终输出return output# 定义多层感知机的前向传播函数def mlp_forward(self, hidden_states, layer_id, **kwargs):# 获取指定层的多层感知机模块mlp = self.transformer.layers[layer_id].mlp# 通过全连接层将隐藏状态映射到更高维度intermediate_parallel = mlp.dense_h_to_4h(hidden_states)# 应用激活函数intermediate_parallel = mlp.activation_func(intermediate_parallel)# 将高维结果映射回原维度output = mlp.dense_4h_to_h(intermediate_parallel)# 如果处于训练状态,应用 dropoutif self.training:output = mlp.dropout(output)# 返回最终输出return output# 定义重新初始化函数,接受可选的父模型参数def reinit(self, parent_model=None):# 遍历自适应层模块for layer in self.adaln_modules:# 将最后一层的权重初始化为 0nn.init.constant_(layer[-1].weight, 0)# 将最后一层的偏置初始化为 0nn.init.constant_(layer[-1].bias, 0)
# 定义一个字符串到数据类型的映射
str_to_dtype = {"fp32": torch.float32, "fp16": torch.float16, "bf16": torch.bfloat16}# 定义一个扩散变换器类,继承自基础模型
class DiffusionTransformer(BaseModel):# 初始化方法,接受多个参数以配置模型def __init__(self,in_channels,  # 输入通道数out_channels,  # 输出通道数hidden_size,  # 隐藏层大小patch_size,  # 图像块大小num_layers,  # 层数num_attention_heads,  # 注意力头数text_length,  # 文本长度time_embed_dim=None,  # 时间嵌入维度,默认为 Nonenum_classes=None,  # 类别数量,默认为 Noneadm_in_channels=None,  # 自适应输入通道,默认为 Nonemodules={},  # 额外模块,默认为空字典dtype="fp32",  # 数据类型,默认为 fp32layernorm_order="pre",  # 层归一化顺序,默认为 "pre"elementwise_affine=False,  # 是否启用逐元素仿射,默认为 Falseparallel_output=True,  # 是否并行输出,默认为 Trueblock_size=16,  # 块大小,默认为 16**kwargs,  # 其他关键字参数):# 初始化基类super().__init__(**kwargs)# 前向传播方法,定义模型的前向计算def forward(self, x, timesteps=None, context=None, y=None, **kwargs):# 将输入张量 x 转换为指定的数据类型x = x.to(self.dtype)# 获取时间步的嵌入表示t_emb = timestep_embedding(timesteps, self.model_channels, repeat_only=False, dtype=self.dtype)# 对时间嵌入进行处理emb = self.time_embed(t_emb)# 确保 y 和 x 的批次大小一致assert y.shape[0] == x.shape[0]# 将标签嵌入与时间嵌入相加emb = emb + self.label_emb(y)# 创建输入 ID、位置 ID 和注意力掩码,均初始化为 1input_ids = position_ids = attention_mask = torch.ones((1, 1)).to(x.dtype)# 调用基类的前向方法,传入多个参数以计算输出output = super().forward(images=x,  # 输入图像emb=emb,  # 嵌入表示encoder_outputs=context,  # 编码器输出text_length=self.text_length,  # 文本长度input_ids=input_ids,  # 输入 IDposition_ids=position_ids,  # 位置 IDattention_mask=attention_mask,  # 注意力掩码**kwargs,  # 其他关键字参数)[0]  # 获取输出的第一个元素# 返回模型的输出return output

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/74921.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

券后价复杂根源和解法

券后价领域划分不清楚 券后价在电商系统中是个很奇怪的存在 无论是按商品领域还是营销领域划分,它都不合适归类到这两者中间。结果就是券后价是个很不理想的拆分逻辑。 券后价可以理解是商品的价格属性,这个属性是由营销来计算控制。领域划分可以理解为商品领域,营销做计算!…

营销领域分析

用户与商品的连接用户购买商品是整个商业的基本盘。用户与商品是多对多关系,在这个基础之上就可衍生出许多行为。可以跟据商品的属性又可以设计各种运营方式。 用几个条件来归类交易产生的条件条件 人 物when 人什么时候需要商品 商品什么时候被需要why 人为什么需要商品 商品…

PbootCMS授权码可以更换域名吗? 授权码丢失怎么办?

授权码可以更换域名吗?不可以:授权码是绑定特定域名的,如果需要更换域名,建议重新获取新的授权码。授权码丢失怎么办?重新获取:如果授权码丢失,可以重新访问授权页面,输入相同的域名再次获取授权码。扫码添加技术【解决问题】专注中小企业网站建设、网站安全12年。熟悉…

PbootCMS授权码是否可以用于不同域名的子域名?是否可以用于不同域名的子目录?

授权码是否可以用于不同域名的子域名?不可以:授权码是绑定特定域名的,不支持不同域名的子域名。例如,sub1.example.com 和 sub2.anotherdomain.com 需要分别获取授权码。18. 授权码是否可以用于不同域名的子目录?不可以:授权码是绑定特定域名的,不支持不同域名的子目录。…

PbootCMS验证码不显示或显示不清楚怎么办

验证码不显示或显示不清楚问题描述:后台登录时验证码不显示或显示不清楚。 解决方案:避免使用中文路径:确保所有文件和目录名称均为英文或数字。 切换PHP版本:推荐使用PHP 7.3、7.2、5.6版本。 检查文件权限:确保验证码相关文件和目录具有适当的读写权限(通常为755或644)…

值得信赖的FTP替代方案有哪些,一文带你详细了解!

FTP(文件传输协议)因其传输速度慢、安全隐患、管理复杂性、稳定性不足以及审计难题等缺陷,使得企业在寻找更高效的替代方案时显得尤为迫切。 FTP替代方案有哪些,简单了解看下吧: 1、SFTP:SFTP是建立在SSH(Secure Shell)协议之上的文件传输协议,提供了数据传输的加密和…

HTTP 2.0 新特性

HTTP 2.0 新特性HTTP 2.0 为什么使用二进制分帧?二进制协议比文本协议更加紧凑,减少占用空间 分帧层相当于将 HTTP 切分,更加灵活,比如可以对 header 帧做单独的特殊处理 分帧层有着属于自己的报文头,其中的 Stream Identity 使得操作系统具备将多个响应以及请求一一匹配的…

Python脚本检测笑脸漏洞

Python脚本检测笑脸漏洞 一、漏洞介绍 ​ vsftpd2.3.4中在6200端口存在一个shell,使得任何人都可以进行连接,并且VSFTPD v2.3.4 服务,是以 root 权限运行的,最终我们提到的权限也是root;当连接带有vsftpd 2.3.4版本的服务器的21端口时,输入用户中带有“😃 ”,密码…