Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Conjunto de notebooks que têm tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.

Índice

  1. Indicadores Carteira Deca

  2. Indicadores Devolução Deca

  3. Indicadores Devolução Madeira

  4. Indicadores Financeiros

  5. Indicadores Gente

  6. Indicadores Margem Madeira

  7. Indicadores OEE Madeira

  8. Indicadores OTIF Deca

  9. Indicadores OTIF Madeira

  10. Indicadores Panorama Mercado Madeira

  11. Indicadores Produtividade Deca

  12. Indicadores SAC

  13. Indicadores Sell-in Deca - Chuveiros

  14. Indicadores Sell-in Deca - Cubas

  15. Indicadores Sell-in Deca - Torneiras

  16. Indicadores Market Share, Sell-in e Sell-out Madeira

  17. Indicadores Vendas Deca

1. Indicadores Carteira Deca

Indicadores para pedidos em carteira Deca.

1.1 Origem:

...

  • Notebook: indicadores_carteira_deca

  • Job: inteligencia-mercado_job_prod_indicadores_carteira_deca

  • Schedule: diário/18:00h

  • Base fim: indicadores_mercado.tb_indicadores_carteira_deca

1.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema smalle tabela tb_ordem_pendente.

...

Indicadores para pedidos em status de devolução para Deca.

  • Notebook: indicadores_devolucao_deca

  • Job: inteligencia-mercado_job_prod_indicadores_devolucao_deca

  • Schedule: diário/14:00h

  • Base fim: indicadores_mercado.tb_indicadores_devolucoes_deca

2.1 Origem:

Database large e tabela tb_resultado_comercial.

...

Indicadores para pedidos em status de devolução para Madeira.

3.1 Origem:

Database analytics_prd e tabela custos_rem.

3.2 Transformação:

...

  • Notebook: indicadores_devolucao_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_devolucao_madeira

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_devolucoes_madeira

3.1 Origem:

Database analytics_prd e tabela custos_rem.

3.2 Transformação:

Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:

...

Indicadores financeiros disponibilizados pela Controladoria.

  • Notebook: indicadores_financeiros

  • Job: inteligencia-mercado_job_prod_indicadores_financeiros

  • Schedule: diário/15:00h

  • Base fim: indicadores_mercado.tb_indicadores_financeiros_2022

4.1 Origem:

A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.

...

Code Block
languagepy
df_poa = df_poa.select(
    "mes",
    "negocio",
    col("poa_receita_liquida_vendas").cast(DoubleType()),
    col("poa_ebitda_recorrente").cast(DoubleType()),
    col("poa_eva_recorrente").cast(DoubleType()),
    col("poa_fluxo_caixa_livre_total").cast(DoubleType())
).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)

4.2.

...

6 Base Final

Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:

Code Block
languagepy
campos_join = ["mes", "negocio"]
df_indicadores_financeiros = df_dre.join(
    df_eva, campos_join
).join(
    df_fc, campos_join
).join(
    df_pmp, campos_join
).join(
    df_frc, campos_join
).join(
    df_frc_eva, campos_join
).join(
    df_frc_fc, campos_join
).join(
    df_poa, campos_join
).orderBy("negocio", "mes")

...

, campos_join
).orderBy("negocio", "mes")

Sobrescrevendo a tabela final no database :

Code Block
languagepy
df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual))

4.3 Base Final:

col_name

data_type

mes

date

negocio

string

receita_liquida_vendas

double

ebitda_recorrente

double

eva_recorrente

double

fluxo_caixa_livre_total

double

pmp

double

frc_receita_liquida_vendas

double

frc_ebitda_recorrente

double

frc_eva_recorrente

double

frc_fluxo_caixa_livre_total

double

poa_receita_liquida_vendas

double

poa_ebitda_recorrente

double

poa_eva_recorrente

double

poa_fluxo_caixa_livre_total

double

5. Indicadores Gente

Indicadores de Recursos Humanos.

  • Notebook: indicadores_gente

  • Job: inteligencia-mercado_job_prod_indicadores_gente

  • Schedule: sextas-feiras, 14:00h

  • Base fim: indicadores_mercado.tb_indicadores_gente

