Skip to end of metadata
Go to start of metadata

You are viewing an old version of this content. View the current version.

Compare with Current Restore this Version View Version History

« Previous Version 7 Next »

Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.

1. Indicadores Carteira Deca

Indicadores para pedidos em carteira Deca.

1.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema small e tabela tb_ordem_pendente.

1.2 Transformação:

Fazendo uma query com um select de todos os campos da tabela origem:

query = "select * from small.tb_ordem_pendente"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
pedidos_carteira = redshift_to_dataframe(query = query, filename = "tb_ordem_pendente", bucket_name = bucket_name)

Renomeando as colunas:

colunas_pedidos_carteira = ["codigo_empresa", "numero_ordem_venda", "numero_sequencia_item_ordem_venda", "codigo_organizacao_vendas", "codigo_canal_distribuicao", "codigo_setor_atividade", "codigo_escritorio_vendas", "codigo_equipe_vendas", "codigo_emissor_ordem", "data_primeira_remessa", "data_emissao", "data_pedido_cliente", "motivo_recusa", "tipo_documento_ordem_venda", "quantidade_itens", "quantidade_faturada_ordem", "quantidade_pendente", "valor_liquido", "valor_faturado_ordem", "valor_pendente", "status_faturamento", "bloqueio_remessa_cliente", "status_verificacao_credito", "status_carteira", "codigo_produto", "descricao_produto", "data_atualizacao", "remessa", "data_desejada_remessa", "status_recusa", "status_item", "codigo_centro"]
df_pedidos_carteira = pedidos_carteira.toDF(*colunas_pedidos_carteira)

Utilizando Spark SQL para fazer alguns ajustes e aplicar algumas regras de negócio:

df_carteira = spark.sql("""
    select
        to_date(data_atualizacao) as data_referencia,
        (case when codigo_setor_atividade == 'HY' then 'hydra'
            when codigo_setor_atividade == 'MS' then 'metais'
            when codigo_setor_atividade == 'CS' then 'loucas' end) as negocio,
        sum(case when status_carteira in('Bloqueio Adm.', 'Credito', 'Limbo', 'Limbo Programado', 'Não classificado') then valor_pendente else 0 end) as valor_bloqueado,
        sum(case when status_carteira in('Programado', 'Remetido') then valor_pendente else 0 end) as valor_livre
   
    from pedidos_carteira
     
    where to_date(data_atualizacao) == current_date()
        and codigo_setor_atividade in('CS', 'HY', 'MS')

    group by 1, 2
""")

Fazendo append na tabela final na database indicadores_mercado:

df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca")

1.3 Base Final:

col_name

data_type

data_referencia

date

negocio

string

valor_bloqueado

double

valor_livre

double

2. Indicadores Devolução Deca

Indicadores para pedidos em status de devolução para Deca.

2.1 Origem:

Database large e tabela tb_resultado_comercial.

2.2 Transformação:

Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:

df_devolucoes_deca = spark.sql("""
    select distinct
        to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
        (case when codigo_setor_atividade == 'MS' then 'metais'
             when codigo_setor_atividade == 'CS' then 'loucas'
             when codigo_setor_atividade == 'HY' then 'hydra'
             when codigo_setor_atividade == '01' then 'revestimento' end) as negocio,
        sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno,
        sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao,
        sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento
        
    from large.tb_resultado_comercial
    
    where data_competencia between '2019-01-01' and current_date() - 1
        and codigo_setor_atividade in('MS', 'CS', 'HY', '01')
    
    group by 1, 2
    
    order by 1, 2 asc
""")

Calculando o valor de devolução ajustado, conforme regra de negócio:

df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno"))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca")

2.3 Base Final:

col_name

data_type

mes

date

negocio

string

valor_faturamento

double

valor_devolucao

double

3. Indicadores Devolução Madeira

Indicadores para pedidos em status de devolução para Madeira.

3.1 Origem:

Database analytics_prd e tabela custos_rem.

