Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Conjunto de notebooks que têm tem como objetivo fazer a ingestão de dados e indicadores de diversas áreas da Dexco e disponibilizar da forma mais automatizada possível para serem consumidos pelo dashboard desenvolvido para o projeto.

...

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "tipo", "sku", "meses", "desviador", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]

Tratando os valores em brancomissing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

...

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

tipo

string

sku

string

meses

timestamp

desviador

string

linha

string

tensao

string

potencia

string

modelo

string

ean

string

valor

double

volume

double

16. Indicadores Sell-in Deca - Cubas

Indicadores de Sell-in para Deca.

16.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx.

16.2 Transformação:

Lendo sheet do arquivo:

Code Block
languagepy
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_cubas.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "aplicacao", "instalacao", "sku", "meses", "formato", "cor", "acabamento", "ean", "valor", "volume"]

Tratando os valores missing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

Code Block
languagepy
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_cubas")

16.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

aplicacao

string

instalacao

string

sku

string

meses

timestamp

formato

string

cor

string

acabamento

string

ean

string

valor

double

volume

double

17. Indicadores Sell-in Deca - Torneiras

Indicadores de Sell-in para Deca.

17.1 Origem:

O arquivo foi disponibilizado e foi feito seu upload direto no dbfs do Databricks. Motivo é que esses dados não são atualizáveis. Diretório de upload: /dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx.

17.2 Transformação:

Lendo sheet do arquivo:

Code Block
languagepy
arquivo_sell_in = pd.ExcelFile("/dbfs/FileStore/shared_uploads/arquivos_diversos_cognitivo/sell_in_deca_torneiras.xlsx")
df_sell_in_deca = pd.read_excel(arquivo_sell_in, "Banco de dados - Construcheck -")

Renomeando colunas:

Code Block
languagepy
df_sell_in_deca.columns = ["marca", "canal_abastecimento", "regiao", "macro_categoria", "material", "subcategoria", "aplicacao", "instalacao", "sku", "meses", "bica", "linha", "tensao", "potencia", "modelo", "ean", "valor", "volume"]

Tratando os valores missing:

Code Block
languagepy
objs = df_sell_in_deca.select_dtypes(include = "object").columns
df_sell_in_deca[objs] = df_sell_in_deca[objs].fillna("").astype(str)

Transformando em dataframe do Spark:

Code Block
languagepy
df_sell_in_deca = spark.createDataFrame(df_sell_in_deca)

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_sell_in_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_sell_in_deca_torneiras")

17.3 Base Final:

col_name

data_type

marca

string

canal_abastecimento

string

regiao

string

macro_categoria

string

material

string

subcategoria

string

aplicacao

string

instalacao

string

sku

string

meses

timestamp

bica

string

linha

string

tensao

string

potencia

string

modelo

string

ean

string

valor

double

volume

double

18. Indicadores Market Share, Sell-in e Sell-out Madeira

Indicadores para Market Share, Sell-in e Sell-out de Madeira.

18.1 Origem:

Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema madeira e view vw_base_mktshare_sellin_sellout.

18.2 Transformação:

Fazendo uma query com um select de todos os campos da tabela origem:

Code Block
languagepy
query = "select * from madeira.vw_base_mktshare_sellin_sellout"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_madeira = redshift_to_dataframe(query = query, filename = "vw_base_mktshare_sellin_sellout", bucket_name = bucket_name)

Renomeando as colunas:

Code Block
languagepy
olunas_madeira = ["tipo", "ano", "competencia", "mercado", "produto", "produto_detalhe", "volume_m3", "volume_m2", "volume_m3_cap_dtx", "volume_m3_cap_mercex", "data_atualizacao_base", "produto_segmento", "quantidade_volume_m3_liquido", "quantidade_volume_m2"]
df_madeira = consulta_madeira.toDF(*colunas_madeira)

Ajustando tipos dos campos:

Code Block
languagepy
df_madeira = df_madeira.withColumn("ano", col("ano").cast(IntegerType())).withColumn("competencia", col("competencia").cast(IntegerType())).withColumn("volume_m3", col("volume_m3").cast(DoubleType())).withColumn("volume_m2", col("volume_m2").cast(DoubleType())).withColumn("volume_m3_cap_dtx", col("volume_m3_cap_dtx").cast(DoubleType())).withColumn("volume_m3_cap_mercex", col("volume_m3_cap_mercex").cast(DoubleType()))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_share_sell_in_sell_out_madeira")

