pattern 3 , Read using path

from pyspark.sql.functions import col, lit
import pyspark.sql.functions as F
import sys
import traceback
from delta.tables import *
from pyspark.sql.types import StructType,StructField, StringType, TimestampType 
from datetime import datetime
from delta.tables import *
from pyspark.sql.types import Row

from uoda.base import UodaBaseClass, DataMovementStepStatus
from uoda.util import DatabaseConnection

import json
import jsonpointer
from urllib.parse import urlparse
from delta.tables import DeltaTable

 


class ProcessingPattern3(UodaBaseClass):


    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute_business_logic(self):
        self.logger.info("Starting execution of business logic")
        self.create_log_step("Starting data movement from CDC Table to EventHub")
        
        try:
            self.startCDCStreaming()
        except Exception as e:
            self.update_log_step(DataMovementStepStatus.FAILED)
            self.create_log_step_event(f'Data movement failed. Exception message is {e}')
            raise

    def set_abfss_config(self,spark: SparkSession, linked_servicestr):

        """Set the Spark configuration to access storage accounts using linked
        services.

        This function only supports managed identity for authentication. Once this
        configuration has been set, the storage account can be accessed using the
        abfss:// URI prefix.

        See https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-secure-credentials-with-tokenlibrary # # noqa E501

        Args:
            spark: A current SparkSession.
            linked_service: The linked service available in the Synapse workspace.
        """
        ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
        parse_result = urlparse(ls['Endpoint'])
        spark.conf.set(
            f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName",
            linked_service)
        spark.conf.set(
            f"fs.azure.account.oauth.provider.type.{parse_result.netloc}",
            "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")  # noqa E501

