Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para o projeto.
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from small.tb_ordem_pendente" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)
Renomeando as colunas:
colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"] df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)
Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:
df_carteira = spark.sql(""" select to_date(data_atualizacao) as data_referencia, (case when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio, sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado, sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre from pedidos_carteira where to_date(data_atualizacao) == current_date() and codigo_setor_atividade in('CS', 'HY', 'MS') group by 1, 2 """)
Fazendo append na tabela final na database indicadores_mercado:
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")
1.3 Base Final:
col_name | data_type |
---|---|
data_referencia | date |
negocio | string |
valor_bloqueado | double |
valor_livre | double |
2. Indicadores Devolução Deca
Indicadores para pedidos em status de devolução para Deca.
2.1 Origem:
Database large e tabela tb_resultado_comercial.
2.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucoes_deca = spark.sql(""" select distinct to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes, (case when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == '01' then 'revestimento' end) as negocio, sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno, sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao, sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento from large.tb_resultado_comercial where data_competencia between '2019-01-01' and current_date() - 1 and codigo_setor_atividade in('MS', 'CS', 'HY', '01') group by 1, 2 order by 1, 2 asc """)
Calculando o valor de devolução ajustado, conforme regra de negócio:
df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")
2.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | double |
valor_devolucao | double |
3. Indicadores Devolução Madeira
Indicadores para pedidos em status de devolução para Madeira.
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucao_madeira = spark.sql(""" select perio as mes, (case when prctr like '%MDP' then 'mdp' when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio, sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao, sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado from analytics_prd.custos_rem where spart == 'CH' and (prctr like '%MDP' or prctr like '%MDF') and perio >= 2019001 group by 1, 2 order by 1, 2 """)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")
3.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
4. Indicadores Financeiros
Indicadores financeiros disponibilizados pela Controladoria.
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em um Sharepoint.
4.2 Transformação:
Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Fazendo o download dos arquivos:
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")
4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
Add Comment