Objective: transfer data from the Gaia system to Vtex through Python scripts running on Databricks, implementing the raw and gold data layers, as well as a database in Databricks that serves as a query repository for a wide range of purposes.
Location: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#folder/3278180759103619
Bitbucket: https://bitbucket.org/fabianoguidottinoble/gaia_to_vetex/src/7b4e698d4fbedff8cff0021fa87a6b59a380f321/?at=feature%2Fdev
Workflow: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#job/list
Logical diagram:
Scripts for integrating Gaia with Vtex
There are four scripts that import and process the JSON files produced by Gaia.
The scripts follow the logic below:
There are three sources, all in JSON format.
Os scripts "loadJson_{source name}" , buscam os arquivos JSON em um bucket S3 e os importam dentro da base "Gaia" interna ao Databricks, mantendo a formatação do JSON.
They are:
loadJson_Deca
loadJson_Hydra
loadJson_Madeira
These scripts generate, respectively, the tables:
gaia.deca2_raw
gaia.hydra_raw
gaia.madeira_raw
Raw-layer ER model:
Scripts:
loadJson_Deca.py
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

# `spark` and `dbutils` are provided by the Databricks notebook runtime.
# COMMAND ----------
# DBTITLE 1,Json URL
url_DataLake_Deca = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Deca.json"
# COMMAND ----------
# DBTITLE 1,Download DataLake_Deca.json file
response = requests.get(url_DataLake_Deca)
response.raise_for_status()  # stop early if the download failed
with open("/tmp/DataLake_Deca.json", "wb") as f:
    f.write(response.content)
# COMMAND ----------
# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("/tmp/DataLake_Deca.json", "dbfs:/FileStore/DataLake_Deca.json")
# COMMAND ----------
# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json('dbfs:/FileStore/DataLake_Deca.json')
# COMMAND ----------
# DBTITLE 1,Write the data to a table in the Gaia database
df.write.mode("overwrite").saveAsTable("gaia.deca2_raw")
# COMMAND ----------
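Note that saveAsTable("gaia.deca2_raw") requires the gaia database to already exist. If it is not created elsewhere in the project, a one-line guard covers it (a sketch, not part of the original notebooks):

spark.sql("CREATE DATABASE IF NOT EXISTS gaia")  # no-op when the database already exists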
loadJson_Hydra.py
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

# `spark` and `dbutils` are provided by the Databricks notebook runtime.
# COMMAND ----------
# DBTITLE 1,Json URL
url_DataLake_Hydra = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Hydra.json"
# COMMAND ----------
# DBTITLE 1,Download DataLake_Hydra.json file
response = requests.get(url_DataLake_Hydra)
response.raise_for_status()  # stop early if the download failed
with open("/tmp/DataLake_Hydra.json", "wb") as f:
    f.write(response.content)
# COMMAND ----------
# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("tmp/DataLake_Hydra.json", "dbfs:/FileStore/DataLake_Hydra.json")
# COMMAND ----------
# DBTITLE 1,Read JSON file from DBFS
df_DataLake_Hydra = spark.read.json('dbfs:/FileStore/DataLake_Hydra.json')
# COMMAND ----------
# DBTITLE 1,Write the data to a table in the Gaia database
df_DataLake_Hydra.write.mode("overwrite").saveAsTable("gaia.hydra_raw")
# COMMAND ----------
loadJson_Madeira.py
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

# `spark` and `dbutils` are provided by the Databricks notebook runtime.
# COMMAND ----------
# DBTITLE 1,Json URL
url_DataLake_Madeira = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Madeira.json"
# COMMAND ----------
# DBTITLE 1,Download DataLake_Madeira.json file
response = requests.get(url_DataLake_Madeira)
response.raise_for_status()  # stop early if the download failed
with open("/tmp/DataLake_Madeira.json", "wb") as f:
    f.write(response.content)
# COMMAND ----------
# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("tmp/DataLake_Madeira.json", "dbfs:/FileStore/DataLake_Madeira.json")
# COMMAND ----------
# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json('dbfs:/FileStore/DataLake_Madeira.json')
# COMMAND ----------
# DBTITLE 1,Write the data to a table in the Gaia database
df.write.mode("overwrite").saveAsTable("gaia.madeira_raw")
# COMMAND ----------
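Because the three loaders differ only in the source name, they could be consolidated into a single parameterized notebook. The sketch below is an illustration under that assumption, not code from the repository; the load_source helper and the source-to-table mapping are hypothetical:

import requests

BASE_URL = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1"

def load_source(source, table):
    # Download DataLake_{source}.json, stage it in DBFS, and load it into gaia.{table}.
    response = requests.get(f"{BASE_URL}/DataLake_{source}.json")
    response.raise_for_status()
    local_path = f"/tmp/DataLake_{source}.json"
    with open(local_path, "wb") as f:
        f.write(response.content)
    dbutils.fs.cp(f"file:{local_path}", f"dbfs:/FileStore/DataLake_{source}.json")
    df = spark.read.json(f"dbfs:/FileStore/DataLake_{source}.json")
    df.write.mode("overwrite").saveAsTable(f"gaia.{table}")

for source, table in [("Deca", "deca2_raw"), ("Hydra", "hydra_raw"), ("Madeira", "madeira_raw")]:
    load_source(source, table)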
O script "tables ", processa as 3 origens, agora usando como origem a base de dados "Gaia", une estas origens gerando novas tabelas na base de dados, simplificando a estrutura JSON en tabelas.
The following tables are generated:
assets_documents
assets_images
attributes
documents
images
keys
parent
products
These tables are joined on the SKU and brand columns.
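The tables script itself is not reproduced here; the sketch below only illustrates the kind of flattening it performs, using PySpark's explode to turn a nested JSON array into rows. The column names (sku, attributes, name, value) are assumptions about the raw schema, not confirmed by the source:

from pyspark.sql.functions import col, explode, lit

# Read one raw table and tag it with its brand; the other brands would be
# processed the same way and unioned before writing.
raw = spark.table("gaia.deca2_raw").withColumn("brand", lit("Deca"))

# Explode the (assumed) nested attributes array into one row per attribute.
attributes = (
    raw.select("sku", "brand", explode("attributes").alias("attribute"))
       .select("sku", "brand",
               col("attribute.name").alias("name"),
               col("attribute.value").alias("value"))
)

attributes.write.mode("overwrite").saveAsTable("gaia.attributes")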
ER model:
Workflows:
The processes are executed through a control job named job_Load_Gaia_Database, specified in the file job_Load_Gaia_Database.json.
job_Load_Gaia_Database.json
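The contents of the file are not included above; the snippet below sketches the shape such a multi-task job definition typically takes (Databricks Jobs API 2.1 style). Task keys, notebook paths, and the omitted cluster configuration are assumptions; the actual file in the repository is authoritative:

{
  "name": "job_Load_Gaia_Database",
  "tasks": [
    { "task_key": "loadJson_Deca",
      "notebook_task": { "notebook_path": "/Repos/gaia_to_vetex/loadJson_Deca" } },
    { "task_key": "loadJson_Hydra",
      "notebook_task": { "notebook_path": "/Repos/gaia_to_vetex/loadJson_Hydra" } },
    { "task_key": "loadJson_Madeira",
      "notebook_task": { "notebook_path": "/Repos/gaia_to_vetex/loadJson_Madeira" } },
    { "task_key": "tables",
      "depends_on": [ { "task_key": "loadJson_Deca" },
                      { "task_key": "loadJson_Hydra" },
                      { "task_key": "loadJson_Madeira" } ],
      "notebook_task": { "notebook_path": "/Repos/gaia_to_vetex/tables" } }
  ]
}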