Bad practices when writing features

PySpark offers many functions and methods for developing complex features out of any data.

That does not mean all of them should be used.

Shortlist of forbidden functions
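
- DataFrame.collect()
- DataFrame.toPandas()
- DataFrame.dropDuplicates()
- DataFrame.union()
- DataFrame.count()
- f.rank()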

Explanations and alternatives

DataFrame.collect()

Explanation

Spark is built on lazy evaluation, meaning it does not calculate anything until it is necessary.

collect() is an action which means it triggers the calculation and thus breaks the lazy evaluation sequence. It also brings the whole DataFrame onto the driver which might fill its memory and crash it.

Alternative

If your code contains collect(), it is almost certainly not optimal. Find a way to get the data into the DataFrame lazily, for example by joining the would-be collected values to the rows that need them and reading the values from there, as sketched below.
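
A minimal sketch of the idea. The DataFrames df_transactions and df_city_names and the columns city_id and city_name are hypothetical names used only for illustration:

# Bad - collect() runs a job and pulls the lookup values onto the driver
city_names = {row["city_id"]: row["city_name"] for row in df_city_names.collect()}

# Good - keep the evaluation lazy and let Spark join the values
# to the rows that need them
df_transactions_named = df_transactions.join(df_city_names, on="city_id", how="left")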

DataFrame.toPandas()

Explanation

toPandas() is very similar to collect(): it forces Spark to calculate and brings all the data to the driver as a Pandas DataFrame.

Alternative

If your code uses toPandas(), try rewriting the Pandas logic in PySpark. For example, to perform row-wise operations in Spark, read this.
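
A minimal sketch, assuming a hypothetical column amount_czk and a fixed exchange rate, of the same row-wise logic written once with Pandas and once natively in PySpark:

import pyspark.sql.functions as f

# Bad - toPandas() runs a job and materialises the whole DataFrame on the driver
pdf = df.toPandas()
pdf["amount_eur"] = pdf["amount_czk"] / 25.0

# Good - the same row-wise logic stays distributed and lazy
df = df.withColumn("amount_eur", f.col("amount_czk") / f.lit(25.0))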

DataFrame.dropDuplicates()

Explanation

dropDuplicates() keeps the first occurrence of a "unique" row and drops all subsequent duplicates. Its outcome therefore depends on the order of rows in the DataFrame, and that order depends on partitioning and on frequent operations such as join and cache.

Alternative

To produce replicable and testable code, do NOT use dropDuplicates(); keep the code deterministic. A good alternative is to use structs and groupBy, which lets you control how the surviving rows are selected.
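
The full pattern is shown in the f.rank() example at the end of this section. In its smallest form, assuming a hypothetical DataFrame with client_id, score and city columns where one row per client_id should survive, it looks like this:

import pyspark.sql.functions as f

# Non-deterministic - which duplicate survives depends on row order
df_one_per_client = df.dropDuplicates(["client_id"])

# Deterministic - the surviving row is the one with the highest (score, city) pair
df_one_per_client = (
    df
    .groupBy("client_id")
    .agg(f.max(f.struct("score", "city")).alias("best"))
    .select("client_id", "best.*")
)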

DataFrame.union()

Explanation

union() does not check whether the columns are in the same order. It simply glues two DataFrames with the same number of columns together by position.

Alternative

Therefore always use unionByName instead of union.
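
A minimal illustration of the difference, assuming an existing SparkSession named spark and two hypothetical single-row DataFrames whose columns come in a different order:

df_a = spark.createDataFrame([("1", "Praha")], ["client_id", "city"])
df_b = spark.createDataFrame([("Brno", "2")], ["city", "client_id"])

# union() matches columns by position - "Brno" silently lands in client_id
df_wrong = df_a.union(df_b)

# unionByName() matches columns by name, so both rows stay correct
df_right = df_a.unionByName(df_b)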

DataFrame.count()

Explanation

count() is an action which triggers calculation.

Alternative

To preserve the laziness of Spark, compute the number of rows as part of an aggregation with f.count(f.lit(1)) instead of calling the count() action.
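
A minimal sketch, assuming the row count is needed per client_id as a feature rather than as a number on the driver:

import pyspark.sql.functions as f

# Bad - count() is an action and triggers a job just to fetch a number
n_rows = df.count()

# Good - the count stays inside the (still lazy) DataFrame
df_counts = df.groupBy("client_id").agg(f.count(f.lit(1)).alias("n_rows"))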

f.rank()

Explanation

f.rank() is a useful window function. Nevertheless, it can lead to unexpected results.

rank assigns the same number to rows with equal values.

So if you use it in combination with a filter, e.g. df.withColumn("rank", f.rank().over(window)).filter(f.col("rank") == 1), you can get multiple rows with rank == 1.

This leads to frequent usage of dropDuplicates to solve this issue.

Alternative

This problem is better solved with the same struct-and-groupBy approach recommended as the alternative to dropDuplicates().

Example:

In this case the most common city is picked from the rows where freq_rank equals 1, which can result in multiple rows per client_id because f.rank returns 1 for all rows with the maximum freq.

Bad:

import pyspark.sql.functions as f
from pyspark.sql import Window

window_spec = Window.partitionBy("client_id").orderBy(f.col("freq").desc())

df_most_common_city = (
    card_transactions
    .groupBy("client_id", "cardtr_transaction_city")
    .agg(f.count("cardtr_transaction_city").alias("freq"),
         f.sum("cardtr_amount_czk").alias("transaction_city_volume")
        )
    .withColumn("freq_rank", f.rank().over(window_spec))
    .filter("freq_rank == 1")
    .dropDuplicates(["client_id"])
    .select("client_id", "freq", "transaction_city_volume", f.col("cardtr_transaction_city").alias("most_common_city"))
)

# Result - the frequency is the maximum => 10, but the rest is based on the order of the rows 
#+---------+----+-----------------------+----------------+
#|client_id|freq|transaction_city_volume|most_common_city|
#+---------+----+-----------------------+----------------+
#|        1|  10|                   1000|            Brno|
#+---------+----+-----------------------+----------------+

The alternative version uses structs and the fact that Spark compares them like tuples, field by field from left to right: freq is compared first, then, when freq is equal, transaction_city_volume, and when both are equal, most_common_city. Taking f.max over such a struct is therefore deterministic: it picks the row with the highest freq, breaks ties by the highest transaction_city_volume, and breaks any remaining ties by the alphabetically last most_common_city.

Good:

df_most_common_city = (
    card_transactions
    # first aggregation: frequency and volume per client and city
    .groupBy("client_id", "cardtr_transaction_city")
    .agg(
        f.count("cardtr_transaction_city").alias("freq"),
        f.sum("cardtr_amount_czk").alias("transaction_city_volume")
    )
    # pack the tie-breaking columns into a struct that compares like a tuple
    .withColumn("freq_struct", f.struct(
        f.col("freq"),
        f.col("transaction_city_volume"),
        f.col("cardtr_transaction_city").alias("most_common_city")
    ))
    # second aggregation: keep the maximum struct per client
    .groupBy("client_id")
    .agg(f.max("freq_struct").alias("max_freq_struct"))
    .select("client_id", "max_freq_struct.*")
)

# Result - the frequency is the maximum => 10, the volume is the maximum among tied cities
# and the city is always the alphabetically last one if freq and volume are equal
#+---------+----+-----------------------+----------------+
#|client_id|freq|transaction_city_volume|most_common_city|
#+---------+----+-----------------------+----------------+
#|        1|  10|                   2000|           Praha|
#+---------+----+-----------------------+----------------+