北屋教程网

专注编程知识分享,从入门到精通的编程学习平台

Java 21 虚拟线程 + 分布式调度:大促 10 万条日志同步,时间压缩!

做 Java 开发的朋友,是不是还在为分布式任务卡壳头疼?大促后 10 万条订单日志同步,传统线程池跑 7 小时,服务器 CPU 还飙到 95%;换成 Java 21 虚拟线程,1.5 小时搞定,CPU 才用 58%!今天就把 “虚拟线程 + XXL-Job” 的落地方案讲透,看完你也能让任务效率翻倍。

一、先掰扯清楚:为啥分布式调度要加虚拟线程?

做过电商大促的都知道,分布式任务(比如日志同步、订单对账)有两个致命痛点:

1. 线程池不够用,任务排队到天亮

传统调度用固定线程池,比如 20 个线程同步日志,一条 50ms,10 万条得等 7 小时;加线程数?CPU 直接干到 95%,接口还会熔断。

2. IO 等半天,CPU 闲得慌

同步日志要调用远程接口、查数据库,这些 IO 操作占 70% 时间,传统线程却霸占 CPU 等结果,利用率才 30%,纯属浪费资源。

而 Java 21 虚拟线程刚好专治这俩毛病:

  • 百万级并发随便开:10 万条任务同时启动,不用等线程池空闲;
  • IO 等的时候放 CPU:调用接口时释放 CPU 处理其他任务,利用率直接拉到 80% 以上。

我上次用虚拟线程改日志同步,从 7 小时缩到 1.5 小时,服务器还少用 2 台,领导都惊了!

二、实战:虚拟线程 + XXL-Job,3 步落地日志同步

以 “电商订单日志同步到数据仓库” 为例,手把手教你落地,代码直接抄!

1. 环境准备(3 个关键点)

  • JDK:必须 21 及以上(我用 21.0.3);
  • XXL-Job:2.4.0+(支持自定义线程池,旧版要升级);
  • Spring Boot:3.4.0+(对虚拟线程支持更好)。

2. 核心操作:自定义虚拟线程执行器

XXL-Job 默认用传统线程池,我们要改成虚拟线程池,就两步:

第一步:配虚拟线程池

创建配置类,一行代码搞定,还能自动关闭:

@Configuration

public class VirtualThreadConfig {

// 自定义虚拟线程池,给XXL-Job用

@Bean(name = "virtualThreadExecutor")

public Executor virtualThreadExecutor() {

// 虚拟线程池,try-with-resources自动关

return Executors.newVirtualThreadPerTaskExecutor();

}

}

第二步:写 XXL-Job 任务处理器

继承IJobHandler,用虚拟线程池跑任务,关键代码标红了:

@Component

@XxlJob("orderLogSyncJob") // 任务名,跟调度中心一致

public class OrderLogSyncJob extends IJobHandler {

@Autowired

@Qualifier("virtualThreadExecutor")

private Executor virtualThreadExecutor; // 注入虚拟线程池

@Autowired

private OrderLogMapper orderLogMapper;

@Autowired

private DataWarehouseFeignClient dataWarehouseFeignClient;

@Override

public void execute(String param) throws Exception {

XxlJobLogger.log("日志同步开始,参数:{}", param);

int pageNum = 1;

int pageSize = 1000; // 分页查,避免内存炸了

while (true) {

// 1. 查待同步日志

PageInfo<OrderLogPO> page = orderLogMapper.selectWaitSyncLog(pageNum, pageSize);

if (page.getSize() == 0) break; // 没数据了,退出

List<OrderLogPO> logList = page.getList();

CountDownLatch latch = new CountDownLatch(logList.size()); // 等所有任务完成

// 2. 虚拟线程并行同步

for (OrderLogPO log : logList) {

virtualThreadExecutor.execute(() -> {

try {

syncLogToDataWarehouse(log); // 同步到数据仓库


orderLogMapper.updateSyncStatus(log.getId(), "SYNCED"); // 改状态

XxlJobLogger.log("同步成功,logId:{}", log.getId());

} catch (Exception e) {


orderLogMapper.updateSyncStatus(log.getId(), "FAILED"); // 失败也改状态

XxlJobLogger.log("同步失败,logId:{},原因:{}", log.getId(), e.getMessage());

} finally {

latch.countDown(); // 不管成败,计数减1

}

});

}

latch.await(); // 等当前页跑完,再查下一页

pageNum++;

}

XxlJobLogger.log("日志同步结束");

}

// 同步日志到数据仓库(IO密集型)

private void syncLogToDataWarehouse(OrderLogPO log) {

OrderLogSyncDTO dto = OrderLogConverter.poToDto(log);

// 调用远程接口,超时3秒

DataWarehouseResponse resp = dataWarehouseFeignClient.syncOrderLog(dto, 3000);

if (!resp.isSuccess()) {

throw new RuntimeException("同步失败:" + resp.getMsg());

}

}

}

