O código em Python é apresentado abaixo com os comentários.
Para uma melhor visualização do estudo, sugiro acessar o arquivo Recurrent Neural Network.ipynb
# coding: utf-8
# # Rede Neural Recorrente
# ## Objetivo
#
# Implementar uma Rede Neural Recorrente que aprenda $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$, onde $u_{t} \sim N\left(0, 0.09\right)$.
# ## Implementação
#
# Importamos os pacotes que serão utilizados e configuramos o *matplotlib*.
# In[1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import optimize
#import matplotlib
#get_ipython().magic('matplotlib inline')
#matplotlib.rcParams['figure.figsize'] = (14, 8)
# Fixamos o seed para manter os resultados entre as execuções e definimos a função que gera $u_{t}$.
# In[2]:
np.random.seed(seed=1)
# In[3]:
sigma = 0.09
def noise(m=0):
return np.random.normal(m, sigma)
# Definimos o número de amostras e o número de elementos na sequência para o treinamento e para a validação da rede neural.
# In[4]:
n_of_samples = 100000 # number of samples
n_per_sample = 6 # number per sample
# Criamos duas sequências: Uma para treinamento e outra para validação posterior da rede (validação cruzada).
# In[5]:
def next(y_1, y_4):
return np.sin(y_1) + np.cos(y_4) + noise()
# In[6]:
_x = [noise() for _ in range(4)] # Treinamento
_x_val = [noise(np.pi / 4) for _ in range(4)] # Validação
for i in range(n_per_sample + n_of_samples):
_x.append(next(_x[-1], _x[-4]))
_x_val.append(next(_x_val[-1], _x_val[-4]))
# Retiramos o estado transiente das séries
_x = _x[4:]
_x_val = _x_val[4:]
# Os dados para treinamento e validação serão armezenados nas variáveis *"X"* e *"X_val"*, respectivamente.
# In[7]:
X = np.zeros((n_of_samples, n_per_sample))
X_val = np.zeros((n_of_samples, n_per_sample))
target = np.zeros(n_of_samples)
target_val = np.zeros(n_of_samples)
for row in range(n_of_samples):
X[row, :] = _x[row: row + n_per_sample]
target[row] = _x[row + n_per_sample]
X_val[row, :] = _x_val[row: row + n_per_sample]
target_val[row] = _x_val[row + n_per_sample]
# ## Características da Rede Neural
#
# Definimos a função de ativação utilizada pela rede, bem como a derivada da função de ativação.
#
# Para a nossa rede, escolhemos a função $tanh(z)$.
# In[8]:
def g(z):
return np.tanh(z)
def dg(z):
return 1 - np.tanh(z) ** 2
# Definimos os métodos para a evolução dos estados da rede neural.
#
# É importante destacar que o último estágio não deverá passar pela função de ativação para não restringir nosso intervalo de saída.
#
# Definimos $S_{0}=0$.
# In[9]:
def forward_nn(X, W, U):
m, n = X.shape[0], X.shape[1]
S = np.zeros((m, n + 1))
for k in range(n - 1): # state[0] inicia com 0
S[:, k + 1] = g(X[:, k] * W[k] + S[:, k] * U)
S[:, n] = X[:, n - 1] * W[n - 1] + S[:, n - 1] * U # Último estado não ativa
return S
def output_nn(S):
return S[:, -1]
# Definimos os métodos para calcular a função custo e o gradiente do custo.
#
# $$cost (y, target) = \frac{\sum (y - target)^{2}}{n} $$
#
# $$\frac{\partial cost}{\partial y} ( y, target ) = \frac{2 * (y - target)}{n}$$
# In[10]:
def cost(y, target):
return np.sum((y - target) ** 2) / len(y)
def gradient_cost(y, target):
return 2 * (y - target) / len(y)
# Definimos o método que calcula o *BPTT - Backpropagation Through Time*.
# In[11]:
def back_propagation(X, target, S, W, U):
m = X.shape[0]
n = X.shape[1]
y = output_nn(S)
grad_s = np.zeros((m, n + 1))
grad_s[:, -1] = gradient_cost(y, target)
grad_w = np.zeros(n)
grad_u = 0
for k in range(n):
_x = X[:, n - k - 1]
_s = S[:, n - k - 1]
if k == 0:
_g = 1 # O último estado não tem ativação
else:
_g = dg(_x * W[n - k - 1] + _s * U)
grad_s[:, n - k - 1] = grad_s[:, n - k] * U * _g
grad_w[n - k - 1] = np.sum(grad_s[:, n - k] * _x * _g)
grad_u += np.sum(grad_s[:, n - k] * _s * _g)
return grad_s, grad_w, grad_u
# ## Treinamento da Rede Neural
#
# Definimos as funções para o treinamento da rede utilizando otimização (Nonlinear Conjugate Gradient Algorithm).
# In[12]:
# Função custo com parâmetros w, u
def f(wu):
w = wu[0:-1]
u = wu[-1]
y = output_nn(forward_nn(X, w, u))
return cost(y, target)
# Gradiente da função custo em relação aos parâmetros w, u (Utilizando Backpropagation)
def fprime(wu):
w = wu[0:-1]
u = wu[-1]
S = forward_nn(X, w, u)
_, grad_w, grad_u = back_propagation(X, target, S, w, u)
return np.append(grad_w, grad_u)
# In[13]:
def train(X, W_initial=[noise() for _ in range(n_per_sample)], U_initial=noise()):
wu = np.append(W_initial, U_initial)
return optimize.fmin_cg(f, wu, fprime=fprime)
# Faremos a checagem numérica para verificar se o cálculo do gradiente utilizando o BPTT está correto.
#
# Assim, esperamos que o erro seja um número próximo de zero.
# In[14]:
print("Erro: {:.2}".format(optimize.check_grad(f, fprime, [1 for _ in range(n_per_sample + 1)])))
# Com a checagem indicando que os cálculos estão corretos, faremos o treinamento da rede para saber os parâmetros W e U que minimizam nossa função de custo.
# In[15]:
t = train(X)
w_opt = t[0:-1]
u_opt = t[-1]
print("\nW: {} \nU: {}".format(w_opt, u_opt))
# ## Verificação do Resultado
#
# Com os parâmetros W e U calculados pela otimização, faremos a evolução dos dados com as sequências de treinamento e de validação (aquelas que não foram utilizadas no treinamento).
# In[16]:
y_opt = output_nn(forward_nn(X, w_opt, u_opt))
y_val = output_nn(forward_nn(X_val, w_opt, u_opt))
# Comparação da função custo (média do quadrado dos erros) e coeficiente de determinação.
# In[17]:
def r2_score(y, target):
m = np.average(target)
return 1 - (np.sum((y - target) ** 2) / np.sum((m - target) ** 2))
# In[18]:
print('Training')
print(' Cost: {:.3}'.format(cost(y_opt, target)))
print(' R\u00b2: {:.1f}%'.format(100 * r2_score(y_opt, target)))
print('Validation')
print(' Cost: {:.3}'.format(cost(y_val, target_val)))
print(' R\u00b2: {:.1f}%'.format(100 * r2_score(y_val, target_val)))
# Vemos que o custo final da amostra de validação foi próximo ao da amostra de treinamento. Isso sugere que a rede conseguiu aprender sem fazer *overfitting*.
# In[19]:
df = pd.DataFrame()
df['Target'] = target_val[0:100]
df['Predicted'] = y_val[0:100]
df.plot()
plt.show()
# In[20]:
def scatterPlot(actual, predicted):
plt.scatter(actual, predicted)
range = np.array([actual.min(), actual.max()])
plt.plot(range, range, 'white')
plt.plot(range, range + sigma, 'orange')
plt.plot(range, range - sigma, 'orange')
plt.xlabel("Target")
plt.ylabel("Predicted")
plt.show()
scatterPlot(target_val, y_val)
# Verificação do quadrado do erro ao longo da sequência de validação:
# In[21]:
df = pd.DataFrame()
df['Erro\u00b2'] = ((y_val - target_val) ** 2)
axes = df.plot()
plt.show()
# ## Evolução da Rede ao Longo do Tempo
#
# Verificaremos agora o poder da rede de prever os demais valores da sequência, comparando com uma seqência gerada através da regra $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$
# In[22]:
y_real = [noise(np.pi / 2) for _ in range(n_per_sample)]
y_rede_neural = y_real.copy()
for i in range(90):
y_real.append(next(y_real[-1], y_real[-4]))
l = np.asmatrix(y_rede_neural[i: i + n_per_sample])
out = output_nn(forward_nn(l, w_opt, u_opt)) # calcula o próximo y utilizando a rede neural
y_rede_neural.append(out[0])
# In[23]:
df = pd.DataFrame()
df['y_real'] = y_real
df['y_rede_neural'] = y_rede_neural
df.plot()
plt.show()