我们有一个旧版应用程序,它从mongo读取每个用户的数据(根据用户请求,查询结果从小到大),并且我们的应用程序为每个用户创建一个文件,并放置到FTP服务器/ s3中。我们以mongo游标的形式读取数据,并在获取批处理数据后立即将每个批处理写入文件,因此文件写入性能不错。该应用程序很好用,但绑定到mongo和mongo游标。

现在我们必须重新设计此应用程序,因为我们必须支持不同的数据源,即MongoDB,Postgres DB,Kinesis,S3等。到目前为止,我们已经想到了以下想法:


为每个源构建数据API并公开分页的REST响应。这是一个可行的解决方案,但对于大型企业来说可能会很慢
查询数据与当前光标响应进行比较。
通过在kafka中馈送批处理数据并在我们的文件生成器中读取批处理数据流来构建数据抽象层,但是大多数时候用户要求排序的数据,因此我们需要按顺序读取消息。在写入文件之前,我们将失去高吞吐量和大量额外工作来组合这些数据消息的好处。


我们正在寻找一种解决方案来替换当前的mongo游标,并使我们的文件生成器独立于数据源。

最佳答案

因此,听起来好像您本质上是想创建一个API,在其中您可以尽可能地保持流传输的效率,就像在读取用户数据时编写文件一样。

在这种情况下,您可能想为ReadSource定义一个推式解析器API,它将把数据流式传输到WriteTarget,并将数据写入您要实现的任何内容。排序将在事物的ReadSource一侧进行处理,因为对于某些来源,您可以按有序的方式进行读取(例如从数据库中读取);对于那些您无法执行此操作的源,您可能只需要执行一个中间步骤即可对数据进行排序(例如写入临时表),然后将其流式传输到WriteTarget

一个基本的实现可能看起来像这样:

public class UserDataRecord {
    private String data1;
    private String data2;

    public String getRecordAsString() {
        return data1 + "," + data2;
    }
}




public interface WriteTarget<Record> {
    /** Write a record to the target */
    public void writeRecord(Record record);

    /** Finish writing to the target and save everything */
    public void commit();

    /** Undo whatever was written */
    public void rollback();
}




public abstract class ReadSource<Record> {
    protected final WriteTarget<Record> writeTarget;

    public ReadSource(WriteTarget<Record> writeTarget) { this.writeTarget = writeTarget; }

    public abstract void read();
}




import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class RelationalDatabaseReadSource extends ReadSource<UserDataRecord> {
    private Connection dbConnection;

    public RelationalDatabaseReadSource (WriteTarget<UserDataRecord> writeTarget, Connection dbConnection) {
        super(writeTarget);
        this.dbConnection = dbConnection;
    }

    @Override public void read() {
        // read user data from DB and encapsulate it in a record
        try (Statement statement = dbConnection.createStatement();
                ResultSet resultSet = statement.executeQuery("Select * From TABLE Order By COLUMNS");) {
            while (resultSet.next()) {
                UserDataRecord record = new UserDataRecord();
                // stream the records to the write target
                writeTarget.writeRecord(record);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}




import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
public class FileWriteTarget implements WriteTarget<UserDataRecord> {
    private File fileToWrite;
    private PrintWriter writer;

    public FileWriteTarget(File fileToWrite) throws IOException {
        this.fileToWrite = fileToWrite;
        this.writer = new PrintWriter(new FileWriter(fileToWrite));
    }

    @Override public void writeRecord(UserDataRecord record) {
        writer.println(record.getRecordAsString().getBytes(StandardCharsets.UTF_8));
    }

    @Override public void commit() {
        // write trailing records
        writer.close();
    }

    @Override
    public void rollback() {
        try { writer.close(); } catch (Exception e) { }
        fileToWrite.delete();
    }
}




这只是总体思路,需要认真改进。
任何人都可以随时更新此API。

10-08 19:10