Parser na fonte de dados em CSV para inserir dados no banco
As dificuldades de lidar com arquivos .csv
Inicialmente, eu criei o Parser utilizando apenas ferramentas do Elixir, porem, acabei adicionando a Lib NimbleCSV escrita pelo próprio Valim, que é muito eficiente e basicamente não adiciona outras dependências indesejadas no projeto. Essa escolha ficara melhor entendida ao decorrer do artigo.
CSV pode parecer um formato simples ao primeiro olhar, mas esconde muitas complexidades, a começar pelo seu nome Comma Separated Values (Valores Separados por Virgula) quando na verdade vemos diversos separadores sendo utilizados em arquivos .csv, listarei os mais comuns abaixo:
RFC4180
é o padrão quando o separador é a virgula
Planilhas digitais (Spreadsheets)
é comum ver a utilização de tabs, ponto e virgula(;) e pipes ( | ) em arquivos csv destinados a programas como o Excel, Numbers e etc. Além disso, o encoding “padrão” de arquivos com esse fim é utf-16.
Graças a benevolência de pessoas como o Valim e outros contribuidores, a lib utilizada já possui implementações dessas “especificações”, entretanto, não a utilizaremos por hora.
Parseando o arquivo
Ok, essa parte pode ser embaraçosa e deve haver muito espaço para melhorias, mas vamos seguir.
O ponto de entrar para iniciar a leitura do arquivo é a função privada parse do modulo Parser da nossa aplicação que também define uma implementação para a lib NimbleCSV.
defmodule Pep.Sources.Parser do
NimbleCSV.define(CSVParser, separator: ";", escape: "\"")
... resto do modulo ...
end
Função parse:
defp parse(%{ano_mes: ano_mes, id: source_id} = _source) do
("priv/reports/" <> ano_mes <> "_PEP.csv")
|> File.stream!()
|> CSVParser.parse_stream()
|> Stream.map(fn [
cpf,
nome,
sigla,
descr,
nivel,
regiao,
data_inicio,
data_fim,
data_carencia
] ->
%{
id: "",
cpf: :binary.copy(cpf),
nome: :binary.copy(nome),
sigla: :binary.copy(sigla),
descr: :binary.copy(descr),
nivel: :binary.copy(nivel),
regiao: :binary.copy(regiao),
data_inicio: :binary.copy(data_inicio),
data_fim: :binary.copy(data_fim),
data_carencia: :binary.copy(data_carencia),
source_id: "",
inserted_at: "",
updated_at: ""
}
end)
|> Stream.map(fn pep -> fix_enconding(pep) end)
|> Stream.map(fn pep -> %{pep | source_id: source_id} end)
|> Stream.map(fn pep -> %{pep | id: UUID.generate()} end)
|> Stream.map(fn pep ->
%{pep | inserted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)}
end)
|> Stream.map(fn pep ->
%{pep | updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)}
end)
|> Stream.map(fn %{cpf: cpf} = pep -> %{pep | cpf: sanitize_cpf(cpf)} end)
|> Enum.to_list()
end
A função recebe um Map com o período desejado para ser analisado e o ID da fonte que originou esse arquivo (Vimos essa parte no artigo anterior). Ela é responsável por transformar cada linha do arquivo em um Map contendo todas as informações necessárias para a inserção no banco, incluindo colunas que o banco normalmente preenche automaticamente, como o ID, inserted_at e updated_at, adiante ficará claro o motivo disto.
Alguns pontos importantes aqui sāo:
Utilização de Stream em vez de funções Enum
Streams possuem uma abordagem lazy, o que as tornam perfeitas em momentos que você quer fazer diversas alterações em seus dados sem comprometer a performance.
:binary_copy
Utilizamos esta função do Erlang para garantir que o conteúdo que sera persistido seja realmente a copia binaria, não uma referencia. Se não planeja guardar os dados, não vejo necessidade de utilizar essa função
Função fix_encoding
Como dito antes, arquivos CSV podem ser complicados, e este em questão estava com um encoding diferente, tendo que ser tratado com a função escrita abaixo
defp fix_enconding(pep) do pep |> Stream.map(fn {key, value} -> {key, latin1_to_utf8(value)} end) |> Enum.into(%{}) end defp latin1_to_utf8(binary), do: :unicode.characters_to_binary( binary, :latin1, :utf8 )
Inserindo no Banco de dados
A função publica deste modulo na verdade é a import_to_db (me ocorre agora de refatorar-la para outro modulo), que é responsável por inserir todas as linhas do arquivo analisado no banco de dados.
def import_to_db(ano_mes) do
Task.start_link(fn -> parse_import_to_db(ano_mes) end)
end
defp parse_import_to_db(ano_mes) do
source = Repo.get_by(Source, ano_mes: ano_mes)
parse(source)
|> Stream.chunk_every(5000)
|> Enum.each(fn chunck -> Repo.insert_all(PepStruct, chunck) end)
end
Utilizamos o modulo Task para iniciar essa tarefa de modo assíncrono, no modo fire and forget, pois ela pode demorar para processas e não queremos que o usuário fique bloqueado enquanto espera.
A função parse_import_to_db fica responsável por chamar o parse, pegar seu resultado e separar em diversos chunk (pedaços da lista) e então inserir cada pedaço de uma vez no banco de dados. Isso só é possível pois já preenchemos todas as colunas necessárias para inserção anteriormente.
Deste modo, posso dizer que a inserção no banco de dados (que era o maior gargalo, quando era feita uma a uma) passou a tomar um tempo muito pequeno, cerca de 5 segundos para inserir mais de 130 mil linhas analisadas no banco.
Alguns comentários
Meu objetivo com esse artigo é demonstrar as dificuldades técnicas que alguém que está aprendendo de forma auto-didata enfrente ao tentar fazer um projeto escalável por conta própria. Sei que o código pode ficar muito melhor, porém, a diferença de qualidade e principalmente perfomance entre o primeiro commit e o atual é enorme. Enfim, estou feliz com o resultado até então.