Skip to end of metadata
Go to start of metadata

You are viewing an old version of this content. View the current version.

Compare with Current Restore this Version View Version History

« Previous Version 4 Next »

Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para o projeto.

1. Indicadores Carteira Deca

Indicadores para pedidos em carteira Deca.

1.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.

1.2 Transformação:

Fazendo uma query com um select de todos os campos da tabela origem:

query = "select * from small.tb_ordem_pendente"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)

Renomeando as colunas:

colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"]
df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)

Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:

df_carteira = spark.sql("""
    select
        to_date(data_atualizacao) as data_referencia,
        (case when codigo_setor_atividade == 'HY' then 'hydra'
            when codigo_setor_atividade == 'MS' then 'metais'
            when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio,
        sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado,
        sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre
   
    from pedidos_carteira
     
    where to_date(data_atualizacao) == current_date()
        and codigo_setor_atividade in('CS', 'HY', 'MS')

    group by 1, 2
""")

Fazendo append na tabela final na database indicadores_mercado:

df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")

1.3 Base Final:

col_name

data_type

data_referencia

date

negocio

string

valor_bloqueado

double

valor_livre

double

2. Indicadores Devolução Deca

Indicadores para pedidos em status de devolução para Deca.

2.1 Origem:

Database large e tabela tb_resultado_comercial.

2.2 Transformação:

Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:

df_devolucoes_deca = spark.sql("""
    select distinct
        to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
        (case when codigo_setor_atividade == 'MS' then 'metais'
             when codigo_setor_atividade == 'CS' then 'loucas'
             when codigo_setor_atividade == 'HY' then 'hydra'
             when codigo_setor_atividade == '01' then 'revestimento' end) as negocio,
        sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno,
        sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao,
        sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento
        
    from large.tb_resultado_comercial
    
    where data_competencia between '2019-01-01' and current_date() - 1
        and codigo_setor_atividade in('MS', 'CS', 'HY', '01')
    
    group by 1, 2
    
    order by 1, 2 asc
""")

Calculando o valor de devolução ajustado, conforme regra de negócio:

df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")

2.3 Base Final:

col_name

data_type

mes

date

negocio

string

valor_faturamento

double

valor_devolucao

double

3. Indicadores Devolução Madeira

Indicadores para pedidos em status de devolução para Madeira.

3.1 Origem:

Database analytics_prd e tabela custos_rem.

3.2 Transformação:

Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:

df_devolucao_madeira = spark.sql("""
    select
        perio as mes,
        (case when prctr like '%MDP' then 'mdp'
            when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio,
        sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao,
        sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado
        
    from analytics_prd.custos_rem
    
    where spart == 'CH'
        and (prctr like '%MDP' or prctr like '%MDF')
    
    and perio >= 2019001
    
    group by 1, 2
    
    order by 1, 2
""")

Sobrescrevendo a tabela final na database indicadores_mercado:

df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")

3.3 Base Final:

col_name

data_type

mes

date

negocio

string

valor_faturamento

decimal(29,2)

valor_devolucao

decimal(29,2)

4. Indicadores Financeiros

Indicadores financeiros disponibilizados pela Controladoria.

4.1 Origem:

A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em um Sharepoint.

4.2 Transformação:

Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Fazendo o download dos arquivos:

download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")

4.3 Base Final:

col_name

data_type

mes

date

negocio

string

valor_faturamento

decimal(29,2)

valor_devolucao

decimal(29,2)