# -*- coding: utf-8 -*-
# CÓDIGO COMPLETO #
"""
Created on Sat Dec 12 10:03:59 2020
@author: lucas
"""
from __future__ import print_function
import pandas as pd
import math
from matplotlib import pyplot as plt
###### IMPORTANDO OS DADOS ######
data=pd.read_csv('Cleveland_Heart_Disease_Data.csv',sep=";")
header = []
for col in data:
header.append(col)
data_cleveland = []
for ind in data.index:
new_list = []
for i in data.loc[ind]:
new_list.append(i)
data_cleveland.append(new_list)
###### FUNÇÕES AUXILIARES ######
def unique_vals(rows, col):
"""Encontra os valores únicos em uma coluna de dados"""
return set([row[col] for row in rows])
unique_vals(data_cleveland,5)
def class_counts(rows):
"""Armazena a contagem de observações de cada classe"""
counts = {}
for row in rows:
label = row[-1]
if label not in counts:
counts[label] = 0
counts[label] += 1
return counts
class_counts(data_cleveland)
def is_numeric(value):
"""Testa se um valor é numérico"""
return isinstance(value, int) or isinstance(value, float)
is_numeric(12)
def partition(rows, question):
"""
Particiona os dados.
Para cada linha dos dados, confere a resposta para a pergunta.
Se True, aloca essa linha para a 'true_rows'
Se False, aloca essa linha para a 'false_rows'
"""
true_rows, false_rows = [], []
for row in rows:
if question.match(row):
true_rows.append(row)
else:
false_rows.append(row)
return true_rows, false_rows
def gini(rows):
"""
Calcula o Gini de um conjunto de dados
"""
counts = class_counts(rows)
impurity = 1
for lbl in counts:
prob_of_lbl = counts[lbl] / float(len(rows))
impurity -= prob_of_lbl**2
return impurity
def entropy(rows):
"""
Calcula a função entropia de um conjunto de dados
"""
impurity = float()
counts = class_counts(rows)
for lbl in counts:
prob_of_lbl = counts[lbl] / float(len(rows))
impurity -= (prob_of_lbl)*(math.log(prob_of_lbl))
return impurity
def info_gain(left, right, current_uncertainty):
"""
Ganho de informação
É a impureza do nó pai menos a média ponderada das impurezas dos nós filhos
"""
p = float(len(left)) / (len(left) + len(right))
#return current_uncertainty - p * gini(left) - (1 - p) * gini(right)
return current_uncertainty - (p * entropy(left)) - ((1 - p) * entropy(right))
def find_best_split(rows):
"""
Encontra a pergunta que melhor particiona um conjunto de dados
Passa por todas as variáveis (colunas) e por todos os valores de partição possíveis
"""
best_gain = 0
best_question = None
#current_uncertainty = gini(rows)
current_uncertainty = entropy(rows)
n_features = len(rows[0]) - 1
for col in range(n_features):
values = set([row[col] for row in rows])
for val in values:
question = Question(col, val)
true_rows, false_rows = partition(rows, question)
if len(true_rows) == 0 or len(false_rows) == 0:
continue
gain = info_gain(true_rows, false_rows, current_uncertainty)
if gain >= best_gain:
best_gain, best_question = gain, question
return best_gain, best_question
###### CLASSES ######
class Question:
""" Uma pergunta é usada para particionar os dados.
'column' é o número da coluna da variável de interesse
'value' é o valor de partição que se pretende utilizar
'match' é um método que aplica a pergunta a uma linha qualquer de dados
e retorna True se a resposta for afirmativa e False caso contrário"""
def __init__(self, column, value):
self.column = column
self.value = value
def match(self, example):
val = example[self.column]
if is_numeric(val):
return val >= self.value
else:
return val == self.value
def __repr__(self):
condition = "=="
if is_numeric(self.value):
condition = ">="
return "Is %s %s %s?" % (
header[self.column], condition, str(self.value))
class Leaf_Node:
""" O nó folha classifica os dados. É um ponto terminal da árvore.
'predictions' é um dicionário que mostra a contagem de classes nessa folha.
O mais comum é classificar uma nova observação a partir da classe da maioria"""
def __init__(self, rows,level):
self.predictions = class_counts(rows)
self.error = min(self.predictions.values())/number_of_obs if len(self.predictions.values()) > 1 else 0
self.level = level
#self.T=0
class Decision_Node:
""" Um nó de decisão basicamente faz uma pergunta e armazena os nós filhos
além de outros parâmetros úteis.
'question' é um objeto da classe Question
'true_branch' é o particionamento de dados que responde True à pergunta
'false_branch' é o particionamento de dados que responde False à pergunta
'rows' é o conjunto de dados que chegou até esse nó (esses dados serão particionados pela pergunta)
'error' é o erro desse nó, calculado como [n de classificações erradas/n total de observações]
'level' é o nível da árvore, sendo 1 o root node
'sub_tree_error' é o erro combinado das folhas da sub-árvore formada a partir desse nó de decisão
'T' é o número de folhas (nós terminais) da sub-árvore formada a partir desse nó de decisão
'pruning' é a cost-complexity measure que será utilizada na podagem"""
def __init__(self,
question,
true_branch,
false_branch,
rows,
level,
sub_tree_error,
T,
cost_complexity_pruning,
):
self.question = question
self.true_branch = true_branch
self.false_branch = false_branch
self.predictions = class_counts(rows)
self.error = min(self.predictions.values())/number_of_obs if len(self.predictions.values()) > 1 else 0
self.level = level
self.sub_tree_error = sub_tree_error
self.T = T
self.cost_complexity_pruning = cost_complexity_pruning
self.rows = rows
###### FUNÇÕES PARA CONSTRUÇÃO DA ÁRVORE #######
def build_tree(rows,level=0,sub_tree_error=0,T=0):
"""
Constroi a árvore por meio de recursão
Recebe uma série de dados e o nível da árvore em que se encontra
Se a melhor partição não traz ganhos -> chega-se a uma folha
Caso contrário, efetua a partição, constrói os ramos da esquerda e da direita
A cada recursão:
i) guarda o erro da sub-árvore, que é a soma dos erros das folhas que estão abaixo;
ii) conta o número de folhas abaixo daquele nó
Essas duas informações são finalmente usadas para construir a cost_complexity_measure
"""
global alpha
gain, question = find_best_split(rows)
if gain == 0:
return Leaf_Node(rows, level+1)
true_rows, false_rows = partition(rows, question)
true_branch = build_tree(true_rows,level+1)
false_branch = build_tree(false_rows,level+1)
if isinstance(true_branch, Leaf_Node):
T += 1
else:
T += true_branch.T
if isinstance(false_branch, Leaf_Node):
T += 1
else:
T += false_branch.T
sub_tree_error += true_branch.error
sub_tree_error += false_branch.error
cost_complexity_pruning = sub_tree_error + alpha*T
return Decision_Node(question, true_branch, false_branch,rows, level+1, sub_tree_error,T,cost_complexity_pruning)
def pruning_tree(node,n):
"""
Podagem da árvore
Faz uma recursão parecida ao build_tree
Se cost-complexity measure > erro do nó-pai, o nó-pai passa a ser uma folha
O valor alpha de penalização é definido exogenamente
"""
if isinstance(node, Leaf_Node):
return node
if isinstance(node.true_branch, Leaf_Node) and isinstance(node.false_branch, Leaf_Node):
if node.cost_complexity_pruning >= node.error:
node = Leaf_Node(node.rows,node.level)
return node
elif isinstance(node.true_branch, Leaf_Node) or isinstance(node.false_branch, Leaf_Node):
if node.cost_complexity_pruning >= node.error:
node = Leaf_Node(node.rows,node.level)
return node
true_branch = pruning_tree(node.true_branch,n)
false_branch = pruning_tree(node.false_branch,n)
""" A partir daqui, atualizamos os parâmetros após cada poda
pois o erro da sub-árvore e o número de nós-folha mudam"""
node.T=0
node.sub_tree_error=0
if isinstance(true_branch, Leaf_Node):
node.T += 1
node.sub_tree_error += true_branch.error
else:
node.T += true_branch.T
node.sub_tree_error += true_branch.sub_tree_error
if isinstance(false_branch, Leaf_Node):
node.T += 1
node.sub_tree_error += false_branch.error
else:
node.T += false_branch.T
node.sub_tree_error += false_branch.sub_tree_error
node.cost_complexity_pruning = node.sub_tree_error + alpha*node.T
new_node = Decision_Node(node.question, true_branch,false_branch,node.rows,node.level,node.sub_tree_error,node.T,node.cost_complexity_pruning)
"""O processo é repetido n vezes """
if n==0:
return new_node
return pruning_tree(new_node,n-1)
def max_depth(node,Tmax):
"""
Permitindo que a árvore cresça somente até um limite Tmax
"""
if isinstance(node, Leaf_Node):
return node
if node.level >= Tmax:
node = Leaf_Node(node.rows,node.level)
return node
true_branch = max_depth(node.true_branch, Tmax)
false_branch = max_depth(node.false_branch, Tmax)
return Decision_Node(node.question, true_branch, false_branch,node.rows, node.level, node.sub_tree_error,node.T,node.cost_complexity_pruning)
def print_tree(node, spacing=""):
"""
Desenha a árvore no console do Python
"""
if isinstance(node, Leaf_Node):
print (spacing + "Leaf Node:", "Predict", node.predictions, "Error", node.error,"Level", node.level)
return
print (spacing +
"Decision Node:", node.predictions,
"Error:", round(node.error,5),
"Level:", node.level,
"sub-tree error:", round(node.sub_tree_error,5),
"Terminal nodes beyound:", node.T,
"Cost-complexity pruning measure:",round(node.cost_complexity_pruning,5))
print (spacing + str(node.question))
print (spacing + '--> True:')
print_tree(node.true_branch, spacing + " ")
print (spacing + '--> False:')
print_tree(node.false_branch, spacing + " ")
##### EXECUTANDO O CÓDIGO #####
# Observações
# Código construído em Python puro, sem auxílio de ferramentas de Machine Learning.
# Interessante para entender os passos algorítmicos.
# Útil em exemplos simples. Em aplicações maiores, utilizar ferramentas mais sofisticadas.
if __name__ == '__main__':
# Vamos checar o tipo de cada variável.
for i in data_cleveland[0]:
if is_numeric(i):
print("Numeric")
else:
print("Categorical")
# Construindo a árvore completa (cheia de overfitting):
if __name__ == '__main__':
alpha = 0
sub_tree_error=0
T=0
number_of_obs = len(data_cleveland)
my_tree = build_tree(data_cleveland)
print('')
print('####### FULL TREE #######')
print('')
print_tree(my_tree)
# Segundo Izenman, existem três formas de lidar com o overfitting em decisions trees.
# As duas primeiras são medidas meio grosseiras e pouco técnicas:
# consistem em delimitar o número de observações em um leaf node a um piso Mmin ou delimitar a profundidade da árvore a um nível Tmax.
# A mais recomendada consiste em podar a árvore.
# Isto é, primeiramente construímos a árvore inteira, até que cada folha represente uma observação (ou mais de uma observação, caso nenhuma pergunta consiga separá-las).
# Essa árvore é excelente para o conjunto de treino, mas uma catástrofe para o de testes. A partir das folhas, subimos pela árvore sempre comparando
# uma medida de custo-complexidade da sub-árvore com o erro do nó de decisão que é o root node dessa sub-árvore. Se o erro do root node for
# menor que a medida de custo-complexidade, vale a pena transformar esse nó em uma folha (isto é, podar a árvore).
# A medida de custo-complexidade penaliza pela quantidade de nós terminais.
if __name__ == '__main__':
alpha = 0
sub_tree_error=0
T=0
my_tree = build_tree(data_cleveland)
pruned_tree = pruning_tree(my_tree,3)
print('')
print('####### PRUNED TREE #######')
print('')
print_tree(pruned_tree)
# Utilizando os métodos ad-hoc para controle do overfitting:
# O primeiro é transformar o nó em folha sempre que o ganho for menor que dado valor escolhido.
# Editar linha 207 [if gain == 0:]
# O segundo consiste em limitar os níveis da árvore.
alpha = 0
trained_tree_error = 0
T = 0
number_of_obs = len(data_cleveland)
my_tree = build_tree(data_cleveland)
choose_max_depth = 8
final_tree=max_depth(my_tree, choose_max_depth)
print('')
print('')
print('#### MAX DEPTH TREE = %s ####'%(choose_max_depth))
print('')
print('')
print_tree(final_tree)
# Encontrando o melhor split para cada variável a partir do root node:
if __name__ == '__main__':
best_choices = {}
n_features = len(data_cleveland[0]) - 1
current_uncertainty = entropy(data_cleveland)
best_question=None
for col in range(n_features):
best_gain=0
best_partition=None
values = unique_vals(data_cleveland, col)
for val in values:
true_rows, false_rows = partition(data_cleveland,Question(col,val))
gain = info_gain(true_rows,false_rows,current_uncertainty)
#print(Question(col,val),"has gain",gain)
if gain >= best_gain:
best_gain = gain
choice = ("Best split is at:", val, "with gain:",gain)
best_choices[header[col]] = choice
best_choices
# Figura 9.4:
if __name__ == '__main__':
current_uncertainty = entropy(data_cleveland)
values = unique_vals(data_cleveland, 0)
x_axis = []
y_axis = []
tau_l = []
tau_r = []
for val in values:
x_axis.append(val)
true_rows, false_rows = partition(data_cleveland,Question(0,val))
tau_l.append(entropy(false_rows))
tau_r.append(entropy(true_rows))
gain = info_gain(true_rows,false_rows,current_uncertainty)
y_axis.append(gain)
plt.plot(x_axis[3:],tau_l[3:])
plt.plot(x_axis[:-1],tau_r[:-1])
plt.xlabel("Age at split")
plt.ylabel("i(tau_L),i(tau_R)")
plt.savefig("Fig9.3.1.png")
plt.plot(x_axis,y_axis)
plt.xlabel("Age at split")
plt.ylabel("Goodness of Split")
plt.savefig("Fig9.3.2.png")