18.3 Base Final:

col_name

data_type

tipo

string

ano

int

competencia

int

mercado

string

produto

string

produto_detalhe

string

volume_m3

double

volume_m2

double

volume_m3_cap_dtx

double

volume_m3_cap_mercex

double

data_atualizacao_base

date

produto_segmento

string

quantidade_volume_m3_liquido

double

quantidade_volume_m2

double

19. Indicadores Vendas Deca

Indicadores para Vendas de Deca.

19.1 Origem:

Consulta da tabela tb_resultado_comercial da database large e consulta database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema large e tabela tb_metas_comercial_hierarquia_produto.

19.2 Transformação:

Definindo função para criação de dataframe com range de datas, desde 01/01/2022 até hoje.

Code Block
languagepy
for negocio in negocios:
    df = pd.date_range(start = "2022-01-01", end = date.today())
    df = pd.DataFrame(df, columns = ["data"])
    df = spark.createDataFrame(df)
    df = df.withColumn("data", to_date("data", 'yyyy-MM-dd')).withColumn("mes", to_date(date_format("data", 'yyyy-MM-01')))
    df = df.withColumn("negocio", lit(negocio))
    if negocio == negocios[0]:
        df_datas = df
    else:
        df_datas = df_datas.union(df)

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_datas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_datas")

Fazendo consulta para vendas na large, já aplicando algumas regras de negócio:

Code Block
languagepy
df_vendas = spark.sql("""
    select
        t1.data_competencia as data,
        to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
        (case when t1.codigo_setor_atividade = 'CS' then 'loucas'
            when t1.codigo_setor_atividade = 'MS' then 'metais'
            when t1.codigo_setor_atividade = '01' then 'revestimento'
            when t1.codigo_setor_atividade = 'HY' then 'hydra' end) as negocio,
         round(sum(t1.valor_receita_liquida), 2) as receita_liquida_vendas
    from large.tb_resultado_comercial t1
    where t1.codigo_setor_atividade in("CS", "MS", "HY", "01")
        and t1.status_ordem_venda in("EXPORTAÇÃO", "VENDA", "DEVOLUÇÃO", "CANCELAMENTO", "CINI")
        and t1.data_competencia between '2022-01-01' and (current_date() - 1)
    group by 1, 2, 3
    order by 3, 2, 1 asc
""")

Fazendo join entre o dataframe de datas e o de vendas, com o objetivo de agregar todos os dias do range na base, mesmo que sem registro de vendas:

Code Block
languagepy
df_vendas_ajustado = df_datas.join(df_vendas, ["data", "mes", "negocio"], "left")

Definindo um particionamento no dataframe df_vendas_ajustado, com o objetivo de posteriormente fazer uma soma acumuladas das vendas agrupada por negócio e mês:

Code Block
languagepy
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))

Criando campo com soma acumulada, utilizando a partição criada anteriormente:

Code Block
languagepy
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))

Criando campo com soma acumulada, utilizando a partição criada anteriormente:

Code Block
languagepy
df_vendas_acumuladas = df_vendas_ajustado.withColumn("receita_liquida_vendas_acumuladas", functions.sum("receita_liquida_vendas").over(part))

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_vendas_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_vendas_diarias_deca")

Então, é feita uma query, já aplicando algumas regras de negócio, na large do Redshift, com o objetivo de obter as metas:

Code Block
languagepy
query = "select data_competencia as mes, (case when codigo_setor_atividade = 'CS' then 'loucas' when codigo_setor_atividade = 'MS' then 'metais' when codigo_setor_atividade = '01' then 'revestimento' when codigo_setor_atividade = 'HY' then 'hydra' end) as negocio, tipo_meta, sum(valor_receita_liquida) as valor_meta from large.tb_metas_comercial_hierarquia_produto where data_competencia >= '2021-01-01' and codigo_setor_atividade in('CS', 'HY', 'MS', '01') and tipo_meta in('POA', 'PEV') group by 1, 2, 3 order by 1, 2, 3"
multiple_run_parameters = dbutils.notebook.entry_point.getCurrentBindings()
bucket_name = multiple_run_parameters["bucket_name"]
consulta_metas = redshift_to_dataframe(query = query, filename = "tb_metas_comercial_hierarquia_produto", bucket_name = bucket_name)

