Objective: transfer data from the Gaia system to Vtex through Python scripts in Databricks, implementing the raw and gold data layers, as well as a database in Databricks that serves as a query repository for a wide range of purposes.
Location: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#folder/3278180759103619
Workflow: https://dbc-9570c746-ec11.cloud.databricks.com/?o=3606981051492833#job/list
Logical diagram:
Scripts for integrating Gaia with Vtex
There are 4 scripts that import and process the JSON files produced by Gaia.
The scripts follow the logic below:
There are 3 sources:
Deca
Hydra
Madeira
All of them in JSON format.
The "loadJson_{source name}" scripts fetch the JSON files from an S3 bucket and import them into the "Gaia" database inside Databricks, preserving the JSON structure.
They are:
loadJson_Deca
loadJson_Hydra
loadJson_Madeira
These scripts generate, respectively, the tables:
gaia.deca2_raw
gaia.hydra_raw
gaia.madeira_raw
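After a load run, a quick sanity check of these raw tables can be done from any notebook. The sketch below is illustrative only; the table names come from the list above:

# Illustrative sanity check: confirm each raw table exists and print its size.
raw_tables = ["gaia.deca2_raw", "gaia.hydra_raw", "gaia.madeira_raw"]

for table_name in raw_tables:
    df = spark.table(table_name)
    print(table_name, "-", df.count(), "rows -", len(df.columns), "columns")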
Raw layer ER model:
Scripts:
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Deca = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Deca.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Deca.json file
response = requests.get(url_DataLake_Deca)
open("/tmp/DataLake_Deca.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("/tmp/DataLake_Deca.json", "dbfs:/FileStore/DataLake_Deca.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json('dbfs:/FileStore/DataLake_Deca.json')

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df.write.mode("overwrite").saveAsTable("gaia.deca2_raw")

# COMMAND ----------
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Hydra = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Hydra.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Hydra.json file
response = requests.get(url_DataLake_Hydra)
open("/tmp/DataLake_Hydra.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("/tmp/DataLake_Hydra.json", "dbfs:/FileStore/DataLake_Hydra.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df_DataLake_Hydra = spark.read.json('dbfs:/FileStore/DataLake_Hydra.json')

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df_DataLake_Hydra.write.mode("overwrite").saveAsTable("gaia.hydra_raw")

# COMMAND ----------
# Databricks notebook source
# DBTITLE 1,Import LIBs
import requests

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *

# COMMAND ----------

# DBTITLE 1,Json URL
url_DataLake_Madeira = "https://dtx-mdm-download.s3.amazonaws.com/DTX-f2dd9e49-8608-4057-b9d1-fea9fa5c8795/v1/DataLake_Madeira.json"

# COMMAND ----------

# DBTITLE 1,Download DataLake_Madeira.json file
response = requests.get(url_DataLake_Madeira)
open("/tmp/DataLake_Madeira.json", "wb").write(response.content)

# COMMAND ----------

# DBTITLE 1,Send JSON file to DBFS
dbutils.fs.cp("/tmp/DataLake_Madeira.json", "dbfs:/FileStore/DataLake_Madeira.json")

# COMMAND ----------

# DBTITLE 1,Read JSON file from DBFS
df = spark.read.json('dbfs:/FileStore/DataLake_Madeira.json')

# COMMAND ----------

# DBTITLE 1,Send JSON file to table into Gaia database
df.write.mode("overwrite").saveAsTable("gaia.madeira_raw")

# COMMAND ----------
The "tables" script processes the 3 sources, now reading from the "Gaia" database, and combines them into new tables in the database, flattening the JSON structure into relational tables.
The following tables are generated:
assets_documents
assets_images
attributes
documents
images
keys
parent
products
These tables are joined on the SKU and brand columns.
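For example, product attributes and image URLs can be retrieved by joining these tables. This is an illustrative sketch only; the column names follow the "tables" script below, and gaia.attributes carries no brand column, so it is joined on sku alone:

# Illustrative query joining the gold tables on sku and brand.
example = spark.sql("""
    SELECT p.sku,
           p.brand,
           p.name,
           a.name  AS attribute_name,
           a.value AS attribute_value,
           i.url   AS image_url
    FROM gaia.products p
    LEFT JOIN gaia.attributes a
           ON a.sku = p.sku
    LEFT JOIN gaia.assets_images i
           ON i.sku = p.sku
          AND i.brand = p.brand
""")
display(example)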
ER model:
# Databricks notebook source
# DBTITLE 1,Table GAIA.ATTRIBUTES
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.attributes (
# MAGIC   SELECT q1.sku,
# MAGIC          INITCAP(q1.id) AS id,
# MAGIC          INITCAP(q1.name) AS name,
# MAGIC          INITCAP(q1.uom) AS uom,
# MAGIC          INITCAP(q1.value) AS value,
# MAGIC          INITCAP(q1.valueId) AS valueId,
# MAGIC          q1.dataAtualizacao
# MAGIC   FROM ( SELECT sku,
# MAGIC                 id,
# MAGIC                 name,
# MAGIC                 inline(values),
# MAGIC                 dataAtualizacao
# MAGIC          FROM ( SELECT sku,
# MAGIC                        inline(attributes),
# MAGIC                        dataAtualizacao
# MAGIC                 FROM gaia.madeira_raw )
# MAGIC          UNION ALL
# MAGIC          SELECT sku,
# MAGIC                 id,
# MAGIC                 name,
# MAGIC                 inline(values),
# MAGIC                 dataAtualizacao
# MAGIC          FROM ( SELECT sku,
# MAGIC                        inline(attributes),
# MAGIC                        dataAtualizacao
# MAGIC                 FROM gaia.hydra_raw )
# MAGIC          UNION ALL
# MAGIC          SELECT sku,
# MAGIC                 id,
# MAGIC                 name,
# MAGIC                 inline(values),
# MAGIC                 dataAtualizacao
# MAGIC          FROM ( SELECT sku,
# MAGIC                        inline(attributes),
# MAGIC                        dataAtualizacao
# MAGIC                 FROM gaia.deca2_raw )
# MAGIC        ) q1
# MAGIC   ORDER BY 1,2,3,4,5,6
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.KEYS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.keys (
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          inline(keys)
# MAGIC   FROM gaia.deca2_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          inline(keys)
# MAGIC   FROM gaia.madeira_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          inline(keys)
# MAGIC   FROM gaia.hydra_raw
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.PRODUCTS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.products (
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          '' AS id,
# MAGIC          group,
# MAGIC          name,
# MAGIC          dataAtualizacao --, inline(keys)
# MAGIC   FROM gaia.deca2_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          '' AS id,
# MAGIC          '' AS group,
# MAGIC          name,
# MAGIC          dataAtualizacao --, inline(keys)
# MAGIC   FROM gaia.madeira_raw
# MAGIC   UNION ALL
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          id,
# MAGIC          group,
# MAGIC          name,
# MAGIC          dataAtualizacao
# MAGIC          --, inline(keys)
# MAGIC   FROM gaia.hydra_raw
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.PARENT
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.parent (
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          parent.id as sku_id,
# MAGIC          parent.sku as sku_parent
# MAGIC   FROM gaia.deca2_raw
# MAGIC   UNION
# MAGIC   SELECT sku,
# MAGIC          brand,
# MAGIC          parent.id as sku_id,
# MAGIC          parent.sku as sku_parent
# MAGIC   FROM gaia.hydra_raw
# MAGIC   -- gaia.madeira_raw has no parent
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table GAIA.ASSETS_DOCUMENTS
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.assets_documents (
# MAGIC   SELECT q3.sku,
# MAGIC          'Deca' as brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.id,
# MAGIC          q3.name,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.doc_id,
# MAGIC          q3.doc_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          q3.type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 q2.id1 as id,
# MAGIC                 q2.name1 as name,
# MAGIC                 inline(values),
# MAGIC                 q2.id as doc_id,
# MAGIC                 q2.name as doc_name,
# MAGIC                 q2.referenceTypeId,
# MAGIC                 q2.referenceTypeName,
# MAGIC                 q2.type,
# MAGIC                 q2.url,
# MAGIC                 q2.version,
# MAGIC                 q2.dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        q1.assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        inline(q1.attributes),
# MAGIC                        q1.id as id1,
# MAGIC                        q1.name as name1,
# MAGIC                        q1.referenceTypeId,
# MAGIC                        q1.referenceTypeName,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        q1.version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku,
# MAGIC                               inline(assets.documents),
# MAGIC                               dataAtualizacao
# MAGIC                        FROM gaia.deca2_raw
# MAGIC                      ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC   UNION ALL
# MAGIC   SELECT q3.sku,
# MAGIC          'Hydra' as brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.id,
# MAGIC          q3.name,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.doc_id,
# MAGIC          q3.doc_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          CAST(q3.type AS STRING) as type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 q2.id1 as id,
# MAGIC                 q2.name1 as name,
# MAGIC                 '' as uom,
# MAGIC                 '' as value,
# MAGIC                 '' as valueId,
# MAGIC                 '' as doc_id,
# MAGIC                 '' as doc_name,
# MAGIC                 q2.referenceTypeId,
# MAGIC                 q2.referenceTypeName,
# MAGIC                 q2.type,
# MAGIC                 q2.url,
# MAGIC                 q2.version,
# MAGIC                 q2.dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        '' AS assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        '' as attributes,
# MAGIC                        '' as id1,
# MAGIC                        q1.name as name1,
# MAGIC                        '' as referenceTypeId,
# MAGIC                        '' as referenceTypeName,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        '' as version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku,
# MAGIC                               inline(assets.documents),
# MAGIC                               dataAtualizacao
# MAGIC                        FROM gaia.hydra_raw
# MAGIC                      ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC )

# COMMAND ----------

# DBTITLE 1,Table ASSETS_IMAGES
# MAGIC %sql
# MAGIC CREATE OR REPLACE TABLE gaia.assets_images (
# MAGIC   SELECT q3.sku,
# MAGIC          'Deca' as brand,
# MAGIC          q3.assetPushLocation,
# MAGIC          q3.assetType,
# MAGIC          q3.uom,
# MAGIC          q3.value,
# MAGIC          q3.valueId,
# MAGIC          q3.image_id,
# MAGIC          q3.image_name,
# MAGIC          q3.referenceTypeId,
# MAGIC          q3.referenceTypeName,
# MAGIC          q3.resize,
# MAGIC          q3.type,
# MAGIC          q3.url,
# MAGIC          q3.version,
# MAGIC          q3.dataAtualizacao
# MAGIC   FROM ( SELECT q2.sku,
# MAGIC                 q2.assetPushLocation,
# MAGIC                 q2.assetType,
# MAGIC                 q2.image_id,
# MAGIC                 q2.image_name,
# MAGIC                 inline(values),
# MAGIC                 image_id,
# MAGIC                 image_name,
# MAGIC                 referenceTypeId,
# MAGIC                 referenceTypeName,
# MAGIC                 resize,
# MAGIC                 type,
# MAGIC                 url,
# MAGIC                 version,
# MAGIC                 dataAtualizacao
# MAGIC          FROM ( SELECT q1.sku,
# MAGIC                        q1.assetPushLocation,
# MAGIC                        q1.assetType,
# MAGIC                        inline(attributes),
# MAGIC                        q1.id as image_id,
# MAGIC                        q1.name as image_name,
# MAGIC                        q1.referenceTypeId,
# MAGIC                        q1.referenceTypeName,
# MAGIC                        q1.resize,
# MAGIC                        q1.type,
# MAGIC                        q1.url,
# MAGIC                        q1.version,
# MAGIC                        q1.dataAtualizacao
# MAGIC                 FROM ( SELECT sku,
# MAGIC                               inline(assets.images),
# MAGIC                               dataAtualizacao
# MAGIC                        FROM gaia.deca2_raw
# MAGIC                      ) q1
# MAGIC               ) q2
# MAGIC        ) q3
# MAGIC   UNION ALL
# MAGIC   SELECT q1.sku,
# MAGIC          'Hydra' as brand,
# MAGIC          '' as assetPushLocation,
# MAGIC          q1.assetType,
# MAGIC          '' as uom,
# MAGIC          '' as value,
# MAGIC          '' as valueId,
# MAGIC          '' as image_id,
# MAGIC          '' as image_name,
# MAGIC          '' as referenceTypeId,
# MAGIC          '' as referenceTypeName,
# MAGIC          q1.resize,
# MAGIC          CAST(q1.type as STRING) as type,
# MAGIC          q1.url,
# MAGIC          '' as version,
# MAGIC          q1.dataAtualizacao
# MAGIC   FROM ( SELECT sku,
# MAGIC                 inline(assets.images),
# MAGIC                 dataAtualizacao
# MAGIC          FROM gaia.hydra_raw
# MAGIC        ) q1
# MAGIC )

# COMMAND ----------
Workflows:
The processes are run through a control job named job_Load_Gaia_Database, specified in the job_Load_Gaia_Database.json file.
{
  "job_id": 1075104663470440,
  "creator_user_name": "fabiano.noble-ext@dex.co",
  "run_as_user_name": "fabiano.noble-ext@dex.co",
  "run_as_owner": true,
  "settings": {
    "name": "Load Gaia Database",
    "email_notifications": {
      "no_alert_for_skipped_runs": false
    },
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "tasks": [
      {
        "task_key": "Hydra_Json",
        "notebook_task": {
          "notebook_path": "/Shared/DataLakes/loadJson_Hydra",
          "source": "WORKSPACE"
        },
        "existing_cluster_id": "0712-171233-wpo3k09y",
        "timeout_seconds": 0,
        "email_notifications": {}
      },
      {
        "task_key": "Deca_Json",
        "notebook_task": {
          "notebook_path": "/Shared/DataLakes/loadJson_Deca",
          "source": "WORKSPACE"
        },
        "existing_cluster_id": "0712-171233-wpo3k09y",
        "timeout_seconds": 0,
        "email_notifications": {}
      },
      {
        "task_key": "Madeira_Json",
        "notebook_task": {
          "notebook_path": "/Shared/DataLakes/loadJson_Madeira",
          "source": "WORKSPACE"
        },
        "existing_cluster_id": "0712-171233-wpo3k09y",
        "timeout_seconds": 0,
        "email_notifications": {}
      },
      {
        "task_key": "Create_Tables",
        "depends_on": [
          {
            "task_key": "Hydra_Json"
          },
          {
            "task_key": "Deca_Json"
          },
          {
            "task_key": "Madeira_Json"
          }
        ],
        "notebook_task": {
          "notebook_path": "/Shared/DataLakes/tables",
          "source": "WORKSPACE"
        },
        "existing_cluster_id": "0712-171233-wpo3k09y",
        "timeout_seconds": 0,
        "email_notifications": {}
      }
    ],
    "job_clusters": [
      {
        "job_cluster_key": "Gaia_vtex_job_cluster",
        "new_cluster": {
          "cluster_name": "",
          "spark_version": "10.4.x-scala2.12",
          "spark_conf": {
            "spark.master": "local[*, 4]",
            "spark.databricks.cluster.profile": "singleNode"
          },
          "aws_attributes": {
            "first_on_demand": 1,
            "availability": "SPOT_WITH_FALLBACK",
            "zone_id": "us-east-1a",
            "spot_bid_price_percent": 100
          },
          "node_type_id": "m5d.large",
          "driver_node_type_id": "m5d.large",
          "custom_tags": {
            "ResourceClass": "SingleNode"
          },
          "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
          },
          "enable_elastic_disk": true,
          "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
          "runtime_engine": "STANDARD",
          "num_workers": 0
        }
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1661229019026
}
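The job can also be triggered on demand through the Databricks Jobs REST API, for example from another notebook or a local script. The sketch below is illustrative only: the workspace URL matches the links above, the job_id comes from the definition above, and the personal access token is a placeholder that the user must supply.

import requests

DATABRICKS_HOST = "https://dbc-9570c746-ec11.cloud.databricks.com"
DATABRICKS_TOKEN = "<personal-access-token>"  # placeholder, not a real token

# Trigger a run of the "Load Gaia Database" job defined above.
response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json={"job_id": 1075104663470440},
)
response.raise_for_status()
print("Started run_id:", response.json()["run_id"])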
// Databricks notebook source
// MAGIC %md
// MAGIC # Parallel load control

// COMMAND ----------

// MAGIC %md
// MAGIC ### Import the parallelism helpers

// COMMAND ----------

// MAGIC %run "/Shared/DataLakes/parallel-notebooks"

// COMMAND ----------

// MAGIC %md
// MAGIC ## Run the loads in parallel

// COMMAND ----------

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

val n00 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Deca", 0))
val n01 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Hydra", 0))
val n02 = parallelNotebook(NotebookData("/Shared/DataLakes/loadJson_Madeira", 0))

val res = Future.sequence(List(n00,
                               n01,
                               n02
                              )
                         )

Await.result(res, 7200 minutes)
res.value

// COMMAND ----------
// Databricks notebook source
// MAGIC %md
// MAGIC Defines functions for running notebooks in parallel. This notebook allows you to define functions as if they
// MAGIC were in a library.

// COMMAND ----------

import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// COMMAND ----------

case class NotebookData(path: String, timeout: Int, parameters: Map[String, String] = Map.empty[String, String])

def parallelNotebooks(notebooks: Seq[NotebookData]): Future[Seq[String]] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import com.databricks.WorkflowException

  val numNotebooksInParallel = 16
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  val ctx = dbutils.notebook.getContext()

  Future.sequence(
    notebooks.map { notebook =>
      Future {
        dbutils.notebook.setContext(ctx)
        if (notebook.parameters.nonEmpty)
          dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else
          dbutils.notebook.run(notebook.path, notebook.timeout)
      }
      .recover {
//      case NonFatal(e) => s"ERROR: ${e.getMessage}"
        case NonFatal(e) => s"ERROR: ${notebook.path}"
      }
    }
  )
}

def parallelNotebook(notebook: NotebookData): Future[String] = {
  import scala.concurrent.{Future, blocking, Await}
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext.Implicits.global
  import com.databricks.WorkflowException

  val ctx = dbutils.notebook.getContext()
  // The simplest interface we can have but doesn't
  // have protection for submitting too many notebooks in parallel at once
  Future {
    dbutils.notebook.setContext(ctx)

    if (notebook.parameters.nonEmpty)
      dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
    else
      dbutils.notebook.run(notebook.path, notebook.timeout)

  }
  .recover {
//    case NonFatal(e) => s"ERROR: ${e.getMessage}"
    case NonFatal(e) => s"ERROR: ${notebook.path}"
  }
}