#class ProcessingPattern4(UodaBaseClass):
    def get_abfss_path(self,linked_servicestrcontainer_namestrpathstr) -> str:
        """Get the full Azure Blob File Storage Secure URI from a linked service.

        Args:
            linked_service: The linked service available in the Synapse workspace.
            container_name: The container in the storage account.
            path: The path to the blob in the container.

        Returns:
            A fully qualified UR using the abfss prefix.
        """
        ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
        parse_result = urlparse(ls['Endpoint'])
        return f"abfss://{container_name}@{parse_result.netloc}/{path}"


    def querySQLServerChangeTable(self,epoch_id):
        try:
            print("BatchStart =", datetime.now())
            _sourceDetails = self.data_movement_config["source"]
            _targetDetails = self.data_movement_config["targets"]
            _sourceLinkedService = self.data_movement_config["source"]["linkedService"]
            _sourceSchemaName = self.data_movement_config["source"]["schemaName"]
            _sourceTableName = self.data_movement_config["source"]["tableName"]
            _checkpointpath=self.data_movement_config["checkpoint"]["path"]
            _checkpointLinkedService=self.data_movement_config["checkpoint"]["linkedService"]
            _checkPointContainer=self.data_movement_config["checkpoint"]["containerName"]
            _sourceDatabaseNAme=self.data_movement_config["source"]["databaseName"]
                      
            _rdbms_linked_service = json.loads(mssparkutils.credentials.getPropertiesAll(_sourceLinkedService))
            _jdbc_url = f"jdbc:sqlserver://{_rdbms_linked_service['Endpoint']};" \
                       f"database={_sourceDatabaseNAme};"
            self.set_abfss_config(spark,_checkpointLinkedService)
            _path = self.get_abfss_path(_checkpointLinkedService, _checkPointContainer, _checkpointpath)
            df_delta_path = spark.read.format("delta").load(_path)
            df_delta_path=df_delta_path.withColumn("updatedatetime",df_delta_path.updatedatetime.cast( "STRING"))
            _lastestDateTimeAry=df_delta_path.select(df_delta_path["updatedatetime"]).filter(df_delta_path["tabelname"]=='ctrl_ADF_RUN_HIST_CT').collect()
            #_getParamQuery = f"select cast(updatedatetime as STRING) from  default.cdcstatustable where tabelname='{_sourceTableName}'"
            #_lastestDateTimeAry = spark.sql(_getParamQuery).collect()
    
            _query ="Select *,sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS Row_Change_DteTme_Stamp from cdc."+_sourceTableName
            if(len(_lastestDateTimeAry) > 0):
                _lastestDateTime = _lastestDateTimeAry[0]['updatedatetime']
                _query =f"Select *,sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS Row_Change_DteTme_Stamp from cdc.{_sourceTableName} where sys.fn_cdc_map_lsn_to_time(__$start_lsn) > cast('{_lastestDateTime}' as datetime) "
            df_changed_rows = spark.read \
                .format("com.microsoft.sqlserver.jdbc.spark") \
                .option("url", _jdbc_url) \
                .option("query", _query) \
                .option("accessToken", _rdbms_linked_service['AuthKey']) \
                .load()
            _cnt = df_changed_rows.count()
            print(_cnt)
            print("=======================cnt========================")
            if(_cnt >0):
                df_sorted = df_changed_rows.sort(col("Row_Change_DteTme_Stamp").desc())
                _firstRow = df_sorted.first()
                print("Process execution time 1212 =", datetime.now())
                self.updateCDCStatusTable(_firstRow,_sourceTableName,_path)
                print("Process execution time -11 =", datetime.now())
                self.sendToEventHub(df_sorted)
            print(f"CDC for {_sourceLinkedService} with SchemaName:{_sourceSchemaName} and TableName:{_sourceTableName} CDC Count is {_cnt}")
            print(_targetDetails)
        except Exception as e:
            print("=======================Exce========================"*3)
            print(e)
    def sendToEventHub(self,df_changed_rows_passed):
        _operations=[3,4]
        #select the rows which do not have update operation  
        df_rest_operation = df_changed_rows_passed.filter(~col("__$operation").isin(_operations))
        #select the after rows with before update changes only 
        df_update_before = df_changed_rows_passed.filter(col("__$operation") == 3)
        #select the after rows with after update changes only 
        df_update_after = df_changed_rows_passed.filter(col("__$operation") == 4)
        #create the json for changes as an additional column ChangeRow
        print("Process execution time -1 =", datetime.now())
        df_rest_operation_json = df_rest_operation.withColumn('ChangeRow',F.to_json(F.struct("*")))
        #select only the timestamp, seqval and operation as per the change
        df_rest_operation_json = df_rest_operation_json.select("Row_Change_DteTme_Stamp","__$seqval","__$operation",'ChangeRow')
        #create the json for before update changes as an additional column BeforeUpdate
        df_update_before = df_update_before.withColumn('BeforeUpdate',F.to_json(F.struct("*")))
        #select only the timestamp, seqval and operation unified as 4 for update
        df_update_before = df_update_before.select("Row_Change_DteTme_Stamp","__$seqval",'BeforeUpdate').withColumn('__$operation',lit(4))
        #create the json for after update changes as an additional column AfterUpdate
        df_update_after = df_update_after.withColumn('AfterUpdate',F.to_json(F.struct("*")))
        #select only the timestamp, seqval and operation unified as 4 for update
        df_update_after = df_update_after.select("Row_Change_DteTme_Stamp","__$seqval",'AfterUpdate').withColumn('__$operation',lit(4))
        #Combine before and after rows for update
        df_update = df_update_before.join(df_update_after, ['Row_Change_DteTme_Stamp',"__$seqval",'__$operation'])
        #Convert update rows into Json with new column changedrows
        df_update_final = df_update.withColumn('json',F.to_json(F.struct("*"))).groupby(['Row_Change_DteTme_Stamp','__$seqval']).agg(F.collect_list('json').alias("changedrows"))
        #Convert changed rows into Json with new column changedrows
        df_final_rest = df_rest_operation_json.withColumn('json',F.to_json(F.struct("*"))).groupby(['Row_Change_DteTme_Stamp','__$seqval']).agg(F.collect_list('json').alias("changedrows"))
        #combine all to form a single dataframe
        df_combined = df_update_final.unionAll(df_final_rest)
        df_to_send = df_combined.select(F.to_json(F.struct("changedrows")).alias("body"))
        print("Process execution time -2 =", datetime.now())
        for target in self.data_movement_config['targets']:
                print(target["type"])
                print(target["namespace"])
                print(target["eventHub"])
                print(target["keyVaultLinkedService"])
                print(target["keyVaultSecretName"])
                _eh_conn_string = mssparkutils.credentials.getSecretWithLS(target["keyVaultLinkedService"],target["keyVaultSecretName"])
                _ehConf = {
                'eventhubs.connectionstring' : spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(_eh_conn_string)
                }
                df_to_send.select("body").write.format("eventhubs").options(**_ehConf).save()
                print(_eh_conn_string)
            #
        print("Process execution time -3 =", datetime.now())
    def updateCDCStatusTable(self,firstRowPassed,tableNamePassed,_path):
        _maxTimeStamp = firstRowPassed.Row_Change_DteTme_Stamp
        #path = get_abfss_path(linked_service, container_name, path)
        _table_data=[("CDC",tableNamePassed,_maxTimeStamp)]
        _schema = StructType([ \
            StructField("schemaname",StringType(),True), \
            StructField("tabelname",StringType(),True), \
            StructField("updatedatetime",TimestampType(),True)\
        ])
        df_status=spark.createDataFrame(data=_table_data,schema=_schema)
        print(df_status.schema)
        if DeltaTable.isDeltaTable(spark, _path):
            delta_table = DeltaTable.forPath(spark, _path)
            delta_table.alias("t") \
                .merge(df_status.alias("u"),
                    "u.tabelname = t.tabelname") \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
            
        else:
            df_status.write.format("delta").save(path)

    def foreach_batch_function(selfdf_event_inputepoch_id):
        try:
            self.logger.info(f"Batch:{epoch_id} Record Count:{df_event_input.count()}")
            _sourceDetails = self.data_movement_config['source']
            self.querySQLServerChangeTable(epoch_id)
        except Exception as e:
            self.logger.error(f"Batch writing failed. Exception message is {e}")
            raise

    def startCDCStreaming(self):
        self.logger.info(f"Started CDC reading for the table ")
        try:
            df_rate = spark \
            .readStream \
            .format("rate") \
            .option("rowsPerBatch"1) \
            .load()
            df_rate.writeStream.foreachBatch(self.foreach_batch_function).start().awaitTermination()
        except Exception as e:
            self.logger.error(f"Read from CDC Table failed. Exception message is {e}")
            raise 

Comments

Popular posts from this blog

Introduction To Oracle10g

Insert

Except