Conjunto de notebooks que tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.
Índice
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
Notebook: indicadores_carteira_deca
Job: inteligencia-mercado_job_prod_indicadores_carteira_deca
Schedule: diário/18:00h
Base fim: indicadores_mercado.tb_indicadores_carteira_deca
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from small.tb_ordem_pendente"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)Renomeando as colunas:
colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"]
df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:
df_carteira = spark.sql("""
select
to_date(data_atualizacao) as data_referencia,
(case when codigo_setor_atividade == 'HY' then 'hydra'
when codigo_setor_atividade == 'MS' then 'metais'
when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio,
sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado,
sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre
from pedidos_carteira
where to_date(data_atualizacao) == current_date()
and codigo_setor_atividade in('CS', 'HY', 'MS')
group by 1, 2
""")Fazendo append na tabela final na database indicadores_mercado:
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")1.3 Base Final:
col_name | data_type |
---|---|
data_referencia | date |
negocio | string |
valor_bloqueado | double |
valor_livre | double |
2. Indicadores Devolução Deca
Indicadores para pedidos em status de devolução para Deca.
Notebook: indicadores_devolucao_deca
Job: inteligencia-mercado_job_prod_indicadores_devolucao_deca
Schedule: diário/14:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_deca
2.1 Origem:
Database large e tabela tb_resultado_comercial.
2.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucoes_deca = spark.sql("""
select distinct
to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
(case when codigo_setor_atividade == 'MS' then 'metais'
when codigo_setor_atividade == 'CS' then 'loucas'
when codigo_setor_atividade == 'HY' then 'hydra'
when codigo_setor_atividade == '01' then 'revestimento' end) as negocio,
sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno,
sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao,
sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento
from large.tb_resultado_comercial
where data_competencia between '2019-01-01' and current_date() - 1
and codigo_setor_atividade in('MS', 'CS', 'HY', '01')
group by 1, 2
order by 1, 2 asc
""")Calculando o valor de devolução ajustado, conforme regra de negócio:
df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")2.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | double |
valor_devolucao | double |
3. Indicadores Devolução Madeira
Indicadores para pedidos em status de devolução para Madeira.
Notebook: indicadores_devolucao_madeira
Job: inteligencia-mercado_job_prod_indicadores_devolucao_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_madeira
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
df_devolucao_madeira = spark.sql("""
select
perio as mes,
(case when prctr like '%MDP' then 'mdp'
when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio,
sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao,
sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado
from analytics_prd.custos_rem
where spart == 'CH'
and (prctr like '%MDP' or prctr like '%MDF')
and perio >= 2019001
group by 1, 2
order by 1, 2
""")Sobrescrevendo a tabela final na database indicadores_mercado:
df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")3.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
4. Indicadores Financeiros
Indicadores financeiros disponibilizados pela Controladoria.
Notebook: indicadores_financeiros
Job: inteligencia-mercado_job_prod_indicadores_financeiros
Schedule: diário/15:00h
Base fim: indicadores_mercado.tb_indicadores_financeiros_2022
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.
4.2 Transformação:
Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Fazendo o download dos arquivos:
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")4.2.1 Arquivo Fechamento_Gerencial_2022.xlsm
4.2.1.1 Indicadores do DRE
Definindo função para Indicadores do DRE:
#parâmetros da função: range de células e negócio("madeira", "deca", "revestimento" ou "consolidado")
def ler_dre(celulas, negocio):
#lendo range de células do excel utilizando o pandas.read_excel(arquivo, sheet, linhas desconsideradas, número de linhas, range células, coluna index)
#transpondo a tabela utilizando .transpose()
#selecionando colunas utilizando .iloc[]
#resetando index
df = pd.read_excel(fechamento_gerencial, "DRE CONSOL", skiprows = 1, nrows = 146, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 127]].reset_index()
#renomeando colunas
df.columns = dre_header
#transformando em dataframe do spark
df = spark.createDataFrame(df)
#criando coluna "negocio"
df = df.selectExpr(dre_header[0], "'{}' as negocio".format(negocio), dre_header[1], dre_header[2])
#tratando coluna "mes"
df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
.when(df.mes.like("FEV%"), "01/02/" + ano_atual)
.when(df.mes.like("MAR%"), "01/03/" + ano_atual)
.when(df.mes.like("ABR%"), "01/04/" + ano_atual)
.when(df.mes.like("MAI%"), "01/05/" + ano_atual)
.when(df.mes.like("JUN%"), "01/06/" + ano_atual)
.when(df.mes.like("JUL%"), "01/07/" + ano_atual)
.when(df.mes.like("AGO%"), "01/08/" + ano_atual)
.when(df.mes.like("SET%"), "01/09/" + ano_atual)
.when(df.mes.like("OUT%"), "01/10/" + ano_atual)
.when(df.mes.like("NOV%"), "01/11/" + ano_atual)
.when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_dre_madeira = ler_dre("EL:EX", "madeira")
df_dre_deca = ler_dre("B:N", "deca")
df_dre_revestimento = ler_dre("HN:HZ", "revestimento")
df_dre_consolidado = ler_dre("GT:HF", "consolidado")Fazendo união de todos os dataframes gerados:
df_dre = df_dre_madeira.union(df_dre_deca).union(df_dre_revestimento).union(df_dre_consolidado)4.2.1.2 Indicadores do EVA
Definindo função para Indicadores do EVA, lendo a sheet EVA do arquivo:
#indicadores de valor agregado:
def ler_eva(celulas, negocio):
df = pd.read_excel(fechamento_gerencial, "EVA", skiprows = 2, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 18].reset_index()
df.columns = eva_header
df = spark.createDataFrame(df)
df = df.selectExpr(eva_header[0], "'{}' as negocio".format(negocio), eva_header[1])
df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
.when(df.mes.like("FEV%"), "01/02/" + ano_atual)
.when(df.mes.like("MAR%"), "01/03/" + ano_atual)
.when(df.mes.like("ABR%"), "01/04/" + ano_atual)
.when(df.mes.like("MAI%"), "01/05/" + ano_atual)
.when(df.mes.like("JUN%"), "01/06/" + ano_atual)
.when(df.mes.like("JUL%"), "01/07/" + ano_atual)
.when(df.mes.like("AGO%"), "01/08/" + ano_atual)
.when(df.mes.like("SET%"), "01/09/" + ano_atual)
.when(df.mes.like("OUT%"), "01/10/" + ano_atual)
.when(df.mes.like("NOV%"), "01/11/" + ano_atual)
.when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_eva_madeira = ler_eva("GG:GT", "madeira")
df_eva_deca = ler_eva("BA:BN", "deca")
df_eva_revestimento = ler_eva("EY:FL", "revestimento")
df_eva_consolidado = ler_eva("FP:GC", "consolidado")Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo eva_recorrente:
df_eva = df_eva_madeira.union(df_eva_deca).union(df_eva_revestimento).union(df_eva_consolidado).withColumn("eva_recorrente", col("eva_recorrente")/1000)4.2.2 Arquivo Fluxo_de_Caixa_Oficial_2022.xlsx
4.2.2.1 Indicadores de Fluxo de Caixa
Definindo função para Indicadores de Fluxo de Caixa, lendo a sheet F.C. REAL do arquivo:
def ler_fluxo_caixa(celulas, negocio):
df = pd.read_excel(fluxo_caixa, "F.C. REAL", skiprows = 1, nrows = 86, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index()
df.columns = fc_header
df = spark.createDataFrame(df)
df = df.selectExpr(fc_header[0], "'{}' as negocio".format(negocio), fc_header[1])
df = df.withColumn("mes", to_date("mes"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fc_madeira = ler_fluxo_caixa("FN:GA", "madeira")
df_fc_deca = ler_fluxo_caixa("BM:BZ", "deca")
df_fc_revestimento = ler_fluxo_caixa("IT:JG", "revestimento")
df_fc_consolidado = ler_fluxo_caixa("KJ:KW", "consolidado")Fazendo união de todos os dataframes gerados:
df_fc = df_fc_madeira.union(df_fc_deca).union(df_fc_revestimento).union(df_fc_consolidado)4.2.2.2 Indicadores de PMP
Definindo função para Indicadores de PMP, lendo a sheet CGL.ROL do arquivo:
def ler_pmp(celulas, negocio):
df = pd.read_excel(fluxo_caixa, "CGL.ROL", skiprows = 1, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 26].reset_index()
df.columns = pmp_header
df = spark.createDataFrame(df)
df = df.selectExpr(pmp_header[0], "'{}' as negocio".format(negocio), pmp_header[1])
df = df.withColumn("mes", to_date("mes"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_pmp_madeira = ler_pmp("DR:EE", "madeira")
df_pmp_deca = ler_pmp("AU:BH", "deca")
df_pmp_revestimento = ler_pmp("FZ:GM", "revestimento")
df_pmp_consolidado = ler_pmp("GO:HB", "consolidado")Fazendo união de todos os dataframes gerados:
df_pmp = df_pmp_madeira.union(df_pmp_deca).union(df_pmp_revestimento).union(df_pmp_consolidado)4.2.3 Arquivo Forecast_3_9_2022.xlsx
4.2.3.1 Indicadores de Forecast
Definindo função para Indicadores de Forecast:
def ler_forecast(sheet, celulas, negocio):
df = pd.read_excel(forecast, sheet, skiprows = 3, nrows = 83, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 47]].reset_index()
df.columns = frc_header
df = spark.createDataFrame(df)
df = df.selectExpr(frc_header[0], "'{}' as negocio".format(negocio), frc_header[1], frc_header[2])
df = df.withColumn("mes", to_date("mes"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_dre_madeira = ler_forecast("DRE_MAD", "A:M", "madeira")
df_frc_dre_deca = ler_forecast("DRE_DEC", "A:M", "deca")
df_frc_dre_revestimento = ler_forecast("DRE_REVEST", "A:M", "revestimento")
df_frc_dre_consolidado = ler_forecast("DRE_CONSOL", "A:M", "consolidado")Fazendo união de todos os dataframes gerados:
df_frc = df_frc_dre_madeira.union(df_frc_dre_deca).union(df_frc_dre_revestimento).union(df_frc_dre_consolidado)4.2.3.2 Indicadores de Forecast EVA
Definindo função para Indicadores de Forecast EVA, lendo a sheet Base EVA ROIC:
def ler_forecast_eva(negocio):
#desinindo constantantes para cada negócio para utilizar posteriormente o iloc[] e localizar os dados necessários
#esses valores pra neg são como os negócios são identificados no arquivo
if negocio == "madeira":
neg = "Mad. Total"
elif negocio == "deca":
neg = "Deca"
elif negocio == "revestimento":
neg = "RC"
elif negocio == "consolidado":
neg = "Consol"
df = pd.read_excel(forecast, "Base EVA ROIC", skiprows = 4, nrows = 130, usecols = "B:Q")
#buscando a linha em que a coluna 0 é igual ao negócio, coluna 1 igual a "Mês", coluna 2 igual a "RECORRENTE" e coluna 4 igual a "EVA"
df = df.where(
(df.iloc[:, 0] == neg) &
(df.iloc[:, 1] == "Mês") &
(df.iloc[:, 2] == "RECORRENTE") &
(df.iloc[:, 3] == "EVA")
).dropna()
df = df.transpose().iloc[4: ].reset_index()
df.columns = frc_eva_header
df = spark.createDataFrame(df)
df = df.selectExpr(frc_eva_header[0], "'{}' as negocio".format(negocio), frc_eva_header[1])
df = df.withColumn("mes", when(df.mes == 1, "01/01/" + ano_atual)
.when(df.mes == 2, "01/02/" + ano_atual)
.when(df.mes == 3, "01/03/" + ano_atual)
.when(df.mes == 4, "01/04/" + ano_atual)
.when(df.mes == 5, "01/05/" + ano_atual)
.when(df.mes == 6, "01/06/" + ano_atual)
.when(df.mes == 7, "01/07/" + ano_atual)
.when(df.mes == 8, "01/08/" + ano_atual)
.when(df.mes == 9, "01/09/" + ano_atual)
.when(df.mes == 10, "01/10/" + ano_atual)
.when(df.mes == 11, "01/11/" + ano_atual)
.when(df.mes == 12, "01/12/" + ano_atual)
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_frc_eva_madeira = ler_forecast_eva("madeira")
df_frc_eva_deca = ler_forecast_eva("deca")
df_frc_eva_revestimento = ler_forecast_eva("revestimento")
df_frc_eva_consolidado = ler_forecast_eva("consolidado")Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo frc_eva_recorrente:
df_frc_eva = df_frc_eva_madeira.union(df_frc_eva_deca).union(df_frc_eva_revestimento).union(df_frc_eva_consolidado).withColumn("frc_eva_recorrente", col("frc_eva_recorrente")/1000)4.2.4 Arquivo FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx
4.2.4.1 Indicadores Forecast Fluxo de Caixa
Definindo função para Indicadores de Forecast fluxo de Caixa, lendo a sheet F.C. FCST:
def ler_fcf(celulas, negocio):
df = pd.read_excel(fcf, "F.C. FCST", skiprows = 1, nrows = 87, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index()
df.columns = fcf_header
df = spark.createDataFrame(df)
df = df.selectExpr(fcf_header[0], "'{}' as negocio".format(negocio), fcf_header[1])
df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
.when(df.mes.like("FEV%"), "01/02/" + ano_atual)
.when(df.mes.like("MAR%"), "01/03/" + ano_atual)
.when(df.mes.like("ABR%"), "01/04/" + ano_atual)
.when(df.mes.like("MAI%"), "01/05/" + ano_atual)
.when(df.mes.like("JUN%"), "01/06/" + ano_atual)
.when(df.mes.like("JUL%"), "01/07/" + ano_atual)
.when(df.mes.like("AGO%"), "01/08/" + ano_atual)
.when(df.mes.like("SET%"), "01/09/" + ano_atual)
.when(df.mes.like("OUT%"), "01/10/" + ano_atual)
.when(df.mes.like("NOV%"), "01/11/" + ano_atual)
.when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
return dfAplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:
df_fcf_madeira = ler_fcf("FN:GA", "madeira")
df_fcf_deca = ler_fcf("BM:BZ", "deca")
df_fcf_revestimento = ler_fcf("HD:HQ", "revestimento")
df_fcf_consolidado = ler_fcf("IT:JG", "consolidado")Fazendo união de todos os dataframes gerados:
df_frc_fc = df_fcf_madeira.union(df_fcf_deca).union(df_fcf_revestimento).union(df_fcf_consolidado)4.2.5 Arquivo indicadores_poa_2022.csv
Foi feito o upload desse arquivo diretamente no dbfs do Databricks:
df_poa = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_financeiros/indicadores_poa_2022.csv")Tratando alguns tipos de campos e ajustando a ordem de grandeza de outro:
df_poa = df_poa.select(
"mes",
"negocio",
col("poa_receita_liquida_vendas").cast(DoubleType()),
col("poa_ebitda_recorrente").cast(DoubleType()),
col("poa_eva_recorrente").cast(DoubleType()),
col("poa_fluxo_caixa_livre_total").cast(DoubleType())
).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)4.2.6 Base Final
Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:
campos_join = ["mes", "negocio"]
df_indicadores_financeiros = df_dre.join(
df_eva, campos_join
).join(
df_fc, campos_join
).join(
df_pmp, campos_join
).join(
df_frc, campos_join
).join(
df_frc_eva, campos_join
).join(
df_frc_fc, campos_join
).join(
df_poa, campos_join
).orderBy("negocio", "mes")Sobrescrevendo a tabela final no database :
df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual))4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
receita_liquida_vendas | double |
ebitda_recorrente | double |
eva_recorrente | double |
fluxo_caixa_livre_total | double |
pmp | double |
frc_receita_liquida_vendas | double |
frc_ebitda_recorrente | double |
frc_eva_recorrente | double |
frc_fluxo_caixa_livre_total | double |
poa_receita_liquida_vendas | double |
poa_ebitda_recorrente | double |
poa_eva_recorrente | double |
poa_fluxo_caixa_livre_total | double |
5. Indicadores Gente
Indicadores de Recursos Humanos.
Notebook: indicadores_gente
Job: inteligencia-mercado_job_prod_indicadores_gente
Schedule: sextas-feiras, 14:00h
Base fim: indicadores_mercado.tb_indicadores_gente
5.1 Origem:
O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.
5.2 Transformação:
Definindo função para o download do arquivo para o dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Lendo o arquivo:
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv")Aplicando regra de negócio para o campo taxa_afastamento:
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento"))Sobrescrevendo a tabela final na database indicadores_mercado:
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente")5.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
quantidade_cargos_lideranca_mulheres | int |
quantidade_cargos_lideranca_total | int |
taxa_desligamentos_voluntarios | double |
taxa_desligamentos_involuntarios | double |
taxa_desligamentos_total | double |
taxa_absenteismo_sem_afastamento | double |
taxa_absenteismo_com_afastamento | double |
taxa_afastamento | double |
7. Indicadores Margem Madeira
Indicadores de Margem para Madeira.
Notebook: indicadores_margem_madeira
Job: inteligencia-mercado_job_prod_indicadores_margem_madeira
Schedule: diário/14:22h
Base fim: indicadores_mercado.tb_indicadores_margem_madeira
7.1 Origem:
É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:
7.2 Transformação:
São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:
import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literalDefinindo variáveis para conexão no Tableau Server:
usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/"Definindo parâmetros da conexão com o Tableau Server:
autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor)Fazendo o download do arquivo .tdsx:
with servidor.auth.sign_in(autenticador):
caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True)O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
arquivo_zipado.extractall(diretorio_download)Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
resultado_consulta = list(resultado)Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:
esquema_margem = StructType([
StructField('mes', StringType(), True),
StructField('produto', StringType(), True),
StructField('mercado', StringType(), True),
StructField('receita', DoubleType(), True),
StructField('quantidade', DoubleType(), True),
StructField('valor_cit', DoubleType(), True),
StructField('valor_cco', DoubleType(), True)
])Criando o dataframe:
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem)Utilizando o Spark SQL para aplicar algumas regras de negócio:
df_margem_calculos = spark.sql("""
select
mes,
(case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
(sum(receita) / sum(quantidade)) as receita_m3,
(sum(valor_cit) / sum(quantidade)) as cit_m3,
(sum(valor_cco) / sum(quantidade)) as cco_m3
from margem_view
group by 1, 2
order by 1, 2 asc
""")Aplicando mais algumas regras de negócio:
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3"))Sobrescrevendo a tabela final na database indicadores_mercado:
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira")7.3 Base Final:
col_name | data_type |
---|---|
mes | string |
produto | string |
receita_m3 | double |
cit_m3 | double |
cco_m3 | double |
mop_m3 | double |
9. Indicadores OEE Madeira
Indicadores de OEE para Madeira.
Notebook: indicadores_oee_madeira
Job: inteligencia-mercado_job_prod_indicadores_oee_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_oee_madeira
9.1 Origem:
O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
9.2 Transformação:
Definindo função para ler o arquivo:
def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_leitura)
response.raise_for_status()
df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas)
df = spark.createDataFrame(df)
return dfLendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:
df_oee_cru = []
while not isinstance(df_oee_cru, DataFrame):
try:
df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
passDefinindo o nome do produto como constante:
df_oee_cru = df_oee_cru.withColumn("produto", lit("cru"))Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
df_oee_revestido = []
while not isinstance(df_oee_revestido, DataFrame):
try:
df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
passDefinindo o nome do produto como constante:
df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido"))Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
df_oee_historico = []
while not isinstance(df_oee_historico, DataFrame):
try:
df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0)
except:
passFazendo a união de todos os dataframes criados:
df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido)Sobrescrevendo a tabela final na database indicadores_mercado:
df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira")9.3 Base Final:
col_name | data_type |
---|---|
mes | date |
oee | double |
oee_meta_global | double |
produto | string |
10. Indicadores OTIF Deca
Indicadores de OTIF para Deca.
Notebook: indicadores_otif_deca
Job: inteligencia-mercado_job_prod_indicadores_otif_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_deca
10.1 Origem:
O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.
10.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Lendo arquivo:
df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv")Sobrescrevendo a tabela final na database indicadores_mercado:
df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca")10.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_deca | string |
otif_deca | double |
meta_otif_deca | double |
11. Indicadores OTIF Madeira
Indicadores de OTIF para Madeira.
Notebook: indicadores_otif_madeira
Job: inteligencia-mercado_job_prod_indicadores_otif_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_madeira
11.1 Origem:
O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.
11.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Lendo arquivo:
df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv")Sobrescrevendo a tabela final na database indicadores_mercado:
df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira")11.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_madeira | string |
otif_madeira | double |
meta_otif_madeira | double |
12. Indicadores Panorama Mercado Madeira
Indicadores de Panorama de Mercado para Madeira.
Notebook: indicadores_panorama_mercado_madeira
Job: inteligencia-mercado_job_prod_indicadores_panorama_mercado_madeira
Schedule: diário/13:00h
Base fim: indicadores_mercado.tb_indicadores_panorama_mercado_madeira
12.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.
12.2 Transformação:
É feita uma query da tabela já aplicando algumas regras de negócio:
query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1"Renomeando campos:
colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"]
df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado)Ajustando campo mes:
df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType()))Sobrescrevendo a tabela final na database indicadores_mercado:
df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira")12.3 Base Final:
col_name | data_type |
---|---|
mes | date |
mercado | string |
produto | string |
volume_m3_total | double |
13. Indicadores Produtividade Deca
Indicadores de Produtividade para Deca.
Notebook: indicadores_produtividade_deca
Job: inteligencia-mercado_job_prod_indicadores_produtividade_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_minuto_homem_deca
13.1 Origem:
O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
13.2 Transformação:
Definindo função para fazer o download do arquivo para diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Fazendo o download do arquivo:
download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx")Lendo o arquivo:
produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx")Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.
def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas):
#lendo arquivo excel
df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index()
#definindo dataframe para o poa
df_poa = df
#definindo dataframe para produtividade, coluna 1 é diferente de "POA"
df = df.where(df.iloc[:, 1] != "POA")
df = df.iloc[:, [0, 1, numero_linhas]]
df.columns = ["mes", "ano", "produtividade"]
df = df.dropna(axis = 0)
df = spark.createDataFrame(df)
df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType()))
#ajustandando campo mes para produtividade
df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano")))
.when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano")))
.when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano")))
.when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano")))
.when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano")))
.when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano")))
.when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano")))
.when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano")))
.when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano")))
.when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano")))
.when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano")))
.when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano")))
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#trabalhando com dataframe para poa, quando coluna 1 é igual a "POA"
df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA")
df_poa = df_poa.iloc[:, [0, numero_linhas]]
df_poa .columns = ["mes", "poa_produtividade"]
df_poa = df_poa.dropna(axis = 0)
df_poa = spark.createDataFrame(df_poa)
#ajustando campo mes
df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual)
.when(col("mes").like("FEV%"), "01/02/" + ano_atual)
.when(col("mes").like("MAR%"), "01/03/" + ano_atual)
.when(col("mes").like("ABR%"), "01/04/" + ano_atual)
.when(col("mes").like("MAI%"), "01/05/" + ano_atual)
.when(col("mes").like("JUN%"), "01/06/" + ano_atual)
.when(col("mes").like("JUL%"), "01/07/" + ano_atual)
.when(col("mes").like("AGO%"), "01/08/" + ano_atual)
.when(col("mes").like("SET%"), "01/09/" + ano_atual)
.when(col("mes").like("OUT%"), "01/10/" + ano_atual)
.when(col("mes").like("NOV%"), "01/11/" + ano_atual)
.when(col("mes").like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#definindo dataframe final, fazendo um join com os anteriores
df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes)
df_final = df_final.withColumn("segmento", lit(negocio))
df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade")
df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2))
return df_finalAplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:
df_metais = ler_produtividade("metais", "FA:FZ", 21, 5)Aplicando função para loucas:
df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8)Aplicando função para Hydra:
df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4)Aplicando função para revestimento:
df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7)Fazendo a união de todos os dataframes criados:
df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento)Sobrescrevendo a tabela final na database indicadores_mercado:
df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca")13.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento | string |
poa_produtividade | double |
produtividade | double |
14. Indicadores SAC
Indicadores de SAC.
Notebook: indicadores_sac
Job: inteligencia-mercado_job_prod_indicadores_sac
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_sac
14.1 Origem:
O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.
14.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content)Lendo arquivo:
df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv")Aplicando regra de negócio para o campo saldo:
df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados"))Sobrescrevendo a tabela final na database indicadores_mercado:
df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac")14.3 Base Final:
col_name | data_type |
---|---|
mes | date |
visao | string |
negocio | string |
casos_abertos | int |
casos_fechados | int |
saldo | int |
15. Indicadores Sell-in Deca - Chuveiros
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_chuveiros
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros
15.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.
15.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros")15.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
tipo | string |
sku | string |
meses | timestamp |
desviador | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
16. Indicadores Sell-in Deca - Cubas
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_cubas
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_cubas
16.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.
16.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"]Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas")16.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
formato | string |
cor | string |
acabamento | string |
ean | string |
valor | double |
volume | double |
17. Indicadores Sell-in Deca - Torneiras
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_torneiras
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_torneiras
17.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.
17.2 Transformação:
Lendo sheet do arquivo:
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")Renomeando colunas:
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]Tratando os valores missing:
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)Transformando em dataframe do Spark:
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)Sobrescrevendo a tabela final na database indicadores_mercado:
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras")17.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
subcategoria | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
bica | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
18. Indicadores Market Share, Sell-in e Sell-out Madeira
Indicadores para Market Share, Sell-in e Sell-out de Madeira.
Notebook: indicadores_share_sell_in_sell_out_madeira
Job: inteligencia-mercado_job_prod_indicadores_share_sell_in_sell_out_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira
18.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.
18.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
query = "select * from madeira.vw_base_mktshare_sellin_sellout"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name)Renomeando as colunas:
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"]
df_madeira = consulta_madeira.toDF(*colunas_madeira)Ajustando tipos dos campos:
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType()))Sobrescrevendo a tabela final na database indicadores_mercado:
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira")18.3 Base Final:
col_name | data_type |
---|---|
tipo | string |
ano | int |
competencia | int |
mercado | string |
produto | string |
produto_detalhe | string |
volume_m3 | double |
volume_m2 | double |
volume_m3_cap_dtx | double |
volume_m3_cap_mercex | double |
data_atualizacao_base | date |
produto_segmento | string |
quantidade_volume_m3_liquido | double |
quantidade_volume_m2 | double |
19. Indicadores Vendas Deca
Indicadores para Vendas de Deca.
Notebook: indicadores_venda_deca
Job: inteligencia-mercado_job_prod_indicadores_venda_deca
Schedule: diário/10:00h
Base fim: indicadores_mercado.tb_indicadores_vendas_deca
19.1 Origem:
Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.
19.2 Transformação:
Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.
for negocio in negocios:
df = pd.date_range(start = "2022-01-01", end = date.today())
df = pd.DataFrame(df, columns = ["data"])
df = spark.createDataFrame(df)
df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01')))
df = df.withColumn("negocio", lit(negocio))
if negocio == negocios[0]:
df_datas = df
else:
df_datas = df_datas.union(df)Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas")Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:
df_vendas = spark.sql("""
select
t1.data_competencia as data,
to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
(case when t1.codigo_setor_atividade = 'CS' then 'loucas'
when t1.codigo_setor_atividade = 'MS' then 'metais'
when t1.codigo_setor_atividade = '01' then 'revestimento'
when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio,
round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas
from large.tb_resultado_comercial t1
where t1.codigo_setor_atividade in("CS", "MS", "HY", "01")
and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI")
and t1.data_competencia between '2022-01-01' and (current_date() - 1)
group by 1, 2, 3
order by 3, 2, 1 asc
""")Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left")Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))Criando campo com soma acumulada, utilizando a partição criada anteriormente:
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))Criando campo com soma acumulada, utilizando a partição criada anteriormente:
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca")Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name)Renomeando colunas:
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"]
df_metas = consulta_metas.toDF(*colunas_metas)Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:
def numero_dias(data):
mes = data.month
ano = data.year
#tratativa caso fevereiro (ano bissexto)
cons = 0
#todo ano divisível por 400 e ao mesmo tempo 100 é bissexto
if ano % 400 == 0 and ano % 100 == 0:
cons = 1
#todo ano divisível por 4 é bissexto
elif ano % 4 == 0:
cons = 1
else:
cons = 0
if mes == 2:
return 28 + cons
#meses com quantidade ímpar de dias
impares = [1, 3, 5, 7, 8, 10, 12]
if mes in impares:
return 31
return 30Aplicando a função e calculando a meta diária:
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType())
df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes"))) Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca")Criando colunas para cada tipo de meta, PEV e POA:
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2))
df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2))Criando dataframe unindo as duas metas:
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"])Fazendo join entre a base de datas e a de metas:
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas")
df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left")Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"])Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))Criando as metas acumuladas utilizando o particionamento:
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part))Sobrescrevendo a tabela final na database indicadores_mercado:
df_vendas_metas_deca_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_vendas_deca")19.3 Base Final:
col_name | data_type |
---|---|
data | date |
mes | date |
negocio | string |
receita_liquida_vendas | double |
receita_liquida_vendas_acumuladas | double |
valor_meta_diaria_pev | double |
valor_meta_diaria_poa | double |
metas_acumuladas_pev | double |
metas_acumuladas_poa | double |