Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
from pyspark.sql.functions import *

# PostgreSQL connection parameters.
# NOTE(review): intentionally left blank in this snippet — fill in
# (or load from a secret scope) before running.
host = ""
dbname = ""
user = ""
password = ""
port = 5432

# S3 bucket used as a temporary staging area, read from Databricks secrets.
bucket_name = dbutils.secrets.get(scope = 'PROCESS_ETL_ANALYTICKS_DATABRICKS_API', key = 'BUCKET_TEMP_DATA_TRANSFER')

# AWS credentials for s3a:// access, injected into the Hadoop configuration
# so Spark can reach the staging bucket.
aws_access = dbutils.secrets.get(scope = 'AWS_SVC_S3_DATA_TRANSFER', key = 'ACCESS_KEY')
aws_secret = dbutils.secrets.get(scope = 'AWS_SVC_S3_DATA_TRANSFER', key = 'SECRET_KEY')
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret)

# JDBC URL with user/password embedded as query parameters.
conn_string = f'jdbc:postgresql://{host}:{port}/{dbname}?user={user}&password={password}'

def query_postgres(select, nome_tabela):
    """Run a SQL query against PostgreSQL via Spark JDBC and return a DataFrame.

    Parameters:
        select: SQL query string, passed as the JDBC ``query`` option.
        nome_tabela: table name, used only to build the S3 staging path.

    Returns:
        The resulting Spark DataFrame, or None when the read fails
        (the error is printed; callers must handle a None result).
    """
    try:
        # Bug fix: the original referenced the undefined name `tabela`;
        # the parameter is `nome_tabela`.
        tempdir = f"s3a://{bucket_name}/Redshift/PRODUTOS-DIGITAIS/{nome_tabela}"

        # NOTE(review): "forward_spark_s3_credentials" and "tempdir" are
        # spark-redshift connector options; the plain JDBC source ignores
        # unknown options, so they are harmless here — confirm intent.
        df = (spark.read.format("jdbc")
                .option("url", conn_string)
                .option("forward_spark_s3_credentials", "true")
                .option("query", select)
                .option("tempdir", tempdir)
                .option("driver", "org.postgresql.Driver")
                .load())
        return df

    except Exception as ex:
        # Best-effort contract preserved: report the failure and return
        # None instead of propagating the exception.
        print("❌ Erro na leitura da query")
        print(ex)
        return None

Como acessar um banco de dados SQL Server

Code Block
# DBTITLE 1,Configuração SQL Server
# SQL Server connection settings, all pulled from one Databricks secret scope.
host = dbutils.secrets.get(scope = 'DB_ETIQUETA_SQL_SERVER', key = 'HOST')
database = dbutils.secrets.get(scope = 'DB_ETIQUETA_SQL_SERVER', key = 'DATABASE')
password = dbutils.secrets.get(scope = 'DB_ETIQUETA_SQL_SERVER', key = 'PASS')
user = dbutils.secrets.get(scope = 'DB_ETIQUETA_SQL_SERVER', key = 'USER')

# JDBC URL on the default SQL Server port. Encryption is enabled but the
# server certificate is NOT validated (trustServerCertificate=true).
server_name = f"jdbc:sqlserver://{host}:1433"
url = server_name + ";" + "databaseName=" + database + ";encrypt=true;trustServerCertificate=true;"

# Placeholder — replace with the real table name before running.
TABLE_TO_WRITE = '<NOME DA TABELA>'

#READ TABLE
# Loads the entire table into a Spark DataFrame over JDBC.
remote_table = (spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", TABLE_TO_WRITE)
  .option("user", user)
  .option("password", password)
  .load()
)

# WRITE TABLE
# Writes the distinct rows of `df` over JDBC. mode("overwrite") together
# with truncate=true empties the target table but keeps its schema instead
# of dropping and recreating it.
# NOTE(review): `df` is not defined anywhere in this snippet — presumably
# the DataFrame to persist (e.g. `remote_table` or a transformed result);
# confirm before running.
df.distinct().write.format("jdbc")\
    .mode("overwrite")\
    .option("truncate","true")\
    .option("url", url)\
    .option("dbtable", TABLE_TO_WRITE)\
    .option("user", user)\
    .option("password", password) \
    .option("schemaCheckEnabled", "false")\
    .save()