Primeira vez aqui? Seja bem vindo e cheque o FAQ!
x

Como implementar em Python uma Rede Neural Recorrente que aprenda uma sequência com senos e cossenos?

+2 votos
1,038 visitas
perguntada Jun 27, 2016 em Aprendizagem de Máquinas por Caue (231 pontos)  

Implemente explicitamente uma rede neural recorrente (BPTT) para aprender
y(t) = sin(y(t − 1)) + cos(y(t − 4)) + u(t),
onde u(t) segue uma distribuição normal N(0, 0.09).
Teste a habilidade de seu preditor out of sample.

Compartilhe

1 Resposta

+1 voto
respondida Jun 27, 2016 por Caue (231 pontos)  
 
Melhor resposta

O código em Python é apresentado abaixo com os comentários.
Para uma melhor visualização do estudo, sugiro acessar o arquivo Recurrent Neural Network.ipynb


# coding: utf-8

# # Rede Neural Recorrente

# ## Objetivo
#
# Implementar uma Rede Neural Recorrente que aprenda $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$, onde $u_{t} \sim N\left(0, 0.09\right)$.

# ## Implementação
#
# Importamos os pacotes que serão utilizados e configuramos o *matplotlib*.

# In[1]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import optimize

#import matplotlib

#get_ipython().magic('matplotlib inline')
#matplotlib.rcParams['figure.figsize'] = (14, 8)

# Fixamos o seed para manter os resultados entre as execuções e definimos a função que gera $u_{t}$.

# In[2]:

np.random.seed(seed=1)

# In[3]:

sigma = 0.09


def noise(m=0):
    return np.random.normal(m, sigma)


# Definimos o número de amostras e o número de elementos na sequência para o treinamento e para a validação da rede neural.

# In[4]:

n_of_samples = 100000  # number of samples
n_per_sample = 6  # number per sample


# Criamos duas sequências: Uma para treinamento e outra para validação posterior da rede (validação cruzada).

# In[5]:

def next(y_1, y_4):
    return np.sin(y_1) + np.cos(y_4) + noise()


# In[6]:

_x = [noise() for _ in range(4)]  # Treinamento
_x_val = [noise(np.pi / 4) for _ in range(4)]  # Validação
for i in range(n_per_sample + n_of_samples):
    _x.append(next(_x[-1], _x[-4]))
    _x_val.append(next(_x_val[-1], _x_val[-4]))

# Retiramos o estado transiente das séries
_x = _x[4:]
_x_val = _x_val[4:]

# Os dados para treinamento e validação serão armezenados nas variáveis *"X"* e *"X_val"*, respectivamente.

# In[7]:

X = np.zeros((n_of_samples, n_per_sample))
X_val = np.zeros((n_of_samples, n_per_sample))

target = np.zeros(n_of_samples)
target_val = np.zeros(n_of_samples)

for row in range(n_of_samples):
    X[row, :] = _x[row: row + n_per_sample]
    target[row] = _x[row + n_per_sample]
    X_val[row, :] = _x_val[row: row + n_per_sample]
    target_val[row] = _x_val[row + n_per_sample]


# ## Características da Rede Neural
#
# Definimos a função de ativação utilizada pela rede, bem como a derivada da função de ativação.
#
# Para a nossa rede, escolhemos a função $tanh(z)$.

# In[8]:

def g(z):
    return np.tanh(z)


def dg(z):
    return 1 - np.tanh(z) ** 2


# Definimos os métodos para a evolução dos estados da rede neural.
#
# É importante destacar que o último estágio não deverá passar pela função de ativação para não restringir nosso intervalo de saída.
#
# Definimos $S_{0}=0$.

# In[9]:

def forward_nn(X, W, U):
    m, n = X.shape[0], X.shape[1]
    S = np.zeros((m, n + 1))
    for k in range(n - 1):  # state[0] inicia com 0
        S[:, k + 1] = g(X[:, k] * W[k] + S[:, k] * U)
    S[:, n] = X[:, n - 1] * W[n - 1] + S[:, n - 1] * U  # Último estado não ativa
    return S


def output_nn(S):
    return S[:, -1]


# Definimos os métodos para calcular a função custo e o gradiente do custo.
#
# $$cost (y, target) = \frac{\sum (y - target)^{2}}{n} $$
#
# $$\frac{\partial cost}{\partial y} ( y, target ) = \frac{2 * (y - target)}{n}$$

# In[10]:

def cost(y, target):
    return np.sum((y - target) ** 2) / len(y)


def gradient_cost(y, target):
    return 2 * (y - target) / len(y)


# Definimos o método que calcula o *BPTT - Backpropagation Through Time*.

# In[11]:

