CompletableFuture和AtomicReference解决异步和同步调用获取结果的问题
背景:存在一个耗时的任务,优先使用异步方式进行调用,但是有时有需要同步调用等待获取任务执行结果。
这里存在下面几个问题:
- 同一任务已经异步提交执行了,现在需要同步执行等待结果
- 同一任务正在同步执行等待结果,后面后进行了异步重复提交,需要确保不会重复调用该任务
- 系统资源有效,耗时任务不能无限的添加执行
解决方案:
- 可以使用
CompletableFuture
和AtomicReference
来保存异步调用的结果,并在同步调用时检查是否有正在进行的任务。如果有,等待结果;如果没有,启动任务。 - 使用
Semaphore
来控制同时能有多少个任务可执行
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
public class TaskManager {
private final ConcurrentHashMap<String, AtomicReference<CompletableFuture<String>>> tasks = new ConcurrentHashMap<>();
private final Semaphore semaphore = new Semaphore(5); // 限制最多同时执行5个任务
// 同步执行任务
public String executeTaskSynchronously(String taskId) {
AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
CompletableFuture<String> task = taskRef.get();
if (task == null) {
task = startTask(taskId);
if (!taskRef.compareAndSet(null, task)) {
task = taskRef.get();
}
}
return task.join(); // 等待异步任务完成并返回结果
}
// 异步执行任务
public CompletableFuture<String> executeTaskAsynchronously(String taskId) {
AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
CompletableFuture<String> task = taskRef.get();
if (task == null) {
task = startTask(taskId);
if (!taskRef.compareAndSet(null, task)) {
task = taskRef.get();
}
}
return task;
}
// 实际启动任务
private CompletableFuture<String> startTask(String taskId) {
return CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire(); // 获取许可,最多只能有5个任务同时执行
// 模拟耗时任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result for task " + taskId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task interrupted";
} finally {
semaphore.release(); // 释放许可,允许下一个任务执行
tasks.remove(taskId);
}
});
}
}
代码说明:
ConcurrentHashMap
:tasks
是一个ConcurrentHashMap
,其中的每个任务都有一个唯一的taskId
作为键。每个任务的执行状态用AtomicReference<CompletableFuture<String>>
表示。Semaphore
:Semaphore
初始化为 5,表示最多允许 5 个任务同时执行。如果一个任务完成了,会释放许可,允许下一个任务执行。- 同步调用 (
executeTaskSynchronously
): 同步调用会获取任务的CompletableFuture
,如果任务还未启动,则启动任务并存储在AtomicReference
中,然后等待任务完成并返回结果。 - 异步调用 (
executeTaskAsynchronously
): 异步调用与同步调用类似,只是不等待任务完成,而是直接返回CompletableFuture
,以便调用者在未来某个时间点处理结果。 - 任务完成后清理: 当任务完成后,
tasks
中与该任务相关的记录会被移除,以便下次可以重新启动任务。
每次看到你的文章,我都觉得时间过得好快。http://www.xatbgg.com