Read Events from Event Hub and Write Them to SQL Server and Delta Lake

# Stream events from Azure Event Hubs into Delta Lake and SQL Server.
import json

import jsonpointer

# Append the raw event body to a Delta Lake table.
def write2deltatable(df1, tbl_name):
    try:
        df_final = df1.select(df1["body"])
        df_final.write.mode("append").format("delta").saveAsTable(tbl_name)
    except Exception as e:
        print(e)
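
To verify the append, the table can be read back with the Spark session a Synapse notebook provides (a minimal sketch; the table name matches the one passed in from checkeachtable below):

# Read the Delta table back and inspect a few rows.
verify_df = spark.read.table("default.delta_table173")
verify_df.show(5, truncate=False)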

# Flatten the JSON event body and append it to a SQL Server table.
def write2sqltable(df1, tbl_name):
    try:
        # Fetch connection details from Azure Key Vault via the Synapse TokenLibrary.
        serverName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-server", "lsidpkeyvault")
        databaseName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-database", "lsidpkeyvault")
        userName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-db-user", "lsidpkeyvault")
        password = TokenLibrary.getSecret("kvidpresources", "rio-assurance-db-password", "lsidpkeyvault")
        url = "jdbc:sqlserver://" + serverName + ";databaseName=" + databaseName + ";"
        username = userName
        # Fetch the JSON pointer values and target schema details used to flatten the JSON string.
        querySel = "SELECT Sourcepointer, src_schema_name, tgt_schema_name, tgt_table FROM ctl.schema_mapping WHERE tableName = 'meta_control'"
        mappingDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("query", querySel) \
            .option("user", username) \
            .option("password", password) \
            .load()
        # Build one row per event: resolve each JSON pointer from the mapping table
        # against the event body, and collect the target column names as the schema.
        value_list = list()
        json_type = df1.select(["body"])
        target_tablename = tbl_name  # could instead come from j["tgt_table"] in the mapping row
        for i in json_type.collect():
            json_value = json.loads(i["body"])
            schema_list = list()
            value_list_loop = list()
            for j in mappingDF.select(["Sourcepointer", "tgt_schema_name", "tgt_table"]).collect():
                pointer = jsonpointer.JsonPointer(j["Sourcepointer"])
                schema_list.append(j["tgt_schema_name"])
                value_list_loop.append(pointer.resolve(json_value))
            value_list.append(value_list_loop)
        # Create the dataframe to be appended to the target table.
        final_df = spark.createDataFrame(data=value_list, schema=schema_list)
        # Append the flattened rows to the SQL Server table.
        final_df.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", target_tablename) \
            .option("user", username) \
            .option("password", password) \
            .save()

    except Exception as e:
        print(e)        
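
The flattening relies on the jsonpointer package: each Sourcepointer value in ctl.schema_mapping is an RFC 6901 pointer resolved against the parsed event body. A standalone sketch with a made-up body and pointer (both illustrative, not taken from the real mapping table):

# Illustrative only: resolve a JSON pointer against a sample event body.
sample_body = '{"device": {"id": "d-01"}, "reading": {"temp": 21.5}}'
sample_pointer = jsonpointer.JsonPointer("/device/id")
print(sample_pointer.resolve(json.loads(sample_body)))  # prints: d-01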

# foreachBatch handler: write each micro-batch to both sinks.
def checkeachtable(df1, epoch_id):
    try:
        # Call write2deltatable to append the batch to the Delta Lake table.
        write2deltatable(df1, "default.delta_table173")
        # Call write2sqltable to append the flattened batch to SQL Server.
        write2sqltable(df1, "dbo.meta_control_sql")
    except Exception as e:
        print(e)
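
Structured Streaming may re-run a micro-batch after a failure, so the epoch_id argument that foreachBatch supplies is the usual hook for making writes traceable or idempotent. This pipeline ignores it, but here is a hedged sketch of tagging each batch (the tagged table name and column are assumptions, not part of the pipeline):

from pyspark.sql.functions import lit

def checkeachtable_tagged(df1, epoch_id):
    # Append the batch body together with its epoch_id so replays are visible downstream.
    df1.select("body").withColumn("epoch_id", lit(epoch_id)) \
        .write.mode("append").format("delta").saveAsTable("default.delta_table173_tagged")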
    

# Read the Event Hub stream and fan each micro-batch out through checkeachtable.
def eventhubread(connectionstring, tgttable, target_system):
    print("start process")
    # The connection string must be encrypted for the Event Hubs connector.
    ehConf = {
        "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionstring)
    }
    df = spark.readStream.format("eventhubs").options(**ehConf).load()
    # The body column arrives as binary; cast it to string for downstream parsing.
    df1 = df.withColumn("body", df["body"].cast("string"))

    df1.writeStream \
        .outputMode("append") \
        .option("checkpointLocation", "abfss://uodaadlsfilesystem@uodaadlsaccount.dfs.core.windows.net/uodadirectory/") \
        .foreachBatch(checkeachtable) \
        .start() \
        .awaitTermination()
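
Finally, a sketch of how the entry point might be invoked from the notebook; the Key Vault secret name for the Event Hubs connection string is an assumption, and the last two arguments are currently unused by the function:

# Hypothetical driver call: fetch the Event Hubs connection string from Key Vault
# and start the stream.
eh_connection = TokenLibrary.getSecret("kvidpresources", "rio-eventhub-connection", "lsidpkeyvault")
eventhubread(eh_connection, "dbo.meta_control_sql", "sqlserver")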

