Resumo

Este artigo curto visa mostrar num exemplo real como ler grande volumes de dados usando o Spark no R. Ainda muito preliminar é passível de melhorias, ajustes e sugestões de avanços no futuro próximo. Utilizarei aqui a leitura de um dataset considerado grande (um pouco mais de 36 milhões de registros) dos microdados do CAGED de jan/2020 a fev/2021.

When you think of the computation power that Spark provides and the ease of use of the R language, it is natural to want them to work together, seamlessly. This is also what the R community expected: an R package that would provide an interface to Spark that was easy to use, compatible with other R packages, and available in CRAN. With this goal, we started developing sparklyr.

Officially, sparklyr is an R interface for Apache Spark. It’s available in CRAN and works like any other CRAN package, meaning that it’s agnostic to Spark versions, it’s easy to install, it serves the R community, it embraces other packages and practices from the R community, and so on. It’s hosted in GitHub and licensed under Apache 2.0, which allows you to clone, modify, and contribute back to this project.

Mastering Spark with R

Javier Luraschi, Kevin Kuo, Edgar Ruiz


\[\\[1in]\]



\[\\[1in]\]

Carregando as bibliotecas

library(googledrive) # Pacote para abrir os csvs que foram salvos no Drive
library(DBI) # O pacote DBI pode ser usado para querys SQL dentro do R com o dplyr
library(DT) # Pacote para gerar tabelas interativas de outputs
library(pryr) # Pacote para mostrar tamanho dos dataframes
library(dplyr) # O dplyr roda com o sparklyr


library(sparklyr)

Setando o uso da memória local

memory.limit() # Mostra o total alocado na minha memoria local
[1] 8006
paste(round(memory.limit(size=10000000000)/2^30, 2),"Gb") # Aumento a alocacao da memoria no R ao limite maximo
[1] "9.31 Gb"
paste(round(memory.limit()/2^30, 2), "Gb") # Mostra o total alocado na minha maquina
[1] "9.31 Gb"

Configuro o Spark pra usar minha memória:

config <- spark_config()

config$`sparklyr.shell.driver-memory` <- "7G" # Minha máquina possui 8Gb
config$`sparklyr.shell.executor-memory` <- "7G"

Conectando no Spark

spark_conn <- spark_connect(master="local", config = config)

spark_conn # Mostra a conexao local do Spark
$master
[1] "local[8]"

$method
[1] "shell"

$app_name
[1] "sparklyr"

$config
$config$spark.env.SPARK_LOCAL_IP.local
[1] "127.0.0.1"

$config$sparklyr.connect.csv.embedded
[1] "^1.*"

$config$spark.sql.legacy.utcTimestampFunc.enabled
[1] TRUE

$config$sparklyr.connect.cores.local
[1] 8

$config$spark.sql.shuffle.partitions.local
[1] 8

$config$`sparklyr.shell.driver-memory`
[1] "7G"

$config$`sparklyr.shell.executor-memory`
[1] "7G"

$config$sparklyr.shell.name
[1] "sparklyr"

attr(,"config")
[1] "default"
attr(,"file")
[1] "C:\\Users\\rodri\\Documents\\R\\win-library\\4.0\\sparklyr\\conf\\config-template.yml"

$state
<environment: 0x0000000018750328>

$extensions
$extensions$jars
character(0)

$extensions$packages
character(0)

$extensions$initializers
list()

$extensions$catalog_jars
character(0)

$extensions$repositories
character(0)


$spark_home
[1] "C:\\Users\\rodri\\AppData\\Local\\spark\\spark-3.0.1-bin-hadoop3.2"

$backend
A connection with                              
description "->localhost:8881"
class       "sockconn"        
mode        "wb"              
text        "binary"          
opened      "opened"          
can read    "yes"             
can write   "yes"             

$monitoring
A connection with                              
description "->localhost:8881"
class       "sockconn"        
mode        "wb"              
text        "binary"          
opened      "opened"          
can read    "yes"             
can write   "yes"             

$gateway
A connection with                              
description "->localhost:8880"
class       "sockconn"        
mode        "rb"              
text        "binary"          
opened      "opened"          
can read    "yes"             
can write   "yes"             

$output_file
[1] "C:\\Users\\rodri\\AppData\\Local\\Temp\\RtmpMf5pyC\\file2c083fbf5c47_spark.log"

$sessionId
[1] 80103

$home_version
[1] "3.0.1"

attr(,"class")
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"         

A local cluster is really helpful to get started, test code, and troubleshoot with ease. (Luraschi, Kuo e Ruiz, 2021)

Os microdados do CAGED

Vamos utilizar como exemplo os “maravilhosos” microdados do Ministério do Trabalho e Emprego, a saber o Cadastro Geral de Admitidos e Desligados (CAGED):

(veja esse artigo a respeito do novo CAGED. Vale lembrar que se você tentar hoje (junho 06, 2021) reproduzir o mesmo código utilizado nesse site notará que o pacote ecoseries parece ter sido descontinuado do CRAN.)

Primeiro baixe os microdados do CAGED referente aos mêses desejados para sua análise no seguinte endereço:

E também baixe aqui o dicionário desses microdados do CAGED.

Caso você queira acessar o link acima, destaco que não recomendo a tentativa pelo navegador Google Chrome (não abre nada), tente usando p. ex. o excelente “Internet Explorer” (heheheh).

Em seguida, você deverá extrair todos os arquivos de dados no formato .txt para alguma pasta específica que você precise o armazenar.

O próximo passo seria então você utilizar os comandos do R para lêr todos eles e em seguida criar como .csvs todos os meses que você deseja. (utilize o comando write.csv() para isso)

Pra facilitar e reduzir o seu tempo com isso, criei uma pasta no Google Drive para agilizar nosso trabalho.

Para acessar os arquivos sem precisar baixá-los em sua máquina crie um script do R e rode os seguintes comandos:

library(googledrive)

drive_find() #Lista todos os arquivos que estao no seu drive

library(googlesheets4) 

gs4_find()

Em seguida o RStudio vai lhe perguntar no Console qual conta do Google sua desejas autenticar. Siga os passos e voila!