5.1 Origem:

O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.

5.2 Transformação:

Definindo função para o download do arquivo para o dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo o arquivo:

Code Block
languagepy
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv")

Aplicando regra de negócio para o campo taxa_afastamento:

Code Block
languagepy
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento"))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente")

5.3 Base Final:

col_name

data_type

mes

date

negocio

string

quantidade_cargos_lideranca_mulheres

int

quantidade_cargos_lideranca_total

int

taxa_desligamentos_voluntarios

double

taxa_desligamentos_involuntarios

double

taxa_desligamentos_total

double

taxa_absenteismo_sem_afastamento

double

taxa_absenteismo_com_afastamento

double

taxa_afastamento

double

7. Indicadores Margem Madeira

Indicadores de Margem para Madeira.

  • Notebook: indicadores_margem_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_margem_madeira

  • Schedule: diário/14:22h

  • Base fim: indicadores_mercado.tb_indicadores_margem_madeira

7.1 Origem:

É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:

7.2 Transformação:

São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:

Code Block
languagepy
import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal

Definindo variáveis para conexão no Tableau Server:

Code Block
languagepy
usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/"

Definindo parâmetros da conexão com o Tableau Server:

Code Block
languagepy
autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor)

Fazendo o download do arquivo .tdsx:

Code Block
languagepy
with servidor.auth.sign_in(autenticador):
    caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True)

O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:

Code Block
languagepy
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
    arquivo_zipado.extractall(diretorio_download)

Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:

Code Block
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
        with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
            resultado_consulta = list(resultado)

Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:

Code Block
languagepy
esquema_margem = StructType([
    StructField('mes', StringType(), True),
    StructField('produto', StringType(), True),
    StructField('mercado', StringType(), True),
    StructField('receita', DoubleType(), True),
    StructField('quantidade', DoubleType(), True),
    StructField('valor_cit', DoubleType(), True),
    StructField('valor_cco', DoubleType(), True)
])

Criando o dataframe:

Code Block
languagepy
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem)

Utilizando o Spark SQL para aplicar algumas regras de negócio:

Code Block
df_margem_calculos = spark.sql("""
    select
        mes,
        (case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
        (sum(receita) / sum(quantidade)) as receita_m3,
        (sum(valor_cit) / sum(quantidade)) as cit_m3,
        (sum(valor_cco) / sum(quantidade)) as cco_m3
       
    from margem_view
    
    group by 1, 2
    
    order by 1, 2 asc
""")

Aplicando mais algumas regras de negócio:

Code Block
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3"))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira")

7.3 Base Final:

col_name

data_type

mes

string

produto

string

receita_m3

double

cit_m3

double

cco_m3

double

mop_m3

double

9. Indicadores OEE Madeira

Indicadores de OEE para Madeira.

  • Notebook: indicadores_oee_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_oee_madeira

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_oee_madeira

9.1 Origem:

O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.

9.2 Transformação:

Definindo função para ler o arquivo:

Code Block
languagepy
def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_leitura)
    response.raise_for_status()
    df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas)
    df = spark.createDataFrame(df)
    return df

Lendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:

Code Block
languagepy
df_oee_cru = []
while not isinstance(df_oee_cru, DataFrame):
    try:
        df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0)
    except:
        pass

Definindo o nome do produto como constante:

Code Block
languagepy
df_oee_cru = df_oee_cru.withColumn("produto", lit("cru"))

Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:

Code Block
languagepy
df_oee_revestido = []
while not isinstance(df_oee_revestido, DataFrame):
    try:
        df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0)
    except:
        pass

Definindo o nome do produto como constante:

Code Block
languagepy
df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido"))

Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:

Code Block
languagepy
df_oee_historico = []
while not isinstance(df_oee_historico, DataFrame):
    try:
        df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0)
    except:
        pass

Fazendo a união de todos os dataframes criados:

Code Block
languagepy
df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira")

