java mail 多線程處理大量收件人,並將發送結果儲存到數據庫

前言

以前用java mail發送郵件,都是分給每一個郵件一個線程,在郵件發送成功後,由該子線程將mail的信息(發送成功的郵箱和未發送的郵箱)存儲到數據庫中。java

如今須要處理一封郵件有上萬收件人的狀況,若是還按照以前每一個mail一個線程,發送的效率過低了,所以須要將一封郵件分到多個線程中去執行,讓每一個子線程處理一部分收件人,可是子線程執行完成後更新mail的信息,會出現數據覆蓋的狀況。數據庫

若是每一個子線程執行完後能將發送郵件的信息返回給主線程,那麼咱們就能夠在全部子線程結束後再存儲mail的信息了。ide

Java Callable

Runnable任務不返回任何值,若是你但願在任務完成時可以返回一個值,那麼能夠實現Callable接口而不是Runnable接口,Callable是一種具備類型參數的泛型,它的類型參數表示的是從方法call()中返回的值,而且必須使用ExecutorService.submit()方法調用它。函數

public class TaskWithResult implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "result";
    }
}

public class TaskDemo{
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        TaskWithResult task = new TaskWithResult();
        Future<String> future = exec.submit(task);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
/* Output:
    result
*/

submit()方法會產生Future對象,你能夠用isDone()來查詢Future是否已經完成,任務完成時,能夠用get()方法獲取任務的返回值,若是任務沒有完成,調用get()方法會阻塞主線程。this

代碼實現

在獲取返回結果時,get()會阻塞主線程,爲了使發送郵件的函數不被阻塞,咱們須要新建立一個線程來運行發送郵件的子線程。
類Mailer爲實現了Callable的發送郵件的具體實現的代碼。Mail爲郵件的實體。線程

public class MailerTask extends Thread {
    private Mail mail;//郵件信息
    private String sender;//發件人信息
    //每一個線程發送郵件的最大數量
    private static final int mail_limit = 100;
    
    public static void send(Mail mail, String sender){
        MailerTask mailerTask = new MailerTask();
        mailerTask.setMail(mail);
        mailerTask.setSender(sender);
        mailerTask.start();

    }
    @Override
    public void run() {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        // 去除重複的收件人
        List<String> sendTos = Arrays.stream(mail.getSendTo().split(";")).distinct().collect(Collectors.toList());
        // 存儲發送失敗的收件人
        String sendTo = "";
        // 存儲發送成功的收件人
        String sended = "";
        // 記錄發送郵件的次數
        int sendTimes = 0;
        Mailer.setSender(sender);
        List<Future<List<String>>> futures = new ArrayList<>();
        // 每100個收件人建立一個線程用來發送郵件
        for (int i = 0; i <= sendTos.size()/mail_limit; i++ ){
            List<String> subSendTos = sendTos.stream().skip(i*mail_limit).limit(mail_limit).collect(Collectors.toList());
            String subSendTo = subSendTos.stream().collect(Collectors.joining(";"));
            mail.setSendTo(subSendTo);
            Mailer mailer = new Mailer();
            mailer.setMail(mail);
            Future<List<String>> result = exec.submit(mailer);
            futures.add(result);
        }
        // 處理結果
        for (Future<List<String>> future : futures){
            try {
                String subSendTo = future.get().get(0);
                String subSended = future.get().get(1);
                String subtimes = future.get().get(2);
                if (subSendTo != "") {
                    sendTo = sendTo + subSendTo + ";";
                }
                if (subSended != "" && subSended != null) {
                    sended += subSended;
                    sended += ";";
                }
                sendTimes += Integer.valueOf(subtimes);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        exec.shutdown();
        mail.setSendTo(sendTo);
        mail.setSended(sended);
        mail.setSendTimes(sendTimes);
        mail.setFinishDate(sendTo.isEmpty() ? new Date() : null);
        MailService mailService = (MailService) ServiceFactory.getSpringBean("mailService");
        mailService.saveMail(mail);

    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public void setMail(Mail mail) {
        this.mail = mail;
    }
}
相關文章
相關標籤/搜索