3.2 Transformação:

Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:

df_devolucao_madeira = spark.sql("""
    select
        perio as mes,
        (case when prctr like '%MDP' then 'mdp'
            when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio,
        sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao,
        sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado
        
    from analytics_prd.custos_rem
    
    where spart == 'CH'
        and (prctr like '%MDP' or prctr like '%MDF')
    
    and perio >= 2019001
    
    group by 1, 2
    
    order by 1, 2
""")

Sobrescrevendo a tabela final na database indicadores_mercado:

df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira")

3.3 Base Final:

col_name

data_type

mes

date

negocio

string

valor_faturamento

decimal(29,2)

valor_devolucao

decimal(29,2)

4. Indicadores Financeiros

Indicadores financeiros disponibilizados pela Controladoria.

4.1 Origem:

A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em no Sharepoint Relatório Controladoria.

4.2 Transformação:

Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório do dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Fazendo o download dos arquivos:

download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx")

4.2.1 Arquivo Fechamento_Gerencial_2022.xlsm

4.2.1.1 Indicadores do DRE

Definindo função para Indicadores do DRE:

#parâmetros da função: range de células e negócio("madeira", "deca", "revestimento" ou "consolidado")
def ler_dre(celulas, negocio):
    #lendo range de células do excel utilizando o pandas.read_excel(arquivo, sheet, linhas desconsideradas, número de linhas, range células, coluna index)
    #transpondo a tabela utilizando .transpose()
    #selecionando colunas utilizando .iloc[]
    #resetando index
    df = pd.read_excel(fechamento_gerencial, "DRE CONSOL", skiprows = 1, nrows = 146, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 127]].reset_index()
    #renomeando colunas
    df.columns = dre_header
    #transformando em dataframe do spark
    df = spark.createDataFrame(df)
    #criando coluna "negocio"
    df = df.selectExpr(dre_header[0], "'{}' as negocio".format(negocio), dre_header[1], dre_header[2])
    #tratando coluna "mes"
    df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
                           .when(df.mes.like("FEV%"), "01/02/" + ano_atual)
                           .when(df.mes.like("MAR%"), "01/03/" + ano_atual)
                           .when(df.mes.like("ABR%"), "01/04/" + ano_atual)
                           .when(df.mes.like("MAI%"), "01/05/" + ano_atual)
                           .when(df.mes.like("JUN%"), "01/06/" + ano_atual)
                           .when(df.mes.like("JUL%"), "01/07/" + ano_atual)
                           .when(df.mes.like("AGO%"), "01/08/" + ano_atual)
                           .when(df.mes.like("SET%"), "01/09/" + ano_atual)
                           .when(df.mes.like("OUT%"), "01/10/" + ano_atual)
                           .when(df.mes.like("NOV%"), "01/11/" + ano_atual)
                           .when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_dre_madeira = ler_dre("EL:EX", "madeira")
df_dre_deca = ler_dre("B:N", "deca")
df_dre_revestimento = ler_dre("HN:HZ", "revestimento")
df_dre_consolidado = ler_dre("GT:HF", "consolidado")

Fazendo união de todos os dataframes gerados:

df_dre = df_dre_madeira.union(df_dre_deca).union(df_dre_revestimento).union(df_dre_consolidado)

4.2.1.2 Indicadores do EVA

Definindo função para Indicadores do EVA, lendo a sheet EVA do arquivo:

#indicadores de valor agregado:
def ler_eva(celulas, negocio):
    df = pd.read_excel(fechamento_gerencial, "EVA", skiprows = 2, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 18].reset_index()
    df.columns = eva_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(eva_header[0], "'{}' as negocio".format(negocio), eva_header[1])
    df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
                           .when(df.mes.like("FEV%"), "01/02/" + ano_atual)
                           .when(df.mes.like("MAR%"), "01/03/" + ano_atual)
                           .when(df.mes.like("ABR%"), "01/04/" + ano_atual)
                           .when(df.mes.like("MAI%"), "01/05/" + ano_atual)
                           .when(df.mes.like("JUN%"), "01/06/" + ano_atual)
                           .when(df.mes.like("JUL%"), "01/07/" + ano_atual)
                           .when(df.mes.like("AGO%"), "01/08/" + ano_atual)
                           .when(df.mes.like("SET%"), "01/09/" + ano_atual)
                           .when(df.mes.like("OUT%"), "01/10/" + ano_atual)
                           .when(df.mes.like("NOV%"), "01/11/" + ano_atual)
                           .when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_eva_madeira = ler_eva("GG:GT", "madeira")
