Run a PySpark file as Python
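
The script below is a Synapse notebook exported as a plain Python file. It streams sensor events from Azure Event Hubs and, for every micro-batch, appends the raw JSON body to a Delta Lake table and writes a flattened copy (driven by JSON-pointer metadata stored in SQL Server) to a SQL Server table.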

#!/usr/bin/env python
# coding: utf-8

# ## Call Delta Lake and SQL Server

# In[ ]:



import json
import jsonpointer
from pyspark.sql import SparkSession
from notebookutils import mssparkutils

# create the Spark session
spark = SparkSession.builder \
    .appName("pattern1") \
    .config("spark.hadoop.validateOutputSpecs", "false") \
    .getOrCreate()



# append the raw event body to the Delta Lake table
def write2deltatable(df_ip_sensor, target_table):
    try:
        df_ip_sensor_delta = df_ip_sensor.select(df_ip_sensor["body"])
        df_ip_sensor_delta.write.mode("append").format("delta").saveAsTable(target_table)
    except Exception as e:
        print(e)
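
# Once the stream is running, the appended rows can be inspected straight from the Delta table.
# A minimal sketch, assuming the table name used in checkeachtable further below:
#
#     df_check = spark.read.table("default.uoda_mine_sensor_streaming")
#     df_check.show(10, truncate=False)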


# process the dataframe and add the data to the SQL Server table
def write2sqltable(df_ip_sensor, target_table):
    try:
        # fetch the SQL Server connection details from Key Vault
        serverName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-server", "lsidpkeyvault")
        databaseName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-database", "lsidpkeyvault")
        username = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-user", "lsidpkeyvault")
        password = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-password", "lsidpkeyvault")
        url = "jdbc:sqlserver://" + serverName + ";" + "databaseName=" + databaseName + ";"

        # fetch the JSON pointer values and target schema details used to flatten the JSON string
        query_jsonpointer = "select source_pointer, src_schema_name, tgt_schema_name, table_name from ctl.map_all_jsonpointer where event_name = 'testevent3'"
        df_jsonpointer = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("query", query_jsonpointer) \
            .option("user", username) \
            .option("password", password) \
            .load()

        # build the rows to be inserted: one list of values per event, plus a list of target column names
        list_data_consolidated = list()
        list_schema_consolidated = list()
        df_ip_sensor_body = df_ip_sensor.select(["body"])
        for json_data in df_ip_sensor_body.collect():
            json_string = json.loads(json_data["body"])
            list_schema = list()
            list_data = list()
            # resolve each source pointer stored in the SQL Server metadata table against the event body
            for j in df_jsonpointer.select(["source_pointer", "tgt_schema_name", "table_name"]).collect():
                pointer = jsonpointer.JsonPointer(j["source_pointer"])
                list_schema.append(j["tgt_schema_name"])
                list_data.append(pointer.resolve(json_string))
            list_schema_consolidated = list_schema
            list_data_consolidated.append(list_data)

        # create the dataframe to be added to the target table
        df_op_sensor_sql = spark.createDataFrame(data=list_data_consolidated, schema=list_schema_consolidated)

        # append the data to the SQL Server table
        df_op_sensor_sql.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", target_table) \
            .option("user", username) \
            .option("password", password) \
            .save()

    except Exception as e:
        print(e)
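
# The flattening relies on RFC 6901 JSON pointers resolved with the jsonpointer package.
# A minimal sketch with a hypothetical event body and pointer path (not from the metadata table):
#
#     sample_body = {"device": {"id": "pump-01"}, "reading": {"temperature": 71.5}}
#     pointer = jsonpointer.JsonPointer("/reading/temperature")   # hypothetical pointer path
#     print(pointer.resolve(sample_body))                         # prints 71.5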


# foreachBatch handler: write each micro-batch to Delta Lake and to SQL Server
def checkeachtable(df_ip_sensor, epoch_id):
    try:
        # call write2deltatable to add the dataframe to the delta lake table
        write2deltatable(df_ip_sensor, "default.uoda_mine_sensor_streaming")
        # call write2sqltable to add the dataframe to the sql server table
        write2sqltable(df_ip_sensor, "dbo.uoda_mine_sensor_streaming")
    except Exception as e:
        print(e)


# read the sensor events from Azure Event Hubs as a streaming dataframe
def eventhubread(connectionstring):
    print("start process")
    ehConf = {
        "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionstring)
    }

    df_ehip_sensor = spark.readStream.format("eventhubs").options(**ehConf).load()
    # the event payload arrives as binary, so cast the body column to a string
    df_ip_sensor = df_ehip_sensor.withColumn("body", df_ehip_sensor["body"].cast("string"))
    # push each micro-batch through checkeachtable, checkpointing to ADLS
    df_ip_sensor.writeStream \
        .outputMode("append") \
        .option("checkpointLocation", "abfss://uodaadlsfilesystem@uodaadlsaccount.dfs.core.windows.net/uodadirectory/") \
        .foreachBatch(checkeachtable) \
        .start() \
        .awaitTermination()


eventhubread('Endpoint=sb://uodaeventhub.servicebus.windows.net/;SharedAccessKeyName=sastestevent3;SharedAccessKey=CDLWbwEJpy59hrBljA6Di08wgUWxjrkmFEmcHh/BSyI=;EntityPath=testevent3')
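
Running the exported file as a plain Python script still requires a Spark environment that carries its dependencies: the Azure Event Hubs Spark connector (the "eventhubs" source), the Apache Spark connector for SQL Server (the "com.microsoft.sqlserver.jdbc.spark" format), and the Synapse runtime that provides notebookutils/mssparkutils for the Key Vault lookups. In practice that means submitting it to a Synapse Spark pool (for example as an Apache Spark job definition) rather than invoking it with a standalone local Python interpreter.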

