画图x-y散点图
随机梯度下降
%matplotlib inline
import random
import torch
import matplotlib.pyplot as plt
def synthetic_data(w, b, num_examples):
    """Generate a synthetic linear-regression dataset ``y = Xw + b + noise``.

    Args:
        w: Weight tensor, either 1-D ``(d,)`` or a column ``(d, 1)``.
        b: Scalar bias (Python number or tensor) added to every target.
        num_examples: Number of rows to sample.

    Returns:
        ``(X, y)``: ``X`` has shape ``(num_examples, d)`` with standard-normal
        entries; ``y`` has shape ``(num_examples, 1)`` for 1-D or column ``w``.
    """
    X = torch.normal(0, 1, size=(num_examples, len(w)))
    y = X.matmul(w) + b
    # A 1-D ``w`` gives a 1-D ``y``; adding the (n, 1) noise column to it would
    # silently broadcast to (n, n), so promote it to a column first.
    if y.dim() == 1:
        y = y.unsqueeze(-1)
    y = y + torch.randn(num_examples, 1) * 0.001
    return X, y
# Ground-truth parameters of the linear model used to synthesize the data.
true_w = torch.tensor([[2.0], [-3.4], [4.2]])
true_b = torch.tensor(4.3)
num_examples = 10000
features, labels = synthetic_data(w=true_w, b=true_b, num_examples=num_examples)
# Scatter plot of the third feature against the target.
figure = plt.figure(figsize=(10, 10))
third_feature = features[:, 2]
plt.scatter(third_feature, labels, s=10, alpha=0.5)
def data_iter(batch_size, features, labels):
    """Yield shuffled mini-batches of ``(features, labels)``.

    The example order is reshuffled with ``random.shuffle`` on every call.
    The final batch is smaller when ``len(features)`` is not a multiple of
    ``batch_size``.
    """
    total = len(features)
    order = list(range(total))
    random.shuffle(order)
    for start in range(0, total, batch_size):
        # Slicing clamps at the end of the list, so the tail batch is short.
        picked = order[start:start + batch_size]
        yield features[picked], labels[picked]
batch_size = 10
# Peek at the first mini-batch only.
for X, y in data_iter(batch_size, features, labels):
    print(X, '\n', y)
    break
# Manually initialised parameters: random normal weights, zero bias.
w = torch.normal(mean=0, std=1, size=(3, 1), requires_grad=True)
b = torch.zeros(1, requires_grad=True)
def loss(y_true, y_pred):
    """Return the mean squared error between ``y_true`` and ``y_pred``.

    The reduction runs over the first (example) axis only, so column inputs of
    shape ``(n, 1)`` give a result of shape ``(1,)`` and 1-D inputs give a
    0-dim tensor. Both arguments are expected to have the same shape; a
    ``(n,)`` vs ``(n, 1)`` mismatch would broadcast to ``(n, n)``.
    """
    squared_error = (y_true - y_pred) ** 2
    # Tensor reduction instead of the built-in ``sum``, which iterates the
    # tensor row by row in Python and records one autograd add per example.
    return squared_error.sum(dim=0) / len(y_true)
# Fit a linear model to the synthetic data with full-batch gradient descent
# (every step uses all of ``features``; ``data_iter`` is not used here).
model = torch.nn.Linear(3, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
epoches = 1000
for epoch in range(epoches):
    y_pred = model(features)
    loss_val = loss(labels, y_pred)
    optimizer.zero_grad()
    loss_val.backward()
    optimizer.step()
    if epoch % 100 == 0:
        print(f"Epoch: {epoch}, Loss: {loss_val.item():.4f}")
# Evaluate on freshly sampled data; no gradients are needed, so skip building
# the autograd graph.
X_test, y_test = synthetic_data(true_w, true_b, num_examples//10)
with torch.no_grad():
    y_pred = model(X_test)
    test_loss = loss(y_test, y_pred)
print(f"loss:{test_loss.item()}")
单头注意力
import torch
import torch.nn as nn
import numpy as np
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention for batched 3-D inputs.

    ``scale`` divides the raw ``q @ k^T`` scores before the softmax, which is
    taken over the key axis (dim 2).
    """

    def __init__(self, scale):
        super().__init__()
        self.scale = scale
        self.softmax = nn.Softmax(dim=2)

    def forward(self, q, k, v, mask=None):
        """Return ``(attn, output)``.

        ``q`` is ``[B, n_q, d]``, ``k`` is ``[B, n_k, d]`` and ``v`` is
        ``[B, n_k, d_v]``. Positions where ``mask`` is True are filled with
        ``-inf`` before the softmax. ``attn`` is ``[B, n_q, n_k]`` and
        ``output`` is ``[B, n_q, d_v]``.
        """
        scores = torch.bmm(q, k.transpose(1, 2)) / self.scale
        if mask is not None:
            scores = scores.masked_fill(mask, float("-inf"))
        attn = self.softmax(scores)
        return attn, torch.bmm(attn, v)
if __name__ == "__main__":
    # Sequence lengths, feature sizes and batch size for the smoke test.
    n_q, n_k, n_v = 2, 4, 4
    d_q, d_k, d_v = 128, 128, 64
    batch = 128

    q = torch.randn(batch, n_q, d_q)  # [128, 2, 128]
    k = torch.randn(batch, n_k, d_k)  # [128, 4, 128]
    v = torch.randn(batch, n_v, d_v)  # [128, 4, 64]
    # All-False mask: nothing is masked out.
    mask = torch.zeros(batch, n_q, n_k).bool()

    attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))
    attn, output = attention(q, k, v, mask=mask)

    for result in (attn, output):
        print(result.shape)
torch.Size([128, 2, 4])
torch.Size([128, 2, 64])
多头注意力
import torch
import torch.nn as nn
import numpy as np
class MultiHeadAttention1(nn.Module):
    """Multi-head attention built on ``ScaledDotProductAttention``.

    Inputs are projected to ``n_head * d_k`` (queries/keys) and
    ``n_head * d_v`` (values), and the heads are folded into the batch axis so
    that one single-head attention call handles all of them. The concatenated
    head outputs are projected to ``d_o``.
    """

    def __init__(self, n_head, d_k_, d_v_, d_k, d_v, d_o):
        super().__init__()
        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v

        # Input projections (queries and keys share the per-head width d_k).
        self.fc_q = nn.Linear(d_k_, n_head * d_k)
        self.fc_k = nn.Linear(d_k_, n_head * d_k)
        self.fc_v = nn.Linear(d_v_, n_head * d_v)

        self.attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))

        self.fc_o = nn.Linear(n_head * d_v, d_o)

    def _split_heads(self, x, batch, length, width):
        """Reshape ``[B, n, H * w]`` to head-major ``[H * B, n, w]``."""
        x = x.view(batch, length, self.n_head, width)
        return x.permute(2, 0, 1, 3).contiguous().view(-1, length, width)

    def forward(self, q, k, v, mask=None):
        """Return ``(attn, output)``.

        ``attn`` is ``[n_head * B, n_q, n_k]`` (head-major) and ``output`` is
        ``[B, n_q, d_o]``. A ``[B, n_q, n_k]`` ``mask`` is tiled once per head.
        """
        heads = self.n_head
        n_q, n_k = q.size(1), k.size(1)
        batch, n_v, _ = v.size()

        # 1. Project, then move the heads into the batch dimension.
        q = self._split_heads(self.fc_q(q), batch, n_q, self.d_k)
        k = self._split_heads(self.fc_k(k), batch, n_k, self.d_k)
        v = self._split_heads(self.fc_v(v), batch, n_v, self.d_v)

        if mask is not None:
            mask = mask.repeat(heads, 1, 1)

        # 2. Treat every (head, sample) pair as an independent single head.
        attn, output = self.attention(q, k, v, mask=mask)

        # 3. Concatenate the heads back along the feature axis.
        output = output.view(heads, batch, n_q, self.d_v)
        output = output.permute(1, 2, 0, 3).contiguous().view(batch, n_q, -1)

        # 4. Final affine projection.
        return attn, self.fc_o(output)
if __name__ == "__main__":
    # Sequence lengths, input feature sizes and batch size for the smoke test.
    n_q, n_k, n_v = 2, 4, 4
    d_q_, d_k_, d_v_ = 128, 128, 128
    batch = 32

    q = torch.randn(batch, n_q, d_q_)
    k = torch.randn(batch, n_k, d_k_)
    v = torch.randn(batch, n_v, d_v_)
    # All-False mask: nothing is masked out.
    mask = torch.zeros(batch, n_q, n_k).bool()

    mha = MultiHeadAttention1(
        n_head=4, d_k_=d_k_, d_v_=d_v_, d_k=32, d_v=32, d_o=128
    )
    attn, output = mha(q, k, v, mask=mask)

    for result in (attn, output):
        print(result.size())
torch.Size([128, 2, 4])
torch.Size([32, 2, 128])
多头注意力(另一个版本)
import math
from typing import List, Optional, Tuple

import torch
from labml import tracker
from torch import nn
class PrepareForMultiHeadAttention(nn.Module):
    """Linear projection that splits the last dimension into heads.

    Maps ``[..., d_model]`` to ``[..., heads, d_k]``. ``self.d_head`` stores
    the number of heads.
    """

    def __init__(self, d_model: int, heads: int, d_k: int, bias: bool):
        super().__init__()
        self.linear = nn.Linear(d_model, heads * d_k, bias=bias)
        self.d_head = heads
        self.d_k = d_k

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Project ``x`` and reshape its last axis to ``(heads, d_k)``."""
        leading_dims = x.shape[:-1]
        projected = self.linear(x)
        return projected.view(*leading_dims, self.d_head, self.d_k)
class MultiHeadAttention(nn.Module):
    """Multi-head attention with an explicit head dimension.

    Queries, keys and values are each projected to ``heads * d_k`` features
    and attended per head in ``[B, heads, seq, d_k]`` layout.
    """

    def __init__(self, d_model: int, heads: int, d_k: int, bias: bool):
        super().__init__()
        self.query = PrepareForMultiHeadAttention(d_model, heads, d_k, bias)
        self.key = PrepareForMultiHeadAttention(d_model, heads, d_k, bias)
        self.value = PrepareForMultiHeadAttention(d_model, heads, d_k, bias)
        self.softmax = nn.Softmax(dim=-1)
        self.out = nn.Linear(heads * d_k, d_model, bias=bias)
        self.scale = 1 / math.sqrt(d_k)
        # Detached copy of the most recent attention weights, for inspection.
        self.atten = None

    def forward(
        self,
        q: torch.Tensor,
        k: torch.Tensor,
        v: torch.Tensor,
        mask: Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(attn, out)``.

        Args:
            q: Queries, ``[B, n_q, d_model]``.
            k: Keys, ``[B, n_k, d_model]``.
            v: Values, ``[B, n_k, d_model]``.
            mask: Optional tensor broadcastable to ``[B, heads, n_q, n_k]``.
                Entries equal to 0 (or False) are masked OUT; non-zero entries
                are kept. This is the opposite of ``MultiHeadAttention1``,
                where True means masked.

        Returns:
            ``attn`` of shape ``[B, heads, n_q, n_k]`` and ``out`` of shape
            ``[B, n_q, d_model]``.
        """
        batch_size, seq_len, _ = q.shape
        # [B, seq, heads, d_k] -> [B, heads, seq, d_k]
        q = self.query(q).permute(0, 2, 1, 3)
        k = self.key(k).permute(0, 2, 1, 3)
        v = self.value(v).permute(0, 2, 1, 3)
        attn = torch.matmul(q, k.transpose(-2, -1))
        attn = attn * self.scale
        if mask is not None:
            # Large negative fill so masked positions get ~0 softmax weight.
            attn = attn.masked_fill(mask == 0, -1e9)
        attn = self.softmax(attn)
        self.atten = attn.detach()
        out = torch.matmul(attn, v)
        # Back to [B, n_q, heads, d_k], then merge the heads.
        out = out.permute(0, 2, 1, 3).contiguous()
        out = out.view(batch_size, seq_len, -1)
        out = self.out(out)
        return attn, out
# Smoke test for MultiHeadAttention.
batch = 4
n_q, n_k, n_v = 2, 4, 4
d_q_, d_k_, d_v_ = 128, 128, 128
heads = 4
q = torch.randn(batch, n_q, d_q_)
k = torch.randn(batch, n_k, d_k_)
v = torch.randn(batch, n_v, d_v_)
# MultiHeadAttention masks positions where ``mask == 0``, so an all-zeros mask
# would hide every key and collapse attention to uniform weights. Use all-ones
# to keep every position visible.
mask = torch.ones(batch, heads, n_q, n_k).bool()
mha = MultiHeadAttention(d_model=128, heads=heads, d_k=32, bias=True)
attn, output = mha(q, k, v, mask=mask)
print(attn.size())
print(output.size())
torch.Size([4, 4, 2, 4])
torch.Size([4, 2, 128])
self attention
class SelfAttention(nn.Module):
    """Self-attention: derive q, k and v from one input, then apply multi-head attention.

    ``x`` is multiplied by three learned matrices (``wq``, ``wk``, ``wv``) and
    the results are passed to ``MultiHeadAttention1``.
    """

    def __init__(self, n_head, d_k, d_v, d_x, d_o):
        super().__init__()
        self.wq = nn.Parameter(torch.Tensor(d_x, d_k))
        self.wk = nn.Parameter(torch.Tensor(d_x, d_k))
        self.wv = nn.Parameter(torch.Tensor(d_x, d_v))

        self.mha = MultiHeadAttention1(
            n_head=n_head, d_k_=d_k, d_v_=d_v, d_k=d_k, d_v=d_v, d_o=d_o
        )

        self.init_parameters()

    def init_parameters(self):
        """Re-initialise every parameter, including those of ``self.mha``.

        Values are drawn uniformly from ``[-1/sqrt(n), 1/sqrt(n)]`` where ``n``
        is the size of the parameter's last dimension.
        """
        for param in self.parameters():
            bound = 1. / np.power(param.size(-1), 0.5)
            param.data.uniform_(-bound, bound)

    def forward(self, x, mask=None):
        """Return ``(attn, output)`` from ``MultiHeadAttention1`` applied to ``x``."""
        q = x @ self.wq
        k = x @ self.wk
        v = x @ self.wv
        return self.mha(q, k, v, mask=mask)
if __name__ == "__main__":
    # Define the batch size here instead of relying on whichever ``batch``
    # global an earlier cell happened to leave behind.
    batch = 32
    n_x = 4
    d_x = 80
    x = torch.randn(batch, n_x, d_x)
    # All-False mask: nothing is masked out (True means masked here).
    mask = torch.zeros(batch, n_x, n_x).bool()
    selfattn = SelfAttention(n_head=8, d_k=128, d_v=64, d_x=d_x, d_o=80)
    attn, output = selfattn(x, mask=mask)
    print(attn.size())
    print(output.size())
torch.Size([256, 4, 4])
torch.Size([32, 4, 80])
beamsearch
import torch
import torch.nn.functional as F
def beam_search(LM_prob, beam_size=3):
    """Beam search over a table of per-position token probabilities.

    Args:
        LM_prob: Tensor of shape ``(batch, seqlen, vocab_size)``. The
            distribution at step ``i`` is read from ``LM_prob[:, i, :]`` and
            is the same for every beam.
        beam_size: Number of hypotheses kept per sample.

    Returns:
        ``(indices, log_beam_prob)``: ``indices`` is an integer tensor of
        shape ``(batch, beam_size, seqlen)`` holding the token ids of each
        hypothesis, best first; ``log_beam_prob`` has shape
        ``(batch, beam_size)`` and holds their summed log-probabilities.
    """
    batch, seqlen, vocab_size = LM_prob.shape
    log_LM_prob = LM_prob.log()
    # Seed the beams with the top tokens at position 0: both are (batch, beam).
    log_beam_prob, first_tokens = log_LM_prob[:, 0, :].topk(beam_size, sorted=True)
    indices = first_tokens.unsqueeze(-1)  # (batch, beam, 1), integer dtype
    # Shape trace kept from the original implementation.
    print(log_beam_prob.shape, indices.shape, log_LM_prob.shape)
    for i in range(1, seqlen):
        # Score of extending every beam with every token: (batch, beam, vocab).
        candidates = log_beam_prob.unsqueeze(-1) + log_LM_prob[:, i, :].unsqueeze(1)
        # Keep the best ``beam_size`` extensions across all (beam, token) pairs.
        log_beam_prob, flat_index = candidates.view(batch, -1).topk(beam_size, sorted=True)
        # Decode the flat index into the source beam and the new token id.
        beam_id = torch.div(flat_index, vocab_size, rounding_mode="floor")
        token_id = flat_index % vocab_size
        # Gather the surviving histories for the whole batch at once; this
        # keeps the ids as integers (concatenating onto a float tensor turned
        # them into floats).
        history = indices.gather(
            1, beam_id.unsqueeze(-1).expand(-1, -1, indices.size(-1))
        )
        indices = torch.cat([history, token_id.unsqueeze(-1)], dim=-1)
    return indices, log_beam_prob
if __name__ == '__main__':
    # Build a toy language model table LM_prob of shape (batch, seqlen, vocab_size).
    logits = torch.randn([32, 20, 1000])
    LM_prob = F.softmax(logits, dim=-1)
    # indices: (batch, beam_size, seqlen) candidates; log_prob: (batch, beam_size).
    indices, log_prob = beam_search(LM_prob, beam_size=3)
    print(indices.shape, log_prob.shape)
    print(indices[0, :, :])
torch.Size([32, 3]) torch.Size([32, 3, 1]) torch.Size([32, 20, 1000])
torch.Size([32, 3, 20]) torch.Size([32, 3])
tensor([[114., 442., 752., 774., 195., 187., 712., 711., 599., 941., 36., 146.,
301., 290., 575., 27., 628., 676., 3., 672.],
[114., 442., 752., 774., 195., 187., 712., 711., 599., 941., 36., 146.,
301., 290., 575., 27., 628., 756., 3., 672.],
[114., 442., 752., 774., 195., 187., 712., 711., 599., 941., 36., 146.,
301., 290., 575., 27., 547., 676., 3., 672.]])
K-means Clustering
import numpy as np
def kmeans(data, k, thresh=1, max_iterations=100):
    """Cluster ``data`` with Lloyd's k-means algorithm.

    Args:
        data: Array of shape ``(n_samples, n_features)``.
        k: Number of clusters; initial centers are ``k`` distinct rows of
            ``data`` chosen with ``np.random.choice``.
        thresh: Currently unused; kept so existing callers keep working.
            Convergence is declared when the centers stop changing exactly.
        max_iterations: Upper bound on the number of update steps.

    Returns:
        ``(labels, centers)``: ``labels`` has shape ``(n_samples,)`` and gives
        the index of the nearest returned center for each sample; ``centers``
        has shape ``(k, n_features)``.
    """

    def assign(current_centers):
        # Distance of every sample to every center: (n_samples, k).
        distances = np.linalg.norm(data[:, None] - current_centers, axis=2)
        return np.argmin(distances, axis=1)

    centers = data[np.random.choice(data.shape[0], k, replace=False)]
    for _ in range(max_iterations):
        labels = assign(centers)
        # A cluster that lost all its members keeps its previous center;
        # averaging an empty slice would produce NaN and poison later steps.
        new_centers = np.array([
            data[labels == i].mean(axis=0) if np.any(labels == i) else centers[i]
            for i in range(k)
        ])
        if np.array_equal(centers, new_centers):
            break
        centers = new_centers
    # Re-assign against the final centers so the labels always match what is
    # returned (also covers ``max_iterations == 0``).
    return assign(centers), centers
# Example input: 100 random samples with two features each.
data = np.random.rand(100, 2)
# Run the hand-written k-means with three clusters.
k = 3
labels, centers = kmeans(data, k)
# Print the cluster labels and the cluster centers.
for caption, value in (("簇标签:", labels), ("聚类中心点:", centers)):
    print(caption, value)
簇标签: [0 1 0 0 2 1 2 1 1 0 0 0 1 1 2 1 2 1 0 2 2 1 2 0 0 1 1 0 2 0 1 1 2 0 1 0 1
0 0 1 2 2 1 2 2 2 2 2 1 1 0 0 0 2 0 0 1 2 2 1 2 1 1 1 0 1 1 0 1 0 0 1 2 2
1 2 0 0 0 0 0 1 0 1 2 2 0 0 2 2 2 0 0 2 1 1 1 0 1 2]
聚类中心点: [[0.77716581 0.33047021]
[0.50876926 0.77695796]
[0.22867546 0.25998421]]
Layer Normalization
import torch
from torch import nn
class LN(nn.Module):
    """Layer normalization over the trailing ``normalized_shape`` dimensions.

    Args:
        normalized_shape: Shape of the trailing dimensions to normalize over.
            May be an int, a list, a tuple or a ``torch.Size``.
        eps: Added to the variance to avoid division by zero.
        elementwise_affine: When True, a learnable ``gain`` (initialised to 1)
            and ``bias`` (initialised to 0) of shape ``normalized_shape`` are
            applied after normalization.
    """

    def __init__(self, normalized_shape,
                 eps: float = 1e-5,
                 elementwise_affine: bool = True):
        super().__init__()
        # Store a plain tuple so the shape check in ``forward`` works for int
        # and list inputs as well as for ``torch.Size``.
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = tuple(normalized_shape)
        self.eps = eps
        self.elementwise_affine = elementwise_affine
        if self.elementwise_affine:
            self.gain = nn.Parameter(torch.ones(self.normalized_shape))
            self.bias = nn.Parameter(torch.zeros(self.normalized_shape))

    def forward(self, x: torch.Tensor):
        """Normalize ``x`` over its last ``len(normalized_shape)`` dimensions.

        Raises:
            AssertionError: If the trailing dimensions of ``x`` do not equal
                ``normalized_shape``.
        """
        n_dims = len(self.normalized_shape)
        assert tuple(x.shape[-n_dims:]) == self.normalized_shape, (
            f"expected trailing shape {self.normalized_shape}, "
            f"got {tuple(x.shape[-n_dims:])}"
        )
        dims = tuple(range(-n_dims, 0))
        mean = x.mean(dim=dims, keepdim=True)
        # Biased variance computed from deviations. The E[x^2] - E[x]^2 form
        # cancels catastrophically when |mean| is large relative to the spread.
        var = x.var(dim=dims, unbiased=False, keepdim=True)
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        if self.elementwise_affine:
            x_norm = self.gain * x_norm + self.bias
        return x_norm
# Build a [b, c, w*h] input holding the values 0..23.
x = torch.linspace(0, 23, 24, dtype=torch.float32).reshape(2, 3, 2 * 2)
print(f"x[0] before LN: {x[0]}")
# Normalize over every dimension except the batch dimension.
ln = LN(x.shape[1:])
x = ln(x)
print(f"x[0] after LN: {x[0]}")
print(x.shape)
x[0] before LN: tensor([[ 0., 1., 2., 3.],
[ 4., 5., 6., 7.],
[ 8., 9., 10., 11.]])
x[0] after LN: tensor([[-1.5933, -1.3036, -1.0139, -0.7242],
[-0.4345, -0.1448, 0.1448, 0.4345],
[ 0.7242, 1.0139, 1.3036, 1.5933]], grad_fn=<SelectBackward0>)
torch.Size([2, 3, 4])
Batch Normalization
import numpy as np
class MyBN:
    """Minimal NumPy batch normalization (forward pass only)."""

    def __init__(self, momentum=0.01, eps=1e-5, feat_dim=12):
        """Initialise the running statistics and affine parameters.

        :param momentum: weight given to the current batch statistics when
            updating the running mean/variance (PyTorch convention)
        :param eps: added to the variance to avoid division by zero
        :param feat_dim: number of features
        """
        # Running estimates used in evaluation mode.
        self._running_mean = np.zeros(shape=(feat_dim, ))
        self._running_var = np.ones(shape=(feat_dim, ))
        self._momentum = momentum
        self._eps = eps
        # Affine shift (beta) and scale (gamma), initialised to 0 and 1.
        self._beta = np.zeros(shape=(feat_dim, ))
        self._gamma = np.ones(shape=(feat_dim, ))
        # True: normalize with batch statistics and update the running ones.
        # False: normalize with the running statistics.
        self.training = True

    def batch_norm(self, x):
        """Batch-norm forward pass.

        :param x: array of shape (batch, feat_dim)
        :return: normalized array with the same shape as ``x``
        """
        if self.training:
            x_mean = x.mean(axis=0)
            x_var = x.var(axis=0)
            # running = (1 - momentum) * running + momentum * batch.
            # The weights were swapped before, so with the default momentum of
            # 0.01 the "running" statistics were 99% the latest batch.
            self._running_mean = (1-self._momentum)*self._running_mean + self._momentum*x_mean
            self._running_var = (1-self._momentum)*self._running_var + self._momentum*x_var
            x_hat = (x-x_mean)/np.sqrt(x_var+self._eps)
        else:
            x_hat = (x-self._running_mean)/np.sqrt(self._running_var+self._eps)
        return self._gamma*x_hat + self._beta

    def __call__(self, x):
        """Apply :meth:`batch_norm` to ``x``.

        :param x: input data
        :return: batch-norm output
        """
        return self.batch_norm(x)
# Build a [b, c*w*h] input: 240 evenly spaced values in [0, 23].
x = np.linspace(0, 23, 24 * 10, dtype=np.float32).reshape(20, 3 * 2 * 2)
print(f"x[0] before BN: {x[:,0]}")
# Instantiate with the defaults (12 features) and run one training-mode pass.
bn = MyBN()
x = bn(x)
print(f"x[0] after BN: {x[:,0]}")
print(x.shape)
x[0] before BN: [ 0. 1.1548117 2.3096235 3.464435 4.619247 5.7740583
6.92887 8.083682 9.238494 10.393306 11.548117 12.702929
13.85774 15.012552 16.167364 17.322176 18.476988 19.6318
20.786612 21.941423 ]
x[0] after BN: [-1.64750874 -1.47408676 -1.30066478 -1.1272428 -0.95382082 -0.78039891
-0.60697693 -0.43355498 -0.26013297 -0.08671099 0.08671085 0.26013285
0.43355483 0.60697681 0.78039879 0.95382077 1.1272428 1.30066478
1.47408676 1.64750874]
(20, 12)
二维卷积
import numpy as np
# 2D 卷积实现,带偏置
def conv2d(image, kernel, bias=0, stride=1, padding=0):
    """Slide ``kernel`` over a 2-D ``image`` and return the response map.

    The kernel is applied without flipping (cross-correlation, as in deep
    learning frameworks). ``padding`` rows/columns of zeros are added on every
    side, the window moves ``stride`` pixels at a time, and ``bias`` is added
    to every output element.

    Returns:
        Float array of shape
        ``((H - kh + 2 * padding) // stride + 1, (W - kw + 2 * padding) // stride + 1)``.
    """
    img_h, img_w = image.shape
    kernel_h, kernel_w = kernel.shape

    out_shape = (
        (img_h - kernel_h + 2 * padding) // stride + 1,
        (img_w - kernel_w + 2 * padding) // stride + 1,
    )

    if padding > 0:
        image = np.pad(image, padding, mode='constant', constant_values=0)

    output = np.zeros(out_shape)
    for row, col in np.ndindex(*out_shape):
        top, left = row * stride, col * stride
        window = image[top:top + kernel_h, left:left + kernel_w]
        # Element-wise product with the kernel, summed, plus the bias.
        output[row, col] = (window * kernel).sum() + bias
    return output
# Example 5x5 input image and 3x3 kernel.
image = np.array([
    [1, 1, 1, 0, 0],
    [0, 1, 1, 1, 0],
    [0, 0, 1, 1, 1],
    [1, 1, 0, 0, 0],
    [0, 1, 1, 1, 0],
])
kernel = np.array([
    [1, 0, 1],
    [0, 1, 0],
    [1, 0, 1],
])
# Bias added to every output element.
bias = 0.1
# Run the convolution with unit stride and no padding.
output = conv2d(image, kernel, bias=bias, stride=1, padding=0)
print("卷积结果带偏置:")
print(output)
卷积结果带偏置:
[[4.1 3.1 4.1]
[2.1 4.1 2.1]
[3.1 3.1 3.1]]
训练模型
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# Hyper-parameters.
batch_size = 64
epochs = 10
learning_rate = 0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Preprocessing: convert to a tensor, then normalize with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# MNIST training split (shuffled) and test split (fixed order).
train_dataset = datasets.MNIST(
    root='./data', train=True, download=True, transform=transform
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = datasets.MNIST(
    root='./data', train=False, download=True, transform=transform
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 定义模型
class SimpleNN(nn.Module):
    """Two-layer fully connected classifier for 28x28 inputs (10 outputs)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Flatten ``x`` to ``(-1, 784)`` and return logits of shape ``(-1, 10)``."""
        flat = x.view(-1, 28 * 28)
        hidden = self.fc1(flat).relu()
        return self.fc2(hidden)
model = SimpleNN().to(device)

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# One pass over the training set, then one pass over the test set, per epoch.
for epoch in range(epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss and the number of correct predictions.
        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = correct / total * 100
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    # Evaluate on the test set without tracking gradients.
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total * 100
    print(f"Validation Accuracy: {accuracy:.2f}%")

# Persist the trained weights.
torch.save(model.state_dict(), 'model.pth')
print("Model saved!")
100%|██████████| 9.91M/9.91M [01:54<00:00, 86.9kB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 137kB/s]
100%|██████████| 1.65M/1.65M [00:14<00:00, 113kB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 2.27MB/s]
Epoch [1/10], Loss: 0.3842, Accuracy: 88.89%
Validation Accuracy: 93.28%
Epoch [2/10], Loss: 0.1961, Accuracy: 94.20%
Validation Accuracy: 95.34%
Epoch [3/10], Loss: 0.1411, Accuracy: 95.78%
Validation Accuracy: 95.85%
Epoch [4/10], Loss: 0.1137, Accuracy: 96.63%
Validation Accuracy: 96.58%
Epoch [5/10], Loss: 0.0977, Accuracy: 97.01%
Validation Accuracy: 97.03%
Epoch [6/10], Loss: 0.0864, Accuracy: 97.33%
Validation Accuracy: 95.62%
Epoch [7/10], Loss: 0.0782, Accuracy: 97.55%
Validation Accuracy: 97.34%
Epoch [8/10], Loss: 0.0683, Accuracy: 97.83%
Validation Accuracy: 97.16%
Epoch [9/10], Loss: 0.0629, Accuracy: 97.95%
Validation Accuracy: 97.41%
Epoch [10/10], Loss: 0.0585, Accuracy: 98.15%
Validation Accuracy: 97.59%
Model saved!