效率工具
- 推荐一个程序员的常用工具网站,效率加倍嘎嘎好用:程序员常用工具
云服务器
- 云服务器限时免费领:轻量服务器2核4G
- 腾讯云:2核2G4M云服务器新老同享99元/年,续费同价
- 阿里云:2核2G3M的ECS服务器只需99元/年,续费同价
在分布式系统中,延时队列是一种常见的需求,它允许我们将任务延迟一段时间后再执行。常见的应用场景包括订单超时处理、短信发送延迟、缓存失效处理等。本文将介绍如何在Spring Boot项目中,结合Redis和Lua脚本实现一个高效的延时队列。
一、项目准备
1.1 引入依赖
在Spring Boot项目中,我们需要引入以下依赖:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>
</dependencies>
1.2 配置Redis
在application.yml
中配置Redis连接信息:
spring:redis:host: localhostport: 6379password: lettuce:pool:max-active: 8max-idle: 8min-idle: 0
1.3 创建Redis配置类
创建一个配置类来配置RedisTemplate:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new StringRedisSerializer());return template;}
}
二、实现延时队列
延时队列的核心思想是使用Redis的有序集合(Sorted Set)来存储任务,每个任务关联一个延时时间。当时间到达时,通过Lua脚本将任务从有序集合中移到处理队列中。
2.1 创建任务发布接口
首先,我们创建一个接口来发布延时任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.time.Instant;@Service
public class DelayQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;private static final String DELAY_QUEUE_KEY = "delay_queue";public void addTask(String taskId, long delayInSeconds) {long score = Instant.now().getEpochSecond() + delayInSeconds;redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskId, score);}
}
2.2 Lua脚本处理任务
Lua脚本用于从有序集合中取出到期的任务,并将其移到处理队列中:
local delayQueueKey = KEYS[1]
local readyQueueKey = KEYS[2]
local currentTime = tonumber(ARGV[1])
local tasks = redis.call('ZRANGEBYSCORE', delayQueueKey, 0, currentTime)if next(tasks) ~= nil thenfor _, task in ipairs(tasks) doredis.call('ZREM', delayQueueKey, task)redis.call('LPUSH', readyQueueKey, task)end
endreturn tasks
2.3 创建任务处理接口
我们创建一个接口来处理到期的任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;import java.time.Instant;
import java.util.List;@Service
public class TaskProcessor {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate JedisPool jedisPool;private static final String DELAY_QUEUE_KEY = "delay_queue";private static final String READY_QUEUE_KEY = "ready_queue";private static final String LUA_SCRIPT = "local delayQueueKey = KEYS[1] " +"local readyQueueKey = KEYS[2] " +"local currentTime = tonumber(ARGV[1]) " +"local tasks = redis.call('ZRANGEBYSCORE', delayQueueKey, 0, currentTime) " +"if next(tasks) ~= nil then " +" for _, task in ipairs(tasks) do " +" redis.call('ZREM', delayQueueKey, task) " +" redis.call('LPUSH', readyQueueKey, task) " +" end " +"end " +"return tasks ";@Scheduled(fixedRate = 1000)public void processTasks() {try (Jedis jedis = jedisPool.getResource()) {List<String> tasks = (List<String>) jedis.eval(LUA_SCRIPT, 2, DELAY_QUEUE_KEY, READY_QUEUE_KEY, String.valueOf(Instant.now().getEpochSecond()));for (String task : tasks) {// 处理任务System.out.println("Processing task: " + task);}}}
}
在这个实现中,processTasks
方法每秒执行一次,通过Lua脚本检查并处理到期的任务。
三、测试与验证
3.1 添加测试任务
在控制器中添加接口来发布延时任务:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;@PostMapping("/addTask")public String addTask(@RequestParam String taskId, @RequestParam long delayInSeconds) {delayQueueService.addTask(taskId, delayInSeconds);return "Task added";}
}
3.2 验证任务处理
启动Spring Boot应用程序,通过HTTP请求添加任务:
curl -X POST "http://localhost:8080/addTask?taskId=task1&delayInSeconds=10"
curl -X POST "http://localhost:8080/addTask?taskId=task2&delayInSeconds=20"
检查控制台输出,确认任务在指定的延迟时间后被正确处理:
Processing task: task1
Processing task: task2
四、总结
通过本文,我们学习了如何在Spring Boot项目中使用Redis和Lua脚本实现延时队列。通过Redis的有序集合存储任务和Lua脚本处理到期任务,可以实现高效的延时任务处理机制。结合Spring Boot的定时任务调度,能够方便地实现任务的定期检查和处理。这种方法不仅简单高效,还能很好地扩展和维护,是实现延时队列的一个优秀方案。