# Base Class

"""UODa Base Class

"""

import json

from typing import Optional, Dict, List, Tuple, Union

import argparse

from json.decoder import JSONDecodeError

from abc import ABC, abstractmethod

from enum import Enum

from datetime import datetime

import sys


from pyspark import SparkContext

from pyspark.sql import SparkSession

from pyspark.conf import SparkConf


# This is not available in local environments.

from notebookutils import mssparkutils  # noqa


from .util import DatabaseConnection, Logger



class DataMovementStepStatus(Enum):
    """Status values passed to the DataMovementStep stored procedures.

    Each member's value is the literal string handed to the procedure's
    Status parameter.
    """

    SUCCESSFUL = "SUCCESSFUL"
    FAILED = "FAILED"
    IN_PROGRESS = "IN_PROGRESS"



class UodaBaseClass(ABC):
    """Base class that loads SupportDB configuration and manages logging.

    Is used as an Abstract Base Class. The subclass MUST implement the
    execute_business_logic method. The logic in this method is executed when
    execute_spark is run.

    Examples:
        class MyProcessingPattern(UodaBaseClass):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

            def execute_business_logic(self):
                # Logic here
                return

        if __name__ == "__main__":
            my_processing_pattern = MyProcessingPattern("my_app_name")
            my_processing_pattern.execute_spark()

    Attributes:
        spark: The SparkSession to use for processing.
        data_movement_config: A dictionary from the ConfigurationJSON field in
            the SupportDB DataMovement table
        data_movement_table_config: A dictionary from the ConfigurationJSON
            field in the SupportDB DataMovementTable table, keyed by
            DataMovementTableID
        app_name: The Spark application name to use
        logger: The Log4j wrapper class
    """

    # Project and third-party types are quoted so that the annotations are
    # not evaluated while the class body executes.
    spark: Optional["SparkSession"] = None
    data_movement_config: Optional[Dict] = None
    data_movement_table_config: Optional[Dict] = None
    app_name: Optional[str] = None
    logger: Optional["Logger"] = None

    _support_db_ls_name: Optional[str] = None
    _support_db_conn: Optional["DatabaseConnection"] = None
    _data_movement_id: Optional[int] = None
    _data_movement_name: Optional[str] = None
    _data_movement_log_id: Optional[int] = None
    _batch_id: Optional[int] = None
    _step_id: Optional[int] = None
    _notebook_mode: bool = False
    _debug_data_movement_config: Optional[Dict] = None
    _debug_data_movement_table_config: Optional[Dict] = None

    def __init__(self, app_name: str,
                 input_args: Optional[List[str]] = None,
                 notebook_mode: bool = False,
                 debug_data_movement_config: Optional[Dict] = None,
                 debug_data_movement_table_config: Optional[Dict] = None
                 ) -> None:
        """Creates the UODa Base Class.

        Args:
            app_name: The spark application name.
            input_args: The command line arguments to use. Use for notebook
                development. If not supplied or None, then sys.argv is used,
                but this is handled in _parse_arguments.
            notebook_mode: Use for notebook development. This will keep the
                Spark session alive after execute_spark is run, and is passed
                on to the Logger.
            debug_data_movement_config: A dictionary object to use instead of
                the configuration JSON from the Support DB. This will only be
                used if notebook_mode is True
            debug_data_movement_table_config: A dictionary object to use
                instead of the configuration JSON from the Support DB. This
                will only be used if notebook_mode is True
        """
        self._parse_arguments(input_args)

        self._debug_data_movement_config = debug_data_movement_config
        self._debug_data_movement_table_config = (
            debug_data_movement_table_config)
        self.app_name = app_name
        self._notebook_mode = notebook_mode

    @abstractmethod
    def execute_business_logic(self) -> None:
        """This method MUST be implemented by the subclass.

        The logic inside this method will be run when execute_spark is run.
        The implementation should either
            - return (None) if the logic is completed successfully
            - raise an Exception if it has failed
        """

    def execute_spark(self) -> None:
        """Run the execute_business_logic method inside a Spark session.

        This will create the SupportDB connection, read the data movement
        (and table) configurations to class attributes, create the Spark
        session and initialise the logger. If any of that setup fails, the
        SupportDB connection is closed, the Spark session is stopped (unless
        in notebook mode) and the exception is re-raised.

        The execute_business_logic method is then run:
            - If it does not raise, the exit code is 0.
            - Any Exception it raises is logged with logger.fatal and the
                exit code is set to 1. The exception is not re-raised.
            - The SupportDB connection is always closed afterwards, and the
                Spark session is stopped unless in notebook mode.

        Outside notebook mode sys.exit is always called with the exit code.
        In notebook mode the method returns normally on success and calls
        sys.exit(1) on failure.

        Raises:
            SystemExit: See above.
        """
        self._support_db_conn = DatabaseConnection(self._support_db_ls_name)

        use_debug_config = (self._notebook_mode
                            and self._debug_data_movement_config is not None)
        use_debug_table_config = (
            self._notebook_mode
            and self._debug_data_movement_table_config is not None)

        try:
            # The data movement name always comes from the SupportDB, even
            # when the configuration itself is overridden for debugging.
            db_config, self._data_movement_name = self._read_config()
            if use_debug_config:
                self.data_movement_config = self._debug_data_movement_config
            else:
                self.data_movement_config = db_config

            if use_debug_table_config:
                self.data_movement_table_config = (
                    self._debug_data_movement_table_config)
            else:
                self.data_movement_table_config = self._read_table_config()

            sconf = SparkContext.getOrCreate().getConf()
            if 'sparkConfig' in self.data_movement_config:
                sconf.setAll(self.data_movement_config['sparkConfig'].items())

            self.spark = SparkSession.builder.appName(
                self.app_name).config(conf=sconf).getOrCreate()

            self.logger = Logger(self.spark, f"UODa_{self.app_name}",
                                 self._notebook_mode)
        except BaseException:
            # Release resources on any setup failure, then propagate it.
            self._support_db_conn.close_connection()
            if self.spark is not None and not self._notebook_mode:
                self.spark.stop()
            raise

        # The logic with _notebook_mode allows for the base class to be
        # used within Synapse notebooks. Running spark.stop() would kill the
        # notebook, without any feedback to the user, and running the notebook
        # would return a "Promise already completed" exception.
        #
        # Note that the SupportDB connection is closed everytime, as this is
        # re-established in this method.
        exit_code = 0
        try:
            self.execute_business_logic()
        except Exception as e:
            exit_code = 1
            self.logger.fatal(e)
        finally:
            # Cleanup only. Exiting or returning from inside ``finally``
            # would discard an in-flight KeyboardInterrupt/SystemExit and
            # could report success for an interrupted job.
            self._support_db_conn.close_connection()
            if not self._notebook_mode:
                self.spark.stop()

        if self._notebook_mode and exit_code == 0:
            return
        sys.exit(exit_code)

    def _parse_arguments(self, input_args: Optional[List[str]] = None) -> None:
        """Parse command line arguments for the processing job.

        Stores the batch ID, data movement ID and SupportDB linked service
        name on the instance.

        Args:
          input_args: A list of command line arguments passed (like
            sys.argv[1:]). If None, argparse reads sys.argv itself.
        """
        # TODO - update
        parser = argparse.ArgumentParser(
            prog='UODa Processing Job',
            description='Run a UODa processing job',
            epilog='Contact xxx@riotinto.com for more information.'
        )
        parser.add_argument('-dm', '--data_movement_id',
                            required=True, type=int, help='The DataMovementId')
        parser.add_argument('-b', '--batch_id', required=True,
                            type=int, help='The BatchId')
        parser.add_argument('-s', '--support_db', required=True,
                            help='The name of the linked service to use for '
                                 'the SupportDB connection')

        # parse_args(None) falls back to sys.argv[1:], so no branch is needed.
        args = parser.parse_args(input_args)

        self._batch_id = args.batch_id
        self._data_movement_id = args.data_movement_id
        self._support_db_ls_name = args.support_db

    def _read_config(self) -> Tuple[Dict, str]:
        """Read configuration from SupportDB using DataMovementID.

        Returns:
            A tuple with the dictionary of the JSON configuration for a given
            DataMovementID, and the data movement name.

        Raises:
            ValueError: The DataMovementID did not return any rows.
            ValueError: The DataMovementID did not return a valid JSON
                configuration.
        """
        # Parameterised like the logging procedures, rather than formatting
        # the ID into the statement text.
        sql_query = """
            SELECT
              ConfigurationJSON,
              DataMovementName
            FROM
              ctrl.DataMovement
            WHERE DataMovementID = ?
        """
        cursor = self._support_db_conn.execute_with_retry(
            sql_query, (self._data_movement_id,))

        db_row = cursor.fetchone()
        if db_row is None:
            raise ValueError("The DataMovementID did not return any rows.")

        try:
            # TypeError covers a NULL (None) ConfigurationJSON value.
            return json.loads(db_row[0]), db_row[1]
        except (json.JSONDecodeError, TypeError) as err:
            raise ValueError(
                "The DataMovementID did not return a valid JSON configuration."
            ) from err

    def _read_table_config(self) -> Dict:
        """Read table configuration data from SupportDB using DataMovementID.

        Returns:
          A dictionary mapping each DataMovementTableID to the dictionary
          parsed from its ConfigurationJSON. Empty if the DataMovementID has
          no rows in the DataMovementTable table.

        Raises:
          ValueError: A row did not contain a valid JSON configuration.
        """
        sql_query = """
            SELECT
              DataMovementTableID
              ,ConfigurationJSON
            FROM
              ctrl.DataMovementTable
            WHERE
              DataMovementID = ?
        """
        cursor = self._support_db_conn.execute_with_retry(
            sql_query, (self._data_movement_id,))

        rtn: dict = {}
        for data_movement_table_id, config_json in cursor.fetchall():
            try:
                rtn[data_movement_table_id] = json.loads(config_json)
            except (json.JSONDecodeError, TypeError) as err:
                raise ValueError(
                    f"DataMovementTableID {data_movement_table_id} did not "
                    "return a valid JSON configuration.") from err
        return rtn

    def create_log_step(self, step_desc: str,
                        data_movement_table_id: Optional[int] = None) -> None:
        """Insert a record to log table DataMovementStep.

        DataMovementSteps can be updated using update_log_step and the log ID
        returned by the stored procedure. This is stored in a class attribute
        and referenced by update_log_step.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - BatchID = Batch ID in arguments
            - SourceMasterDataMovementName = value from read_config
            - Status = DataMovementStepStatus.IN_PROGRESS
            - CreatedBy = App name in arguments

        Note that it is NOT supported to have multiple DataMovementSteps at a
        single time. This method will overwrite the step ID class attribute,
        regardless of if it is completed or not.

        Args:
          step_desc: Description of the step.
          data_movement_table_id: Data movement table identifier

        Raises:
          ValueError: The stored procedure did not return any rows.
        """
        sql_query = """
          SET NOCOUNT ON;
          EXECUTE [log].[uspCreateDataMovementStep]
          @BatchID = ?
          ,@SourceMasterDataMovementName = ?
          ,@StepDescription = ?
          ,@Status = ?
          ,@CreatedBy = ?
          ,@DatamovementTableID = ?
          ,@ID=NULL
          """
        params = (self._batch_id, self._data_movement_name, step_desc,
                  DataMovementStepStatus.IN_PROGRESS.value, self.app_name,
                  data_movement_table_id)
        cursor = self._support_db_conn.execute_with_retry(sql_query, params)

        db_row = cursor.fetchone()
        if db_row is None:
            raise ValueError("The procedure did not return any rows.")
        self._step_id = db_row[0]

    def update_log_step(self, status: "DataMovementStepStatus",
                        end_time: Optional[Union[datetime, str]] = None
                        ) -> None:
        """Update a record in the log table DataMovementStep

        This will use the step_id from the most recent call of create_log_step
        to update the matching row in the DataMovementStep table.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - UpdatedBy = App name in arguments

        Args:
            status: Status of the step; its value is passed as Status.
            end_time: End time of the step. Can be passed as either a string
                or datetime object. If omitted or None, the current time at
                the moment of the call is used.

        Raises:
            ValueError: If _step_id is None (i.e. create_log_step has not
                successfully run).
        """
        if self._step_id is None:
            raise ValueError("Class attribute _step_id is not set.")

        # Resolved per call: a ``datetime.now()`` default in the signature is
        # evaluated once, when the function is defined.
        if end_time is None:
            end_time = datetime.now()

        sql_query = """
          SET NOCOUNT ON;
          EXECUTE [log].[uspSetDataMovementStep]
          @StepID = ?
          ,@Status = ?
          ,@EndTime = ?
          ,@UpdatedBy = ?
          """
        params = (self._step_id, status.value, end_time, self.app_name)
        # The returned cursor is not needed.
        self._support_db_conn.execute_with_retry(sql_query, params)

    def create_log_step_audit(self, event_text: str,
                              record_count: Optional[int] = None,
                              insert_record_count: Optional[int] = None,
                              update_record_count: Optional[int] = None,
                              delete_record_count: Optional[int] = None,
                              stored_procedure_name: Optional[str] = None,
                              custom_procedure_schema: Optional[str] = None,
                              custom_procedure_name: Optional[str] = None,
                              log_database: Optional[str] = None) -> None:
        """Insert a record into log table DataMovementStepAudit.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - CreatedBy = App name in arguments

        Args:
            event_text: A description of the event, with a maximum length of
                250
            record_count: The total record count in the table after making
                changes
            insert_record_count: The number of records inserted
            update_record_count: The number of records updated
            delete_record_count: The number of records deleted
            stored_procedure_name: Reserved for future use
            custom_procedure_schema: Reserved for future use
            custom_procedure_name: Reserved for future use
            log_database: Reserved for future use
        """
        sql_query = """
          SET NOCOUNT ON;
          EXECUTE [log].[uspCreateDataMovementStepAudit]
          @StepID = ?
          ,@LogDatabase = ?
          ,@RecordCount = ?
          ,@InsertedRecordCount = ?
          ,@UpdatedRecordCount = ?
          ,@DeletedRecordCount = ?
          ,@EventText = ?
          ,@StoredProcedureName = ?
          ,@CustomProcedureSchema = ?
          ,@CustomProcedureName = ?
          ,@CreatedBy = ?
          """
        # Order must match the placeholders above.
        params = (
            self._step_id, log_database, record_count, insert_record_count,
            update_record_count, delete_record_count, event_text,
            stored_procedure_name, custom_procedure_schema,
            custom_procedure_name, self.app_name
        )
        # The returned cursor is not needed.
        self._support_db_conn.execute_with_retry(sql_query, params)

    def create_log_step_event(self, event_log_description: str) -> None:
        """Insert a record to log table DataMovementStepEvent

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - CreatedBy = App name in arguments

        Args:
          event_log_description: Description of the event
        """
        sql_query = """
          SET NOCOUNT ON;
          EXECUTE [log].[uspCreateDataMovementStepEventLog]
          @StepID = ?
          ,@EventLogDescription = ?
          ,@CreatedBy = ?
          """
        params = (self._step_id, event_log_description, self.app_name)
        # The returned cursor is not needed.
        self._support_db_conn.execute_with_retry(sql_query, params)


Comments

Popular posts from this blog

Introduction To Oracle10g

Insert

Except