SortedSet和SkipList的Python实现代码

首先AI实现了一个SkipList类,最后基于这个SkipList类,又实现了一个SortedSet类。

import random import bisect class SkipListNode: """跳表节点""" def __init__(self, score, level): self.score = score # 分数 self.members = [] # 该分数对应的元素列表 self.next = [None] * level # 各层的后继节点
class SkipList: """跳表实现""" def __init__(self, max_level=32, p=0.25): """ 初始化跳表 max_level: 最大层数 p: 节点晋升概率 """ self.max_level = max_level # 最大层数 self.p = p # 节点晋升概率 self.level = 1 # 当前跳表层数 # 创建头节点,分数为负无穷,层数为max_level self.header = SkipListNode(float('-inf'), max_level) def _random_level(self): """生成随机层数""" level = 1 # 以p的概率晋升层数,最多到max_level while random.random() < self.p and level < self.max_level: level += 1 return level def search(self, score): """ 查找指定分数对应的元素列表 return: 元素列表或None """ current = self.header # 从最高层开始查找 for i in range(self.level - 1, -1, -1): # 在当前层找到第一个分数不小于目标分数的节点 while current.next[i] and current.next[i].score < score: current = current.next[i] # 移动到下一层 current = current.next[0] # 检查是否找到目标分数 if current and current.score == score: return current.members return None def insert(self, score, members): """ 插入分数和对应的元素列表 """ # 创建更新数组,记录每一层需要更新的节点 update = [None] * self.max_level current = self.header # 从最高层开始查找,记录每一层的前驱节点 for i in range(self.level - 1, -1, -1): while current.next[i] and current.next[i].score < score: current = current.next[i] update[i] = current # 移动到第0层,检查分数是否已存在 current = current.next[0] # 如果分数已存在,更新对应的元素列表 if current and current.score == score: current.members = members return # 生成新节点的随机层数 new_level = self._random_level() # 如果新节点的层数超过当前跳表的层数,更新跳表层数 if new_level > self.level: for i in range(self.level, new_level): update[i] = self.header self.level = new_level # 创建新节点 new_node = SkipListNode(score, new_level) new_node.members = members # 更新各层的指针 for i in range(new_level): new_node.next[i] = update[i].next[i] update[i].next[i] = new_node def delete(self, score): """ 删除指定分数对应的节点 return: True if deleted, False otherwise """ # 创建更新数组,记录每一层需要更新的节点 update = [None] * self.max_level current = self.header # 从最高层开始查找 for i in range(self.level - 1, -1, -1): while current.next[i] and current.next[i].score < score: current = current.next[i] update[i] = current # 移动到第0层,检查分数是否存在 current = current.next[0] # 如果分数不存在,返回False if not current or current.score != score: return False # 更新各层的指针,删除节点 for i in range(self.level): if update[i].next[i] != current: break update[i].next[i] = current.next[i] # 更新跳表的层数 while self.level > 1 and self.header.next[self.level - 1] is None: self.level -= 1 return True def traverse(self): """ 遍历跳表,返回所有(score, members)对,按分数升序排列 """ result = [] current = self.header.next[0] while current: result.append((current.score, current.members)) current = current.next[0] return result def range_search(self, min_score, max_score): """ 返回分数在[min_score, max_score]范围内的所有(score, members)对 """ result = [] current = self.header # 从最高层开始查找,找到第一个分数不小于min_score的节点 for i in range(self.level - 1, -1, -1): while current.next[i] and current.next[i].score < min_score: current = current.next[i] # 移动到第0层 current = current.next[0] # 收集分数在范围内的所有节点 while current and current.score <= max_score: result.append((current.score, current.members)) current = current.next[0] return result
# 基于SkipList实现的有序集合(SortedSet) class SortedSet: """基于SkipList实现的有序集合(SortedSet)""" def __init__(self, max_level=32, p=0.25): """ 初始化有序集合 max_level: 跳表的最大层数 p: 节点晋升概率 """ # SkipList,键为分数(score),值为具有该分数的元素列表(保持有序) self.skiplist = SkipList(max_level=max_level, p=p) # 反向映射,元素到分数的映射,用于快速查找元素的分数 self.member_to_score = {} # 元素总数 self.size = 0 def zadd(self, score, member): """ 添加元素到有序集合 score: 元素的分数 member: 元素 return: 1 if member是新添加的,0 if member已存在并更新了分数 """ is_new_member = member not in self.member_to_score old_score = self.member_to_score.get(member) # 如果元素已存在且分数相同,无需操作 if old_score is not None and old_score == score: return 0 # 如果元素已存在但分数不同,先删除旧的分数关联 if old_score is not None: # 从旧分数对应的元素列表中移除该元素 members = self.skiplist.search(old_score) if members: idx = bisect.bisect_left(members, member) if idx < len(members) and members[idx] == member: del members[idx] # 如果列表为空,删除该分数 if not members: self.skiplist.delete(old_score) else: # 更新分数对应的元素列表 self.skiplist.insert(old_score, members) # 添加或更新元素的分数映射 self.member_to_score[member] = score # 获取或创建新分数对应的元素列表 members = self.skiplist.search(score) if members is None: members = [] # 二分查找插入位置,保持列表有序 idx = bisect.bisect_left(members, member) if idx >= len(members) or members[idx] != member: bisect.insort_left(members, member) # 插入或更新分数对应的元素列表 self.skiplist.insert(score, members) # 如果是新元素,增加大小 if is_new_member: self.size += 1 return 1 return 0 def zadd_multi(self, *args): """ 批量添加元素,格式:zadd_multi(score1, member1, score2, member2, ...) return: 添加的新元素数量 """ if len(args) % 2 != 0: raise ValueError("参数数量必须为偶数") added = 0 for i in range(0, len(args), 2): score = args[i] member = args[i+1] added += self.zadd(score, member) return added def zscore(self, member): """ 获取元素的分数 return: 元素的分数,如果元素不存在返回None """ return self.member_to_score.get(member) def zcard(self): """ 获取有序集合的成员数量 return: 成员数量 """ return self.size def zrem(self, member): """ 从有序集合中删除元素 return: 1 if member存在并被删除,0 if member不存在 """ score = self.member_to_score.pop(member, None) if score is None: return 0 # 从分数对应的元素列表中移除该元素 members = self.skiplist.search(score) if members: idx = bisect.bisect_left(members, member) if idx < len(members) and members[idx] == member: del members[idx] self.size -= 1 # 如果列表为空,删除该分数 if not members: self.skiplist.delete(score) else: # 更新分数对应的元素列表 self.skiplist.insert(score, members) return 1 return 0 def zrange(self, start, end, withscores=False): """ 按分数从小到大返回指定范围的元素 start: 起始索引 end: 结束索引 withscores: 是否返回分数 return: 元素列表或(元素, 分数)元组列表 """ if self.size == 0: return [] # 处理索引 if start < 0: start = max(0, self.size + start) if end < 0: end = self.size + end else: end = min(end, self.size - 1) if start > end: return [] # 计算需要返回的元素数量 count = end - start + 1 result = [] current_idx = 0 # 直接遍历跳表,不需要先收集所有元素 current = self.skiplist.header.next[0] while current: members = current.members score = current.score # 如果当前分数段的所有元素都在start之前,跳过 if current_idx + len(members) <= start: current_idx += len(members) current = current.next[0] continue # 遍历当前分数段的元素 for member in members: if current_idx < start: current_idx += 1 continue if current_idx > end: break if withscores: result.append((member, score)) else: result.append(member) current_idx += 1 if len(result) == count: return result if current_idx > end: break current = current.next[0] return result def zrevrange(self, start, end, withscores=False): """ 按分数从大到小返回指定范围的元素 start: 起始索引 end: 结束索引 withscores: 是否返回分数 return: 元素列表或(元素, 分数)元组列表 """ if self.size == 0: return [] # 处理索引 if start < 0: start = max(0, self.size + start) if end < 0: end = self.size + end else: end = min(end, self.size - 1) if start > end: return [] # 计算需要返回的元素数量 count = end - start + 1 # 收集所有元素到列表 all_elements = [] current = self.skiplist.header.next[0] while current: score = current.score for member in current.members: all_elements.append((member, score)) current = current.next[0] # 直接从列表末尾开始取元素,不需要反转 result = [] total = len(all_elements) start_pos = total - 1 - end end_pos = total - 1 - start # 确保范围有效 if start_pos > end_pos: return [] # 提取需要的元素 for i in range(end_pos, start_pos - 1, -1): member, score = all_elements[i] if withscores: result.append((member, score)) else: result.append(member) if len(result) == count: break return result def zrangebyscore(self, min_score, max_score, withscores=False, limit=None): """ 按分数范围返回元素(从小到大) min_score: 最小分数 max_score: 最大分数 withscores: 是否返回分数 limit: (offset, count) 用于分页 return: 元素列表或(元素, 分数)元组列表 """ # 应用limit的默认值 offset = 0 count = None if limit: offset, count = limit # 使用跳表的range_search获取分数范围内的所有项 items = self.skiplist.range_search(min_score, max_score) # 转换为扁平化的元素列表,提前应用limit flat_list = [] current_offset = 0 for score, members in items: for member in members: # 跳过offset个元素 if current_offset < offset: current_offset += 1 continue # 添加元素 flat_list.append((member, score)) # 如果count不为None且已收集足够元素,提前返回 if count is not None and len(flat_list) >= count: break # 如果count不为None且已收集足够元素,提前返回 if count is not None and len(flat_list) >= count: break # 根据withscores参数返回不同格式 if withscores: return flat_list else: return [item[0] for item in flat_list] def zrevrangebyscore(self, max_score, min_score, withscores=False, limit=None): """ 按分数范围返回元素(从大到小) max_score: 最大分数 min_score: 最小分数 withscores: 是否返回分数 limit: (offset, count) 用于分页 return: 元素列表或(元素, 分数)元组列表 """ # 应用limit的默认值 offset = 0 count = None if limit: offset, count = limit # 使用跳表的range_search获取分数范围内的所有项 items = self.skiplist.range_search(min_score, max_score) # 先收集所有元素,然后反转 all_elements = [] for score, members in items: for member in members: all_elements.append((member, score)) # 反转列表 all_elements.reverse() # 应用limit,提前终止 flat_list = [] current_offset = 0 for element in all_elements: # 跳过offset个元素 if current_offset < offset: current_offset += 1 continue # 添加元素 flat_list.append(element) # 如果count不为None且已收集足够元素,提前返回 if count is not None and len(flat_list) >= count: break # 根据withscores参数返回不同格式 if withscores: return flat_list else: return [item[0] for item in flat_list] def zrank(self, member): """ 获取元素的排名(升序,从0开始) return: 排名,如果元素不存在返回None """ score = self.member_to_score.get(member) if score is None: return None # 获取所有分数和对应的元素 all_items = self.skiplist.traverse() # 计算排名 rank = 0 for s, members in all_items: if s < score: # 累加之前所有分数的元素数量 rank += len(members) elif s == score: # 找到目标分数,计算该分数内的排名 # 由于members已经是有序的,可以使用二分查找 idx = bisect.bisect_left(members, member) if idx < len(members) and members[idx] == member: return rank + idx break else: # 已经超过目标分数,无需继续遍历 break return None def zrevrank(self, member): """ 获取元素的排名(降序,从0开始) return: 排名,如果元素不存在返回None """ # 获取升序排名 asc_rank = self.zrank(member) if asc_rank is None: return None # 降序排名 = 总元素数 - 升序排名 - 1 return self.size - asc_rank - 1 def zincrby(self, increment, member): """ 增加元素的分数 increment: 增加的分数 member: 元素 return: 新的分数 """ current_score = self.member_to_score.get(member, 0) new_score = current_score + increment self.zadd(new_score, member) return new_score def zremrangebyrank(self, start, end): """ 删除指定排名范围的元素(升序) start: 起始排名 end: 结束排名 return: 删除的元素数量 """ if self.size == 0: return 0 # 处理索引 if start < 0: start = max(0, self.size + start) if end < 0: end = self.size + end else: end = min(end, self.size - 1) if start > end: return 0 # 计算需要删除的元素数量 count = end - start + 1 # 获取要删除的元素 to_delete = self.zrange(start, end) # 删除元素 deleted = 0 for member in to_delete: if self.zrem(member): deleted += 1 return deleted def zremrangebyscore(self, min_score, max_score): """ 删除指定分数范围的元素 min_score: 最小分数 max_score: 最大分数 return: 删除的元素数量 """ if self.size == 0: return 0 # 获取分数范围内的所有元素 to_delete = self.zrangebyscore(min_score, max_score) # 删除元素 deleted = 0 for member in to_delete: if self.zrem(member): deleted += 1 return deleted def zcount(self, min_score, max_score): """ 统计指定分数范围内的元素数量 min_score: 最小分数 max_score: 最大分数 return: 元素数量 """ # 获取分数范围内的所有元素 items = self.skiplist.range_search(min_score, max_score) # 计算元素总数 count = 0 for _, members in items: count += len(members) return count

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1165192.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Coding Agent 中 Skills、MCP、Prompt、SubAgent 的基本概念和定义

