一次决策树作业


1. 作业要求

(1)实现ID3算法构建决策树模型。属性分类能力指标请使用信息增益比率(Information Gain Ratio);可以不实现剪枝操作。
(2)使用泰坦尼克号生存预测数据集进行模型训练和预测,并学习pandas模块


2. 关于决策树

决策树(decision tree)时一种基本的分类和回归方法。
决策树学习通常包含三个步骤:特征选择、决策树生成和决策树修剪。

(1)决策树的生成,往往通过计算信息增益或其它指标,从根节点开始,递归地生成树。
这相当于用信息增益或其他准则不断地选取局部最优的特征,或将训练集分割为能够基本正确分类的子集。

(2)决策树的剪枝。由于生成的决策树存在过拟合问题,需要对它进行剪枝,以简化学到的决策树。


3. 关于信息增益比

(1)样本集合$D$对特征$A$的信息增益

$$g(D, A)=H(D)-H(D|A)$$

$$H(D)=-\sum_{k=1}^{K}\frac{|C_{k}|}{|D|}log_{2}\frac{|C_{k}|}{|D|}$$

$$H(D | A)=\sum_{i=1}^{n} \frac{|D_{i}|}{|D|} H(D_{i})$$

其中,$H(D)$是数据集$D$的熵,$H(D_i)$是数据集$D_i$的熵,$H(D|A)$是数据集$D$对特征$A$的条件熵。 $D_i$是$D$中特征$A$取第$i$个值的样本子集,$C_k$是$D$中属于第$k$类的样本子集。$n$是特征$A$取 值的个数,$K$是类的个数。

(2)特征 $A$ 对训练数据集 $D$ 的信息增益比:

$$g_R(D, A)=\frac{g(D, A)}{H_A(D)}$$

其中,$g(D,A)$是信息增益,$H_A(D)$是数据数据集 $D$ 关于特征 $A$ 的值的熵。

$$H_A(D)=-\sum_{i=1}^n\frac{|D_i|}{D}log_2\frac{|D_i|}{D}$$

(3)一个计算信息增益的实例

# 最后一列是数据集D的标签
datasets = [['青年', '否', '否', '一般', '否'],
               ['青年', '否', '否', '好', '否'],
               ['青年', '是', '否', '好', '是'],
               ['青年', '是', '是', '一般', '是'],
               ['青年', '否', '否', '一般', '否'],
               ['中年', '否', '否', '一般', '否'],
               ['中年', '否', '否', '好', '否'],
               ['中年', '是', '是', '好', '是'],
               ['中年', '否', '是', '非常好', '是'],
               ['中年', '否', '是', '非常好', '是'],
               ['老年', '否', '是', '非常好', '是'],
               ['老年', '否', '是', '好', '是'],
               ['老年', '是', '否', '好', '是'],
               ['老年', '是', '否', '非常好', '是'],
               ['老年', '否', '否', '一般', '否'],
               ]
  1. 首先计算经验熵 $H(D)$
    $$H(D)=-\frac{9}{15}log_2\frac{9}{15}-\frac{6}{15}log_2\frac{6}{15}=0.971$$

  2. 然后计算各特征对数据集D的信息增益
    $$g(D,A_1)=H(D)-[\frac{5}{15}H(D_1)+\frac{5}{15}H(D_2)+\frac{5}{15}H(D_3)]=0.083$$
    $D_1$, $D_2$, $D_3$ 分别表示$A_1$(年龄)取定值时,数据集 D 的子集。

    $H(D_1)$计算方法同第一步。

    $$g(D,A_2)=0.324, g(D,A_3)=0.420, g(D,A_4)=0.363$$

  3. 根据信息增益,将$A_3$作为最优特征


4. 代码实现

4.1 数据处理

import pandas as pd

