我需要将大量数据(一秒钟内可以获取高达1KB的数据)写入许多不同的文件(每个文件最多可以占用几个GiB的空间)。
在我的应用程序中,一个线程不断产生数据,当前我正在为每个文件创建一个线程来写入数据(创建新文件取决于生产者正在产生的输入数据的某些条件)。
我正在使用由BufferedWriter包装的FileWriter。最初,我尝试使用默认缓冲大小8KB进行编写。但是,由于这会使每秒的写入数增加,因此CPU消耗会迅速增加而不会下降。
因此,我现在将缓冲大小增加到50KB。
但这会使我的应用程序由于outOfMemory问题而崩溃。
当我对其进行概要分析时,我可以看到所有数据都以char数组的形式存储,该数组是由缓冲写入器创建的(记住每个文件一个缓冲写入器。我大约有400个文件)。
请在这里建议如何解决该问题。
我也想知道是否有替代方案或更好的方法来实现该要求。
我不能将缓冲区大小减小到小于50KB,因为这会使我的CPU占用率达到100%。
编辑:好的,我想我不需要添加代码,因为我可以很明确地要求(而且我与代码无关。没关系,只是我在寻找效率)。但是因为我的问题被否决了,所以我将代码包括在这里。
public class DataWriter {
private LinkedBlockingQueue<MyDataObject> dataQueue = new LinkedBlockingQueue<>();
private ExecutorService singleThread = Executors.newSingleThreadExecutor();
private boolean isRunning = true;
private Map<String, FileWriterThread> map = Collections.synchronizedMap(new HashMap<String, FileWriterThread>());
public DataWriter() {
singleThread.submit(new DataProcessor());
}
public void writeProducedData(MyDataObject object) {
if (isRunning) {
dataQueue.offer(object);
}
}
public void stopWriting() {
isRunning = false;
}
private class DataProcessor implements Runnable {
@Override
public void run() {
while (isRunning) {
MyDataObject obj = dataQueue.take();
if (obj.getMapKey() == null) {
FileWriterThread thread = new FileWriterThread();
map.put(obj.getMapKey(), thread);
}
FileWriterThread thread = map.get(obj.getMapKey());
thread.writeData(obj);
}
}
}
}
FileWriterThread类:
public class FileWriterThread {
private ExecutorService singleThread = Executors.newSingleThreadExecutor();
private FileWriter fileWriter;
private BufferedWriter bufferedWriter;
private LinkedBlockingQueue<MyDataObject> dataQueue = new LinkedBlockingQueue<>();
public FileWriterThread() {
singleThread.submit(new DataProcessor());
}
public void writeData(MyDataObject obj) {
if (fileWriter == null) {
createWriter(obj.getFileName());
}
dataQueue.offer(obj);
}
public void stopWriting() {
// close the file writer and buffer writer gracefully
}
private void createWriter(String fileName) {
try {
fileWriter = new FileWriter(fileName, true);
bufferedWriter = new BufferedWriter(fileWriter, 50);
} catch (Exception e){}
}
private class DataProcessor implements Runnable {
@Override
public void run() {
MyDataObject obj = dataQueue.take();
try {
bufferedWriter.write(obj.toString());
} catch(Exception e) {}
}
}
}
最佳答案
拥有所有这些线程不会给您带来任何好处-整个事情是IO约束的,而不是CPU约束的(或者,如果您的CPU没有努力处理400个线程的话)。
在一个线程中,只需让您的生产者向适当的写入者写信即可:
Map<String, Writer> writers = ...;
private handleOutput(byte[] output, String key) {
writers.get(key).write(output);
}
如果您想分离您的关注点,也许有几个(不是数百个)写线程和一个生产线程,则用
Queue
分开。