Renomeando colunas:

Code Block
languagepy
colunas_metas = ["mes", "negocio", "tipo_meta", "valor_meta_mes"]
df_metas = consulta_metas.toDF(*colunas_metas)

Próximo passo é definir uma função para cálculo de quantidade de dias dentro de cada mês, com o objetivo de posteriormente calcular a meta diárias através da mensal:

Code Block
languagepy
def numero_dias(data):
    mes = data.month
    ano = data.year
    #tratativa caso fevereiro (ano bissexto)
    cons = 0
    #todo ano divisível por 400 e ao mesmo tempo 100 é bissexto
    if ano % 400 == 0 and ano % 100 == 0:
        cons = 1
    #todo ano divisível por 4 é bissexto
    elif ano % 4 == 0:
        cons = 1
    else:
        cons = 0
    if mes == 2:
        return 28 + cons
    #meses com quantidade ímpar de dias
    impares = [1, 3, 5, 7, 8, 10, 12]
    if mes in impares:
        return 31
    return 30

Aplicando a função e calculando a meta diária:

Code Block
languagepy
numero_dias_udf = udf(lambda z: numero_dias(z), IntegerType())
df_metas = df_metas.withColumn("valor_meta_diaria", (col("valor_meta_mes") / col("numero_dias_mes")))   

Sobrescrevendo a tabela de datas na database indicadores_mercado, para ser utilizada em outra etapa do processo:

Code Block
languagepy
df_metas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_metas_mensais_deca")

Criando colunas para cada tipo de meta, PEV e POA:

Code Block
languagepy
df_metas_pev = df_metas.where(col("tipo_meta") == "PEV").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_pev").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_pev", functions.round("valor_meta_diaria_pev", 2))
df_metas_poa = df_metas.where(col("tipo_meta") == "POA").withColumnRenamed("valor_meta_diaria", "valor_meta_diaria_poa").drop("tipo_meta", "valor_meta_mes", "numero_dias_mes").withColumn("valor_meta_diaria_poa", functions.round("valor_meta_diaria_poa", 2))

Criando dataframe unindo as duas metas:

Code Block
languagepy
df_metas_final = df_metas_pev.join(df_metas_poa, ["mes", "negocio"])

Fazendo join entre a base de datas e a de metas:

Code Block
languagepy
df_datas_ajuste = spark.sql("select * from indicadores_mercado.tb_datas")
df_metas_ajustado = df_datas_ajuste.join(df_metas_final, ["mes", "negocio"], "left")

Por fim, criando a tabela final. Fazendo query na database para vendas e join com o dataframe de metas:

Code Block
languagepy
df_vendas_metas_deca = df_vendas.join(df_metas_ajustado, ["data", "mes", "negocio"])

Criando particionamento, para posteriormente calcular as metas acumuladas por mês e negócio:

Code Block
languagepy
part = (Window.partitionBy("negocio", "mes").orderBy("data").rangeBetween(Window.unboundedPreceding, 0))

Criando as metas acumuladas utilizando o particionamento:

Code Block
languagepy
df_vendas_metas_deca_acumuladas = df_vendas_metas_deca.withColumn("metas_acumuladas_pev", functions.sum("valor_meta_diaria_pev").over(part)).withColumn("metas_acumuladas_poa", functions.sum("valor_meta_diaria_poa").over(part))

Sobrescrevendo a tabela final na database indicadores_mercado:

Code Block
languagepy
df_vendas_metas_deca_acumuladas.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_vendas_deca")

19.3 Base Final:

col_name

data_type

data

date

mes

date

negocio

string

receita_liquida_vendas

double

receita_liquida_vendas_acumuladas

double

valor_meta_diaria_pev

double

valor_meta_diaria_poa

double

metas_acumuladas_pev

double

metas_acumuladas_poa

double