Conjunto de notebooks que tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
Notebook: indicadores_carteira_deca
Job: inteligencia-mercado_job_prod_indicadores_carteira_deca
Schedule: diário/18:00h
Base fim: indicadores_mercado.tb_indicadores_carteira_deca
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from small.tb_ordem_pendente" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)
Renomeando as colunas:
colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"] df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)
Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:
df_carteira = spark.sql(""" select to_date(data_atualizacao) as data_referencia, (case when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio, sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado, sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre from pedidos_carteira where to_date(data_atualizacao) == current_date() and codigo_setor_atividade in('CS', 'HY', 'MS') group by 1, 2 """)
Fazendo append na tabela final na database indicadores_mercado:
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")
1.3 Base Final:
col_name | data_type |
---|---|
data_referencia | date |
negocio | string |
valor_bloqueado | double |
valor_livre | double |
2. Indicadores Devolução Deca
Indicadores para pedidos em status de devolução para Deca.
Notebook: indicadores_devolucao_deca
Job: inteligencia-mercado_job_prod_indicadores_devolucao_deca
Schedule: diário/14:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_deca
2.1 Origem:
Database large e tabela tb_resultado_comercial.
2.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucoes_deca = spark.sql(""" select distinct to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes, (case when codigo_setor_atividade == 'MS' then 'metais' when codigo_setor_atividade == 'CS' then 'loucas' when codigo_setor_atividade == 'HY' then 'hydra' when codigo_setor_atividade == '01' then 'revestimento' end) as negocio, sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno, sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao, sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento from large.tb_resultado_comercial where data_competencia between '2019-01-01' and current_date() - 1 and codigo_setor_atividade in('MS', 'CS', 'HY', '01') group by 1, 2 order by 1, 2 asc """)
Calculando o valor de devolução ajustado, conforme regra de negócio:
df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")
2.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | double |
valor_devolucao | double |
3. Indicadores Devolução Madeira
Indicadores para pedidos em status de devolução para Madeira.
Notebook: indicadores_devolucao_madeira
Job: inteligencia-mercado_job_prod_indicadores_devolucao_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_madeira
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucao_madeira = spark.sql(""" select perio as mes, (case when prctr like '%MDP' then 'mdp' when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio, sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao, sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado from analytics_prd.custos_rem where spart == 'CH' and (prctr like '%MDP' or prctr like '%MDF') and perio >= 2019001 group by 1, 2 order by 1, 2 """)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")
3.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
4. Indicadores Financeiros
Indicadores financeiros disponibilizados pela Controladoria.
Notebook: indicadores_financeiros
Job: inteligencia-mercado_job_prod_indicadores_financeiros
Schedule: diário/15:00h
Base fim: indicadores_mercado.tb_indicadores_financeiros_2022
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.
4.2 Transformação:
Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Fazendo o download dos arquivos:
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx") download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")
4.2.1 Arquivo Fechamento_Gerencial_2022.xlsm
4.2.1.1 Indicadores do DRE
Definindo função para Indicadores do DRE:
#parâmetros da função: range de células e negócio("madeira", "deca", "revestimento" ou "consolidado") def ler_dre(celulas, negocio): #lendo range de células do excel utilizando o pandas.read_excel(arquivo, sheet, linhas desconsideradas, número de linhas, range células, coluna index) #transpondo a tabela utilizando .transpose() #selecionando colunas utilizando .iloc[] #resetando index df = pd.read_excel(fechamento_gerencial, "DRE CONSOL", skiprows = 1, nrows = 146, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 127]].reset_index() #renomeando colunas df.columns = dre_header #transformando em dataframe do spark df = spark.createDataFrame(df) #criando coluna "negocio" df = df.selectExpr(dre_header[0], "'{}' as negocio".format(negocio), dre_header[1], dre_header[2]) #tratando coluna "mes" df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_dre_madeira = ler_dre("EL:EX", "madeira") df_dre_deca = ler_dre("B:N", "deca") df_dre_revestimento = ler_dre("HN:HZ", "revestimento") df_dre_consolidado = ler_dre("GT:HF", "consolidado")
Fazendo união de todos os dataframes gerados:
df_dre = df_dre_madeira.union(df_dre_deca).union(df_dre_revestimento).union(df_dre_consolidado)
4.2.1.2 Indicadores do EVA
Definindo função para Indicadores do EVA, lendo a sheet EVA do arquivo:
#indicadores de valor agregado: def ler_eva(celulas, negocio): df = pd.read_excel(fechamento_gerencial, "EVA", skiprows = 2, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 18].reset_index() df.columns = eva_header df = spark.createDataFrame(df) df = df.selectExpr(eva_header[0], "'{}' as negocio".format(negocio), eva_header[1]) df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_eva_madeira = ler_eva("GG:GT", "madeira") df_eva_deca = ler_eva("BA:BN", "deca") df_eva_revestimento = ler_eva("EY:FL", "revestimento") df_eva_consolidado = ler_eva("FP:GC", "consolidado")
Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo eva_recorrente:
df_eva = df_eva_madeira.union(df_eva_deca).union(df_eva_revestimento).union(df_eva_consolidado).withColumn("eva_recorrente", col("eva_recorrente")/1000)
4.2.2 Arquivo Fluxo_de_Caixa_Oficial_2022.xlsx
4.2.2.1 Indicadores de Fluxo de Caixa
Definindo função para Indicadores de Fluxo de Caixa, lendo a sheet F.C. REAL do arquivo:
def ler_fluxo_caixa(celulas, negocio): df = pd.read_excel(fluxo_caixa, "F.C. REAL", skiprows = 1, nrows = 86, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index() df.columns = fc_header df = spark.createDataFrame(df) df = df.selectExpr(fc_header[0], "'{}' as negocio".format(negocio), fc_header[1]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fc_madeira = ler_fluxo_caixa("FN:GA", "madeira") df_fc_deca = ler_fluxo_caixa("BM:BZ", "deca") df_fc_revestimento = ler_fluxo_caixa("IT:JG", "revestimento") df_fc_consolidado = ler_fluxo_caixa("KJ:KW", "consolidado")
Fazendo união de todos os dataframes gerados:
df_fc = df_fc_madeira.union(df_fc_deca).union(df_fc_revestimento).union(df_fc_consolidado)
4.2.2.2 Indicadores de PMP
Definindo função para Indicadores de PMP, lendo a sheet CGL.ROL do arquivo:
def ler_pmp(celulas, negocio): df = pd.read_excel(fluxo_caixa, "CGL.ROL", skiprows = 1, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 26].reset_index() df.columns = pmp_header df = spark.createDataFrame(df) df = df.selectExpr(pmp_header[0], "'{}' as negocio".format(negocio), pmp_header[1]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_pmp_madeira = ler_pmp("DR:EE", "madeira") df_pmp_deca = ler_pmp("AU:BH", "deca") df_pmp_revestimento = ler_pmp("FZ:GM", "revestimento") df_pmp_consolidado = ler_pmp("GO:HB", "consolidado")
Fazendo união de todos os dataframes gerados:
df_pmp = df_pmp_madeira.union(df_pmp_deca).union(df_pmp_revestimento).union(df_pmp_consolidado)
4.2.3 Arquivo Forecast_3_9_2022.xlsx
4.2.3.1 Indicadores de Forecast
Definindo função para Indicadores de Forecast:
def ler_forecast(sheet, celulas, negocio): df = pd.read_excel(forecast, sheet, skiprows = 3, nrows = 83, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 47]].reset_index() df.columns = frc_header df = spark.createDataFrame(df) df = df.selectExpr(frc_header[0], "'{}' as negocio".format(negocio), frc_header[1], frc_header[2]) df = df.withColumn("mes", to_date("mes")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_dre_madeira = ler_forecast("DRE_MAD", "A:M", "madeira") df_frc_dre_deca = ler_forecast("DRE_DEC", "A:M", "deca") df_frc_dre_revestimento = ler_forecast("DRE_REVEST", "A:M", "revestimento") df_frc_dre_consolidado = ler_forecast("DRE_CONSOL", "A:M", "consolidado")
Fazendo união de todos os dataframes gerados:
df_frc = df_frc_dre_madeira.union(df_frc_dre_deca).union(df_frc_dre_revestimento).union(df_frc_dre_consolidado)
4.2.3.2 Indicadores de Forecast EVA
Definindo função para Indicadores de Forecast EVA, lendo a sheet Base EVA ROIC:
def ler_forecast_eva(negocio): #desinindo constantantes para cada negócio para utilizar posteriormente o iloc[] e localizar os dados necessários #esses valores pra neg são como os negócios são identificados no arquivo if negocio == "madeira": neg = "Mad. Total" elif negocio == "deca": neg = "Deca" elif negocio == "revestimento": neg = "RC" elif negocio == "consolidado": neg = "Consol" df = pd.read_excel(forecast, "Base EVA ROIC", skiprows = 4, nrows = 130, usecols = "B:Q") #buscando a linha em que a coluna 0 é igual ao negócio, coluna 1 igual a "Mês", coluna 2 igual a "RECORRENTE" e coluna 4 igual a "EVA" df = df.where( (df.iloc[:, 0] == neg) & (df.iloc[:, 1] == "Mês") & (df.iloc[:, 2] == "RECORRENTE") & (df.iloc[:, 3] == "EVA") ).dropna() df = df.transpose().iloc[4: ].reset_index() df.columns = frc_eva_header df = spark.createDataFrame(df) df = df.selectExpr(frc_eva_header[0], "'{}' as negocio".format(negocio), frc_eva_header[1]) df = df.withColumn("mes", when(df.mes == 1, "01/01/" + ano_atual) .when(df.mes == 2, "01/02/" + ano_atual) .when(df.mes == 3, "01/03/" + ano_atual) .when(df.mes == 4, "01/04/" + ano_atual) .when(df.mes == 5, "01/05/" + ano_atual) .when(df.mes == 6, "01/06/" + ano_atual) .when(df.mes == 7, "01/07/" + ano_atual) .when(df.mes == 8, "01/08/" + ano_atual) .when(df.mes == 9, "01/09/" + ano_atual) .when(df.mes == 10, "01/10/" + ano_atual) .when(df.mes == 11, "01/11/" + ano_atual) .when(df.mes == 12, "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_eva_madeira = ler_forecast_eva("madeira") df_frc_eva_deca = ler_forecast_eva("deca") df_frc_eva_revestimento = ler_forecast_eva("revestimento") df_frc_eva_consolidado = ler_forecast_eva("consolidado")
Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo frc_eva_recorrente:
df_frc_eva = df_frc_eva_madeira.union(df_frc_eva_deca).union(df_frc_eva_revestimento).union(df_frc_eva_consolidado).withColumn("frc_eva_recorrente", col("frc_eva_recorrente")/1000)
4.2.4 Arquivo FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx
4.2.4.1 Indicadores Forecast Fluxo de Caixa
Definindo função para Indicadores de Forecast fluxo de Caixa, lendo a sheet F.C. FCST:
def ler_fcf(celulas, negocio): df = pd.read_excel(fcf, "F.C. FCST", skiprows = 1, nrows = 87, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index() df.columns = fcf_header df = spark.createDataFrame(df) df = df.selectExpr(fcf_header[0], "'{}' as negocio".format(negocio), fcf_header[1]) df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual) .when(df.mes.like("FEV%"), "01/02/" + ano_atual) .when(df.mes.like("MAR%"), "01/03/" + ano_atual) .when(df.mes.like("ABR%"), "01/04/" + ano_atual) .when(df.mes.like("MAI%"), "01/05/" + ano_atual) .when(df.mes.like("JUN%"), "01/06/" + ano_atual) .when(df.mes.like("JUL%"), "01/07/" + ano_atual) .when(df.mes.like("AGO%"), "01/08/" + ano_atual) .when(df.mes.like("SET%"), "01/09/" + ano_atual) .when(df.mes.like("OUT%"), "01/10/" + ano_atual) .when(df.mes.like("NOV%"), "01/11/" + ano_atual) .when(df.mes.like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) return df
Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fcf_madeira = ler_fcf("FN:GA", "madeira") df_fcf_deca = ler_fcf("BM:BZ", "deca") df_fcf_revestimento = ler_fcf("HD:HQ", "revestimento") df_fcf_consolidado = ler_fcf("IT:JG", "consolidado")
Fazendo união de todos os dataframes gerados:
df_frc_fc = df_fcf_madeira.union(df_fcf_deca).union(df_fcf_revestimento).union(df_fcf_consolidado)
4.2.5 Arquivo indicadores_poa_2022.csv
Foi feito o upload desse arquivo diretamente no dbfs do Databricks:
df_poa = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_financeiros/indicadores_poa_2022.csv")
Tratando alguns tipos de campos e ajustando a ordem de grandeza de outro:
df_poa = df_poa.select( "mes", "negocio", col("poa_receita_liquida_vendas").cast(DoubleType()), col("poa_ebitda_recorrente").cast(DoubleType()), col("poa_eva_recorrente").cast(DoubleType()), col("poa_fluxo_caixa_livre_total").cast(DoubleType()) ).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)
4.2.6 Base Final
Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:
campos_join = ["mes", "negocio"] df_indicadores_financeiros = df_dre.join( df_eva, campos_join ).join( df_fc, campos_join ).join( df_pmp, campos_join ).join( df_frc, campos_join ).join( df_frc_eva, campos_join ).join( df_frc_fc, campos_join ).join( df_poa, campos_join ).orderBy("negocio", "mes")
Sobrescrevendo a tabela final no database :
df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual))
4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
receita_liquida_vendas | double |
ebitda_recorrente | double |
eva_recorrente | double |
fluxo_caixa_livre_total | double |
pmp | double |
frc_receita_liquida_vendas | double |
frc_ebitda_recorrente | double |
frc_eva_recorrente | double |
frc_fluxo_caixa_livre_total | double |
poa_receita_liquida_vendas | double |
poa_ebitda_recorrente | double |
poa_eva_recorrente | double |
poa_fluxo_caixa_livre_total | double |
5. Indicadores Gente
Indicadores de Recursos Humanos.
Notebook: indicadores_gente
Job: inteligencia-mercado_job_prod_indicadores_gente
Schedule: sextas-feiras, 14:00h
Base fim: indicadores_mercado.tb_indicadores_gente
5.1 Origem:
O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.
5.2 Transformação:
Definindo função para o download do arquivo para o dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Lendo o arquivo:
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv")
Aplicando regra de negócio para o campo taxa_afastamento:
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente")
5.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
quantidade_cargos_lideranca_mulheres | int |
quantidade_cargos_lideranca_total | int |
taxa_desligamentos_voluntarios | double |
taxa_desligamentos_involuntarios | double |
taxa_desligamentos_total | double |
taxa_absenteismo_sem_afastamento | double |
taxa_absenteismo_com_afastamento | double |
taxa_afastamento | double |
7. Indicadores Margem Madeira
Indicadores de Margem para Madeira.
Notebook: indicadores_margem_madeira
Job: inteligencia-mercado_job_prod_indicadores_margem_madeira
Schedule: diário/14:22h
Base fim: indicadores_mercado.tb_indicadores_margem_madeira
7.1 Origem:
É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:
7.2 Transformação:
São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:
import tableauserverclient as tsc import tableauhyperapi from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal
Definindo variáveis para conexão no Tableau Server:
usuario = "***" senha = "***" site = "Madeira" servidor = "https://analytics.duratex.com.br" id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94" nome_database = "vw_ren_rateio_aj_SQL" diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/"
Definindo parâmetros da conexão com o Tableau Server:
autenticador = tsc.TableauAuth(usuario, senha, site) servidor = tsc.Server(servidor)
Fazendo o download do arquivo .tdsx:
with servidor.auth.sign_in(autenticador): caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True)
O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado: arquivo_zipado.extractall(diretorio_download)
Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper: with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao: with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado: resultado_consulta = list(resultado)
Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:
esquema_margem = StructType([ StructField('mes', StringType(), True), StructField('produto', StringType(), True), StructField('mercado', StringType(), True), StructField('receita', DoubleType(), True), StructField('quantidade', DoubleType(), True), StructField('valor_cit', DoubleType(), True), StructField('valor_cco', DoubleType(), True) ])
Criando o dataframe:
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem)
Utilizando o Spark SQL para aplicar algumas regras de negócio:
df_margem_calculos = spark.sql(""" select mes, (case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto, (sum(receita) / sum(quantidade)) as receita_m3, (sum(valor_cit) / sum(quantidade)) as cit_m3, (sum(valor_cco) / sum(quantidade)) as cco_m3 from margem_view group by 1, 2 order by 1, 2 asc """)
Aplicando mais algumas regras de negócio:
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira")
7.3 Base Final:
col_name | data_type |
---|---|
mes | string |
produto | string |
receita_m3 | double |
cit_m3 | double |
cco_m3 | double |
mop_m3 | double |
9. Indicadores OEE Madeira
Indicadores de OEE para Madeira.
Notebook: indicadores_oee_madeira
Job: inteligencia-mercado_job_prod_indicadores_oee_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_oee_madeira
9.1 Origem:
O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
9.2 Transformação:
Definindo função para ler o arquivo:
def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_leitura) response.raise_for_status() df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas) df = spark.createDataFrame(df) return df
Lendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:
df_oee_cru = [] while not isinstance(df_oee_cru, DataFrame): try: df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0) except: pass
Definindo o nome do produto como constante:
df_oee_cru = df_oee_cru.withColumn("produto", lit("cru"))
Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
df_oee_revestido = [] while not isinstance(df_oee_revestido, DataFrame): try: df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0) except: pass
Definindo o nome do produto como constante:
df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido"))
Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
df_oee_historico = [] while not isinstance(df_oee_historico, DataFrame): try: df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0) except: pass
Fazendo a união de todos os dataframes criados:
df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira")
9.3 Base Final:
col_name | data_type |
---|---|
mes | date |
oee | double |
oee_meta_global | double |
produto | string |
10. Indicadores OTIF Deca
Indicadores de OTIF para Deca.
Notebook: indicadores_otif_deca
Job: inteligencia-mercado_job_prod_indicadores_otif_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_deca
10.1 Origem:
O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.
10.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Lendo arquivo:
df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv")
Sobrescrevendo a tabela final na database indicadores_mercado:
df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca")
10.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_deca | string |
otif_deca | double |
meta_otif_deca | double |
11. Indicadores OTIF Madeira
Indicadores de OTIF para Madeira.
Notebook: indicadores_otif_madeira
Job: inteligencia-mercado_job_prod_indicadores_otif_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_madeira
11.1 Origem:
O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.
11.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Lendo arquivo:
df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv")
Sobrescrevendo a tabela final na database indicadores_mercado:
df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira")
11.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_madeira | string |
otif_madeira | double |
meta_otif_madeira | double |
12. Indicadores Panorama Mercado Madeira
Indicadores de Panorama de Mercado para Madeira.
Notebook: indicadores_panorama_mercado_madeira
Job: inteligencia-mercado_job_prod_indicadores_panorama_mercado_madeira
Schedule: diário/13:00h
Base fim: indicadores_mercado.tb_indicadores_panorama_mercado_madeira
12.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.
12.2 Transformação:
É feita uma query da tabela já aplicando algumas regras de negócio:
query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1"
Renomeando campos:
colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"] df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado)
Ajustando campo mes:
df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType()))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira")
12.3 Base Final:
col_name | data_type |
---|---|
mes | date |
mercado | string |
produto | string |
volume_m3_total | double |
13. Indicadores Produtividade Deca
Indicadores de Produtividade para Deca.
Notebook: indicadores_produtividade_deca
Job: inteligencia-mercado_job_prod_indicadores_produtividade_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_minuto_homem_deca
13.1 Origem:
O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
13.2 Transformação:
Definindo função para fazer o download do arquivo para diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Fazendo o download do arquivo:
download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx")
Lendo o arquivo:
produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx")
Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.
def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas): #lendo arquivo excel df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index() #definindo dataframe para o poa df_poa = df #definindo dataframe para produtividade, coluna 1 é diferente de "POA" df = df.where(df.iloc[:, 1] != "POA") df = df.iloc[:, [0, 1, numero_linhas]] df.columns = ["mes", "ano", "produtividade"] df = df.dropna(axis = 0) df = spark.createDataFrame(df) df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType())) #ajustandando campo mes para produtividade df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano"))) .when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano"))) .when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano"))) .when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano"))) .when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano"))) .when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano"))) .when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano"))) .when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano"))) .when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano"))) .when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano"))) .when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano"))) .when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano"))) .otherwise(None)) df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy")) #trabalhando com dataframe para poa, quando coluna 1 é igual a "POA" df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA") df_poa = df_poa.iloc[:, [0, numero_linhas]] df_poa .columns = ["mes", "poa_produtividade"] df_poa = df_poa.dropna(axis = 0) df_poa = spark.createDataFrame(df_poa) #ajustando campo mes df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual) .when(col("mes").like("FEV%"), "01/02/" + ano_atual) .when(col("mes").like("MAR%"), "01/03/" + ano_atual) .when(col("mes").like("ABR%"), "01/04/" + ano_atual) .when(col("mes").like("MAI%"), "01/05/" + ano_atual) .when(col("mes").like("JUN%"), "01/06/" + ano_atual) .when(col("mes").like("JUL%"), "01/07/" + ano_atual) .when(col("mes").like("AGO%"), "01/08/" + ano_atual) .when(col("mes").like("SET%"), "01/09/" + ano_atual) .when(col("mes").like("OUT%"), "01/10/" + ano_atual) .when(col("mes").like("NOV%"), "01/11/" + ano_atual) .when(col("mes").like("DEZ%"), "01/12/" + ano_atual) .otherwise(None)) df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy")) #definindo dataframe final, fazendo um join com os anteriores df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes) df_final = df_final.withColumn("segmento", lit(negocio)) df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade") df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2)) return df_final
Aplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:
df_metais = ler_produtividade("metais", "FA:FZ", 21, 5)
Aplicando função para loucas:
df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8)
Aplicando função para Hydra:
df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4)
Aplicando função para revestimento:
df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7)
Fazendo a união de todos os dataframes criados:
df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca")
13.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento | string |
poa_produtividade | double |
produtividade | double |
14. Indicadores SAC
Indicadores de SAC.
Notebook: indicadores_sac
Job: inteligencia-mercado_job_prod_indicadores_sac
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_sac
14.1 Origem:
O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.
14.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo): ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha)) web = ctx.load(ctx.web).execute_query() response = File.open_binary(ctx, arquivo_download) response.raise_for_status() with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta: pasta.write(response.content)
Lendo arquivo:
df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv")
Aplicando regra de negócio para o campo saldo:
df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados"))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac")
14.3 Base Final:
col_name | data_type |
---|---|
mes | date |
visao | string |
negocio | string |
casos_abertos | int |
casos_fechados | int |
saldo | int |
15. Indicadores Sell-in Deca - Chuveiros
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_chuveiros
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros
15.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.
15.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx") df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")
Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]
Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)
Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros")
15.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
tipo | string |
sku | string |
meses | timestamp |
desviador | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
16. Indicadores Sell-in Deca - Cubas
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_cubas
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_cubas
16.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.
16.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx") df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")
Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"]
Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)
Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas")
16.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
formato | string |
cor | string |
acabamento | string |
ean | string |
valor | double |
volume | double |
17. Indicadores Sell-in Deca - Torneiras
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_torneiras
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_torneiras
17.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.
17.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx") df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")
Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]
Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)
Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)
Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras")
17.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
subcategoria | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
bica | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
18. Indicadores Market Share, Sell-in e Sell-out Madeira
Indicadores para Market Share, Sell-in e Sell-out de Madeira.
Notebook: indicadores_share_sell_in_sell_out_madeira
Job: inteligencia-mercado_job_prod_indicadores_share_sell_in_sell_out_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira
18.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.
18.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from madeira.vw_base_mktshare_sellin_sellout" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name)
Renomeando as colunas:
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"] df_madeira = consulta_madeira.toDF(*colunas_madeira)
Ajustando tipos dos campos:
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType()))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira")
18.3 Base Final:
col_name | data_type |
---|---|
tipo | string |
ano | int |
competencia | int |
mercado | string |
produto | string |
produto_detalhe | string |
volume_m3 | double |
volume_m2 | double |
volume_m3_cap_dtx | double |
volume_m3_cap_mercex | double |
data_atualizacao_base | date |
produto_segmento | string |
quantidade_volume_m3_liquido | double |
quantidade_volume_m2 | double |
19. Indicadores Vendas Deca
Indicadores para Vendas de Deca.
Notebook: indicadores_venda_deca
Job: inteligencia-mercado_job_prod_indicadores_venda_deca
Schedule: diário/10:00h
Base fim: indicadores_mercado.tb_indicadores_vendas_deca
19.1 Origem:
Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.
19.2 Transformação:
Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.
for negocio in negocios: df = pd.date_range(start = "2022-01-01", end = date.today()) df = pd.DataFrame(df, columns = ["data"]) df = spark.createDataFrame(df) df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01'))) df = df.withColumn("negocio", lit(negocio)) if negocio == negocios[0]: df_datas = df else: df_datas = df_datas.union(df)
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas")
Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:
df_vendas = spark.sql(""" select t1.data_competencia as data, to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes, (case when t1.codigo_setor_atividade = 'CS' then 'loucas' when t1.codigo_setor_atividade = 'MS' then 'metais' when t1.codigo_setor_atividade = '01' then 'revestimento' when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas from large.tb_resultado_comercial t1 where t1.codigo_setor_atividade in("CS", "MS", "HY", "01") and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI") and t1.data_competencia between '2022-01-01' and (current_date() - 1) group by 1, 2, 3 order by 3, 2, 1 asc """)
Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left")
Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca")
Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3" multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings() bucket_name = multiple_run_parameters["bucket_name"] consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name)
Renomeando colunas:
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"] df_metas = consulta_metas.toDF(*colunas_metas)
Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:
def numero_dias(data): mes = data.month ano = data.year #tratativa caso fevereiro (ano bissexto) cons = 0 #todo ano divisível por 400 e ao mesmo tempo 100 é bissexto if ano % 400 == 0 and ano % 100 == 0: cons = 1 #todo ano divisível por 4 é bissexto elif ano % 4 == 0: cons = 1 else: cons = 0 if mes == 2: return 28 + cons #meses com quantidade ímpar de dias impares = [1, 3, 5, 7, 8, 10, 12] if mes in impares: return 31 return 30
Aplicando a função e calculando a meta diária:
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType()) df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes")))
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca")
Criando colunas para cada tipo de meta, PEV e POA:
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2)) df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2))
Criando dataframe unindo as duas metas:
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"])
Fazendo join entre a base de datas e a de metas:
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas") df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left")
Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"])
Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))
Criando as metas acumuladas utilizando o particionamento:
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part))
Sobrescrevendo a tabela final na database indicadores_mercado:
df_vendas_metas_deca_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_vendas_deca")
19.3 Base Final:
col_name | data_type |
---|---|
data | date |
mes | date |
negocio | string |
receita_liquida_vendas | double |
receita_liquida_vendas_acumuladas | double |
valor_meta_diaria_pev | double |
valor_meta_diaria_poa | double |
metas_acumuladas_pev | double |
metas_acumuladas_poa | double |
Add Comment