CompletableFuture和AtomicReference解决异步和同步调用获取结果的问题
背景:存在一个耗时的任务,优先使用异步方式进行调用,但是有时有需要同步调用等待获取任务执行结果。
这里存在下面几个问题:
- 同一任务已经异步提交执行了,现在需要同步执行等待结果
 - 同一任务正在同步执行等待结果,后面后进行了异步重复提交,需要确保不会重复调用该任务
 - 系统资源有效,耗时任务不能无限的添加执行
 
解决方案:
- 可以使用
CompletableFuture和AtomicReference来保存异步调用的结果,并在同步调用时检查是否有正在进行的任务。如果有,等待结果;如果没有,启动任务。 - 使用
Semaphore来控制同时能有多少个任务可执行 
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
public class TaskManager {
    private final ConcurrentHashMap<String, AtomicReference<CompletableFuture<String>>> tasks = new ConcurrentHashMap<>();
    private final Semaphore semaphore = new Semaphore(5);  // 限制最多同时执行5个任务
    // 同步执行任务
    public String executeTaskSynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();
        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }
        return task.join();  // 等待异步任务完成并返回结果
    }
    // 异步执行任务
    public CompletableFuture<String> executeTaskAsynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();
        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }
        return task;
    }
    // 实际启动任务
    private CompletableFuture<String> startTask(String taskId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                semaphore.acquire();  // 获取许可,最多只能有5个任务同时执行
                // 模拟耗时任务
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result for task " + taskId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Task interrupted";
            } finally {
                semaphore.release();  // 释放许可,允许下一个任务执行
                tasks.remove(taskId);
            }
        });
    }
}
代码说明:
ConcurrentHashMap:tasks是一个ConcurrentHashMap,其中的每个任务都有一个唯一的taskId作为键。每个任务的执行状态用AtomicReference<CompletableFuture<String>>表示。Semaphore:Semaphore初始化为 5,表示最多允许 5 个任务同时执行。如果一个任务完成了,会释放许可,允许下一个任务执行。- 同步调用 (
executeTaskSynchronously): 同步调用会获取任务的CompletableFuture,如果任务还未启动,则启动任务并存储在AtomicReference中,然后等待任务完成并返回结果。 - 异步调用 (
executeTaskAsynchronously): 异步调用与同步调用类似,只是不等待任务完成,而是直接返回CompletableFuture,以便调用者在未来某个时间点处理结果。 - 任务完成后清理: 当任务完成后,
tasks中与该任务相关的记录会被移除,以便下次可以重新启动任务。 




