Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para o projeto.
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from small.tb_ordem_pendente" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)
Renomeando as colunas:
colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"] df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)
Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:
df_carteira = spark.sql(""" select to_date(data_atualizacao) as data_referencia, (case when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio, sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado, sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre from pedidos_carteira where to_date(data_atualizacao) == current_date() and codigo_setor_atividade in('CS', 'HY', 'MS') group by 1, 2 """)
Fazendo append na tabela final na database indicadores_mercado:
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")
0 Comments