Coding Agent 中 Skills、MCP、Prompt、SubAgent 的基本概念和定义 文章目录 Coding Agent 中 Skills、MCP、Prompt、SubAgent 的基本概念和定义 1. MCP(Model Context Protocol,模型上下文协议) 定义 核心架构 工作流程 MCP Server提供的功能类型 与Function Calling的区别…

【课程设计/毕业设计】基于Web的网上购物商城的设计与实现基于SpringBoot的网上购物商城设计与实现【附源码、数据库、万字文档】

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

【更新】量子遗传算法-遗传粒子群-混沌粒子群附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;擅长数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。 &#x1f34e; 往期回顾关注个人主页&#xff1a;Matlab科研工作室 &#x1f447; 关注我领取海量matlab电子书和数学建模资料 &#x1…

MindSpore模型推理加速实战

&#x1f493; 博客主页&#xff1a;借口的CSDN主页 ⏩ 文章专栏&#xff1a;《热点资讯》 MindSpore模型推理加速实战&#xff1a;边缘设备能效优化新范式目录MindSpore模型推理加速实战&#xff1a;边缘设备能效优化新范式 引言 一、推理加速的核心挑战&#xff1a;能效失衡的…

一文搞懂:AI上下文理解中的实体链接技术

一文搞懂:AI上下文理解中的实体链接技术 引言:从日常对话到AI理解的鸿沟 "帮我预订明天去北京的机票,顺便查查三里屯附近有什么好吃的日料店。“这句看似简单的人类对话,对AI系统而言却蕴含着巨大的理解挑战。其中"北京”、“三里屯”、"日料店"这些…

