标签 java 下的文章

背景:存在一个耗时的任务,优先使用异步方式进行调用,但是有时有需要同步调用等待获取任务执行结果。

这里存在下面几个问题:

  • 同一任务已经异步提交执行了,现在需要同步执行等待结果
  • 同一任务正在同步执行等待结果,后面后进行了异步重复提交,需要确保不会重复调用该任务
  • 系统资源有效,耗时任务不能无限的添加执行

解决方案:

  1. 可以使用CompletableFutureAtomicReference来保存异步调用的结果,并在同步调用时检查是否有正在进行的任务。如果有,等待结果;如果没有,启动任务。
  2. 使用Semaphore来控制同时能有多少个任务可执行

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

public class TaskManager {
    private final ConcurrentHashMap<String, AtomicReference<CompletableFuture<String>>> tasks = new ConcurrentHashMap<>();
    private final Semaphore semaphore = new Semaphore(5);  // 限制最多同时执行5个任务

    // 同步执行任务
    public String executeTaskSynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task.join();  // 等待异步任务完成并返回结果
    }

    // 异步执行任务
    public CompletableFuture<String> executeTaskAsynchronously(String taskId) {
        AtomicReference<CompletableFuture<String>> taskRef = tasks.computeIfAbsent(taskId, k -> new AtomicReference<>());
        CompletableFuture<String> task = taskRef.get();

        if (task == null) {
            task = startTask(taskId);
            if (!taskRef.compareAndSet(null, task)) {
                task = taskRef.get();
            }
        }

        return task;
    }

    // 实际启动任务
    private CompletableFuture<String> startTask(String taskId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                semaphore.acquire();  // 获取许可,最多只能有5个任务同时执行
                // 模拟耗时任务
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result for task " + taskId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "Task interrupted";
            } finally {
                semaphore.release();  // 释放许可,允许下一个任务执行
                tasks.remove(taskId);
            }
        });
    }
}

代码说明:

  1. ConcurrentHashMap: tasks 是一个 ConcurrentHashMap,其中的每个任务都有一个唯一的 taskId 作为键。每个任务的执行状态用 AtomicReference<CompletableFuture<String>> 表示。
  2. Semaphore: Semaphore 初始化为 5,表示最多允许 5 个任务同时执行。如果一个任务完成了,会释放许可,允许下一个任务执行。
  3. 同步调用 (executeTaskSynchronously): 同步调用会获取任务的 CompletableFuture,如果任务还未启动,则启动任务并存储在 AtomicReference 中,然后等待任务完成并返回结果。
  4. 异步调用 (executeTaskAsynchronously): 异步调用与同步调用类似,只是不等待任务完成,而是直接返回 CompletableFuture,以便调用者在未来某个时间点处理结果。
  5. 任务完成后清理: 当任务完成后,tasks 中与该任务相关的记录会被移除,以便下次可以重新启动任务。

Java Mail发送邮件email 支持SSL加密、TSL加密

类型服务名称服务器地址SSL协议端口非SSL协议端口TSL协议端口
收件服务器POPpop.163.com995110
收件服务器IMAPmap.163.com993143
发件服务器SMTPsmtp.163.com465/99425587

POM依赖:
注意:不要依赖javax.mail javax.mail-api 1.6.2,不然会提示:java.lang.NoClassDefFoundError: com/sun/mail/util/MailLogger

<dependency>
    <groupId>com.sun.mail</groupId>
    <artifactId>javax.mail</artifactId>
    <version>1.6.2</version>
</dependency>

Java Mail代码如下:

/**
 * 发送邮件
 *
 * @param toEmails    收件人列表
 * @param ccEmails    抄送人列表
 * @param subject     主题
 * @param htmlContent 邮件正文
 * @param files       附件。Key:附件的文件 Value:显示文件名
 */
