Time windows helper classes and functions

WindowedDataFrame

WindowedDataFrame(self, df: DataFrame, entity: Entity, time_column: str, time_windows: List[str]) -> WindowedDataFrame

A DataFrame wrapper that allows for time-windowed calculations

  • df : DataFrame, input DataFrame
  • entity : Entity, the feature store Entity instance, such as client
  • time_column : str, name of the Date or Timestamp column in df, which is subtracted from the run_date to create the time window interval
  • time_windows: List[str], list of time windows in the format [0-9]+[dhw]; suffixes: d = days, h = hours, w = weeks

Methods:

time_windowed

time_windowed(self, agg_columns_function: Callable[[str], List[WindowedColumn]] = lambda x: list(), non_agg_columns_function: Callable[[str], List[Column]] = lambda x: list(), extra_group_keys: List[str] = [], unnest_structs: bool = False) -> WindowedDataFrame

Returns a new WindowedDataFrame with calculated aggregated and non-aggregated columns

  • agg_columns_function: Callable[[str], List[WindowedColumn]] = lambda x: list(): Function which takes a time_window: str and returns a List[WindowedColumn]
  • non_agg_columns_function: Callable[[str], List[Column]] = lambda x: list(): Function which takes a time_window: str and returns a List[Column]
  • extra_group_keys: List[str] = []: By default the aggregation groups by entity.primary_key; use extra_group_keys to add more columns to group by
  • unnest_structs: bool = False: If True, all structs are expanded into regular columns

Example:

@dp.transformation(card_transactions)
def card_channel_country_features(wdf: WindowedDataFrame):
    def country_agg_features(time_window: str) -> List[tw.WindowedColumn]:
        return [
            tw.sum_windowed(
                f"card_tr_location_czech_count_{time_window}",
                f.col("cardtr_country").isin("CZ", "CZE").cast("integer"),
            ),
            tw.sum_windowed(
                f"card_tr_location_abroad_count_{time_window}",
                (~f.col("cardtr_country").isin("CZ", "CZE")).cast("integer"),
            ),
            tw.sum_windowed(
                f"card_tr_location_czech_volume_{time_window}",
                f.when(
                    f.col("cardtr_country").isin("CZ", "CZE"),
                    f.col("cardtr_amount_czk"),
                ).otherwise(0),
            ),
            tw.sum_windowed(
                f"card_tr_location_abroad_volume_{time_window}",
                f.when(
                    ~f.col("cardtr_country").isin("CZ", "CZE"),
                    f.col("cardtr_amount_czk"),
                ).otherwise(0),
            ),
        ]

    def flag_features(time_window: str) -> List[Column]:
        return [
            tw.column(
                f"card_tr_location_abroad_flag_{time_window}",
                (f.col(f"card_tr_location_abroad_count_{time_window}") > 0).cast(
                    "integer"
                ),
            )
        ]

    return wdf.time_windowed(country_agg_features, flag_features)

apply_per_time_window

apply_per_time_window(self, function: Callable[[WindowedDataFrame, str], DataFrame]) -> WindowedDataFrame

Applies a user-defined function per time window

  • function: Callable[[WindowedDataFrame, str], DataFrame]: Function which takes a WindowedDataFrame and a time_window: str and returns a DataFrame

Example:

@dp.transformation(frequencies, display=False)
def frequencies_structs(wdf: WindowedDataFrame):
    def make_structs(wdf: WindowedDataFrame, time_window: str):
        return wdf.withColumn(
            f"values_{time_window}",
            f.struct(
                tw.column(
                    f"card_tr_location_city_most_common_count_{time_window}",
                    f.col(f"transaction_city_count_{time_window}"),
                ),
                tw.column(
                    f"card_tr_location_city_most_common_volume_{time_window}",
                    f.col(f"transaction_city_volume_{time_window}"),
                )
            ),
        )

    return wdf.apply_per_time_window(make_structs)

is_time_window

is_time_window(self, time_window: str) -> Column

Returns a boolean Column indicating whether a row falls within the desired time_window

  • time_window: str: time window in the format [0-9]+[dhw]; suffixes: d = days, h = hours, w = weeks
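
Example:

A minimal sketch; it assumes "90d" is among the configured time windows and uses the returned Column as a filter predicate.

wdf.filter(wdf.is_time_window("90d"))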

get_windowed_column_list

get_windowed_column_list(self, column_names: List[str]) -> List[str]

Returns the windowed column names, i.e. each {time_window} placeholder expanded for every configured time window

  • column_names: List[str]: List of column names with a {time_window} placeholder, such as ["feature1_{time_window}", "feature2_{time_window}", "feature3_{time_window}"]
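
Example:

An illustrative sketch; it assumes the instance was created with time_windows = ["14d", "30d"].

wdf.get_windowed_column_list(["feature1_{time_window}", "feature2_{time_window}"])
# e.g. ["feature1_14d", "feature2_14d", "feature1_30d", "feature2_30d"]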

make_windowed

make_windowed(df: DataFrame, entity: Entity, time_column: str) -> WindowedDataFrame

Used for creating a WindowedDataFrame instance.

  • df : DataFrame, input DataFrame
  • entity : Entity, the feature store Entity instance, such as client
  • time_column : str, name of the Date or Timestamp column in df, which is subtracted from the run_date to create the time window interval

Note: time_windows are read from a widget called time_windows

Example:

@dp.transformation(
    make_windowed(
        card_transactions,
        client_entity,
        "cardtr_process_date",
    ),
    display=False,
)
def card_transactions_with_time_windows(windowed_card_transactions: WindowedDataFrame):
    return windowed_card_transactions

WindowedColumn

WindowedColumn

Alias for type Callable[[str], Column]
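
In other words, a WindowedColumn is a function that takes a time_window string and returns a Column. The *_windowed helpers below are the intended way to produce values of this type; the hand-rolled sketch here (hypothetical, for illustration only) shows the shape of such a function, but unlike the helpers it does not restrict rows to the time window:

def volume_windowed(time_window: str) -> Column:
    # hypothetical sketch of the Callable[[str], Column] shape;
    # it does NOT apply the time window filter itself
    return f.sum("cardtr_amount_czk").alias(f"card_tr_volume_{time_window}")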


column

column(name: str, col: Column) -> Column

Alias for col.alias(name)

  • name : name of the column
  • col : PySpark Column

Warning

  • No time window functionality: column is intended to be used only in the non-aggregated columns function; it does not handle time windows on its own.
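
Example:

An illustrative sketch following the flag pattern from the time_windowed example above, with the "14d" window hardcoded.

tw.column(
    "card_tr_location_abroad_flag_14d",
    (f.col("card_tr_location_abroad_count_14d") > 0).cast("integer"),
)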

most_common

most_common(name: str, *columns: Column) -> Column

Performs a most common element calculation based on the input columns

  • name : name of the column
  • columns : PySpark Columns

Example:

This example code calculates the most common city based on the number of transactions conducted in that city.

The order of the argument columns determines the outcome: the most common value is the maximum struct, and the maximum struct is determined the same way the max function works in Python.

a = (10, -5, "Praha")
b = (11, -1000, "Zlín")
c = (10, -5, "Brno")

max(a, b, c)  # Result: (11, -1000, 'Zlín')
max(a, c)     # Result: (10, -5, 'Praha')

When the number of transactions is 0 for all cities, the result is NULL.

@dp.transformation(city_amount, display=False)
def most_common_city(wdf: WindowedDataFrame):
    def most_common_features(time_window: str):
        return [
            tw.most_common(
                f"most_common_city_{time_window}",
                tw.column(
                    f"card_tr_location_city_most_common_count_{time_window}",
                    f.col(f"transaction_city_count_{time_window}")
                ),
                tw.column(
                    f"random_number_{time_window}",
                    f.hash(*wdf.primary_key)
                ),
                tw.column(
                    f"card_tr_location_city_most_common_{time_window}",
                    f.when(f.col(f"transaction_city_count_{time_window}") > 0, f.col("cardtr_transaction_city"))
                ),
            )
        ]

    return wdf.time_windowed(most_common_features, unnest_structs=True)

sum_windowed

sum_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.sum

  • name : name of the column with a {time_window} placeholder
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window

Example:

tw.sum_windowed(
    "card_tr_location_czech_count_14d",
    f.col("cardtr_country").isin("CZ", "CZE").cast("integer"),
)

count_windowed

count_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.count

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
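
Example:

An illustrative sketch; cardtr_transaction_id is a hypothetical column name.

tw.count_windowed(
    "card_tr_count_14d",
    f.col("cardtr_transaction_id"),
)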

count_distinct_windowed

count_distinct_windowed(name: str, cols: List[Column], default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.countDistinct

  • name : name of the column
  • cols : list of PySpark Columns
  • default_value = None: value given to all rows that do not fit into the current time_window
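
Example:

An illustrative sketch counting distinct transaction cities; note that cols is a list.

tw.count_distinct_windowed(
    "card_tr_distinct_city_count_14d",
    [f.col("cardtr_transaction_city")],
)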

min_windowed

min_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.min

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
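
Example:

An illustrative sketch using the amount column from the examples above.

tw.min_windowed(
    "card_tr_amount_min_14d",
    f.col("cardtr_amount_czk"),
)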

max_windowed

max_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.max

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
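
Example:

An illustrative sketch, analogous to min_windowed.

tw.max_windowed(
    "card_tr_amount_max_14d",
    f.col("cardtr_amount_czk"),
)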

avg_windowed

avg_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.avg

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
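
Example:

An illustrative sketch; default_value=0 shows assigning a value to rows outside the window.

tw.avg_windowed(
    "card_tr_amount_avg_14d",
    f.col("cardtr_amount_czk"),
    default_value=0,
)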

mean_windowed

mean_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.mean

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
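
Example:

An illustrative sketch; in PySpark, f.mean is an alias of f.avg, so this mirrors avg_windowed.

tw.mean_windowed(
    "card_tr_amount_mean_14d",
    f.col("cardtr_amount_czk"),
)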

first_windowed

first_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function f.first

  • name : name of the column
  • col : PySpark Column
  • default_value = None: value given to all rows that do not fit into the current time_window
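
Example:

An illustrative sketch taking the first observed country per window; note that f.first is non-deterministic unless the data is ordered.

tw.first_windowed(
    "card_tr_first_country_14d",
    f.col("cardtr_country"),
)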