零基础入门 Go 语言

作为一名长期深耕Java生态的开发者&#xff0c;你或许早已习惯了JVM的繁琐配置、GC的调优难题、高并发场景下线程池的复杂管控。而Go语言&#xff08;Golang&#xff09;自2009年由Google推出以来&#xff0c;凭借“简单、高效、天生支持并发”的特性&#xff0c;迅速成为云原生…

强烈安利MBA必看!10个一键生成论文工具深度测评

强烈安利MBA必看&#xff01;10个一键生成论文工具深度测评 2026年MBA论文写作工具测评&#xff1a;为什么你需要这份榜单&#xff1f; MBA学习过程中&#xff0c;论文写作是每位学生必须面对的挑战。从选题构思到文献综述&#xff0c;再到数据分析与结论撰写&#xff0c;每一…

java.io.IOException: Previous writer likely failed to write hdfs报错解决方案

本文已收录在Github&#xff0c;关注我&#xff0c;紧跟本系列专栏文章&#xff0c;咱们下篇再续&#xff01; &#x1f680; 魔都架构师 | 全网30W技术追随者&#x1f527; 大厂分布式系统/数据中台实战专家&#x1f3c6; 主导交易系统百万级流量调优 & 车联网平台架构&a…

CameraLink 一个连接器的26个信号线

