Class that performs writes to ADLS and SQL Server targets
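
The code below assumes the following imports: mssparkutils ships with Synapse notebooks (notebookutils), DeltaTable comes from the delta-spark package, and jsonpointer is the PyPI jsonpointer library. UodaBaseClass, DataMovementStepStatus and self._support_db_conn come from the project's own framework and are not shown here.

import json
from urllib.parse import urlparse

import jsonpointer                       # resolves the JSON Pointer expressions from ctrl.map_all_jsonpointer
from delta.tables import DeltaTable      # Delta Lake merge API used by write_to_adls
from notebookutils import mssparkutils   # Synapse utilities for linked service credentials and secrets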

def get_abfss_path(linked_service, container_name, path):
    # Look up the storage endpoint of the linked service and build the abfss:// URI.
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    return f"abfss://{container_name}@{parse_result.netloc}/{path}"

def set_abfss_config(spark, linked_service):
    # Configure Spark to authenticate to the storage account behind the linked service
    # via the Synapse linked-service-based token provider.
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    spark.conf.set(f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName", linked_service)
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{parse_result.netloc}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

class ProcessingPattern1(UodaBaseClass):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute_business_logic(self):
        self.logger.info("Starting execution of business logic")

        for target in [t for t in self.data_movement_config['targets'] if t['type'] == "ADLS_Table"]:
            set_abfss_config(self.spark, target['linkedService'])

        eh_conn_string = mssparkutils.credentials.getSecretWithLS(
            self.data_movement_config["source"]["keyVaultLinkedService"],
            self.data_movement_config["source"]["keyVaultSecretName"]
        )

        set_abfss_config(self.spark, self.data_movement_config["source"]["checkpointLinkedService"])
        eh_checkpoint_location = get_abfss_path(
            self.data_movement_config["source"]["checkpointLinkedService"],
            self.data_movement_config["source"]["checkpointContainer"],
            self.data_movement_config["source"]["checkpointPath"]
        )

        self.create_log_step("Starting data movement from EventHub to ADLS/RDBMS in ODS")

        json_schema_cur = self._support_db_conn.execute_with_retry(
            f"""
            SELECT
                source_pointer, 
                src_schema_name, 
                tgt_schema_name, 
                table_name 
            FROM
                ctrl.map_all_jsonpointer  
            WHERE
                event_name='{self.data_movement_config["source"]["eventHub"]}'
            """
        )

        self.json_schema = list(json_schema_cur.fetchall())
        del json_schema_cur
        try:
            self.read_from_eventhub(eh_conn_string, eh_checkpoint_location)
        except Exception as e:
            self.update_log_step(DataMovementStepStatus.FAILED)
            self.create_log_step_event(f'Data movement failed. Exception message is {e}')
            raise

    def write_to_adls(self, df, linked_service, container_name, path):
        try:
            df.show()  # prints a sample of the incoming microbatch to the driver log
            path = get_abfss_path(linked_service, container_name, path)
            df_count = df.count()
            df = df.dropDuplicates()
            if DeltaTable.isDeltaTable(self.spark, path):
                delta_table = DeltaTable.forPath(self.spark, path)
                delta_table.alias("t").merge(
                    df.alias("u"),
                    "u.body.seqn = t.body.seqn") \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()

                hist = delta_table.history(1).first()['operationMetrics']
                self.create_log_step_audit(
                    event_text=f"Microbatch merged to {linked_service} {path}. Input count={df_count}, without duplicates={df.count()}",
                    record_count=df.count(),
                    insert_record_count=hist['numTargetRowsInserted'],
                    update_record_count=hist['numTargetRowsUpdated'],
                    delete_record_count=hist['numTargetRowsDeleted'],
                )
                self.logger.info(f"Microbatch merged to {linked_service} {path}. " \
                    f"Input count={df_count}, without duplicates={df.count()}, " \
                    f"insert_record_count={hist['numTargetRowsInserted']}, " \
                    f"update_record_count={hist['numTargetRowsUpdated']}, " \
                    f"delete_record_count={hist['numTargetRowsDeleted']}."
                )
            else:
                self.logger.info(f"Delta table not found at {path}, creating a new table.")
                df.write \
                .format("delta") \
                .save(path)
                self.create_log_step_audit(
                    event_text=f"Microbatch written as new table to {linked_service} {path}. Input count={df_count}, without duplicates={df.count()}",
                    record_count=df.count(),
                    insert_record_count=df.count(),
                )
                self.logger.info(f"Microbatch created new table at {linked_service} {path}. " \
                    f"Input count={df_count}, without duplicates={df.count()}, " \
                    f"insert_record_count={df.count()}."
                )
                
        except Exception as e:
            self.logger.error(f"Exception occurred while writing to ADLS {linked_service} {path}. Exception is {e}")
            raise

    def write_to_rdbms(self, df, linked_service, schema_name, table_name):
        try:
            target_rows = list()    
            target_schema = [tgt_schema_name for _, _, tgt_schema_name, _ in self.json_schema]

            # Resolve each configured JSON Pointer against the event body to build one output row per event.
            for json_data in df.collect():
                json_body = json.loads(json_data['body'])
                tmp_rows = list()
                for source_pointer, _, _, _ in self.json_schema:
                    pointer = jsonpointer.JsonPointer(source_pointer)
                    tmp_rows.append(pointer.resolve(json_body))
                target_rows.append(tmp_rows)
                                       
            df_target = self.spark.createDataFrame(
                data=target_rows,
                schema=target_schema
            )

            rdbms_linked_service = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
            jdbc_url = f"jdbc:sqlserver://{rdbms_linked_service['Endpoint']};" \
                       f"database={rdbms_linked_service['Database']};"
            
            full_table_name = f"{schema_name}.{table_name}"

            # df_target.write \
            #     .format("com.microsoft.sqlserver.jdbc.spark") \
            #     .mode("append") \
            #     .option("url", jdbc_url) \
            #     .option("dbtable", full_table_name) \
            #     .option("accessToken", rdbms_linked_service['AuthKey']) \
            #     .save()

            df_target.createOrReplaceTempView("updates")

            df_existing = self.spark.read \
                .format("com.microsoft.sqlserver.jdbc.spark") \
                .option("url", jdbc_url) \
                .option("dbtable", full_table_name) \
                .option("accessToken", rdbms_linked_service['AuthKey']) \
                .load()
            # createOrReplaceTempView returns None, so register the view separately and keep the DataFrame.
            df_existing.createOrReplaceTempView("existing")

            cols = df_existing.schema.names
            # TODO - update with the right column names, and replace seqn
            pk = "seqn"
            # NOTE: the statement below is T-SQL (DECLARE / OUTPUT $action) and cannot be executed
            # by Spark SQL as-is; it is left here as a placeholder for the upsert pending the TODO above.
            output = self.spark.sql("""
            DECLARE @SummaryOfChanges TABLE(Change VARCHAR(20));

            MERGE existing AS Target
            USING (SELECT Col1, Col2 FROM updates) AS Source
            ON (Target.seqn = Source.seqn)

            WHEN MATCHED THEN
                UPDATE SET Target.Col2 = Source.Col2

            WHEN NOT MATCHED BY TARGET THEN
                INSERT (Col1, Col2) VALUES (Source.Col1, Source.Col2)

            OUTPUT $action INTO @SummaryOfChanges;

            SELECT Change, COUNT(*) AS CountPerChange
            FROM @SummaryOfChanges
            GROUP BY Change;
            """)
            

            inserted_count = df_target.count()
            # self.create_log_step_audit(
            #     event_text=f"Batch inserted to {linked_service} {full_table_name}", 
            #     insert_record_count=inserted_count,
                
            # )
            self.logger.info(f"Wrote {inserted_count} to {linked_service} {full_table_name}")
        except Exception as e:
            self.logger.error(f"Exception occurred while writing to RDBMS {linked_service} {schema_name}.{table_name}. Exception message is {e}")
            raise

    def foreach_batch_function(self, df_event_input, epoch_id):
        try:
            self.logger.info(f"Batch:{epoch_id} Record Count:{df_event_input.count()}")
            df = df_event_input.select(["body"])
            for target in self.data_movement_config['targets']:
                target_type = target['type']
                if target_type == "ADLS_Table":
                    self.write_to_adls(df, target['linkedService'], target['containerName'], target['path'])
                # elif target_type == "RDBMS_UPSERT_SQLSERVER":
                #     self.write_to_rdbms(df, target['linkedService'], target['schemaName'], target['tableName'])
                else:
                    self.logger.error(f"Target type {target_type} not supported.")
  
        except Exception as e:
            self.logger.error(f"Batch writing failed. Exception message is {e}")
            raise

    def read_from_eventhub(self, eh_conn_string, eh_checkpoint_location):
        self.logger.info(f"Started reading from EventHub")
        ehConf = {
            'eventhubs.connectionstring' : spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn_string)
        }
        try:
            df_event_input_raw = self.spark.readStream \
                .format("eventhubs") \
                .options(**ehConf) \
                .load()
            
            df_event_input = df_event_input_raw \
                .withColumn("body", df_event_input_raw["body"].cast("string"))
            
            df_event_input.writeStream \
                .outputMode("append") \
                .option("checkpointlocation", eh_checkpoint_location) \
                .foreachBatch(self.foreach_batch_function) \
                .start() \
                .awaitTermination()
        except Exception as e:
            self.logger.error(f"Read from EventHub failed. Exception message is {e}")
            raise 
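
For context, here is a minimal sketch of the configuration shape ProcessingPattern1 appears to expect, inferred from the keys the class reads; every linked service, container, path, and table name below is a hypothetical placeholder, not an actual resource.

# Hypothetical example of data_movement_config, inferred from the keys referenced
# in execute_business_logic and foreach_batch_function; all names are placeholders.
example_data_movement_config = {
    "source": {
        "eventHub": "evh-orders",                        # matched against ctrl.map_all_jsonpointer.event_name
        "keyVaultLinkedService": "ls_keyvault",          # linked service used to fetch the Event Hub connection string
        "keyVaultSecretName": "evh-orders-conn-string",
        "checkpointLinkedService": "ls_adls_checkpoint",
        "checkpointContainer": "checkpoints",
        "checkpointPath": "streams/evh-orders"
    },
    "targets": [
        {
            "type": "ADLS_Table",                        # handled by write_to_adls (Delta merge on body.seqn)
            "linkedService": "ls_adls_ods",
            "containerName": "ods",
            "path": "tables/orders"
        },
        {
            "type": "RDBMS_UPSERT_SQLSERVER",            # write_to_rdbms call is currently commented out in foreach_batch_function
            "linkedService": "ls_sqlserver_ods",
            "schemaName": "ods",
            "tableName": "orders"
        }
    ]
}

The RDBMS path additionally depends on rows in ctrl.map_all_jsonpointer: for the configured event name, each row supplies a JSON Pointer (source_pointer) that is resolved against the event body, and the resolved values become one row of the target DataFrame.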
