前言

  最近在上机器学习基础,作业要求手写一个决策树代码,一开始以为很难,所以一直没动工,但是当自己亲自去实践,学习一下别人的思路,然后自己动手敲代码,发现其实也不难。PPT转Python完全可行!关键是python要熟悉,敲代码要仔细,每写完一个函数一定要验证一下,不要都写完了再去验证,debug其实很难的

参考链接

  上面这个博客在决策树理论部分写得还行(当然如果时间充足建议还是去看西瓜书),所以这里就不再赘述决策树的理论了。但是代码部分感觉有点乱(当然不是分文件写这样基本的操作。。。),而且好像用的还是python2?但是代码思路还是非常不错的,是典型的面向过程编程,利用递归的思路,通过不断修剪数据集并传递给递归的函数,思路逻辑非常清晰,本文也主要是借鉴上面那篇博客的思路。

上代码!

# -*- coding: utf-8 -*-     支持文件中出现中文字符
#########################################################################
"""
Created on 2022.10.11
@author: Zoey
@version: Python 3.10
@简写说明
  + feat: feature
  + calc: calculate
  + Vec: Vector
  + chd: child
"""
#########################################################################
import base #自己写的库,不要在意,没啥用
from LSM import xlsRead #也是自己写的库
import numpy as np
from sklearn import tree
from scipy import signal
import math
import matplotlib.pylab as plt
#########################################################################
def labelGet2(s:list|np.ndarray)->np.ndarray:
    ''' @func: 给数据集打标签
        @para s: 划分好的列表
        @return: 一个标签list, 长度等于s的行数
    '''
    n = len(s) #划分子列表数
    labels = np.zeros(n,dtype="int") #给各个子列表分配的标签
    for i in range(n): #按均值来分配标签
        if(np.mean(s[i]) >= 0.004): labels[i] = 1
        elif(np.mean(s[i]) <= -0.006): labels[i] = -1
    return labels

def DataSetExtract(dataset:np.ndarray, featIndex, featValue) -> np.ndarray:
    ''' @func: 在给定数据集(特征+标签)中提取出某个特征为某个特定值的子数据集
        @para  dataset: 数据集, 行为样本数, 列包含特征和标签
                featIndex: 目标特征在数据集中的索引
                featValue: 目标特征的一个取值
        @return: 划分后的子数据集
    '''
    SampleNum = len(dataset) #样本数
    featList = dataset[:, featIndex] #dataset中该列特征的所有取值
    col = [] #符合特征值的样本的索引
    for i in range(SampleNum):
        if(featList[i] == featValue): col.append(i)
    subDataset = np.delete(dataset[col],featIndex,1) #取对应行, 并删除特征列
    return subDataset

def FreqCalc(A:np.ndarray, index) -> dict:
    ''' @func: 获取矩阵A的某一列(属性or标签)中各个取值的频次
        @para A: 对象矩阵
              index: 要取的列的索引
        @return: a dict
    '''
    Value = A[:, index]

    unique_value,occurrence = np.unique(Value, return_counts=True)
    Dict = dict(zip(unique_value,occurrence))

    # uniqueValue = set(Value) #这一列的不同取值
    # uniqueNum = len(uniqueValue) #取值有多少种情况
    # Dict = dict(zip(uniqueValue,np.zeros(uniqueNum, dtype=int))) #构建一个字典, value(标签对应的样本个数)初值全为0
    # for item in Value:
    # 	Dict[item] += 1
    
    return Dict

def InfoEntropy(dataset:np.ndarray) -> float:
    ''' @func: 计算某个数据集的标签信息熵
        @para dataset: 数据集, 行为样本数, 列为特征+标签
        @return: 数据集的信息熵
    '''
    labelDict = FreqCalc(dataset,-1) #取最后一列
    EN = 0.0
    for item in labelDict: #遍历标签字典——自动排除了样本数为零的标签
        tmp = float(labelDict[item]) / len(dataset)
        EN += - tmp * math.log2(tmp)
    return EN

def GiniRatio(dataset:np.ndarray) -> float:
    '''	@func: 计算基尼指数
        @para  dataset:数据集
        @return: Gini Ratio
    '''
    labelDict = FreqCalc(dataset,-1) #取最后一列
    Gini = 0.0
    for item in labelDict: #遍历标签字典——自动排除了样本数为零的标签
        tmp = float(labelDict[item]) / len(dataset)
        Gini += tmp**2
    return 1 - Gini

