Time windows helper classes and functions¶

WindowedDataFrame¶

WindowedDataFrame(self, df: DataFrame, entity: Entity, time_column: str, time_windows: List[str]) -> WindowedDataFrame

DataFrame which allows for time-windowed calculations.

- `df`: DataFrame, input DataFrame
- `entity`: Entity, the feature store `Entity` instance, such as `client`
- `time_column`: str, name of a `Date` or `Timestamp` column in `df`, which is subtracted from the `run_date` to create the time window interval
- `time_windows`: `List[str]`, list of time windows matching `[0-9]+[dhw]`; suffixes: `d` = days, `h` = hours, `w` = weeks
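The `[0-9]+[dhw]` window strings map naturally onto time offsets. A minimal, hypothetical parser (not part of the library, shown only to pin down the format) might look like this:

```python
import re
from datetime import timedelta

# Hypothetical helper, not part of the library: turns a window string
# such as "90d", "12h" or "2w" into a timedelta.
_WINDOW_RE = re.compile(r"^([0-9]+)([dhw])$")
_UNITS = {"d": "days", "h": "hours", "w": "weeks"}

def parse_time_window(time_window: str) -> timedelta:
    match = _WINDOW_RE.match(time_window)
    if match is None:
        raise ValueError(f"Invalid time window: {time_window!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})
```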
Methods:¶

time_windowed¶

time_windowed(self, agg_columns_function: Callable[[str], List[WindowedColumn]] = lambda x: list(), non_agg_columns_function: Callable[[str], List[Column]] = lambda x: list(), extra_group_keys: List[str] = [], unnest_structs: bool = False) -> WindowedDataFrame

Returns a new WindowedDataFrame with the calculated aggregated and non-aggregated columns.

- `agg_columns_function: Callable[[str], List[WindowedColumn]] = lambda x: list()`: function which takes `time_window: str` and returns `List[WindowedColumn]`
- `non_agg_columns_function: Callable[[str], List[Column]] = lambda x: list()`: function which takes `time_window: str` and returns `List[Column]`
- `extra_group_keys: List[str] = []`: by default the aggregation groups by `entity.primary_key`; use `extra_group_keys` to add more columns to group by
- `unnest_structs: bool = False`: if `True`, all structs are expanded into regular columns
Example:

```python
@dp.transformation(card_transactions)
def card_channel_country_features(wdf: WindowedDataFrame):
    def country_agg_features(time_window: str) -> List[tw.WindowedColumn]:
        return [
            tw.sum_windowed(
                f"card_tr_location_czech_count_{time_window}",
                f.col("cardtr_country").isin("CZ", "CZE").cast("integer"),
            ),
            tw.sum_windowed(
                f"card_tr_location_abroad_count_{time_window}",
                (~f.col("cardtr_country").isin("CZ", "CZE")).cast("integer"),
            ),
            tw.sum_windowed(
                f"card_tr_location_czech_volume_{time_window}",
                f.when(
                    f.col("cardtr_country").isin("CZ", "CZE"),
                    f.col("cardtr_amount_czk"),
                ).otherwise(0),
            ),
            tw.sum_windowed(
                f"card_tr_location_abroad_volume_{time_window}",
                f.when(
                    ~f.col("cardtr_country").isin("CZ", "CZE"),
                    f.col("cardtr_amount_czk"),
                ).otherwise(0),
            ),
        ]

    def flag_features(time_window: str) -> List[Column]:
        return [
            tw.column(
                f"card_tr_location_abroad_flag_{time_window}",
                (f.col(f"card_tr_location_abroad_count_{time_window}") > 0).cast("integer"),
            )
        ]

    return wdf.time_windowed(country_agg_features, flag_features)
```
apply_per_time_window¶

apply_per_time_window(self, function: Callable[[WindowedDataFrame, str], DataFrame]) -> WindowedDataFrame

Applies a user-defined function per time window.

- `function: Callable[[WindowedDataFrame, str], DataFrame]`: function which takes a `WindowedDataFrame` and `time_window: str` and returns a `DataFrame`
Example:

```python
@dp.transformation(frequencies, display=False)
def frequencies_structs(wdf: WindowedDataFrame):
    def make_structs(wdf: WindowedDataFrame, time_window: str):
        return wdf.withColumn(
            f"values_{time_window}",
            f.struct(
                tw.column(
                    f"card_tr_location_city_most_common_count_{time_window}",
                    f.col(f"transaction_city_count_{time_window}"),
                ),
                tw.column(
                    f"card_tr_location_city_most_common_volume_{time_window}",
                    f.col(f"transaction_city_volume_{time_window}"),
                ),
            ),
        )

    return wdf.apply_per_time_window(make_structs)
```
is_time_window¶

is_time_window(self, time_window: str) -> Column

Returns a boolean `Column` indicating whether a row falls within the desired `time_window`.

- `time_window: str`: time window matching `[0-9]+[dhw]`; suffixes: `d` = days, `h` = hours, `w` = weeks
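Conceptually, a row is inside a window when its `time_column` value lies between `run_date - window` and `run_date`. A dependency-free sketch of that check (the real method returns a PySpark `Column`, and exact boundary inclusivity here is an assumption):

```python
from datetime import date, timedelta

# Illustrative re-implementation of the window check; the real
# is_time_window returns a boolean PySpark Column instead of a bool.
def in_time_window(event_date: date, run_date: date, window: timedelta) -> bool:
    # The event must not be in the future and must not be older
    # than the window length.
    return timedelta(0) <= run_date - event_date <= window

in_time_window(date(2023, 1, 10), date(2023, 1, 31), timedelta(days=30))  # True
in_time_window(date(2022, 12, 1), date(2023, 1, 31), timedelta(days=30))  # False
```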
get_windowed_column_list¶

get_windowed_column_list(self, column_names: List[str]) -> List[str]

Returns the windowed column names, with the `{time_window}` placeholder expanded for each configured time window.

- `column_names: List[str]`: list of column names with a `{time_window}` placeholder, such as `["feature1_{time_window}", "feature2_{time_window}", "feature3_{time_window}"]`
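The expansion itself is plain placeholder substitution. A hypothetical stand-in (not the library implementation; the ordering of the result is an assumption), with windows `["14d", "30d"]`:

```python
from typing import List

# Hypothetical stand-in for the method: expands the {time_window}
# placeholder for every configured window.
def get_windowed_column_list(column_names: List[str], time_windows: List[str]) -> List[str]:
    return [
        name.format(time_window=time_window)
        for name in column_names
        for time_window in time_windows
    ]

get_windowed_column_list(["feature1_{time_window}"], ["14d", "30d"])
# → ["feature1_14d", "feature1_30d"]
```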
make_windowed¶

make_windowed(df: DataFrame, entity: Entity, time_column: str) -> WindowedDataFrame

Used for creating a WindowedDataFrame instance.

- `df`: DataFrame, input DataFrame
- `entity`: Entity, the feature store `Entity` instance, such as `client`
- `time_column`: str, name of a `Date` or `Timestamp` column in `df`, which is subtracted from the `run_date` to create the time window interval

Note:

- `time_windows` are read from a widget called `time_windows`

Example:

```python
@dp.transformation(
    make_windowed(
        card_transactions,
        client_entity,
        "cardtr_process_date",
    ),
    display=False,
)
def card_transactions_with_time_windows(windowed_card_transactions: WindowedDataFrame):
    return windowed_card_transactions
```
WindowedColumn¶

WindowedColumn

Alias for the type `Callable[[str], Column]`.
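In other words, a `WindowedColumn` is not a column but a factory: given a time window string, it produces the concrete `Column` for that window. A dependency-free sketch of the shape (strings stand in for PySpark `Column`s, and the function name is hypothetical):

```python
from typing import Callable

# Strings stand in for PySpark Columns so the sketch runs without Spark.
WindowedColumnSketch = Callable[[str], str]

def czech_count(time_window: str) -> str:
    # One concrete column per window: "..._14d", "..._30d", ...
    return f"card_tr_location_czech_count_{time_window}"

factory: WindowedColumnSketch = czech_count
factory("14d")  # → "card_tr_location_czech_count_14d"
```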
column¶

column(name: str, col: Column) -> Column

Alias for `col.alias(name)`.

- `name`: name of the column
- `col`: PySpark `Column`

Warning - no time window functionality: `column` is intended to be used only in the non-aggregated columns function; it does not handle time windows on its own.
most_common¶

most_common(name: str, *columns: Column) -> Column

Performs a most-common-element calculation based on the input `columns`.

- `name`: name of the column
- `columns`: PySpark `Column`s

Example:

This example code calculates the most common city based on the number of transactions conducted in the city.
The order of the argument `columns` determines the outcome: the most common element is the maximum struct.
The maximum struct is determined the same way the `max` function works in Python:

```python
a = (10, -5, "Praha")
b = (11, -1000, "Zlín")
c = (10, -5, "Brno")

max(a, b, c)  # Result: (11, -1000, 'Zlín')
max(a, c)  # Result: (10, -5, 'Praha')
```

In the example below, `f.when` without an `otherwise` clause evaluates to `NULL` when the condition is not met.
```python
@dp.transformation(city_amount, display=False)
def most_common_city(wdf: WindowedDataFrame):
    def most_common_features(time_window: str):
        return [
            tw.most_common(
                f"most_common_city_{time_window}",
                tw.column(
                    f"card_tr_location_city_most_common_count_{time_window}",
                    f.col(f"transaction_city_count_{time_window}"),
                ),
                tw.column(
                    f"random_number_{time_window}",
                    f.hash(*wdf.primary_key),
                ),
                tw.column(
                    f"card_tr_location_city_most_common_{time_window}",
                    f.when(f.col(f"transaction_city_count_{time_window}") > 0, f.col("cardtr_transaction_city")),
                ),
            )
        ]

    return wdf.time_windowed(most_common_features, unnest_structs=True)
```
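The per-entity behaviour of the max-struct trick can be reproduced in plain Python: build a `(count, value)` tuple per row and keep the maximum per entity, so the value rides along with the winning count. All names and data below are illustrative, not from the library:

```python
# Plain-Python illustration of the most_common logic: per entity,
# keep the maximum (count, city) tuple; tuples compare element by
# element, so the city with the highest count wins.
rows = [
    ("client_1", 10, "Praha"),
    ("client_1", 11, "Zlín"),
    ("client_2", 3, "Brno"),
]

most_common_city = {}
for entity_id, count, city in rows:
    candidate = (count, city)
    if entity_id not in most_common_city or candidate > most_common_city[entity_id]:
        most_common_city[entity_id] = candidate

{k: v[1] for k, v in most_common_city.items()}
# → {"client_1": "Zlín", "client_2": "Brno"}
```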
sum_windowed¶

sum_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.sum`.

- `name`: name of the column, with a `{time_window}` placeholder
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`

Example:

```python
tw.sum_windowed(
    "card_tr_location_czech_count_14d",
    f.col("cardtr_country").isin("CZ", "CZE").cast("integer"),
)
```
count_windowed¶

count_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.count`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`
count_distinct_windowed¶

count_distinct_windowed(name: str, cols: List[Column], default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.countDistinct`.

- `name`: name of the column
- `cols`: list of PySpark `Column`s
- `default_value = None`: value given to all rows not fitting in the current `time_window`
min_windowed¶

min_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.min`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`

max_windowed¶

max_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.max`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`

avg_windowed¶

avg_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.avg`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`

mean_windowed¶

mean_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.mean`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`

first_windowed¶

first_windowed(name: str, col: Column, default_value=None) -> WindowedColumn

Returns an aggregated WindowedColumn for the PySpark function `f.first`.

- `name`: name of the column
- `col`: PySpark `Column`
- `default_value = None`: value given to all rows not fitting in the current `time_window`