✅博主简介:擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导,毕业论文、期刊论文经验交流。
✅成品或者定制,扫描文章底部微信二维码。
(1)分阶段粒子群优化算法的设计与实现
投资组合优化问题的核心在于如何在风险与收益之间寻求最佳平衡点,而粒子群优化算法作为一种模拟鸟群觅食行为的群体智能算法,在求解此类复杂优化问题时展现出独特的优势。传统粒子群算法在迭代过程中采用固定的参数配置,这种策略难以适应优化过程不同阶段的差异化需求,导致算法在全局探索与局部开发之间难以取得理想的平衡。针对这一问题,分阶段粒子群优化算法将整个迭代过程划分为两个具有明显特征差异的阶段,并针对每个阶段的特点设计相应的参数调整策略。
在迭代前期阶段,算法需要具备较强的全局搜索能力,以便在整个解空间中广泛探索潜在的优质解区域。为实现这一目标,采用非线性变化的惯性权重策略,惯性权重在迭代初期保持较大数值,使得粒子能够继承更多的历史速度信息,从而具备更强的探索能力。随着迭代次数的增加,惯性权重按照非线性规律逐渐减小,这种变化方式相比线性递减策略能够更好地适应算法的实际收敛状态。学习因子的设计同样遵循非线性变化原则,个体学习因子在迭代前期相对较大,鼓励粒子更多地依赖自身的历史最优经验进行探索;社会学习因子则相对较小,避免粒子过早地向群体最优位置聚集而陷入局部最优。
迭代后期阶段的主要任务是在已发现的优质解区域内进行精细搜索,以期找到更加精确的最优解。为提升算法在该阶段的收敛性能,引入收缩因子机制对粒子的速度更新进行约束。收缩因子的作用在于控制粒子速度的增长幅度,避免粒子在接近最优解时出现振荡现象,从而加速算法的收敛过程。收缩因子的计算基于学习因子的总和,当学习因子之和满足特定条件时,收缩因子能够有效地限制粒子的速度范围,使得粒子群在最优解附近形成稳定的收敛态势。
分阶段策略的阶段划分依据迭代次数与最大迭代次数的比值确定,当该比值小于预设阈值时,算法处于前期阶段,执行非线性参数变化策略;当比值超过阈值后,算法进入后期阶段,启用收缩因子机制。阈值的选择需要根据具体问题的特性进行调整,通常设置在总迭代次数的百分之六十至百分之八十之间,以确保算法有足够的迭代次数完成全局探索并实现有效收敛。在投资组合优化问题中,分阶段策略的优势尤为明显,因为投资组合的解空间通常具有多个局部最优区域,传统算法容易在迭代过程中陷入这些局部最优而无法找到全局最优的资产配置方案。
(2)基于策略梯度的粒子群优化算法构建
人工配置粒子群算法参数的方式存在明显的局限性,主要体现在参数设置的静态性和缺乏自适应能力两个方面。静态参数配置难以根据算法运行过程中的实际状态进行动态调整,导致算法性能在不同问题和不同迭代阶段表现不稳定。为解决这一问题,将深度强化学习中的策略梯度方法引入粒子群优化算法,构建一个能够自主学习参数调整策略的智能优化框架。
策略梯度粒子群优化算法的核心思想是构建一个策略神经网络,该网络接收粒子群的当前状态信息作为输入,输出各关键参数的调整值。状态信息的设计需要能够全面反映粒子群的搜索状态,通常包括当前迭代次数占总迭代次数的比例、粒子群的平均适应度值、最优粒子的适应度值、粒子群的多样性指标等。这些状态特征能够帮助策略网络准确判断当前的优化进程,从而做出合理的参数调整决策。
策略神经网络采用多层感知机结构,输入层节点数等于状态特征的维度,输出层节点数等于需要调整的参数数量,中间设置若干隐藏层以提升网络的表达能力。网络的激活函数选择对输出参数的范围具有重要影响,对于惯性权重等需要限制在特定区间内的参数,输出层采用sigmoid激活函数并进行线性变换;对于学习因子等参数,则采用softplus激活函数确保输出值为正。网络的训练采用策略梯度算法,通过最大化累积奖励来优化网络参数,奖励函数的设计直接关系到算法的优化效果。
奖励函数的设计需要综合考虑算法的收敛速度和解的质量两个方面。在每次参数更新后,根据粒子群适应度的改善程度计算即时奖励,适应度改善越大,奖励值越高。同时引入惩罚机制,当粒子群多样性过低或算法出现停滞现象时,给予负奖励以促使策略网络调整参数配置。累积奖励采用折扣累加的方式计算,折扣因子的设置影响算法对短期收益和长期收益的权衡,通常设置在零点九至零点九九之间。
策略网络与粒子群的交互学习过程采用在线更新方式,即在优化过程中不断收集状态动作奖励序列,并利用这些经验数据更新网络参数。为提升学习效率和稳定性,引入经验回放机制存储历史交互数据,并采用小批量采样的方式进行网络更新。策略梯度的计算采用REINFORCE算法,通过蒙特卡洛采样估计梯度期望,并引入基线函数减小梯度估计的方差。经过充分训练的策略网络能够根据粒子群的实时状态自动调整参数配置,有效提升算法的智能性和自适应能力。
(3)改进策略梯度粒子群算法及投资组合优化应用
在基础策略梯度粒子群优化算法的基础上,借鉴分阶段策略的思想进行进一步改进,形成改进的策略梯度粒子群优化算法。改进算法的核心创新在于为优化过程的不同阶段分别训练独立的策略网络,使得参数自适应调整策略能够更好地匹配各阶段的优化需求。前期阶段的策略网络侧重于学习如何维持粒子群的多样性并促进全局探索,后期阶段的策略网络则专注于学习如何加速收敛并提升解的精度。
两阶段策略网络的训练采用分离训练策略,即首先收集算法在前期阶段的交互数据训练前期策略网络,然后固定前期策略网络,收集后期阶段的交互数据训练后期策略网络。这种训练方式避免了单一网络需要同时学习两种差异较大的策略而导致的学习困难问题。阶段切换的判断引入软切换机制,通过设置过渡区间使得两阶段策略的切换更加平滑,避免参数的剧烈变化对算法稳定性造成影响。
在投资组合优化问题的实际应用中,选取上证五十指数和深证一百指数中的代表性股票构建投资组合。股票的选择覆盖金融、科技、消费、医药、能源等多个行业领域,以确保投资组合具有较好的分散性和代表性。历史数据的时间跨度选择近五年的日收益率数据,以获取充足的样本量进行模型训练和验证。数据预处理包括缺失值填补、异常值处理和收益率标准化等步骤,确保数据质量满足模型要求。
投资组合优化模型以夏普比率最大化作为优化目标,夏普比率综合考虑了投资组合的期望收益和风险水平,是衡量投资绩效的重要指标。约束条件包括各资产权重之和等于一、单一资产权重的上下限约束、以及可能的行业集中度约束等。优化算法的编码方式采用实数编码,每个粒子表示一种资产配置方案,粒子的各维度分量对应相应资产的投资权重。适应度函数计算组合的夏普比率值,并通过罚函数方法处理约束违反情况。
import numpy as np import matplotlib.pyplot as plt class DSDE_MOPSO: def __init__(self, dim, pop_size, max_iter, obj_funcs, constraints=None): self.dim = dim self.pop_size = pop_size self.max_iter = max_iter self.obj_funcs = obj_funcs self.constraints = constraints or (lambda x: True) self.pop = np.random.rand(pop_size, dim) * 10 - 5 self.vel = np.random.randn(pop_size, dim) * 0.5 self.p_best = self.pop.copy() self.p_best_objs = np.array([[f(x) for f in obj_funcs] for x in self.pop]) self.archive = [] self.archive_objs = [] self.crowd_dist = np.zeros(0) def cal_crowd_dist(self, objs): n = len(objs) dist = np.zeros(n) for i in range(len(self.obj_funcs)): sorted_idx = np.argsort(objs[:, i]) dist[sorted_idx[0]] = dist[sorted_idx[-1]] = np.inf for j in range(1, n-1): dist[sorted_idx[j]] += (objs[sorted_idx[j+1], i] - objs[sorted_idx[j-1], i]) / (np.max(objs[:, i]) - np.min(objs[:, i]) + 1e-6) return dist def update_archive(self, new_objs, new_pop): combined_pop = np.vstack([self.archive, new_pop]) combined_objs = np.vstack([self.archive_objs, new_objs]) non_dominated = [] for i in range(len(combined_objs)): dominated = False for j in range(len(combined_objs)): if i != j and np.all(combined_objs[j] <= combined_objs[i]) and np.any(combined_objs[j] < combined_objs[i]): dominated = True break if not dominated and self.constraints(combined_pop[i]): non_dominated.append(i) self.archive = combined_pop[non_dominated] self.archive_objs = combined_objs[non_dominated] if len(self.archive) > 200: crowd_dist = self.cal_crowd_dist(self.archive_objs) sorted_idx = np.argsort(crowd_dist)[::-1] self.archive = self.archive[sorted_idx[:200]] self.archive_objs = self.archive_objs[sorted_idx[:200]] def get_guide_particle(self): if len(self.archive) == 0: return self.pop[np.random.randint(self.pop_size)] crowd_dist = self.cal_crowd_dist(self.archive_objs) max_dist_idx = np.argmax(crowd_dist) return self.archive[max_dist_idx] def update(self): for t in range(self.max_iter): current_objs = np.array([[f(x) for f in self.obj_funcs] for x in self.pop]) self.update_archive(current_objs, self.pop) for i in range(self.pop_size): if self.constraints(self.pop[i]): if np.any(current_objs[i] < self.p_best_objs[i]) or (np.all(current_objs[i] == self.p_best_objs[i]) and np.random.rand() < 0.5): self.p_best[i] = self.pop[i].copy() self.p_best_objs[i] = current_objs[i].copy() guide = self.get_guide_particle() dist_to_front = np.min(np.linalg.norm(current_objs[i] - self.archive_objs, axis=1)) if len(self.archive_objs) > 0 else 1.0 speed_factor = 1.5 if dist_to_front > 0.5 else 0.8 w = 0.9 - 0.5 * t / self.max_iter c1 = 1.5 + np.random.rand() * 0.5 c2 = 1.5 + np.random.rand() * 0.5 self.vel[i] = w * self.vel[i] * speed_factor + c1 * np.random.rand() * (self.p_best[i] - self.pop[i]) + c2 * np.random.rand() * (guide - self.pop[i]) self.pop[i] += self.vel[i] if np.random.rand() < 0.1 and len(self.archive) > 10: idx1, idx2 = np.random.choice(len(self.archive), 2, replace=False) self.pop[i] = 0.5 * self.archive[idx1] + 0.5 * self.archive[idx2] + np.random.randn(self.dim) * 0.1 return self.archive, self.archive_objs class CECG_MOPSO: def __init__(self, dim, pop_size, max_iter, obj_funcs, constraints=None): self.dim = dim self.pop_size = pop_size self.max_iter = max_iter self.obj_funcs = obj_funcs self.constraints = constraints or (lambda x: True) self.pop = np.random.rand(pop_size, dim) * 10 - 5 self.vel = np.random.randn(pop_size, dim) * 0.5 self.p_best = self.pop.copy() self.p_best_objs = np.array([[f(x) for f in obj_funcs] for x in self.pop]) self.cross_gen_archive = [] self.cross_gen_objs = [] self.current_archive = [] self.current_objs = [] def cal_crowd_and_contribution(self, objs): n = len(objs) crowd_dist = np.zeros(n) for i in range(len(self.obj_funcs)): sorted_idx = np.argsort(objs[:, i]) crowd_dist[sorted_idx[0]] = crowd_dist[sorted_idx[-1]] = np.inf for j in range(1, n-1): crowd_dist[sorted_idx[j]] += (objs[sorted_idx[j+1], i] - objs[sorted_idx[j-1], i]) / (np.max(objs[:, i]) - np.min(objs[:, i]) + 1e-6) contribution = np.ones(n) for i in range(n): for j in range(n): if i != j and np.all(objs[j] <= objs[i]): contribution[i] *= 0.8 return crowd_dist, contribution def elite_competition(self): all_elites = np.vstack([self.cross_gen_archive, self.current_archive]) all_objs = np.vstack([self.cross_gen_objs, self.current_objs]) non_dominated = [] for i in range(len(all_objs)): dominated = False for j in range(len(all_objs)): if i != j and np.all(all_objs[j] <= all_objs[i]) and np.any(all_objs[j] < all_objs[i]): dominated = True break if not dominated and self.constraints(all_elites[i]): non_dominated.append(i) all_elites = all_elites[non_dominated] all_objs = all_objs[non_dominated] if len(all_elites) == 0: return self.pop[np.random.randint(self.pop_size)] crowd_dist, contribution = self.cal_crowd_and_contribution(all_objs) score = crowd_dist * contribution best_idx = np.argmax(score) return all_elites[best_idx] def update_archives(self, current_objs, current_pop): self.current_archive = [] self.current_objs = [] for i in range(len(current_pop)): if self.constraints(current_pop[i]): dominated = False for j in range(len(current_pop)): if i != j and np.all(current_objs[j] <= current_objs[i]) and np.any(current_objs[j] < current_objs[i]): dominated = True break if not dominated: self.current_archive.append(current_pop[i]) self.current_objs.append(current_objs[i]) self.current_archive = np.array(self.current_archive) self.current_objs = np.array(self.current_objs) self.cross_gen_archive = np.vstack([self.cross_gen_archive, self.current_archive]) if len(self.cross_gen_archive) > 0 else self.current_archive.copy() self.cross_gen_objs = np.vstack([self.cross_gen_objs, self.current_objs]) if len(self.cross_gen_objs) > 0 else self.current_objs.copy() if len(self.cross_gen_archive) > 300: crowd_dist, _ = self.cal_crowd_and_contribution(self.cross_gen_objs) sorted_idx = np.argsort(crowd_dist)[::-1] self.cross_gen_archive = self.cross_gen_archive[sorted_idx[:300]] self.cross_gen_objs = self.cross_gen_objs[sorted_idx[:300]] def update(self): for t in range(self.max_iter): current_objs = np.array([[f(x) for f in self.obj_funcs] for x in self.pop]) self.update_archives(current_objs, self.pop) global_guide = self.elite_competition() diversity = np.std(current_objs, axis=0).mean() for i in range(self.pop_size): if self.constraints(self.pop[i]): if np.any(current_objs[i] < self.p_best_objs[i]) or (np.all(current_objs[i] == self.p_best_objs[i]) and np.random.rand() < 0.5): self.p_best[i] = self.pop[i].copy() self.p_best_objs[i] = current_objs[i].copy() w = 0.8 - 0.4 * t / self.max_iter c1 = 1.2 + (diversity / 5.0) c2 = 1.8 - (diversity / 5.0) self.vel[i] = w * self.vel[i] + c1 * np.random.rand() * (self.p_best[i] - self.pop[i]) + c2 * np.random.rand() * (global_guide - self.pop[i]) self.pop[i] += self.vel[i] if diversity < 0.3 and np.random.rand() < 0.2: self.pop[i] += np.random.randn(self.dim) * 0.3 return self.cross_gen_archive, self.cross_gen_objs # 卫星轨道优化目标函数 def fuel_consumption(x): v0, v1, t = x[0], x[1], x[2] return np.abs(v0**2 + v1**2) * t def transfer_time(x): return np.abs(x[2]) + 0.1 * np.sum(np.abs(x[:2])) def orbit_constraint(x): return np.all(x[:2] >= -3) and np.all(x[:2] <= 3) and x[2] >= 1 and x[2] <= 10 obj_funcs = [fuel_consumption, transfer_time] dsde_mopso = DSDE_MOPSO(dim=3, pop_size=100, max_iter=200, obj_funcs=obj_funcs, constraints=orbit_constraint) dsde_archive, dsde_objs = dsde_mopso.update() cecg_mopso = CECG_MOPSO(dim=3, pop_size=100, max_iter=200, obj_funcs=obj_funcs, constraints=orbit_constraint) cecg_archive, cecg_objs = cecg_mopso.update() plt.scatter(dsde_objs[:,0], dsde_objs[:,1], label='DSDE-MOPSO') plt.scatter(cecg_objs[:,0], cecg_objs[:,1], label='CECG-MOPSO', alpha=0.7) plt.xlabel('Fuel Consumption') plt.ylabel('Transfer Time') plt.legend() plt.show()成品代码50-200,定制300起,可以直接沟通
👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