Run a PySpark file as Python
#!/usr/bin/env python
# coding: utf-8
# ## Call Delta Lake and SQL Server
#
#
#
# In[ ]:
import json

import jsonpointer
from notebookutils import mssparkutils
from pyspark.sql import SparkSession

# Create the Spark session; spark.hadoop.validateOutputSpecs is disabled so
# re-running the job over an existing output location does not fail it.
spark = SparkSession.builder \
    .appName("pattern1") \
    .config("spark.hadoop.validateOutputSpecs", "false") \
    .getOrCreate()
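# As the title suggests, this file can be launched as a plain Python script
# with spark-submit. A minimal sketch; the package coordinates/versions and the
# script file name are assumptions that must match your Spark build, and
# notebookutils is only available on Azure Synapse:
#
#   spark-submit \
#     --packages com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,com.microsoft.azure:spark-mssql-connector_2.12:1.2.0,io.delta:delta-core_2.12:2.1.0 \
#     pattern1.py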
# Append the raw event body to a Delta Lake table.
def write2deltatable(df_ip_sensor, target_table):
    try:
        df_ip_sensor_delta = df_ip_sensor.select(df_ip_sensor["body"])
        df_ip_sensor_delta.write.mode("append").format("delta").saveAsTable(target_table)
    except Exception as e:
        print(e)
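# To spot-check the Delta appends, the table can be read back with Spark SQL.
# A minimal sketch, commented out so it does not interfere with the streaming job:
#
#   spark.sql("SELECT COUNT(*) FROM default.uoda_mine_sensor_streaming").show()
#   spark.read.table("default.uoda_mine_sensor_streaming").show(5, truncate=False)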
# Flatten each event body and append it to a SQL Server table.
def write2sqltable(df_ip_sensor, target_table):
    try:
        # Pull connection details from Azure Key Vault via the linked service.
        serverName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-server", "lsidpkeyvault")
        databaseName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-database", "lsidpkeyvault")
        username = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-user", "lsidpkeyvault")
        password = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-password", "lsidpkeyvault")
        url = "jdbc:sqlserver://" + serverName + ";" + "databaseName=" + databaseName + ";"
        # Fetch the JSON pointer values and target schema details used to
        # flatten the JSON string.
        query_jsonpointer = "SELECT source_pointer, src_schema_name, tgt_schema_name, table_name FROM ctl.map_all_jsonpointer WHERE event_name = 'testevent3'"
        df_jsonpointer = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("query", query_jsonpointer) \
            .option("user", username) \
            .option("password", password) \
            .load()
        # Build the rows to insert (list_data_consolidated) and the target
        # column names (list_schema) for the dataframe to be written out.
        list_data_consolidated = list()
        df_ip_sensor_body = df_ip_sensor.select(["body"])
        for json_data in df_ip_sensor_body.collect():
            json_string = json.loads(json_data["body"])
            list_schema = list()
            list_data = list()
            # Resolve each source pointer stored in the SQL Server metadata
            # table against the parsed event body.
            for j in df_jsonpointer.select(["source_pointer", "tgt_schema_name", "table_name"]).collect():
                pointer = jsonpointer.JsonPointer(j["source_pointer"])
                list_schema.append(j["tgt_schema_name"])
                list_data.append(pointer.resolve(json_string))
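            # Example: for an event body of '{"device": {"id": 7}}' and a
            # source_pointer of '/device/id', resolve() returns 7. These values
            # are illustrative; the real pointers come from ctl.map_all_jsonpointer.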
            list_data_consolidated.append(list_data)
        # Every event shares the same metadata-driven schema, so the last
        # list_schema built in the loop serves as the consolidated schema.
        list_schema_consolidated = list_schema
        # Create the dataframe of flattened rows.
        df_op_sensor_sql = spark.createDataFrame(data=list_data_consolidated, schema=list_schema_consolidated)
        # Append the flattened rows to the SQL Server target table.
        df_op_sensor_sql.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", target_table) \
            .option("user", username) \
            .option("password", password) \
            .save()
    except Exception as e:
        print(e)
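# For a standalone test of the flattening logic, the function can be fed a
# one-row in-memory batch; the body value below is a hypothetical example:
#
#   test_df = spark.createDataFrame([('{"device": {"id": 7}}',)], ["body"])
#   write2sqltable(test_df, "dbo.uoda_mine_sensor_streaming")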
# foreachBatch callback: write each micro-batch to both sinks.
def checkeachtable(df_ip_sensor, epoch_id):
    try:
        # Append the batch to the Delta Lake table.
        write2deltatable(df_ip_sensor, "default.uoda_mine_sensor_streaming")
        # Append the flattened batch to the SQL Server table.
        write2sqltable(df_ip_sensor, "dbo.uoda_mine_sensor_streaming")
    except Exception as e:
        print(e)
# Read the stream from Event Hubs and fan each micro-batch out to both sinks.
def eventhubread(connectionstring):
    print("start process")
    ehConf = {
        # The Event Hubs connector requires the connection string to be encrypted.
        "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionstring)
    }
    df_ehip_sensor = spark.readStream.format("eventhubs").options(**ehConf).load()
    # The event payload arrives as binary; cast it to a string for downstream use.
    df_ip_sensor = df_ehip_sensor.withColumn("body", df_ehip_sensor["body"].cast("string"))
    df_ip_sensor.writeStream \
        .outputMode("append") \
        .option("checkpointLocation", "abfss://uodaadlsfilesystem@uodaadlsaccount.dfs.core.windows.net/uodadirectory/") \
        .foreachBatch(checkeachtable) \
        .start() \
        .awaitTermination()
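# Rather than hardcoding the Event Hubs connection string (and its
# SharedAccessKey) in the call below, it could be fetched from Key Vault the
# same way as the SQL credentials above; the secret name here is a
# hypothetical example:
#
#   connectionstring = mssparkutils.credentials.getSecret("kvidpresources", "eh-testevent3-connstr", "lsidpkeyvault")
#   eventhubread(connectionstring)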
eventhubread('Endpoint=sb://uodaeventhub.servicebus.windows.net/;SharedAccessKeyName=sastestevent3;SharedAccessKey=CDLWbwEJpy59hrBljA6Di08wgUWxjrkmFEmcHh/BSyI=;EntityPath=testevent3')