import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import DBSCAN
import scipy.stats as stats
import warnings
warnings.filterwarnings('ignore')


class AdaptiveRecipeFramework:
    """Adaptive process/recipe data-analysis framework.

    Uses DBSCAN-style density analysis to derive a data-driven range of
    neighbourhood sizes (k), evaluates candidate k values with cross-validated
    "differential" modelling (each sample is expressed as a difference to its
    nearest reference sample), and finally trains one model per target on the
    difference data.
    """

    # Valid sklearn scoring names for the error metrics.
    # FIX: the original built 'neg_mean_mse_error' with an f-string, which is
    # not a valid sklearn scorer name and raised ValueError at runtime.
    _SCORING_NAMES = {
        'mse': 'neg_mean_squared_error',
        'rmse': 'neg_root_mean_squared_error',
    }

    def __init__(self, model_type='linear', cv_folds=5, random_state=42,
                 k_range_strategy='auto'):
        """Initialise the framework.

        :param model_type: 'linear' (Ridge) or 'neural' (MLPRegressor)
        :param cv_folds: number of folds for cross-validation
        :param random_state: random seed
        :param k_range_strategy: 'auto' to derive the k range from the data,
                                 or 'fixed' (see :meth:`set_fixed_k_range`)
        """
        self.model_type = model_type
        self.cv = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
        self.random_state = random_state
        self.k_range_strategy = k_range_strategy

        # Parameters determined dynamically from the data.
        self.min_k = None
        self.max_k = None
        self.k_values = None
        self.min_k_ratio = None
        self.max_k_ratio = None

        # DBSCAN-related state.
        self.dbscan_labels = None
        self.core_samples = None
        self.cluster_info = {}  # per-cluster statistics, keyed by cluster label

        # Result storage.
        self.evaluation_results = {}
        self.optimal_k = None
        self.best_model = None
        self.scaler = None
        self.data_metrics = {}
        self.sample_k = {}
        self.sample_densities = None  # per-sample density proxy (see _calculate_data_metrics)
        self.eps_neighbors = None  # per-sample point count inside the eps-neighbourhood

        # Evaluation metrics (name -> callable).
        self.metrics = {
            'mse': mean_squared_error,
            'rmse': lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)),
            'r2': r2_score,
        }

    def _calculate_dbscan_features(self, X):
        """Compute DBSCAN-based density features and cluster statistics for X."""
        n_samples = len(X)

        # 1. k-distance of every sample (used to pick a suitable eps).
        k = min(5, n_samples - 1)  # distance to the 5th nearest neighbour
        nn = NearestNeighbors(n_neighbors=k)
        nn.fit(X)
        distances, _ = nn.kneighbors(X)
        k_distances = distances[:, -1]  # distance to the k-th nearest neighbour

        # eps for DBSCAN: 90th percentile of the k-distances.
        eps = np.percentile(k_distances, 90)

        # 2. Number of points inside each sample's eps-neighbourhood
        #    (the core DBSCAN density notion).
        nn_eps = NearestNeighbors(radius=eps)
        nn_eps.fit(X)
        neighbors = nn_eps.radius_neighbors(X, return_distance=False)
        self.eps_neighbors = np.array([len(nb) for nb in neighbors])

        # 3. Run DBSCAN; min_samples derived from the median neighbourhood size.
        min_samples = max(5, int(np.median(self.eps_neighbors)))
        dbscan = DBSCAN(eps=eps, min_samples=min_samples, n_jobs=-1)
        self.dbscan_labels = dbscan.fit_predict(X)
        self.core_samples = dbscan.core_sample_indices_

        # 4. Per-cluster statistics (noise points, label -1, are skipped).
        unique_labels = np.unique(self.dbscan_labels)
        for label in unique_labels:
            if label == -1:
                continue
            cluster_mask = self.dbscan_labels == label
            cluster_size = np.sum(cluster_mask)
            cluster_density = np.mean(self.eps_neighbors[cluster_mask]) / cluster_size
            self.cluster_info[label] = {
                'size': cluster_size,
                'density': cluster_density,
                'samples': np.where(cluster_mask)[0],
            }

        return {
            'eps': eps,
            'min_samples': min_samples,
            'n_clusters': len(unique_labels) - (1 if -1 in unique_labels else 0),
            'n_noise': np.sum(self.dbscan_labels == -1),
            'avg_eps_neighbors': np.mean(self.eps_neighbors),
        }

    def _analyze_data_distribution(self, X):
        """Derive the k-range ratios (min_k_ratio / max_k_ratio) from DBSCAN features."""
        n_samples = len(X)

        dbscan_metrics = self._calculate_dbscan_features(X)
        self.data_metrics.update(dbscan_metrics)

        # 1. Density factor: mean eps-neighbourhood size relative to sample count.
        density_factor = np.mean(self.eps_neighbors) / n_samples

        # 2. Cluster statistics.
        if self.data_metrics['n_clusters'] > 0:
            cluster_sizes = [info['size'] for info in self.cluster_info.values()]
            cluster_size_cv = np.std(cluster_sizes) / np.mean(cluster_sizes)
            avg_cluster_density = np.mean(
                [info['density'] for info in self.cluster_info.values()])
        else:
            cluster_size_cv = 0
            avg_cluster_density = 0

        # 3. Combine the components into one factor.
        # Denser data (many points per eps-neighbourhood) -> larger k ratios.
        density_component = min(1.5, max(0.5, density_factor * 10))
        # Clearly clustered data (uniform cluster sizes) -> larger k ratios.
        cluster_component = min(1.3, max(0.7, 1 - cluster_size_cv * 0.5))
        # Noisy data -> smaller k ratios.
        noise_ratio = self.data_metrics['n_noise'] / n_samples
        noise_component = min(1.0, max(0.5, 1 - noise_ratio * 2))

        combined_factor = density_component * cluster_component * noise_component

        # Final k-range ratios, clamped to sensible bounds.
        self.min_k_ratio = max(0.02, min(0.2, 0.05 * combined_factor * 1.2))
        self.max_k_ratio = max(0.1, min(0.6, 0.3 * combined_factor * 1.2))

        # Small data sets get a wider relative range.
        if n_samples < 50:
            self.min_k_ratio = max(self.min_k_ratio, 0.1)
            self.max_k_ratio = max(self.max_k_ratio, 0.4)

        return {
            'density_factor': density_factor,
            'cluster_size_cv': cluster_size_cv,
            'avg_cluster_density': avg_cluster_density,
            'noise_ratio': noise_ratio,
            'density_component': density_component,
            'cluster_component': cluster_component,
            'noise_component': noise_component,
            '综合_factor': combined_factor,  # original (non-ASCII) key kept for compatibility
            'min_k_ratio': self.min_k_ratio,
            'max_k_ratio': self.max_k_ratio,
        }

    def _calculate_data_metrics(self, X):
        """Compute data metrics and, in 'auto' mode, the dynamic k range."""
        n_samples = len(X)

        distribution_metrics = self._analyze_data_distribution(X)
        self.data_metrics.update(distribution_metrics)

        if self.k_range_strategy == 'auto':
            # k range driven by sample count and the DBSCAN-derived ratios.
            self.min_k = max(2, int(n_samples * self.min_k_ratio))
            self.max_k = min(int(n_samples * self.max_k_ratio), n_samples - 1)
            # Keep min_k < max_k with a reasonable number of candidates.
            if self.min_k >= self.max_k:
                self.max_k = min(self.min_k + 5, n_samples - 1)
                self.min_k = max(2, self.max_k - 5)
            # Evaluate at most ~10 k values.
            k_step = max(1, (self.max_k - self.min_k) // 10)
            self.k_values = range(self.min_k, self.max_k + 1, k_step)
        else:
            # Fixed range (possibly set via set_fixed_k_range).
            if self.k_values is None:
                self.k_values = range(2, 13)
            self.min_k = min(self.k_values)
            self.max_k = max(self.k_values)

        # Per-sample density proxy and its percentiles.
        # NOTE(review): this is the *reciprocal* of the eps-neighbourhood count,
        # so larger values actually mean sparser regions; the density-bucket
        # comparisons in _find_sample_optimal_k inherit this orientation.
        # Preserved as-is — confirm the intended orientation before changing.
        self.sample_densities = 1.0 / (self.eps_neighbors + 1e-6)
        self.data_metrics['density_percentile_30'] = np.percentile(self.sample_densities, 30)
        self.data_metrics['density_percentile_70'] = np.percentile(self.sample_densities, 70)

        # Mean per-feature variance.
        self.data_metrics['feature_variance'] = np.mean(X.var(axis=0))

        return self.data_metrics

    def set_fixed_k_range(self, min_k, max_k, step=1):
        """Use a fixed k range instead of the DBSCAN-derived one. Returns self."""
        self.k_range_strategy = 'fixed'
        self.k_values = range(min_k, max_k + 1, step)
        self.min_k = min_k
        self.max_k = max_k
        return self

    def _find_sample_optimal_k(self, X, y, sample_idx):
        """Pick the best k for one sample, guided by its DBSCAN role (noise/core)."""
        X_sample = X[sample_idx]
        y_sample = y[sample_idx]

        # All other samples.
        mask = np.ones(len(X), dtype=bool)
        mask[sample_idx] = False
        X_other = X[mask]
        n_other = len(X_other)
        if n_other == 0:
            return self.min_k

        # DBSCAN features for this sample.
        sample_label = self.dbscan_labels[sample_idx]
        sample_eps_neighbors = self.eps_neighbors[sample_idx]
        sample_density = self.sample_densities[sample_idx]

        # 1. Noise points: small base k, narrow search band.
        if sample_label == -1:
            base_k = max(self.min_k, min(self.optimal_k // 2, self.max_k))
            k_range = 0.2
        else:
            # 2. Cluster members: wider band, widest for core samples.
            is_core_sample = sample_idx in self.core_samples
            cluster_info = self.cluster_info.get(sample_label, {})
            cluster_size = cluster_info.get('size', n_other)
            # Large clusters allow larger k.
            base_k = min(self.optimal_k, int(cluster_size * 0.3))
            k_range = 0.4 if is_core_sample else 0.3

        base_min = max(self.min_k, int(base_k * (1 - k_range)))
        base_max = min(self.max_k, int(base_k * (1 + k_range)))

        # Shift the candidate band according to the sample's density bucket.
        if sample_density > self.data_metrics['density_percentile_70']:
            candidate_min = max(base_min, base_k)
            candidate_max = base_max
        elif sample_density < self.data_metrics['density_percentile_30']:
            candidate_min = base_min
            candidate_max = min(base_max, base_k)
        else:
            candidate_min = base_min
            candidate_max = base_max

        # Clamp to valid values; never more neighbours than the eps-neighbourhood holds.
        candidate_min = max(self.min_k, candidate_min)
        candidate_max = min(candidate_max, n_other, sample_eps_neighbors)
        candidate_ks = list(range(candidate_min, candidate_max + 1))
        if not candidate_ks:
            candidate_ks = [min(base_k, n_other, self.max_k)]

        # Evaluate each candidate k.
        scores = []
        for k in candidate_ks:
            # k nearest neighbours form the reference set.
            nn = NearestNeighbors(n_neighbors=k)
            nn.fit(X_other)
            distances, indices = nn.kneighbors([X_sample])
            # Map back to original indices.
            reference_indices = np.where(mask)[0][indices[0]]
            X_refs = X[reference_indices]
            y_refs = y[reference_indices]

            # Differences to every reference sample.
            X_diffs = X_sample - X_refs
            y_diffs = y_sample - y_refs

            # Leave-one-out CV over the difference pairs.
            # FIX: keep y_diffs' shape instead of ravel()-ing it — the original
            # flattened multi-target y into one long vector and crashed
            # Ridge.fit with an X/y length mismatch.
            model = Ridge(alpha=0.1, random_state=self.random_state)
            cv_errors = []
            for i in range(len(X_diffs)):
                X_train = np.delete(X_diffs, i, axis=0)
                y_train = np.delete(y_diffs, i, axis=0)
                X_val = X_diffs[i:i + 1]
                y_val = y_diffs[i:i + 1]
                if len(X_train) == 0:
                    cv_errors.append(0)
                    continue
                model.fit(X_train, y_train)
                y_pred = model.predict(X_val)
                cv_errors.append(mean_squared_error(y_val, y_pred))
            scores.append(np.mean(cv_errors))

        # Best k = lowest mean LOO error.
        return candidate_ks[int(np.argmin(scores))]

    def _create_differential_data(self, X, y, k):
        """Build difference data (x_i - x_ref, y_i - y_ref) using a fixed k."""
        X_diff = []
        y_diff = []
        reference_indices = []
        for i in range(len(X)):
            mask = np.ones(len(X), dtype=bool)
            mask[i] = False
            X_other = X[mask]
            k_effective = min(k, len(X_other))
            nn = NearestNeighbors(n_neighbors=k_effective)
            nn.fit(X_other)
            distances, indices = nn.kneighbors([X[i]])
            original_indices = np.where(mask)[0][indices[0]]
            # Reference sample = the closest of the k neighbours.
            best_idx = original_indices[np.argmin(distances[0])]
            X_diff.append(X[i] - X[best_idx])
            y_diff.append(y[i] - y[best_idx])
            reference_indices.append(best_idx)
        return np.array(X_diff), np.array(y_diff), reference_indices

    def _evaluate_k_performance(self, X, y, k):
        """Cross-validate the differential model for one k; return per-metric mean/std."""
        X_diff, y_diff, _ = self._create_differential_data(X, y, k)

        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_diff)

        n_targets = y_diff.shape[1] if len(y_diff.shape) > 1 else 1
        if n_targets == 1:
            y_diff = y_diff.reshape(-1, 1)

        target_scores = {metric: [] for metric in self.metrics}
        for target_idx in range(n_targets):
            y_target = y_diff[:, target_idx]
            if self.model_type == 'linear':
                model = Ridge(alpha=0.1, random_state=self.random_state)
            else:
                model = MLPRegressor(hidden_layer_sizes=(64, 32),
                                     activation='relu',
                                     max_iter=500,
                                     random_state=self.random_state,
                                     early_stopping=True)
            for metric_name in self.metrics:
                if metric_name == 'r2':
                    scores = cross_val_score(model, X_scaled, y_target,
                                             cv=self.cv, scoring='r2')
                else:
                    # Negated sklearn error scorers -> positive errors.
                    scores = -cross_val_score(model, X_scaled, y_target,
                                              cv=self.cv,
                                              scoring=self._SCORING_NAMES[metric_name])
                target_scores[metric_name].append(np.mean(scores))

        avg_scores = {metric: np.mean(scores) for metric, scores in target_scores.items()}
        std_scores = {metric: np.std(scores) for metric, scores in target_scores.items()}
        return avg_scores, std_scores

    def determine_optimal_k(self, X, y):
        """Evaluate all candidate k values and pick the global optimum."""
        self._calculate_data_metrics(X)

        print("DBSCAN数据分析:")
        print(f" - 样本数量: {len(X)}")
        print(f" - 聚类数量: {self.data_metrics['n_clusters']}")
        print(f" - 噪声点比例: {self.data_metrics['noise_ratio']:.2%}")
        print(f" - 平均ε邻域点数: {self.data_metrics['avg_eps_neighbors']:.1f}")
        print(f" - 确定的k值比例: {self.min_k_ratio:.2%} 到 {self.max_k_ratio:.2%}")
        print(f" - 评估的k值范围: {self.min_k} 到 {self.max_k}")

        self.evaluation_results['mean'] = {metric: [] for metric in self.metrics}
        self.evaluation_results['std'] = {metric: [] for metric in self.metrics}

        print("\n评估不同邻域大小...")
        for k in self.k_values:
            print(f"评估k={k}", end='\r')
            avg_scores, std_scores = self._evaluate_k_performance(X, y, k)
            for metric in self.metrics:
                self.evaluation_results['mean'][metric].append(avg_scores[metric])
                self.evaluation_results['std'][metric].append(std_scores[metric])

        # Combine normalised RMSE (lower is better) and R² (higher is better).
        rmse_scores = np.array(self.evaluation_results['mean']['rmse'])
        r2_scores = np.array(self.evaluation_results['mean']['r2'])
        # FIX: guard the min-max normalisation against a zero range
        # (all candidate k values scoring identically).
        rmse_span = np.max(rmse_scores) - np.min(rmse_scores)
        r2_span = np.max(r2_scores) - np.min(r2_scores)
        norm_rmse = ((rmse_scores - np.min(rmse_scores)) / rmse_span
                     if rmse_span > 0 else np.zeros_like(rmse_scores))
        norm_r2 = ((r2_scores - np.min(r2_scores)) / r2_span
                   if r2_span > 0 else np.zeros_like(r2_scores))
        combined_score = 0.6 * norm_rmse + 0.4 * (1 - norm_r2)
        self.optimal_k = list(self.k_values)[int(np.argmin(combined_score))]
        print(f"\n最优邻域大小: k={self.optimal_k}")
        return self.optimal_k

    def prepare_adaptive_differential_data(self, X, y):
        """Build difference data using a per-sample adaptive k."""
        if self.optimal_k is None:
            self.determine_optimal_k(X, y)

        print("\n为每个样本确定最优邻域大小...")
        for i in range(len(X)):
            if i % 10 == 0:
                print(f"处理样本 {i}/{len(X)}", end='\r')
            self.sample_k[i] = self._find_sample_optimal_k(X, y, i)

        X_diff = []
        y_diff = []
        reference_indices = []
        for i in range(len(X)):
            k = self.sample_k[i]
            mask = np.ones(len(X), dtype=bool)
            mask[i] = False
            X_other = X[mask]
            k_effective = min(k, len(X_other))
            nn = NearestNeighbors(n_neighbors=k_effective)
            nn.fit(X_other)
            distances, indices = nn.kneighbors([X[i]])
            original_indices = np.where(mask)[0][indices[0]]
            best_idx = original_indices[np.argmin(distances[0])]
            X_diff.append(X[i] - X[best_idx])
            y_diff.append(y[i] - y[best_idx])
            reference_indices.append(best_idx)

        print("\n差分数据准备完成")
        return np.array(X_diff), np.array(y_diff), reference_indices

    def train_best_model(self, X, y):
        """Train one model per target on the adaptive difference data."""
        X_diff, y_diff, reference_indices = self.prepare_adaptive_differential_data(X, y)

        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X_diff)

        n_targets = y_diff.shape[1] if len(y_diff.shape) > 1 else 1
        if n_targets == 1:
            y_diff = y_diff.reshape(-1, 1)

        self.best_model = {}
        print("\n训练最佳模型...")
        for target_idx in range(n_targets):
            y_target = y_diff[:, target_idx]
            X_train, X_val, y_train, y_val = train_test_split(
                X_scaled, y_target, test_size=0.2, random_state=self.random_state)
            if self.model_type == 'linear':
                model = Ridge(alpha=0.1, random_state=self.random_state)
            else:
                model = MLPRegressor(hidden_layer_sizes=(64, 32),
                                     activation='relu',
                                     max_iter=500,
                                     random_state=self.random_state,
                                     early_stopping=True)
            model.fit(X_train, y_train)
            y_pred = model.predict(X_val)
            mse = mean_squared_error(y_val, y_pred)
            r2 = r2_score(y_val, y_pred)
            print(f"目标变量 {target_idx} - MSE: {mse:.4f}, R²: {r2:.4f}")
            self.best_model[target_idx] = model

        return {
            'X_diff': X_diff,
            'y_diff': y_diff,
            'reference_indices': reference_indices,
        }

    def plot_analysis(self, X=None):
        """Plot DBSCAN clusters (if X given), k-performance curves, per-sample k.

        :param X: original feature matrix; needed only for the cluster scatter
                  plot (first two features). FIX: the original read a global X,
                  raising NameError outside the bundled demo script.
        """
        # 1. DBSCAN clustering result (first two features).
        # FIX: check `is not None` — hasattr was always true since __init__
        # assigns the attribute.
        if self.dbscan_labels is not None and X is not None:
            plt.figure(figsize=(10, 6))
            unique_labels = np.unique(self.dbscan_labels)
            colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
            for label, color in zip(unique_labels, colors):
                if label == -1:
                    # FIX: plot only the actual noise points, not every sample.
                    noise_mask = self.dbscan_labels == -1
                    plt.scatter(X[noise_mask, 0], X[noise_mask, 1],
                                c='k', marker='x', s=50, label='噪声点')
                else:
                    mask = self.dbscan_labels == label
                    plt.scatter(X[mask, 0], X[mask, 1], c=[color], s=50,
                                label=f'聚类 {label} (大小: {np.sum(mask)})')
            plt.title('DBSCAN聚类结果')
            plt.xlabel('特征 1')
            plt.ylabel('特征 2')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.show()

        # 2. Performance versus neighbourhood size k.
        plt.figure(figsize=(12, 8))

        plt.subplot(2, 1, 1)
        rmse_means = self.evaluation_results['mean']['rmse']
        rmse_stds = self.evaluation_results['std']['rmse']
        plt.errorbar(self.k_values, rmse_means, yerr=rmse_stds,
                     fmt='-o', capsize=5, color='blue')
        plt.axvline(x=self.optimal_k, color='r', linestyle='--',
                    label=f'最优k={self.optimal_k}')
        plt.title('RMSE与邻域大小k的关系')
        plt.xlabel('邻域大小k')
        plt.ylabel('RMSE')
        plt.grid(True, alpha=0.3)
        plt.legend()

        plt.subplot(2, 1, 2)
        r2_means = self.evaluation_results['mean']['r2']
        r2_stds = self.evaluation_results['std']['r2']
        plt.errorbar(self.k_values, r2_means, yerr=r2_stds,
                     fmt='-o', capsize=5, color='green')
        plt.axvline(x=self.optimal_k, color='r', linestyle='--',
                    label=f'最优k={self.optimal_k}')
        plt.title('R²与邻域大小k的关系')
        plt.xlabel('邻域大小k')
        plt.ylabel('R²')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.tight_layout()
        plt.show()

        # 3. Distribution of per-sample k values.
        plt.figure(figsize=(10, 6))
        sns.histplot(list(self.sample_k.values()), bins=10, kde=True)
        plt.axvline(x=self.optimal_k, color='r', linestyle='--',
                    label=f'全局最优k={self.optimal_k}')
        plt.title('样本最优k值分布')
        plt.xlabel('k值')
        plt.ylabel('样本数量')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.show()
# Example usage
if __name__ == "__main__":
    # Fix the global seed so the demo is reproducible.
    np.random.seed(42)

    # Two synthetic data sets with different clustering characteristics.
    test_cases = [
        {"name": "明显聚类数据",
         "n_samples": 150, "n_features": 20, "n_targets": 3,
         "n_clusters": 4,
         "noise_ratio": 0.05},
        {"name": "噪声较多的数据",
         "n_samples": 150, "n_features": 20, "n_targets": 3,
         "n_clusters": 2,
         "noise_ratio": 0.2},
    ]

    for case in test_cases:
        print(f"\n===== 测试{case['name']} =====")
        n_samples = case["n_samples"]
        n_features = case["n_features"]
        n_targets = case["n_targets"]
        n_clusters = case["n_clusters"]
        noise_ratio = case["noise_ratio"]

        # Build clustered feature data: one Gaussian blob per cluster.
        X = []
        for i in range(n_clusters):
            cluster_size = int(n_samples * (1 - noise_ratio) / n_clusters)
            center = np.random.randn(n_features) * (i + 1)
            cluster = np.random.randn(cluster_size, n_features) * 0.8 + center
            X.append(cluster)

        # Sprinkle in wide-spread noise points.
        n_noise = int(n_samples * noise_ratio)
        noise = np.random.randn(n_noise, n_features) * 3
        X.append(noise)
        X = np.vstack(X)

        # Targets: linear combinations of a few "important" features plus noise.
        important_features = np.random.choice(n_features, 5, replace=False)
        coefficients = np.random.randn(5, n_targets)
        y = np.zeros((len(X), n_targets))
        for i in range(n_targets):
            y[:, i] = (X[:, important_features].dot(coefficients[:, i])
                       + np.random.randn(len(X)) * 0.2)

        # Run the DBSCAN-guided adaptive framework end to end.
        framework = AdaptiveRecipeFramework(model_type='linear',
                                            k_range_strategy='auto')
        result = framework.train_best_model(X, y)
        framework.plot_analysis()

# 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/935893.shtml
#
# 如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!