golang线程池ants-实现架构

1、总体架构

ants协程池,在使用上有多种方式(使用方式参考这篇文章:golang线程池ants-四种使用方法),但是在实现的核心就一个,如下架构图:

总的来说,就是三个数据结构: Pool、WorkerStack、goWorker以及这三个结构实现的方法,了解了这些,基本上对ants的实现原理就了如指掌了。

2、详细实现

2.1 worker的设计实现

worker结构如下:

type goWorker struct {// pool who owns this worker.pool *Pool// task is a job should be done.task chan func()// lastUsed will be updated when putting a worker back into queue.lastUsed time.Time
}

该结构设计非常简单,三个成员:归属的线程池、执行函数、该worker最后一次运行时间,goWorker结构实现如下接口:

type worker interface {run()finish()lastUsedTime() time.TimeinputFunc(func())inputParam(interface{})
}

核心函数run,该函数从管道task里获取到任务函数,并执行,执行完成后,将此worker放回协程池(此时worker阻塞等待任务到来,调用函数:w.pool.revertWorker(w)放回池子中),以便复用:

func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {w.pool.once.Do(func() {close(w.pool.allDone)})}w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// Call Signal() here in case there are goroutines waiting for available workers.w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}

finish函数,调用该函数,代表此worker的生命周期结束:

func (w *goWorker) finish() {w.task <- nil
}

这个时候run函数从遍历task管道中结束,进入defer函数,worker放入workerCache,备用。

inputFunc很容易理解,将任务放入管道,让worker去执行:

func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}

2.2 workerStack结构

type workerStack struct {items  []workerexpiry []worker
}

该结构就两个成员,都为worker的切片,items切片用于存储正常执行的worker,expiry存放过期的worker,workStack结构实现了如下接口:

type workerQueue interface {len() intisEmpty() boolinsert(worker) errordetach() workerrefresh(duration time.Duration) []worker // clean up the stale workers and return themreset()
}

len函数:返回正在运行worker的长度

isEmpty函数:判断是否有正在运行的worker

insert函数:将worker插入切片。

detach函数:获取一个worker。

refresh:更新所有worker,淘汰过期worker。

reset:清除所有worker。

重点看refresh函数:

func (wq *workerStack) refresh(duration time.Duration) []worker {n := wq.len()if n == 0 {return nil}expiryTime := time.Now().Add(-duration)index := wq.binarySearch(0, n-1, expiryTime)wq.expiry = wq.expiry[:0]if index != -1 {wq.expiry = append(wq.expiry, wq.items[:index+1]...)m := copy(wq.items, wq.items[index+1:])for i := m; i < n; i++ {wq.items[i] = nil}wq.items = wq.items[:m]}return wq.expiry
}

 这个函数用于根据给定的时间间隔duration来刷新工作队列中的过期项。主要执行以下步骤:

  1. 获取队列长度:首先,通过调用wq.len()获取工作队列wq中当前元素的数量n。如果队列为空(即n == 0),则直接返回nil,表示没有过期项。

  2. 计算过期时间:通过time.Now().Add(-duration)计算出一个时间点,这个时间点是duration时间之前的时间,即认为是“过期”的时间点。

  3. 二分查找:使用wq.binarySearch(0, n-1, expiryTime)在队列中查找第一个过期项的位置(即第一个最后使用时间早于expiryTime的项)。这个函数返回一个索引,如果找到这样的项,则返回该项的索引;如果没有找到,则返回-1

  4. 清理过期项

    • 首先,清空wq.expiry切片,用它来存储所有过期的项。
    • 如果找到了过期项(即index != -1),则将wq.items中从0index(包含index)的所有项(即所有过期项)追加到wq.expiry中。
    • 然后,使用copy函数将wq.items中从index+1n-1的所有项向前移动,覆盖掉前面的过期项。这里mcopy函数返回的值,表示实际复制的元素数量,即队列中剩余的非过期项的数量。
    • 接下来,遍历wq.items中从mn-1的所有位置,将它们设置为nil。
    • 最后,通过wq.items = wq.items[:m]更新wq.items的长度,去除所有过期的项。
  5. 返回过期项:函数返回wq.expiry,这是一个包含所有被移除的过期项的切片。

