Skip to end of metadata
Go to start of metadata

You are viewing an old version of this content. View the current version.

Compare with Current Restore this Version View Version History

« Previous Version 3 Next »

Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para o projeto.

1. Indicadores Carteira Deca

Indicadores para pedidos em carteira Deca.

1.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.

1.2 Transformação:

Fazendo uma query com um select de todos os campos da tabela origem:

query = "select * from small.tb_ordem_pendente"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)

Renomeando as colunas:

colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"]
df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)

Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:

df_carteira = spark.sql("""
    select
        to_date(data_atualizacao) as data_referencia,
        (case when codigo_setor_atividade == 'HY' then 'hydra'
            when codigo_setor_atividade == 'MS' then 'metais'
            when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio,
        sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado,
        sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre
   
    from pedidos_carteira
     
    where to_date(data_atualizacao) == current_date()
        and codigo_setor_atividade in('CS', 'HY', 'MS')

    group by 1, 2
""")

Fazendo append na tabela final na database indicadores_mercado:

df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")

1.3 Base Final:

0 Comments

You are not logged in. Any changes you make will be marked as anonymous. You may want to Log In if you already have an account.