Efficiently Uploading Data Using CSV and JDBC

Data feeds, or bulk transfers of data from one system to another, are a common feature in many enterprise applications. However, they are often implemented inefficiently. First, the source system runs a nightly batch job that generates a (typically large) update file. Some time later, another job looks for this file and uploads it to the destination system, usually via FTP or something similar. Finally, a third job on the remote system looks for the uploaded file and imports it (assuming that it is available).

This sort of process is slow and error prone. The destination system must wait until the first two processes (generating the export file, then uploading it) are complete before it can even start. Because the second and third steps are time-dependent, they may fail if an unexpected condition caused any preceding task to run late.

A better alternative is to perform these operations in parallel. By streaming the output generated by the source system, the destination system can consume it as it is being produced, eliminating the redundant and time-consuming copy steps.

Example Export

For example, the following code simulates a batch process that exports a CSV document. It uses the WebServiceProxy and CSVEncoder classes from the Kilo framework to upload the simulated data to a RESTful web service. The string values passed to the encoder’s constructor represent the columns in the output file:

var webServiceProxy = new WebServiceProxy("POST", baseURL, path);

webServiceProxy.setRequestHandler(new WebServiceProxy.RequestHandler() {
    @Override
    public String getContentType() {
        return "text/csv";
    }

    @Override
    public void encodeRequest(OutputStream outputStream) throws IOException {
        var csvEncoder = new CSVEncoder(listOf("text1", "text2", "number1", "number2", "number3"));

        csvEncoder.write(new Rows(count), outputStream);
    }
});

webServiceProxy.setReadTimeout(120000);
webServiceProxy.setChunkSize(65536);

webServiceProxy.invoke();

The data is provided by the following class, which simply generates an arbitrary number of duplicate rows. In a real application, the data would most likely come from a relational database or something similar:

public static class Rows implements Iterable<Map<String, Object>> {
    private int count;

    private int i = 0;

    public Rows(int count) {
        this.count = count;
    }

    private Map<String, Object> row = mapOf(
        entry("text1", "abcdefghijklmnopqrstuvwxyz"),
        entry("text2", "ABCDEFG"),
        entry("number1", 123456),
        entry("number2", 101.05),
        entry("number3", 2002.0125)
    );

    @Override
    public Iterator<Map<String, Object>> iterator() {
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return i < count;
            }

            @Override
            public Map<String, Object> next() {
                i++;

                return row;
            }
        };
    }
}

The resulting output would look something like this:

text1,text2,number1,number2,number3
"abcdefghijklmnopqrstuvwxyz","ABCDEFG",123456,101.05,2002.0125
"abcdefghijklmnopqrstuvwxyz","ABCDEFG",123456,101.05,2002.0125
"abcdefghijklmnopqrstuvwxyz","ABCDEFG",123456,101.05,2002.0125
...

Import Service

A service method for processing the exported data is shown below. Rather than reading the entire payload into memory up front, the method uses Kilo’s CSVDecoder class to obtain a cursor over the rows in the document. As each record is read, it is coerced to an instance of Row and inserted into the database:

public interface Row {
    String getText1();
    String getText2();
    Double getNumber1();
    Double getNumber2();
    Double getNumber3();
}
@RequestMethod("POST")
@ResourcePath("upload")
public void upload(Void body) throws SQLException, IOException {
    var queryBuilder = new QueryBuilder();

    queryBuilder.appendLine("insert into bulk_upload_test (text1, text2, number1, number2, number3)");
    queryBuilder.appendLine("values (:text1, :text2, :number1, :number2, :number3)");

    try (var statement = queryBuilder.prepare(getConnection())) {
        var csvDecoder = new CSVDecoder();

        for (var row : csvDecoder.iterate(getRequest().getInputStream())) {
            queryBuilder.executeUpdate(statement, new BeanAdapter(BeanAdapter.coerce(row, Row.class)));
        }
    }
}

Batch Updates

Unfortunately, even though this service efficiently consumes the data provided by the client, it is fairly slow. Importing 25,000 records takes about 100 seconds. The solution is to insert the records in batches, as shown below:

private static final int BATCH_SIZE = 25000;

...

@RequestMethod("POST")
@ResourcePath("upload-batch")
public void uploadBatch(Void body) throws SQLException, IOException {
    var queryBuilder = new QueryBuilder();

    queryBuilder.appendLine("insert into bulk_upload_test (text1, text2, number1, number2, number3)");
    queryBuilder.appendLine("values (:text1, :text2, :number1, :number2, :number3)");

    try (var statement = queryBuilder.prepare(getConnection())) {
        var i = 0;

        var csvDecoder = new CSVDecoder();

        for (var row : csvDecoder.iterate(getRequest().getInputStream())) {
            queryBuilder.addBatch(statement, new BeanAdapter(BeanAdapter.coerce(row, Row.class)));

            if (++i % BATCH_SIZE == 0) {
                statement.executeBatch();
            }
        }

        statement.executeBatch();
    }
}

This method is nearly identical to the previous version. However, instead of executing a database update for each row, the updates are batched and executed once every 25,000 rows. The service can now process 500,000 rows in about 25 seconds – 20 times as many in a quarter of the time.

Additional Information

Data feeds are a common element in many enterprise systems, and are often implemented inefficiently. However, by streaming imports and using batch updates, performance can be significantly improved.

The complete source code for this example can be found here:

For more information, see the Kilo README.