def EntropyGain(dataset:np.ndarray, featIndex) -> float:
    ''' @func: 计算按某个特征分类得到的熵增益
        @para  dataset: 数据集, 行为样本数, 列为特征+标签
                featIndex: 目标特征在数据集中的索引
        @return: 熵增益, 浮点类型
    '''
    EN0 = InfoEntropy(dataset) #分类前的数据集
    featDict = FreqCalc(dataset, featIndex) #特征取值字典
    EN1 = 0.0
    for item in featDict:
        subDataset = DataSetExtract(dataset,featIndex,item)
        EN1 += InfoEntropy(subDataset) * (featDict[item] / len(dataset)) #各个子数据集所占比例乘以各自的熵增益
    return EN0 - EN1

def EntropyGainRate(dataset:np.ndarray, featIndex) -> float:
    '''	@func: 计算特征的熵增益率
        @para  dataset: 数据集
               featIndex: 特征索引
        @return: 熵增益率
    '''
    ENG = EntropyGain(dataset,featIndex) #熵增益
    dic = FreqCalc(dataset,featIndex) #该特征对应的各个取值形成的字典
    num_sum = sum(list(dic.values()))
    Q = 0.0
    for item in dic:
        tmp = dic[item] / num_sum
        Q += - tmp * math.log2(tmp)
    # print(ENG/Q if Q else ENG)
    return ENG/Q if Q else ENG #如果tmp为1(特征只有一个取值),则Q为0,要返回原熵增益

def GetBestFeat(dataset:np.ndarray):
    ''' @func: 遍历当前数据集下各个特征, 然后取熵增益最大的特征
        @para  dataset:当前分类下的数据集
        @return: the index of the beat feature 
    '''
    featNum = len(dataset[0]) #当前数据集下剩余特征的个数+1
    MAX_EN = 0.0; INDEX = 0 #最大熵增益的特征及其对应的索引
    for i in range(featNum-1):
        EN_tmp = EntropyGain(dataset,i) #利用熵增益来选择特征
        # EN_tmp = EntropyGainRate(dataset,i) #利用熵增益率来选择特征
        if(EN_tmp > MAX_EN): MAX_EN = EN_tmp ; INDEX = i
    # print(MAX_EN)
    if(int(MAX_EN * 100) == 0): return -1 #剩下的特征熵增益基本接近零了, 分类就可以结束了, 通过调整100来调整精度
    else: return INDEX

def MajorFeature(labelList:np.ndarray):
    ''' @func: 当特征列都被删完了只剩下标签列, 但是仍然有不同的标签, 
               此时只能取样本数较多的标签作为叶子节点
        @para labelList: 最后的标签列表
        @return: A label as leaf node
    '''
    labelDict = FreqCalc(labelList,0)
    sortedDict = sorted(labelDict.items(), key=lambda d:d[1]) #按value排序,升序
    # import operator #利用operator库中的函数
    # sortedDict = sorted(labelDict,key=operator.itemgetter(1),reverse=True)
    return sortedDict[-1][0] #返回第一个, 样本数最多的一个标签