需要注意的是,wq.items是一个切片,用于存储工作项;wq.expiry也是一个切片,用于临时存储过期的项。

 2.3 Pool结构

pool结构的定义源码稍作改了一下,之前poolCommon的结构就是Pool的结构,目前最新版本做了一个简单的封装。

type Pool struct {poolCommon
}
type poolCommon struct {// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool// which submits a new task to the same pool.capacity int32// running is the number of the currently running goroutines.running int32// lock for protecting the worker queue.lock sync.Locker// workers is a slice that store the available workers.workers workerQueue// state is used to notice the pool to closed itself.state int32// cond for waiting to get an idle worker.cond *sync.Cond// done is used to indicate that all workers are done.allDone chan struct{}// once is used to make sure the pool is closed just once.once *sync.Once// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.workerCache sync.Pool// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lockwaiting int32purgeDone int32purgeCtx  context.ContextstopPurge context.CancelFuncticktockDone int32ticktockCtx  context.ContextstopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}

创建一个线程池:

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{poolCommon: poolCommon{capacity: int32(size),allDone:  make(chan struct{}),lock:     syncx.NewSpinLock(),once:     &sync.Once{},options:  opts,}}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()p.goTicktock()return p, nil
}

看如下几行代码:

	p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}

workerCache为sync.Pool类型,sync.Pool是Go语言标准库中提供的一个对象池化的工具,旨在通过复用对象来减少内存分配的频率并降低垃圾回收的开销,从而提高程序的性能。其内部维护了一组可复用的对象。当你需要一个对象时,可以尝试从sync.Pool中获取。如果sync.Pool中有可用的对象,它将返回一个;否则,它会调用你提供的构造函数来创建一个新对象,sync.PoolNew字段是一个可选的函数,用于在池中无可用对象时创建新的对象。

这里这样写即为:当无可用的worker时,则通过New函数创建一个新的worker。

创建workder列表,内部其实就是创建了了一个切片,类型为workerStack,用于管理所有的worker。

p.workers = newWorkerQueue(queueTypeStack, 0)

NewPool函数执行完成后,一个协程池就创建完成了。

协程池创建完成后,需要用来处理任务,如何将任务函数传递到worker去执行呢?看如下函数:

// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()if w != nil {w.inputFunc(task)}return err
}

函数的入参为一个无返回值、无入参的函数,因此所有需要worker执行的函数都是func()类型,w, err := p.retrieveWorker(),取出一个空闲worker,取出成功后,将任务传递到worker内部:w.inputFunc(task),注意,当线程池中所有worker都忙碌时,inputFunc函数阻塞,一直到有worker空闲。

其他主要的函数,从池中获取worker的函数:

func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// First try to fetch the worker from the queue.if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// If the worker queue is empty, and we don't run out of the pool capacity,// then just spawn a new worker goroutine.if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}goto retry
}

这个函数,获取worker有三个逻辑:

  • 当池中有空闲worker,直接获取。
  • 当池中没有空闲worker,从缓存workerCache中取出过期的worker使用,复用资源,降低开销。
  • 等待有worker执行完任务释放。(阻塞情况)

revertWorker,将worker放回池中,以执行下次的任务。 

func (p *Pool) revertWorker(worker *goWorker) bool {if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {p.cond.Broadcast()return false}worker.lastUsed = p.nowTime()p.lock.Lock()// To avoid memory leaks, add a double check in the lock scope.// Issue: https://github.com/panjf2000/ants/issues/113if p.IsClosed() {p.lock.Unlock()return false}if err := p.workers.insert(worker); err != nil {p.lock.Unlock()return false}// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.p.cond.Signal()p.lock.Unlock()return true
}