public void sendByJavaMail(String[] toEmails, String[] ccEmails, String subject, String htmlContent, Map<String, String> files) {
    // 获取邮件配置
    final String mailServerUsername = "{你的邮箱地址,例如:xxx@163.com}",
        mailServerPassword = "{你的邮箱密码或授权密码}",      
        mailFrom = "{你的邮箱发件人地址,例如:xxx@163.com}",
        mailFromDisplayName = "{希望收件人看到的发件人显示名称,例如:报警服务}";
    if (Strings.isBlank(mailServerUsername) || Strings.isBlank(mailServerPassword) || Strings.isBlank(mailFrom)) {
        throw new ApiException("Email sending service is not set up, please contact the platform if there are any issues.");
    }
    // 设置邮箱服务属性
    Properties props = new Properties();
    props.setProperty("mail.smtp.host", "smtp.163.com");
    props.setProperty("mail.smtp.auth", "true");
    // "mail.smtp.host":"smtp.gmail.com" "mail.smtp.auth":"true" "mail.smtp.port":"587" "mail.smtp.starttls.enable":"true" "mail.smtp.ssl.protocols":"TLSv1.2"
    // TLS需要内容项:"mail.smtp.starttls.enable":"true" "mail.smtp.port":"587"
    // SSL需要内容项:"mail.smtp.ssl.enable":"true" "mail.smtp.port":"465" "mail.smtp.socketFactory.class":"javax.net.ssl.SSLSocketFactory"
    props.setProperty("mail.smtp.ssl.enable", "true");
    props.setProperty("mail.smtp.port", "465");
    props.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");    

    // 获取默认的 Session 对象。
    Session session = Session.getInstance(props, new Authenticator() {
        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
            return new PasswordAuthentication(mailServerUsername, mailServerPassword);
        }
    });
    // 是否配置debug打印日志信息
    session.setDebug(true);
    try {
        // 创建默认的 MimeMessage 对象。
        MimeMessage message = new MimeMessage(session);
        // 发送方邮箱
        Address fromAddress = Strings.isBlank(mailFromDisplayName) ? new InternetAddress(mailFrom) : new InternetAddress(mailFrom, mailFromDisplayName, "utf-8");
        message.setFrom(fromAddress);
        // 接收方邮箱
        for (String toEmail : toEmails) {
            message.addRecipient(Message.RecipientType.TO, new InternetAddress(toEmail));
        }
        // 抄送邮件列表
        if (null != ccEmails && ccEmails.length > 0) {
            for (String ccEmail : ccEmails) {
                message.addRecipient(Message.RecipientType.CC, new InternetAddress(ccEmail));
            }
        }
        // 设置主题
        message.setSubject(subject);

        /* 消息部分 */
        // 正文
        Multipart multipart = new MimeMultipart();
        // HTML消息
        BodyPart htmlBodyPart = new MimeBodyPart();
        htmlBodyPart.setContent(htmlContent, "text/html;charset=UTF-8");
        multipart.addBodyPart(htmlBodyPart);
        // 设置附件
        if (null != files && !files.isEmpty()) {
            for (Map.Entry<String, String> entry : files.entrySet()) {
                final String file = entry.getKey(), filename = entry.getValue();
                BodyPart fileBodyPart = new MimeBodyPart();
                DataSource source = new FileDataSource(file);
                fileBodyPart.setDataHandler(new DataHandler(source));
                // fileBodyPart.setFileName(filename);
                fileBodyPart.setFileName(MimeUtility.encodeText(filename, "UTF-8", "B"));
                multipart.addBodyPart(fileBodyPart);
            }
        }
        message.setContent(multipart);
        // 发送消息
        log.debug(">>>>>>>>>>sendByJavaMail---sending--<<<<<<<<<<<<<<");
        Transport.send(message);
    } catch (Exception e) {
        log.error("", e);
    } finally {
        // 根据业务需求,是否删除附件的文件
        if (null != files && !files.isEmpty()) {
            for (Map.Entry<String, String> entry : files.entrySet()) {
                Files.deleteQuietly(new File(entry.getKey()));
            }
        }
    }
}

Lombok介绍