df_eva_deca = ler_eva("BA:BN", "deca")
df_eva_revestimento = ler_eva("EY:FL", "revestimento")
df_eva_consolidado = ler_eva("FP:GC", "consolidado")

Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo eva_recorrente:

df_eva = df_eva_madeira.union(df_eva_deca).union(df_eva_revestimento).union(df_eva_consolidado).withColumn("eva_recorrente", col("eva_recorrente")/1000)

4.2.2 Arquivo Fluxo_de_Caixa_Oficial_2022.xlsx

4.2.2.1 Indicadores de Fluxo de Caixa

Definindo função para Indicadores de Fluxo de Caixa, lendo a sheet F.C. REAL do arquivo:

def ler_fluxo_caixa(celulas, negocio):
    df = pd.read_excel(fluxo_caixa, "F.C. REAL", skiprows = 1, nrows = 86, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index()
    df.columns = fc_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(fc_header[0], "'{}' as negocio".format(negocio), fc_header[1])
    df = df.withColumn("mes", to_date("mes"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_fc_madeira = ler_fluxo_caixa("FN:GA", "madeira")
df_fc_deca = ler_fluxo_caixa("BM:BZ", "deca")
df_fc_revestimento = ler_fluxo_caixa("IT:JG", "revestimento")
df_fc_consolidado = ler_fluxo_caixa("KJ:KW", "consolidado")

Fazendo união de todos os dataframes gerados:

df_fc = df_fc_madeira.union(df_fc_deca).union(df_fc_revestimento).union(df_fc_consolidado)

4.2.2.2 Indicadores de PMP

Definindo função para Indicadores de PMP, lendo a sheet CGL.ROL do arquivo:

def ler_pmp(celulas, negocio):
    df = pd.read_excel(fluxo_caixa, "CGL.ROL", skiprows = 1, nrows = 50, usecols = celulas, index_col = 0).transpose().iloc[1:, 26].reset_index()
    df.columns = pmp_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(pmp_header[0], "'{}' as negocio".format(negocio), pmp_header[1])
    df = df.withColumn("mes", to_date("mes"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_pmp_madeira = ler_pmp("DR:EE", "madeira")
df_pmp_deca = ler_pmp("AU:BH", "deca")
df_pmp_revestimento = ler_pmp("FZ:GM", "revestimento")
df_pmp_consolidado = ler_pmp("GO:HB", "consolidado")

Fazendo união de todos os dataframes gerados:

df_pmp = df_pmp_madeira.union(df_pmp_deca).union(df_pmp_revestimento).union(df_pmp_consolidado)

4.2.3 Arquivo Forecast_3_9_2022.xlsx

4.2.3.1 Indicadores de Forecast

Definindo função para Indicadores de Forecast:

def ler_forecast(sheet, celulas, negocio):
    df = pd.read_excel(forecast, sheet, skiprows = 3, nrows = 83, usecols = celulas, index_col = 0).transpose().iloc[:, [6, 47]].reset_index()
    df.columns = frc_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(frc_header[0], "'{}' as negocio".format(negocio), frc_header[1], frc_header[2])
    df = df.withColumn("mes", to_date("mes"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_frc_dre_madeira = ler_forecast("DRE_MAD", "A:M", "madeira")
df_frc_dre_deca = ler_forecast("DRE_DEC", "A:M", "deca")
df_frc_dre_revestimento = ler_forecast("DRE_REVEST", "A:M", "revestimento")
df_frc_dre_consolidado = ler_forecast("DRE_CONSOL", "A:M", "consolidado")

Fazendo união de todos os dataframes gerados:

df_frc = df_frc_dre_madeira.union(df_frc_dre_deca).union(df_frc_dre_revestimento).union(df_frc_dre_consolidado)

4.2.3.2 Indicadores de Forecast EVA

Definindo função para Indicadores de Forecast EVA, lendo a sheet Base EVA ROIC:

def ler_forecast_eva(negocio):
    #desinindo constantantes para cada negócio para utilizar posteriormente o iloc[] e localizar os dados necessários
    #esses valores pra neg são como os negócios são identificados no arquivo
    if negocio == "madeira":
        neg = "Mad. Total"
    elif negocio == "deca":
        neg = "Deca"
    elif negocio == "revestimento":
        neg = "RC"
    elif negocio == "consolidado":
        neg = "Consol"
    df = pd.read_excel(forecast, "Base EVA ROIC", skiprows = 4, nrows = 130, usecols = "B:Q")
    #buscando a linha em que a coluna 0 é igual ao negócio, coluna 1 igual a "Mês", coluna 2 igual a "RECORRENTE" e coluna 4 igual a "EVA"
    df = df.where(
        (df.iloc[:, 0] == neg) &
        (df.iloc[:, 1] == "Mês") &
        (df.iloc[:, 2] == "RECORRENTE") &
        (df.iloc[:, 3] == "EVA")
    ).dropna()
    df = df.transpose().iloc[4: ].reset_index()
    df.columns = frc_eva_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(frc_eva_header[0], "'{}' as negocio".format(negocio), frc_eva_header[1])
    df = df.withColumn("mes", when(df.mes == 1, "01/01/" + ano_atual)
                           .when(df.mes == 2, "01/02/" + ano_atual)
                           .when(df.mes == 3, "01/03/" + ano_atual)
                           .when(df.mes == 4, "01/04/" + ano_atual)
                           .when(df.mes == 5, "01/05/" + ano_atual)
                           .when(df.mes == 6, "01/06/" + ano_atual)
                           .when(df.mes == 7, "01/07/" + ano_atual)
                           .when(df.mes == 8, "01/08/" + ano_atual)
                           .when(df.mes == 9, "01/09/" + ano_atual)
                           .when(df.mes == 10, "01/10/" + ano_atual)
                           .when(df.mes == 11, "01/11/" + ano_atual)
                           .when(df.mes == 12, "01/12/" + ano_atual)
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_frc_eva_madeira = ler_forecast_eva("madeira")
df_frc_eva_deca = ler_forecast_eva("deca")
df_frc_eva_revestimento = ler_forecast_eva("revestimento")
df_frc_eva_consolidado = ler_forecast_eva("consolidado")

Fazendo união de todos os dataframes gerados e ajustando a ordem de grandeza do campo frc_eva_recorrente:

df_frc_eva = df_frc_eva_madeira.union(df_frc_eva_deca).union(df_frc_eva_revestimento).union(df_frc_eva_consolidado).withColumn("frc_eva_recorrente", col("frc_eva_recorrente")/1000)

4.2.4 Arquivo FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx

4.2.4.1 Indicadores Forecast Fluxo de Caixa

Definindo função para Indicadores de Forecast fluxo de Caixa, lendo a sheet F.C. FCST:

def ler_fcf(celulas, negocio):
    df = pd.read_excel(fcf, "F.C. FCST", skiprows = 1, nrows = 87, usecols = celulas, index_col = 0).transpose().iloc[1:, 54].reset_index()
    df.columns = fcf_header
    df = spark.createDataFrame(df)
    df = df.selectExpr(fcf_header[0], "'{}' as negocio".format(negocio), fcf_header[1])
    df = df.withColumn("mes", when(df.mes.like("JAN%"), "01/01/" + ano_atual)
                           .when(df.mes.like("FEV%"), "01/02/" + ano_atual)
                           .when(df.mes.like("MAR%"), "01/03/" + ano_atual)
                           .when(df.mes.like("ABR%"), "01/04/" + ano_atual)
                           .when(df.mes.like("MAI%"), "01/05/" + ano_atual)
                           .when(df.mes.like("JUN%"), "01/06/" + ano_atual)
                           .when(df.mes.like("JUL%"), "01/07/" + ano_atual)
                           .when(df.mes.like("AGO%"), "01/08/" + ano_atual)
                           .when(df.mes.like("SET%"), "01/09/" + ano_atual)
                           .when(df.mes.like("OUT%"), "01/10/" + ano_atual)
                           .when(df.mes.like("NOV%"), "01/11/" + ano_atual)
                           .when(df.mes.like("DEZ%"), "01/12/" + ano_atual)
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    return df

Aplicando a função para cada negócio, passando o parâmetro de range de células onde se encontram os dados:

df_fcf_madeira = ler_fcf("FN:GA", "madeira")
df_fcf_deca = ler_fcf("BM:BZ", "deca")
df_fcf_revestimento = ler_fcf("HD:HQ", "revestimento")
df_fcf_consolidado = ler_fcf("IT:JG", "consolidado")

Fazendo união de todos os dataframes gerados:

df_frc_fc = df_fcf_madeira.union(df_fcf_deca).union(df_fcf_revestimento).union(df_fcf_consolidado)

4.2.5 Arquivo indicadores_poa_2022.csv

Foi feito o upload desse arquivo diretamente no dbfs do Databricks:

df_poa = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_financeiros/indicadores_poa_2022.csv")

Tratando alguns tipos de campos e ajustando a ordem de grandeza de outro:

df_poa = df_poa.select(
    "mes",
    "negocio",
    col("poa_receita_liquida_vendas").cast(DoubleType()),
    col("poa_ebitda_recorrente").cast(DoubleType()),
    col("poa_eva_recorrente").cast(DoubleType()),
    col("poa_fluxo_caixa_livre_total").cast(DoubleType())
).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)

4.2.6 Base Final

Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:

campos_join = ["mes", "negocio"]
df_indicadores_financeiros = df_dre.join(
    df_eva, campos_join
).join(
    df_fc, campos_join
).join(
    df_pmp, campos_join
).join(
    df_frc, campos_join
).join(
    df_frc_eva, campos_join
).join(
    df_frc_fc, campos_join
).join(
    df_poa, campos_join
).orderBy("negocio", "mes")

Sobrescrevendo a tabela final no database :

df_indicadores_financeiros.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_financeiros_{}".format(ano_atual))

4.3 Base Final:

col_name

data_type

mes

date

negocio

string

receita_liquida_vendas

double

ebitda_recorrente

double

eva_recorrente

double

fluxo_caixa_livre_total

double

pmp

double

frc_receita_liquida_vendas

double

frc_ebitda_recorrente

double

frc_eva_recorrente

double

frc_fluxo_caixa_livre_total

double

poa_receita_liquida_vendas

double

poa_ebitda_recorrente

double

poa_eva_recorrente

double

poa_fluxo_caixa_livre_total

double

5. Indicadores Gente

Indicadores de Recursos Humanos.

5.1 Origem:

O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.

5.2 Transformação:

Definindo função para o download do arquivo para o dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo o arquivo:

df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv")

Aplicando regra de negócio para o campo taxa_afastamento:

df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento"))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente")

5.3 Base Final:

col_name

data_type

mes

date

negocio

string

quantidade_cargos_lideranca_mulheres

int

quantidade_cargos_lideranca_total

int

taxa_desligamentos_voluntarios

double

taxa_desligamentos_involuntarios

double

taxa_desligamentos_total

double

taxa_absenteismo_sem_afastamento

double

taxa_absenteismo_com_afastamento

double

taxa_afastamento

double

7. Indicadores Margem Madeira

Indicadores de Margem para Madeira.

7.1 Origem:

É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:

7.2 Transformação:

São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:

import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal

Definindo variáveis para conexão no Tableau Server:

usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/"

Definindo parâmetros da conexão com o Tableau Server:

autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor)

Fazendo o download do arquivo .tdsx:

with servidor.auth.sign_in(autenticador):
    caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True)

O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:

with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
    arquivo_zipado.extractall(diretorio_download)

Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:

with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
        with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
            resultado_consulta = list(resultado)

Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:

esquema_margem = StructType([
    StructField('mes', StringType(), True),
    StructField('produto', StringType(), True),
    StructField('mercado', StringType(), True),
    StructField('receita', DoubleType(), True),
    StructField('quantidade', DoubleType(), True),
    StructField('valor_cit', DoubleType(), True),
    StructField('valor_cco', DoubleType(), True)
])

Criando o dataframe:

df_margem = spark.createDataFrame(resultado_consulta, esquema_margem)

Utilizando o Spark SQL para aplicar algumas regras de negócio:

df_margem_calculos = spark.sql("""
    select
        mes,
        (case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
        (sum(receita) / sum(quantidade)) as receita_m3,
        (sum(valor_cit) / sum(quantidade)) as cit_m3,
        (sum(valor_cco) / sum(quantidade)) as cco_m3
       
    from margem_view
    
    group by 1, 2
    
    order by 1, 2 asc
""")

Aplicando mais algumas regras de negócio:

df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3"))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira")

7.3 Base Final:

col_name

data_type

mes

string

produto

string

receita_m3

double

cit_m3

double

cco_m3

double

mop_m3

double

9. Indicadores OEE Madeira

Indicadores de OEE para Madeira.

9.1 Origem:

O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.

9.2 Transformação:

Definindo função para ler o arquivo:

def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_leitura)
    response.raise_for_status()
    df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas)
    df = spark.createDataFrame(df)
    return df

Lendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:

df_oee_cru = []
while not isinstance(df_oee_cru, DataFrame):
    try:
        df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0)
    except:
        pass

Definindo o nome do produto como constante:

df_oee_cru = df_oee_cru.withColumn("produto", lit("cru"))

Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:

df_oee_revestido = []
while not isinstance(df_oee_revestido, DataFrame):
    try:
        df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0)
    except:
        pass

