我需要将大量数据(一秒钟内可以获取高达1KB的数据)写入许多不同的文件(每个文件最多可以占用几个GiB的空间)。

在我的应用程序中,一个线程不断产生数据,当前我正在为每个文件创建一个线程来写入数据(创建新文件取决于生产者正在产生的输入数据的某些条件)。

我正在使用由BufferedWriter包装的FileWriter。最初,我尝试使用默认缓冲大小8KB进行编写。但是,由于这会使每秒的写入数增加,因此CPU消耗会迅速增加而不会下降。

因此,我现在将缓冲大小增加到50KB。
但这会使我的应用程序由于outOfMemory问题而崩溃。

当我对其进行概要分析时,我可以看到所有数据都以char数组的形式存储,该数组是由缓冲写入器创建的(记住每个文件一个缓冲写入器。我大约有400个文件)。

请在这里建议如何解决该问题。
我也想知道是否有替代方案或更好的方法来实现该要求。

我不能将缓冲区大小减小到小于50KB,因为这会使我的CPU占用率达到100%。

编辑:好的,我想我不需要添加代码,因为我可以很明确地要求(而且我与代码无关。没关系,只是我在寻找效率)。但是因为我的问题被否决了,所以我将代码包括在这里。

public class DataWriter {

      private LinkedBlockingQueue<MyDataObject> dataQueue = new LinkedBlockingQueue<>();
      private ExecutorService singleThread = Executors.newSingleThreadExecutor();
      private boolean isRunning = true;
      private Map<String, FileWriterThread> map = Collections.synchronizedMap(new HashMap<String, FileWriterThread>());

      public DataWriter() {
         singleThread.submit(new DataProcessor());
      }

      public void writeProducedData(MyDataObject object) {
          if (isRunning) {
             dataQueue.offer(object);
          }
      }

      public void stopWriting() {
          isRunning = false;
      }

      private class DataProcessor implements Runnable {

         @Override
         public void run() {
            while (isRunning) {

            MyDataObject obj = dataQueue.take();

            if (obj.getMapKey() == null) {
                FileWriterThread thread = new FileWriterThread();
                map.put(obj.getMapKey(), thread);
            }

            FileWriterThread thread = map.get(obj.getMapKey());
            thread.writeData(obj);
         }
        }
      }
    }


FileWriterThread类:

public class FileWriterThread {

       private ExecutorService singleThread = Executors.newSingleThreadExecutor();
       private FileWriter fileWriter;
       private BufferedWriter bufferedWriter;
       private LinkedBlockingQueue<MyDataObject> dataQueue = new LinkedBlockingQueue<>();

    public FileWriterThread() {
      singleThread.submit(new DataProcessor());
    }

    public void writeData(MyDataObject obj) {
     if (fileWriter == null) {
       createWriter(obj.getFileName());
     }
     dataQueue.offer(obj);
    }

    public void stopWriting() {
      // close the file writer and buffer writer gracefully
    }

    private void createWriter(String fileName) {
      try {
        fileWriter = new FileWriter(fileName, true);
        bufferedWriter = new BufferedWriter(fileWriter, 50);
      } catch (Exception e){}
    }

    private class DataProcessor implements Runnable {

       @Override
       public void run() {
         MyDataObject obj =  dataQueue.take();
         try {
           bufferedWriter.write(obj.toString());
         } catch(Exception e) {}
       }
    }
  }

最佳答案

拥有所有这些线程不会给您带来任何好处-整个事情是IO约束的,而不是CPU约束的(或者,如果您的CPU没有努力处理400个线程的话)。

在一个线程中,只需让您的生产者向适当的写入者写信即可:

   Map<String, Writer> writers = ...;

   private handleOutput(byte[] output, String key) {
        writers.get(key).write(output);
   }


如果您想分离您的关注点,也许有几个(不是数百个)写线程和一个生产线程,则用Queue分开。

09-10 06:25
查看更多