方向核心功能描述配置说明1GND电源-接地引脚PoCL 模式可复用为 12V 供电2CC4-LVDS 差分采集卡→相机相机控制信号 4&#xff08;负极&#xff09;用于相机参数配置、触发控制3CC4LVDS 差分采集卡→相机相机控制信号 4&#xff08;正极&#xff09;与 Pin2 组成 CC4 差分对4CC3…

连锁火锅智慧餐饮管理系统python后台-计算机毕业设计源码+LW文档

一、选题意义 随着信息技术的快速发展和餐饮行业竞争的加剧&#xff0c;传统的餐饮管理方式已难以满足连锁火锅企业的需求。智慧餐饮管理系统能够利用现代互联网、物联网等技术手段对火锅企业的各个运营环节进行高效管理。这有助于提高连锁火锅企业的运营效率&#xff0c;减少人…

鸟类保护管理系统小程序-计算机毕业设计源码+LW文档

摘 要 当今社会正处于科技进步与经济社会迅猛发展的全新阶段&#xff0c;国际间的信息交流与学术互动日益频繁。计算机技术对经济社会的发展和民众生活质量的提升产生了深远影响&#xff0c;同时也悄然改变着人类的生存方式与思维模式。传统鸟博士依赖于人工管理方式&#x…

师大校友惠超市管理系统微信小程序-计算机毕业设计源码+LW文档

