背景:存在一个耗时的任务,优先使用异步方式进行调用,但是有时有需要同步调用等待获取任务执行结果。

这里存在下面几个问题:

  • 同一任务已经异步提交执行了,现在需要同步执行等待结果
  • 同一任务正在同步执行等待结果,后面后进行了异步重复提交,需要确保不会重复调用该任务
  • 系统资源有效,耗时任务不能无限的添加执行

解决方案:

  1. 可以使用CompletableFutureAtomicReference来保存异步调用的结果,并在同步调用时检查是否有正在进行的任务。如果有,等待结果;如果没有,启动任务。
  2. 使用Semaphore来控制同时能有多少个任务可执行

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

public class TaskManager {
    private final ConcurrentHashMap<String, AtomicReference<CompletableFuture<String>>> tasks = new ConcurrentHashMap<>();
    private final Semaphore semaphore = new Semaphore(5);  // 限制最多同时执行5个任务

    // 同步执行任务
    public String executeTaskSynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task.join();  // 等待异步任务完成并返回结果
    }

    // 异步执行任务
    public CompletableFuture<String> executeTaskAsynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task;
    }

    // 实际启动任务
    private CompletableFuture<String> startTask(String taskId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                semaphore.acquire();  // 获取许可,最多只能有5个任务同时执行
                // 模拟耗时任务
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result for task " + taskId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Task interrupted";
            } finally {
                semaphore.release();  // 释放许可,允许下一个任务执行
                tasks.remove(taskId);
            }
        });
    }
}

代码说明:

  1. ConcurrentHashMap: tasks 是一个 ConcurrentHashMap,其中的每个任务都有一个唯一的 taskId 作为键。每个任务的执行状态用 AtomicReference<CompletableFuture<String>> 表示。
  2. Semaphore: Semaphore 初始化为 5,表示最多允许 5 个任务同时执行。如果一个任务完成了,会释放许可,允许下一个任务执行。
  3. 同步调用 (executeTaskSynchronously): 同步调用会获取任务的 CompletableFuture,如果任务还未启动,则启动任务并存储在 AtomicReference 中,然后等待任务完成并返回结果。
  4. 异步调用 (executeTaskAsynchronously): 异步调用与同步调用类似,只是不等待任务完成,而是直接返回 CompletableFuture,以便调用者在未来某个时间点处理结果。
  5. 任务完成后清理: 当任务完成后,tasks 中与该任务相关的记录会被移除,以便下次可以重新启动任务。

标签: java

仅有一条评论

  1. 每次看到你的文章,我都觉得时间过得好快。http://www.xatbgg.com

添加新评论