上篇。https://blog.csdn.net/alxws/article/details/140058117?spm=1001.2014.3001.5502四、前向加噪过程(ForwardProcess.py)
当我们的去噪器设计完成后,接下来,就是我们的另外两个重要的部分:前向加噪过程和反向去噪过程了。
我们先看看论文原文中的那个算法介绍:
左边为训练过程,右边为去噪采样过程。
在训练过程中,我们是通过那个公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。
我们将模型生成的噪声与我们在高斯噪声采样中得到的结果做差,来模拟基于某张个时间步后得到的噪声。之后我们将在去噪过程中,按照公式完成我们的图片的生成。
那么,开始!首先看看我们的前向加噪过程是怎么实现的吧!
这是我们导入的包:
import torch import torch.nn.functional as F from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize,RandomHorizontalFlip import numpy as np
1、 beta的选择
我们在前面简单介绍过程的时候,说了一嘴,且。实际上,这个 ? 的递增序列我们有各种各样不同的写法。最常见的就是让这个序列线性增长:
# 定义一个线性beta调度函数,生成一个线性增长的beta值序列。 def linear_beta_schedule(timesteps): # 设置beta值的起始和结束值。 beta_start = 0.0001 beta_end = 0.02 # 返回一个从起始值到结束值线性增长的序列。 return torch.linspace(beta_start, beta_end, timesteps)
或者使用二次函数的方式去生成这个序列:
# 定义一个二次方beta调度函数,生成一个二次方增长的beta值序列。 def quadratic_beta_schedule(timesteps): # 设置beta值的起始和结束值。 beta_start = 0.0001 beta_end = 0.02 # 返回一个从起始值的平方根到结束值的平方根线性增长的序列,并将其平方。 return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2
或者是“缓-急-缓”,即从sigmod函数中采样得到对应的序列:
# 定义一个S形beta调度函数,生成一个S形曲线的beta值序列。 def sigmoid_beta_schedule(timesteps): # 设置beta值的起始和结束值。 beta_start = 0.0001 beta_end = 0.02 # 创建一个从-6到6的等间隔序列,用于sigmoid函数的输入。 betas = torch.linspace(-6, 6, timesteps) # 应用sigmoid函数并调整序列,使其在指定的起始和结束值之间。 return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
还有人在这方面做过相关的研究,不过我们不去深入探究了。在本文中,笔者使用的还是线性调度的方式去生成相关的序列。
2、 其余相关参数的生成
有了,我们剩下的要做的,就是获得我们需要的一系列参数。让我们先来回顾一下比较重要的两个公式:
一步到位的前向加噪过程公式: 基于上述公式的反向去噪过程:其中有很多参数都是我们可以直接计算得到的。即通过我们的GetElements()函数得到。其中输出的结果有这样的对应关系:
betas对应;
alphas对应 ;
sqrt_recip_alphas对应 ;
alphas_cumprod对应;
alphas_cumprod_prev对应;
sqrt_alphas_cumprod对应;
sqrt_one_minus_alphas_cumprod对应;
posterior_variance对应。
让我们使用下面的函数来获得我们的结果:
# 获取相关参数数值; def GetElements(timesteps=300): betas = linear_beta_schedule(timesteps=timesteps) # 通过公式定义\bar{alpha}_t。 alphas = 1. - betas alphas_cumprod = torch.cumprod(alphas, axis=0) # 返回逐步累乘结果; alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0) sqrt_recip_alphas = torch.sqrt(1.0 / alphas) # 计算q(x_t | x_{t-1}); sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod) sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod) # 计算 q(x_{t-1} | x_t, x_0); posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod) return [betas,# β参数,控制噪声的加入; alphas, alphas_cumprod, alphas_cumprod_prev, sqrt_recip_alphas, # α的平方根的倒数; sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod, # 1-α的累积乘积的平方根; posterior_variance] # 后验方差;
3、前向加噪过程部分
接下来,就是公式的直接应用!让我们使用这个公式来完成我们的一步加噪过程吧!
# 定义前向加噪过程,即一步骤到位的加噪过程; def q_sample(x_start, t, sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod, noise=None): if noise is None: noise = torch.randn_like(x_start) sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape) sqrt_one_minus_alphas_cumprod_t = extract( sqrt_one_minus_alphas_cumprod, t, x_start.shape ) return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
其中有一个函数extract(),它的作用是从输入张量中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状。
# 从输入张量a中提取特定的元素,并将它们重塑成与输入张量 x_shape 相关的形状; def extract(a, t, x_shape): batch_size = t.shape[0] # 获取批次大小,即t张量的第一个维度的大小; # 使用gather函数根据t张量中的索引来提取a张量中的元素; out = a.gather(-1, t.cpu()) # 重塑提取出的元素,使其形状与x_shape的前n-1个维度相匹配,同时保持批次大小不变; reshaped_out = out.reshape(batch_size, *((1,) * (len(x_shape) - 1))) return reshaped_out.to(t.device)
现在,假设我们要对一张图片进行加噪。让我们看看这个加噪过程是怎么完成的吧!以每五十步加噪采样得到的结果组成一组套图来康康:
t=5,55,105,155,205的加噪图。
现在,整个加噪过程都已经完成了!但我们是要将输入的图片转化为张量后放入到其中进行运算的。因此我们需要一些辅助函数来帮忙计算一下。比如这个图像转张量的函数:
# 将一张图像转化为一个张量; def image2tensor(image,image_size=64): transform = Compose([ Resize(image_size), CenterCrop(image_size), RandomHorizontalFlip(), ToTensor(), Lambda(lambda t: (t * 2) - 1), ]) return transform(image)
还有将张量转化为图像的函数:
# 将一个张量转化为一张图像; def tensor2image(tensor): reverse_transform = Compose([ Lambda(lambda t: (t + 1) / 2), Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC Lambda(lambda t: t * 255.), Lambda(lambda t: t.numpy().astype(np.uint8)), ToPILImage(), ]) return reverse_transform(tensor.squeeze())
到此,我们的前向传播过程就全部结束啦!根据算法1,我们接下来要做的就是Loss相关的计算了,并且构建我们的反向传播过程。先看看Loss吧!
五、损失函数的设计、训练集的构成(Loss.py)
有了前向加噪过程,我们要做的,就是在此基础上进行Loss的设计了。这其实相当的容易。
下面是我们要导入的包:
import ForwardProcess as FP import torch.nn.functional as F import torch from PIL import Image import os from torch.utils.data import Dataset, DataLoader
首先,是我们的Loss设计。
1、Loss设计
根据算法1,我们的Loss要做的其实很简单。我们实际上就是做算法1的4到5步。
为了不用来回翻,笔者在此再放一次图。
则Loss设计的代码如下:
# 损失函数; def p_losses(denoise_model, x_start, t, sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod, noise=None, loss_type="l1"): # 如果没有提供噪声,就创建一个与x_start形状相同的随机噪声; if noise is None: noise = torch.randn_like(x_start) # 通过前向过程获得某一时间步的时候的加噪图像; x_noisy = FP.q_sample(x_start=x_start, t=t, sqrt_alphas_cumprod=sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod=sqrt_one_minus_alphas_cumprod, noise=noise) # 让模型输出其结果; predicted_noise = denoise_model(x_noisy, t) # 根据不同type设计loss下降; if loss_type == 'l1': loss = F.l1_loss(noise, predicted_noise) elif loss_type == 'l2': loss = F.mse_loss(noise, predicted_noise) elif loss_type == "huber": loss = F.smooth_l1_loss(noise, predicted_noise) else: raise NotImplementedError() return loss
我们是通过公式模拟一步到加噪t步的图片。然后,我们将这个图片来假装是模型自己获得的t步的图片,并在此基础上进行“加噪”。
2、数据集构建
然后,我们就可以构建自己的数据集了。笔者采用的是ImageNet64x64数据集,并且使用了里面的pizza图。在文件夹里一眼望过去长这样:
每张图片尺寸大小为64x64,且均为RGB图像,一共3000张。
笔者便根据此构建数据集。是很常见的构建方法,正常重写__len__和__getitem__:
# DataLoader的创立; class CIFARdataset(Dataset): def __init__(self, dataset_folder, transform=None): super(CIFARdataset, self).__init__() self.dataset_folder = dataset_folder self.transform = transform self.image_files = \ [os.path.join(self.dataset_folder, file) for file in os.listdir(self.dataset_folder)] def __len__(self): return len(self.image_files) def __getitem__(self, idx): image_path = self.image_files[idx] image = Image.open(str(image_path)).convert('RGB') if self.transform: image = self.transform(image) return image
然后创建一个DataLoader来管理数据:
# 创建dataloader; def makeDataLoader(datapath="dataset"): dataset = CIFARdataset(datapath, transform=FP.image2tensor) dataloader = DataLoader(dataset,batch_size=32,shuffle=True) return dataloader
到此,我们的整个文件就全部写完啦!接下来,让我们再跑一遍算法1,来进行一遍完整的训练过程。
六、训练过程(train.py)
有了前向加噪过程和Loss的支持,我们就可以完成训练的整个过程了!其实就是完整跑一遍论文中的算法1啦。
首先,先导入一系列的包:
import torch import Loss import Model import ForwardProcess as FP from torch.optim import Adam from torchvision.utils import save_image
我们需要做一些训练前的准备。
1、参数、模型加载
首先,我们先加载我们的一些参数:
# 获得超参数; elements = FP.GetElements() # 其中不同的参数的含义; # [betas,# β参数,控制噪声的加入; 0 # alphas, 1 # alphas_cumprod, 2 # alphas_cumprod_prev, 3 # sqrt_recip_alphas, # α的平方根的倒数; 4 # sqrt_alphas_cumprod, 5 # sqrt_one_minus_alphas_cumprod, # 1-α的累积乘积的平方根; 6 # posterior_variance] # 后验方差; 7
然后加载我们的模型:
# 在gpu上使用程序; device = "cuda" if torch.cuda.is_available() else "cpu" # 确定模型参数; image_size = 64 channels = 3 model = Model.Unet( dim=image_size, channels=channels, dim_mults=(1, 2, 4,) ) model.to(device)
然后加载优化器和数据集:
# 确定优化器; optimizer = Adam(model.parameters(), lr=1e-3) # 确定dataset; dataset = Loss.makeDataLoader()
再确定我们的epochs和最大时间步:
epochs = 40 #确定训练轮次; timesteps = 300 #确定最大时间步;
接下来,就要开始我们的训练啦!
2、训练
我们先定义一个训练时的辅助函数,用来为我们的训练批次分组:
# 定义一个函数num_to_groups,接受两个参数:num(被分组的数)和divisor(每组的大小); def num_to_groups(num, divisor): groups = num // divisor remainder = num % divisor # 使用模运算符'%'计算分组后的余数; arr = [divisor] * groups # 创建一个列表,包含'groups'个'divisor',即完全分组的列表; if remainder > 0: arr.append(remainder) # 将余数作为一个新的组添加到列表中; return arr # 返回包含所有组的列表; save_and_sample_every = 1000
然后,开始我们正式的训练过程:
for epoch in range(epochs): for step, batch in enumerate(dataset): optimizer.zero_grad() batch_size = batch.shape[0] batch = batch.to(device) # 任取一个作为时间步进行下降; t = torch.randint(0, timesteps, (batch_size,), device=device).long() # 获取Loss,并计算得到结果; loss = Loss.p_losses(model, batch, t, elements[5], elements[6], loss_type="huber") if step % 100 == 0: print("Loss:", loss.item()) # 反向传播; loss.backward() optimizer.step() torch.save(model.state_dict(),'./model.pt')
这些都是很常规的东西,不再进行详细的介绍了。训练后的Loss打印出来是这样:
我们训练时打印出来的Loss。
到此,我们的模型就训练完成啦!接下来,我们要做的,就是开始我们的反向去噪过程,来让我们的去噪器能够从一张完整采样的高斯噪声中“拉”出我们的图像。
七、反向去噪过程(ReverseProcess.py)
反向去噪过程,其实就是严格按照我们的算法2进行的。
为了不用来回翻,笔者在此再放一次图。
我们首先要搞定第4步,也就是这个很多密密麻麻公式的这一步。这是我们推导出来的东西。相关的代码如下:
@torch.no_grad() def p_sample(model, x, # 当前的样本; t, # 当前的时间步; betas, sqrt_recip_alphas, sqrt_one_minus_alphas_cumprod, posterior_variance, t_index, timesteps=300): betas_t = extract(betas, t, x.shape) # 提取当前时间步的β; sqrt_one_minus_alphas_cumprod_t = extract( # 提取当前时间步的1-α的累积乘积的平方根; sqrt_one_minus_alphas_cumprod, t, x.shape) # 提取当前时间步的α的平方根的倒数; sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape) model_mean = sqrt_recip_alphas_t * ( # 计算模型的均值; x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t) if t_index == 0: # 如果是第一个时间步; return model_mean # 直接返回模型的均值; else: # 如果不是第一个时间步; # 提取当前时间步的后验方差; posterior_variance_t = extract(posterior_variance, t, x.shape) noise = torch.randn_like(x) # 生成与x形状相同的随机噪声; # 返回带有噪声的模型均值; return model_mean + torch.sqrt(posterior_variance_t) * noise
上面的代码是一步采样的结果。我们需要执行整个时间步的采样,去实现我们最后的效果。
接下来是执行整个算法2:
# 执行整个模型采样过程; @torch.no_grad() def p_sample_loop(model, shape, betas, sqrt_recip_alphas, sqrt_one_minus_alphas_cumprod, posterior_variance, timesteps=300): device = next(model.parameters()).device b = shape[0] img = torch.randn(shape, device=device) imgs = [] # 添加加载条加载进度; for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps): img = p_sample(model, img, torch.full((b,), i, device=device, dtype=torch.long), betas, sqrt_recip_alphas, sqrt_one_minus_alphas_cumprod, posterior_variance, i) imgs.append(img.cpu().numpy()) return imgs
然后确定一个接口执行采样:
# 执行采样; @torch.no_grad() def sample(model, image_size, betas, sqrt_recip_alphas, sqrt_one_minus_alphas_cumprod, posterior_variance, batch_size=16, channels=3): return p_sample_loop(model, (batch_size, channels, image_size, image_size), betas, sqrt_recip_alphas, sqrt_one_minus_alphas_cumprod, posterior_variance)
搞定!接下来就让我们开始我们的test吧!已经迫不及待的想看我们模型的输出啦!
八、测试结果(test.py)
我们的测试,实际上就是用我们的模型执行我们的反向去噪的整个过程。开始吧!首先是导入的包:
import torch import Model import ForwardProcess as FP import ReverseProcess as RP import numpy as np from PIL import Image import os
首先,先导入我们的模型和参数:
# 在gpu上使用程序; device = "cuda" if torch.cuda.is_available() else "cpu" # 确定模型参数; image_size = 64 channels = 3 model = Model.Unet( dim=image_size, channels=channels, dim_mults=(1, 2, 4,) ) model.to(device) # 加载模型; model.load_state_dict(torch.load('model.pt')) model.eval() # 获得公式计算的相关参数; elements = FP.GetElements()
接下来,开始执行我们的反向去噪过程:
samples = RP.sample(model, image_size=image_size, batch_size=32, channels=channels, betas=elements[0], sqrt_recip_alphas=elements[4], sqrt_one_minus_alphas_cumprod=elements[6], posterior_variance=elements[7]) print("shape of samples:",np.shape(samples)) samples = torch.tensor(samples, dtype=torch.float32)
在这一步,这个变量samples的维度是(timesteps, batch_size, channels, image_size, image_size)。我们需要把我们想要的图像保存下来。
先整理一个文件夹:
if not os.path.exists('results'):# 当前目录下没有文件夹就创造一个。 os.makedirs('results')
然后遍历时间步,取我们需要的图片并保存下来:
# 遍历每个时间步 for i in range(samples.shape[0]): # 随机选择一个图像; image_index = 25 image = samples[i, image_index] # 将数值缩放到0-255之内; img_normalized = ((image - image.min()) * (255 / (image.max() - image.min()))) # 将numpy数组转换为PIL图像; img_normalized = img_normalized.numpy().astype(np.int8) img_normalized = np.transpose(img_normalized, (1, 2, 0)) img_pil = Image.fromarray(img_normalized, 'RGB') # 保存图像,图像名称为时间步的名称; img_pil.save(f'results/time_step_{i}.png') print("ending.")
我们的图片保存在当下的results文件夹中。采样的图片长这样:
部分采样的图片。
有些看不清?那让笔者将0步、49步、99步、149步、199步、249步和299步的生成图片单独拉出来看一下:
0步、49步、99步、149步、199步、249步和299步采样图。
我们可以看到,图片逐渐显现出了披萨的模样!这个多么小的数据量和训练批次居然能达到这样的效果,真的很棒!
我们也跑完了整个扩散模型的流程啦!
总结
能跟随笔者的代码,一步一步的到达这里,是一件非常不容易的事情。Diffusion的地位在图像生成领域和Transformer一样重要。当今很多惊人效果的展现,越发的体现出了DDPM的重要性和学习该算法的必要性啦!
笔者也会继续解析一些对目前现实影响深远的模型架构和实现方法。让我们一起加油吧!学无止境!有什么不是很懂的地方,可以在评论区询问!笔者也会一一回答的啦!
让我们一起共同进步!
总结
numpydivi数据集cpu文件夹gpuapppng图片尺寸数据集构建图像转化随机选择模型生成diffusion写完啦transformer扩散模型图像生成andi公式计算