Machine Learning Experiment

import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import math
import time
class Node:
    """
    Decision tree node class
    """
    def __init__(self, is_leaf=False, label=None, feature=None, threshold=None, children=None):
        self.is_leaf = is_leaf                          # whether this node is a leaf
        self.label = label                              # class label for a leaf node
        self.feature = feature                          # index of the feature used for splitting
        self.threshold = threshold                      # split threshold for a continuous feature
        self.children = children if children else {}   # dictionary of child nodes
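As a quick illustration (added here, not part of the original experiment code), the sketch below hand-builds a one-split tree out of Node objects and walks a sample through it. It uses the same '<=' / '>' child keys that C45DecisionTree.build_tree uses further down; the feature index 2, the threshold 2.45, and the names example_stump / classify_with_stump are made-up illustrative values.

# Hedged sketch: a hand-built stump equivalent to "feature 2 <= 2.45 ? class 0 : class 1".
example_stump = Node(feature=2, threshold=2.45)
example_stump.children['<='] = Node(is_leaf=True, label=0)
example_stump.children['>'] = Node(is_leaf=True, label=1)

def classify_with_stump(sample, node=example_stump):
    """Walk one sample down the hand-built stump (illustration only)."""
    while not node.is_leaf:
        branch = '<=' if sample[node.feature] <= node.threshold else '>'
        node = node.children[branch]
    return node.label

# classify_with_stump(np.array([5.1, 3.5, 1.4, 0.2]))  # -> 0, since sample[2] = 1.4 <= 2.45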
class C45DecisionTree:
    """
    C4.5 decision tree implementation with pre-pruning and post-pruning support
    """
    def __init__(self, max_depth=None, min_samples_split=2, min_samples_leaf=1,
                 prune_method=None, confidence_threshold=0.05, use_pruning=True):
        """
        Initialize the C4.5 decision tree

        Parameters:
        - max_depth: maximum tree depth, used for pre-pruning
        - min_samples_split: minimum number of samples required to split a node, used for pre-pruning
        - min_samples_leaf: minimum number of samples required at a leaf node, used for pre-pruning
        - prune_method: pruning method, 'pre' for pre-pruning, 'post' for post-pruning, None for no pruning
        - confidence_threshold: confidence threshold used for post-pruning
        - use_pruning: whether to apply pruning
        """
        self.root = None
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.prune_method = prune_method
        self.confidence_threshold = confidence_threshold
        self.use_pruning = use_pruning

    def entropy(self, y):
        """Compute the entropy of a label array"""
        if len(y) == 0:
            return 0
        # Class probabilities
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        # Entropy
        entropy_value = -np.sum(probabilities * np.log2(probabilities))
        return entropy_value

    def information_gain_ratio(self, X, y, feature_idx, threshold=None):
        """Compute the information gain ratio for a candidate split"""
        # Entropy before the split
        original_entropy = self.entropy(y)
        # Continuous feature: partition the samples by the threshold
        if threshold is not None:
            left_mask = X[:, feature_idx] <= threshold
            right_mask = X[:, feature_idx] > threshold
            left_y, right_y = y[left_mask], y[right_mask]
            if len(left_y) == 0 or len(right_y) == 0:
                return -np.inf
            # Conditional entropy
            left_entropy = self.entropy(left_y)
            right_entropy = self.entropy(right_y)
            weight_left = len(left_y) / len(y)
            weight_right = len(right_y) / len(y)
            conditional_entropy = weight_left * left_entropy + weight_right * right_entropy
            # Information gain
            information_gain = original_entropy - conditional_entropy
            # Split information
            split_info = -weight_left * np.log2(weight_left) - weight_right * np.log2(weight_right)
            # Gain ratio
            if split_info == 0:
                return 0
            gain_ratio = information_gain / split_info
            return gain_ratio
        else:  # discrete feature
            unique_values = np.unique(X[:, feature_idx])
            weighted_entropy = 0
            split_info = 0
            for value in unique_values:
                mask = X[:, feature_idx] == value
                subset_y = y[mask]
                weight = len(subset_y) / len(y)
                weighted_entropy += weight * self.entropy(subset_y)
                split_info -= weight * np.log2(weight)
            information_gain = original_entropy - weighted_entropy
            if split_info == 0:
                return 0
            gain_ratio = information_gain / split_info
            return gain_ratio
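    # Reference note (added remark; these are the standard C4.5 definitions, not text
    # from the original write-up): the quantities computed above are
    #   Entropy(S)      = -sum_k p_k * log2(p_k)
    #   Gain(S, A)      = Entropy(S) - sum_v (|S_v| / |S|) * Entropy(S_v)
    #   SplitInfo(S, A) = -sum_v (|S_v| / |S|) * log2(|S_v| / |S|)
    #   GainRatio(S, A) = Gain(S, A) / SplitInfo(S, A)
    # where S_v is the subset of S sent down branch v of attribute A.
    # Example: for y = [0, 0, 1, 1], Entropy = -(0.5*log2(0.5) + 0.5*log2(0.5)) = 1.0 bit.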
    def find_best_split(self, X, y):
        """Find the best split feature and threshold"""
        best_feature = None
        best_threshold = None
        best_gain_ratio = -np.inf
        n_features = X.shape[1]
        for feature_idx in range(n_features):
            unique_values = np.unique(X[:, feature_idx])
            # Skip constant features: they cannot produce a meaningful split
            if len(unique_values) <= 1:
                continue
            # Features with few distinct values are treated as discrete
            if len(unique_values) <= 10:
                gain_ratio = self.information_gain_ratio(X, y, feature_idx)
                if gain_ratio > best_gain_ratio:
                    best_gain_ratio = gain_ratio
                    best_feature = feature_idx
                    best_threshold = None
            else:
                # Continuous feature: try every candidate threshold
                thresholds = (unique_values[:-1] + unique_values[1:]) / 2  # midpoints of adjacent values
                for threshold in thresholds:
                    gain_ratio = self.information_gain_ratio(X, y, feature_idx, threshold)
                    if gain_ratio > best_gain_ratio:
                        best_gain_ratio = gain_ratio
                        best_feature = feature_idx
                        best_threshold = threshold
        return best_feature, best_threshold

    def majority_vote(self, y):
        """Return the majority class label"""
        if len(y) == 0:
            return None
        values, counts = np.unique(y, return_counts=True)
        return values[np.argmax(counts)]

    def build_tree(self, X, y, depth=0):
        """Recursively build the decision tree"""
        # If all samples share one class, create a leaf node
        if len(np.unique(y)) == 1:
            return Node(is_leaf=True, label=y[0])
        # If the maximum depth is reached, create a leaf node (pre-pruning)
        if self.use_pruning and self.prune_method == 'pre' and self.max_depth is not None and depth >= self.max_depth:
            return Node(is_leaf=True, label=self.majority_vote(y))
        # If there are fewer samples than min_samples_split, create a leaf node (pre-pruning)
        if self.use_pruning and self.prune_method == 'pre' and len(X) < self.min_samples_split:
            return Node(is_leaf=True, label=self.majority_vote(y))
        # Find the best split
        best_feature, best_threshold = self.find_best_split(X, y)
        # If no meaningful split exists, create a leaf node
        if best_feature is None:
            return Node(is_leaf=True, label=self.majority_vote(y))
        # Create a decision node
        node = Node(feature=best_feature, threshold=best_threshold)
        # Split the data on the best split and build the subtrees recursively
        if best_threshold is not None:  # continuous feature
            left_mask = X[:, best_feature] <= best_threshold
            right_mask = X[:, best_feature] > best_threshold
            # Pre-pruning: check the sample count of each child
            if self.use_pruning and self.prune_method == 'pre':
                if len(X[left_mask]) < self.min_samples_leaf or len(X[right_mask]) < self.min_samples_leaf:
                    return Node(is_leaf=True, label=self.majority_vote(y))
            node.children['<='] = self.build_tree(X[left_mask], y[left_mask], depth + 1)
            node.children['>'] = self.build_tree(X[right_mask], y[right_mask], depth + 1)
        else:  # discrete feature
            unique_values = np.unique(X[:, best_feature])
            for value in unique_values:
                mask = X[:, best_feature] == value
                subset_X, subset_y = X[mask], y[mask]
                # Pre-pruning: check the sample count of each child
                if self.use_pruning and self.prune_method == 'pre':
                    if len(subset_X) < self.min_samples_leaf:
                        continue
                node.children[value] = self.build_tree(subset_X, subset_y, depth + 1)
        return node

    def fit(self, X, y):
        """Train the decision tree"""
        self.root = self.build_tree(X, y)
        # Apply post-pruning if enabled
        if self.use_pruning and self.prune_method == 'post':
            self.prune_tree(X, y)

    def prune_tree(self, X, y):
        """Post-pruning using pessimistic error pruning"""
        def _prune(node, X, y):
            if node.is_leaf:
                return node, len(y), np.sum(y == node.label)
            correct_predictions = 0
            total_samples = 0
            child_nodes = []
            # Recursively prune the child nodes
            if node.threshold is not None:  # continuous feature
                left_mask = X[:, node.feature] <= node.threshold
                right_mask = X[:, node.feature] > node.threshold
                if len(X[left_mask]) > 0:
                    node.children['<='], left_total, left_correct = _prune(node.children['<='], X[left_mask], y[left_mask])
                    total_samples += left_total
                    correct_predictions += left_correct
                if len(X[right_mask]) > 0:
                    node.children['>'], right_total, right_correct = _prune(node.children['>'], X[right_mask], y[right_mask])
                    total_samples += right_total
                    correct_predictions += right_correct
            else:  # discrete feature
                for value, child in node.children.items():
                    mask = X[:, node.feature] == value
                    if np.any(mask):
                        pruned_child, child_total, child_correct = _prune(child, X[mask], y[mask])
                        node.children[value] = pruned_child
                        total_samples += child_total
                        correct_predictions += child_correct