Vert.x DB Task Queue 是一个基于数据库实现的轻量级任务队列系统,它提供了一种不依赖专门消息队列中间件的异步任务处理方案。系统利用数据库作为持久化存储,结合 Vert.x 的异步处理能力,实现了高可用、可靠的任务处理机制。
- 简单性:提供简洁的 API,降低使用门槛
- 可靠性:确保任务不丢失,支持故障恢复
- 可扩展性:支持多实例部署,水平扩展
- 可监控性:提供完整的任务状态监控和管理功能
- 轻量级:最小化外部依赖,仅需数据库支持
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Task Queue │ │ Database │
│ │ │ Service │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │Task Producer│─┼────>│ │Task Enqueue │ │────>│ │ Tasks │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ │ Table │ │
│ │ │ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ │ │ │
│ │Task Consumer│<┼─────│ │Task Poller │ │<────│ │ │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
-
TaskQueueService
- 提供任务队列的核心操作接口
- 负责任务的入队、完成、重入队等操作
- 确保事务一致性
-
TaskPollerVerticle
- 负责任务轮询的核心组件
- 管理轮询生命周期
- 支持动态启停
-
TaskProcessorVerticle
- 负责具体任务的处理
- 支持自定义处理逻辑
- 处理结果反馈
-
TaskQueueSupportService
- 提供任务监控和管理功能
- 支持任务状态查询和管理
- 提供 Web UI 界面
┌─────────┐ ┌────────────┐ ┌───────────┐
│ CREATED │────>│ PROCESSING │────>│ COMPLETED │
└─────────┘ └────────────┘ └───────────┘
│ │
│ │
│ v
│ ┌────────┐ ┌─────────┐
└──────────>│ ERROR │───────>│ POISON │
└────────┘ └─────────┘
系统采用两种方案处理多实例并发问题:
SELECT * FROM TASKS
WHERE STATUS = 'CREATED'
AND QUEUE_NAME = ?
FOR UPDATE SKIP LOCKED
UPDATE TASKS SET
STATUS = 'PROCESSING',
POLLER_INSTANCE = ?,
NEXT_PROCESS_TIME = ?
WHERE ID IN (SELECT ID FROM TASKS
WHERE STATUS = 'CREATED'
AND QUEUE_NAME = ?)
Tasks 表结构
字段名 | 类型 | 说明 |
---|---|---|
ID | NUMBER | 主键 |
QUEUE_NAME | VARCHAR2 | 队列名称 |
REFERENCE_NUMBER | VARCHAR2 | 任务引用号(业务标识) |
STATUS | VARCHAR2 | 任务状态 |
ATTEMPT | NUMBER | 尝试次数 |
PAYLOAD | CLOB | 任务数据 |
PROCESS_RESULT | VARCHAR2 | 处理结果 |
CREATE_TIME | TIMESTAMP | 创建时间 |
POLLER_INSTANCE | VARCHAR2 | 处理实例标识 |
NEXT_PROCESS_TIME | TIMESTAMP | 下次处理时间 |
Future<Long> enqueue(SqlConnection conn, String queueName, String refNumber, T payload) {
// 1. 序列化任务数据
// 2. 在事务中保存任务
// 3. 通知轮询器立即检查新任务
}
- 使用定时器定期检查任务
- 支持动态调整轮询间隔
- 实现退避策略处理空闲情况
-
任务重试机制
- 支持配置重试次数
- 支持延迟重试
- 支持自定义重试策略
-
毒药任务处理
- 标记无法处理的任务
- 支持人工干预
- 提供清理机制
-
任务处理器
- 自定义业务逻辑
- 自定义错误处理
- 自定义重试策略
-
存储层
- 支持不同数据库
- 支持自定义表结构
- 预留 Redis 支持
-
指标收集
- 任务处理时间
- 成功/失败率
- 队列深度
-
管理功能
- 任务查询
- 状态修改
- 手动触发
pool.withTransaction(conn ->
// 1. 执行业务逻辑
businessLogic(conn)
// 2. 入队任务
.compose(result ->
taskEnqueueService.enqueue(conn, "queue", "ref", payload))
);
class CustomTaskProcessor implements Function<Task<T>, Future<?>> {
@Override
public Future<?> apply(Task<T> task) {
return Future.future(promise -> {
// 1. 解析任务数据
// 2. 执行业务逻辑
// 3. 更新任务状态
});
}
}
- 任务优先级支持
- 分布式锁优化
- 批量任务处理
- 任务依赖关系
- 更多存储引擎支持
- 性能优化
- 连接池调优
- SQL 优化
- 缓存策略
系统被划分为以下几个主要模块:
-
核心模块(core/internal)
- 包路径:
io.github.colinzhu.taskqueue
和io.github.colinzhu.taskqueue.internal
- 主要组件:
TaskQueueService
- 任务队列服务接口TaskQueueServiceImpl
- 服务实现类TaskEntity
- 内部任务实体TaskStatus
- 任务状态枚举(包级私有)TaskRepo
- 数据访问层
- 主要职责:
- 提供任务队列的核心操作
- 管理任务状态和生命周期
- 处理数据持久化
- 提供基础工具方法
- 包路径:
-
轮询模块(polling)
- 包路径:
io.github.colinzhu.taskqueue.dispatch
- 主要组件:
TaskPoller
- 任务轮询器TaskPollerVerticle
- 轮询VerticleTaskPollerConfig
- 轮询配置
- 主要职责:
- 任务发现和获取
- 任务处理和执行
- 轮询生命周期管理
- 并发控制
- 包路径:
-
支持模块(support)
- 包路径:
io.github.colinzhu.taskqueue.support
- 主要组件:
TaskQueueSupportService
- 支持服务接口TaskQueueSupportHandler
- Web处理器
- 主要职责:
- 提供管理界面
- 任务监控和统计
- 问题任务处理
- 系统维护功能
- 包路径:
┌─────────────────────────────────────────────────────┐
│ 应用层 Application │
└─────────────────────────────────────────────────────┘
↑ ↑ ↑
│ │ │
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ 核心模块 │ │ 轮询模块 │ │ 支持模块 │
│core/internal│ │ polling │ │ support │
└─────────────┘ └──────────────┘ └──────────────┘
-
核心模块(core/internal)
- 不依赖其他模块
- 通过 module-info.java 控制包的可见性
- 内部实现放在 internal 包中
-
轮询模块(polling)
- 依赖核心模块
- 通过 module-info.java 获得对 core 包的访问权限
- 实现任务处理逻辑
-
支持模块(support)
- 依赖核心模块
- 通过 module-info.java 获得对 core 包的访问权限
- 实现Web界面
-
包级私有类
TaskStatus
- 仅在 internal 包内可见TaskEntity
- 仅在 internal 包内可见TaskQueueServiceImpl
- 仅在根包内可见TaskRepo
- 仅在 internal 包内可见
-
公开接口
TaskQueueService
- 主要服务接口TaskQueueSupportService
- 支持服务接口
-
模块化控制
- 通过 module-info.java 严格控制包的可见性
- 只向外暴露必要的 API
- 内部实现完全隐藏
pool.withTransaction(conn ->
// 1. 执行业务逻辑
businessLogic(conn)
// 2. 入队任务
.compose(result ->
taskEnqueueService.enqueue(conn, "queue", "ref", payload))
);
class CustomTaskProcessor implements Function<Task<T>, Future<?>> {
@Override
public Future<?> apply(Task<T> task) {
return Future.future(promise -> {
// 1. 解析任务数据
// 2. 执行业务逻辑
// 3. 更新任务状态
});
}
}