我正在尝试编写具有两种方法的批处理邮件服务:
add(Mail mail):可以发送邮件,由生产者调用
flushMailService():刷新服务。消费者应获取列表,然后调用另一种(昂贵的)方法。通常,仅在达到批处理大小后才调用昂贵的方法。

这有点类似于这个问题:
Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

可以使用具有超时的poll()来做到这一点。但是,如果生产者不想等待超时,则生产者应该能够刷新该邮件服务,但会使生产者发送队列中的所有邮件。

poll(20, TimeUnit.SECONDS) 可以被打断。如果被中断,则无论批次大小是否达到,都应发送队列中的所有邮件,直到队列为空为止(使用 poll() ,如果队列为空,则立即返回null。一旦为空,则由然后,生产者应再次调用poll的阻塞版本,直到被其他任何生产者中断为止,依此类推。

这似乎适用于给定的实现。

我尝试将ExecutorServicesFutures一起使用,但是由于只有第一个中断被认为已取消,因此Future只能被中断一次。因此,我求助于可以多次中断的线程。

目前,我有以下实现似乎可行的实现(但正在使用“原始”线程)。

这是一种合理的方法吗?也许可以使用另一种方法?

public class BatchMailService {
   private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
   private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
   private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

   public void checkMails() {

        int batchSize = 100;
        int timeout = 20;
        int consumerCount = 5;

        Runnable runnable = () -> {
            boolean wasInterrupted = false;

            while (true) {
                List<Mail> buffer = new ArrayList<>();
                while (buffer.size() < batchSize) {
                    try {
                        Mail mail;
                        wasInterrupted |= Thread.interrupted();
                        if (wasInterrupted) {
                            mail = queue.poll(); // non-blocking call
                        } else {
                            mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
                        }
                        if (mail != null) {  // mail found immediately, or within timeout
                            buffer.add(mail);
                        } else { // no mail in queue, or timeout reached
                            LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
                            wasInterrupted = false;
                            break;
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("{} interrupted", Thread.currentThread());
                        wasInterrupted = true;
                        break;
                    }
                }
                if (!buffer.isEmpty()) {
                    LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
                    mailService.sendMails(buffer);
                }
            }
        };

        LOGGER.info("starting 5 threads ");
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(runnable);
            threads.add(thread);
            thread.start();
        }

    }

    public void addMail(Mail mail) {
        queue.add(mail);
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        for (Thread t : threads) {
            t.interrupt();
        }
    }
}

另一种没有中断,但有毒药的变体(Mail POISON_PILL = new Mail())的方法如下。如果只有一个使用者线程,则可能效果最好。至少,对于一种毒丸,只有一名消费者会继续服用。
Runnable runnable = () -> {
      boolean flush = false;
      boolean shutdown = false;

      while (!shutdown) {
           List<Mail> buffer = new ArrayList<>();
           while (buffer.size() < batchSize && !shutdown) {
               try {
                   Mail mail;
                   if (flush){
                       mail = queue.poll();
                       if (mail == null) {
                           LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed");
                           flush = false;
                           break;
                       }
                   }else {
                      mail = queue.poll(5, TimeUnit.SECONDS); // blocking call
                   }

                   if (mail == POISON_PILL){  // flush
                       LOGGER.info(Thread.currentThread() + " got flush");
                       flush = true;
                   }
                   else if (mail != null){
                       buffer.add(mail);
                   }
               } catch (InterruptedException e) {
                   LOGGER.info(Thread.currentThread() + " interrupted");
                   shutdown = true;
               }
           }
           if (!buffer.isEmpty()) {
               LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails");
               mailService.sendEmails(buffer);
           }
       }
    };

public void flushMailService() {
    LOGGER.info("flushing BatchMailService");
    queue.add(POISON_PILL);
}

最佳答案

如何使用信号并等待而不是中断?

生产者放置邮件并发信号通知是否需要冲洗。
分派(dispatch)器等待信号或超时,然后继续在使用者线程中发送电子邮件。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BatchMailService {

    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();

    public static final int BATCH_SIZE = 100;
    public static final int TIMEOUT = 20;
    public static final int CONSUMER_COUNT = 5;

    private final Lock flushLock = new ReentrantLock();
    private final Condition flushCondition = flushLock.newCondition();

    MailService mailService = new MailService();

    public void checkMails() {

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT);

        while (true) {

            try {
                // wait for timeout or for signal to come
                flushLock.lock();
                flushCondition.await(TIMEOUT, TimeUnit.SECONDS);

                // flush all present emails
                final List<Mail> toFLush = new ArrayList<>();
                queue.drainTo(toFLush);

                if (!toFLush.isEmpty()) {
                    consumerExecutor.submit(() -> {
                        LOGGER.info("{} sending {} mails", Thread.currentThread(), toFLush.size());
                        mailService.sendEmails(toFLush);
                    });
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // terminate execution in case of external interrupt
            } finally {
                flushLock.unlock();
            }
        }

    }

    public void addMail(Mail mail) {

        queue.add(mail);

        // check batch size and flush if necessary
        if (queue.size() >= BATCH_SIZE) {

            try {
                flushLock.lock();
                if (queue.size() >= BATCH_SIZE) {
                    flushMailService();
                }
            } finally {
                flushLock.unlock();
            }
        }
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        try {
            flushLock.lock();
            flushCondition.signal();
        } finally {
            flushLock.unlock();
        }
    }

}

关于java - 生产者/消费者具有批处理和冲洗功能,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37727746/

10-13 00:38