def FeatDiscrete(srcDataSet:np.ndarray, colIndex = ..., classNum:int = 2) -> np.ndarray:
    ''' @func: 采用C4.5中处理连续值的方式, 对连续值进行离散【如果数据集本身是离散的则不需要管】
        @para srcDataSet: 原始数据集, 特征矩阵
              colIndex: 需要处理的特征列的索引, 是一个索引列表, 默认取所有特征列(除标签列)
              classNum: 分类数目, 默认二分类
        @return: dstDataSet 
    '''
    def MidNumList(srcList:np.ndarray) -> np.ndarray:
        ''' @func: 提取一个列表的相邻元素中位数列表, n个数可以得到n-1个中位数
            @para srcList: 原始数据列表
            @return: dstList
        '''
        sortedList = np.sort(srcList) #升序排列
        dstList = np.array([])
        for i in range(len(sortedList)-1):
            dstList = np.append(dstList, np.median(sortedList[i:i+2]))
        return dstList

    def GetCombine(MidList:list|np.ndarray, Num = 2) -> list:
        ''' @func: 根据中位数列表取几个分界点
            @para MidList: 中位数列表
                  Num: 取点的个数
            @return: 取的关键点形成的列表, 是一个二维列表
        '''
        import itertools as it #求排列组合
        CombineList = [e for e in it.combinations(MidList, Num)]
        return CombineList

    def discrete(srcDataVec:list|np.ndarray, CombineList:list):
        ''' @func: 根据分界点列表, 分配离散值, 从0开始
            @para srcDataVec: 原始特征向量, 全部是连续值 
                  CombineList: 分界点列表中的一行, 即一种情况
            @return: 特征的离散值, 长度与原数据长度相同
        '''
        length = len(srcDataVec)
        discreteOut = - np.ones(length) #初始值全为-1
        for i in range(length):
            for j in range(len(CombineList)):
                if(srcDataVec[i] <= CombineList[j]): discreteOut[i] = j
            if(discreteOut[i] == -1): discreteOut[i] = len(CombineList) #大于所有值的情况
        return discreteOut

    if(colIndex == ...): colIndex = range(srcDataSet[0].size - 1) #特征数, 即列数
    for i in colIndex: #遍历选择的每一列
        featList = srcDataSet[:,i].copy() #取某一列数值 记得要复制一份
        midlist = MidNumList(featList) #中位数列表
        CombineList = GetCombine(midlist,classNum-1) #分界点列表
        # print(CombineList)
        # print(len(CombineList))
        EN_GAIN_MAX = 0.0 ; DEVIDE_LIST = [] #最大熵增益及对应的分界点
        for item in CombineList:
            srcDataSet[:,i] = discrete(featList,item) #注意copy的问题
            tmp = EntropyGain(srcDataSet,i) #某一种情况的熵增益
            if(tmp>EN_GAIN_MAX): EN_GAIN_MAX = tmp; DEVIDE_LIST = item			
        srcDataSet[:,i] = discrete(featList, DEVIDE_LIST) #确定分离点
    return srcDataSet


def CreateTree(dataset:np.ndarray, featList) -> dict:
    ''' @func: 根据输入的特征矩阵和标签向量构造决策树【核心函数】
        @para  dataset: 传入的样本数据, 即特征+标签
                featList: 特征列表, 存储特征的名称
        @return: A tree dict
    '''
    labelList = dataset[:,-1].copy() #取最后一列标签
    if(len(dataset[0]) == 1): return MajorFeature(dataset) #特征都删除了, 只剩下标签, 取值最多的种类
    if(np.unique(labelList).size == 1): return labelList[0] #分到叶子节点, 返回标签值
    BestFeatIndex = GetBestFeat(dataset)
    if(BestFeatIndex == -1): return MajorFeature(dataset) #特征增益都为0了, 没必要再分下去了
    BestFeat = featList[BestFeatIndex]
    Tree = {BestFeat:{}} #用字典来构造一个树, 其基本单元的结构为{feat:{featValue1:{chdTree1},featValue2:{chdTree2}}}
    BestFeatValue = np.unique(dataset[:,BestFeatIndex])
    for item in BestFeatValue: #遍历特征的每个取值
        subDataSet = DataSetExtract(dataset,BestFeatIndex,item)
        Tree[BestFeat][item] = CreateTree(subDataSet, np.delete(featList,BestFeatIndex,0))
    return Tree

def Predict(tree:dict, testDataset:np.ndarray, featList:list):
    ''' @func: 根据得到的树来预测某个样本的标签取值
        @para tree: 已经创建好的一个树, 是一个字典
              testDataset: 测试集
              featList: 特征列表
        @return: 预测出的标签取值
    '''
    if(testDataset.ndim == 1): testDataset = testDataset.reshape(1,-1) #只有一行数据, 就手动套一层括号
    sampleNum, _ = testDataset.shape #确定样本个数(行)
    labelPre = np.zeros(sampleNum,testDataset.dtype) #预测的标签值, 一个列表,考虑到
    for i in range(sampleNum): #遍历每一条数据
        testVec = testDataset[i] #取测试集中的这一行
        firstFeat = list(tree.keys())[0] #需要先转换为一个list
        featIndex = featList.index(firstFeat) #特征在特征表中的索引
        actualValue = testVec[featIndex] #输入的样本数据中该位置特征的实际取值
        subTree = tree[firstFeat][actualValue] #第二级的子树
        if isinstance(subTree, dict): #如果子树还是一个字典, 则需要继续剥
            labelPre[i] = Predict(subTree, testVec, featList)[0] #注意返回的也是一个列表,所以需要去掉括号
        else:  #子树剥到头, 变成叶子节点, 则其值即为标签取值
            labelPre[i] = subTree
    return labelPre