def process_data(train, test):
    train = train.drop(['PassengerId','Name','Ticket', 'Fare', 'Cabin'], axis = 1)
    print(train.columns)

    mean_age = train['Age'].median()
    train['Age'] = train['Age'].fillna(mean_age)
    train["Embarked"] = train["Embarked"].fillna('S')

    def age_fun(x):
        if x < 12: return "children"
        elif x < 18: return "teenager"
        elif x < 65: return "audlt"
        else: return "old"

    def sib_fun(x):
        if x == 0: return "no"
        elif x <= 3: return "few"
        else: return "many"

    train["Age"] = train["Age"].apply(age_fun)
    train["SibSp"] = train["SibSp"].apply(sib_fun)
    train["Parch"] = train["Parch"].apply(sib_fun)

    print("{:*^50}".format(" train "))
    for feature in train.columns:
        print(feature, ": ",  train[feature].unique())

    test = test.drop(['PassengerId','Name','Ticket', 'Fare', 'Cabin'], axis = 1)
    test["Age"] = test["Age"].fillna(mean_age)
    test["Embarked"] = test["Embarked"].fillna('S')

    test["Age"] = test["Age"].apply(age_fun)
    test["SibSp"] = test["SibSp"].apply(sib_fun)
    test["Parch"] = test["Parch"].apply(sib_fun)

    print("{:*^50}".format(" test "))
    for feature in test.columns:
        print(feature, ": ",  test[feature].unique())

    return train, test

if __name__ == "__main__":
    train_data = pd.read_csv("train.csv")
    test_data = pd.read_csv("test.csv")
    train_data, test_data = process_data(train_data, test_data)

train_data.head(10)

4.2 计算熵

from collections import Counter
from math import log
import numpy as np

def calc_ent(datasets):
    n = len(datasets)
    d = Counter([datasets[i][0] for i in range(n)])    # 计数器,参考collections模块
    return sum([-(p/n)*log(p/n,2) for p in d.values()])

calc_ent(np.array(train_data))  # 找不到train_data的话,将第一步的train_data放入全局

4.3 计算信息增益比率

from collections import Counter, defaultdict
from math import log
import numpy as np

#计算熵
def calc_ent(datasets):
    n = len(datasets)
    d = Counter([datasets[i][0] for i in range(n)]) 
    return sum([-(p/n)*log(p/n,2) for p in d.values()])

#计算经验条件熵
def cond_ent(datasets, feature_col):
    n = len(datasets)
    d = defaultdict(list)
    for i in range(n):
        feature = datasets[i][feature_col]
        d[feature].append(datasets[i])
    return sum([(len(p)/n)*calc_ent(p) for p in d.values()])

#计算数据集 D 关于特征 A 的熵
def calc_had(datasets,feature_col):
    n = len(datasets)
    d = Counter([datasets[i][feature_col] for i in range(n)])
    print(feature_col,d.items())
    return sum([-(p/n)*log(p/n,2) for p in d.values()])

"""
def info_gain(datasets, feature_col):
    return calc_ent(datasets) - cond_ent(datasets, feature_col)

#按书上例子测试,这里没问题
"""

# 信息增益比率
def info_gain_ratio(datasets, feature_col):
    ent = calc_ent(datasets)
    ce = cond_ent(datasets, feature_col)
    had = calc_had(datasets, feature_col)
    return (ent-ce)/had


for i in range(1,7):
    print(info_gain_ratio(np.array(train_data), i))

#默认第 0 列为label, 1-6为属性

4.4 建树

造轮子好麻烦…

# 没有按照老师的来
# 建好之后如果self.can_use == [] 表示出结果了,结果的 0-1 放在了 self.datasets中

#写递归或迭代程序时,主要要注意出口

class Node:
    def __init__(self, datasets, can_use, feature = None):
        self.datasets = datasets
        self.can_use = can_use
        self.feature = feature
        self.children = {}

class DTree:

    def check(self, datasets):
        n = len(datasets)
        d = Counter([datasets[i][0] for i in range(n)]) 
        if len(d)==1: return True
        else: return False


    def compu(self, datasets, col):
        ans = {}
        feature = cols[col]
        for val in dic[feature]:
            new_data = [row for row in datasets if row[col]==val]
            n = len(new_data)
            d = Counter([new_data[i][0] for i in range(n)])
            if d[1]>=d[0]:
                ans[val] = Node(1,[])               #判定为存活
            else: 
                ans[val] = Node(0,[])
        return ans


    def train(self, root):
        if len(root.can_use)==1:
            col = root.can_use[0]
            root.feature = cols[col]
            root.children = self.compu(root.datasets, col)

        else:

            #print("--------------",root.can_use)

            if self.check(root.datasets):   #全部为正例或反例
                ans = root.datasets[0][0]
                root.datasets = ans
                root.can_use = []
                return 

            idx, max_ratio = 0, 0
            for i in root.can_use:
                ratio = info_gain_ratio(root.datasets, i)
                if ratio >= max_ratio:
                    idx = i
                    max_ratio =ratio

            root.feature = cols[idx]
            #print("!!",root.feature)
            #print(root.can_use,root.feature, idx)
            for feature_val in dic[root.feature]:
                #print(feature_val)
                new_data = np.array([row for row in root.datasets if row[idx]==feature_val])
                root.children[feature_val] = Node(new_data, [i for i in root.can_use if i!=idx])
                self.train(root.children[feature_val])

    def test(self, root, tmp):
        p = root
        while p.can_use != []:
            #print(p.feature)
            #print(re_cols[p.feature])
            #print(tmp[re_cols[p.feature]])
            x = tmp[re_cols[p.feature]]
            if x not in p.children:
                return p.children
            p = p.children[x]

        return p.datasets

