안녕하세요. 이번 포스트를 마지막으로 김성훈 교수님의 PytorchToAll 강의를 끝내려합니다. 마지막 포스트에서는 시계열 처리에 유효한 대표적인 인공신경망 구조인 RNN을 다루도록 하겠습니다. 본 포스트에서 사용된 코드는 유투브에 있는 Sung Kim님의 강의에서 가져왔음을 밝힙니다.
RNN은 Recurrent Neural Network, 즉 자기 회기적인 인공신경망을 의미합니다. RNN이 다른 인공신경망과 가지는 가장 큰 특징은 한 시점의 학습이 다른 시점의 학습의 영향을 받는다는점에 있습니다. 다양한 RNN 구조를 pytorch에서는 다음과 같이 표현할 수 있습니다.
cell = nn.RNN(input_size=4, hidden_size=2, batch_first=True)
cell = nn.GRU(input_size=4, hidden_size=2, batch_first=True)
cell = nn.LSTM(input_size=4, hidden_size=2, batch_first=True)
hidden size는 output size와 동일하며, input단에서는 batch_first를 True값을 줍니다.
기본적인 input과 output을 만드는 과정은 아래 코드와 같습니다.
import torch
import torch.nn as nn
from torch.autograd import Variable
# One hot encoding for each char in 'hello'
h = [1, 0, 0, 0]
e = [0, 1, 0, 0]
l = [0, 0, 1, 0]
o = [0, 0, 0, 1]
# One cell RNN input_dim (4) -> output_dim (2). sequence: 5
cell = nn.RNN(input_size=4, hidden_size=2, batch_first=True)
# (num_layers * num_directions, batch, hidden_size) whether batch_first=True or False
hidden = Variable(torch.randn(1, 1, 2))
# Propagate input through RNN
# Input: (batch, seq_len, input_size) when batch_first=True
inputs = Variable(torch.Tensor([h, e, l, l, o]))
inputs = inputs.view(1, 5, -1)
out, hidden = cell(inputs, hidden)
print("sequence input size", inputs.size(), "out size", out.size())
hihell을 인풋으로, ihello를 output으로 하는 기본적인 RNN 모델을 구축해봅시다. 위의 과정과 동일하게 one-hot encoding을 거칩니다.
import sys
import torch
import torch.nn as nn
from torch.autograd import Variable
idx2char = ['h', 'i', 'e', 'l', 'o']
# Teach hihell -> ihello
x_data = [0, 1, 0, 2, 3, 3] # hihell
one_hot_lookup = [[1, 0, 0, 0, 0], # 0
[0, 1, 0, 0, 0], # 1
[0, 0, 1, 0, 0], # 2
[0, 0, 0, 1, 0], # 3
[0, 0, 0, 0, 1]] # 4
y_data = [1, 0, 2, 3, 3, 4] # ihello
x_one_hot = [one_hot_lookup[x] for x in x_data]
# As we have one batch of samples, we will change them to variables only once
inputs = Variable(torch.Tensor(x_one_hot))
labels = Variable(torch.LongTensor(y_data))
그 후 hyperparameter를 세팅합니다.
num_classes = 5
input_size = 5 # one-hot size
hidden_size = 5 # output from the RNN. 5 to directly predict one-hot
batch_size = 1 # one sentence
sequence_length = 1 # One by one
num_layers = 1 # one-layer rnn
이후 RNN을 사용하는 모델을 구축합니다. Input x에 대한 output의 shape을 주의해야합니다.
class Model(nn.Module):
def __init__(self):
super(Model, self).__init__()
self.rnn = nn.RNN(input_size=input_size,
hidden_size=hidden_size, batch_first=True)
def forward(self, hidden, x):
# Reshape input (batch first)
x = x.view(batch_size, sequence_length, input_size)
# Propagate input through RNN
# Input: (batch, seq_len, input_size)
# hidden: (num_layers * num_directions, batch, hidden_size)
out, hidden = self.rnn(x, hidden)
return hidden, out.view(-1, num_classes)
def init_hidden(self):
# Initialize hidden and cell states
# (num_layers * num_directions, batch, hidden_size)
return Variable(torch.zeros(num_layers, batch_size, hidden_size))
# Instantiate RNN model
model = Model()
모델의 학습을 위한 loss function을 도입합니다.
# Set loss and optimizer function
# CrossEntropyLoss = LogSoftmax + NLLLoss
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
반복을 통한 학습을 진행합니다.
# Train the model
for epoch in range(100):
optimizer.zero_grad()
loss = 0
hidden = model.init_hidden()
sys.stdout.write("predicted string: ")
for input, label in zip(inputs, labels):
# print(input.size(), label.size())
hidden, output = model(hidden, input)
val, idx = output.max(1)
sys.stdout.write(idx2char[idx.data[0]])
loss += criterion(output, label)
print(", epoch: %d, loss: %1.3f" % (epoch + 1, loss.data[0]))
loss.backward()
optimizer.step()
print("Learning finished!")
위와는 다르게 loop를 돌리지 않고 더 쉽게 각 sequence 마다 학습을 진행할수도 있습니다.
# Train the model
for epoch in range(100):
outputs = rnn(inputs)
optimizer.zero_grad()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
_, idx = outputs.max(1)
idx = idx.data.numpy()
result_str = [idx2char[c] for c in idx.squeeze()]
print("epoch: %d, loss: %1.3f" % (epoch + 1, loss.data[0]))
print("Predicted string: ", ''.join(result_str))
print("Learning finished!")
강의에서는 one hot보다 embedding 방식으로 학습을 진행하면 더 좋은 성능을 보임을 말합니다. 이는 0,1로 값이 고정된 one-hot보다 embedding을 통해 학습이 더욱 유연하게 진행되기 때문입니다.
RNN은 many to one, one to many, many to many와 같은 다양한 task에 사용할 수 있습니다. 강의에서는 특히 many to one 상황의 classification task에서 RNN이 어떻게 활용될 수 있는지를 보여줍니다.
예시로 사용한 Name classification에서 Input으로 one-hot이 아닌 아스키 코드를 적용한 embedding을 활용합니다. 이 과정은 아래의 코드와 같습니다.
def str2ascii_arr(msg):
arr = [ord(c) for c in msg]
return arr, len(arr)
class RNNClassifier():
def __init__():
...
self.embedding = nn.Embedding(input_size, hidden_size)
def forward():
...
embedded = self.embedding(input)
many to one을 반영한 모델 구축 부분은 아래와 같습니다.
class RNNClassifier(nn.Module):
def __init__(self, input_size, hidden_size, output_size, n_layers=1):
super(RNNClassifier, self).__init__()
self.hidden_size = hidden_size
self.n_layers = n_layers
self.embedding = nn.Embedding(input_size, hidden_size)
self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, input):
# Note: we run this all at once (over the whole input sequence)
# input = B x S . size(0) = B
batch_size = input.size(0)
# input: B x S -- (transpose) --> S x B
input = input.t()
# Embedding S x B -> S x B x I (embedding size)
print(" input", input.size())
embedded = self.embedding(input)
print(" embedding", embedded.size())
# Make a hidden
hidden = self._init_hidden(batch_size)
output, hidden = self.gru(embedded, hidden)
print(" gru hidden output", hidden.size())
# Use the last layer output as FC's input
# No need to unpack, since we are going to use hidden
fc_output = self.fc(hidden)
print(" fc output", fc_output.size())
return fc_output
def _init_hidden(self, batch_size):
hidden = torch.zeros(self.n_layers, batch_size, self.hidden_size)
return Variable(hidden)
만일 batch를 사용하여 한번에 여러 데이터를 처리한다면 각 이름의 length가 달라서 적합한 padding과정이 필요합니다. 이를 해결하기 위해 zero padding을 사용합니다.
# pad sequences and sort the tensor
def pad_sequences(vectorized_seqs, seq_lengths):
seq_tensor = torch.zeros((len(vectorized_seqs), seq_lengths.max())).long()
for idx, (seq, seq_len) in enumerate(zip(vectorized_seqs, seq_lengths)):
seq_tensor[idx, :seq_len] = torch.LongTensor(seq)
return seq_tensor
# Create necessary variables, lengths, and target
def make_variables(names):
sequence_and_length = [str2ascii_arr(name) for name in names]
vectorized_seqs = [sl[0] for sl in sequence_and_length]
seq_lengths = torch.LongTensor([sl[1] for sl in sequence_and_length])
return pad_sequences(vectorized_seqs, seq_lengths)
이후 학습을 진행하면 됩니다.
추가적으로 여러 배치를 사용할 경우 pack_padded_sequence라는 기능을 사용하면 여러 배치를 unpack하여 한번에 처리한 후 다시 pack하는 과정을 통해 효율적인 학습을 시도할 수 있습니다.
Pytorch에서는 tensor.cuda(), classfier.cuda()를 통해 GPU를 사용할 수 있으며 여러 GPU가 있을 경우에 nn.DataParallel()을 활용해 병렬 처리도 가능합니다. 더 자세히 알아보시려면 pytorch의 tutorial를 참고하시면 됩니다.
지금까지 PytorchZeroToAll를 들으며 저 나름대로 정리를 하였습니다. 많이 부족하지만 딥러닝을 공부하시면서 pytorch를 처음 시작하는 분들에게 좋은 자료가 되었으면 하는 바람입니다. 동시에 좋은 자료를 무상으로 공유하여 많이 배울 수 있는 기회를 주신 김성훈님께 깊은 감사의 말씀을 보냅니다. 앞으로도 좋은 자료로 블로그에서 찾아뵙겠습니다.