Creating a custom decorator function¶
First, let's create a folder called lib inside the root of our project to hold the custom code. We are going to create a read_stream_table decorator which reads a table as a stream. Let's create a file called read_stream_table.py inside that folder.
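For reference, the import used in the Usage section below (__myproject__.lib.read_stream_table) implies a layout roughly like the following, assuming the usual src/__myproject__ package structure of a Daipe project (the __init__.py simply makes the folder importable as a package):

    src/
        __myproject__/
            lib/
                __init__.py
                read_stream_table.py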
Now we just need to adhere to the interface. The function must be decorated with @input_decorator_function and it must define a wrapper function which it returns. Example code:
from daipecore.function.input_decorator_function import input_decorator_function
from injecta.container.ContainerInterface import ContainerInterface
from pyspark.sql import SparkSession
from datalakebundle.table.parameters.TableParametersManager import TableParametersManager

@input_decorator_function
def read_stream_table(identifier: str):
    def wrapper(container: ContainerInterface):
        # Resolve the table parameters (including the full table name) from the identifier
        table_parameters_manager: TableParametersManager = container.get(TableParametersManager)
        table_parameters = table_parameters_manager.get_or_parse(identifier)

        # Read the Delta table as a stream
        spark: SparkSession = container.get(SparkSession)
        return spark.readStream.format("delta").table(table_parameters.full_table_name)

    return wrapper
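If you ever need to tune the stream, the same pattern can accept extra arguments and apply them as reader options. The following is only a sketch, reusing the imports from the example above: the options parameter and the read_stream_table_with_options name are hypothetical and not part of Daipe, and the sketch assumes @input_decorator_function forwards additional arguments to your function unchanged, which is worth verifying against your daipe-core version.

@input_decorator_function
def read_stream_table_with_options(identifier: str, options: dict = None):
    def wrapper(container: ContainerInterface):
        table_parameters_manager: TableParametersManager = container.get(TableParametersManager)
        table_parameters = table_parameters_manager.get_or_parse(identifier)
        spark: SparkSession = container.get(SparkSession)

        # Build the same Delta stream reader, then apply any extra reader options
        reader = spark.readStream.format("delta")
        for key, value in (options or {}).items():
            reader = reader.option(key, value)

        return reader.table(table_parameters.full_table_name)

    return wrapper

It would be used the same way as the original decorator, e.g. read_stream_table_with_options("bronze.streaming_events", {"maxFilesPerTrigger": "1"}).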
Usage¶
import daipe as dp

from pyspark.sql import DataFrame
from pyspark.sql import functions as f

from __myproject__.lib.read_stream_table import read_stream_table

@dp.transformation(read_stream_table("bronze.streaming_events"))
@dp.table_overwrite("silver.tbl_loans")
def save(df: DataFrame):
    return df.filter(f.col("type") == "new_loan").orderBy("LoanDate")