4.5 完整代码

from collections import Counter, defaultdict
from math import log
import numpy as np
import pandas as pd

dic = {}
cols = ['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Embarked']
re_cols = {'Sex':1, 'Pclass':0, 'Age':2, 'SibSp':3, 'Parch':4, 'Embarked':5}

def process_data(train, test):
    train = train.drop(['PassengerId','Name','Ticket', 'Fare', 'Cabin'], axis = 1)
    print(train.columns)

    mean_age = train['Age'].median()
    train['Age'] = train['Age'].fillna(mean_age)
    train["Embarked"] = train["Embarked"].fillna('S')

    def age_fun(x):
        if x < 12: return "children"
        elif x < 18: return "teenager"
        elif x < 65: return "audlt"
        else: return "old"

    def sib_fun(x):
        if x == 0: return "no"
        elif x <= 3: return "few"
        else: return "many"

    train["Age"] = train["Age"].apply(age_fun)
    train["SibSp"] = train["SibSp"].apply(sib_fun)
    train["Parch"] = train["Parch"].apply(sib_fun)

    print("{:*^50}".format(" train "))
    for feature in train.columns:
        dic[feature] = train[feature].unique()
        print(feature, ": ",  train[feature].unique())

    test = test.drop(['PassengerId','Name','Ticket', 'Fare', 'Cabin'], axis = 1)
    test["Age"] = test["Age"].fillna(mean_age)
    test["Embarked"] = test["Embarked"].fillna('S')

    test["Age"] = test["Age"].apply(age_fun)
    test["SibSp"] = test["SibSp"].apply(sib_fun)
    test["Parch"] = test["Parch"].apply(sib_fun)

    print("{:*^50}".format(" test "))
    for feature in test.columns:
        print(feature, ": ",  test[feature].unique())

    return train, test

#计算熵
def calc_ent(datasets):
    n = len(datasets)
    d = Counter([datasets[i][0] for i in range(n)]) 
    return sum([-(p/n)*log(p/n,2) for p in d.values()])

#计算经验条件熵
def cond_ent(datasets, feature_col):
    n = len(datasets)
    d = defaultdict(list)
    for i in range(n):
        feature = datasets[i][feature_col]
        d[feature].append(datasets[i])
    return sum([(len(p)/n)*calc_ent(p) for p in d.values()])

#计算数据集 D 关于特征 A 的熵
def calc_had(datasets,feature_col):
    n = len(datasets)
    d = Counter([datasets[i][feature_col] for i in range(n)])
    #print(feature_col,d.items())
    return sum([-(p/n)*log(p/n,2) for p in d.values()])

"""
def info_gain(datasets, feature_col):
    return calc_ent(datasets) - cond_ent(datasets, feature_col)
"""

# 信息增益比率
def info_gain_ratio(datasets, feature_col):
    ent = calc_ent(datasets)
    ce = cond_ent(datasets, feature_col)
    had = calc_had(datasets, feature_col)
    if had==0: return 0
    return (ent-ce)/had

class Node:
    def __init__(self, datasets, can_use, feature = None):
        self.datasets = datasets
        self.can_use = can_use
        self.feature = feature
        self.children = {}