Definindo o nome do produto como constante:

df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido"))

Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:

df_oee_historico = []
while not isinstance(df_oee_historico, DataFrame):
    try:
        df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0)
    except:
        pass

Fazendo a união de todos os dataframes criados:

df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido)

Sobrescrevendo a tabela final na database indicadores_mercado:

df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira")

9.3 Base Final:

col_name

data_type

mes

date

oee

double

oee_meta_global

double

produto

string

10. Indicadores OTIF Deca

Indicadores de OTIF para Deca.

10.1 Origem:

O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.

10.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv")

Sobrescrevendo a tabela final na database indicadores_mercado:

df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca")

10.3 Base Final:

col_name

data_type

mes

date

segmento_deca

string

otif_deca

double

meta_otif_deca

double

11. Indicadores OTIF Madeira

Indicadores de OTIF para Madeira.

11.1 Origem:

O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.

11.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv")

Sobrescrevendo a tabela final na database indicadores_mercado:

df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira")

11.3 Base Final:

col_name

data_type

mes

date

segmento_madeira

string

otif_madeira

double

meta_otif_madeira

double

12. Indicadores Panorama Mercado Madeira

Indicadores de Panorama de Mercado para Madeira.

12.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.

12.2 Transformação:

É feita uma query da tabela já aplicando algumas regras de negócio:

query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1"

Renomeando campos:

colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"]
df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado)

Ajustando campo mes:

df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType()))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira")

12.3 Base Final:

col_name

data_type

mes

date

mercado

string

produto

string

volume_m3_total

double

13. Indicadores Produtividade Deca

Indicadores de Produtividade para Deca.

13.1 Origem:

O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.

13.2 Transformação:

Definindo função para fazer o download do arquivo para diretório do dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Fazendo o download do arquivo:

download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx")

Lendo o arquivo:

produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx")

Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.

def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas):
    #lendo arquivo excel
    df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index()
    #definindo dataframe para o poa
    df_poa = df
    #definindo dataframe para produtividade, coluna 1 é diferente de "POA"
    df = df.where(df.iloc[:, 1] != "POA")
    df = df.iloc[:, [0, 1, numero_linhas]]
    df.columns = ["mes", "ano", "produtividade"]
    df = df.dropna(axis = 0)
    df = spark.createDataFrame(df)
    df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType()))
    #ajustandando campo mes para produtividade
    df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano")))
                           .when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano")))
                           .when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano")))
                           .when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano")))
                           .when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano")))
                           .when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano")))
                           .when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano")))
                           .when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano")))
                           .when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano")))
                           .when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano")))
                           .when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano")))
                           .when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano")))
                           .otherwise(None))
    df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    #trabalhando com dataframe para poa, quando coluna 1 é igual a "POA"
    df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA")
    df_poa = df_poa.iloc[:, [0, numero_linhas]]
    df_poa .columns = ["mes", "poa_produtividade"]
    df_poa = df_poa.dropna(axis = 0)
    df_poa = spark.createDataFrame(df_poa)
    #ajustando campo mes
    df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual)
                           .when(col("mes").like("FEV%"), "01/02/" + ano_atual)
                           .when(col("mes").like("MAR%"), "01/03/" + ano_atual)
                           .when(col("mes").like("ABR%"), "01/04/" + ano_atual)
                           .when(col("mes").like("MAI%"), "01/05/" + ano_atual)
                           .when(col("mes").like("JUN%"), "01/06/" + ano_atual)
                           .when(col("mes").like("JUL%"), "01/07/" + ano_atual)
                           .when(col("mes").like("AGO%"), "01/08/" + ano_atual)
                           .when(col("mes").like("SET%"), "01/09/" + ano_atual)
                           .when(col("mes").like("OUT%"), "01/10/" + ano_atual)
                           .when(col("mes").like("NOV%"), "01/11/" + ano_atual)
                           .when(col("mes").like("DEZ%"), "01/12/" + ano_atual)
                           .otherwise(None))
    df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
    #definindo dataframe final, fazendo um join com os anteriores
    df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes)
    df_final = df_final.withColumn("segmento", lit(negocio))
    df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade")
    df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2))
    return df_final

Aplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:

df_metais = ler_produtividade("metais", "FA:FZ", 21, 5)

Aplicando função para loucas:

df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8)

Aplicando função para Hydra:

df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4)

Aplicando função para revestimento:

df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7)

Fazendo a união de todos os dataframes criados:

df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento)

Sobrescrevendo a tabela final na database indicadores_mercado:

df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca")

13.3 Base Final:

col_name

data_type

mes

date

segmento

string

poa_produtividade

double

produtividade

double

14. Indicadores SAC

Indicadores de SAC.

14.1 Origem:

O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.

14.2 Transformação:

Definindo função para fazer o download do arquivo em diretório do dbfs:

def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo arquivo:

df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv")

Aplicando regra de negócio para o campo saldo:

df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados"))

Sobrescrevendo a tabela final na database indicadores_mercado:

df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac")

14.3 Base Final:

col_name

data_type

mes

date

visao

string

negocio

string

casos_abertos

int

casos_fechados

int

saldo

int

15. Indicadores Sell-in Deca - Chuveiros

Indicadores de Sell-in para Deca.

15.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.

15.2 Transformação:

Lendo sheet do arquivo:

arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]

Tratando os valores em branco:

objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros")

15.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

tipo

string

sku

string

meses

timestamp

desviador

string

linha

string

tensao

string

potencia

string

modelo

string

ean

string

valor

double

volume

double