You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
udi-spms-java/src/main/java/com/glxp/api/util/redis/RedisDelayedQueue.java

73 lines
2.5 KiB
Java

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.glxp.api.util.redis;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.resps.Tuple;
@Component
public class RedisDelayedQueue {
private Jedis jedis = new Jedis("localhost", 6379); // Redis连接
// 将任务加入延时队列
public void addTaskToQueue(String taskId, long delaySeconds) {
long timestamp = System.currentTimeMillis() / 1000 + delaySeconds; // 计算任务执行的时间戳
jedis.zadd("rel_code_queue", timestamp, taskId); // 将任务添加到有序集合score 为任务的执行时间戳
System.out.println("Task " + taskId + " added to queue with " + delaySeconds + " seconds delay.");
}
public Jedis getJedis() {
jedis.select(10);
return jedis;
}
// 获取并处理超时的任务
public void processTasks() {
long currentTimestamp = System.currentTimeMillis() / 1000; // 当前时间戳(秒)
// 获取所有超时的任务 (score <= 当前时间戳)
List<Tuple> tasksToProcess = jedis.zrangeByScoreWithScores("delay_queue", "-inf", String.valueOf(currentTimestamp));
Iterator<Tuple> iterator = tasksToProcess.iterator();
while (iterator.hasNext()) {
Tuple task = iterator.next();
String taskId = task.getElement();
System.out.println("Processing task " + taskId);
// 执行任务逻辑 (你可以在这里调用实际的业务逻辑)
// 从队列中删除已执行的任务
jedis.zrem("delay_queue", taskId);
System.out.println("Task " + taskId + " removed from queue.");
}
}
// 定期检查并处理任务
// public static void startTaskProcessor() {
// while (true) {
// try {
// // 每隔1秒检查一次队列
// TimeUnit.SECONDS.sleep(1);
// processTasks(); // 处理超时的任务
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }
// public static void main(String[] args) {
// // 添加任务到延时队列
// addTaskToQueue("task1", 60); // 任务 1 延迟 5 秒执行
// startTaskProcessor();
// System.out.println("123");
// return;
// // 启动任务处理器,定期检查队列
// }
}