Skip to content

PySpark

Boas Práticas

Imports

Evite poluir o manespace global do seu projeto de código

from pyspark.sql import functions as F
from pyspark.sql import types as T

Select

Evite usar o withColumnRenamed quando for possível.

RUIM:

df_output = df_input.withColumnRenamed("name", "fullname")

BOM:

df_output = df_input.select(
    F.col("name").alias("fullname")
    ...
)

Evite usar o withColumn quando for possível. Veja alguns exemplos de códigos bem interessantes.

RUIM:

df_output = df_input\
    .withColumn("months", F.round(F.months_between(F.col("end_date"), F.col("start_date"))))
    .withColumn("days_between", F.datediff(F.col("open_date"), F.col("close_date")))

BOM:

df_output = df_input.select(
    F.round(F.months_between(F.col("end_date"), F.col("start_date"))).alias("months"),
    F.datediff(F.col("open_date"), F.col("close_date")).alias("days_between")
)

Window

Para explorarmos alguns cenários utilizando-se da função Window do PySpark temos que gerar uma amostra de dados e para isso utilizaremos a biblioteca Faker do Python.

Gerando uma amostra de dados Faker

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from faker import Faker

fake = Faker("pt-BR")

disciplinas = ["Matemática", "Português", "Geografia", "História", "Física", "Biologia"] 
anos = [2020, 2021, 2022, 2023, 2024]

schema = T.StructType([
    T.StructField("ID", T.StringType(), True),
    T.StructField("NOME", T.StringType(), True),
    T.StructField("ANO", T.IntegerType(), True),
    T.StructField("NOTA", T.FloatType(), True),
    T.StructField("DISCIPLINA", T.StringType(), True)
])

data = []

for _ in range(100):

    data.append([
        fake.md5(),
        fake.name(),
        random.choice(anos),
        round(random.uniform(0, 9.99), 2),
        random.choice(disciplinas)
    ])

df_base = spark.createDataFrame(data, schema=schema)

Retornar com a função Window a maiores notas por disciplina

win_spec_max = Window.partitionBy("DISCIPLINA").orderBy(F.col("NOTA").desc())

df_max_notas = df_base.withColumn("row_number", F.row_number().over(win_spec_max)) \
    .filter(F.col("row_number") == 1) \
    .drop("row_number")

df_max_notas.show()

Retornar com a função Window a menores notas por disciplina

win_spec_min = Window.partitionBy("DISCIPLINA").orderBy(F.col("NOTA").asc())

df_min_notas = df_base.withColumn("row_number", F.row_number().over(win_spec_min)) \
    .filter(F.col("row_number") == 1) \
    .drop("row_number")

df_min_notas.show()

Observação: Disponibilizei aqui

Funções Úteis

Ler arquivo csv e entregar dados em dataframe

def read_csv(path_filename: str) -> DataFrame:

    df = spark.read\
        .option("sep", ";")\
        .option("header", "true")\
        .option("encoding", "UTF-8")\
        .csv(path_filename)

    return df

Ler arquivo json e entregar dados em dataframe

def read_json(path_filename: str) -> DataFrame:

    df spark.read\
        .option("multiline", "true")\
        .option("encoding", "UTF-8")\
        .json(path_filename)

    return df

Retirar colunas duplicadas de um DataFrame

def drop_cols_duplicates(df: DataFrame) -> DataFrame:

    dup_cols = df.columns
    dup_cols = [col for col in dup_cols if df.select(col).distinct().count() == 1]
    df = df.drop(*dup_cols)

    return df

Unir vários dataframes

from functools import reduce

def union_all_dfs(*dfs)-> DataFrame:
    return reduce(Dataframe.unionAll, dfs)

Converter títulos de colunas para maiusculos

def uppercase_columns(df: DataFrame) -> DataFrame:

    for col in df.columns:
        df = df.withColumnRenamed(col, col.upper())
    return df

Converter títulos de colunas para letras minúsculas

def lowercase_columns(df: DataFrame) -> DataFrame:

    for col in df.columns:
        df = df.withColumnRenamed(col, col.lower())
    return df

Verificar valores nulos

def check_null_values(df: DataFrame) -> DataFrame:

    columns = []
    null_counts = []
    non_null_counts = []

    for col in df.columns:
        columns.append(col)
        null_counts.append(df.filter(F.col(col).isNull()).count())
        non_null_counts.append(df.filter(F.col(col).isNotNull()).count())

    df = spark.DataFrame(list(zip(columns, null_counts, non_null_counts)),
                        ["Coluna", "Contagem de Nulos", "Contagem de Não Nulos"])

    return df

Adicionar/Somar meses a uma data

def add_months_to_date(date_str: str, months: int) -> date:

    input_date = datetime.strftime(date_str, "%Y-%m-%d")
    future_date = input_date + timedelta(days=30 * months)
    return future_date.strftime("%Y-%m-%d")

Verificar registros duplicados a partir de colunas

df_output = df_input.groupBy("name", "email").count().filter("count > 1")
df_output.show(truncate=False)