Delta Live Tables (DLT)
# import relevant libraries
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
If you are using a CSV file, it does not carry a schema, so we have to define the schema manually. For Parquet this is not needed, as the schema is embedded in the file.
schema = StructType([
    StructField("customer_name", StringType()),
    StructField("customer_id", IntegerType())
])
Ingest the data using Auto Loader. Specify the cloudFiles format and other details:
cloud_file_option = {
    "cloudFiles.format": "csv"
}
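Auto Loader accepts more options than just the format. A minimal sketch of an extended variant of the same dictionary (the schema location path is a hypothetical placeholder); with a schema location set, Auto Loader can infer and track the schema itself, which makes the manual schema above optional:

cloud_file_option = {
    "cloudFiles.format": "csv",
    "header": "true",                                          # treat the first row of each file as column names
    "cloudFiles.schemaLocation": "/mnt/schemas/dim_customer",  # hypothetical path where Auto Loader stores the inferred schema
    "cloudFiles.inferColumnTypes": "true"                      # infer column types instead of defaulting everything to string
}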
Create a Delta Live Table from the incoming data in the storage account location or catalog.
@dlt.table(
    name="dim_customer",  # the dataset name; kept the same as the function name below
    description="provide a description"
)
def dim_customer():
    df = spark.readStream.format("cloudFiles") \
        .options(**cloud_file_option) \
        .schema(schema) \
        .load("<folder path where the files are stored>")
    df = df.withColumn("Processed_Time", date_format(current_timestamp(), "yyyyMMdd"))
    return df
Now dim_customer is a streaming table.
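Other datasets in the same pipeline can consume it with dlt.read_stream("dim_customer") (or LIVE.dim_customer in SQL), which is exactly what the view further below does.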
Now we can define rules to validate the fields:
checks = {}
checks["check customer id is not null"] = "customer_id IS NOT NULL"
checks["customer name is not null"] = "customer_name IS NOT NULL"
Assign the rules to variables:
customer_id_check = checks["check customer id is not null"]
customer_name_check = checks["customer name is not null"]
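For reference, the same checks dictionary also works with DLT's stricter expectation decorators; a minimal sketch (the table name here is hypothetical):

@dlt.table(name="dim_customer_clean")  # hypothetical table
@dlt.expect_all_or_drop(checks)        # drop any row that violates a check
def dim_customer_clean():
    return dlt.read_stream("dim_customer")

@dlt.expect_all_or_fail(checks) is the strictest variant: it fails the update when any row violates a check.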
Create a view on top of the table above, applying the checks.
@dlt.view(
    name="dim_customer_vw",
    description="dim customer vw description"
)
@dlt.expect_all(checks)
def dim_customer_vw():
    df = dlt.read_stream("dim_customer")
    df = df.withColumn("id_check", expr(customer_id_check))
    df = df.withColumn("name_check", expr(customer_name_check))
    return df
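Note that @dlt.expect_all keeps the failing rows and only records the violations in the pipeline's data quality metrics; the id_check and name_check columns additionally make each rule's result visible per row.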
# create the final bronze table
dlt.create_streaming_table(
    name="dim_customer_final",
    comment="description"
)

dlt.apply_changes(
    target="dim_customer_final",
    source="dim_customer_vw",
    keys=["customer_id"],
    sequence_by=col("customer_id"),  # typically an event timestamp column that orders the changes
    ignore_null_updates=False,
    stored_as_scd_type="2",
    track_history_column_list=["Effective_date"]  # generate a new SCD type 2 history row only when this column changes
)
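With stored_as_scd_type="2", DLT keeps a row per version of each key and adds __START_AT and __END_AT columns derived from the sequence_by column. A sketch of the type 1 variant (update in place, no history rows), using the same names:

dlt.apply_changes(
    target="dim_customer_final",
    source="dim_customer_vw",
    keys=["customer_id"],
    sequence_by=col("customer_id"),
    stored_as_scd_type="1"  # overwrite matching keys instead of keeping history rows
)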
If you want to combine all the validation conditions into a single column, try the code below.
variable_name = "({0})".format(" and ".join(checks.values()))
and in the view, use this variable in a column like:
df = df.withColumn("is_valid", expr(variable_name))
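A common follow-up pattern (a sketch, assuming the column names above) is to split valid and invalid rows on this flag, for example to quarantine the failures:

valid_df = df.filter(col("is_valid"))
invalid_df = df.filter(~col("is_valid"))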