Conjunto de notebooks que têm tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.
Índice
1. Indicadores Carteira Deca
Indicadores para pedidos em carteira Deca.
1.1 Origem:
...
Notebook: indicadores_carteira_deca
Job: inteligencia-mercado_job_prod_indicadores_carteira_deca
Schedule: diário/18:00h
Base fim: indicadores_mercado.tb_indicadores_carteira_deca
1.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema smalle tabela tb_ordem_pendente.
...
Indicadores para pedidos em status de devolução para Deca.
Notebook: indicadores_devolucao_deca
Job: inteligencia-mercado_job_prod_indicadores_devolucao_deca
Schedule: diário/14:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_deca
2.1 Origem:
Database large e tabela tb_resultado_comercial.
...
Indicadores para pedidos em status de devolução para Madeira.
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
...
Notebook: indicadores_devolucao_madeira
Job: inteligencia-mercado_job_prod_indicadores_devolucao_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_devolucoes_madeira
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
...
Indicadores financeiros disponibilizados pela Controladoria.
Notebook: indicadores_financeiros
Job: inteligencia-mercado_job_prod_indicadores_financeiros
Schedule: diário/15:00h
Base fim: indicadores_mercado.tb_indicadores_financeiros_2022
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.
...
Code Block | ||
---|---|---|
| ||
df_poa = df_poa.select( "mes", "negocio", col("poa_receita_liquida_vendas").cast(DoubleType()), col("poa_ebitda_recorrente").cast(DoubleType()), col("poa_eva_recorrente").cast(DoubleType()), col("poa_fluxo_caixa_livre_total").cast(DoubleType()) ).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000) |
4.2.
...
6 Base Final
Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:
Code Block | ||
---|---|---|
| ||
campos_join = ["mes", "negocio"]
df_indicadores_financeiros = df_dre.join(
df_eva, campos_join
).join(
df_fc, campos_join
).join(
df_pmp, campos_join
).join(
df_frc, campos_join
).join(
df_frc_eva, campos_join
).join(
df_frc_fc, campos_join
).join(
df_poa, campos_join
).orderBy("negocio", "mes") |
...
, campos_join
).orderBy("negocio", "mes") |
Sobrescrevendo a tabela final no database :
Code Block | ||
---|---|---|
| ||
df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual)) |
4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
receita_liquida_vendas | double |
ebitda_recorrente | double |
eva_recorrente | double |
fluxo_caixa_livre_total | double |
pmp | double |
frc_receita_liquida_vendas | double |
frc_ebitda_recorrente | double |
frc_eva_recorrente | double |
frc_fluxo_caixa_livre_total | double |
poa_receita_liquida_vendas | double |
poa_ebitda_recorrente | double |
poa_eva_recorrente | double |
poa_fluxo_caixa_livre_total | double |
5. Indicadores Gente
Indicadores de Recursos Humanos.
Notebook: indicadores_gente
Job: inteligencia-mercado_job_prod_indicadores_gente
Schedule: sextas-feiras, 14:00h
Base fim: indicadores_mercado.tb_indicadores_gente
5.1 Origem:
O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.
5.2 Transformação:
Definindo função para o download do arquivo para o dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo o arquivo:
Code Block | ||
---|---|---|
| ||
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv") |
Aplicando regra de negócio para o campo taxa_afastamento:
Code Block | ||
---|---|---|
| ||
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente") |
5.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
quantidade_cargos_lideranca_mulheres | int |
quantidade_cargos_lideranca_total | int |
taxa_desligamentos_voluntarios | double |
taxa_desligamentos_involuntarios | double |
taxa_desligamentos_total | double |
taxa_absenteismo_sem_afastamento | double |
taxa_absenteismo_com_afastamento | double |
taxa_afastamento | double |
7. Indicadores Margem Madeira
Indicadores de Margem para Madeira.
Notebook: indicadores_margem_madeira
Job: inteligencia-mercado_job_prod_indicadores_margem_madeira
Schedule: diário/14:22h
Base fim: indicadores_mercado.tb_indicadores_margem_madeira
7.1 Origem:
É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:
7.2 Transformação:
São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:
Code Block | ||
---|---|---|
| ||
import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal |
Definindo variáveis para conexão no Tableau Server:
Code Block | ||
---|---|---|
| ||
usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" |
Definindo parâmetros da conexão com o Tableau Server:
Code Block | ||
---|---|---|
| ||
autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor) |
Fazendo o download do arquivo .tdsx:
Code Block | ||
---|---|---|
| ||
with servidor.auth.sign_in(autenticador):
caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True) |
O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
arquivo_zipado.extractall(diretorio_download) |
Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:
Code Block |
---|
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
resultado_consulta = list(resultado) |
Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:
Code Block | ||
---|---|---|
| ||
esquema_margem = StructType([
StructField('mes', StringType(), True),
StructField('produto', StringType(), True),
StructField('mercado', StringType(), True),
StructField('receita', DoubleType(), True),
StructField('quantidade', DoubleType(), True),
StructField('valor_cit', DoubleType(), True),
StructField('valor_cco', DoubleType(), True)
]) |
Criando o dataframe:
Code Block | ||
---|---|---|
| ||
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem) |
Utilizando o Spark SQL para aplicar algumas regras de negócio:
Code Block |
---|
df_margem_calculos = spark.sql("""
select
mes,
(case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
(sum(receita) / sum(quantidade)) as receita_m3,
(sum(valor_cit) / sum(quantidade)) as cit_m3,
(sum(valor_cco) / sum(quantidade)) as cco_m3
from margem_view
group by 1, 2
order by 1, 2 asc
""") |
Aplicando mais algumas regras de negócio:
Code Block |
---|
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira") |
7.3 Base Final:
col_name | data_type |
---|---|
mes | string |
produto | string |
receita_m3 | double |
cit_m3 | double |
cco_m3 | double |
mop_m3 | double |
9. Indicadores OEE Madeira
Indicadores de OEE para Madeira.
Notebook: indicadores_oee_madeira
Job: inteligencia-mercado_job_prod_indicadores_oee_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_oee_madeira
9.1 Origem:
O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
9.2 Transformação:
Definindo função para ler o arquivo:
Code Block | ||
---|---|---|
| ||
def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_leitura)
response.raise_for_status()
df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas)
df = spark.createDataFrame(df)
return df |
Lendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:
Code Block | ||
---|---|---|
| ||
df_oee_cru = []
while not isinstance(df_oee_cru, DataFrame):
try:
df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
pass |
Definindo o nome do produto como constante:
Code Block | ||
---|---|---|
| ||
df_oee_cru = df_oee_cru.withColumn("produto", lit("cru")) |
Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
Code Block | ||
---|---|---|
| ||
df_oee_revestido = []
while not isinstance(df_oee_revestido, DataFrame):
try:
df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
pass |
Definindo o nome do produto como constante:
Code Block | ||
---|---|---|
| ||
df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido")) |
Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
Code Block | ||
---|---|---|
| ||
df_oee_historico = []
while not isinstance(df_oee_historico, DataFrame):
try:
df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0)
except:
pass |
Fazendo a união de todos os dataframes criados:
Code Block | ||
---|---|---|
| ||
df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira") |
9.3 Base Final:
col_name | data_type |
---|---|
mes | date |
oee | double |
oee_meta_global | double |
produto | string |
10. Indicadores OTIF Deca
Indicadores de OTIF para Deca.
Notebook: indicadores_otif_deca
Job: inteligencia-mercado_job_prod_indicadores_otif_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_deca
10.1 Origem:
O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.
10.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv") |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca") |
10.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_deca | string |
otif_deca | double |
meta_otif_deca | double |
11. Indicadores OTIF Madeira
Indicadores de OTIF para Madeira.
Notebook: indicadores_otif_madeira
Job: inteligencia-mercado_job_prod_indicadores_otif_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_otif_madeira
11.1 Origem:
O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.
11.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv") |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira") |
11.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_madeira | string |
otif_madeira | double |
meta_otif_madeira | double |
12. Indicadores Panorama Mercado Madeira
Indicadores de Panorama de Mercado para Madeira.
Notebook: indicadores_panorama_mercado_madeira
Job: inteligencia-mercado_job_prod_indicadores_panorama_mercado_madeira
Schedule: diário/13:00h
Base fim: indicadores_mercado.tb_indicadores_panorama_mercado_madeira
12.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.
12.2 Transformação:
É feita uma query da tabela já aplicando algumas regras de negócio:
Code Block | ||
---|---|---|
| ||
query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1" |
Renomeando campos:
Code Block | ||
---|---|---|
| ||
colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"]
df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado) |
Ajustando campo mes:
Code Block | ||
---|---|---|
| ||
df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType())) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira") |
12.3 Base Final:
col_name | data_type |
---|---|
mes | date |
mercado | string |
produto | string |
volume_m3_total | double |
13. Indicadores Produtividade Deca
Indicadores de Produtividade para Deca.
Notebook: indicadores_produtividade_deca
Job: inteligencia-mercado_job_prod_indicadores_produtividade_deca
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_minuto_homem_deca
13.1 Origem:
O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
13.2 Transformação:
Definindo função para fazer o download do arquivo para diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Fazendo o download do arquivo:
Code Block | ||
---|---|---|
| ||
download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx") |
Lendo o arquivo:
Code Block | ||
---|---|---|
| ||
produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx") |
Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.
Code Block | ||
---|---|---|
| ||
def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas):
#lendo arquivo excel
df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index()
#definindo dataframe para o poa
df_poa = df
#definindo dataframe para produtividade, coluna 1 é diferente de "POA"
df = df.where(df.iloc[:, 1] != "POA")
df = df.iloc[:, [0, 1, numero_linhas]]
df.columns = ["mes", "ano", "produtividade"]
df = df.dropna(axis = 0)
df = spark.createDataFrame(df)
df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType()))
#ajustandando campo mes para produtividade
df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano")))
.when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano")))
.when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano")))
.when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano")))
.when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano")))
.when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano")))
.when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano")))
.when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano")))
.when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano")))
.when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano")))
.when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano")))
.when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano")))
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#trabalhando com dataframe para poa, quando coluna 1 é igual a "POA"
df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA")
df_poa = df_poa.iloc[:, [0, numero_linhas]]
df_poa .columns = ["mes", "poa_produtividade"]
df_poa = df_poa.dropna(axis = 0)
df_poa = spark.createDataFrame(df_poa)
#ajustando campo mes
df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual)
.when(col("mes").like("FEV%"), "01/02/" + ano_atual)
.when(col("mes").like("MAR%"), "01/03/" + ano_atual)
.when(col("mes").like("ABR%"), "01/04/" + ano_atual)
.when(col("mes").like("MAI%"), "01/05/" + ano_atual)
.when(col("mes").like("JUN%"), "01/06/" + ano_atual)
.when(col("mes").like("JUL%"), "01/07/" + ano_atual)
.when(col("mes").like("AGO%"), "01/08/" + ano_atual)
.when(col("mes").like("SET%"), "01/09/" + ano_atual)
.when(col("mes").like("OUT%"), "01/10/" + ano_atual)
.when(col("mes").like("NOV%"), "01/11/" + ano_atual)
.when(col("mes").like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#definindo dataframe final, fazendo um join com os anteriores
df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes)
df_final = df_final.withColumn("segmento", lit(negocio))
df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade")
df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2))
return df_final |
Aplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:
Code Block | ||
---|---|---|
| ||
df_metais = ler_produtividade("metais", "FA:FZ", 21, 5) |
Aplicando função para loucas:
Code Block | ||
---|---|---|
| ||
df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8) |
Aplicando função para Hydra:
Code Block | ||
---|---|---|
| ||
df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4) |
Aplicando função para revestimento:
Code Block | ||
---|---|---|
| ||
df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7) |
Fazendo a união de todos os dataframes criados:
Code Block | ||
---|---|---|
| ||
df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca") |
13.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento | string |
poa_produtividade | double |
produtividade | double |
14. Indicadores SAC
Indicadores de SAC.
Notebook: indicadores_sac
Job: inteligencia-mercado_job_prod_indicadores_sac
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_sac
14.1 Origem:
O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.
14.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv") |
Aplicando regra de negócio para o campo saldo:
Code Block | ||
---|---|---|
| ||
df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac") |
14.3 Base Final:
col_name | data_type |
---|---|
mes | date |
visao | string |
negocio | string |
casos_abertos | int |
casos_fechados | int |
saldo | int |
15. Indicadores Sell-in Deca - Chuveiros
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_chuveiros
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros
15.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.
15.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"] |
Tratando os valores missing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros") |
15.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
tipo | string |
sku | string |
meses | timestamp |
desviador | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
16. Indicadores Sell-in Deca - Cubas
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_cubas
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_cubas
16.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.
16.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"] |
Tratando os valores missing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas") |
16.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
formato | string |
cor | string |
acabamento | string |
ean | string |
valor | double |
volume | double |
17. Indicadores Sell-in Deca - Torneiras
Indicadores de Sell-in para Deca.
Notebook: indicadores_sell_in_deca_torneiras
Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_torneiras
17.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.
17.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"] |
Tratando os valores missing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras") |
17.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
subcategoria | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
bica | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
18. Indicadores Market Share, Sell-in e Sell-out Madeira
Indicadores para Market Share, Sell-in e Sell-out de Madeira.
Notebook: indicadores_share_sell_in_sell_out_madeira
Job: inteligencia-mercado_job_prod_indicadores_share_sell_in_sell_out_madeira
Schedule: diário/12:00h
Base fim: indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira
18.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.
18.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
Code Block | ||
---|---|---|
| ||
query = "select * from madeira.vw_base_mktshare_sellin_sellout"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name) |
Renomeando as colunas:
Code Block | ||
---|---|---|
| ||
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"]
df_madeira = consulta_madeira.toDF(*colunas_madeira) |
Ajustando tipos dos campos:
Code Block | ||
---|---|---|
| ||
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType())) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira") |
18.3 Base Final:
col_name | data_type |
---|---|
tipo | string |
ano | int |
competencia | int |
mercado | string |
produto | string |
produto_detalhe | string |
volume_m3 | double |
volume_m2 | double |
volume_m3_cap_dtx | double |
volume_m3_cap_mercex | double |
data_atualizacao_base | date |
produto_segmento | string |
quantidade_volume_m3_liquido | double |
quantidade_volume_m2 | double |
19. Indicadores Vendas Deca
Indicadores para Vendas de Deca.
Notebook: indicadores_venda_deca
Job: inteligencia-mercado_job_prod_indicadores_venda_deca
Schedule: diário/10:00h
Base fim: indicadores_mercado.tb_indicadores_vendas_deca
19.1 Origem:
Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.
19.2 Transformação:
Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.
Code Block | ||
---|---|---|
| ||
for negocio in negocios:
df = pd.date_range(start = "2022-01-01", end = date.today())
df = pd.DataFrame(df, columns = ["data"])
df = spark.createDataFrame(df)
df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01')))
df = df.withColumn("negocio", lit(negocio))
if negocio == negocios[0]:
df_datas = df
else:
df_datas = df_datas.union(df) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas") |
Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:
Code Block | ||
---|---|---|
| ||
df_vendas = spark.sql("""
select
t1.data_competencia as data,
to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
(case when t1.codigo_setor_atividade = 'CS' then 'loucas'
when t1.codigo_setor_atividade = 'MS' then 'metais'
when t1.codigo_setor_atividade = '01' then 'revestimento'
when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio,
round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas
from large.tb_resultado_comercial t1
where t1.codigo_setor_atividade in("CS", "MS", "HY", "01")
and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI")
and t1.data_competencia between '2022-01-01' and (current_date() - 1)
group by 1, 2, 3
order by 3, 2, 1 asc
""") |
Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:
Code Block | ||
---|---|---|
| ||
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left") |
Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:
Code Block | ||
---|---|---|
| ||
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0)) |
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part)) |
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part)) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca") |
Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:
Code Block | ||
---|---|---|
| ||
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name) |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"]
df_metas = consulta_metas.toDF(*colunas_metas) |
Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:
Code Block | ||
---|---|---|
| ||
def numero_dias(data):
mes = data.month
ano = data.year
#tratativa caso fevereiro (ano bissexto)
cons = 0
#todo ano divisível por 400 e ao mesmo tempo 100 é bissexto
if ano % 400 == 0 and ano % 100 == 0:
cons = 1
#todo ano divisível por 4 é bissexto
elif ano % 4 == 0:
cons = 1
else:
cons = 0
if mes == 2:
return 28 + cons
#meses com quantidade ímpar de dias
impares = [1, 3, 5, 7, 8, 10, 12]
if mes in impares:
return 31
return 30 |
Aplicando a função e calculando a meta diária:
Code Block | ||
---|---|---|
| ||
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType())
df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes"))) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca") |
Criando colunas para cada tipo de meta, PEV e POA:
Code Block | ||
---|---|---|
| ||
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2))
df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2)) |
Criando dataframe unindo as duas metas:
Code Block | ||
---|---|---|
| ||
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"]) |
Fazendo join entre a base de datas e a de metas:
Code Block | ||
---|---|---|
| ||
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas")
df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left") |
Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"]) |
Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:
Code Block | ||
---|---|---|
| ||
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0)) |
Criando as metas acumuladas utilizando o particionamento:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part)) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_indicadoresdeca_financeirosacumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeirosvendas_{}".format(ano_atual)deca") |
...
19.3 Base Final:
col_name | data_type |
---|---|
data | date |
mes | date |
negocio | string |
receita_liquida_vendas | double |
receita_liquida_vendas_acumuladas | double |
valor_meta_diaria_ |
decimal(29,2)
valor_devolucao
pev | double |
valor_meta_diaria_poa | double |
metas_acumuladas_pev | double |
metas_acumuladas_poa | double |