文章目录
- 用goroutine来替代mq做异步的应用
- 心跳
- contenx的超时设置
- 定时器
- break label
- 核心代码
用goroutine来替代mq做异步的应用
方法在创建ai任务接口中用协程的方式异步调用go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
,来更新ai任务的状态
心跳
心跳机制是一种用于检测连接或任务是否仍处于活动状态的方法。在分布式系统或网络通信中,心跳机制可以用来监视节点和服务的可用性、性能和故障。心跳通常是通过定期发送小的数据包或信号来实现的,然后接收方会对这些信号进行响应,以表示它们仍然在线并正常运行。
在这段代码中,心跳机制用于监控AI任务的进行情况。每隔2秒,会将当前时间戳写入Redis缓存,作为心跳信号。这样,外部监控系统可以检查Redis缓存中的心跳信号来判断任务是否仍在进行。如果在一定时间内没有收到新的心跳信号,那么可以认为任务已经中止或出现故障。
contenx的超时设置
context没有默认的超时时间,如果要设置超时时间的话,记得调用
ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)
defer timeoutCancel() // 操作完成时立即释放资源
定时器
在这段代码中,time.NewTicker(time.Millisecond)
创建的定时器最初每隔1毫秒触发一次,ticker.Reset(time.Second * 2)
的作用是重置定时器(ticker)的时间间隔为2s,这样能每2s从通道中取到信息case <-ticker.C:
。
break label
breakFor是一个标签,它用于在嵌套循环或选择语句中提供更精确的控制。在这个例子中,
breakFor标签用于跳出外层的
for循环。 在Go语言中,
break语句默认只跳出当前层次的循环或选择语句。在这个例子中,当
ctxTimeout.Done()通道接收到一个值或者满足其他条件时,我们希望跳出整个
for循环,而不仅仅是
select语句。通过在
for循环前添加
breakFor标签,并在
break`语句中指定该标签,我们可以实现这个功能。
核心代码
// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {// 上报神策properties := map[string]interface{}{"uid": util.GetCtx(ctx).User.WsId,"alg": algorithm,}sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")// 创建任务s.createDbSave(ctx, taskId, algorithm, data)// 任务数量+1s.handleCreateActive(ctx, algorithm, true)// 异步处理结果go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}
// handleCreateAsync 启动协程处理结果
func (s *aiHandle) handleCreateAsync(ctx context.Context, algorithm string, taskId string, data interface{}, iAiHandle IAiHandle) {// 上报神策properties := map[string]interface{}{"uid": util.GetCtx(ctx).User.WsId,"alg": algorithm,}sensorsdata.Track(gtrace.GetTraceID(ctx), "SeAlgCreate", properties, util.GetCtx(ctx).User.WsId != "", "")// 创建任务s.createDbSave(ctx, taskId, algorithm, data)// 任务数量+1s.handleCreateActive(ctx, algorithm, true)// 异步处理结果go s.handleResultPolling(ctx, algorithm, taskId, iAiHandle)
}// handleResultPolling 轮询结果
func (s *aiHandle) handleResultPolling(ctx context.Context, algorithm, taskId string, iAiHandle IAiHandle) {// 60分钟超时// 1. 首先,设置一个60分钟的超时上下文,确保处理不会无限期地进行下去。ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Hour)defer timeoutCancel() // 超时前调用这个,用来释放资源;不调用这个方法的话,会在超时时间释放资源// 2. 创建一个心跳goroutine,每隔2秒更新一次心跳缓存。这样可以在外部监控任务的进行情况。// 启动心跳,2秒一次go func(ctx context.Context, taskId string) {ticker := time.NewTicker(time.Millisecond)defer ticker.Stop()heartbeatKey := aiCache.GetAiTaskHeartBeat(taskId)for {select {case <-ctx.Done():returncase <-ticker.C:ticker.Reset(time.Second * 2)if cache.RedisExists(ctx, heartbeatKey) == false {_ = cache.RedisSet(ctx, heartbeatKey, time.Now().Unix(), time.Hour*24)}_ = cache.RedisIncBy(ctx, heartbeatKey, 2)}}}(ctxTimeout, taskId)// 3. 初始化任务结果的状态和数据变量。// 任务结果taskEnd := falsetaskStatus := consts.TaskStatusProcessingtaskMsg := ""var taskData any// 4. 设置一个定时器,每隔3秒执行一次轮询查询结果。// 每3秒重试一次step := time.NewTimer(time.Millisecond)defer step.Stop()// 查询失败时,重试5次retry := 1//5. 使用`breakFor`标签和`select`语句进行轮询操作,当超时或查询到结果时跳出循环。//- 调用`iAiHandle.ResultHandle`方法查询任务结果。//- 如果查询失败,重试5次。//- 如果任务状态不是处理中或等待中,则更新任务状态和数据,设置`taskEnd`为`true`,跳出循环。// 轮询结果
breakFor:for {select {case <-ctxTimeout.Done():break breakForcase <-step.C:step.Reset(time.Second * 3)// 查结果taskResult, err := iAiHandle.ResultHandle(ctx, &v1.ResultReq{Algorithm: algorithm,TaskId: taskId,})taskData = taskResultif err != nil {if retry > 5 {taskStatus = consts.TaskStatusServerFailtaskMsg = "handle result err retry than 5 times"break breakFor}retry++} else {// 状态不一致则返回if taskResult != nil && taskResult.Status != consts.TaskStatusProcessing && taskResult.Status != consts.TaskStatusWaiting {taskStatus = taskResult.StatustaskMsg = taskResult.ReasontaskEnd = truebreak breakFor}}}}// 程序处理超时if taskEnd == false {taskStatus = consts.TaskStatusServerOutTimetaskMsg = "handle result than 60 minutes"}// 7. 更新任务计数器。// 任务数量-1s.handleCreateActive(ctx, algorithm, false)// 8. 保存任务结果到数据库。// 更数任务s.resultDbSave(ctx, taskId, taskStatus, taskMsg, taskData)}