def ACC_Calc(src:np.ndarray|list, pre:np.ndarray|list) -> float:
    '''	@func: 计算分类的准确率
        @para src:原标签列表
              pre:预测的标签
        @return: 准确率
    '''
    src = np.array(src); pre = np.array(pre)
    return np.sum(src == pre) / src.size

def TreePlot(tree:dict, layer = 0) -> None:
    '''	@func: 以更易读的方式展示字典形式的树
        @para tree: 树字典
        @return: None
    '''
    firstFeat = list(tree.keys())[0] #需要先转换为一个list
    secondTree = tree[firstFeat] #二级树
    valuelist = list(secondTree.keys()) #第一级特征的取值
    for item in valuelist:
        chdtree = secondTree[item]
        if isinstance(chdtree, dict):
            print('|\t'*layer, firstFeat, item, ' : ')
            TreePlot(chdtree, layer+1)
        else: print('|\t'*layer, firstFeat, item, ' : ', chdtree)

# USE_MY_TREE = 1 # 决定是否使用自己写的决策树代码
USE_MY_TREE = 0

# 主函数
def main():
    print("Please Wait for a few seconds...\n")
    for e in range(2): #循环次数,不需要可以去掉
        # 1. 数据预处理, 得到标签
		# 不需要关注

        # 2. 读取特征表, 并加上标签构成数据集
		# 不需要关注,只需要知道,后面用的dataset是特征矩阵加上一列标签值这样的形式就行,最好是numpy类型,注意是离散值哦~
		# cols是特征的名称列表,相当于特征题头

        # 3. 训练测试集划分
        train = 0.8; test = 1-train #训练与测试集的分配
        trainSet = dataset[:int(train*len(dataset))] #训练集
        testSet = dataset[int(train*len(dataset)):] #测试集

        # 4. 调用自己写的树进行训练并预测
        if(USE_MY_TREE):
            treedic = CreateTree(trainSet,cols) #构建树,生成树字典
            print('Tree Structure of',e,':'); TreePlot(treedic) #画出树字典
            labelOUT = Predict(treedic,testSet,cols) #根据树字典测试测试集
            print("Predicted LabelList:\n",labelOUT)
            print("Source LabelList:\n",testSet[:,-1])
            print("Accuracy: ",ACC_Calc(testSet[:,-1], labelOUT),'\n')
        
        # 5. 调用第三方库进行训练并预测
        else: 
            clf = tree.DecisionTreeClassifier()
            clf.fit(trainSet[:,:-1],trainSet[:,-1]) #拆分特征和标签
            tree.plot_tree(clf)
            labelOUT = clf.predict(testSet[:,:-1])
            print('Predicted LabelList:\n',labelOUT)
            print('Source LabelList:\n',testSet[:,-1])
            print("Accuracy: ",ACC_Calc(testSet[:,-1], labelOUT),'\n')

if __name__ == "__main__":
    main()
    input("按下任意键继续...")

总结

  • 本文所用代码为基于过程编程,通过不断修剪数据集并传递给下一级递归函数,并不断给决策树字典添加内容,从而完成决策树的构建。这样的好处在于逻辑清晰明了,但不如类的方式紧凑。用类的形式来完成最主要的难点在于节点数据结构的创建,这里提供一种思路:将节点定义为一个子类,其包含了①符合该特征的当前数据集,②剩余属性列表,③标签取值,④下一级节点(类似于一个指针)
  • 此外,这里对连续数据的处理采用的是二分类,但本文代码完全可以实现多分类来离散特征,只需要修改离散函数即可。这里采用的是排列组合的方式生成分界点,然后遍历所有情况。因此,当样本数越多(或连续值个数越多),则代码运行时间成本将以指数增长,比如300个连续值,如果取一个分界点,则有299个取值,如果取2个分界点,则有44551个取值,如果取3个分界点,则有4410549个取值。因此需要根据该特征的重要性合理选择。但是需要注意,增加分界点的意义只在于扩大搜索范围,但并不代表一定能找到一个相比于二分类更优的分界点,也有可能得到的仍然是二分类数据。
  • 对于最优特征的选择,本文代码也添加了根据熵增益率基尼指数来选择的代码,合理选择。
  • 仍有未尽之处,可以在评论区指出,但希望是完全了解决策树且具有基本python常识提出的问题,否则,还请先把代码看完。
11-13 14:15