class DTree:

    def check(self, datasets):
        n = len(datasets)
        d = Counter([datasets[i][0] for i in range(n)]) 
        if len(d)==1: return True
        else: return False


    def compu(self, datasets, col):
        ans = {}
        feature = cols[col]
        for val in dic[feature]:
            new_data = [row for row in datasets if row[col]==val]
            n = len(new_data)
            d = Counter([new_data[i][0] for i in range(n)])
            if d[1]>=d[0]:
                ans[val] = Node(1,[])               #判定为存活
            else: 
                ans[val] = Node(0,[])
        return ans


    def train(self, root):
        if len(root.can_use)==1:
            col = root.can_use[0]
            root.feature = cols[col]
            root.children = self.compu(root.datasets, col)

        else:

            #print("--------------",root.can_use)

            if self.check(root.datasets):   #全部为正例或反例
                ans = root.datasets[0][0]
                root.datasets = ans
                root.can_use = []
                return 

            idx, max_ratio = 0, 0
            for i in root.can_use:
                ratio = info_gain_ratio(root.datasets, i)
                if ratio >= max_ratio:
                    idx = i
                    max_ratio =ratio

            root.feature = cols[idx]
            #print("!!",root.feature)
            #print(root.can_use,root.feature, idx)
            for feature_val in dic[root.feature]:
                #print(feature_val)
                new_data = np.array([row for row in root.datasets if row[idx]==feature_val])
                root.children[feature_val] = Node(new_data, [i for i in root.can_use if i!=idx])
                self.train(root.children[feature_val])

    def test(self, root, tmp):
        p = root
        while p.can_use != []:
            #print(p.feature)
            #print(re_cols[p.feature])
            #print(tmp[re_cols[p.feature]])
            x = tmp[re_cols[p.feature]]
            if x not in p.children:
                return p.children
            p = p.children[x]

        return p.datasets



#if __name__ == "__main__":
train_data = pd.read_csv("train.csv")
test_data = pd.read_csv("test.csv")
train_data, test_data = process_data(train_data, test_data)

train_data = np.array(train_data)
root = Node(train_data, [1,2,3,4,5,6])

print("{:*^50}".format(" predict the test "))
dt = DTree()
dt.train(root)

test_data = np.array(test_data)
for data in test_data:
    print(data, dt.test(root, data))

4.6 优化

为了防止过拟合(决策树的分支过多,导致实际准确率下降),可设置一个阈值使建树过程提早结束,更改check函数即可。

5. 决策树的可视化

#测试代码
#不适用于以上的树的结构
from graphviz import Digraph


def plot_model(tree, name):
    g = Digraph("G", filename=name, format='png', strict=False)
    first_label = list(tree.keys())[0]
    g.node("0", first_label)
    _sub_plot(g, tree, "0")
    g.view()


root = "0"


def _sub_plot(g, tree, inc):
    global root

    first_label = list(tree.keys())[0]
    ts = tree[first_label]
    for i in ts.keys():
        if isinstance(tree[first_label][i], dict):
            root = str(int(root) + 1)
            g.node(root, list(tree[first_label][i].keys())[0])
            g.edge(inc, root, str(i))
            _sub_plot(g, tree[first_label][i], root)
        else:
            root = str(int(root) + 1)
            g.node(root, tree[first_label][i])
            g.edge(inc, root, str(i))


d1 = {"no surfacing": {0: "no", 1: {"flippers": {0: "no", 1: "yes"}}}}

d2 = {'tearRate': {'reduced': 'no lenses', 'normal': {'astigmatic': {'yes': {
    'prescript': {'myope': 'hard', 'hyper': {'age': {'young': 'hard', 'presbyopic': 'no lenses', 'pre': 'no lenses'}}}},
    'no': {'age': {'young': 'soft', 'presbyopic': {
        'prescript': {'myope': 'no lenses',
                      'hyper': 'soft'}},
                   'pre': 'soft'}}}}}}

plot_model(d1, "hello")
plot_model(d2, "hello2")

测试代码会报错:GraphViz's executables not found

然后去安装软件,http://www.graphviz.org/download/

安装时勾选:加入到所有用户的系统路径。

重新打开 jupyter 并运行测试代码
成功后,会用图片管理器打开决策树的可视化图。如图

# 2021-04-24更新,可视化代码,适用,将其运行即可

from graphviz import Digraph

def plot_model(root, name):
    g = Digraph("G", filename=name, format='png', strict=False)
    first_label = root.feature
    g.node("0", first_label)
    _sub_plot(g, root, "0")
    g.view()

_root = "0"

def _sub_plot(g, root, inc):
    global _root

    ts = root.children
    for i in ts.keys():
        if ts[i].can_use != []:
            _root = str(int(_root) + 1)
            g.node(_root, ts[i].feature)
            g.edge(inc, _root, str(i))
            _sub_plot(g, ts[i], _root)
        else:
            _root = str(int(_root) + 1)
            g.node(_root, str(ts[i].datasets))
            g.edge(inc, _root, str(i))

plot_model(root, "hello")

文章作者: ╯晓~
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 ╯晓~ !
评论
  目录