Pattern 3: Read using path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from delta.tables import DeltaTable
from datetime import datetime
from urllib.parse import urlparse
import json
# mssparkutils and the 'spark' SparkSession are provided by the Synapse notebook runtime;
# the explicit import below is optional but makes the dependency visible.
from notebookutils import mssparkutils
from uoda.base import UodaBaseClass, DataMovementStepStatus
from uoda.util import DatabaseConnection
class ProcessingPattern3(UodaBaseClass):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def execute_business_logic(self):
self.logger.info("Starting execution of business logic")
self.create_log_step("Starting data movement from CDC Table to EventHub")
try:
self.startCDCStreaming()
except Exception as e:
self.update_log_step(DataMovementStepStatus.FAILED)
self.create_log_step_event(f'Data movement failed. Exception message is {e}')
raise
def set_abfss_config(self,spark: SparkSession, linked_service: str):
"""Set the Spark configuration to access storage accounts using linked
services.
This function only supports managed identity for authentication. Once this
configuration has been set, the storage account can be accessed using the
abfss:// URI prefix.
See https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-secure-credentials-with-tokenlibrary  # noqa: E501
Args:
spark: A current SparkSession.
linked_service: The linked service available in the Synapse workspace.
"""
ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
parse_result = urlparse(ls['Endpoint'])
spark.conf.set(
f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName",
linked_service)
spark.conf.set(
f"fs.azure.account.oauth.provider.type.{parse_result.netloc}",
"com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") # noqa E501
def get_abfss_path(self,linked_service: str, container_name: str, path: str) -> str:
"""Get the full Azure Blob File Storage Secure URI from a linked service.
Args:
linked_service: The linked service available in the Synapse workspace.
container_name: The container in the storage account.
path: The path to the blob in the container.
Returns:
A fully qualified URI using the abfss:// prefix.
"""
ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
parse_result = urlparse(ls['Endpoint'])
return f"abfss://{container_name}@{parse_result.netloc}/{path}"
def querySQLServerChangeTable(self,epoch_id):
try:
print("BatchStart =", datetime.now())
_sourceDetails = self.data_movement_config["source"]
_targetDetails = self.data_movement_config["targets"]
_sourceLinkedService = self.data_movement_config["source"]["linkedService"]
_sourceSchemaName = self.data_movement_config["source"]["schemaName"]
_sourceTableName = self.data_movement_config["source"]["tableName"]
_checkpointpath=self.data_movement_config["checkpoint"]["path"]
_checkpointLinkedService=self.data_movement_config["checkpoint"]["linkedService"]
_checkPointContainer=self.data_movement_config["checkpoint"]["containerName"]
_sourceDatabaseName=self.data_movement_config["source"]["databaseName"]
_rdbms_linked_service = json.loads(mssparkutils.credentials.getPropertiesAll(_sourceLinkedService))
_jdbc_url = f"jdbc:sqlserver://{_rdbms_linked_service['Endpoint']};" \
f"database={_sourceDatabaseName};"
self.set_abfss_config(spark,_checkpointLinkedService)
_path = self.get_abfss_path(_checkpointLinkedService, _checkPointContainer, _checkpointpath)
df_delta_path = spark.read.format("delta").load(_path)
df_delta_path = df_delta_path.withColumn("updatedatetime", df_delta_path.updatedatetime.cast("STRING"))
# Note: 'tabelname' matches the column name created in updateCDCStatusTable.
_latestDateTimeAry = df_delta_path.select(df_delta_path["updatedatetime"]).filter(df_delta_path["tabelname"] == _sourceTableName).collect()
#_getParamQuery = f"select cast(updatedatetime as STRING) from default.cdcstatustable where tabelname='{_sourceTableName}'"
#_lastestDateTimeAry = spark.sql(_getParamQuery).collect()
# sys.fn_cdc_map_lsn_to_time maps the CDC log sequence number (__$start_lsn) to its commit timestamp.
_query = f"Select *, sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS Row_Change_DteTme_Stamp from cdc.{_sourceTableName}"
if len(_latestDateTimeAry) > 0:
_latestDateTime = _latestDateTimeAry[0]['updatedatetime']
_query = f"Select *, sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS Row_Change_DteTme_Stamp from cdc.{_sourceTableName} where sys.fn_cdc_map_lsn_to_time(__$start_lsn) > cast('{_latestDateTime}' as datetime)"
df_changed_rows = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", _jdbc_url) \
.option("query", _query) \
.option("accessToken", _rdbms_linked_service['AuthKey']) \
.load()
_cnt = df_changed_rows.count()
print("Changed row count =", _cnt)
if _cnt > 0:
df_sorted = df_changed_rows.sort(col("Row_Change_DteTme_Stamp").desc())
_firstRow = df_sorted.first()
print("Updating CDC status table =", datetime.now())
self.updateCDCStatusTable(_firstRow, _sourceTableName, _path)
print("Sending changes to EventHub =", datetime.now())
self.sendToEventHub(df_sorted)
print(f"CDC for {_sourceLinkedService} with SchemaName:{_sourceSchemaName} and TableName:{_sourceTableName} CDC Count is {_cnt}")
print(_targetDetails)
except Exception as e:
print("=======================Exce========================"*3)
print(e)
def sendToEventHub(self,df_changed_rows_passed):
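# SQL Server CDC __$operation codes: 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image).
# Updates (3 and 4) are split out below, paired on Row_Change_DteTme_Stamp and __$seqval, and re-emitted as a single change event.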
_operations=[3,4]
#select the rows which are not update operations (inserts and deletes)
df_rest_operation = df_changed_rows_passed.filter(~col("__$operation").isin(_operations))
#select the rows carrying the before-update image (operation 3)
df_update_before = df_changed_rows_passed.filter(col("__$operation") == 3)
#select the rows carrying the after-update image (operation 4)
df_update_after = df_changed_rows_passed.filter(col("__$operation") == 4)
#create the json for changes as an additional column ChangeRow
print("Process execution time -1 =", datetime.now())
df_rest_operation_json = df_rest_operation.withColumn('ChangeRow',F.to_json(F.struct("*")))
#select only the timestamp, seqval and operation as per the change
df_rest_operation_json = df_rest_operation_json.select("Row_Change_DteTme_Stamp","__$seqval","__$operation",'ChangeRow')
#create the json for before update changes as an additional column BeforeUpdate
df_update_before = df_update_before.withColumn('BeforeUpdate',F.to_json(F.struct("*")))
#select only the timestamp, seqval and operation unified as 4 for update
df_update_before = df_update_before.select("Row_Change_DteTme_Stamp","__$seqval",'BeforeUpdate').withColumn('__$operation',lit(4))
#create the json for after update changes as an additional column AfterUpdate
df_update_after = df_update_after.withColumn('AfterUpdate',F.to_json(F.struct("*")))
#select only the timestamp, seqval and operation unified as 4 for update
df_update_after = df_update_after.select("Row_Change_DteTme_Stamp","__$seqval",'AfterUpdate').withColumn('__$operation',lit(4))
#Combine before and after rows for update
df_update = df_update_before.join(df_update_after, ['Row_Change_DteTme_Stamp',"__$seqval",'__$operation'])
#Convert update rows into Json with new column changedrows
df_update_final = df_update.withColumn('json',F.to_json(F.struct("*"))).groupby(['Row_Change_DteTme_Stamp','__$seqval']).agg(F.collect_list('json').alias("changedrows"))
#Convert changed rows into Json with new column changedrows
df_final_rest = df_rest_operation_json.withColumn('json',F.to_json(F.struct("*"))).groupby(['Row_Change_DteTme_Stamp','__$seqval']).agg(F.collect_list('json').alias("changedrows"))
#combine all to form a single dataframe
df_combined = df_update_final.unionAll(df_final_rest)
df_to_send = df_combined.select(F.to_json(F.struct("changedrows")).alias("body"))
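# Each Event Hub message body is roughly of the form (illustrative):
# {"changedrows": ["{\"Row_Change_DteTme_Stamp\":\"...\",\"__$seqval\":\"...\",\"__$operation\":4,\"BeforeUpdate\":\"{...}\",\"AfterUpdate\":\"{...}\"}"]}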
print("Process execution time -2 =", datetime.now())
for target in self.data_movement_config['targets']:
print(target["type"])
print(target["namespace"])
print(target["eventHub"])
print(target["keyVaultLinkedService"])
print(target["keyVaultSecretName"])
_eh_conn_string = mssparkutils.credentials.getSecretWithLS(target["keyVaultLinkedService"],target["keyVaultSecretName"])
_ehConf = {
'eventhubs.connectionstring' : spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(_eh_conn_string)
}
df_to_send.select("body").write.format("eventhubs").options(**_ehConf).save()
print(f"Sent batch to Event Hub {target['eventHub']}")
print("Process execution time -3 =", datetime.now())
def updateCDCStatusTable(self,firstRowPassed,tableNamePassed,_path):
_maxTimeStamp = firstRowPassed.Row_Change_DteTme_Stamp
#path = get_abfss_path(linked_service, container_name, path)
_table_data=[("CDC",tableNamePassed,_maxTimeStamp)]
_schema = StructType([ \
StructField("schemaname",StringType(),True), \
StructField("tabelname",StringType(),True), \
StructField("updatedatetime",TimestampType(),True)\
])
df_status=spark.createDataFrame(data=_table_data,schema=_schema)
print(df_status.schema)
if DeltaTable.isDeltaTable(spark, _path):
delta_table = DeltaTable.forPath(spark, _path)
delta_table.alias("t") \
.merge(df_status.alias("u"),
"u.tabelname = t.tabelname") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
df_status.write.format("delta").save(_path)
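# The status table keeps one watermark row per CDC table, e.g. (illustrative values):
# schemaname="CDC", tabelname="ctrl_ADF_RUN_HIST_CT", updatedatetime=2024-01-01 10:15:00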
def foreach_batch_function(self, df_event_input, epoch_id):
try:
self.logger.info(f"Batch:{epoch_id} Record Count:{df_event_input.count()}")
_sourceDetails = self.data_movement_config['source']
self.querySQLServerChangeTable(epoch_id)
except Exception as e:
self.logger.error(f"Batch writing failed. Exception message is {e}")
raise
def startCDCStreaming(self):
self.logger.info(f"Started CDC reading for the table ")
try:
df_rate = spark \
.readStream \
.format("rate") \
.option("rowsPerBatch", 1) \
.load()
df_rate.writeStream.foreachBatch(self.foreach_batch_function).start().awaitTermination()
except Exception as e:
self.logger.error(f"Read from CDC Table failed. Exception message is {e}")
raise
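For reference, a minimal sketch of how this pattern could be wired up is shown below. The configuration shape is inferred from the keys read in querySQLServerChangeTable and sendToEventHub; all names and values are placeholders, and the constructor call is an assumption since the uoda package is not shown in this post.

# Illustrative only: configuration shape inferred from the lookups above; every value is a placeholder.
example_data_movement_config = {
    "source": {
        "linkedService": "ls_sqlserver",       # assumed linked service name
        "databaseName": "SalesDb",
        "schemaName": "dbo",
        "tableName": "ctrl_ADF_RUN_HIST_CT",   # CDC change table under the cdc schema
    },
    "checkpoint": {
        "linkedService": "ls_adls",
        "containerName": "checkpoints",
        "path": "cdc/status",                  # Delta path holding the per-table watermark
    },
    "targets": [
        {
            "type": "eventhub",
            "namespace": "my-eh-namespace",
            "eventHub": "cdc-events",
            "keyVaultLinkedService": "ls_keyvault",
            "keyVaultSecretName": "eh-connection-string",
        }
    ],
}

# Assuming UodaBaseClass takes the config through its constructor (not shown in this post):
# job = ProcessingPattern3(data_movement_config=example_data_movement_config)
# job.execute_business_logic()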