Após a sua autenticação e na medida em que você vai reproduzindo esses códigos aqui no seu RStudio partimos pro nosso próximo passo. Veremos então os dados que estão nessa pasta compartilhada:

Datasets_CAGED <- drive_ls(path = "Datasets_CAGED") # Lista todos os arquivos que estao no seu drive
Datasets_CAGED
# A tibble: 14 x 3
   name                   id                                drive_resource   
 * <chr>                  <chr>                             <list>           
 1 caged_mov_012020.csv   1LdLaVZ25cE6UpIfo0IHxJj0e8m8taPkd <named list [39]>
 2 Layout Novo Caged.xlsx 1uqfw_zvOqg7Kj-aftVUGMhpziT2pA-pe <named list [40]>
 3 caged_mov_022020.csv   1Ya9vdboSyic9ckfh8CU1yNk9uKB2H9Vm <named list [39]>
 4 caged_mov_032020.csv   1ENe681tmfYYkfx5GSSp5wIAvq2P3lNQe <named list [39]>
 5 caged_mov_042020.csv   1V4tx4cPIxhiWaetY-fYsFbnWm-dDDYNN <named list [39]>
 6 caged_mov_052020.csv   1Wh6SPPKAqJYDtthgLcmuU_gnfVMkNinF <named list [39]>
 7 caged_mov_062020.csv   1Ha2drcIUdY7kQACFzxeS7FVzi2XLH6aN <named list [39]>
 8 caged_mov_072020.csv   1wdNcGQNXdY0gf6lfKfe4HQqewkvooyy_ <named list [39]>
 9 caged_mov_012021.csv   1mxdiiKRx_qac1iLPsu9V3LgzyUe1AwgZ <named list [39]>
10 caged_mov_122020.csv   1bYvxjY3Xqwj0fSuJBfu5K9kw_1P8UbXK <named list [39]>
11 caged_mov_082020.csv   1q2J1WO6_TGYMc35jyk8pQTKItt0n6OCC <named list [39]>
12 caged_mov_092020.csv   1AMBf3hqMpMLj1rj-acYivzk1p-e0WMe4 <named list [39]>
13 caged_mov_102020.csv   1D9PKz5Lwjk1UKzL0ZYreCFmDe9P2lklC <named list [39]>
14 caged_mov_112020.csv   1euHtWmZTklPuafcCyd28WbRgS_aKolu7 <named list [39]>

Agora leremos os arquivos da pasta:

