PySpark é uma linguagem que permite aos usuários interagir com um back-end Apache Spark para processar dados rapidamente.
O Spark pode operar em grandes conjuntos de dados em uma rede distribuída de servidores, oferecendo grandes benefícios de desempenho e confiabilidade quando utilizado corretamente.
Este guia apresenta situações comuns que encontramos e as práticas recomendadas associadas.
Prefira a seleção de coluna implícita para acesso direto, exceto para desambiguação
# ruim df = df.select(F.lower(df1.colA). F.upper(df2.colB)) # bom df = df.select(F.lower(F.col('colA')), F.upper(F.col('colB'))) # melhor - desde Spark 3.0 df = df.select(F.lower('colA'), F.upper('colB'))
Na maioria das situações, é melhor evitar o primeiro e segundo estilos e apenas referenciar a coluna pelo nome, usando uma string, como no terceiro exemplo. O Spark 3.0 expandiu bastante os casos em que o terceiro estilo funciona. Quando o método string não é possível, entretanto, devemos recorrer a uma abordagem mais detalhada.
Se o nome da variável do dataframe for grande, as expressões que o envolvem rapidamente se tornarão complicadas;
Se o nome da coluna tiver um espaço ou outro caractere não suportado, o operador colchete deverá ser usado. Isso gera inconsistência, e df1['colA'] é tão difícil de escrever quanto F.col('colA');
As expressões de coluna envolvendo o dataframe não são reutilizáveis e não podem ser usadas para definir funções abstratas;
Renomear uma variável de dataframe pode ser propenso a erros, pois todas as referências de coluna devem ser atualizadas em conjunto.
Além disso, a sintaxe de ponto (".") incentiva o uso de nomes de variáveis curtos e não descritivos para os dfs.
Por outro lado, F.col('colA') sempre fará referência a uma coluna designada no dataframe que está sendo operado, denominado df neste caso.
Refatorar operações lógicas complexas
As operações lógicas, que geralmente residem dentro de .filter() ou F.when(), precisam ser legíveis. Aplicamos a mesma regra das funções de encadeamento, mantendo as expressões lógicas dentro do mesmo bloco de código em três (3) expressões no máximo. Se as operações crescem demais, muitas vezes é um sinal de que o código pode ser simplificado ou extraído. Extrair operações lógicas complexas em variáveis torna o código mais fácil de ler e raciocinar, o que também reduz bugs.
# ruim F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')
O código acima pode ser simplificado de diversas maneiras. Para começar, concentre-se em agrupar as etapas lógicas em algumas variáveis nomeadas. O Pyspark requer que as expressões sejam colocadas entre parênteses. Isso, misturado com os parênteses utilizados para agrupar as operações lógicas pode prejudicar a legibilidade. O código acima tem uma redundância de difícil detecção dado a ilegibilidade do código :
(F.datediff(df.deliveryDate_actual, df.current_date) < 0)
# melhor has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')) delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0) has_registration = (F.col('currentRegistration').rlike('.+')) is_delivered = (F.col('prod_status') == 'Delivered') F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')
O exemplo acima elimina a expressão redundante e é mais fácil de ler, podemos melhorá-lo ainda mais reduzindo o número de operações.
# bom has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')) delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0) has_registration = (F.col('currentRegistration').rlike('.+')) is_delivered = (F.col('prod_status') == 'Delivered') is_active = (has_registration | has_operator) F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
Observe como a expressão F.when agora é sucinta e legível e o comportamento desejado é claro para qualquer pessoa que revise este código.
Instruções Select
Realizar o select() no começo de uma transformação PySpark é considerado uma boa prática, expressões envolvendo mais de um df, ou operações condicionais como .when() são desencorajadas de serem usadas em um select(), a menos que seja necessário por motivos de desempenho.
# ruim aircraft = aircraft.select( 'aircraft_id', 'aircraft_msn', F.col('aircraft_registration').alias('registration'), 'aircraft_type', F.avg('staleness').alias('avg_staleness'), F.col('number_of_economy_seats').cast('long'), F.avg('flight_hours').alias('avg_flight_hours'), 'operator_code', F.col('number_of_business_seats').cast('long'), )
A menos que a ordem seja importante para você, tente agrupar operações do mesmo tipo.
# bom aircraft = aircraft.select( 'aircraft_id', 'aircraft_msn', 'aircraft_type', 'operator_code', F.col('aircraft_registration').alias('registration'), F.col('number_of_economy_seats').cast('long'), F.col('number_of_business_seats').cast('long'), F.avg('staleness').alias('avg_staleness'), F.avg('flight_hours').alias('avg_flight_hours'), )
A instrução select() redefine o esquema de um dataframe, portanto, naturalmente suporta a inclusão ou exclusão de colunas, antigas e novas, bem como a redefinição de colunas pré-existentes. Ao centralizar todas essas operações em uma única instrução, fica muito mais fácil identificar o esquema final, o que torna o código mais legível e conciso.
Em vez de utilizar withColumnRenamed(), use aliases:
# ruim df.select('key', 'comments').withColumnRenamed('comments', 'num_comments') # bom df.select('key', F.col('comments').alias('num_comments'))
Em vez de utilizar withColumn() para redefinir o tipo, converta no select:
# ruim df.select('comments').withColumn('comments', F.col('comments').cast('double')) # bom df.select(F.col('comments').cast('double'))
Colunas Vazias
Se você precisar adicionar uma coluna vazia para satisfazer um schema, sempre use F.lit(None) para preencher essa coluna. Nunca use uma string vazia ou alguma outra string sinalizando um valor vazio (como NA).
Além de ser semanticamente correto, uma razão prática para utilizar F.lit(None) é preservar a capacidade de usar utilitários como isNull, em vez de verificar strings vazias, nulos e NA, etc.
# ruim df = df.withColumn('foo', F.lit('')) # ruim df = df.withColumn('foo', F.lit('NA')) # bom df = df.withColumn('foo', F.lit(None))
Usando Comentários
Enquanto os comentários podem fornecer informações úteis no código, muitas vezes é mais valioso refatorar o código para melhorar sua legibilidade, o código deve ser legível por si mesmo.
Se você estiver usando comentários para explicar a lógica passo a passo, você de refatorar.
UDFS (user defined functions)
É altamente recomendável evitar o uso de UDFS em todas as situações, pois são drasticamente menos performáticas do que Pyspark nativo. Na maioria das situações a lógica que parece exigir uma UDF, pode ser refatorada para usar apenas funções nativas Pyspark.
Joins
Cuidado com os joins! Se você executar um left join e o lado direito tiver matches múltiplos para uma key, essa linha será duplicada quantas vezes houver correspondência. Sempre verifique suas suposições para ver se a chave na qual você está fazendo o join é única a menos que fvocê esteja esperando ao menos que esteja esperando a multiplicação de linhas.
Joins ruins são as fonte de múltiplos problemas difíceis de depurar. Algumas práticas podem ajudar como declarar o how explicitamente, mesmo se você estiver utilizando o join default (inner):
# ruim flights = flights.join(aircraft, 'aircraft_id') # ruim também flights = flights.join(aircraft, 'aircraft_id', 'inner') # bom flights = flights.join(aircraft, 'aircraft_id', how='inner')
Encadeamento de expressões
Evite encadear expressões em expressões de várias linhas com tipos diferentes, principalmente se tiverem comportamentos ou contextos diferentes. Por exemplo, misturando a criação de colunas ou fazendo joining com select e filter.
# ruim df = ( df .select('a', 'b', 'c', 'key') .filter(F.col('a') == 'truthiness') .withColumn('boverc', F.col('b') / F.col('c')) .join(df2, 'key', how='inner') .join(df3, 'key', how='left') .drop('c') ) # melhor (seperarando em steps) # primeiro: selecionamos e filtramos os dados que precisamos # segundo: criamos as colunas que precisamos # terceiro: joining com outros dfs df = ( df .select('a', 'b', 'c', 'key') .filter(F.col('a') == 'truthiness') ) df = df.withColumn('boverc', F.col('b') / F.col('c')) df = ( df .join(df2, 'key', how='inner') .join(df3, 'key', how='left') .drop('c') )
Ter cada grupo de expressões isolado em seu bloco de códico lógico melhora a legibilidade e torna mais fácil encontrar a lógica relevante.
# ruim df = ( df .select('foo', 'bar', 'foobar', 'abc') .filter(F.col('abc') == 123) .join(another_table, 'some_field') ) # melhor df = ( df .select('foo', 'bar', 'foobar', 'abc') .filter(F.col('abc') == 123) ) df = df.join(another_table, 'some_field', how='inner')
Para maiores informações utilize a documentação oficial da Apache Spark :
https://spark.apache.org/docs/latest/sql-programming-guide.html
0 Comments