Read Events from Event Hub and Write Them to SQL Server and Delta Lake
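The notebook below consumes events from Azure Event Hubs with Spark Structured Streaming and, for every micro-batch, appends the raw JSON body to a Delta Lake table and a flattened copy to a SQL Server table. The flattening is driven by JSON pointers stored in a ctl.schema_mapping control table (see write2sqltable below). Here is a minimal sketch of how that pointer resolution works; the mapping row and event body are hypothetical examples, not values from the actual control table:

import json
import jsonpointer

# Hypothetical ctl.schema_mapping entry: where to find the value and what to call the column
sample_mapping = {"Sourcepointer": "/device/id", "tgt_schema_name": "device_id"}
# Hypothetical event body as it would arrive from Event Hub
sample_body = '{"device": {"id": "sensor-01"}, "temperature": 21.5}'

pointer = jsonpointer.JsonPointer(sample_mapping["Sourcepointer"])
print(pointer.resolve(json.loads(sample_body)))  # sensor-01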
import json
import jsonpointer
from pyspark.sql.functions import lit,col
# Append the raw event body to a Delta Lake table
def write2deltatable(df1, tbl_name):
    try:
        df_final = df1.select(df1["body"])
        df_final.write.mode("append").format("delta").saveAsTable(tbl_name)
    except Exception as e:
        print(e)
# Flatten each event's JSON body and append it to a SQL Server table
def write2sqltable(df1, tbl_name):
    try:
        # Fetch SQL Server connection details from Key Vault via the linked service
        serverName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-server", "lsidpkeyvault")
        databaseName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-database", "lsidpkeyvault")
        userName = TokenLibrary.getSecret("kvidpresources", "rio-assurance-db-user", "lsidpkeyvault")
        password = TokenLibrary.getSecret("kvidpresources", "rio-assurance-db-password", "lsidpkeyvault")
        url = "jdbc:sqlserver://" + serverName + ";databaseName=" + databaseName + ";"
        # Fetch the JSON pointer values and target schema details used to flatten the JSON string
        querySel = "Select Sourcepointer,src_schema_name,tgt_schema_name,tgt_table from ctl.schema_mapping where tableName='meta_control'"
        mappingDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("query", querySel) \
            .option("user", userName) \
            .option("password", password) \
            .load()
        # Build one row of values per event; column names come from the mapping table
        value_list = list()
        json_type = df1.select(["body"])
        for i in json_type.collect():
            json_value = json.loads(i["body"])
            schema_list = list()
            value_list_loop = list()
            for j in mappingDF.select(["Sourcepointer", "tgt_schema_name", "tgt_table"]).collect():
                pointer = jsonpointer.JsonPointer(j["Sourcepointer"])
                schema_list.append(j["tgt_schema_name"])
                value_list_loop.append(pointer.resolve(json_value))
            # Target table; the mapping row also carries this value in tgt_table
            target_tablename = tbl_name
            schemalist_final = schema_list
            value_list.append(value_list_loop)
        if not value_list:
            # Nothing to write for this micro-batch
            return
        # Create the flattened dataframe and append it to the SQL Server table
        final_df = spark.createDataFrame(data=value_list, schema=schemalist_final)
        final_df.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", target_tablename) \
            .option("user", userName) \
            .option("password", password) \
            .save()
    except Exception as e:
        print(e)
# foreachBatch handler: write each micro-batch to both sinks
def checkeachtable(df1, epoch_id):
    try:
        # Append the raw micro-batch to the Delta Lake table
        write2deltatable(df1, "default.delta_table173")
        # Flatten and append the micro-batch to the SQL Server table
        write2sqltable(df1, "dbo.meta_control_sql")
    except Exception as e:
        print(e)
# Read the event stream from Event Hubs and process each micro-batch
def eventhubread(connectionstring, tgttable, target_system):
    print("start process")
    # The connector requires the connection string to be encrypted
    ehConf = {
        'eventhubs.connectionstring': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionstring)
    }
    df = spark.readStream.format("eventhubs").options(**ehConf).load()
    # Cast the binary event body to a string for downstream JSON parsing
    df1 = df.withColumn("body", df["body"].cast("string"))
    df1.writeStream \
        .outputMode("append") \
        .option("checkpointLocation", "abfss://uodaadlsfilesystem@uodaadlsaccount.dfs.core.windows.net/uodadirectory/") \
        .foreachBatch(checkeachtable) \
        .start() \
        .awaitTermination()
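A minimal usage sketch for kicking off the stream, assuming the Event Hubs connection string is stored in the same Key Vault; the secret name "eventhub-connection-string" and the argument values are assumptions, not taken from the original post:

# Hypothetical invocation; replace the secret name and arguments with your own
connection_string = TokenLibrary.getSecret("kvidpresources", "eventhub-connection-string", "lsidpkeyvault")
eventhubread(connection_string, "dbo.meta_control_sql", "sqlserver")

Note that tgttable and target_system are accepted by eventhubread but not used; the Delta Lake and SQL Server sink tables are currently hardcoded inside checkeachtable.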