jan2020 <- spark_read_csv(spark_conn,"caged_mov_012020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

fev2020 <- spark_read_csv(spark_conn,"caged_mov_022020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

mar2020 <- spark_read_csv(spark_conn,"caged_mov_032020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

abr2020 <- spark_read_csv(spark_conn,"caged_mov_042020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

mai2020 <- spark_read_csv(spark_conn,"caged_mov_052020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

jun2020 <- spark_read_csv(spark_conn,"caged_mov_062020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

jul2020 <- spark_read_csv(spark_conn,"caged_mov_072020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

ago2020 <- spark_read_csv(spark_conn,"caged_mov_082020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

set2020 <- spark_read_csv(spark_conn,"caged_mov_092020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

out2020 <- spark_read_csv(spark_conn,"caged_mov_102020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

nov2020 <- spark_read_csv(spark_conn,"caged_mov_112020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

dez2020 <- spark_read_csv(spark_conn,"caged_mov_122020.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

jan2021 <- spark_read_csv(spark_conn,"caged_mov_012021.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

fev2021 <- spark_read_csv(spark_conn,"caged_mov_022021.csv", header = TRUE, delimiter = ",", charset = "UTF-8")

Confiro quais datasets estão carregados dentro do Spark

src_tbls(spark_conn)
 [1] "caged_mov_012020_703491c1_1ac8_467d_83ec_c77c62b6dd2a"
 [2] "caged_mov_012021_269c4a30_6067_4f4f_98b6_99df3df34622"
 [3] "caged_mov_022020_0fa73933_a686_4d14_a7d5_643836fd372a"
 [4] "caged_mov_022021_72fc2e86_1c85_4e61_abb5_e822e5d40e23"
 [5] "caged_mov_032020_e9942f4a_6ec0_457f_9a3e_39c704c5ad52"
 [6] "caged_mov_042020_83a45e48_273e_47ec_87fe_cd7904aec885"
 [7] "caged_mov_052020_f13c080f_0ace_4b34_b227_e0c5837df18e"
 [8] "caged_mov_062020_7a6c13bc_3075_4594_bde1_70ce29c1d27b"
 [9] "caged_mov_072020_371db071_9a19_47e0_aadb_5f151daddbc8"
[10] "caged_mov_082020_0f0c6b41_7c8d_4bb5_b03a_ce7cd008a4fd"
[11] "caged_mov_092020_e0b9e92b_9d2d_47c3_bec2_36dc9175e076"
[12] "caged_mov_102020_230236cd_7ebe_4d57_ab6c_51a4f6a0c301"
[13] "caged_mov_112020_37f379d7_2449_409b_9ded_1957237dbd02"
[14] "caged_mov_122020_bc1694ff_5c7b_437c_b2f2_bfb166d95bec"

Como as ordens das colunas em todos essas tabelas estão exatamente na mesma ordem e com a nomenclatura exatamente iguais, posso juntá-las em um único dataframe:

caged_microdados <- sdf_bind_rows(
                                  jan2020,
                                  fev2020,
                                  mar2020,
                                  abr2020,
                                  mai2020,
                                  jun2020,
                                  jul2020,
                                  ago2020,
                                  set2020,
                                  out2020,
                                  nov2020,
                                  dez2020,
                                  jan2021,
                                  fev2021)

caged_microdados <- caged_microdados %>% # Corrijo as grafias dos nomes
  select(-`_c0`) %>%
    rename(competencia = "competincia",
           salario = "salirio",
           saldomovimentacao ="saldomovimentaiio",
           graudeinstrucao = "graudeinstruiio",
           racacor = "raiacor",
           regiao = "regiio",
           secao = "seiio",
           cbo2002ocupacao = "cbo2002ocupaiio",
           tipomovimentacao = "tipomovimentaiio")

glimpse(caged_microdados)
Rows: ??
Columns: 24
Database: spark_connection
$ competencia         <int> 202001, 202001, 202001, 202001, 202001, 202001, 20~
$ regiao              <int> 2, 4, 4, 3, 3, 2, 3, 3, 5, 4, 3, 2, 2, 2, 5, 3, 5,~
$ uf                  <int> 22, 42, 42, 35, 32, 23, 31, 35, 51, 42, 33, 23, 23~
$ municipio           <int> 221100, 420460, 420460, 350160, 320530, 230440, 31~
$ secao               <chr> "Q", "G", "Q", "I", "Q", "G", "E", "N", "I", "N", ~
$ subclasse           <int> 8640202, 4789099, 8640202, 5611203, 8610102, 47890~
$ saldomovimentacao   <int> 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, -1, -1,~
$ cbo2002ocupacao     <int> 421125, 521110, 515215, 513435, 422205, 521110, 51~
$ categoria           <int> 999, 101, 101, 101, 101, 999, 101, 101, 101, 101, ~
$ graudeinstrucao     <int> 7, 7, 8, 5, 7, 7, 4, 7, 7, 4, 9, 7, 5, 6, 7, 2, 5,~
$ idade               <chr> "49", "18", "34", "19", "19", "27", "28", "59", "1~
$ horascontratuais    <int> 44, 44, 44, 44, 30, 44, 44, 0, 44, 24, 40, 22, 44,~
$ racacor             <int> 6, 1, 1, 6, 1, 3, 1, 1, 1, 1, 1, 1, 3, 1, 3, 1, 3,~
$ sexo                <int> 3, 3, 3, 1, 1, 1, 1, 1, 3, 3, 1, 3, 1, 1, 3, 1, 1,~
$ tipoempregador      <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,~
$ tipoestabelecimento <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,~
$ tipomovimentacao    <int> 20, 20, 31, 10, 20, 31, 20, 20, 40, 20, 20, 31, 20~
$ tipodedeficiincia   <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,~
$ indtrabintermitente <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,~
$ indtrabparcial      <int> 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 1, 0, 0, 0, 0, 0,~
$ salario             <chr> "1039", "1298", "1408.25", "1396.48999023438", "15~
$ tamestabjan         <int> 2, 4, 3, 6, 9, 2, 10, 10, 5, 8, 6, 3, 8, 8, 5, 3, ~
$ indicadoraprendiz   <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,~
$ fonte               <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,~
sdf_nrow(caged_microdados) # Conta o numero de linhas no dataframe
[1] 36110048

Começamos substituindo a coluna de datas (competencias) e a de seções econômicas do IBGE:

caged_microdados <- caged_microdados %>% 
 
  mutate(
    
    competencia = as.character(competencia), # Transformo em caracter a data
    competencia = case_when(
                            competencia == "202001" ~"2020/01",
                            competencia == "202002" ~"2020/02",
                            competencia == "202003" ~"2020/03",
                            competencia == "202004" ~"2020/04",
                            competencia == "202005" ~"2020/05",
                            competencia == "202006" ~"2020/06",
                            competencia == "202007" ~"2020/07",
                            competencia == "202008" ~"2020/08",
                            competencia == "202009" ~"2020/09",
                            competencia == "202010" ~"2020/10",
                            competencia == "202011" ~"2020/11",
                            competencia == "202012" ~"2020/12",
                            competencia == "202101" ~"2021/01",
                            competencia == "202102" ~"2021/02")
         )
caged_microdados <- caged_microdados %>% 
  
  mutate(
    
    secao = case_when(
    secao ==  "A"   ~"Agricultura, Pecuária, Produção Florestal, Pesca e AqÜIcultura",
    secao ==  "B"   ~"Indústrias Extrativas",
    secao ==  "C"   ~"Indústrias de Transformação",
    secao ==  "D"   ~"Eletricidade e Gás",
    secao ==  "E"   ~"Água, Esgoto, Atividades de Gestão de Resíduos e Descontaminação",
    secao ==  "F"   ~"Construção",
    secao ==  "G"   ~"Comércio, Reparação de Veículos Automotores e Motocicletas",
    secao ==  "H"   ~"Transporte, Armazenagem e Correio",
    secao ==  "I"   ~"Alojamento e Alimentação",
    secao ==  "J"   ~"Informação e Comunicação",
    secao ==  "K"   ~"Atividades Financeiras, de Seguros e Serviços Relacionados",
    secao ==  "L"   ~"Atividades Imobiliárias",
    secao ==  "M"   ~"Atividades Profissionais, Científicas e Técnicas",
    secao ==  "N"   ~"Atividades Administrativas e Serviços Complementares",
    secao ==  "O"   ~"Administração Pública, Defesa e Seguridade Social",
    secao ==  "P"   ~"Educação",
    secao ==  "Q"   ~"Saúde Humana e Serviços Sociais",
    secao ==  "R"   ~"Artes, Cultura, Esporte e Recreação",
    secao ==  "S"   ~"Outras Atividades de Serviços",
    secao ==  "T"   ~"Serviços Domésticos",
    secao ==  "U"   ~"Organismos Internacionais e Outras Instituições Extraterritoriais",
    secao ==  "Z"   ~"Não identificado")
    
       )

Agora façamos as relacionadas a região e localidades:

caged_microdados <- caged_microdados %>%
  
 mutate(
    
  regiao = case_when(
    regiao ==  1 ~ "Norte",
    regiao ==  2 ~ "Nordeste",
    regiao ==  3 ~ "Sudeste",
    regiao ==  4 ~ "Sul",
    regiao ==  5 ~ "Centro-Oeste",
    regiao ==  9 ~ "Não identificado"),
    
   uf = case_when(
    uf ==  11 ~ "Rondônia",
    uf ==  12 ~ "Acre",
    uf ==  13 ~ "Amazonas",
    uf ==  14 ~ "Roraima",
    uf ==  15 ~ "Pará",
    uf ==  16 ~ "Amapá",
    uf ==  17 ~ "Tocantins",
    uf ==  21 ~ "Maranhão",
    uf ==  22 ~ "Piauí",
    uf ==  23 ~ "Ceará",
    uf ==  24 ~ "Rio Grande do Norte",
    uf ==  25 ~ "Paraíba",
    uf ==  26 ~ "Pernambuco",
    uf ==  27 ~ "Alagoas",
    uf ==  28 ~ "Sergipe",
    uf ==  29 ~ "Bahia",
    uf ==  31 ~ "Minas gerais",
    uf ==  32 ~ "Espírito Santo",
    uf ==  33 ~ "Rio de Janeiro",
    uf ==  35 ~ "São Paulo",
    uf ==  41 ~ "Paraná",
    uf ==  42 ~ "Santa Catarina",
    uf ==  43 ~ "Rio Grande do Sul",
    uf ==  50 ~ "Mato Grosso do Sul",
    uf ==  51 ~ "Mato Grosso",
    uf ==  52 ~ "Goiás",
    uf ==  53 ~ "Distrito Federal",
    uf ==  99 ~ "Não identificado")
    
    )

Para as demais variáveis

caged_microdados <- caged_microdados %>%
  
 mutate(
    
  salario = as.numeric(salario), # Salario como numerica
  idade = as.numeric(idade), # idade como numerica

  fonte = case_when(
    fonte ==  1 ~ "Dado Original/Sem Imputação",
    fonte ==  2 ~ "Desligamentos Imputados do CAGED",
    fonte ==  3 ~ "Desligamentos Imputados do EmpregadorWEB"),
    
  categoria = case_when(
    categoria ==  101   ~ "Empregado - Geral, inclusive o empregado público da administração direta ou indireta contratado pela CLT",
    categoria ==  102   ~ "Empregado - Trabalhador rural por pequeno prazo da Lei 11.718/2008",
    categoria ==  103   ~ "Empregado - Aprendiz",
    categoria ==  104   ~ "Empregado - Doméstico",
    categoria ==  105   ~ "Empregado - Contrato a termo firmado nos termos da Lei 9.601/1998",
    categoria ==  106   ~ "Trabalhador temporário - Contrato nos termos da Lei 6.019/1974",
    categoria ==  107   ~ "Empregado - Contrato de trabalho Verde e Amarelo - sem acordo para antecipação mensal da multa rescisória do FGTS",
    categoria ==  108   ~ "Empregado - Contrato de trabalho Verde e Amarelo - com acordo para antecipação mensal da multa rescisória do FGTS",
    categoria ==  111   ~ "Empregado - Contrato de trabalho intermitente",
    categoria ==  999   ~ "Não Identificado"),
    
  graudeinstrucao = case_when(
    graudeinstrucao ==  1   ~"Analfabeto",
    graudeinstrucao ==  2   ~"Até 5ª Incompleto",
    graudeinstrucao ==  3   ~"5ª Completo Fundamental",
    graudeinstrucao ==  4   ~"6ª a 9ª Fundamental",
    graudeinstrucao ==  5   ~"Fundamental Completo",
    graudeinstrucao ==  6   ~"Médio Incompleto",
    graudeinstrucao ==  7   ~"Médio Completo",
    graudeinstrucao ==  8   ~"Superior Incompleto",
    graudeinstrucao ==  9   ~"Superior Completo",
    graudeinstrucao ==  10 ~"Mestrado",
    graudeinstrucao ==  11 ~"Doutorado",
    graudeinstrucao ==  80 ~"Pós-Graduação completa",
    graudeinstrucao ==  99 ~"Não Identificado"),
    
  sexo = case_when(
    sexo ==  1  ~"Homem",
    sexo ==  3  ~"Mulher",
    sexo ==  9  ~"Não Identificado"),
    
  tipoempregador = case_when(
    tipoempregador ==  0 ~"CNPJ RAIZ",
    tipoempregador ==  2 ~"CPF",
    tipoempregador ==  9 ~"Não Identificado"),
    
  tipoestabelecimento = case_when(
    tipoestabelecimento ==  1   ~"CNPJ",
    tipoestabelecimento ==  3   ~"CAEPF(Cadastro de Atividade Econômica de Pessoa Física)",
    tipoestabelecimento ==  4   ~"CNO(Cadastro Nacional de Obra)",
    tipoestabelecimento ==  5   ~"CEI(CAGED)",
    tipoestabelecimento ==  9   ~"Não Identificado")
  )
caged_microdados <- caged_microdados %>%
  
 mutate(
    
  tipomovimentacao = case_when(
    tipomovimentacao ==  10 ~"Admissão por primeiro emprego",
    tipomovimentacao ==  20 ~"Admissão por reemprego",
    tipomovimentacao ==  25 ~"Admissão por contrato trabalho prazo determinado",
    tipomovimentacao ==  31 ~"Desligamento por demissão sem justa causa",
    tipomovimentacao ==  32 ~"Desligamento por demissão com justa causa",
    tipomovimentacao ==  33 ~"Culpa Recíproca",
    tipomovimentacao ==  35 ~"Admissão por reintegração",
    tipomovimentacao ==  40 ~"Desligamento a pedido",
    tipomovimentacao ==  43 ~"Término contrato trabalho prazo determinado",
    tipomovimentacao ==  45 ~"Desligamento por Término de contrato",
    tipomovimentacao ==  50 ~"Desligamento por aposentadoria",
    tipomovimentacao ==  60 ~"Desligamento por morte",
    tipomovimentacao ==  70 ~"Admissão por transferência",
    tipomovimentacao ==  80 ~"Desligamento por transferência",
    tipomovimentacao ==  90 ~"Desligamento por Acordo entre empregado e empregador",
    tipomovimentacao ==  98 ~"Desligamento de Tipo Ignorado",
    tipomovimentacao ==  99 ~"Não Identificado"),
    
  indtrabparcial = case_when(
    indtrabparcial == 0 ~"Não",
    indtrabparcial == 1 ~"Sim",
    indtrabparcial == 9 ~"Não Identificado"),
    
  indtrabintermitente = case_when(
    indtrabintermitente ==  0   ~"Não",
    indtrabintermitente ==  1   ~"Sim",
    indtrabintermitente ==  9   ~"Não Identificado"),
    
   tipodedeficiincia = case_when(
    tipodedeficiincia ==  0 ~"Não Deficiente",
    tipodedeficiincia ==  1 ~"Física",
    tipodedeficiincia ==  2 ~"Auditiva",
    tipodedeficiincia ==  3 ~"Visual",
    tipodedeficiincia ==  4 ~"Intelectual (Mental)",
    tipodedeficiincia ==  5 ~"Múltipla",
    tipodedeficiincia ==  6 ~"Reabilitado",
    tipodedeficiincia ==  9 ~"Não Identificado" ),
    
   racacor = case_when(
    racacor ==  1   ~"Branca",
    racacor ==  2   ~"Preta",
    racacor ==  3   ~"Parda",
    racacor ==  4   ~"Amarela",
    racacor ==  5   ~"Indígena",
    racacor ==  6   ~"Não informada",
    racacor ==  9   ~"Não Identificado"),
    
   indicadoraprendiz = case_when(
    indicadoraprendiz == 0 ~"Não",
    indicadoraprendiz == 1  ~"Sim",
    indicadoraprendiz == 9  ~"Não Identificado"),
    
   tamestabjan = case_when(
    tamestabjan ==  1   ~"Nenhum vínculo",
    tamestabjan ==  2   ~"De 1 a 4 vínculos",
    tamestabjan ==  3   ~"De 5 a 9 vínculos",
    tamestabjan ==  4   ~"De 10 a 19 vínculos",
    tamestabjan ==  5   ~"De 20 a 49 vínculos",
    tamestabjan ==  6   ~"De 50 a 99 vínculos",
    tamestabjan ==  7   ~"De 100 a 249 vínculos",
    tamestabjan ==  8   ~"De 250 a 499 vínculos",
    tamestabjan ==  9   ~"De 500 a 999 vínculos",
    tamestabjan ==  10 ~"1000 ou mais vínculos",
    tamestabjan ==  99 ~"Não Identificado")

  )

Vamos dar uma olhada nos dados novamente:

glimpse(caged_microdados)
Rows: ??
Columns: 24
Database: spark_connection
$ competencia         <chr> "2020/01", "2020/01", "2020/01", "2020/01", "2020/~
$ regiao              <chr> "Nordeste", "Sul", "Sul", "Sudeste", "Sudeste", "N~
$ uf                  <chr> "Piauí", "Santa Catarina", "Santa Catarina", "São ~
$ municipio           <int> 221100, 420460, 420460, 350160, 320530, 230440, 31~
$ secao               <chr> "Saúde Humana e Serviços Sociais", "Comércio, Repa~
$ subclasse           <int> 8640202, 4789099, 8640202, 5611203, 8610102, 47890~
$ saldomovimentacao   <int> 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, -1, -1,~
$ cbo2002ocupacao     <int> 421125, 521110, 515215, 513435, 422205, 521110, 51~
$ categoria           <chr> "Não Identificado", "Empregado - Geral, inclusive ~
$ graudeinstrucao     <chr> "Médio Completo", "Médio Completo", "Superior Inco~
$ idade               <dbl> 49, 18, 34, 19, 19, 27, 28, 59, 19, 44, 26, 24, 51~
$ horascontratuais    <int> 44, 44, 44, 44, 30, 44, 44, 0, 44, 24, 40, 22, 44,~
$ racacor             <chr> "Não informada", "Branca", "Branca", "Não informad~
$ sexo                <chr> "Mulher", "Mulher", "Mulher", "Homem", "Homem", "H~
$ tipoempregador      <chr> "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ"~
$ tipoestabelecimento <chr> "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "C~
$ tipomovimentacao    <chr> "Admissão por reemprego", "Admissão por reemprego"~
$ tipodedeficiincia   <chr> "Não Deficiente", "Não Deficiente", "Não Deficient~
$ indtrabintermitente <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ indtrabparcial      <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ salario             <dbl> 1039.00, 1298.00, 1408.25, 1396.49, 1501.76, 1112.~
$ tamestabjan         <chr> "De 1 a 4 vínculos", "De 10 a 19 vínculos", "De 5 ~
$ indicadoraprendiz   <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ fonte               <chr> "Dado Original/Sem Imputação", "Dado Original/Sem ~
head(caged_microdados)
# Source: spark<?> [?? x 24]
  competencia regiao  uf      municipio secao         subclasse saldomovimentac~
  <chr>       <chr>   <chr>       <int> <chr>             <int>            <int>
1 2020/01     Nordes~ Piauí      221100 Saúde Humana~   8640202                1
2 2020/01     Sul     Santa ~    420460 Comércio, Re~   4789099                1
3 2020/01     Sul     Santa ~    420460 Saúde Humana~   8640202               -1
4 2020/01     Sudeste São Pa~    350160 Alojamento e~   5611203                1
5 2020/01     Sudeste Espíri~    320530 Saúde Humana~   8610102                1
6 2020/01     Nordes~ Ceará      230440 Comércio, Re~   4789001               -1
# ... with 17 more variables: cbo2002ocupacao <int>, categoria <chr>,
#   graudeinstrucao <chr>, idade <dbl>, horascontratuais <int>, racacor <chr>,
#   sexo <chr>, tipoempregador <chr>, tipoestabelecimento <chr>,
#   tipomovimentacao <chr>, tipodedeficiincia <chr>, indtrabintermitente <chr>,
#   indtrabparcial <chr>, salario <dbl>, tamestabjan <chr>,
#   indicadoraprendiz <chr>, fonte <chr>

Ainda precisaremos incluir as substituições faltantes que são municipio, subclasse e cbo2002ocupacao, mas pra isso teremos de utilizar as abas de descritores específicas do Layout do CAGED.

library(readxl)

url<-'ftp://ftp.mtps.gov.br/pdet/microdados/NOVO%20CAGED/Movimenta%E7%F5es/Layout%20Novo%20Caged%20Movimenta%E7%E3o.xlsx'
dicionario_temp <- tempfile()
download.file(url, dicionario_temp, mode="wb")

municipio <- read_excel(path = dicionario_temp, sheet = 4) 
head(municipio)
# A tibble: 6 x 2
  Código Descrição               
   <dbl> <chr>                   
1 110001 Ro-Alta Floresta D Oeste
2 110002 Ro-Ariquemes            
3 110003 Ro-Cabixi               
4 110004 Ro-Cacoal               
5 110005 Ro-Cerejeiras           
6 110006 Ro-Colorado do Oeste    
subclasse_cnae <- read_excel(path = dicionario_temp, sheet = "subclasse") 
head(subclasse_cnae)
# A tibble: 6 x 2
  Código Descrição                                                
   <dbl> <chr>                                                    
1 111301 Cultivo de Arroz                                         
2 111302 Cultivo de Milho                                         
3 111303 Cultivo de Trigo                                         
4 111399 Cultivo de Outros Cereais não Especificados Anteriormente
5 112101 Cultivo de Algodão Herbáceo                              
6 112102 Cultivo de Juta                                          
cbo2002_ocup <- read_excel(path = dicionario_temp, sheet = 9) 
head(cbo2002_ocup)
# A tibble: 6 x 2
  Código Descrição                     
   <dbl> <chr>                         
1  10105 Oficial General da Aeronautica
2  10110 Oficial General do Exercito   
3  10115 Oficial General da Marinha    
4  10205 Oficial da Aeronautica        
5  10210 Oficial do Exercito           
6  10215 Oficial da Marinha            

Preciso modificar as variáveis das planilhas

municipio <- municipio %>%
  mutate(Código = as.character(Código)) %>%
  rename(municipio = "Código",
         descricao_municipio = "Descrição")
#write.csv(municipio,"municipio.csv")

municipio_spark <- spark_read_csv(spark_conn,"municipio.csv", header = TRUE, delimiter = ",")

municipio_spark <- municipio_spark %>%
  select(-`_c0`) %>%
  mutate(municipio = as.character(municipio))
glimpse(municipio_spark)


subclasse_cnae <- subclasse_cnae %>%
  mutate(Código = as.character(Código)) %>%
  rename(subclasse = "Código",
         descricao_subclasse = "Descrição")
#write.csv(subclasse_cnae,"subclasse_cnae.csv")

subclasse_cnae_spark <- spark_read_csv(spark_conn,"subclasse_cnae.csv", header = TRUE, delimiter = ",")

subclasse_cnae_spark <- subclasse_cnae_spark %>%
  select(-`_c0`) %>%
  mutate(subclasse = as.character(subclasse))
glimpse(subclasse_cnae_spark)

cbo2002_ocup <- cbo2002_ocup %>%
  mutate(Código = as.character(Código)) %>%
  rename(cbo2002ocupacao = "Código",
         descricao_cbo = "Descrição")
#write.csv(cbo2002_ocup,"cbo2002_ocup.csv")
cbo2002_ocup_spark <- spark_read_csv(spark_conn,"cbo2002_ocup.csv", header = TRUE, delimiter = ",")       

cbo2002_ocup_spark <- cbo2002_ocup_spark %>%
  select(-`_c0`) %>%
  mutate(cbo2002ocupacao = as.character(cbo2002ocupacao))
glimpse(cbo2002_ocup_spark) 
Rows: ??
Columns: 2
Database: spark_connection
$ municipio           <chr> "110001", "110002", "110003", "110004", "110005", ~
$ descricao_municipio <chr> "Ro-Alta Floresta D Oeste", "Ro-Ariquemes", "Ro-Ca~
Rows: ??
Columns: 2
Database: spark_connection
$ subclasse           <chr> "111301", "111302", "111303", "111399", "112101", ~
$ descricao_subclasse <chr> "Cultivo de Arroz", "Cultivo de Milho", "Cultivo d~
Rows: ??
Columns: 2
Database: spark_connection
$ cbo2002ocupacao <chr> "10105", "10110", "10115", "10205", "10210", "10215", ~
$ descricao_cbo   <chr> "Oficial General da Aeronautica", "Oficial General do ~

Converto os dados para caracteres no dataset dos últimos 3 meses disponíveis:

caged_microdados <- caged_microdados %>%
  
  mutate(
    
    municipio = as.character(municipio),
    subclasse = as.character(subclasse),
    cbo2002ocupacao = as.character(cbo2002ocupacao)
    
        )

glimpse(caged_microdados)
Rows: ??
Columns: 24
Database: spark_connection
$ competencia         <chr> "2020/01", "2020/01", "2020/01", "2020/01", "2020/~
$ regiao              <chr> "Nordeste", "Sul", "Sul", "Sudeste", "Sudeste", "N~
$ uf                  <chr> "Piauí", "Santa Catarina", "Santa Catarina", "São ~
$ municipio           <chr> "221100", "420460", "420460", "350160", "320530", ~
$ secao               <chr> "Saúde Humana e Serviços Sociais", "Comércio, Repa~
$ subclasse           <chr> "8640202", "4789099", "8640202", "5611203", "86101~
$ saldomovimentacao   <int> 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, -1, -1,~
$ cbo2002ocupacao     <chr> "421125", "521110", "515215", "513435", "422205", ~
$ categoria           <chr> "Não Identificado", "Empregado - Geral, inclusive ~
$ graudeinstrucao     <chr> "Médio Completo", "Médio Completo", "Superior Inco~
$ idade               <dbl> 49, 18, 34, 19, 19, 27, 28, 59, 19, 44, 26, 24, 51~
$ horascontratuais    <int> 44, 44, 44, 44, 30, 44, 44, 0, 44, 24, 40, 22, 44,~
$ racacor             <chr> "Não informada", "Branca", "Branca", "Não informad~
$ sexo                <chr> "Mulher", "Mulher", "Mulher", "Homem", "Homem", "H~
$ tipoempregador      <chr> "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ"~
$ tipoestabelecimento <chr> "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "C~
$ tipomovimentacao    <chr> "Admissão por reemprego", "Admissão por reemprego"~
$ tipodedeficiincia   <chr> "Não Deficiente", "Não Deficiente", "Não Deficient~
$ indtrabintermitente <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ indtrabparcial      <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ salario             <dbl> 1039.00, 1298.00, 1408.25, 1396.49, 1501.76, 1112.~
$ tamestabjan         <chr> "De 1 a 4 vínculos", "De 10 a 19 vínculos", "De 5 ~
$ indicadoraprendiz   <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ fonte               <chr> "Dado Original/Sem Imputação", "Dado Original/Sem ~

Insiro os dados faltantes no dataset dos últimos 3 meses disponíveis:

caged_microdados <- left_join(caged_microdados, municipio_spark %>%
                                    select(municipio, descricao_municipio), by = c("municipio"="municipio"))

caged_microdados <- left_join(caged_microdados, subclasse_cnae_spark %>%
                                            select(subclasse, descricao_subclasse), by = c("subclasse"="subclasse"))

caged_microdados <- left_join(caged_microdados, cbo2002_ocup_spark %>%
                                    select(cbo2002ocupacao, descricao_cbo), by = c("cbo2002ocupacao"="cbo2002ocupacao"))

glimpse(caged_microdados)
Rows: ??
Columns: 27
Database: spark_connection
$ competencia         <chr> "2020/01", "2020/01", "2020/01", "2020/01", "2020/~
$ regiao              <chr> "Nordeste", "Sul", "Sul", "Sudeste", "Sudeste", "N~
$ uf                  <chr> "Piauí", "Santa Catarina", "Santa Catarina", "São ~
$ municipio           <chr> "221100", "420460", "420460", "350160", "320530", ~
$ secao               <chr> "Saúde Humana e Serviços Sociais", "Comércio, Repa~
$ subclasse           <chr> "8640202", "4789099", "8640202", "5611203", "86101~
$ saldomovimentacao   <int> 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, -1, -1,~
$ cbo2002ocupacao     <chr> "421125", "521110", "515215", "513435", "422205", ~
$ categoria           <chr> "Não Identificado", "Empregado - Geral, inclusive ~
$ graudeinstrucao     <chr> "Médio Completo", "Médio Completo", "Superior Inco~
$ idade               <dbl> 49, 18, 34, 19, 19, 27, 28, 59, 19, 44, 26, 24, 51~
$ horascontratuais    <int> 44, 44, 44, 44, 30, 44, 44, 0, 44, 24, 40, 22, 44,~
$ racacor             <chr> "Não informada", "Branca", "Branca", "Não informad~
$ sexo                <chr> "Mulher", "Mulher", "Mulher", "Homem", "Homem", "H~
$ tipoempregador      <chr> "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ"~
$ tipoestabelecimento <chr> "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "C~
$ tipomovimentacao    <chr> "Admissão por reemprego", "Admissão por reemprego"~
$ tipodedeficiincia   <chr> "Não Deficiente", "Não Deficiente", "Não Deficient~
$ indtrabintermitente <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ indtrabparcial      <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ salario             <dbl> 1039.00, 1298.00, 1408.25, 1396.49, 1501.76, 1112.~
$ tamestabjan         <chr> "De 1 a 4 vínculos", "De 10 a 19 vínculos", "De 5 ~
$ indicadoraprendiz   <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ fonte               <chr> "Dado Original/Sem Imputação", "Dado Original/Sem ~
$ descricao_municipio <chr> "Pi-Teresina", "Sc-Criciuma", "Sc-Criciuma", "Sp-A~
$ descricao_subclasse <chr> "Laborat<U+FFFD>rios Cl<U+FFFD>nicos", "Com<U+FFFD>rcio Varejista de Ou~
$ descricao_cbo       <chr> "Operador de Caixa", "Vendedor de Comercio Varejis~
sdf_nrow(caged_microdados)
[1] 36110048

Podemos retirar as colunas municipio, subclasse e cbo2002ocupacao e regorganizar o dataset:

caged_microdados <- caged_microdados %>%
  select(-municipio, -subclasse, -cbo2002ocupacao)

caged_microdados <- caged_microdados %>%
  select( # Faco esse select pra organizar a ordem das colunas no dataframe
    competencia,
    regiao,
    uf,
    descricao_municipio,
    secao,
    descricao_subclasse,
    saldomovimentacao,
    descricao_cbo,
    categoria,
    graudeinstrucao,
    idade,
    horascontratuais,
    racacor,
    sexo,
    tipoempregador,
    tipoestabelecimento,
    tipomovimentacao,
    tipodedeficiincia,
    indtrabintermitente,
    indtrabparcial,
    salario,
    tamestabjan,
    indicadoraprendiz,
    fonte )

glimpse(caged_microdados)
Rows: ??
Columns: 24
Database: spark_connection
$ competencia         <chr> "2020/01", "2020/01", "2020/01", "2020/01", "2020/~
$ regiao              <chr> "Nordeste", "Sul", "Sul", "Sudeste", "Sudeste", "N~
$ uf                  <chr> "Piauí", "Santa Catarina", "Santa Catarina", "São ~
$ descricao_municipio <chr> "Pi-Teresina", "Sc-Criciuma", "Sc-Criciuma", "Sp-A~
$ secao               <chr> "Saúde Humana e Serviços Sociais", "Comércio, Repa~
$ descricao_subclasse <chr> "Laborat<U+FFFD>rios Cl<U+FFFD>nicos", "Com<U+FFFD>rcio Varejista de Ou~
$ saldomovimentacao   <int> 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, -1, 1, -1, -1,~
$ descricao_cbo       <chr> "Operador de Caixa", "Vendedor de Comercio Varejis~
$ categoria           <chr> "Não Identificado", "Empregado - Geral, inclusive ~
$ graudeinstrucao     <chr> "Médio Completo", "Médio Completo", "Superior Inco~
$ idade               <dbl> 49, 18, 34, 19, 19, 27, 28, 59, 19, 44, 26, 24, 51~
$ horascontratuais    <int> 44, 44, 44, 44, 30, 44, 44, 0, 44, 24, 40, 22, 44,~
$ racacor             <chr> "Não informada", "Branca", "Branca", "Não informad~
$ sexo                <chr> "Mulher", "Mulher", "Mulher", "Homem", "Homem", "H~
$ tipoempregador      <chr> "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ"~
$ tipoestabelecimento <chr> "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "C~
$ tipomovimentacao    <chr> "Admissão por reemprego", "Admissão por reemprego"~
$ tipodedeficiincia   <chr> "Não Deficiente", "Não Deficiente", "Não Deficient~
$ indtrabintermitente <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ indtrabparcial      <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ salario             <dbl> 1039.00, 1298.00, 1408.25, 1396.49, 1501.76, 1112.~
$ tamestabjan         <chr> "De 1 a 4 vínculos", "De 10 a 19 vínculos", "De 5 ~
$ indicadoraprendiz   <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ fonte               <chr> "Dado Original/Sem Imputação", "Dado Original/Sem ~

Pesquisa do saldo de admissões/desligamentos no período

Ok, finalmente de possa de nosso dataframe em mãos podemos filtrar nossa análise com o intuito de avaliarmos o fluxo do mercado formal de trabalho em determinada categoria profissional numa certa localidade.

Deste modo farei a seguinte investigação:

No estado do Paraná, selecionamos…

  • Qual é o saldo de admitidos e desligados da CBO economista nos meses de nov/2020 a jan/2021 ?

  • Qual é o salário de contratação destes profissionais ? *Obs.: Não utilize a média dos salários para essa avaliação;

  • Qual é o tamanho do estabelecimento que os contrata/demite ?

  • Qual é o grau de instrução desses profissionais ?

  • Quais são as subclasses CNAEs do tipo de estabelecimento que mais os contrata/demite ?

  • Qual é o modelo de contrato formal firmado de trabalho (*variável categoria)

  • Quantas horas contratuais semanais eles foram contratatados/trabalhavam ?

  • Enfim, utilize praticamente todas as colunas do dataset caged_ultimos_3meses para sanar as demais dúvidas.

minha_pesquisa <- caged_microdados %>%
  filter(grepl("economista", descricao_cbo ) | grepl("Economista", descricao_cbo) & 
         uf == "Paraná")


glimpse(minha_pesquisa)
Rows: ??
Columns: 24
Database: spark_connection
$ competencia         <chr> "2020/01", "2020/01", "2020/01", "2020/01", "2020/~
$ regiao              <chr> "Sul", "Sul", "Sul", "Sul", "Sul", "Sul", "Sul", "~
$ uf                  <chr> "Paraná", "Paraná", "Paraná", "Paraná", "Paraná", ~
$ descricao_municipio <chr> "Pr-Ibipora", "Pr-Curitiba", "Pr-Curitiba", "Pr-Ma~
$ secao               <chr> "Atividades Profissionais, Científicas e Técnicas"~
$ descricao_subclasse <chr> "Atividades de Contabilidade", "Sociedades de Fome~
$ saldomovimentacao   <int> -1, 1, -1, 1, 1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1~
$ descricao_cbo       <chr> "Economista do Setor Publico", "Economista Finance~
$ categoria           <chr> "Empregado - Geral, inclusive o empregado público ~
$ graudeinstrucao     <chr> "Superior Completo", "Médio Completo", "Superior I~
$ idade               <dbl> 31, 35, 34, 22, 41, 39, 22, 42, 31, 27, 23, 49, 24~
$ horascontratuais    <int> 44, 33, 44, 44, 44, 44, 44, 40, 40, 44, 44, 44, 40~
$ racacor             <chr> "Branca", "Parda", "Branca", "Branca", "Branca", "~
$ sexo                <chr> "Mulher", "Mulher", "Homem", "Mulher", "Homem", "M~
$ tipoempregador      <chr> "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ", "CNPJ RAIZ"~
$ tipoestabelecimento <chr> "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "CNPJ", "C~
$ tipomovimentacao    <chr> "Desligamento por demissão sem justa causa", "Admi~
$ tipodedeficiincia   <chr> "Não Deficiente", "Não Deficiente", "Não Deficient~
$ indtrabintermitente <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ indtrabparcial      <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ salario             <dbl> 2900.00, 2727.33, 2700.00, 1500.00, 4549.60, 6013.~
$ tamestabjan         <chr> "De 5 a 9 vínculos", "Nenhum vínculo", "De 20 a 49~
$ indicadoraprendiz   <chr> "Não", "Não", "Não", "Não", "Não", "Não", "Não", "~
$ fonte               <chr> "Dado Original/Sem Imputação", "Dado Original/Sem ~
sdf_nrow(minha_pesquisa) # Mostra quantas linhas 
[1] 1012

Como temos poucas linhas podemos gerar uma tabela interativa:

minha_pesquisa <- minha_pesquisa %>%
  as.data.frame( ) %>% # Transformo pra dataframe local pq eh pequeno!
  mutate(salario = round(salario,2))
  
datatable(minha_pesquisa) # Nao eh mais um objeto Spark

Ótimo, na tabela interativa acima podemos ordenar do maior para o menor simplesmente clicando no título da coluna que desejamos. Também podemos segmentar ainda mais nossa pesquisa quando digitamos p. ex. “Curitiba” no campo Search da tabela interativa.

Agora também podemos gerar um gráfico interativo com os filtros que temos interesse:

library(ggplot2)
library(plotly)

grafico <- minha_pesquisa %>%
  filter(descricao_municipio == "Pr-Curitiba") %>%
  group_by(competencia) %>%
  summarise(fluxo = sum(saldomovimentacao, na.rm=TRUE)) 

ggplotly(
ggplot(grafico, aes(x = competencia, y = fluxo))+
  geom_col()+
  ggtitle("Fluxo de admitidos e desligados dos economistas em Curitiba/PR de jan 2020 a jan 2021")
)

Desconecta do Spark

spark_disconnect(sc=spark_conn)

\[\\[1in]\]


Referências

AWS, Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR, Disponível em: \(<\) https://aws.amazon.com/pt/blogs/big-data/running-sparklyr-rstudios-r-interface-to-spark-on-amazon-emr/ \(>\)

CRAN, package ´sparklyr´, disponível em: \(<\) https://cran.r-project.org/ \(>\)

Data Science in Spark with Sparklyr : : CHEAT SHEET disponível em: \(<\) https://science.nu/wp-content/uploads/2018/07/r-sparklyr.pdf \(>\)

Luraschi, J., Kuo, K., Ruiz, E. Mastering Spark with R, Bookdown disponível em: \(<\) https://therinspark.com/ \(>\) Acesso em março de 2021.

Wilher, V. Um pesadelo chamado novo CAGED, Disponível em: \(<\) https://analisemacro.com.br/economia/comentario-de-conjuntura/um-pesadelo-chamado-novo-caged/ \(>\)