def back_propagation(X, target, S, W, U):
    m = X.shape[0]
    n = X.shape[1]
    y = output_nn(S)
    grad_s = np.zeros((m, n + 1))
    grad_s[:, -1] = gradient_cost(y, target)
    grad_w = np.zeros(n)
    grad_u = 0
    for k in range(n):
        _x = X[:, n - k - 1]
        _s = S[:, n - k - 1]

        if k == 0:
            _g = 1  # O último estado não tem ativação
        else:
            _g = dg(_x * W[n - k - 1] + _s * U)

        grad_s[:, n - k - 1] = grad_s[:, n - k] * U * _g
        grad_w[n - k - 1] = np.sum(grad_s[:, n - k] * _x * _g)
        grad_u += np.sum(grad_s[:, n - k] * _s * _g)

    return grad_s, grad_w, grad_u


# ## Treinamento da Rede Neural
#
# Definimos as funções para o treinamento da rede utilizando otimização (Nonlinear Conjugate Gradient Algorithm).

# In[12]:

# Função custo com parâmetros w, u
def f(wu):
    w = wu[0:-1]
    u = wu[-1]
    y = output_nn(forward_nn(X, w, u))
    return cost(y, target)


# Gradiente da função custo em relação aos parâmetros w, u (Utilizando Backpropagation)
def fprime(wu):
    w = wu[0:-1]
    u = wu[-1]
    S = forward_nn(X, w, u)
    _, grad_w, grad_u = back_propagation(X, target, S, w, u)
    return np.append(grad_w, grad_u)


# In[13]:

def train(X, W_initial=[noise() for _ in range(n_per_sample)], U_initial=noise()):
    wu = np.append(W_initial, U_initial)
    return optimize.fmin_cg(f, wu, fprime=fprime)


# Faremos a checagem numérica para verificar se o cálculo do gradiente utilizando o BPTT está correto.
#
# Assim, esperamos que o erro seja um número próximo de zero.

# In[14]:

print("Erro: {:.2}".format(optimize.check_grad(f, fprime, [1 for _ in range(n_per_sample + 1)])))

# Com a checagem indicando que os cálculos estão corretos, faremos o treinamento da rede para saber os parâmetros W e U que minimizam nossa função de custo.

# In[15]:

t = train(X)
w_opt = t[0:-1]
u_opt = t[-1]

print("\nW: {} \nU: {}".format(w_opt, u_opt))

# ## Verificação do Resultado
#
# Com os parâmetros W e U calculados pela otimização, faremos a evolução dos dados com as sequências de treinamento e de validação (aquelas que não foram utilizadas no treinamento).

# In[16]:

y_opt = output_nn(forward_nn(X, w_opt, u_opt))

y_val = output_nn(forward_nn(X_val, w_opt, u_opt))


# Comparação da função custo (média do quadrado dos erros) e coeficiente de determinação.

# In[17]:

def r2_score(y, target):
    m = np.average(target)
    return 1 - (np.sum((y - target) ** 2) / np.sum((m - target) ** 2))


# In[18]:

print('Training')
print('  Cost: {:.3}'.format(cost(y_opt, target)))
print('  R\u00b2: {:.1f}%'.format(100 * r2_score(y_opt, target)))
print('Validation')
print('  Cost: {:.3}'.format(cost(y_val, target_val)))
print('  R\u00b2: {:.1f}%'.format(100 * r2_score(y_val, target_val)))

# Vemos que o custo final da amostra de validação foi próximo ao da amostra de treinamento. Isso sugere que a rede conseguiu aprender sem fazer *overfitting*.

# In[19]:

df = pd.DataFrame()
df['Target'] = target_val[0:100]
df['Predicted'] = y_val[0:100]
df.plot()
plt.show()


# In[20]:

def scatterPlot(actual, predicted):
    plt.scatter(actual, predicted)
    range = np.array([actual.min(), actual.max()])
    plt.plot(range, range, 'white')
    plt.plot(range, range + sigma, 'orange')
    plt.plot(range, range - sigma, 'orange')
    plt.xlabel("Target")
    plt.ylabel("Predicted")
    plt.show()


scatterPlot(target_val, y_val)

# Verificação do quadrado do erro ao longo da sequência de validação:

# In[21]:

df = pd.DataFrame()
df['Erro\u00b2'] = ((y_val - target_val) ** 2)
axes = df.plot()
plt.show()

# ## Evolução da Rede ao Longo do Tempo
#
# Verificaremos agora o poder da rede de prever os demais valores da sequência, comparando com uma seqência gerada através da regra $y_{t} = sin(y_{t-1}) + cos(y_{t-4}) + u_{t}$

# In[22]:

y_real = [noise(np.pi / 2) for _ in range(n_per_sample)]
y_rede_neural = y_real.copy()
for i in range(90):
    y_real.append(next(y_real[-1], y_real[-4]))
    l = np.asmatrix(y_rede_neural[i: i + n_per_sample])
    out = output_nn(forward_nn(l, w_opt, u_opt))  # calcula o próximo y utilizando a rede neural
    y_rede_neural.append(out[0])

# In[23]:

df = pd.DataFrame()
df['y_real'] = y_real
df['y_rede_neural'] = y_rede_neural
df.plot()
plt.show()
...