Conjunto de notebooks que têm tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.
...
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"] |
Tratando os valores em brancomissing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
...
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
tipo | string |
sku | string |
meses | timestamp |
desviador | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
16. Indicadores Sell-in Deca - Cubas
Indicadores de Sell-in para Deca.
16.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.
16.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"] |
Tratando os valores missing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas") |
16.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
formato | string |
cor | string |
acabamento | string |
ean | string |
valor | double |
volume | double |
17. Indicadores Sell-in Deca - Torneiras
Indicadores de Sell-in para Deca.
17.1 Origem:
O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.
17.2 Transformação:
Lendo sheet do arquivo:
Code Block | ||
---|---|---|
| ||
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -") |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"] |
Tratando os valores missing:
Code Block | ||
---|---|---|
| ||
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str) |
Transformando em dataframe do Spark:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras") |
17.3 Base Final:
col_name | data_type |
---|---|
marca | string |
canal_abastecimento | string |
regiao | string |
macro_categoria | string |
material | string |
subcategoria | string |
aplicacao | string |
instalacao | string |
sku | string |
meses | timestamp |
bica | string |
linha | string |
tensao | string |
potencia | string |
modelo | string |
ean | string |
valor | double |
volume | double |
18. Indicadores Market Share, Sell-in e Sell-out Madeira
Indicadores para Market Share, Sell-in e Sell-out de Madeira.
18.1 Origem:
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.
18.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
Code Block | ||
---|---|---|
| ||
query = "select * from madeira.vw_base_mktshare_sellin_sellout"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name) |
Renomeando as colunas:
Code Block | ||
---|---|---|
| ||
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"]
df_madeira = consulta_madeira.toDF(*colunas_madeira) |
Ajustando tipos dos campos:
Code Block | ||
---|---|---|
| ||
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType())) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira") |
18.3 Base Final:
col_name | data_type |
---|---|
tipo | string |
ano | int |
competencia | int |
mercado | string |
produto | string |
produto_detalhe | string |
volume_m3 | double |
volume_m2 | double |
volume_m3_cap_dtx | double |
volume_m3_cap_mercex | double |
data_atualizacao_base | date |
produto_segmento | string |
quantidade_volume_m3_liquido | double |
quantidade_volume_m2 | double |
19. Indicadores Vendas Deca
Indicadores para Vendas de Deca.
19.1 Origem:
Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.
19.2 Transformação:
Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.
Code Block | ||
---|---|---|
| ||
for negocio in negocios:
df = pd.date_range(start = "2022-01-01", end = date.today())
df = pd.DataFrame(df, columns = ["data"])
df = spark.createDataFrame(df)
df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01')))
df = df.withColumn("negocio", lit(negocio))
if negocio == negocios[0]:
df_datas = df
else:
df_datas = df_datas.union(df) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas") |
Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:
Code Block | ||
---|---|---|
| ||
df_vendas = spark.sql("""
select
t1.data_competencia as data,
to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
(case when t1.codigo_setor_atividade = 'CS' then 'loucas'
when t1.codigo_setor_atividade = 'MS' then 'metais'
when t1.codigo_setor_atividade = '01' then 'revestimento'
when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio,
round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas
from large.tb_resultado_comercial t1
where t1.codigo_setor_atividade in("CS", "MS", "HY", "01")
and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI")
and t1.data_competencia between '2022-01-01' and (current_date() - 1)
group by 1, 2, 3
order by 3, 2, 1 asc
""") |
Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:
Code Block | ||
---|---|---|
| ||
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left") |
Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:
Code Block | ||
---|---|---|
| ||
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0)) |
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part)) |
Criando campo com soma acumulada, utilizando a partição criada anteriormente:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part)) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca") |
Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:
Code Block | ||
---|---|---|
| ||
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name) |
Renomeando colunas:
Code Block | ||
---|---|---|
| ||
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"]
df_metas = consulta_metas.toDF(*colunas_metas) |
Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:
Code Block | ||
---|---|---|
| ||
def numero_dias(data):
mes = data.month
ano = data.year
#tratativa caso fevereiro (ano bissexto)
cons = 0
#todo ano divisível por 400 e ao mesmo tempo 100 é bissexto
if ano % 400 == 0 and ano % 100 == 0:
cons = 1
#todo ano divisível por 4 é bissexto
elif ano % 4 == 0:
cons = 1
else:
cons = 0
if mes == 2:
return 28 + cons
#meses com quantidade ímpar de dias
impares = [1, 3, 5, 7, 8, 10, 12]
if mes in impares:
return 31
return 30 |
Aplicando a função e calculando a meta diária:
Code Block | ||
---|---|---|
| ||
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType())
df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes"))) |
Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:
Code Block | ||
---|---|---|
| ||
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca") |
Criando colunas para cada tipo de meta, PEV e POA:
Code Block | ||
---|---|---|
| ||
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2))
df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2)) |
Criando dataframe unindo as duas metas:
Code Block | ||
---|---|---|
| ||
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"]) |
Fazendo join entre a base de datas e a de metas:
Code Block | ||
---|---|---|
| ||
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas")
df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left") |
Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"]) |
Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:
Code Block | ||
---|---|---|
| ||
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0)) |
Criando as metas acumuladas utilizando o particionamento:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part)) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_vendas_metas_deca_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_vendas_deca") |
19.3 Base Final:
col_name | data_type |
---|---|
data | date |
mes | date |
negocio | string |
receita_liquida_vendas | double |
receita_liquida_vendas_acumuladas | double |
valor_meta_diaria_pev | double |
valor_meta_diaria_poa | double |
metas_acumuladas_pev | double |
metas_acumuladas_poa | double |