This column focuses on GAN-based imputation of missing values in time-series data. Whenever time-series data comes up, one neural network is unavoidable: the Recurrent Neural Network (RNN). RNNs are a class of neural networks designed for sequential data; they are very effective on data with sequential structure because they can exploit the temporal information in it. Since RNNs appear throughout any discussion of time-series imputation, this article introduces the RNN and then walks through simple PyTorch implementations to build a deeper understanding of how it works.
- 1. Introduction to RNN
- 2. PyTorch Syntax
- 2.1 RNNCell
- 2.2 RNN
- 3. Code Practice (1)
- 3.1 Task Description
- 3.2 Implementation with RNNCell
- 3.3 Implementation with RNN
- 4. Code Practice (2)
- 4.1 Task Description
- 4.2 Implementation with RNN
- 4.3 Complete Code
- 4.4 Running Results
1. Introduction to RNN
For an introduction to recurrent neural networks, see the article 循环神经网络RNN入门介绍 (an introductory RNN post); it is not repeated in detail here.
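As a quick reminder of what an RNN computes, at each step the hidden state is updated as h_t = tanh(W_ih x_t + W_hh h_{t-1} + b). The following is a minimal hand-rolled sketch of this recurrence with randomly initialized, purely illustrative weights (not a library implementation):
import torch
input_size, hidden_size, seq_len = 4, 2, 3     # illustrative dimensions
W_ih = torch.randn(hidden_size, input_size)    # input-to-hidden weights (random, for illustration)
W_hh = torch.randn(hidden_size, hidden_size)   # hidden-to-hidden weights
b = torch.zeros(hidden_size)                   # bias
x = torch.randn(seq_len, input_size)           # one input sequence of length 3
h = torch.zeros(hidden_size)                   # h0: all-zero prior
for t in range(seq_len):
    h = torch.tanh(W_ih @ x[t] + W_hh @ h + b)  # h_t = tanh(W_ih x_t + W_hh h_{t-1} + b)
    print('h_%d:' % (t + 1), h)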
2. PyTorch Syntax
2.1 RNNCell
In an RNNCell, h0 serves as the prior; if no prior is available, h0 is an all-zero vector with the same dimensions as h1 and h2. The syntax is as follows:
torch.nn.RNNCell(input_size, hidden_size, bias=True, nonlinearity='tanh', device=None, dtype=None)
An RNN cell with tanh or ReLU nonlinearity. Its parameters are listed below (the update rule it computes is given after the list):
- input_size: dimension of the input;
- hidden_size: dimension of the hidden state;
- bias: if False, the layer does not use the bias weights b_ih and b_hh; default True;
- nonlinearity: the nonlinearity to use, either 'tanh' or 'relu'; default 'tanh'.
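With the default tanh nonlinearity, the update computed by the cell (as given in the PyTorch documentation, using the bias terms b_ih and b_hh mentioned above) is:
h' = tanh(x W_ih^T + b_ih + h W_hh^T + b_hh)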
That is, h_1 = RNNCell(x_1, h_0). Input shape: (batch_size, input_size); hidden shape: (batch_size, hidden_size); output shape: (batch_size, hidden_size).
The dataset therefore has shape (seq_len, batch_size, input_size).
Code example:
import torch
import torch.nn as nn
batch_size=1
seq_len=3
input_size=4
hidden_size=2
cell=nn.RNNCell(input_size=input_size,hidden_size=hidden_size)
#(seq_len,batch,input)
dataset=torch.randn(seq_len,batch_size,input_size)
hidden=torch.zeros(batch_size,hidden_size) # h0
for idx, input in enumerate(dataset):  # loop over the seq_len = 3 time steps
    print('=' * 20, idx, '=' * 20)
    print('Input size:', input.shape)
    hidden = cell(input, hidden)
    print('output size:', hidden.shape)
    print(hidden)
Running result: at each of the three steps, the input has shape (1, 4) and the printed hidden state has shape (1, 2).
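To connect the printed hidden states with the update rule above, the first step can be recomputed by hand from the cell's own parameters. This is a small sketch that continues the snippet above; weight_ih, weight_hh, bias_ih, and bias_hh are the standard parameter attributes of nn.RNNCell:
x1 = dataset[0]                                   # first time step, shape (1, 4)
h0 = torch.zeros(batch_size, hidden_size)         # zero prior
h1 = cell(x1, h0)
# Manual recomputation: h1 = tanh(x W_ih^T + b_ih + h W_hh^T + b_hh)
h1_manual = torch.tanh(x1 @ cell.weight_ih.T + cell.bias_ih
                       + h0 @ cell.weight_hh.T + cell.bias_hh)
print(torch.allclose(h1, h1_manual))              # True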
2.2 RNN
nn.RNN defines a (possibly multi-layer) recurrent neural network; for example, stacking three layers corresponds to num_layers = 3. The syntax is as follows:
torch.nn.RNN(input_size, hidden_size, num_layers=1, nonlinearity='tanh', bias=True, batch_first=False, dropout=0.0, bidirectional=False, device=None, dtype=None)
- batch_first: if True, the input and output tensors are provided as (batch, seq, feature) instead of (seq, batch, feature); default False (a short shape sketch is given after the shape summary below).
Usage: out, hidden = rnn(x, h0), where x = [x1, x2, …, xn], out = [h1, h2, …, hn], and hidden = hn (the last hidden state).
Input shape: (seq_len, batch_size, input_size); initial hidden (h0) shape: (num_layers, batch_size, hidden_size); output shape: (seq_len, batch_size, hidden_size); final hidden (hn) shape: (num_layers, batch_size, hidden_size).
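For completeness, here is a small sketch of how batch_first=True changes the expected shapes (variable names are illustrative):
import torch
import torch.nn as nn
rnn_bf = nn.RNN(input_size=4, hidden_size=2, num_layers=1, batch_first=True)
x_bf = torch.randn(1, 3, 4)     # (batch, seq, feature) instead of (seq, batch, feature)
out_bf, h_bf = rnn_bf(x_bf)     # when h0 is omitted it defaults to zeros
print(out_bf.shape)             # torch.Size([1, 3, 2]) -> (batch, seq, hidden)
print(h_bf.shape)               # torch.Size([1, 1, 2]) -> (num_layers, batch, hidden), unaffected by batch_first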
Code example:
import torch
import torch.nn as nn
batch_size=1
seq_len=3
input_size=4
hidden_size=2
num_layers=1
rnn=nn.RNN(input_size=input_size,hidden_size=hidden_size,num_layers=num_layers)
#(seq_len,batch,input)
input=torch.randn(seq_len,batch_size,input_size)
hidden=torch.zeros(num_layers,batch_size,hidden_size) # h0
out,hidden=rnn(input,hidden)
print('Output size:',out.shape)
print(out)
print('Hidden size:',hidden.shape)
print(hidden)
Running result: out has shape (3, 1, 2) and hidden has shape (1, 1, 2).
3. Code Practice (1)
3.1 Task Description
This example trains a model that maps the character sequence "hello" to the sequence "ohlol".
3.2 Implementation with RNNCell
First, convert the text to one-hot encodings:
idx2char=['e','h','l','o']
x_data=[1,0,2,2,3]
y_data=[3,1,2,3,2]
one_hot_lookup=[[1,0,0,0],
[0,1,0,0],
[0,0,1,0],
[0,0,0,1]]
x_one_hot=[one_hot_lookup[x] for x in x_data]
Relevant dimensions: input_size = 4, hidden_size = 4. Input shape: (batch_size, input_size); hidden shape: (batch_size, hidden_size); output shape: (batch_size, hidden_size).
The sequence tensor therefore has shape (seq_len, batch_size, input_size).
The complete code is as follows:
import torch
import torch.nn as nn
input_size=4
hidden_size=4
batch_size=1
idx2char=['e','h','l','o']
x_data=[1,0,2,2,3]
y_data=[3,1,2,3,2]
one_hot_lookup=[[1,0,0,0],
[0,1,0,0],
[0,0,1,0],
[0,0,0,1]]
x_one_hot=[one_hot_lookup[x] for x in x_data]
inputs=torch.Tensor(x_one_hot).view(-1,batch_size,input_size) #(5,1,4)
labels=torch.LongTensor(y_data).view(-1,1) #(5,1)
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, batch_size):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.batch_size = batch_size
        self.rnncell = nn.RNNCell(input_size=input_size, hidden_size=hidden_size)

    def forward(self, input, hidden):
        hidden = self.rnncell(input, hidden)
        return hidden

    def init_hidden(self):
        return torch.zeros(self.batch_size, self.hidden_size)  # h0
rnn = RNN(input_size, hidden_size, batch_size)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn.parameters(),lr=0.1)
for epoch in range(20):
    loss = 0
    optimizer.zero_grad()  # clear gradients
    hidden = rnn.init_hidden()  # initialize h0
    print('Predicted string: ', end='')
    for input, label in zip(inputs, labels):
        hidden = rnn(input, hidden)
        loss += criterion(hidden, label)
        _, idx = hidden.max(dim=1)  # index of the largest value
        print(idx2char[idx.item()], end='')
    loss.backward()
    optimizer.step()
    print(', Epoch [%d/20] loss=%.4f' % (epoch + 1, loss.item()))
Running result:
3.3 Implementation with RNN
Code example:
import torch
import torch.nn as nn
input_size=4
hidden_size=4
batch_size=1
num_layers=1
seq_len=5
idx2char=['e','h','l','o']
x_data=[1,0,2,2,3]
y_data=[3,1,2,3,2]
one_hot_lookup=[[1,0,0,0],
[0,1,0,0],
[0,0,1,0],
[0,0,0,1]]
x_one_hot=[one_hot_lookup[x] for x in x_data]
inputs=torch.Tensor(x_one_hot).view(seq_len,batch_size,input_size) #(5,1,4)
labels=torch.LongTensor(y_data) #(5,)
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, batch_size, num_layers):
        super(RNN, self).__init__()
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.batch_size = batch_size
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)

    def forward(self, input):
        hidden = torch.zeros(self.num_layers, self.batch_size, self.hidden_size)  # h0
        out, _ = self.rnn(input, hidden)
        return out.view(-1, self.hidden_size)
rnn = RNN(input_size, hidden_size, batch_size,num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn.parameters(),lr=0.1)
for epoch in range(20):
    optimizer.zero_grad()  # clear gradients
    outputs = rnn(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    _, idx = outputs.max(dim=1)  # index of the largest value
    idx = idx.data.numpy()
    print('Predicted: ', ''.join([idx2char[x] for x in idx]), end='')
    print(', Epoch [%d/20] loss=%.4f' % (epoch + 1, loss.item()))
Running result:
4. Code Practice (2)
4.1 Task Description
This example trains an RNN that takes a sine (sin) curve as input and outputs the corresponding cosine (cos) curve.
4.2 Implementation with RNN
First, define the parameters:
input_size=1
hidden_size=16
batch_size=1
num_layers=1
seq_len=50
Define the RNN network:
class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x, hidden):
        out, _ = self.rnn(x, hidden)
        out = out.view(-1, hidden_size)
        out = self.linear(out)
        out = out.unsqueeze(dim=1)  # expand dims from (50, 1) to (50, 1, 1)
        return out
Instantiate the model:
rnn = RNN()
print(rnn)
# RNN(
# (rnn): RNN(1, 16)
# (linear): Linear(in_features=16, out_features=1, bias=True)
# )
loss_fun = nn.MSELoss()
optimizer = torch.optim.Adam(rnn.parameters(),lr=0.01)
Train the model:
hidden_prev = torch.zeros(num_layers, batch_size, hidden_size)  # h0
plt.figure(figsize=(20, 4))
plt.ion()  # interactive mode: keep updating the plot
for it in range(1000 + 1):
    steps = np.linspace(it * np.pi, (it + 1) * np.pi, seq_len, dtype=np.float32)
    x_np = np.sin(steps)
    y_np = np.cos(steps)
    x = torch.from_numpy(x_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
    y = torch.from_numpy(y_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
    y_pred = rnn(x, hidden_prev)
    loss = loss_fun(y_pred, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # visualize the fit
    plt.plot(steps, y_np, 'b-')
    plt.plot(steps, y_pred.data.numpy().flatten(), 'r-')
    plt.draw()  # redraw
    plt.pause(0.05)
    if it % 100 == 0:
        print("Iter:{} loss:{}".format(it, loss))
plt.ioff()  # turn off interactive mode
plt.show()
Finally, test the model:
# test
steps = np.linspace(10 * np.pi, (10 + 1) * np.pi, seq_len, dtype=np.float32)
x_np = np.sin(steps)
y_np = np.cos(steps)
# visualize: ground truth (blue) vs prediction (red)
plt.scatter(steps, y_np, c='b')
x = torch.from_numpy(x_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
y_pred= rnn(x,hidden_prev)
plt.scatter(steps, y_pred.data.numpy().flatten(), c='r')
plt.show()
4.3 Complete Code
The complete code is as follows:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
input_size=1
hidden_size=16
batch_size=1
num_layers=1
seq_len=50
class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x, hidden):
        out, _ = self.rnn(x, hidden)
        out = out.view(-1, hidden_size)
        out = self.linear(out)
        out = out.unsqueeze(dim=1)  # expand dims from (50, 1) to (50, 1, 1)
        return out
rnn = RNN()
print(rnn)
# RNN(
# (rnn): RNN(1, 16)
# (linear): Linear(in_features=16, out_features=1, bias=True)
# )
loss_fun = nn.MSELoss()
optimizer = torch.optim.Adam(rnn.parameters(),lr=0.01)
hidden_prev = torch.zeros(num_layers, batch_size, hidden_size)  # h0
plt.figure(figsize=(12, 3))
plt.ion()  # interactive mode: keep updating the plot
for it in range(1000 + 1):
    steps = np.linspace(it * np.pi, (it + 1) * np.pi, seq_len, dtype=np.float32)
    x_np = np.sin(steps)
    y_np = np.cos(steps)
    x = torch.from_numpy(x_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
    y = torch.from_numpy(y_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
    y_pred = rnn(x, hidden_prev)
    loss = loss_fun(y_pred, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # visualize the fit
    plt.plot(steps, y_np, 'b-')
    plt.plot(steps, y_pred.data.numpy().flatten(), 'r-')
    plt.draw()  # redraw
    plt.pause(0.05)
    if it % 100 == 0:
        print("Iter:{} loss:{}".format(it, loss))
plt.ioff()  # turn off interactive mode
plt.show()
# test
steps = np.linspace(10 * np.pi, (10 + 1) * np.pi, seq_len, dtype=np.float32)
x_np = np.sin(steps)
y_np = np.cos(steps)
# visualize: ground truth (blue) vs prediction (red)
plt.scatter(steps, y_np, c='b')
x = torch.from_numpy(x_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
y_pred= rnn(x,hidden_prev)
plt.scatter(steps, y_pred.data.numpy().flatten(), c='r')
plt.show()
4.4 Running Results
The running result and the training loss are as follows:
The test result is as follows:
Explanation: the first 4-5 predicted points deviate noticeably from the true curve. A likely reason is that these early points have no meaningful prior hidden state h (the test window starts from h0 = 0), so their predictions are less accurate. To reduce this deviation in the test plot, the test code can be modified slightly:
# test
steps = np.linspace(10 * np.pi, (10 + 1) * np.pi, seq_len, dtype=np.float32)
x_np = np.sin(steps)
y_np = np.cos(steps)
# visualize, skipping the first 5 points
plt.scatter(steps[5:], y_np[5:], c='b')
x = torch.from_numpy(x_np[:, np.newaxis, np.newaxis])  # expand dims to (50, 1, 1)
y_pred = rnn(x, hidden_prev)
plt.scatter(steps[5:], y_pred.data.numpy().flatten()[5:], c='r')
plt.show()
The result is as follows:
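An alternative to simply discarding the first few test points, sketched here only as a possible modification rather than part of the original implementation, is to carry the hidden state across consecutive training windows so that each window (and later the test sequence) starts from a meaningful prior instead of h0 = 0. This assumes forward() is changed to also return the final hidden state, and the state must be detached so gradients do not flow across windows:
# A sketch (not the original code). Assumes forward() is modified to:
#     def forward(self, x, hidden):
#         out, hidden = self.rnn(x, hidden)
#         out = self.linear(out.view(-1, hidden_size)).unsqueeze(dim=1)
#         return out, hidden
hidden_prev = torch.zeros(num_layers, batch_size, hidden_size)  # zero prior only for the first window
for it in range(1000 + 1):
    steps = np.linspace(it * np.pi, (it + 1) * np.pi, seq_len, dtype=np.float32)
    x = torch.from_numpy(np.sin(steps)[:, np.newaxis, np.newaxis])
    y = torch.from_numpy(np.cos(steps)[:, np.newaxis, np.newaxis])
    y_pred, hidden_prev = rnn(x, hidden_prev)
    hidden_prev = hidden_prev.detach()  # stop gradients from flowing across windows
    loss = loss_fun(y_pred, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
At test time, the last hidden state from training can then be passed in instead of a zero prior, which should reduce the deviation of the first few predicted points.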
References:
- https://blog.csdn.net/qq_41775769/article/details/121707309
- http://www.ichenhua.cn/read/302
- https://www.sohu.com/a/402391876_100286367