背景:存在一个耗时的任务,优先使用异步方式进行调用,但是有时有需要同步调用等待获取任务执行结果。

这里存在下面几个问题:

  • 同一任务已经异步提交执行了,现在需要同步执行等待结果
  • 同一任务正在同步执行等待结果,后面后进行了异步重复提交,需要确保不会重复调用该任务
  • 系统资源有效,耗时任务不能无限的添加执行

解决方案:

  1. 可以使用CompletableFutureAtomicReference来保存异步调用的结果,并在同步调用时检查是否有正在进行的任务。如果有,等待结果;如果没有,启动任务。
  2. 使用Semaphore来控制同时能有多少个任务可执行

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

public class TaskManager {
    private final ConcurrentHashMap<String, AtomicReference<CompletableFuture<String>>> tasks = new ConcurrentHashMap<>();
    private final Semaphore semaphore = new Semaphore(5);  // 限制最多同时执行5个任务

    // 同步执行任务
    public String executeTaskSynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task.join();  // 等待异步任务完成并返回结果
    }

    // 异步执行任务
    public CompletableFuture<String> executeTaskAsynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task;
    }

    // 实际启动任务
    private CompletableFuture<String> startTask(String taskId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                semaphore.acquire();  // 获取许可,最多只能有5个任务同时执行
                // 模拟耗时任务
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result for task " + taskId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Task interrupted";
            } finally {
                semaphore.release();  // 释放许可,允许下一个任务执行
                tasks.remove(taskId);
            }
        });
    }
}

代码说明:

  1. ConcurrentHashMap: tasks 是一个 ConcurrentHashMap,其中的每个任务都有一个唯一的 taskId 作为键。每个任务的执行状态用 AtomicReference<CompletableFuture<String>> 表示。
  2. Semaphore: Semaphore 初始化为 5,表示最多允许 5 个任务同时执行。如果一个任务完成了,会释放许可,允许下一个任务执行。
  3. 同步调用 (executeTaskSynchronously): 同步调用会获取任务的 CompletableFuture,如果任务还未启动,则启动任务并存储在 AtomicReference 中,然后等待任务完成并返回结果。
  4. 异步调用 (executeTaskAsynchronously): 异步调用与同步调用类似,只是不等待任务完成,而是直接返回 CompletableFuture,以便调用者在未来某个时间点处理结果。
  5. 任务完成后清理: 当任务完成后,tasks 中与该任务相关的记录会被移除,以便下次可以重新启动任务。

标签: java

已有 9 条评论

  1. 每次看到你的文章,我都觉得时间过得好快。http://www.xatbgg.com

  2. 你的文章内容非常卖力,让人点赞。http://www.gahaiqinaili.com

  3. 《铁原纪行》剧情片高清在线免费观看:https://www.jgz518.com/xingkong/94641.html

  4. 《新品上市便利餐厅》日韩综艺高清在线免费观看:https://www.jgz518.com/xingkong/100917.html

  5. 文章深入浅出,既有深度思考,又不乏广度覆盖,令人叹为观止。

  6. 创新略显不足,可尝试引入多元视角。

  7. 作者以非凡的视角解读平凡,让文字焕发出别样的光彩。

  8. 作者以简洁明了的语言,传达了深刻的思想和情感。

  9. 2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
    新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
    新车首发,新的一年,只带想赚米的人coinsrore.com
    新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
    做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
    新车上路,只带前10个人coinsrore.com
    新盘首开 新盘首开 征召客户!!!coinsrore.com
    新项目准备上线,寻找志同道合 的合作伙伴coinsrore.com
    新车即将上线 真正的项目,期待你的参与coinsrore.com
    新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
    新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com

添加新评论