Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para o projeto.
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from small.tb_ordem_pendente" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)
Renomeando as colunas:
colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"] df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)
Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:
df_carteira = spark.sql(""" select to_date(data_atualizacao) as data_referencia, (case when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio, sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado, sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre from pedidos_carteira where to_date(data_atualizacao) == current_date() and codigo_setor_atividade in('CS', 'HY', 'MS') group by 1, 2 """)
Fazendo append na tabela final na database indicadores_mercado:
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")
1.3 Base Final:
col_name | data_type |
---|---|
data_referencia | date |
negocio | string |
valor_bloqueado | double |
valor_livre | double |
2. Indicadores Devolução Deca
Indicadores para pedidos em status de devolução para Deca.
2.1 Origem:
Database large e tabela tb_resultado_comercial.
2.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucoes_deca = spark.sql(""" select distinct to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes, (case when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == '01' then 'revestimento' end) as negocio, sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno, sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao, sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento from large.tb_resultado_comercial where data_competencia between '2019-01-01' and current_date() - 1 and codigo_setor_atividade in('MS', 'CS', 'HY', '01') group by 1, 2 order by 1, 2 asc """)
Calculando o valor de devolução ajustado, conforme regra de negócio:
df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")
2.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | double |
valor_devolucao | double |
3. Indicadores Devolução Madeira
Indicadores para pedidos em status de devolução para Madeira.
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucao_madeira = spark.sql(""" select perio as mes, (case when prctr like '%MDP' then 'mdp' when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio, sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao, sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado from analytics_prd.custos_rem where spart == 'CH' and (prctr like '%MDP' or prctr like '%MDF') and perio >= 2019001 group by 1, 2 order by 1, 2 """)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")
3.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
4. Indicadores Financeiros
Indicadores financeiros disponibilizados pela Controladoria.
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.
4.2 Transformação:
Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Fazendo o download dos arquivos:
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")
4.2.1 Arquivo Fechamento_Gerencial_2022.xlsm
4.2.1.1 Indicadores do DRE
Definindo função para Indicadores do DRE:
#parâmetros da função: range de células e negócio("madeira", "deca", "revestimento" ou "consolidado") def ler_dre(celulas, negocio): #lendo range de células do excel utilizando o pandas.read_excel(arquivo, sheet, linhas desconsideradas, número de linhas, range células, coluna index) #transpondo a tabela utilizando .transpose() #selecionando colunas utilizando .iloc[] #resetando index df = pd.read_excel(fechamento_gerencial, "DRE CONSOL", skiprows = 1, nrows = 146, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 127]].reset_index() #renomeando colunas df.columns = dre_header #transformando em dataframe do spark df = spark.createDataFrame(df) #criando coluna "negocio" df = df.selectExpr(dre_header[0], "'{}' as negocio".format(negocio), dre_header[1], dre_header[2]) #tratando coluna "mes" df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_dre_madeira = ler_dre("EL:EX", "madeira") df_dre_deca = ler_dre("B:N", "deca") df_dre_revestimento = ler_dre("HN:HZ", "revestimento") df_dre_consolidado = ler_dre("GT:HF", "consolidado")
Fazendo união de todos os dataframes gerados:
df_dre = df_dre_madeira.union(df_dre_deca).union(df_dre_revestimento).union(df_dre_consolidado)
4.2.1.2 Indicadores do EVA
Definindo função para Indicadores do EVA, lendo a sheet EVA do arquivo:
#indicadores de valor agregado: def ler_eva(celulas, negocio): df = pd.read_excel(fechamento_gerencial, "EVA", skiprows = 2, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 18].reset_index() df.columns = eva_header df = spark.createDataFrame(df) df = df.selectExpr(eva_header[0], "'{}' as negocio".format(negocio), eva_header[1]) df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_eva_madeira = ler_eva("GG:GT", "madeira") df_eva_deca = ler_eva("BA:BN", "deca") df_eva_revestimento = ler_eva("EY:FL", "revestimento") df_eva_consolidado = ler_eva("FP:GC", "consolidado")
Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo eva_recorrente:
df_eva = df_eva_madeira.union(df_eva_deca).union(df_eva_revestimento).union(df_eva_consolidado).withColumn("eva_recorrente", col("eva_recorrente")/1000)
4.2.2 Arquivo Fluxo_de_Caixa_Oficial_2022.xlsx
4.2.2.1 Indicadores de Fluxo de Caixa
Definindo função para Indicadores de Fluxo de Caixa, lendo a sheet F.C. REAL do arquivo:
def ler_fluxo_caixa(celulas, negocio): df = pd.read_excel(fluxo_caixa, "F.C. REAL", skiprows = 1, nrows = 86, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index() df.columns = fc_header df = spark.createDataFrame(df) df = df.selectExpr(fc_header[0], "'{}' as negocio".format(negocio), fc_header[1]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fc_madeira = ler_fluxo_caixa("FN:GA", "madeira") df_fc_deca = ler_fluxo_caixa("BM:BZ", "deca") df_fc_revestimento = ler_fluxo_caixa("IT:JG", "revestimento") df_fc_consolidado = ler_fluxo_caixa("KJ:KW", "consolidado")
Fazendo união de todos os dataframes gerados:
df_fc = df_fc_madeira.union(df_fc_deca).union(df_fc_revestimento).union(df_fc_consolidado)
4.2.2.2 Indicadores de PMP
Definindo função para Indicadores de PMP, lendo a sheet CGL.ROL do arquivo:
def ler_pmp(celulas, negocio): df = pd.read_excel(fluxo_caixa, "CGL.ROL", skiprows = 1, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 26].reset_index() df.columns = pmp_header df = spark.createDataFrame(df) df = df.selectExpr(pmp_header[0], "'{}' as negocio".format(negocio), pmp_header[1]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_pmp_madeira = ler_pmp("DR:EE", "madeira") df_pmp_deca = ler_pmp("AU:BH", "deca") df_pmp_revestimento = ler_pmp("FZ:GM", "revestimento") df_pmp_consolidado = ler_pmp("GO:HB", "consolidado")
Fazendo união de todos os dataframes gerados:
df_pmp = df_pmp_madeira.union(df_pmp_deca).union(df_pmp_revestimento).union(df_pmp_consolidado)
4.2.3 Arquivo Forecast_3_9_2022.xlsx
4.2.3.1 Indicadores de Forecast
Definindo função para Indicadores de Forecast:
def ler_forecast(sheet, celulas, negocio): df = pd.read_excel(forecast, sheet, skiprows = 3, nrows = 83, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 47]].reset_index() df.columns = frc_header df = spark.createDataFrame(df) df = df.selectExpr(frc_header[0], "'{}' as negocio".format(negocio), frc_header[1], frc_header[2]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_dre_madeira = ler_forecast("DRE_MAD", "A:M", "madeira") df_frc_dre_deca = ler_forecast("DRE_DEC", "A:M", "deca") df_frc_dre_revestimento = ler_forecast("DRE_REVEST", "A:M", "revestimento") df_frc_dre_consolidado = ler_forecast("DRE_CONSOL", "A:M", "consolidado")
Fazendo união de todos os dataframes gerados:
df_frc = df_frc_dre_madeira.union(df_frc_dre_deca).union(df_frc_dre_revestimento).union(df_frc_dre_consolidado)
4.2.3.2 Indicadores de Forecast EVA
Definindo função para Indicadores de Forecast EVA, lendo a sheet Base EVA ROIC:
def ler_forecast_eva(negocio): #desinindo constantantes para cada negócio para utilizar posteriormente o iloc[] e localizar os dados necessários #esses valores pra neg são como os negócios são identificados no arquivo if negocio == "madeira": neg = "Mad. Total" elif negocio == "deca": neg = "Deca" elif negocio == "revestimento": neg = "RC" elif negocio == "consolidado": neg = "Consol" df = pd.read_excel(forecast, "Base EVA ROIC", skiprows = 4, nrows = 130, usecols = "B:Q") #buscando a linha em que a coluna 0 é igual ao negócio, coluna 1 igual a "Mês", coluna 2 igual a "RECORRENTE" e coluna 4 igual a "EVA" df = df.where( (df.iloc[:, 0] == neg) & (df.iloc[:, 1] == "Mês") & (df.iloc[:, 2] == "RECORRENTE") & (df.iloc[:, 3] == "EVA") ).dropna() df = df.transpose().iloc[4: ].reset_index() df.columns = frc_eva_header df = spark.createDataFrame(df) df = df.selectExpr(frc_eva_header[0], "'{}' as negocio".format(negocio), frc_eva_header[1]) df = df.withColumn("mes", when(df.mes == 1, "01/01/" + ano_atual) .when(df.mes == 2, "01/02/" + ano_atual) .when(df.mes == 3, "01/03/" + ano_atual) .when(df.mes == 4, "01/04/" + ano_atual) .when(df.mes == 5, "01/05/" + ano_atual) .when(df.mes == 6, "01/06/" + ano_atual) .when(df.mes == 7, "01/07/" + ano_atual) .when(df.mes == 8, "01/08/" + ano_atual) .when(df.mes == 9, "01/09/" + ano_atual) .when(df.mes == 10, "01/10/" + ano_atual) .when(df.mes == 11, "01/11/" + ano_atual) .when(df.mes == 12, "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_eva_madeira = ler_forecast_eva("madeira") df_frc_eva_deca = ler_forecast_eva("deca") df_frc_eva_revestimento = ler_forecast_eva("revestimento") df_frc_eva_consolidado = ler_forecast_eva("consolidado")
Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo frc_eva_recorrente:
df_frc_eva = df_frc_eva_madeira.union(df_frc_eva_deca).union(df_frc_eva_revestimento).union(df_frc_eva_consolidado).withColumn("frc_eva_recorrente", col("frc_eva_recorrente")/1000)
4.2.4 Arquivo FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx
4.2.4.1 Indicadores Forecast Fluxo de Caixa
Definindo função para Indicadores de Forecast fluxo de Caixa, lendo a sheet F.C. FCST:
def ler_fcf(celulas, negocio): df = pd.read_excel(fcf, "F.C. FCST", skiprows = 1, nrows = 87, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index() df.columns = fcf_header df = spark.createDataFrame(df) df = df.selectExpr(fcf_header[0], "'{}' as negocio".format(negocio), fcf_header[1]) df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fcf_madeira = ler_fcf("FN:GA", "madeira") df_fcf_deca = ler_fcf("BM:BZ", "deca") df_fcf_revestimento = ler_fcf("HD:HQ", "revestimento") df_fcf_consolidado = ler_fcf("IT:JG", "consolidado")
Fazendo união de todos os dataframes gerados:
df_frc_fc = df_fcf_madeira.union(df_fcf_deca).union(df_fcf_revestimento).union(df_fcf_consolidado)
4.2.5 Arquivo indicadores_poa_2022.csv
Foi feito o upload desse arquivo diretamente no dbfs do Databricks:
df_poa = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_financeiros/indicadores_poa_2022.csv")
Tratando alguns tipos de campos e ajustando a ordem de grandeza de outro:
df_poa = df_poa.select( "mes", "negocio", col("poa_receita_liquida_vendas").cast(DoubleType()), col("poa_ebitda_recorrente").cast(DoubleType()), col("poa_eva_recorrente").cast(DoubleType()), col("poa_fluxo_caixa_livre_total").cast(DoubleType()) ).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)
4.2.5 Base Final
Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:
campos_join = ["mes", "negocio"] df_indicadores_financeiros = df_dre.join( df_eva, campos_join ).join( df_fc, campos_join ).join( df_pmp, campos_join ).join( df_frc, campos_join ).join( df_frc_eva, campos_join ).join( df_frc_fc, campos_join ).join( df_poa, campos_join ).orderBy("negocio", "mes")
Sobrescrevendo a tabela final no database :
df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual))
4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
Add Comment