CSV Export with Postgres, Ecto and Phoenix


I’ve been working on an export feature for opensubs.io so that users can export their payments as a CSV file and manipulate them as they please.

In Ruby land, I accomplished this by using Postgres' COPY feature to output the result as CSV and wrapping it in a lazy Enumerable. Then, in the controller, I consumed the Enumerable and streamed the data over HTTP as it was being queried.

Here’s how I did it using Ecto and Phoenix.

CSV export with Postgres and Ecto

Given that the final output is a CSV file, we can get the most out of PostgreSQL and use the COPY feature. COPY takes a query and pipes the result to STDOUT formatted as CSV. Here's an example:

db=# COPY (
    SELECT id, name, amount, amount_currency, cycle, first_bill_date, type, type_description
    FROM subscriptions
    WHERE archived = false
    AND user_id = 1
) to STDOUT WITH CSV DELIMITER ',';

1,Netflix,599,GBP,monthly,2017-12-02 00:00:00,card,Monzo
2,Github,6000,GBP,yearly,2017-02-23 00:00:00,card,Monzo
...

To use Postgres' COPY feature with Ecto, we have to write a custom query. To run custom queries with Ecto, we use the SQL adapter directly, and depending on the solution it can be either Ecto.Adapters.SQL.query/3 or Ecto.Adapters.SQL.stream/4.

Ecto.Adapters.SQL.query/3 executes the full query without any batching, so it gives us the entire result at once. If we have a table with millions of records this becomes a problem, because we'd need to hold millions of records in memory.

Ecto.Adapters.SQL.stream/4 returns a lazy stream that isn't executed until we tell it to be. When it runs, Ecto queries the database in batches (500 rows by default). This means we don't need to hold all the records in memory and can process them batch by batch.
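To make the difference concrete, here's a minimal sketch of both calls. The query and parameter are purely illustrative, and the stream needs the transaction we'll get to below:

sql = "SELECT id, name FROM subscriptions WHERE user_id = $1"

# query/3 runs eagerly: the whole result set comes back in a single Postgrex.Result.
{:ok, result} = Ecto.Adapters.SQL.query(Repo, sql, [1])
result.rows
#=> [[1, "Netflix"], [2, "Github"], ...]

# stream/4 is lazy: nothing hits the database until the stream is enumerated,
# and it must be enumerated inside a transaction.
Repo.transaction(fn ->
  Ecto.Adapters.SQL.stream(Repo, sql, [1], max_rows: 500)
  |> Stream.map(& &1.rows)
  |> Enum.each(&IO.inspect/1)
end)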

Build export stream with Ecto

def build_export_query(user, batch_size \\ 500) do
  columns = ~w(id name amount amount_currency cycle first_bill_date type type_description)

  query = """
    COPY (
      SELECT #{Enum.join(columns, ",")}
      FROM subscriptions
      WHERE archived = false
      AND user_id = #{user.id}
    ) to STDOUT WITH CSV DELIMITER ',';
  """

  csv_header = [Enum.join(columns, ","), "\n"]

  # Lazy stream: the COPY query only runs when the stream is enumerated,
  # fetching batch_size rows at a time.
  Ecto.Adapters.SQL.stream(Repo, query, [], max_rows: batch_size)
  |> Stream.map(&(&1.rows))
  |> (fn stream -> Stream.concat(csv_header, stream) end).()
end

When Ecto executes the query, each batch comes back as a Postgrex.Result struct, which encapsulates the result of the query and holds the list of results under rows.

To strip out the result metadata, we transform the stream with Stream.map so that only rows is passed on to the next transformation.
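Roughly, each batch comes through looking like this before the Stream.map call (a simplified sketch with illustrative values, not the full struct):

%Postgrex.Result{
  rows: ["1,Netflix,599,GBP,monthly,2017-12-02 00:00:00,card,Monzo\n"],
  num_rows: 1
}

After the Stream.map, only the list of CSV lines flows downstream, ready to be concatenated with the header.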

Streaming via HTTP with Phoenix

When building a usual HTTP response, we create the response body, set the response content type (e.g. application/json if the body is JSON), set the HTTP status code (e.g. 200), and deliver the response to the client. Given that we are working with batches of data, we have to tell the client that our response is not going to be delivered in full but in small chunks.

Prepare the Phoenix response

conn =
  conn
  |> put_resp_header("content-disposition", "attachment; filename=payments.csv")
  |> put_resp_content_type("text/csv")
  |> send_chunked(200)

First, we set the content-disposition response header to tell the client that the response is an attachment with the name payments.csv.

Then we set the content-type to text/csv, which identifies the content of our response.

Finally, we call send_chunked/2, which sets the transfer-encoding response header to chunked and indicates that the client needs to keep the HTTP connection open to receive all chunks of the body. The client will close the connection when a zero-length chunk is sent by the server.
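For reference, the response ends up with roughly these headers; note that put_resp_content_type/2 also appends a charset by default:

content-disposition: attachment; filename=payments.csv
content-type: text/csv; charset=utf-8
transfer-encoding: chunked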

This combination of headers will make a browser show the download dialog and start the download.

Streaming chunks of data

def export(conn, _params) do
  current_user = get_current_user()

  {:ok, conn} =
    Repo.transaction(fn ->
      build_export_query(current_user)
      |> Enum.reduce_while(conn, fn data, conn ->
        # Send each batch to the client; stop if the connection was closed.
        case chunk(conn, data) do
          {:ok, conn} ->
            {:cont, conn}

          {:error, :closed} ->
            {:halt, conn}
        end
      end)
    end)

  conn
end

Ecto streams can only be executed inside a transaction. This can be seen as a limitation, given that it does not leave much flexibility when organizing the code: we cannot have a clear separation of concerns. Also, depending on the amount of data being exported, the timeout option might need to be set on the transaction. It defaults to 15 seconds, which might not be enough.
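As a minimal sketch, the timeout can be passed per call; the 60-second value is just an example and stream_chunks/2 is a hypothetical helper standing in for the Enum.reduce_while pipeline shown above:

{:ok, conn} =
  Repo.transaction(
    # stream_chunks/2 wraps the chunking reduce_while from the controller above
    fn -> stream_chunks(build_export_query(current_user), conn) end,
    timeout: 60_000
  )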

To run the stream and start receiving the data, we use Enum.reduce_while/3. This function allows us to stop the stream execution when something goes wrong, which is the best approach given that we are dealing with a side effect.

In my first solution, I was using Enum.into/2 to traverse the stream into conn. Enum.into/2 expects a Collectable as the second argument, and a small visit to Plug's source code shows that Plug.Conn implements the Collectable protocol, which calls Plug.Conn.chunk/2 to stream each chunk of data to the response. However, there's a problem with it: it does not expect things to go wrong. If the client closes the connection - e.g., the user cancels the download - an exception is raised and the application fails. Because of that, I went for Enum.reduce_while/3.
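For reference, that first attempt looked roughly like this; it works on the happy path, but raises if the client disconnects mid-download:

{:ok, conn} =
  Repo.transaction(fn ->
    build_export_query(current_user)
    |> Enum.into(conn)
  end)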

I've opened an issue on Plug's GitHub about that problem, and the resolution will probably be deprecating Plug.Conn's implementation of the Collectable protocol.

Wrapping up

I find this combination a pretty good approach for exporting data from the database and providing a downloadable file to the users. We can easily export huge amounts of data given that only a chunk of it is in memory at a time.

On the downside, I found it a bit difficult to untangle the different concerns. I would be happy to have the Ecto logic separated from the Phoenix logic. I was able to accomplish that using a GenStage process to handle the database side of things, with chunks being streamed to a consumer in a Phoenix controller. It allowed me to split concerns as I wanted, but everything got a bit more complex. It was an interesting approach, but I'll talk about it later.