...
col_name | data_type |
---|---|
mes | string |
produto | string |
receita_m3 | double |
cit_m3 | double |
cco_m3 | double |
mop_m3 | double |
9. Indicadores OEE Madeira
Indicadores de OEE para Madeira.
9.1 Origem:
O arquivo indicadores_oee_madeira.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
9.2 Transformação:
Definindo função para ler o arquivo:
Code Block | ||
---|---|---|
| ||
def extrair_excel(arquivo_leitura, nome_sheet, celulas, colunas, pular_linhas):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_leitura)
response.raise_for_status()
df = pd.read_excel(response.content, skiprows = pular_linhas, usecols = celulas, sheet_name = nome_sheet, names = colunas)
df = spark.createDataFrame(df)
return df |
Lendo a sheet cru do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas. Utilizando o while e a função isinstance() para repetir o processo até que o dataframe esteja criado:
Code Block | ||
---|---|---|
| ||
df_oee_cru = []
while not isinstance(df_oee_cru, DataFrame):
try:
df_oee_cru = extrair_excel(arquivo, "cru", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
pass |
Definindo o nome do produto como constante:
Code Block | ||
---|---|---|
| ||
df_oee_cru = df_oee_cru.withColumn("produto", lit("cru")) |
Lendo a sheet revestido do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
Code Block | ||
---|---|---|
| ||
df_oee_revestido = []
while not isinstance(df_oee_revestido, DataFrame):
try:
df_oee_revestido = extrair_excel(arquivo, "revestido", "B:D", ["mes", "oee", "oee_meta_global"], 0)
except:
pass |
Definindo o nome do produto como constante:
Code Block | ||
---|---|---|
| ||
df_oee_revestido = df_oee_revestido.withColumn("produto", lit("revestido")) |
Lendo a sheet historico do arquivo, definindo os parâmetros de range de células, nome das células e quantidade de linhas a serem desconsideradas:
Code Block | ||
---|---|---|
| ||
df_oee_historico = []
while not isinstance(df_oee_historico, DataFrame):
try:
df_oee_historico = extrair_excel(arquivo, "historico", "B:E", ["mes", "oee", "oee_meta_global", "produto"], 0)
except:
pass |
Fazendo a união de todos os dataframes criados:
Code Block | ||
---|---|---|
| ||
df_oee_madeira = df_oee_historico.union(df_oee_cru).union(df_oee_revestido) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_oee_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_oee_madeira") |
9.3 Base Final:
col_name | data_type |
---|---|
mes | date |
oee | double |
oee_meta_global | double |
produto | string |
10. Indicadores OTIF Deca
Indicadores de OTIF para Deca.
10.1 Origem:
O arquivo dados_otif_deca.csv é disponibilizado e atualizado mensalmente no Sharepoint.
10.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_otif_deca = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_deca.csv") |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_otif_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_deca") |
10.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_deca | string |
otif_deca | double |
meta_otif_deca | double |
11. Indicadores OTIF Madeira
Indicadores de OTIF para Madeira.
11.1 Origem:
O arquivo dados_otif_madeira.csv é disponibilizado e atualizado mensalmente no Sharepoint.
11.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_otif_madeira = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_otif_madeira.csv") |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_otif_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_otif_madeira") |
11.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento_madeira | string |
otif_madeira | double |
meta_otif_madeira | double |
12. Indicadores Panorama Mercado Madeira
Indicadores de Panorama de Mercado para Madeira.
12.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e tabela vw_mercado_panorama_v2.
12.2 Transformação:
É feita uma query da tabela já aplicando algumas regras de negócio:
Code Block | ||
---|---|---|
| ||
query = "select competencia, mercado, case when produto_detalhe in('MDF Fino Cru', 'MDF Grosso Cru') then 'MDF Cru' when produto_detalhe in('MDF Fino BP', 'MDF Grosso BP') then 'MDF Revestido' when produto_detalhe = 'MDP Cru' then 'MDP Cru' when produto_detalhe = 'MDP BP' then 'MDP Revestido' end as produto, sum(volume_m3) as volume_m3_total from madeira.vw_mercado_panorama_v2 where competencia >= 202101 and mercado = 'MI' group by 1, 2, 3 order by 1" |
Renomeando campos:
Code Block | ||
---|---|---|
| ||
colunas_panorama_mercado = ["mes", "mercado", "produto", "volume_m3_total"]
df_panorama_mercado = panorama_mercado.toDF(*colunas_panorama_mercado) |
Ajustando campo mes:
Code Block | ||
---|---|---|
| ||
df_panorama_mercado = df_panorama_mercado.withColumn("mes", col("mes").cast(StringType())).withColumn("mes", concat(substring(col("mes"), 1, 4), lit("-"), substring(col("mes"), 5, 2), lit("-"), lit("01"))).withColumn("mes", col("mes").cast(DateType())) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_panorama_mercado.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_panorama_mercado_madeira") |
12.3 Base Final:
col_name | data_type |
---|---|
mes | date |
mercado | string |
produto | string |
volume_m3_total | double |
13. Indicadores Produtividade Deca
Indicadores de Produtividade para Deca.
13.1 Origem:
O arquivo Produtividade DECA&RC.xlsx é disponibilizado e atualizado mensalmente no Sharepoint.
13.2 Transformação:
Definindo função para fazer o download do arquivo para diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Fazendo o download do arquivo:
Code Block | ||
---|---|---|
| ||
download_arquivo("/personal/henrique_fantin_duratex_com_br/Documents/Min_HEq/Produtividade%20DECA%26RC.xlsx", "Produtividade_DECA_RC.xlsx") |
Lendo o arquivo:
Code Block | ||
---|---|---|
| ||
produtividade_deca = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/Produtividade_DECA_RC.xlsx") |
Definindo função para ler os dados dos diferentes segmentos Deca. Passando os parâmetros de negócio, range de células, quantidade de linhas desconsideradas e quantidade de linhas consideradas, a função gera dois dataframes, um para produtividade e outro para POA. Isso é feito através de filtro na coluna: quando é diferente de POA, então é produtividade.
Code Block | ||
---|---|---|
| ||
def ler_produtividade(negocio, celulas, pular_linhas, numero_linhas):
#lendo arquivo excel
df = pd.read_excel(produtividade_deca, skiprows = pular_linhas, nrows = numero_linhas, usecols = celulas, index_col = 0).transpose().reset_index()
#definindo dataframe para o poa
df_poa = df
#definindo dataframe para produtividade, coluna 1 é diferente de "POA"
df = df.where(df.iloc[:, 1] != "POA")
df = df.iloc[:, [0, 1, numero_linhas]]
df.columns = ["mes", "ano", "produtividade"]
df = df.dropna(axis = 0)
df = spark.createDataFrame(df)
df = df.withColumn("ano", col("ano").cast(IntegerType()).cast(StringType()))
#ajustandando campo mes para produtividade
df = df.withColumn("mes", when(col("mes").like("JAN%"), concat(lit("01/01/"), col("ano")))
.when(col("mes").like("FEV%"), concat(lit("01/02/"), col("ano")))
.when(col("mes").like("MAR%"), concat(lit("01/03/"), col("ano")))
.when(col("mes").like("ABR%"), concat(lit("01/04/"), col("ano")))
.when(col("mes").like("MAI%"), concat(lit("01/05/"), col("ano")))
.when(col("mes").like("JUN%"), concat(lit("01/06/"), col("ano")))
.when(col("mes").like("JUL%"), concat(lit("01/07/"), col("ano")))
.when(col("mes").like("AGO%"), concat(lit("01/08/"), col("ano")))
.when(col("mes").like("SET%"), concat(lit("01/09/"), col("ano")))
.when(col("mes").like("OUT%"), concat(lit("01/10/"), col("ano")))
.when(col("mes").like("NOV%"), concat(lit("01/11/"), col("ano")))
.when(col("mes").like("DEZ%"), concat(lit("01/12/"), col("ano")))
.otherwise(None))
df = df.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#trabalhando com dataframe para poa, quando coluna 1 é igual a "POA"
df_poa = df_poa.where(df_poa.iloc[:, 1] == "POA")
df_poa = df_poa.iloc[:, [0, numero_linhas]]
df_poa .columns = ["mes", "poa_produtividade"]
df_poa = df_poa.dropna(axis = 0)
df_poa = spark.createDataFrame(df_poa)
#ajustando campo mes
df_poa = df_poa.withColumn("mes", when(col("mes").like("JAN%"), "01/01/" + ano_atual)
.when(col("mes").like("FEV%"), "01/02/" + ano_atual)
.when(col("mes").like("MAR%"), "01/03/" + ano_atual)
.when(col("mes").like("ABR%"), "01/04/" + ano_atual)
.when(col("mes").like("MAI%"), "01/05/" + ano_atual)
.when(col("mes").like("JUN%"), "01/06/" + ano_atual)
.when(col("mes").like("JUL%"), "01/07/" + ano_atual)
.when(col("mes").like("AGO%"), "01/08/" + ano_atual)
.when(col("mes").like("SET%"), "01/09/" + ano_atual)
.when(col("mes").like("OUT%"), "01/10/" + ano_atual)
.when(col("mes").like("NOV%"), "01/11/" + ano_atual)
.when(col("mes").like("DEZ%"), "01/12/" + ano_atual)
.otherwise(None))
df_poa = df_poa.withColumn("mes", to_date("mes", "dd/MM/yyyy"))
#definindo dataframe final, fazendo um join com os anteriores
df_final = df_poa.join(df, df_poa.mes == df.mes, "left").drop(df.mes)
df_final = df_final.withColumn("segmento", lit(negocio))
df_final = df_final.select("mes", "segmento", "poa_produtividade", "produtividade")
df_final = df_final.withColumn("poa_produtividade", round(col("poa_produtividade"), 2)).withColumn("produtividade", round(col("produtividade"), 2))
return df_final |
Aplicando função para metais, já definindo os parâmetros de range de células, quantidade de linhas ignoradas e quantidade de linhas consideradas:
Code Block | ||
---|---|---|
| ||
df_metais = ler_produtividade("metais", "FA:FZ", 21, 5) |
Aplicando função para loucas:
Code Block | ||
---|---|---|
| ||
df_loucas = ler_produtividade("loucas", "FA:FZ", 54, 8) |
Aplicando função para Hydra:
Code Block | ||
---|---|---|
| ||
df_hydra = ler_produtividade("hydra", "FA:FZ", 93, 4) |
Aplicando função para revestimento:
Code Block | ||
---|---|---|
| ||
df_revestimento = ler_produtividade("revestimento", "FA:FZ", 145, 7) |
Fazendo a união de todos os dataframes criados:
Code Block | ||
---|---|---|
| ||
df_produtividade_deca = df_metais.union(df_loucas).union(df_hydra).union(df_revestimento) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_produtividade_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_minuto_homem_deca") |
13.3 Base Final:
col_name | data_type |
---|---|
mes | date |
segmento | string |
poa_produtividade | double |
produtividade | double |
14. Indicadores SAC
Indicadores de SAC.
14.1 Origem:
O arquivo dados_sac.csv é disponibilizado e atualizado mensalmente no Sharepoint.
14.2 Transformação:
Definindo função para fazer o download do arquivo em diretório do dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Lendo arquivo:
Code Block | ||
---|---|---|
| ||
df_sac = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("dbfs:/FileStore/shared_uploads/arquivos_diversos_cognitivo/dados_sac.csv") |
Aplicando regra de negócio para o campo saldo:
Code Block | ||
---|---|---|
| ||
df_sac = df_sac.withColumn("saldo", col("casos_abertos") - col("casos_fechados")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sac.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sac") |
14.3 Base Final:
col_name | data_type |
---|---|
mes | date |
visao | string |
negocio | string |
casos_abertos | int |
casos_fechados | int |
saldo | int |
15. Indicadores Sell-in Deca - Chuveiros
Indicadores de Sell-in para Deca.
15.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx.
15.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_chuveiros.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"] |
Tratando os valores em branco:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_chuveiros") |
15.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
tipo | string |
sku | string |
meses | timestamp |
desviador | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |