import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
from sklearn.cluster import KMeans
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.neighbors import NearestNeighbors, KNeighborsClassifier
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
warnings.filterwarnings('ignore')class AdaptiveRecipeFramework:def __init__(self, model_type='linear', cv_folds=5, random_state=42,k_range_strategy='auto'):"""初始化自适应工艺数据分析框架,支持完全自适应的k值范围:param model_type: 模型类型,'linear'或'neural':param cv_folds: 交叉验证折数:param random_state: 随机种子:param k_range_strategy: k值范围策略,'auto'自动或'fixed'固定"""self.model_type = model_typeself.cv = KFold(n_splits=cv_folds, shuffle=True, random_state=random_state)self.random_state = random_stateself.k_range_strategy = k_range_strategy# 动态确定的参数self.min_k = Noneself.max_k = Noneself.k_values = Noneself.min_k_ratio = Noneself.max_k_ratio = None# 存储结果的字典self.evaluation_results = {}self.optimal_k = Noneself.best_model = Noneself.scaler = Noneself.data_metrics = {}self.sample_k = {}self.sample_densities = None # 每个样本的密度# 评估指标self.metrics = {'mse': mean_squared_error,'rmse': lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)),'r2': r2_score}def _analyze_data_distribution(self, X):"""深入分析数据分布特征,为k值范围提供依据"""n_samples = len(X)# 1. 计算数据稀疏性指标(平均最近邻距离的变异系数)nn = NearestNeighbors(n_neighbors=5)nn.fit(X)distances, _ = nn.kneighbors(X)self.sample_densities = np.mean(distances, axis=1)density_cv = np.std(self.sample_densities) / np.mean(self.sample_densities) # 变异系数# 2. 评估数据聚类特性# 使用轮廓系数评估聚类质量(值越高表示聚类越明显)n_clusters = min(10, max(2, int(n_samples ** 0.5))) # 聚类数量kmeans = KMeans(n_clusters=n_clusters, random_state=self.random_state)clusters = kmeans.fit_predict(X)# 计算每个聚类的样本数量cluster_sizes = np.bincount(clusters)cluster_size_cv = np.std(cluster_sizes) / np.mean(cluster_sizes) # 聚类大小的变异系数# 3. 计算数据分布的峰度和偏度density_kurtosis = stats.kurtosis(self.sample_densities)density_skew = stats.skew(self.sample_densities)# 4. 
综合以上指标确定k值范围比例# 稀疏性高(density_cv大)的数据需要更小的k比例sparsity_factor = 1 - min(0.8, density_cv)# 聚类明显(cluster_size_cv小)的数据需要更大的k比例clustering_factor = 1 + min(0.5, 1 - cluster_size_cv)# 峰度高(数据集中)的数据需要更小的k比例kurtosis_factor = 1 - min(0.4, max(0, density_kurtosis / 10))# 综合因子(0.3-1.5之间)综合_factor = sparsity_factor * clustering_factor * kurtosis_factor# 确定最终比例(根据综合因子调整)self.min_k_ratio = max(0.02, min(0.15, 0.05 * 综合_factor))self.max_k_ratio = max(0.1, min(0.5, 0.3 * 综合_factor))# 对于非常小的数据集特殊处理if n_samples < 50:self.min_k_ratio = max(self.min_k_ratio, 0.1)self.max_k_ratio = max(self.max_k_ratio, 0.3)return {'density_cv': density_cv,'cluster_size_cv': cluster_size_cv,'density_kurtosis': density_kurtosis,'density_skew': density_skew,'sparsity_factor': sparsity_factor,'clustering_factor': clustering_factor,'kurtosis_factor': kurtosis_factor,'综合因子': 综合_factor,'min_k_ratio': self.min_k_ratio,'max_k_ratio': self.max_k_ratio}def _calculate_data_metrics(self, X):"""计算数据特性指标,包括基于数据分布的动态k范围"""n_samples = len(X)# 分析数据分布以确定k值比例distribution_metrics = self._analyze_data_distribution(X)self.data_metrics.update(distribution_metrics)# 根据策略确定k值范围if self.k_range_strategy == 'auto':# 基于样本数量和数据分布的动态k范围self.min_k = max(2, int(n_samples * self.min_k_ratio))self.max_k = min(int(n_samples * self.max_k_ratio), n_samples - 1)# 确保min_k小于max_k且有合理数量的k值进行评估if self.min_k >= self.max_k:self.max_k = min(self.min_k + 5, n_samples - 1)self.min_k = max(2, self.max_k - 5)# 确定评估的k值点(最多10个点)k_step = max(1, (self.max_k - self.min_k) // 10)self.k_values = range(self.min_k, self.max_k + 1, k_step)else:# 固定范围(默认2-12,但可通过set_fixed_k_range方法修改)if self.k_values is None:self.k_values = range(2, 13)self.min_k = min(self.k_values)self.max_k = max(self.k_values)# 计算密度百分位数self.data_metrics['density_percentile_30'] = np.percentile(self.sample_densities, 30)self.data_metrics['density_percentile_70'] = np.percentile(self.sample_densities, 70)# 计算特征方差self.data_metrics['feature_variance'] = np.mean(X.var(axis=0))return 
self.data_metricsdef set_fixed_k_range(self, min_k, max_k, step=1):"""设置固定的k值范围"""self.k_range_strategy = 'fixed'self.k_values = range(min_k, max_k + 1, step)self.min_k = min_kself.max_k = max_kreturn selfdef _create_differential_data(self, X, y, k):"""创建指定k值的差分数据"""X_diff = []y_diff = []reference_indices = []for i in range(len(X)):# 排除当前样本mask = np.ones(len(X), dtype=bool)mask[i] = FalseX_other = X[mask]# 找到k个最近邻k_effective = min(k, len(X_other))nn = NearestNeighbors(n_neighbors=k_effective)nn.fit(X_other)distances, indices = nn.kneighbors([X[i]])# 映射回原始索引original_indices = np.where(mask)[0][indices[0]]# 选择最佳参考样本(基于最小距离)best_idx = original_indices[np.argmin(distances[0])]# 计算差分X_diff.append(X[i] - X[best_idx])y_diff.append(y[i] - y[best_idx])reference_indices.append(best_idx)return np.array(X_diff), np.array(y_diff), reference_indicesdef _evaluate_k_performance(self, X, y, k):"""评估特定k值的性能"""X_diff, y_diff, _ = self._create_differential_data(X, y, k)scaler = StandardScaler()X_scaled = scaler.fit_transform(X_diff)# 多目标变量处理n_targets = y_diff.shape[1] if len(y_diff.shape) > 1 else 1if n_targets == 1:y_diff = y_diff.reshape(-1, 1)# 存储每个目标变量的评分target_scores = {metric: [] for metric in self.metrics}# 对每个目标变量评估for target_idx in range(n_targets):y_target = y_diff[:, target_idx]# 定义模型if self.model_type == 'linear':model = Ridge(alpha=0.1, random_state=self.random_state)else:model = MLPRegressor(hidden_layer_sizes=(64, 32),activation='relu',max_iter=500,random_state=self.random_state,early_stopping=True)# 交叉验证评估for metric_name, metric_func in self.metrics.items():if metric_name == 'r2':scores = cross_val_score(model, X_scaled, y_target, cv=self.cv, scoring='r2')else:# 误差指标使用负值scoring = f'neg_mean_{metric_name}_error' if metric_name != 'rmse' else 'neg_root_mean_squared_error'scores = -cross_val_score(model, X_scaled, y_target, cv=self.cv, scoring=scoring)target_scores[metric_name].append(np.mean(scores))# 计算所有目标变量的平均性能avg_scores = {metric: np.mean(scores) for metric, scores 
in target_scores.items()}std_scores = {metric: np.std(scores) for metric, scores in target_scores.items()}return avg_scores, std_scoresdef determine_optimal_k(self, X, y):"""确定全局最优邻域大小"""# 计算数据特性(包括动态k范围)self._calculate_data_metrics(X)print("数据分布分析:")print(f" - 样本数量: {len(X)}")print(f" - 密度变异系数(稀疏性): {self.data_metrics['density_cv']:.4f}")print(f" - 聚类大小变异系数: {self.data_metrics['cluster_size_cv']:.4f}")print(f" - 综合因子: {self.data_metrics['综合因子']:.4f}")print(f" - 确定的k值比例: {self.min_k_ratio:.2%} 到 {self.max_k_ratio:.2%}")print(f" - 评估的k值范围: {self.min_k} 到 {self.max_k}")# 评估所有k值self.evaluation_results['mean'] = {metric: [] for metric in self.metrics}self.evaluation_results['std'] = {metric: [] for metric in self.metrics}print("\n评估不同邻域大小...")for k in self.k_values:print(f"评估k={k}", end='\r')avg_scores, std_scores = self._evaluate_k_performance(X, y, k)for metric in self.metrics:self.evaluation_results['mean'][metric].append(avg_scores[metric])self.evaluation_results['std'][metric].append(std_scores[metric])# 确定最优k值rmse_scores = np.array(self.evaluation_results['mean']['rmse'])r2_scores = np.array(self.evaluation_results['mean']['r2'])# 标准化分数并加权norm_rmse = (rmse_scores - np.min(rmse_scores)) / (np.max(rmse_scores) - np.min(rmse_scores))norm_r2 = (r2_scores - np.min(r2_scores)) / (np.max(r2_scores) - np.min(r2_scores))# 综合评分combined_score = 0.6 * norm_rmse + 0.4 * (1 - norm_r2)self.optimal_k = self.k_values[np.argmin(combined_score)]print(f"\n最优邻域大小: k={self.optimal_k}")return self.optimal_kdef _find_sample_optimal_k(self, X, y, sample_idx):"""为单个样本找到最优k值,使用完全自适应范围"""X_sample = X[sample_idx]y_sample = y[sample_idx]# 排除当前样本mask = np.ones(len(X), dtype=bool)mask[sample_idx] = FalseX_other = X[mask]y_other = y[mask]n_other = len(X_other)if n_other == 0:return self.min_k # 处理只有一个样本的极端情况# 获取当前样本的密度和数据分布特性sample_density = self.sample_densities[sample_idx]综合_factor = self.data_metrics['综合因子']# 根据综合因子调整k值范围的灵敏度sensitivity = 0.3 + 0.2 *综合_factor# 基础候选k范围(基于全局最优k)base_min = 
max(self.min_k, int(self.optimal_k * (1 - sensitivity)))base_max = min(self.max_k, int(self.optimal_k * (1 + sensitivity)))# 根据样本密度调整候选k范围if sample_density < self.data_metrics['density_percentile_30']:# 高密度区域:选择较大的k值candidate_min = max(base_min, self.optimal_k)candidate_max = base_maxelif sample_density > self.data_metrics['density_percentile_70']:# 低密度区域:选择较小的k值candidate_min = base_mincandidate_max = min(base_max, self.optimal_k)else:# 中等密度区域:使用全部可能的k值candidate_min = base_mincandidate_max = base_max# 确保候选范围有效且不超过可用样本数candidate_min = max(self.min_k, candidate_min)candidate_max = min(candidate_max, n_other)candidate_ks = list(range(candidate_min, candidate_max + 1))# 如果没有候选k值,使用安全默认值if not candidate_ks:candidate_ks = [min(self.optimal_k, n_other, self.max_k)]# 评估每个候选k值scores = []for k in candidate_ks:# 找到k个最近邻作为整体参考集nn = NearestNeighbors(n_neighbors=k)nn.fit(X_other)distances, indices = nn.kneighbors([X_sample])# 映射回原始索引reference_indices = np.where(mask)[0][indices[0]]# 获取参考样本X_refs = X[reference_indices]y_refs = y[reference_indices]# 计算与所有参考样本的差分X_diffs = X_sample - X_refsy_diffs = y_sample - y_refs# 使用留一法交叉验证评估性能model = Ridge(alpha=0.1, random_state=self.random_state)cv_errors = []for i in range(len(X_diffs)):# 留一法:每次留一个作为验证集X_train = np.delete(X_diffs, i, axis=0)y_train = np.delete(y_diffs.ravel(), i, axis=0)X_val = X_diffs[i:i+1]y_val = y_diffs[i:i+1].ravel()if len(X_train) == 0: # 处理k=1的情况cv_errors.append(0)continuemodel.fit(X_train, y_train)y_pred = model.predict(X_val)cv_errors.append(mean_squared_error(y_val, y_pred))# 使用平均交叉验证误差作为评分scores.append(np.mean(cv_errors))# 选择最佳k值(误差最小)best_k_idx = np.argmin(scores)return candidate_ks[best_k_idx]def prepare_adaptive_differential_data(self, X, y):"""准备自适应差分数据"""if self.optimal_k is None:self.determine_optimal_k(X, y)# 为每个样本确定最优k值print("\n为每个样本确定最优邻域大小...")for i in range(len(X)):if i % 10 == 0:print(f"处理样本 {i}/{len(X)}", end='\r')self.sample_k[i] = self._find_sample_optimal_k(X, y, i)# 创建自适应差分数据X_diff = []y_diff = 
[]reference_indices = []for i in range(len(X)):k = self.sample_k[i]mask = np.ones(len(X), dtype=bool)mask[i] = FalseX_other = X[mask]# 找到k个最近邻k_effective = min(k, len(X_other))nn = NearestNeighbors(n_neighbors=k_effective)nn.fit(X_other)distances, indices = nn.kneighbors([X[i]])# 映射回原始索引并选择最佳参考样本original_indices = np.where(mask)[0][indices[0]]best_idx = original_indices[np.argmin(distances[0])]# 计算差分X_diff.append(X[i] - X[best_idx])y_diff.append(y[i] - y[best_idx])reference_indices.append(best_idx)print("\n差分数据准备完成")return np.array(X_diff), np.array(y_diff), reference_indicesdef train_best_model(self, X, y):"""使用最优邻域大小训练最终模型"""# 准备自适应差分数据X_diff, y_diff, reference_indices = self.prepare_adaptive_differential_data(X, y)# 标准化self.scaler = StandardScaler()X_scaled = self.scaler.fit_transform(X_diff)# 多目标变量处理n_targets = y_diff.shape[1] if len(y_diff.shape) > 1 else 1if n_targets == 1:y_diff = y_diff.reshape(-1, 1)# 存储每个目标变量的最佳模型self.best_model = {}print("\n训练最佳模型...")for target_idx in range(n_targets):y_target = y_diff[:, target_idx]# 划分训练集和验证集X_train, X_val, y_train, y_val = train_test_split(X_scaled, y_target, test_size=0.2, random_state=self.random_state)# 定义并训练模型if self.model_type == 'linear':model = Ridge(alpha=0.1, random_state=self.random_state)else:model = MLPRegressor(hidden_layer_sizes=(64, 32),activation='relu',max_iter=500,random_state=self.random_state,early_stopping=True)model.fit(X_train, y_train)# 评估模型y_pred = model.predict(X_val)mse = mean_squared_error(y_val, y_pred)r2 = r2_score(y_val, y_pred)print(f"目标变量 {target_idx} - MSE: {mse:.4f}, R²: {r2:.4f}")self.best_model[target_idx] = modelreturn {'X_diff': X_diff,'y_diff': y_diff,'reference_indices': reference_indices}def plot_analysis(self):"""绘制分析图表"""# 1. 
不同k值的性能曲线plt.figure(figsize=(12, 8))# RMSE曲线plt.subplot(2, 1, 1)rmse_means = self.evaluation_results['mean']['rmse']rmse_stds = self.evaluation_results['std']['rmse']plt.errorbar(self.k_values, rmse_means, yerr=rmse_stds, fmt='-o', capsize=5, color='blue')plt.axvline(x=self.optimal_k, color='r', linestyle='--', label=f'最优k={self.optimal_k}')plt.title('RMSE与邻域大小k的关系')plt.xlabel('邻域大小k')plt.ylabel('RMSE')plt.grid(True, alpha=0.3)plt.legend()# R²曲线plt.subplot(2, 1, 2)r2_means = self.evaluation_results['mean']['r2']r2_stds = self.evaluation_results['std']['r2']plt.errorbar(self.k_values, r2_means, yerr=r2_stds, fmt='-o', capsize=5, color='green')plt.axvline(x=self.optimal_k, color='r', linestyle='--', label=f'最优k={self.optimal_k}')plt.title('R²与邻域大小k的关系')plt.xlabel('邻域大小k')plt.ylabel('R²')plt.grid(True, alpha=0.3)plt.legend()plt.tight_layout()plt.show()# 2. 样本k值分布plt.figure(figsize=(10, 6))sns.histplot(list(self.sample_k.values()), bins=10, kde=True)plt.axvline(x=self.optimal_k, color='r', linestyle='--', label=f'全局最优k={self.optimal_k}')plt.title('样本最优k值分布')plt.xlabel('k值')plt.ylabel('样本数量')plt.grid(True, alpha=0.3)plt.legend()plt.show()# 3. 样本密度与选择的k值关系plt.figure(figsize=(10, 6))plt.scatter(self.sample_densities, list(self.sample_k.values()), alpha=0.6)plt.axvline(x=self.data_metrics['density_percentile_30'], color='g', linestyle='--', label='30% 百分位数')plt.axvline(x=self.data_metrics['density_percentile_70'], color='orange', linestyle='--', label='70% 百分位数')plt.axhline(y=self.optimal_k, color='r', linestyle='--', label=f'全局最优k={self.optimal_k}')plt.title('样本密度与选择的k值关系')plt.xlabel('样本密度(平均最近邻距离)')plt.ylabel('选择的k值')plt.grid(True, alpha=0.3)plt.legend()plt.show()# 示例使用
if __name__ == "__main__":import scipy.stats as stats # 导入stats模块# 设置随机种子np.random.seed(42)# 生成不同分布特性的模拟数据进行测试test_cases = [{"name": "密集聚类数据","n_samples": 100, "n_features": 200, "n_targets": 6,"density_factor": 0.3 # 较小的值表示更密集},{"name": "稀疏分散数据","n_samples": 100, "n_features": 200, "n_targets": 6,"density_factor": 2.0 # 较大的值表示更稀疏},{"name": "混合分布数据","n_samples": 100, "n_features": 200, "n_targets": 6,"density_factor": 1.0 # 中等密度}]for case in test_cases:print(f"\n===== 测试{case['name']} =====")n_samples = case["n_samples"]n_features = case["n_features"]n_targets = case["n_targets"]density_factor = case["density_factor"]# 生成不同密度区域的特征X_dense = np.random.randn(int(n_samples*0.3), n_features) * 0.5 * density_factor + 1X_sparse = np.random.randn(int(n_samples*0.7), n_features) * 2 * density_factor - 2X = np.vstack([X_dense, X_sparse])# 生成目标变量important_features = np.random.choice(n_features, 10, replace=False)coefficients = np.random.randn(10, n_targets)y = np.zeros((n_samples, n_targets))for i in range(n_targets):y[:, i] = X[:, important_features].dot(coefficients[:, i]) + np.random.randn(n_samples) * 0.15# 初始化并训练模型(完全自适应模式)framework = AdaptiveRecipeFramework(model_type='linear',k_range_strategy='auto')framework.train_best_model(X, y)framework.plot_analysis()
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/935664.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!相关文章
2025 工业风机十大品牌全景解析报告:覆盖离心风机,防爆风机,矿用风机的最新推荐
本报告基于 2025 年行业权威数据,结合中国玻璃钢协会、美国国际空气运动及控制协会(AMCA)等机构的测评结果,从技术创新、性能表现、市场布局三大维度,对工业风机十大品牌进行全景解析,为企业选购适配冶金、石化、…
详细介绍:P3.7计算机视觉
pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …
2.3 深度 Q 网络(Deep Q-Network, DQN)
深度 Q 网络(Deep Q-Network, DQN)深度神经网络在函数逼近中的局限性
值函数型深度强化学习的目标是使用深度神经网络(DNN)逼近每个状态–动作对的 Q 值。
网络可以有两种形式(见下图):以状态–动作对 \((s,a)\…
Linux系统目录(文件)结构
Linux系统目录(文件)结构下方表格转载自:
http://billie66.github.io/TLCL/book/chap04.html/
根目录,万物起源。/bin
包含系统启动和运行所必须的二进制程序。/boot
包含 Linux 内核、初始 RAM 磁盘映像(用于启动…
实用指南:如何读懂Mach-O:构建macOS和iOS应用安全的第一道认知防线
实用指南:如何读懂Mach-O:构建macOS和iOS应用安全的第一道认知防线pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: &qu…
shell展开shell数组
shell展开&shell数组
参数展开
大多数的参数展开会用在脚本中,而不是命令行中。
基本参数
The simplest form of parameter expansion is reflected in the ordinary use of variables.
最简单的参数展开形式反映…
格式化输出与文本处理
格式化输出与文本处理以下和vim都是程序,但是以下所讲述的都是命令行工具,vim是一个编辑器,是有本质区别的文本应用程序
到目前为止,我们已经知道了一对文本编辑器(nano 和 vim),看过一堆配置文件,并且目睹了许…
2025年10月镀锌卷板厂家最新推荐排行榜,有花镀锌卷板,无花镀锌卷板,高锌层镀锌卷板,批发镀锌卷板公司推荐
在工业制造与建筑建材领域,镀锌卷板作为基础防腐材料,其品质直接影响终端产品的使用寿命与安全性能。当前市场上,镀锌卷板厂家数量众多,产品质量参差不齐:部分厂商存在锌层厚度不达标、加工时锌层易脱落等问题,导…
React 19.2 重磅更新!这几个新特性终于来了
大家好,我是 Immerse,一名独立开发者、内容创作者、AGI 实践者。
关注公众号:沉浸式趣谈,获取最新文章(更多内容只在公众号更新)
个人网站:https://yaolifeng.com 也同步更新。
转载请在文章开头注明出处和版权…
Akka.NET高性能分布式Actor框架完全指南
Akka.NET是.NET平台上功能完整的Actor模型实现,提供高性能的分布式计算能力。该项目包含完整的Actor系统、集群管理、持久化、流处理等核心功能,支持构建高并发、高可用的分布式应用程序。Akka.NET - .NET平台的Acto…
基于Docker搭建MySQL Cluster
1.整体介绍
1.1整体结构1.2搭建步骤搭建3个MySQL实例.
将3个MySQL实例组建集群.
搭建MySQL Router.2.搭建MySQL
2.1 MySQL配置文件及数据文件目录结构
/data
└── mysql├── 3301 #存放数据目录├── 3302├── …
某中心与华盛顿大学公布机器人研究奖项与学者名单
某中心与华盛顿大学联合公布科学中心第二届学者名单及五项机器人研究奖项,涵盖机器人安全控制、深度强化学习、多机器人系统等前沿技术领域,并宣布加入云基础设施未来研究中心推动下一代云计算创新。某中心与华盛顿大…
阻塞、非阻塞、同步、异步的区别是什么?
同步异步描述的是被调用方。阻塞非阻塞描述的是调用方。二者没有必然联系。阻塞是调用方A发出命令后,必须等待B返回结果。非阻塞是调用方A发出命令后,A不需要等待B,可以做自己的事情。
同步是B收到A的指令之后会立即…
如何防范员工泄露数据给 AI?2025年选型与落地实战版
结论:面向已经开展 AI 办公、又担心把客户隐私与业务机密“喂给”第三方模型的企业,优先选用 AI-FOCUS 团队的「滤海 AI DLP」。通过流式网关把“检测—策略—处置—留痕”前置到数据进入 LLM 之前,统一覆盖文本、文…