9.3 Base Final:

col_name

data_type

mes

date

oee

double

oee_meta_global

double

produto

string

10. Indicadores OTIF Deca

Indicadores de OTIF para Deca.

  • Notebook: indicadores_otif_deca

  • Job: inteligencia-mercado_job_prod_indicadores_otif_deca

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_otif_deca

10.1 Origem:

O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.

10.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

Code Block
languagepy
df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv")

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca")

10.3 Base Final:

col_name

data_type

mes

date

segmento_deca

string

otif_deca

double

meta_otif_deca

double

11. Indicadores OTIF Madeira

Indicadores de OTIF para Madeira.

  • Notebook: indicadores_otif_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_otif_madeira

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_otif_madeira

11.1 Origem:

O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.

11.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

Code Block
languagepy
df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv")

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira")

11.3 Base Final:

col_name

data_type

mes

date

segmento_madeira

string

otif_madeira

double

meta_otif_madeira

double

12. Indicadores Panorama Mercado Madeira

Indicadores de Panorama de Mercado para Madeira.

  • Notebook: indicadores_panorama_mercado_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_panorama_mercado_madeira

  • Schedule: diário/13:00h

  • Base fim: indicadores_mercado.tb_indicadores_panorama_mercado_madeira

12.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.

12.2 Transformação:

É feita uma query da tabela já aplicando algumas regras de negócio:

Code Block
languagepy
query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1"

Renomeando campos:

Code Block
languagepy
colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"]
df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado)

Ajustando campo mes:

Code Block
languagepy
df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType()))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira")

12.3 Base Final:

col_name

data_type

mes

date

mercado

string

produto

string

volume_m3_total

double

13. Indicadores Produtividade Deca

Indicadores de Produtividade para Deca.

  • Notebook: indicadores_produtividade_deca

  • Job: inteligencia-mercado_job_prod_indicadores_produtividade_deca

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_minuto_homem_deca

13.1 Origem:

O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.

13.2 Transformação:

Definindo função para fazer o download do arquivo para diretório do dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Fazendo o download do arquivo:

Code Block
languagepy
download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx")

Lendo o arquivo:

Code Block
languagepy
produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx")

Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.

Code Block
languagepy
def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas):
    #lendo arquivo excel
    df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index()
    #definindo dataframe para o poa
    df_poa = df
    #definindo dataframe para produtividade, coluna 1 é diferente de "POA"
    df = df.where(df.iloc[:, 1] != "POA")
    df = df.iloc[:, [0, 1, numero_linhas]]
    df.columns = ["mes", "ano", "produtividade"]
    df = df.dropna(axis = 0)
    df = spark.createDataFrame(df)
    df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType()))
    #ajustandando campo mes para produtividade
    df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano")))
                           .when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano")))
                           .when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano")))
                           .when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano")))
                           .when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano")))
                           .when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano")))
                           .when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano")))
                           .when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano")))
                           .when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano")))
                           .when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano")))
                           .when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano")))
                           .when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano")))
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    #trabalhando com dataframe para poa, quando coluna 1 é igual a "POA"
    df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA")
    df_poa = df_poa.iloc[:, [0, numero_linhas]]
    df_poa .columns = ["mes", "poa_produtividade"]
    df_poa = df_poa.dropna(axis = 0)
    df_poa = spark.createDataFrame(df_poa)
    #ajustando campo mes
    df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual)
                           .when(col("mes").like("FEV%"), "01/02/" + ano_atual)
                           .when(col("mes").like("MAR%"), "01/03/" + ano_atual)
                           .when(col("mes").like("ABR%"), "01/04/" + ano_atual)
                           .when(col("mes").like("MAI%"), "01/05/" + ano_atual)
                           .when(col("mes").like("JUN%"), "01/06/" + ano_atual)
                           .when(col("mes").like("JUL%"), "01/07/" + ano_atual)
                           .when(col("mes").like("AGO%"), "01/08/" + ano_atual)
                           .when(col("mes").like("SET%"), "01/09/" + ano_atual)
                           .when(col("mes").like("OUT%"), "01/10/" + ano_atual)
                           .when(col("mes").like("NOV%"), "01/11/" + ano_atual)
                           .when(col("mes").like("DEZ%"), "01/12/" + ano_atual)
                           .otherwise(None))
    df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    #definindo dataframe final, fazendo um join com os anteriores
    df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes)
    df_final = df_final.withColumn("segmento", lit(negocio))
    df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade")
    df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2))
    return df_final

Aplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:

Code Block
languagepy
df_metais = ler_produtividade("metais", "FA:FZ", 21, 5)

Aplicando função para loucas:

Code Block
languagepy
df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8)

Aplicando função para Hydra:

Code Block
languagepy
df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4)

Aplicando função para revestimento:

Code Block
languagepy
df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7)

Fazendo a união de todos os dataframes criados:

Code Block
languagepy
df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca")

13.3 Base Final:

col_name

data_type

mes

date

segmento

string

poa_produtividade

double

produtividade

double

14. Indicadores SAC

Indicadores de SAC.

  • Notebook: indicadores_sac

  • Job: inteligencia-mercado_job_prod_indicadores_sac

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_sac

14.1 Origem:

O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.

14.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

Code Block
languagepy
df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv")

Aplicando regra de negócio para o campo saldo:

Code Block
languagepy
df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados"))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac")

14.3 Base Final:

col_name

data_type

mes

date

visao

string

negocio

string

casos_abertos

int

casos_fechados

int

saldo

int

15. Indicadores Sell-in Deca - Chuveiros

Indicadores de Sell-in para Deca.

  • Notebook: indicadores_sell_in_deca_chuveiros

  • Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros

15.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.

15.2 Transformação:

Lendo sheet do arquivo:

Code Block
languagepy
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]

Tratando os valores missing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

Code Block
languagepy
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros")

15.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

tipo

string

sku

string

meses

timestamp

desviador

string

linha

string

tensao

string

potencia

string

modelo

string

ean

string

valor

double

volume

double

16. Indicadores Sell-in Deca - Cubas

Indicadores de Sell-in para Deca.

  • Notebook: indicadores_sell_in_deca_cubas

  • Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_cubas

16.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.

16.2 Transformação:

Lendo sheet do arquivo:

Code Block
languagepy
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"]

Tratando os valores missing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

Code Block
languagepy
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas")

16.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

aplicacao

string

instalacao

string

sku

string

meses

timestamp

formato

string

cor

string

acabamento

string

ean

string

valor

double

volume

double

17. Indicadores Sell-in Deca - Torneiras

Indicadores de Sell-in para Deca.

  • Notebook: indicadores_sell_in_deca_torneiras

  • Base fim: indicadores_mercado.tb_indicadores_sell_in_deca_torneiras

17.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.

17.2 Transformação:

Lendo sheet do arquivo:

Code Block
languagepy
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]

Tratando os valores missing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

Code Block
languagepy
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras")

17.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

subcategoria

string

aplicacao

string

instalacao

string

sku

string

meses

timestamp

bica

string

linha

string

tensao

string

potencia

string

modelo

string

ean

string

valor

double

volume

double

18. Indicadores Market Share, Sell-in e Sell-out Madeira

Indicadores para Market Share, Sell-in e Sell-out de Madeira.

  • Notebook: indicadores_share_sell_in_sell_out_madeira

  • Job: inteligencia-mercado_job_prod_indicadores_share_sell_in_sell_out_madeira

  • Schedule: diário/12:00h

  • Base fim: indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira

18.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.

18.2 Transformação:

Fazendo uma query com um select de todos os campos da tabela origem:

Code Block
languagepy
query = "select * from madeira.vw_base_mktshare_sellin_sellout"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name)

Renomeando as colunas:

Code Block
languagepy
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"]
df_madeira = consulta_madeira.toDF(*colunas_madeira)

Ajustando tipos dos campos:

Code Block
languagepy
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType()))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira")