Lombok 是一种 Java 实用工具,可用来帮助开发人员消除 Java 的冗长,尤其是对于简单的 Java 对象(POJO)。它通过注释实现这一目的。通过在开发环境中实现 Lombok,开发人员可以节省构建诸如 hashCode() 和 equals() 这样的方法以及以往用来分类各种 accessor 和 mutator 的大量时间。

SpringToolSuite4安装Lombok

下载 lombok.jar

可以通过下面地址进行下载:

https://repo1.maven.org/maven2/org/projectlombok/lombok/1.18.18/lombok-1.18.18.jar

拷贝lombok.jar至STS目录

目录:/Applications/SpringToolSuite4.app/Contents/MacOS

接着需要将lombok-1.18.18.jar重命名为lombok.jar
(下面截图名称没有改,实际操作时,请将lombok-1.18.18.jar改为lombok.jar)

7c2yiab8hz4.png

在STS路径运行命令

cd /Applications/SpringToolSuite4.app/Contents/MacOS

java -jar lombok.jar

- 阅读剩余部分 -

进程(process)和线程(thread)

实现并发最直接的方式是在操作系统级别使用进程。进程是运行在它自己的地址空间内的自包容的程序。
一个线程就是在进程中的一个单一的顺序控制流,单个进程可以拥有多个并发执行任务。

Runnable和Thread

通过Runnable定义任务

线程可以驱动任务,Java中是由Runnable接口提供,需要实现Runnable接口的run()方法。任务run()方法通常总会有某种形式的循环,使得任务一直运行下去直到不再需要。通常run()被写成无限循环的形式。
例如:

public class LiftOff implements Runnable {
    protected int countDown = 10;    //倒计数
    private static int taskCount;
    private final int id = taskCount ++;
    
    public LiftOff() {
        super();
    }
    public LiftOff(int countDown) {
        super();
        this.countDown = countDown;
    }
    
    public String status() {
        return String.format("#%d(%s).", id, (countDown > 0) ? countDown : "LiftOff!");
    }

    @Override
    public void run() {
        while(countDown-- > 0) {
            System.out.print(status());
            Thread.yield();
        }
    }
}

静态方法Thread.yield()

Thread.yield()线程调度器的一种建议,暗示:我已经执行完生命周期中最重要的部分了,现在正是将CPU资源切换给其他任务执行一段时间的时机。

Thread类

Runnable对象转变为工作任务的传统方式是将它提交给一个Thread的构造器。
使用方法如下:

public class BasicThreads {
    public static void main(String[] args) {
        Thread t = new Thread(new LiftOff());
        t.start();
        System.out.println("Waiting for LiftOff");
    }
}

Executor和线程池

Executor是Java SE5引入的,用来管理Thread对象,从而简化并发编程。Executor是启动任务的优选方法

ExecutorService

ExecutorService是具有生命周期的Executor,会管理如何构建恰当的上下文来执行Runnable对象。非常常见的情况是:单个Executor用来创建和管理系统中所有的任务。
在任何线程池中,在现有的线程的可能情况下,都会被自动复用

CachedThreadPool

CachedThreadPool为每个任务都创建一个线程,使用方法:

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i ++) {
            exec.execute(new LiftOff());
        }
        exec.shutdown();
    }
}

- 阅读剩余部分 -

在项目开发时,我们用的是本地搭建的开发dev环境,开发完成打包部署到服务器时,用到的是服务器prod环境。可以借用Mavenprofilesfiltersresources,在运行或打包时指定选用的环境,实现不同环境自动使用各自环境的配置文件或配置信息。

  • profiles:定义环境变量的id;
  • filters:定义了变量配置文件的地址,其中地址中的环境变量就是上面profile中定义的值;
  • resources:定义哪些目录下的文件会被配置文件中定义的变量替换,另外可以指定目录下的文件打包到classes目录下。

定义环境变量profiles

