Delta Live Tables (DLT)

 # import relevant libraries

import dlt

from pyspark.sql.functions import *

from pyspark.sql.types import *



If you are using CSV files, Spark will not pick up a schema automatically, so we have to define one manually. For Parquet this is not needed, because the schema is embedded in the file.


schema = StructType([
    StructField("customer_name", StringType()),
    StructField("customer_id", IntegerType())
])
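For Parquet, by contrast, no hand-written schema is needed; in the ingestion code shown below you would just switch the format and drop the .schema() call (a sketch; the path is a placeholder):

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("<folder path where the parquet files are stored>")  # schema comes from the Parquet files
)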


Ingest the data using Auto Loader.


Specify the cloudFiles format and other details.


cloud_file_option = {
    "cloudFiles.format": "csv"
}
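If you let Auto Loader infer the schema instead of supplying one, the same dictionary can carry further cloudFiles options; for example (option names from the Auto Loader documentation, values illustrative):

cloud_file_option = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",        # infer column types instead of defaulting everything to string
    "cloudFiles.schemaHints": "customer_id INT",  # override inference for specific columns
}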

Create a Delta Live Table from the incoming data in the storage account location or catalog.


@dlt.table(
    name="dim_customer",              # give the function below this same name
    comment="provide a description"
)
def dim_customer():
    df = (
        spark.readStream.format("cloudFiles")
        .options(**cloud_file_option)
        .schema(schema)               # the CSV schema defined above
        .load("<folder path where the files are stored>")
    )
    df = df.withColumn("Processed_Time", date_format(current_timestamp(), "yyyyMMdd"))
    return df

dim_customer is now a streaming table.


Now we can define rules (expectations) that validate conditions on individual fields.


checks = {}
checks["check customer id is not null"] = "customer_id IS NOT NULL"
checks["customer name is not null"] = "customer_name IS NOT NULL"


Assign the rules to variables.


customer_id_check=checks["check customer id is not null"]

customer_name_check=checks["customer name is not null"]



Create a view from the table above, applying the checks.


@dlt.view(
    name="dim_customer_vw",
    comment="dim customer vw description"
)
@dlt.expect_all(checks)   # expectations are attached as a decorator, not called inside the function
def dim_customer_vw():
    df = dlt.read_stream("dim_customer")
    df = df.withColumn("id_check", expr(customer_id_check))
    df = df.withColumn("name_check", expr(customer_name_check))
    return df
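@dlt.expect_all only records violations in the pipeline's data quality metrics; failing rows still flow through. If you want failing rows dropped, or the whole update aborted, DLT provides stricter variants. A minimal sketch (dim_customer_vw_strict is a hypothetical extra view, not part of the pipeline above):

@dlt.view(name="dim_customer_vw_strict")
@dlt.expect_all_or_drop(checks)   # drops violating rows; @dlt.expect_all_or_fail(checks) would abort the update instead
def dim_customer_vw_strict():
    return dlt.read_stream("dim_customer")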

 

# Create the final bronze table

 

dlt.create_streaming_table(   # called as a function, not used as a decorator
    name="dim_customer_final",
    comment="provide a description"
)

 

dlt.apply_changes(
    target="dim_customer_final",
    source="dim_customer_vw",
    keys=["customer_id"],
    sequence_by=col("Processed_Time"),   # order changes by ingestion time; the key itself cannot sequence versions
    ignore_null_updates=False,
    stored_as_scd_type="2",
    track_history_column_list=["Effective_date"]  # history records are opened only when these columns change (SCD Type 2)
)
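With stored_as_scd_type="2", the target gains generated __START_AT and __END_AT columns that bound each history record. A quick inspection, run outside the pipeline once it has completed (the bare table name assumes the pipeline's default target schema; qualify it with your catalog and schema if needed):

history_df = spark.read.table("dim_customer_final")
history_df.select("customer_id", "customer_name", "__START_AT", "__END_AT").show()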

 

 

 


Comments

  1. If you want to keep all the validating conditions in a single column, try the code below:

     variable_name = "({0})".format(" and ".join(checks.values()))

     and in the view use this variable in a column like:

     df = df.withColumn("is_valid", expr(variable_name))
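     Downstream you could then split on that flag, e.g. (a sketch using the is_valid column from the tip above):

     valid_df = df.filter("is_valid")
     invalid_df = df.filter("NOT is_valid")   # e.g. route these to a quarantine table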