以上就为ants线程池实现的主要技术细节,希望对各位热爱技术的同学们提供一些些帮助。

3、总结

ants协程池是一个高性能、易用的Go语言协程池库,它通过复用goroutines、自动调度任务、定期清理过期goroutines等方式,帮助开发者更加高效地管理并发任务。无论是处理网络请求、数据处理还是其他需要高并发性能的场景,ants协程池都是一个值得推荐的选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/867417.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Laravel任务调度:自动化运维的魔法师

标题&#xff1a;Laravel任务调度&#xff1a;自动化运维的魔法师 在现代Web应用开发中&#xff0c;自动化任务调度是一项不可或缺的功能。Laravel框架提供了一个强大的任务调度系统&#xff0c;允许开发者安排定时任务&#xff0c;如定期发送邮件、备份数据库或执行定时脚本。…

面试经典 150 题

文章目录 6、轮转数组 6、轮转数组 1、描述 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 示例 1: 输入: nums [1,2,3,4,5,6,7], k 3 输出: [5,6,7,1,2,3,4] 解释: 向右轮转 1 步: [7,1,2,3,4,5,6] 向右轮转 2 步: [6…

【前端实现】在父组件中调用公共子组件:注意事项逻辑示例 + 将后端数组数据格式转换为前端对象数组形式 + 增加和删除行

【前端】在父组件中调用公共子组件的实现方法 写在最前面一、调用公共子组件子组件CommonRow.vue父组件ParentComponent.vue 二、实现功能1. 将后端数组数据格式转换为前端对象数组形式2. 增加和删除row 三、小结 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2…

全景图三维3D模型VR全景上传展示H5开发

全景图三维3D模型VR全景上传展示H5开发 3D互动体验平台的核心功能概览 兼容广泛格式&#xff1a;支持OBJ、FBX、GLTF等主流及前沿3D模型格式的无缝上传与展示&#xff0c;确保创意无界。 动态交互探索&#xff1a;用户可自由旋转、缩放、平移模型&#xff0c;深度挖掘每一处…

STMF4 硬件IIC(天空星开发板)

前言&#xff1a;笔记参考立创开发文档&#xff0c;连接放在最后 #IIC概念介绍 #IIC介绍 IIC通信协议&#xff0c;一种常见的串行通信协议&#xff0c;英文全程是 Inter-Integrated Circuit 使用这种通信方式的模块&#xff0c;通常有SCL&#xff08;Serial Clock Line&…

pytest使用报错(以及解决pytest所谓的“抑制print输出”)

1. 测试类的类名问题 #codingutf-8import pytestclass TestClass1:def setup(self) -> None:print(setup)def test_01(self) -> None:print(test_01111111111111111111111)def test_02(self) -> None:print(test_02)以上述代码为例&#xff0c;如果类名是Test开头&am…

Chair Footrest Protective Cover

Chair Footrest Protective Cover 万能通用型椅子脚垫保护套凳子耐磨硅胶加厚垫桌椅脚垫防滑静音套

【代码随想录算法训练Day60】卡码网107.寻找存在的路径

Day60 图论第五天 卡码网107.寻找存在的路径 #include <iostream> #include <vector> using namespace std;int n; // 节点数量 vector<int> father vector<int> (101, 0); // 按照节点大小定义数组大小// 并查集初始化 void init() {for (int i 1…

windows10/11 不小心删除卓越性能模式后再次开启卓越性能模式,显示无法创建新的电源方案 指定的电源方案、子组或设置不存在。

出现这个问题可能是因为系统中缺少必要的电源计划或权限不足。可以按照以下步骤尝试解决&#xff1a; 检查权限&#xff1a;确保你以管理员身份运行命令提示符或PowerShell。 恢复默认电源计划&#xff1a; 打开命令提示符&#xff08;以管理员身份运行&#xff09;&#xff0…

Docker逃逸CVE-2019-5736、procfs云安全漏洞复现,全文5k字,超详细解析!

Docker容器挂载procfs 逃逸 procfs是展示系统进程状态的虚拟文件系统&#xff0c;包含敏感信息。直接将其挂载到不受控的容器内&#xff0c;特别是容器默认拥有root权限且未启用用户隔离时&#xff0c;将极大地增加安全风险。因此&#xff0c;需谨慎处理&#xff0c;确保容器环…

java多线程之ThreadLocal详解

ThreadLocal 一、ThreadLocal概述二、ThreadLocal解决并发时的线程不安全问题三、ThreadLocal的工作原理 一、ThreadLocal概述 ThreadLocal 是一个线程变量工具类。主要用于将私有线程和该线程存放的副本对象做一个映射&#xff0c;各个线程之间的变量互不干扰&#xff0c;例如…

我使用HarmonyOs Next开发了b站的首页

1.实现效果展示&#xff1a; 2.图标准备 我使用的是iconfont图标&#xff0c;下面为项目中所使用到的图标 3. 代码 &#xff08;1&#xff09;Index.ets&#xff1a; import {InfoTop} from ../component/InfoTop import {InfoCenter} from ../component/InfoCenter import…

Mxnet转Onnx 踩坑记录

0. 前言 使用将MXNET模型转换为ONNX的过程中有很多算子不兼容&#xff0c;在此对那些不兼容的算子替换。在此之前需要安装mxnet分支v1.x版本作为mx2onnx的工具&#xff0c;git地址如下&#xff1a; mxnet/python/mxnet/onnx at v1.x apache/mxnet GitHub 同时还参考了如下…

【postgresql】 基础知识学习

PostgreSQL是一个高度可扩展的开源对象关系型数据库管理系统&#xff08;ORDBMS&#xff09;&#xff0c;它以其强大的功能、灵活性和可靠性而闻名。 官网地址&#xff1a;https://www.postgresql.org/ 中文社区&#xff1a;文档目录/Document Index: 世界上功能最强大的开源…

PHP期末复习题

一、选择题 1&#xff0e;在下面&#xff08; A &#xff09;文件夹里面能找到Apache服务器的配置文件。 A&#xff0e;conf B&#xff0e; bin C&#xff0e;error D&#xff0e; data 2&#xff0e;取余数运算符的符号是&#xff08; B &#xff09;。 A. &a…

python 高级技巧 0706

python 33个高级用法技巧 列表推导式 简化了基于现有列表创建新列表的过程。 squares [x**2 for x in range(10)] print(squares)[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]字典推导式 用简洁的方式创建字典。 square_dict {x: x**2 for x in range(10)} print(square_dict){0…

数据结构1:C++实现边长数组

数组作为线性表的一种&#xff0c;具有内存连续这一特点&#xff0c;可以通过下标访问元素&#xff0c;并且下标访问的时间复杂的是O(1)&#xff0c;在数组的末尾插入和删除元素的时间复杂度同样是O(1)&#xff0c;我们使用C实现一个简单的边长数组。 数据结构定义 class Arr…

web零碎知识2

不知道我的这个axios的包导进去没。 找一下关键词&#xff1a; http请求协议&#xff1a;就是进行交互式的格式 需要定义好 这个式一发一收短连接 而且没有记忆 这个分为三个部分 第一个式请求行&#xff0c;第二个就是请求头 第三个就是请求体 以get方式进行请求的失手请求…

Vatee万腾平台:智慧生活的无限可能

在科技日新月异的今天&#xff0c;我们的生活正被各种智能技术悄然改变。从智能家居到智慧城市&#xff0c;从个人健康管理到企业数字化转型&#xff0c;科技的力量正以前所未有的速度渗透到我们生活的每一个角落。而在这场智能革命的浪潮中&#xff0c;Vatee万腾平台以其卓越的…

JavaWeb学习------Servlet

目录 request用法response用法sessionServletContext对象 request用法 -常用方法 String uname request.getParameter(); String[] hobbys request.getParameter("hobby");if(hobby ! null && hobby.length > 0) {for ( String hobby : hobbys) {Syst…