一般环境变量分:dev开发环境、prod发布环境,当然也可以类比添加其他的环境标志。
此处详细可参看:maven profile动态选择配置文件maven profile切换正式环境和测试环境

    <profiles>
        <!-- 开发测试环境 -->
        <profile>
            <id>dev</id>
            <activation>
                <!-- 设置默认激活dev环境的配置 -->
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <profile.env>dev</profile.env>
            </properties>
        </profile>
        <!-- 产品发布环境 -->
        <profile>
            <id>prod</id>
            <properties>
                <profile.env>prod</profile.env>
            </properties>
        </profile>
    </profiles>

- 阅读剩余部分 -

背景:在业务中,经常需要在执行数据库操作后(事务提交完成),发送消息或事件来异步调用其他组件执行相应的业务操作。
比如:用户注册成功后,发送激活码或激活邮件,如果用户保存后就执行异步操作发送激活码或激活邮件,但是前面用户保存后发生异常,数据库进行回滚,用户实际没有注册成功,但用户收到激活码或激活邮件。此时,我们就迫切要求数据库事务完成后再执行异步操作。

@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;

public User saveUser(User user) {
    // 保存用户
    userDao.save(user);

    // 发送激活码或激活邮件
    jmsProducer.sendEmail(user.getId());
}

// -------------------------------------
public void sendEmail(int userId) {
    /*
     * 获取待接收邮件的用户。(如果上面的保存用户方法还未提交事务,则实际数据还未插入到数据库中,此时会返回null)
    */
    User user = userDao.get(userId);
    // 可能抛NullPointException
    String email = user.getEmail();
    mailService.send(email);
}

解决方案

1、Spring 4.2之后,使用注解@TransactionalEventListener

/**
 * 业务Service
 */
@Service
@Transactional
public class FooService {
    @Autowired
    private  ApplicationEventPublisher applicationEventPublisher;

    public User saveUser(User user) {
        userDao.save(user);
        // 注册事件
        applicationEventPublisher.publishEvent(new SavedUserEvent(user.getId()));
    }
}

// -------------------------------------
/**
 * 保存用户事件
 */
public class SavedUserEvent {
    private int userId;
    
    public SavedUserEvent(int userId) {
        this.userId = userId;
    }
    
    // getter and setter
}

// ---------------------------------
/**
 * 事件侦听,处理对应事件
 */
@Component
public class FooEventListener() {
    @Autowired
    private UserDao userDao;
    @Autowired
    private MailService mailService;

    @TransactionalEventListener
    public sendEmail(SavedUserEvent savedUserEvent) {
        User user = userDao.get(userId);
        String email = user.getEmail();
        mailService.send(email);
    }
}

2.使用TransactionSynchronizationManager 和 TransactionSynchronizationAdapter

@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;

public User saveUser(User user) {
    // 保存用户
    userDao.save(user);
    final int userId = user.getId();

    // 事务提交后调用
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            jmsProducer.sendEmail(userId);
        }
    });
}

注意:上面的代码将在事务提交后执行.如果在非事务context中将抛出java.lang.IllegalStateException: Transaction synchronization is not active。
改进后代码:

@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;

public User saveUser(User user) {
    // 保存用户
    userDao.save(user);
    final int userId = user.getId();

    // 兼容无论是否有事务
    if(TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                jmsProducer.sendEmail(userId);
            }
        });
    } else {
        jmsProducer.sendEmail(userId);
    }
}

3.在上面2的基础上扩展TransactionSynchronizationAdapter

