Gaia Ingestion

Objective: transfer data from the Gaia system to Vtex through Python scripts in Databricks, implementing the raw and gold data layers, as well as a database in Databricks that serves as a query repository for a wide range of purposes.

Location: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#folder/3278180759103619

Bitbucket: https://bitbucket.org/fabianoguidottinoble/gaia_to_vetex/src/7b4e698d4fbedff8cff0021fa87a6b59a380f321/?at=feature%2Fdev

Workflow: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#job/list


Logical diagram:

Scripts for integrating Gaia with Vtex

There are 4 scripts that import and process the JSON files produced by Gaia.

The scripts follow the logic below:

There are 3 sources, all in JSON format:

  • Deca

  • Hydra

  • Madeira

Os scripts "loadJson_{source name}" , buscam os arquivos JSON em um bucket S3 e os importam dentro da base "Gaia" interna ao Databricks, mantendo a formatação do JSON.

They are:

  • loadJson_Deca

  • loadJson_Hydra

  • loadJson_Madeira

These scripts generate, respectively, the following tables (a quick sanity check is sketched after the list):

  • gaia.deca2_raw

  • gaia.hydra_raw

  • gaia.madeira_raw
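
After a run, the raw tables can be sanity-checked directly in a notebook. A minimal sketch, assuming it runs inside Databricks where spark is predefined (counts and schemas depend on the source files):

# quick sanity check of the raw layer
for table in ["gaia.deca2_raw", "gaia.hydra_raw", "gaia.madeira_raw"]:
    df = spark.table(table)
    print(table, df.count())  # row count of the loaded table
    df.printSchema()          # the nested JSON structure is preserved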


Raw layer ER model:

Scripts:

# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Deca = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Deca.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Deca.json file
response = requests.get(url_DataLake_Deca)
open("/tmp/DataLake_Deca.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
# the file was downloaded to the driver-local /tmp, so the source needs the file: scheme
dbutils.fs.cp("file:/tmp/DataLake_Deca.json", "dbfs:/FileStore/DataLake_Deca.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json("dbfs:/FileStore/DataLake_Deca.json")

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df.write.mode("overwrite").saveAsTable("gaia.deca2_raw")

# COMMAND ----------
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Hydra = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Hydra.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Hydra.json file
response = requests.get(url_DataLake_Hydra)
open("/tmp/DataLake_Hydra.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
# the file was downloaded to the driver-local /tmp, so the source needs the file: scheme
dbutils.fs.cp("file:/tmp/DataLake_Hydra.json", "dbfs:/FileStore/DataLake_Hydra.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df_DataLake_Hydra = spark.read.json("dbfs:/FileStore/DataLake_Hydra.json")

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df_DataLake_Hydra.write.mode("overwrite").saveAsTable("gaia.hydra_raw")

# COMMAND ----------
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Madeira = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Madeira.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Madeira.json file
response = requests.get(url_DataLake_Madeira)
open("/tmp/DataLake_Madeira.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
# the file was downloaded to the driver-local /tmp, so the source needs the file: scheme
dbutils.fs.cp("file:/tmp/DataLake_Madeira.json", "dbfs:/FileStore/DataLake_Madeira.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json("dbfs:/FileStore/DataLake_Madeira.json")

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df.write.mode("overwrite").saveAsTable("gaia.madeira_raw")

# COMMAND ----------
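
The three notebooks differ only in the source name and target table. As a sketch only (not part of the repository; load_json_source is a hypothetical helper), the duplication could be factored out like this:

import requests

# raw target table per source; note that Deca writes to deca2_raw, not deca_raw
RAW_TABLES = {"Deca": "gaia.deca2_raw", "Hydra": "gaia.hydra_raw", "Madeira": "gaia.madeira_raw"}
BASE_URL = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1"

def load_json_source(source: str) -> None:
    """Download one Gaia JSON export and load it into the Gaia database."""
    response = requests.get(f"{BASE_URL}/DataLake_{source}.json")
    response.raise_for_status()  # fail fast on a bad download
    local_path = f"/tmp/DataLake_{source}.json"
    with open(local_path, "wb") as f:
        f.write(response.content)
    # copy from the driver-local filesystem to DBFS, then load the JSON as-is
    dbutils.fs.cp(f"file:{local_path}", f"dbfs:/FileStore/DataLake_{source}.json")
    df = spark.read.json(f"dbfs:/FileStore/DataLake_{source}.json")
    df.write.mode("overwrite").saveAsTable(RAW_TABLES[source])

for source in RAW_TABLES:
    load_json_source(source)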


O script "tables", processa as 3 origens, agora usando como origem a base de dados "Gaia", une estas origens gerando novas tabelas na base de dados, simplificando a estrutura JSON en tabelas.

The following tables are generated:

  • assets_documents

  • assets_images

  • attributes

  • documents

  • images

  • keys

  • parent

  • products


These tables are joined through the SKU and brand columns, as in the example query below.
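
For illustration only (the join key is an assumption based on the description above; gaia.attributes carries no brand column, so this joins on sku alone):

enriched = spark.sql("""
    SELECT p.sku,
           p.brand,
           p.name  AS product_name,
           a.name  AS attribute_name,
           a.value AS attribute_value
    FROM gaia.products p
    LEFT JOIN gaia.attributes a
           ON a.sku = p.sku
""")
enriched.show(10, truncate=False)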


ER model:


# Databricks notebook source
# DBTITLE 1,Table GAIA.ATTRIBUTES
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.attributes (
# MAGIC   SELECT q1.sku,
# MAGIC          INITCAP(q1.id) AS id,
# MAGIC          INITCAP(q1.name) AS name,
# MAGIC          INITCAP(q1.uom) AS uom,
# MAGIC          INITCAP(q1.value) AS value,
# MAGIC          INITCAP(q1.valueId) AS valueId,
# MAGIC          q1.dataAtualizacao
# MAGIC   FROM ( SELECT sku, id, name, inline(values), dataAtualizacao
# MAGIC          FROM ( SELECT sku, inline(attributes), dataAtualizacao
# MAGIC                 FROM gaia.madeira_raw )
# MAGIC          UNION ALL
# MAGIC          SELECT sku, id, name, inline(values), dataAtualizacao
# MAGIC          FROM ( SELECT sku, inline(attributes), dataAtualizacao
# MAGIC                 FROM gaia.hydra_raw )
# MAGIC          UNION ALL
# MAGIC          SELECT sku, id, name, inline(values), dataAtualizacao
# MAGIC          FROM ( SELECT sku, inline(attributes), dataAtualizacao
# MAGIC                 FROM gaia.deca2_raw )
# MAGIC        ) q1
# MAGIC   ORDER BY 1,2,3,4,5,6
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.KEYS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.keys (
# MAGIC   SELECT sku, brand, inline(keys) FROM gaia.deca2_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku, brand, inline(keys) FROM gaia.madeira_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku, brand, inline(keys) FROM gaia.hydra_raw
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.PRODUCTS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.products (
# MAGIC   SELECT sku, brand, '' AS id, group, name, dataAtualizacao --, inline(keys)
# MAGIC   FROM gaia.deca2_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku, brand, '' AS id, '' AS group, name, dataAtualizacao --, inline(keys)
# MAGIC   FROM gaia.madeira_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku, brand, id, group, name, dataAtualizacao --, inline(keys)
# MAGIC   FROM gaia.hydra_raw
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.PARENT
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.parent (
# MAGIC   SELECT sku, brand, parent.id AS sku_id, parent.sku AS sku_parent
# MAGIC   FROM gaia.deca2_raw
# MAGIC   UNION
# MAGIC   SELECT sku, brand, parent.id AS sku_id, parent.sku AS sku_parent
# MAGIC   FROM gaia.hydra_raw
# MAGIC   -- gaia.madeira_raw has no parent structure
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.ASSETS_DOCUMENTS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.assets_documents (
# MAGIC   SELECT q3.sku,
# MAGIC          'Deca' AS brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.id,
# MAGIC          q3.name,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.doc_id,
# MAGIC          q3.doc_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          q3.type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 q2.id1 AS id,
# MAGIC                 q2.name1 AS name,
# MAGIC                 inline(values),
# MAGIC                 q2.id AS doc_id,
# MAGIC                 q2.name AS doc_name,
# MAGIC                 q2.referenceTypeId,
# MAGIC                 q2.referenceTypeName,
# MAGIC                 q2.type,
# MAGIC                 q2.url,
# MAGIC                 q2.version,
# MAGIC                 q2.dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        q1.assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        inline(q1.attributes),
# MAGIC                        q1.id AS id1,
# MAGIC                        q1.name AS name1,
# MAGIC                        q1.referenceTypeId,
# MAGIC                        q1.referenceTypeName,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        q1.version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku, inline(assets.documents), dataAtualizacao
# MAGIC                        FROM gaia.deca2_raw ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC   UNION ALL
# MAGIC   SELECT q3.sku,
# MAGIC          'Hydra' AS brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.id,
# MAGIC          q3.name,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.doc_id,
# MAGIC          q3.doc_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          CAST(q3.type AS STRING) AS type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 q2.id1 AS id,
# MAGIC                 q2.name1 AS name,
# MAGIC                 '' AS uom,
# MAGIC                 '' AS value,
# MAGIC                 '' AS valueId,
# MAGIC                 '' AS doc_id,
# MAGIC                 '' AS doc_name,
# MAGIC                 q2.referenceTypeId,
# MAGIC                 q2.referenceTypeName,
# MAGIC                 q2.type,
# MAGIC                 q2.url,
# MAGIC                 q2.version,
# MAGIC                 q2.dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        '' AS assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        '' AS attributes,
# MAGIC                        '' AS id1,
# MAGIC                        q1.name AS name1,
# MAGIC                        '' AS referenceTypeId,
# MAGIC                        '' AS referenceTypeName,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        '' AS version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku, inline(assets.documents), dataAtualizacao
# MAGIC                        FROM gaia.hydra_raw ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table ASSETS_IMAGES
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.assets_images (
# MAGIC   SELECT q3.sku,
# MAGIC          'Deca' AS brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.image_id,
# MAGIC          q3.image_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          q3.resize,
# MAGIC          q3.type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 inline(values),
# MAGIC                 image_id,
# MAGIC                 image_name,
# MAGIC                 referenceTypeId,
# MAGIC                 referenceTypeName,
# MAGIC                 resize,
# MAGIC                 type,
# MAGIC                 url,
# MAGIC                 version,
# MAGIC                 dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        q1.assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        inline(attributes),
# MAGIC                        q1.id AS image_id,
# MAGIC                        q1.name AS image_name,
# MAGIC                        q1.referenceTypeId,
# MAGIC                        q1.referenceTypeName,
# MAGIC                        q1.resize,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        q1.version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku, inline(assets.images), dataAtualizacao
# MAGIC                        FROM gaia.deca2_raw ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC   UNION ALL
# MAGIC   SELECT q1.sku,
# MAGIC          'Hydra' AS brand,
# MAGIC          '' AS assetPushLocation,
# MAGIC          q1.assetType,
# MAGIC          '' AS uom,
# MAGIC          '' AS value,
# MAGIC          '' AS valueId,
# MAGIC          '' AS image_id,
# MAGIC          '' AS image_name,
# MAGIC          '' AS referenceTypeId,
# MAGIC          '' AS referenceTypeName,
# MAGIC          q1.resize,
# MAGIC          CAST(q1.type AS STRING) AS type,
# MAGIC          q1.url,
# MAGIC          '' AS version,
# MAGIC          q1.dataAtualizacao
# MAGIC   FROM ( SELECT sku, inline(assets.images), dataAtualizacao
# MAGIC          FROM gaia.hydra_raw ) q1
# MAGIC )

# COMMAND ----------
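
The inline() generator used throughout this notebook explodes an array of structs into one row per element, with one column per struct field. A self-contained illustration with hypothetical data (runnable in any PySpark session):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# one product whose "attributes" column mimics the nested shape of the raw Gaia tables
df = spark.createDataFrame(
    [("SKU-1", [{"id": "cor", "name": "Cor", "value": "Branco"},
                {"id": "altura", "name": "Altura", "value": "30 cm"}])],
    "sku STRING, attributes ARRAY<STRUCT<id: STRING, name: STRING, value: STRING>>",
)
df.createOrReplaceTempView("raw_example")

# each struct in the array becomes its own row, with columns id, name and value
spark.sql("SELECT sku, inline(attributes) FROM raw_example").show()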


Workflows:

The processes are executed through a control job named job_Load_Gaia_Database, specified in the job_Load_Gaia_Database.json file.


1{ 2 "job_id": 1075104663470440, 3 "creator_user_name": "fabiano.noble-ext@dex.co", 4 "run_as_user_name": "fabiano.noble-ext@dex.co", 5 "run_as_owner": true, 6 "settings": { 7 "name": "Load Gaia Database", 8 "email_notifications": { 9 "no_alert_for_skipped_runs": false 10 }, 11 "timeout_seconds": 0, 12 "max_concurrent_runs": 1, 13 "tasks": [ 14 { 15 "task_key": "Hydra_Json", 16 "notebook_task": { 17 "notebook_path": "/Shared/DataLakes/loadJson_Hydra", 18 "source": "WORKSPACE" 19 }, 20 "existing_cluster_id": "0712-171233-wpo3k09y", 21 "timeout_seconds": 0, 22 "email_notifications": {} 23 }, 24 { 25 "task_key": "Deca_Json", 26 "notebook_task": { 27 "notebook_path": "/Shared/DataLakes/loadJson_Deca", 28 "source": "WORKSPACE" 29 }, 30 "existing_cluster_id": "0712-171233-wpo3k09y", 31 "timeout_seconds": 0, 32 "email_notifications": {} 33 }, 34 { 35 "task_key": "Madeira_Json", 36 "notebook_task": { 37 "notebook_path": "/Shared/DataLakes/loadJson_Madeira", 38 "source": "WORKSPACE" 39 }, 40 "existing_cluster_id": "0712-171233-wpo3k09y", 41 "timeout_seconds": 0, 42 "email_notifications": {} 43 }, 44 { 45 "task_key": "Create_Tables", 46 "depends_on": [ 47 { 48 "task_key": "Hydra_Json" 49 }, 50 { 51 "task_key": "Deca_Json" 52 }, 53 { 54 "task_key": "Madeira_Json" 55 } 56 ], 57 "notebook_task": { 58 "notebook_path": "/Shared/DataLakes/tables", 59 "source": "WORKSPACE" 60 }, 61 "existing_cluster_id": "0712-171233-wpo3k09y", 62 "timeout_seconds": 0, 63 "email_notifications": {} 64 } 65 ], 66 "job_clusters": [ 67 { 68 "job_cluster_key": "Gaia_vtex_job_cluster", 69 "new_cluster": { 70 "cluster_name": "", 71 "spark_version": "10.4.x-scala2.12", 72 "spark_conf": { 73 "spark.master": "local[*, 4]", 74 "spark.databricks.cluster.profile": "singleNode" 75 }, 76 "aws_attributes": { 77 "first_on_demand": 1, 78 "availability": "SPOT_WITH_FALLBACK", 79 "zone_id": "us-east-1a", 80 "spot_bid_price_percent": 100 81 }, 82 "node_type_id": "m5d.large", 83 "driver_node_type_id": "m5d.large", 84 "custom_tags": { 85 "ResourceClass": "SingleNode" 86 }, 87 "spark_env_vars": { 88 "PYSPARK_PYTHON": "/databricks/python3/bin/python3" 89 }, 90 "enable_elastic_disk": true, 91 "data_security_mode": "LEGACY_SINGLE_USER_STANDARD", 92 "runtime_engine": "STANDARD", 93 "num_workers": 0 94 } 95 } 96 ], 97 "format": "MULTI_TASK" 98 }, 99 "created_time": 1661229019026 100}
The notebook below runs the three load notebooks in parallel, using the helpers defined in the parallel-notebooks notebook:

// Databricks notebook source
// MAGIC %md
// MAGIC # Parallel load control

// COMMAND ----------

// MAGIC %md
// MAGIC ### Import the parallelism helpers

// COMMAND ----------

// MAGIC %run "/Shared/DataLakes/parallel-notebooks"

// COMMAND ----------

// MAGIC %md
// MAGIC ## Run the loads in parallel

// COMMAND ----------

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

val n00 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Deca", 0))
val n01 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Hydra", 0))
val n02 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Madeira", 0))

val res = Future.sequence(List(n00, n01, n02))

Await.result(res, 7200 minutes)
res.value

// COMMAND ----------
The parallel-notebooks notebook defines the helper functions used above:

// Databricks notebook source
// MAGIC %md
// MAGIC Defines functions for running notebooks in parallel. This notebook allows you to define functions as if they were in a library.

// COMMAND ----------

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// COMMAND ----------

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

// Runs several notebooks concurrently, bounded by a fixed-size thread pool.
def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import com.databricks.WorkflowException

  // If you start too many notebooks in parallel the driver may crash when you
  // submit all of the jobs at once. This limits the number of parallel notebooks.
  val numNotebooksInParallel = 16
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()

  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        if (notebook.parameters.nonEmpty)
          dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else
          dbutils.notebook.run(notebook.path, notebook.timeout)
      }
      .recover {
        // case NonFatal(e) => s"ERROR: ${e.getMessage}"
        case NonFatal(e) => s"ERROR: ${notebook.path}"
      }
    }
  )
}

// Runs a single notebook as a Future. The simplest interface we can have, but it has
// no protection against submitting too many notebooks in parallel at once.
def parallelNotebook(notebook: NotebookData): Future[String] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext.Implicits.global
  import com.databricks.WorkflowException

  val ctx = dbutils.notebook.getContext()

  Future {
    dbutils.notebook.setContext(ctx)

    if (notebook.parameters.nonEmpty)
      dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
    else
      dbutils.notebook.run(notebook.path, notebook.timeout)
  }
  .recover {
    // case NonFatal(e) => s"ERROR: ${e.getMessage}"
    case NonFatal(e) => s"ERROR: ${notebook.path}"
  }
}