Parser na fonte de dados em CSV para inserir dados no banco

6 minute read

As dificuldades de lidar com arquivos .csv

Inicialmente, eu criei o Parser utilizando apenas ferramentas do Elixir, porem, acabei adicionando a Lib NimbleCSV escrita pelo próprio Valim, que é muito eficiente e basicamente não adiciona outras dependências indesejadas no projeto. Essa escolha ficara melhor entendida ao decorrer do artigo.

CSV pode parecer um formato simples ao primeiro olhar, mas esconde muitas complexidades, a começar pelo seu nome Comma Separated Values (Valores Separados por Virgula) quando na verdade vemos diversos separadores sendo utilizados em arquivos .csv, listarei os mais comuns abaixo:

  • RFC4180

    é o padrão quando o separador é a virgula

  • Planilhas digitais (Spreadsheets)

    é comum ver a utilização de tabs, ponto e virgula(;) e pipes ( | ) em arquivos csv destinados a programas como o Excel, Numbers e etc. Além disso, o encoding “padrão” de arquivos com esse fim é utf-16.

Graças a benevolência de pessoas como o Valim e outros contribuidores, a lib utilizada já possui implementações dessas “especificações”, entretanto, não a utilizaremos por hora.

Parseando o arquivo

Ok, essa parte pode ser embaraçosa e deve haver muito espaço para melhorias, mas vamos seguir.

O ponto de entrar para iniciar a leitura do arquivo é a função privada parse do modulo Parser da nossa aplicação que também define uma implementação para a lib NimbleCSV.

defmodule Pep.Sources.Parser do
  NimbleCSV.define(CSVParser, separator: ";", escape: "\"")

... resto do modulo ...

end

Função parse:

defp parse(%{ano_mes: ano_mes, id: source_id} = _source) do
    ("priv/reports/" <> ano_mes <> "_PEP.csv")
    |> File.stream!()
    |> CSVParser.parse_stream()
    |> Stream.map(fn [
                       cpf,
                       nome,
                       sigla,
                       descr,
                       nivel,
                       regiao,
                       data_inicio,
                       data_fim,
                       data_carencia
                     ] ->
      %{
        id: "",
        cpf: :binary.copy(cpf),
        nome: :binary.copy(nome),
        sigla: :binary.copy(sigla),
        descr: :binary.copy(descr),
        nivel: :binary.copy(nivel),
        regiao: :binary.copy(regiao),
        data_inicio: :binary.copy(data_inicio),
        data_fim: :binary.copy(data_fim),
        data_carencia: :binary.copy(data_carencia),
        source_id: "",
        inserted_at: "",
        updated_at: ""
      }
    end)
    |> Stream.map(fn pep -> fix_enconding(pep) end)
    |> Stream.map(fn pep -> %{pep | source_id: source_id} end)
    |> Stream.map(fn pep -> %{pep | id: UUID.generate()} end)
    |> Stream.map(fn pep ->
      %{pep | inserted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)}
    end)
    |> Stream.map(fn pep ->
      %{pep | updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)}
    end)
    |> Stream.map(fn %{cpf: cpf} = pep -> %{pep | cpf: sanitize_cpf(cpf)} end)
    |> Enum.to_list()
  end

A função recebe um Map com o período desejado para ser analisado e o ID da fonte que originou esse arquivo (Vimos essa parte no artigo anterior). Ela é responsável por transformar cada linha do arquivo em um Map contendo todas as informações necessárias para a inserção no banco, incluindo colunas que o banco normalmente preenche automaticamente, como o ID, inserted_at e updated_at, adiante ficará claro o motivo disto.

Alguns pontos importantes aqui sāo:

  • Utilização de Stream em vez de funções Enum

    Streams possuem uma abordagem lazy, o que as tornam perfeitas em momentos que você quer fazer diversas alterações em seus dados sem comprometer a performance.

  • :binary_copy

    Utilizamos esta função do Erlang para garantir que o conteúdo que sera persistido seja realmente a copia binaria, não uma referencia. Se não planeja guardar os dados, não vejo necessidade de utilizar essa função

  • Função fix_encoding

    Como dito antes, arquivos CSV podem ser complicados, e este em questão estava com um encoding diferente, tendo que ser tratado com a função escrita abaixo

    defp fix_enconding(pep) do
        pep
        |> Stream.map(fn {key, value} -> {key, latin1_to_utf8(value)} end)
        |> Enum.into(%{})
      end
      
      defp latin1_to_utf8(binary),
        do:
          :unicode.characters_to_binary(
            binary,
            :latin1,
            :utf8
          )
    

Inserindo no Banco de dados

A função publica deste modulo na verdade é a import_to_db (me ocorre agora de refatorar-la para outro modulo), que é responsável por inserir todas as linhas do arquivo analisado no banco de dados.

def import_to_db(ano_mes) do
    Task.start_link(fn -> parse_import_to_db(ano_mes) end)
  end

  defp parse_import_to_db(ano_mes) do
    source = Repo.get_by(Source, ano_mes: ano_mes)

    parse(source)
    |> Stream.chunk_every(5000)
    |> Enum.each(fn chunck -> Repo.insert_all(PepStruct, chunck) end)
  end

Utilizamos o modulo Task para iniciar essa tarefa de modo assíncrono, no modo fire and forget, pois ela pode demorar para processas e não queremos que o usuário fique bloqueado enquanto espera.

A função parse_import_to_db fica responsável por chamar o parse, pegar seu resultado e separar em diversos chunk (pedaços da lista) e então inserir cada pedaço de uma vez no banco de dados. Isso só é possível pois já preenchemos todas as colunas necessárias para inserção anteriormente.

Deste modo, posso dizer que a inserção no banco de dados (que era o maior gargalo, quando era feita uma a uma) passou a tomar um tempo muito pequeno, cerca de 5 segundos para inserir mais de 130 mil linhas analisadas no banco.

Alguns comentários

Meu objetivo com esse artigo é demonstrar as dificuldades técnicas que alguém que está aprendendo de forma auto-didata enfrente ao tentar fazer um projeto escalável por conta própria. Sei que o código pode ficar muito melhor, porém, a diferença de qualidade e principalmente perfomance entre o primeiro commit e o atual é enorme. Enfim, estou feliz com o resultado até então.

Projeto no Github