18.3 Base Final:

col_name

data_type

tipo

string

ano

int

competencia

int

mercado

string

produto

string

produto_detalhe

string

volume_m3

double

volume_m2

double

volume_m3_cap_dtx

double

volume_m3_cap_mercex

double

data_atualizacao_base

date

produto_segmento

string

quantidade_volume_m3_liquido

double

quantidade_volume_m2

double

19. Indicadores Vendas Deca

Indicadores para Vendas de Deca.

  • Notebook: indicadores_venda_deca

  • Job: inteligencia-mercado_job_prod_indicadores_venda_deca

  • Schedule: diário/10:00h

  • Base fim: indicadores_mercado.tb_indicadores_vendas_deca

19.1 Origem:

Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.

19.2 Transformação:

Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.

Code Block
languagepy
for negocio in negocios:
    df = pd.date_range(start = "2022-01-01", end = date.today())
    df = pd.DataFrame(df, columns = ["data"])
    df = spark.createDataFrame(df)
    df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01')))
    df = df.withColumn("negocio", lit(negocio))
    if negocio == negocios[0]:
        df_datas = df
    else:
        df_datas = df_datas.union(df)

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas")

Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:

Code Block
languagepy
df_vendas = spark.sql("""
    select
        t1.data_competencia as data,
        to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
        (case when t1.codigo_setor_atividade = 'CS' then 'loucas'
            when t1.codigo_setor_atividade = 'MS' then 'metais'
            when t1.codigo_setor_atividade = '01' then 'revestimento'
            when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio,
         round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas
    from large.tb_resultado_comercial t1
    where t1.codigo_setor_atividade in("CS", "MS", "HY", "01")
        and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI")
        and t1.data_competencia between '2022-01-01' and (current_date() - 1)
    group by 1, 2, 3
    order by 3, 2, 1 asc
""")

Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:

Code Block
languagepy
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left")

Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:

Code Block
languagepy
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))

Criando campo com soma acumulada, utilizando a partição criada anteriormente:

Code Block
languagepy
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))

Criando campo com soma acumulada, utilizando a partição criada anteriormente:

Code Block
languagepy
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca")

Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:

Code Block
languagepy
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name)

Renomeando colunas:

Code Block
languagepy
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"]
df_metas = consulta_metas.toDF(*colunas_metas)

Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:

Code Block
languagepy
def numero_dias(data):
    mes = data.month
    ano = data.year
    #tratativa caso fevereiro (ano bissexto)
    cons = 0
    #todo ano divisível por 400 e ao mesmo tempo 100 é bissexto
    if ano % 400 == 0 and ano % 100 == 0:
        cons = 1
    #todo ano divisível por 4 é bissexto
    elif ano % 4 == 0:
        cons = 1
    else:
        cons = 0
    if mes == 2:
        return 28 + cons
    #meses com quantidade ímpar de dias
    impares = [1, 3, 5, 7, 8, 10, 12]
    if mes in impares:
        return 31
    return 30

Aplicando a função e calculando a meta diária:

Code Block
languagepy
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType())
df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes")))   

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca")

Criando colunas para cada tipo de meta, PEV e POA:

Code Block
languagepy
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2))
df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2))

Criando dataframe unindo as duas metas:

Code Block
languagepy
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"])

Fazendo join entre a base de datas e a de metas:

Code Block
languagepy
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas")
df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left")

Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:

Code Block
languagepy
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"])

Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:

Code Block
languagepy
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))

Criando as metas acumuladas utilizando o particionamento:

Code Block
languagepy
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_vendas_metas_indicadoresdeca_financeirosacumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeirosvendas_{}".format(ano_atual)deca")

...

19.3 Base Final:

col_name

data_type

data

date

mes

date

negocio

string

receita_liquida_vendas

double

receita_liquida_vendas_acumuladas

double

valor_meta_diaria_

faturamento

decimal(29,2)

valor_devolucao

decimal(29,2)

pev

double

valor_meta_diaria_poa

double

metas_acumuladas_pev

double

metas_acumuladas_poa

double