摘 要 随着时代的迅猛发展&#xff0c;各行各业都在积极采纳先进技术以提升自身实力和竞争优势&#xff0c;师大校友惠超市管理系统自然也不例外。这款师大校友惠超市管理的开发&#xff0c;是基于实际应用需求与软件工程原理&#xff0c;运用了微信开发者工具、Java编程语言以…

校园食堂点餐小程序-计算机毕业设计源码+LW文档

摘要 当前社会&#xff0c;随着人们生活质量的提高和思想观念的演进&#xff0c;加之经济全球化的推动&#xff0c;互联网技术正以前所未有的速度提高社会综合发展的效能。这一技术正广泛渗透到各行各业中&#xff0c;而传统管理方式已经不能对时间和地点的严格限制而显得力不从…

【车间调度】基于粒子群算法求解置换流水车间调度问题PFSP附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;擅长数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。 &#x1f34e; 往期回顾关注个人主页&#xff1a;Matlab科研工作室 &#x1f447; 关注我领取海量matlab电子书和数学建模资料 &#x1…

【数据库】【MySQL】事务隔离深度解析:MVCC 实现与幻读解决机制

MySQL 事务隔离深度解析&#xff1a;MVCC 实现与幻读解决机制 MySQL InnoDB 引擎通过 MVCC&#xff08;多版本并发控制&#xff09; 与 Next-Key Lock 的精密组合&#xff0c;在保障事务隔离性的同时实现了高性能并发。本文将深入剖析其实现原理与演进机制。一、事务隔离级别与…

Jina Embeddings v4: 多模态多语言检索的通用向量

作者&#xff1a;Elastic JINA.ai Jina Embeddings v4 是一个 38 亿参数的通用向量模型&#xff0c;用于多模态多语言检索&#xff0c;支持单向量和多向量输出。 今天&#xff08;2025年6月25日&#xff09;我们发布了 jina-embeddings-v4&#xff0c;这是我们新的 38 亿参数通…

RocketMQ延迟消息实现原理解析

一、核心原理概述RocketMQ的延迟消息实现采用 "预置延迟等级 定时扫描转发" 的机制&#xff0c;并非真正的实时延迟&#xff0c;就是通过预定延迟等级将消息暂存到特定队列&#xff0c;等待时间到达后再投递给消费者。1. 实现方式RocketMQ 将延时消息转换为普通消息…

django-flask基于python的高校在线考试系统设计与实现

目录高校在线考试系统设计与实现摘要关于博主开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;高校在线考试系统设计与实现摘要 随着信息技术的快速发展&#xff0c;传统纸质考试方式在效率、…

架构 CPU SOC 核心板

1. 架构 & CPU & SOC 先有架构&#xff0c;再有内核&#xff0c;一个架构可以衍生出多种内核 内核之所以称之为内核&#xff0c;是因为他是在SOC、MCU内部中最核心的逻辑处理部分&#xff0c;就是SOC、MCU的CPU。所以内核也可以叫做处理器。 别的公司可以向ARM公司购买…

【计算机毕业设计案例】基于JavaSpribgBoot的水果生鲜团购平台基于SpribgBoot的生鲜团购平台(程序+文档+讲解+定制)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…