Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.

1. Indicadores Carteira Deca

...

Code Block
languagepy
df_poa = df_poa.select(
    "mes",
    "negocio",
    col("poa_receita_liquida_vendas").cast(DoubleType()),
    col("poa_ebitda_recorrente").cast(DoubleType()),
    col("poa_eva_recorrente").cast(DoubleType()),
    col("poa_fluxo_caixa_livre_total").cast(DoubleType())
).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000)

4.2.

...

6 Base Final

Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:

...

col_name

data_type

mes

date

negocio

string

valor_faturamento

decimal(29,2)

valor_devolucao

decimal(29,2)

receita_liquida_vendas

double

ebitda_recorrente

double

eva_recorrente

double

fluxo_caixa_livre_total

double

pmp

double

frc_receita_liquida_vendas

double

frc_ebitda_recorrente

double

frc_eva_recorrente

double

frc_fluxo_caixa_livre_total

double

poa_receita_liquida_vendas

double

poa_ebitda_recorrente

double

poa_eva_recorrente

double

poa_fluxo_caixa_livre_total

double

5. Indicadores Gente

Indicadores de Recursos Humanos.

5.1 Origem:

O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.

5.2 Transformação:

Definindo função para o download do arquivo para o dbfs:

Code Block
languagepy
def download_arquivo(arquivo_download, nome_arquivo):
    ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
    web = ctx.load(ctx.web).execute_query()
    response = File.open_binary(ctx, arquivo_download)
    response.raise_for_status()
    with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
        pasta.write(response.content)

Lendo o arquivo:

Code Block
languagepy
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv")

Aplicando regra de negócio para o campo taxa_afastamento:

Code Block
languagepy
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento"))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente")

5.3 Base Final:

col_name

data_type

mes

date

negocio

string

quantidade_cargos_lideranca_mulheres

int

quantidade_cargos_lideranca_total

int

taxa_desligamentos_voluntarios

double

taxa_desligamentos_involuntarios

double

taxa_desligamentos_total

double

taxa_absenteismo_sem_afastamento

double

taxa_absenteismo_com_afastamento

double

taxa_afastamento

double

7. Indicadores Margem Madeira

Indicadores de Margem para Madeira.

7.1 Origem:

É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:

7.2 Transformação:

São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:

Code Block
languagepy
import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal

Definindo variáveis para conexão no Tableau Server:

Code Block
languagepy
usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/"

Definindo parâmetros da conexão com o Tableau Server:

Code Block
languagepy
autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor)

Fazendo o download do arquivo .tdsx:

Code Block
languagepy
with servidor.auth.sign_in(autenticador):
    caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True)

O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:

Code Block
languagepy
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
    arquivo_zipado.extractall(diretorio_download)

Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:

Code Block
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
        with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
            resultado_consulta = list(resultado)

Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:

Code Block
languagepy
esquema_margem = StructType([
    StructField('mes', StringType(), True),
    StructField('produto', StringType(), True),
    StructField('mercado', StringType(), True),
    StructField('receita', DoubleType(), True),
    StructField('quantidade', DoubleType(), True),
    StructField('valor_cit', DoubleType(), True),
    StructField('valor_cco', DoubleType(), True)
])

Criando o dataframe:

Code Block
languagepy
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem)

Utilizando o Spark SQL para aplicar algumas regras de negócio:

Code Block
df_margem_calculos = spark.sql("""
    select
        mes,
        (case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
        (sum(receita) / sum(quantidade)) as receita_m3,
        (sum(valor_cit) / sum(quantidade)) as cit_m3,
        (sum(valor_cco) / sum(quantidade)) as cco_m3
       
    from margem_view
    
    group by 1, 2
    
    order by 1, 2 asc
""")

Aplicando mais algumas regras de negócio:

Code Block
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3"))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira")

7.3 Base Final:

col_name

data_type

mes

string

produto

string

receita_m3

double

cit_m3

double

cco_m3

double

mop_m3

double