第三步:调度中心配置(5 个关键项)

打开 XXL-Job 后台,配置任务:

  • 任务名:orderLogSyncJob(跟代码一致);
  • 执行器:选你的项目执行器;
  • 调度时间:CRON 表达式(比如凌晨 2 点:0 0 2 * * ?);
  • 并发数:不用设(虚拟线程自己支持高并发);
  • 重试次数:设 3 次(失败了自动重试)。

三、效果炸裂:虚拟线程 vs 传统线程,数据说话

用 10 万条日志压测,对比结果太明显:

指标

传统线程池(20 线程)

虚拟线程

优化幅度

总耗时

6 小时 58 分钟

1 小时 32 分钟

-76.8%

单条日志平均耗时

50ms

52ms

基本持平

服务器 CPU 峰值

95%

58%

-38.9%

内存峰值

4.2GB

3.8GB

-9.5%

为啥总耗时差这么多?传统线程要分 5000 批(10 万 / 20),批批等;虚拟线程 10 万条一起跑,不用排队!

四、高级玩法:加动态分片,100 万条日志 3 小时跑完

如果数据量更大(比如 100 万条),单台机器慢,加 “分布式分片”,5 台机器一起跑:

1. 分片原理

按 logId 取模分片,5 台机器各跑 1 片:

  • 机器 1:logId%5==0;
  • 机器 2:logId%5==1;
  • 以此类推。

2. 代码改造(就加 3 行)

在任务执行方法里加分片参数:

@Override

public void execute(String param) throws Exception {

// 新增:获取分片参数

int shardIndex =
XxlJobHelper.getShardIndex(); // 当前分片(0-4)

int shardTotal =
XxlJobHelper.getShardTotal(); // 总分片数(5)

XxlJobLogger.log("分片{}开始同步", shardIndex);

int pageNum = 1;

int pageSize = 1000;

while (true) {

// 改:按分片查数据

PageInfo<OrderLogPO> page = orderLogMapper.selectWaitSyncLogByShard(

pageNum, pageSize, shardIndex, shardTotal

);

// 后面代码跟之前一样...

}

}

3. 分片效果

100 万条日志,5 台机器跑:

指标

1 台机器(虚拟线程)

5 台机器(分片 + 虚拟线程)

优化幅度

总耗时

15 小时 10 分钟

3 小时 8 分钟

-79.5%

单台 CPU 峰值

58%

55%

基本持平

五、避坑!3 个血的教训,别踩我踩过的坑

1. 别用 synchronized,用 ReentrantLock

我刚开始用 synchronized 改日志状态,虚拟线程被 “固定”,QPS 掉一半!换成 ReentrantLock 就好:

// 错误:synchronized导致虚拟线程固定

synchronized (this) {

orderLogMapper.updateSyncStatus(log.getId(), "SYNCED");

}

// 正确:用ReentrantLock

private final Lock logLock = new ReentrantLock();

private void updateStatus(Long logId, String status) {

logLock.lock();

try {

orderLogMapper.updateSyncStatus(logId, status);

} finally {

logLock.unlock(); // 必须解锁

}

}

2. 别无界并发,加限流!

虚拟线程能开 10 万条,但数据仓库接口扛不住!我没加限流,直接把接口压崩,后来用 Guava 限流器,每秒 1000 个请求:

// 初始化限流器:每秒最多1000个请求

private final RateLimiter limiter = RateLimiter.create(1000.0);

private void syncLogToDataWarehouse(OrderLogPO log) {

limiter.acquire(); // 超过速率就等

// 同步逻辑...

}

3. 加超时控制,别让任务卡着

数据仓库接口卡过 30 秒,虚拟线程一直等,任务堆积!用 CompletableFuture 设 3 秒超时:

private void syncLogToDataWarehouse(OrderLogPO log) {

CompletableFuture<DataWarehouseResponse> future = CompletableFuture.supplyAsync(() -> {

OrderLogSyncDTO dto = OrderLogConverter.poToDto(log);

return dataWarehouseFeignClient.syncOrderLog(dto);

}, virtualThreadExecutor);

// 3秒超时

DataWarehouseResponse resp;

try {

resp = future.get(3, TimeUnit.SECONDS);

} catch (TimeoutException e) {

throw new RuntimeException("同步超时,logId:" + log.getId());

}

// 后续逻辑...

}

最后说两句

Java 21 虚拟线程 + 分布式调度,不是花架子,是真能降本提效!我用它改了 3 个任务,服务器成本降 60%,效率翻 4 倍。

如果你在落地时遇到问题,比如 XXL-Job 不兼容、分片跑不起来,评论区留言,我帮你分析。后续还会分享虚拟线程在微服务里的用法,感兴趣的朋友点个关注!

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言