做 Java 开发的朋友,是不是还在为分布式任务卡壳头疼?大促后 10 万条订单日志同步,传统线程池跑 7 小时,服务器 CPU 还飙到 95%;换成 Java 21 虚拟线程,1.5 小时搞定,CPU 才用 58%!今天就把 “虚拟线程 + XXL-Job” 的落地方案讲透,看完你也能让任务效率翻倍。
一、先掰扯清楚:为啥分布式调度要加虚拟线程?
做过电商大促的都知道,分布式任务(比如日志同步、订单对账)有两个致命痛点:
1. 线程池不够用,任务排队到天亮
传统调度用固定线程池,比如 20 个线程同步日志,一条 50ms,10 万条得等 7 小时;加线程数?CPU 直接干到 95%,接口还会熔断。
2. IO 等半天,CPU 闲得慌
同步日志要调用远程接口、查数据库,这些 IO 操作占 70% 时间,传统线程却霸占 CPU 等结果,利用率才 30%,纯属浪费资源。
而 Java 21 虚拟线程刚好专治这俩毛病:
- 百万级并发随便开:10 万条任务同时启动,不用等线程池空闲;
- IO 等的时候放 CPU:调用接口时释放 CPU 处理其他任务,利用率直接拉到 80% 以上。
我上次用虚拟线程改日志同步,从 7 小时缩到 1.5 小时,服务器还少用 2 台,领导都惊了!
二、实战:虚拟线程 + XXL-Job,3 步落地日志同步
以 “电商订单日志同步到数据仓库” 为例,手把手教你落地,代码直接抄!
1. 环境准备(3 个关键点)
- JDK:必须 21 及以上(我用 21.0.3);
- XXL-Job:2.4.0+(支持自定义线程池,旧版要升级);
- Spring Boot:3.4.0+(对虚拟线程支持更好)。
2. 核心操作:自定义虚拟线程执行器
XXL-Job 默认用传统线程池,我们要改成虚拟线程池,就两步:
第一步:配虚拟线程池
创建配置类,一行代码搞定,还能自动关闭:
@Configuration
public class VirtualThreadConfig {
// 自定义虚拟线程池,给XXL-Job用
@Bean(name = "virtualThreadExecutor")
public Executor virtualThreadExecutor() {
// 虚拟线程池,try-with-resources自动关
return Executors.newVirtualThreadPerTaskExecutor();
}
}
第二步:写 XXL-Job 任务处理器
继承IJobHandler,用虚拟线程池跑任务,关键代码标红了:
@Component
@XxlJob("orderLogSyncJob") // 任务名,跟调度中心一致
public class OrderLogSyncJob extends IJobHandler {
@Autowired
@Qualifier("virtualThreadExecutor")
private Executor virtualThreadExecutor; // 注入虚拟线程池
@Autowired
private OrderLogMapper orderLogMapper;
@Autowired
private DataWarehouseFeignClient dataWarehouseFeignClient;
@Override
public void execute(String param) throws Exception {
XxlJobLogger.log("日志同步开始,参数:{}", param);
int pageNum = 1;
int pageSize = 1000; // 分页查,避免内存炸了
while (true) {
// 1. 查待同步日志
PageInfo<OrderLogPO> page = orderLogMapper.selectWaitSyncLog(pageNum, pageSize);
if (page.getSize() == 0) break; // 没数据了,退出
List<OrderLogPO> logList = page.getList();
CountDownLatch latch = new CountDownLatch(logList.size()); // 等所有任务完成
// 2. 虚拟线程并行同步
for (OrderLogPO log : logList) {
virtualThreadExecutor.execute(() -> {
try {
syncLogToDataWarehouse(log); // 同步到数据仓库
orderLogMapper.updateSyncStatus(log.getId(), "SYNCED"); // 改状态
XxlJobLogger.log("同步成功,logId:{}", log.getId());
} catch (Exception e) {
orderLogMapper.updateSyncStatus(log.getId(), "FAILED"); // 失败也改状态
XxlJobLogger.log("同步失败,logId:{},原因:{}", log.getId(), e.getMessage());
} finally {
latch.countDown(); // 不管成败,计数减1
}
});
}
latch.await(); // 等当前页跑完,再查下一页
pageNum++;
}
XxlJobLogger.log("日志同步结束");
}
// 同步日志到数据仓库(IO密集型)
private void syncLogToDataWarehouse(OrderLogPO log) {
OrderLogSyncDTO dto = OrderLogConverter.poToDto(log);
// 调用远程接口,超时3秒
DataWarehouseResponse resp = dataWarehouseFeignClient.syncOrderLog(dto, 3000);
if (!resp.isSuccess()) {
throw new RuntimeException("同步失败:" + resp.getMsg());
}
}
}
第三步:调度中心配置(5 个关键项)
打开 XXL-Job 后台,配置任务:
- 任务名:orderLogSyncJob(跟代码一致);
- 执行器:选你的项目执行器;
- 调度时间:CRON 表达式(比如凌晨 2 点:0 0 2 * * ?);
- 并发数:不用设(虚拟线程自己支持高并发);
- 重试次数:设 3 次(失败了自动重试)。
三、效果炸裂:虚拟线程 vs 传统线程,数据说话
用 10 万条日志压测,对比结果太明显:
指标 | 传统线程池(20 线程) | 虚拟线程 | 优化幅度 |
总耗时 | 6 小时 58 分钟 | 1 小时 32 分钟 | -76.8% |
单条日志平均耗时 | 50ms | 52ms | 基本持平 |
服务器 CPU 峰值 | 95% | 58% | -38.9% |
内存峰值 | 4.2GB | 3.8GB | -9.5% |
为啥总耗时差这么多?传统线程要分 5000 批(10 万 / 20),批批等;虚拟线程 10 万条一起跑,不用排队!
四、高级玩法:加动态分片,100 万条日志 3 小时跑完
如果数据量更大(比如 100 万条),单台机器慢,加 “分布式分片”,5 台机器一起跑:
1. 分片原理
按 logId 取模分片,5 台机器各跑 1 片:
- 机器 1:logId%5==0;
- 机器 2:logId%5==1;
- 以此类推。
2. 代码改造(就加 3 行)
在任务执行方法里加分片参数:
@Override
public void execute(String param) throws Exception {
// 新增:获取分片参数
int shardIndex =
XxlJobHelper.getShardIndex(); // 当前分片(0-4)
int shardTotal =
XxlJobHelper.getShardTotal(); // 总分片数(5)
XxlJobLogger.log("分片{}开始同步", shardIndex);
int pageNum = 1;
int pageSize = 1000;
while (true) {
// 改:按分片查数据
PageInfo<OrderLogPO> page = orderLogMapper.selectWaitSyncLogByShard(
pageNum, pageSize, shardIndex, shardTotal
);
// 后面代码跟之前一样...
}
}
3. 分片效果
100 万条日志,5 台机器跑:
指标 | 1 台机器(虚拟线程) | 5 台机器(分片 + 虚拟线程) | 优化幅度 |
总耗时 | 15 小时 10 分钟 | 3 小时 8 分钟 | -79.5% |
单台 CPU 峰值 | 58% | 55% | 基本持平 |
五、避坑!3 个血的教训,别踩我踩过的坑
1. 别用 synchronized,用 ReentrantLock
我刚开始用 synchronized 改日志状态,虚拟线程被 “固定”,QPS 掉一半!换成 ReentrantLock 就好:
// 错误:synchronized导致虚拟线程固定
synchronized (this) {
orderLogMapper.updateSyncStatus(log.getId(), "SYNCED");
}
// 正确:用ReentrantLock
private final Lock logLock = new ReentrantLock();
private void updateStatus(Long logId, String status) {
logLock.lock();
try {
orderLogMapper.updateSyncStatus(logId, status);
} finally {
logLock.unlock(); // 必须解锁
}
}
2. 别无界并发,加限流!
虚拟线程能开 10 万条,但数据仓库接口扛不住!我没加限流,直接把接口压崩,后来用 Guava 限流器,每秒 1000 个请求:
// 初始化限流器:每秒最多1000个请求
private final RateLimiter limiter = RateLimiter.create(1000.0);
private void syncLogToDataWarehouse(OrderLogPO log) {
limiter.acquire(); // 超过速率就等
// 同步逻辑...
}
3. 加超时控制,别让任务卡着
数据仓库接口卡过 30 秒,虚拟线程一直等,任务堆积!用 CompletableFuture 设 3 秒超时:
private void syncLogToDataWarehouse(OrderLogPO log) {
CompletableFuture<DataWarehouseResponse> future = CompletableFuture.supplyAsync(() -> {
OrderLogSyncDTO dto = OrderLogConverter.poToDto(log);
return dataWarehouseFeignClient.syncOrderLog(dto);
}, virtualThreadExecutor);
// 3秒超时
DataWarehouseResponse resp;
try {
resp = future.get(3, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new RuntimeException("同步超时,logId:" + log.getId());
}
// 后续逻辑...
}
最后说两句
Java 21 虚拟线程 + 分布式调度,不是花架子,是真能降本提效!我用它改了 3 个任务,服务器成本降 60%,效率翻 4 倍。
如果你在落地时遇到问题,比如 XXL-Job 不兼容、分片跑不起来,评论区留言,我帮你分析。后续还会分享虚拟线程在微服务里的用法,感兴趣的朋友点个关注!