...
Database DEV do Redshift (dtx-deca-sellin.czcbob9woqfg.us-east-1.redshift.amazonaws.com), schema smalle tabela tb_ordem_pendente.
1.2 Transformação:
Fazendo uma query com um select de todos os campos da tabela origem:
...
Fazendo append na tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_carteira.write.mode("append").saveAsTable("indicadores_mercado.tb_indicadores_carteira_deca") |
1.3 Base Final:
col_name | data_type |
---|---|
data_referencia | date |
negocio | string |
valor_bloqueado | double |
valor_livre | double |
2. Indicadores Devolução Deca
Indicadores para pedidos em status de devolução para Deca.
2.1 Origem:
Database large e tabela tb_resultado_comercial.
2.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
Code Block | ||
---|---|---|
| ||
df_devolucoes_deca = spark.sql("""
select distinct
to_date(date_format(data_competencia, 'yyyy-MM-01')) as mes,
(case when codigo_setor_atividade == 'MS' then 'metais'
when codigo_setor_atividade == 'CS' then 'loucas'
when codigo_setor_atividade == 'HY' then 'hydra'
when codigo_setor_atividade == '01' then 'revestimento' end) as negocio,
sum(case when tipo_documento_venda == 'S2' then valor_receita_liquida else 0 end) as valor_estorno,
sum(case when status_ordem_venda == 'DEVOLUÇÃO' then abs(valor_receita_liquida) else 0 end) as valor_devolucao,
sum(case when status_ordem_venda == 'FATURAMENTO' then valor_receita_liquida else 0 end) as valor_faturamento
from large.tb_resultado_comercial
where data_competencia between '2019-01-01' and current_date() - 1
and codigo_setor_atividade in('MS', 'CS', 'HY', '01')
group by 1, 2
order by 1, 2 asc
""") |
Calculando o valor de devolução ajustado, conforme regra de negócio:
Code Block | ||
---|---|---|
| ||
df_devolucoes_deca = df_devolucoes_deca.withColumn("valor_devolucao_ajustado", col("valor_devolucao") - col("valor_estorno")) |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_devolucoes_deca.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_deca") |
2.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | double |
valor_devolucao | double |
3. Indicadores Devolução Madeira
Indicadores para pedidos em status de devolução para Madeira.
3.1 Origem:
Database analytics_prd e tabela custos_rem.
3.2 Transformação:
Utilizando o Spark SQL para query da tabela origem já aplicando regras de negócio:
Code Block | ||
---|---|---|
| ||
df_devolucao_madeira = spark.sql("""
select
perio as mes,
(case when prctr like '%MDP' then 'mdp'
when prctr like '%MDF' then 'mdf' else 'paineis' end) as negocio,
sum(case when fkart in('ZREB', 'ZROB') then abs((vv089) - (vv001 + vv002 + vv003 + vv004)) else 0 end) as valor_devolucao,
sum(case when fkart not in('ZREB', 'ZROB') then (vv089) - (vv001 + vv002 + vv003 + vv004) else 0 end) as valor_faturado
from analytics_prd.custos_rem
where spart == 'CH'
and (prctr like '%MDP' or prctr like '%MDF')
and perio >= 2019001
group by 1, 2
order by 1, 2
""") |
Sobrescrevendo a tabela final na database indicadores_mercado:
Code Block | ||
---|---|---|
| ||
df_devolucao_madeira.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("indicadores_mercado.tb_indicadores_devolucoes_madeira") |
3.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |
4. Indicadores Financeiros
Indicadores financeiros disponibilizados pela Controladoria.
4.1 Origem:
A Controladoria atualiza mensalmente alguns arquivos Excel disponibilizados em um Sharepoint.
4.2 Transformação:
Foi desenvolvida uma função para fazer o download desses arquivos e gravar no diretório dbfs:
Code Block | ||
---|---|---|
| ||
def download_arquivo(arquivo_download, nome_arquivo):
ctx = ClientContext(site_sharepoint).with_credentials(UserCredential(usuario, senha))
web = ctx.load(ctx.web).execute_query()
response = File.open_binary(ctx, arquivo_download)
response.raise_for_status()
with open("/dbfs/FileStore/shared_uploads/arquivos_financeiros/" + nome_arquivo, "wb") as pasta:
pasta.write(response.content) |
Fazendo o download dos arquivos:
Code Block | ||
---|---|---|
| ||
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fechamento%20Gerencial%202022.xlsm", "Fechamento_Gerencial_2022.xlsm")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Fluxo%20de%20Caixa%20Oficial%202022.xlsx", "Fluxo_de_Caixa_Oficial_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20Consolidador.xlsx", "Forecast_3_9_2022.xlsx")
download_arquivo("/sites/RelatriosControladoria/Documentos%20Compartilhados/Report%20Book%20Diretoria/Forecast%20-%20Fluxo%20de%20Caixa%20Livre.xlsx", "FCST_2_10___Fluxo_de_Caixa_Livre_2022.xlsx") |
4.3 Base Final:
col_name | data_type |
---|---|
mes | date |
negocio | string |
valor_faturamento | decimal(29,2) |
valor_devolucao | decimal(29,2) |