@Component("afterCommitExecutor")
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
    private ThreadPoolExecutor threadPool;

    @PostConstruct
    public void init() {
        Logs.debug("初始化线程池。。。");
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (0 >= availableProcessors) {
            availableProcessors = 1;
        }
        int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5;
        Logs.debug("CPU Processors :%s MaxPoolSize:%s", availableProcessors, maxPoolSize);
        threadPool = new ThreadPoolExecutor(
            availableProcessors,
            maxPoolSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(maxPoolSize * 2),
            Executors.defaultThreadFactory(),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    Logs.debug("Task:%s rejected", r.toString());
                    if (!executor.isShutdown()) {
                        executor.getQueue().poll();
                        executor.execute(r);
                    }
                }
            }
        );
    }

    @PreDestroy
    public void destroy() {
        Logs.debug("销毁线程池。。。");
        if (null != threadPool && !threadPool.isShutdown()) {
            threadPool.shutdown();
        }
    }

    @Override
    public void execute(@NotNull Runnable runnable) {
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }

    @Override
    public void afterCommit() {
        Logs.debug("事务提交完成处理 ... ");
        List<Runnable> threadRunnables = RUNNABLES.get();
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            try {
                threadPool.execute(runnable);
            } catch (RuntimeException e) {
                Logs.error("", e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        Logs.debug("事务处理完毕 .... ");
        RUNNABLES.remove();
    }
}

业务层使用:

@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
@Autowired
private AfterCommitExecutor afterCommitExecutor;

public User saveUser(User user) {
    // 保存用户
    userDao.save(user);
    final int userId = user.getId();

    // 使用AfterCommitExecutor
    afterCommitExecutor.execute(new Runnable() {
        @Override
        public void run() {
            jmsProducer.sendEmail(userId);
        }
    });
  
}

参考文章

Spring的TransactionEventListener
Spring Event 事件中的事务控制
Java Code Examples org.springframework.transaction.support.TransactionSynchronizationAdapter
Spring Events | Baeldung
Thinking about IT: Transaction synchronization callbacks in Spring Framework

最近将之前关于Spring的书重新拿出来温故一番,并进行总结。

Bean容器

Spring的Bean容器其实不止一个,归结起来可以分为2类:

  • Bean工厂(由org.springframework.beans.factory.BeanFactory接口定义),是最简单的Bean容器,提供基本的DI支持;
  • 应用上下文(由org.springframework.context.ApplicationContext接口定义),基于BeanFactory构建,提供应用框架级别的服务。

常用应用上下文

  • AnnotationConfigApplicationContext:从一个或多个基于Java的配置类中加载Spring应用上下文;
  • AnnotationConfigWebApplicationContext:从一个或多个基于Java的配置类中加载Spring Web应用上下文;
  • ClassPathXmlApplicationContext:从类路径下的一个或多个Spring的XML配置文件中加载应用上下文;
  • FileSystemXmlApplicationContext:从文件系统下的一个或多个Spring的XML配置文件中加载应用上下文;
  • XmlWebApplicationContext:从Web应用下的一个或多个Spring的XML配置文件中加载应用上下文。

- 阅读剩余部分 -

XML文档树图

xml_nodetree.gif

<?xml version="1.0" encoding="utf-8"?>
<bookstore> 
  <book category="COOKING"> 
    <title lang="en">Everyday Italian</title>  
    <author>Giada De Laurentiis</author>  
    <year>2005</year>  
    <price>30.00</price> 
  </book>  
  <book category="CHILDREN"> 
    <title lang="en">Harry Potter</title>  
    <author>J K. Rowling</author>  
    <year>2005</year>  
    <price>29.99</price> 
  </book>  
  <book category="WEB"> 
    <title lang="en">Learning XML</title>  
    <author>Erik T. Ray</author>  
    <year>2003</year>  
    <price>39.95</price> 
  </book> 
</bookstore>

一个xml文档由元素节点,属性节点,文本节点构成,其中bookstore被称为文档元素或根元素,也是一个元素节点。XML 文档中的每个成分都是一个节点Node

- 阅读剩余部分 -

Intellij IDEA功能很强大,而且是越用越爱不释手的,下文用IDEA来表示:Intellij IDEA。

自动生成serialVersionUID

默认IDEA是关闭了继承Serializable接口类生成serialVersionUID的警告,如果要让IDEA自动生成,需要进行如下操作:

  1. Files → Settings → Editor → Inspections → Serializable class without 'serialVersionUID',勾选上。默认提醒级别是:Warning,可以根据需要设置成:Error。

IDEA_serialVersionUID.jpg

  1. 将光标放到类名上,按Atl + Enter键,就会提示生成serialVersionUID了。

关闭自动保存和标志修改文件为星号

IDEA默认自动保存文件,而且及时修改了文件也没有*标记。

- 阅读剩余部分 -