Conjunto de notebooks que têm como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.
1. Indicadores Carteira Deca
...
Code Block | ||
---|---|---|
| ||
df_poa = df_poa.select( "mes", "negocio", col("poa_receita_liquida_vendas").cast(DoubleType()), col("poa_ebitda_recorrente").cast(DoubleType()), col("poa_eva_recorrente").cast(DoubleType()), col("poa_fluxo_caixa_livre_total").cast(DoubleType()) ).withColumn("poa_eva_recorrente", col("poa_eva_recorrente")/1000).withColumn("poa_fluxo_caixa_livre_total", col("poa_fluxo_caixa_livre_total")/1000) |
4.2.
...
6 Base Final
Todos os dataframes gerados nos passos anteriores agora são unidos para formar um único, utilizando os campos mes e negocio:
...
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
receita_liquida_vendas | double |
ebitda_recorrente | double |
eva_recorrente | double |
fluxo_caixa_livre_total | double |
pmp | double |
frc_receita_liquida_vendas | double |
frc_ebitda_recorrente | double |
frc_eva_recorrente | double |
frc_fluxo_caixa_livre_total | double |
poa_receita_liquida_vendas | double |
poa_ebitda_recorrente | double |
poa_eva_recorrente | double |
poa_fluxo_caixa_livre_total | double |
5. Indicadores Gente
Indicadores de Recursos Humanos.
5.1 Origem:
O arquivo indicadores_gente.csv é disponibilizado e atualizado mensalmente no Sharepoint.
5.2 Transformação:
Definindo função para o download do arquivo para o dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo o arquivo:
Code Block | ||
---|---|---|
| ||
df_gente = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/indicadores_gente.csv") |
Aplicando regra de negócio para o campo taxa_afastamento:
Code Block | ||
---|---|---|
| ||
df_gente = df_gente.withColumn("taxa_afastamento", col("taxa_absenteismo_com_afastamento") - col("taxa_absenteismo_sem_afastamento")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_gente.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_gente") |
5.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
quantidade_cargos_lideranca_mulheres | int |
quantidade_cargos_lideranca_total | int |
taxa_desligamentos_voluntarios | double |
taxa_desligamentos_involuntarios | double |
taxa_desligamentos_total | double |
taxa_absenteismo_sem_afastamento | double |
taxa_absenteismo_com_afastamento | double |
taxa_afastamento | double |
7. Indicadores Margem Madeira
Indicadores de Margem para Madeira.
7.1 Origem:
É feita uma consulta na fonte de dados vw_ren_rateio_aj_SQL, no site Madeira do Tableau Server:
7.2 Transformação:
São utilizadas as bibliotecas tableauserverclient e tableauhyperapi do Python:
Code Block | ||
---|---|---|
| ||
import tableauserverclient as tsc
import tableauhyperapi
from tableauhyperapi import HyperProcess, Telemetry, Connection, TableName, escape_name, escape_string_literal |
Definindo variáveis para conexão no Tableau Server:
Code Block | ||
---|---|---|
| ||
usuario = "***"
senha = "***"
site = "Madeira"
servidor = "https://analytics.duratex.com.br"
id_datasource = "e88f21c1-bb2f-4516-b612-b62e8af74b94"
nome_database = "vw_ren_rateio_aj_SQL"
diretorio_download = "/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" |
Definindo parâmetros da conexão com o Tableau Server:
Code Block | ||
---|---|---|
| ||
autenticador = tsc.TableauAuth(usuario, senha, site)
servidor = tsc.Server(servidor) |
Fazendo o download do arquivo .tdsx:
Code Block | ||
---|---|---|
| ||
with servidor.auth.sign_in(autenticador):
caminho_arquivo = servidor.datasources.download(id_datasource, filepath = diretorio_download, include_extract = True) |
O arquivo de extensão .tdsx nada mais é que uma compactação dos arquivos de fonte de dados do Tableau. Portanto foi utilizada a biblioteca zipfile do Python para fazer essa descompactação em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
with zipfile.ZipFile(diretorio_download + nome_database + ".tdsx", "r") as arquivo_zipado:
arquivo_zipado.extractall(diretorio_download) |
Após a descompactação, é utilizada a biblioteca tableauhyperapi para extrair os dados do arquivo .hyper, que foi descompactado no diretório /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Data/Extracts/ no processo anterior. É feita uma query para trazer apenas os campos necessários:
Code Block |
---|
with HyperProcess(telemetry = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
with Connection(endpoint = hyper.endpoint, database = diretorio_download + "Data/Extracts/" + nome_database + ".hyper") as conexao:
with conexao.execute_query(query = f"select {escape_name('cd_competencia')}, {escape_name('dc_linha_produto')}, {escape_name('gerencia')}, {escape_name('vl_receita_aj')}, {escape_name('md_m3_quantidade')}, {escape_name('vl_custo_industrial_total_CIT')}, {escape_name('vl_custo_comercial_total_CCO')} from {TableName('Extract', 'Extract')} where {escape_name('cd_setor_atividade')} = {escape_string_literal('CH')} and {escape_name('cd_competencia')} >= 202101 and {escape_name('gerencia')} = {escape_string_literal('MERCADO EXTERNO')}") as resultado:
resultado_consulta = list(resultado) |
Definindo schema para o dataframe que será criado a partir da lista resultado_consulta:
Code Block | ||
---|---|---|
| ||
esquema_margem = StructType([
StructField('mes', StringType(), True),
StructField('produto', StringType(), True),
StructField('mercado', StringType(), True),
StructField('receita', DoubleType(), True),
StructField('quantidade', DoubleType(), True),
StructField('valor_cit', DoubleType(), True),
StructField('valor_cco', DoubleType(), True)
]) |
Criando o dataframe:
Code Block | ||
---|---|---|
| ||
df_margem = spark.createDataFrame(resultado_consulta, esquema_margem) |
Utilizando o Spark SQL para aplicar algumas regras de negócio:
Code Block |
---|
df_margem_calculos = spark.sql("""
select
mes,
(case when produto like 'MDF%' then 'mdf' when produto like 'MDP%' then 'mdp' else 'outros' end) as produto,
(sum(receita) / sum(quantidade)) as receita_m3,
(sum(valor_cit) / sum(quantidade)) as cit_m3,
(sum(valor_cco) / sum(quantidade)) as cco_m3
from margem_view
group by 1, 2
order by 1, 2 asc
""") |
Aplicando mais algumas regras de negócio:
Code Block |
---|
df_margem_calculos = df_margem_calculos.withColumn("mop_m3", col("receita_m3") - col("cit_m3") - col("cco_m3")).withColumn("percentual_margem", col("mop_m3") / col("receita_m3")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_margem_calculos.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_margem_madeira") |
7.3 Base Final:
col_name | data_type |
---|---|
mes | string |
produto | string |
receita_m3 | double |
cit_m3 | double |
cco_m3 | double |
mop_m3 | double |