Base Class
"""UODa Base Class
"""
import json
from typing import Optional, Dict, List, Tuple, Union
import argparse
from json.decoder import JSONDecodeError
from abc import ABC, abstractmethod
from enum import Enum
from datetime import datetime
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# This is not available in local environments.
from notebookutils import mssparkutils # noqa
from .util import DatabaseConnection, Logger
class DataMovementStepStatus(Enum):
    """Statuses that a DataMovementStep can be given via stored procedures.

    Each member's value is the literal status string handed to the SupportDB
    logging stored procedures.
    """

    SUCCESSFUL = "SUCCESSFUL"
    FAILED = "FAILED"
    IN_PROGRESS = "IN_PROGRESS"
class UodaBaseClass(ABC):
    """The foundational building block to extract configuration information
    from the SupportDB and manage logging.

    Is used as an Abstract Base Class. The subclass MUST implement the
    execute_business_logic method. The logic in this method is executed when
    execute_spark is run.

    Examples:
        class MyProcessingPattern(UodaBaseClass):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

            def execute_business_logic(self):
                # Logic here
                return

        if __name__ == "__main__":
            my_processing_pattern = MyProcessingPattern("my_app_name")
            my_processing_pattern.execute_spark()

    Attributes:
        spark: The SparkSession to use for processing.
        data_movement_config: A dictionary from the ConfigurationJSON field in
            the SupportDB DataMovement table
        data_movement_table_config: A dictionary from the ConfigurationJSON
            field in the SupportDB DataMovementTable table, keyed by
            DataMovementTableID
        app_name: The Spark application name to use
        logger: The Log4j wrapper class
    """

    # Annotations that name Spark / project types are quoted (forward
    # references) so they are not evaluated when the class body is executed.
    spark: "Optional[SparkSession]" = None
    data_movement_config: Optional[Dict] = None
    data_movement_table_config: Optional[Dict] = None
    app_name: Optional[str] = None
    logger: "Optional[Logger]" = None

    _support_db_ls_name: Optional[str] = None
    _support_db_conn: "Optional[DatabaseConnection]" = None
    _data_movement_id: Optional[int] = None
    _data_movement_name: Optional[str] = None
    _data_movement_log_id: Optional[int] = None
    _batch_id: Optional[int] = None
    _step_id: Optional[int] = None
    _notebook_mode: bool = False
    _debug_data_movement_config: Optional[Dict] = None
    _debug_data_movement_table_config: Optional[Dict] = None

    def __init__(self, app_name: str,
                 input_args: Optional[List[str]] = None,
                 notebook_mode: bool = False,
                 debug_data_movement_config: Optional[Dict] = None,
                 debug_data_movement_table_config: Optional[Dict] = None
                 ) -> None:
        """Creates the UODa Base Class.

        Only parses the arguments and stores the settings; no Spark session
        or database connection is created until execute_spark is called.

        Args:
            app_name: The spark application name.
            input_args: The command line arguments to use. Use for notebook
                development. If not supplied or None, then sys.argv is used,
                but this is handled in _parse_arguments.
            notebook_mode: Use for notebook development. This will keep the
                Spark session alive after execute_spark is run, and print log
                messages created with the logger to the notebook.
            debug_data_movement_config: A dictionary object to use instead of
                the configuration JSON from the Support DB. This will only be
                used if notebook_mode is True
            debug_data_movement_table_config: A dictionary object to use
                instead of the configuration JSON from the Support DB. This
                will only be used if notebook_mode is True
        """
        self._parse_arguments(input_args)
        self._debug_data_movement_config = debug_data_movement_config
        self._debug_data_movement_table_config = \
            debug_data_movement_table_config
        self.app_name = app_name
        self._notebook_mode = notebook_mode

    @abstractmethod
    def execute_business_logic(self) -> None:
        """This method MUST be implemented by the subclass.

        The logic inside this method will be run when execute_spark is run.
        The implementation should either
            - return (None) if the logic is completed successfully
            - raise an Exception if it has failed
        """
        pass

    def execute_spark(self) -> None:
        """Run the Spark execute_business_logic method.

        This will create the Spark session, SupportDB connection, read data
        movement (and table) configurations to class attributes, and
        initialise the Log4J logger. Any failure during this setup closes the
        SupportDB connection (and stops Spark outside notebook mode) and is
        re-raised.

        The execute_business_logic method is then run with a try/except
        block:
            - try: the execute_business_logic method is called. If it does
              not raise any exception, the exit code is 0.
            - except: any Exception from execute_business_logic is logged
              with the logger and the exit code is set to 1.
            - finally: the SupportDB connection is closed. If not in notebook
              mode, the Spark session is stopped.

        Afterwards sys.exit is called with the exit code, except in notebook
        mode with exit code 0, where the method simply returns.

        Raises:
            SystemExit: Always outside notebook mode; in notebook mode only
                when execute_business_logic failed.
        """
        self._support_db_conn = DatabaseConnection(self._support_db_ls_name)

        try:
            if self._debug_data_movement_config is not None \
                    and self._notebook_mode:
                self.data_movement_config = self._debug_data_movement_config
                # The SupportDB is still queried because the data movement
                # name is needed by create_log_step.
                _, self._data_movement_name = self._read_config()
            else:
                self.data_movement_config, self._data_movement_name = \
                    self._read_config()

            if self._debug_data_movement_table_config is not None \
                    and self._notebook_mode:
                self.data_movement_table_config = \
                    self._debug_data_movement_table_config
            else:
                self.data_movement_table_config = self._read_table_config()

            sconf = SparkContext.getOrCreate().getConf()
            if 'sparkConfig' in self.data_movement_config:
                sconf.setAll(self.data_movement_config['sparkConfig'].items())

            self.spark = SparkSession.builder.appName(
                self.app_name).config(conf=sconf).getOrCreate()
            self.logger = Logger(self.spark, f"UODa_{self.app_name}",
                                 self._notebook_mode)
        except BaseException:
            # Cleanup only: release what was acquired during setup, then let
            # the original error propagate unchanged.
            self._support_db_conn.close_connection()
            if self.spark is not None and not self._notebook_mode:
                self.spark.stop()
            raise

        # The logic with _notebook_mode allows for the base class to be
        # used within Synapse notebooks. Running spark.stop() would kill the
        # notebook, without any feedback to the user, and running the
        # notebook would return a "Promise already completed" exception.
        #
        # Note that the SupportDB connection is closed everytime, as this is
        # re-established in this method.
        exit_code = 0
        try:
            self.execute_business_logic()
        except Exception as e:
            exit_code = 1
            self.logger.fatal(e)
        finally:
            # Nested so the Spark session is still stopped when closing the
            # SupportDB connection itself fails.
            try:
                self._support_db_conn.close_connection()
            finally:
                if not self._notebook_mode:
                    self.spark.stop()

        # The exit decision deliberately lives outside the finally block: a
        # return/sys.exit inside finally would discard an in-flight
        # KeyboardInterrupt or SystemExit and could report success.
        if self._notebook_mode and exit_code == 0:
            return
        sys.exit(exit_code)

    def _parse_arguments(self,
                         input_args: Optional[List[str]] = None) -> None:
        """Parse command line arguments for the processing job.

        Stores the batch ID, data movement ID and SupportDB linked service
        name on the instance.

        Args:
            input_args: A list of command line arguments passed (like
                sys.argv[1:]). If None, argparse reads sys.argv itself.
        """
        # TODO - update
        parser = argparse.ArgumentParser(
            prog='UODa Processing Job',
            description='Run a UODa processing job',
            epilog='Contact xxx@riotinto.com for more information.'
        )
        parser.add_argument('-dm', '--data_movement_id',
                            required=True, type=int,
                            help='The DataMovementId')
        parser.add_argument('-b', '--batch_id', required=True,
                            type=int, help='The BatchId')
        parser.add_argument('-s', '--support_db', required=True,
                            help='The name of the linked service to the '
                                 'SupportDB')

        # parse_args(None) falls back to sys.argv, so no branching is needed.
        args = parser.parse_args(input_args)

        self._batch_id = args.batch_id
        self._data_movement_id = args.data_movement_id
        self._support_db_ls_name = args.support_db

    def _read_config(self) -> Tuple[Dict, str]:
        """Read configuration from SupportDB using DataMovementID.

        Returns:
            A tuple with the dictionary of the JSON configuration for a given
            DataMovementID, and the data movement name.

        Raises:
            ValueError: The DataMovementID did not return any rows, or did
                not return a valid JSON configuration.
        """
        # Parameterised like the logging procedures rather than formatting
        # the ID into the SQL text.
        sql_query = """
            SELECT
                ConfigurationJSON,
                DataMovementName
            FROM
                ctrl.DataMovement
            WHERE DataMovementID = ?
        """
        cursor = self._support_db_conn.execute_with_retry(
            sql_query, (self._data_movement_id,))

        db_row = cursor.fetchone()
        if db_row is None:
            raise ValueError("The DataMovementID did not return any rows.")

        try:
            return json.loads(db_row[0]), db_row[1]
        except (JSONDecodeError, TypeError) as err:
            # TypeError covers a NULL ConfigurationJSON column.
            raise ValueError(
                "The DataMovementID did not return a valid JSON "
                "configuration.") from err

    def _read_table_config(self) -> Dict:
        """Read table configuration data from SupportDB using DataMovementID.

        Returns:
            A dictionary mapping each DataMovementTableID to the dictionary
            of its JSON configuration. Empty if the DataMovementID has no
            rows in DataMovementTable.

        Raises:
            ValueError: A row did not contain a valid JSON configuration.
        """
        sql_query = """
            SELECT
                DataMovementTableID
                ,ConfigurationJSON
            FROM
                ctrl.DataMovementTable
            WHERE
                DataMovementID = ?
        """
        cursor = self._support_db_conn.execute_with_retry(
            sql_query, (self._data_movement_id,))

        table_configs: Dict = {}
        for data_movement_table_id, config_json in cursor.fetchall():
            try:
                table_configs[data_movement_table_id] = \
                    json.loads(config_json)
            except (JSONDecodeError, TypeError) as err:
                # TypeError covers a NULL ConfigurationJSON column.
                raise ValueError(
                    f"DataMovementTableID {data_movement_table_id} did not "
                    "return a valid JSON configuration.") from err
        return table_configs

    def create_log_step(self, step_desc: str,
                        data_movement_table_id: Optional[int] = None
                        ) -> None:
        """Insert a record to log table DataMovementStep.

        DataMovementSteps can be updated using update_log_step and the log ID
        returned by the stored procedure. This is stored in a class attribute
        and referenced by update_log_step.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - BatchID = Batch ID in arguments
            - SourceMasterDataMovementName = value from _read_config
            - Status = DataMovementStepStatus.IN_PROGRESS
            - CreatedBy = App name in arguments

        Note that it is NOT supported to have multiple DataMovementSteps at a
        single time. This method will overwrite the step ID class attribute,
        regardless of if it is completed or not.

        Args:
            step_desc: Description of the step.
            data_movement_table_id: Data movement table identifier

        Raises:
            ValueError: The stored procedure did not return any rows.
        """
        sql_query = """
            SET NOCOUNT ON;
            EXECUTE [log].[uspCreateDataMovementStep]
                @BatchID = ?
                ,@SourceMasterDataMovementName = ?
                ,@StepDescription = ?
                ,@Status = ?
                ,@CreatedBy = ?
                ,@DatamovementTableID = ?
                ,@ID=NULL
        """
        params = (self._batch_id, self._data_movement_name, step_desc,
                  DataMovementStepStatus.IN_PROGRESS.value, self.app_name,
                  data_movement_table_id)

        cursor = self._support_db_conn.execute_with_retry(sql_query, params)

        db_row = cursor.fetchone()
        if db_row is None:
            raise ValueError("The procedure did not return any rows.")
        self._step_id = db_row[0]

    def update_log_step(self, status: "DataMovementStepStatus",
                        end_time: Optional[Union[datetime, str]] = None
                        ) -> None:
        """Update a record in the log table DataMovementStep

        This will use the step_id from the most recent call of
        create_log_step to update the matching row in the DataMovementStep
        table.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - UpdatedBy = App name in arguments

        Args:
            status: Status of the step.
            end_time: End time of the step. Can be passed as either a string
                or datetime object. If None (the default), the current time
                at the moment of the call is used.

        Raises:
            ValueError: If _step_id is None (i.e. create_log_step has not
                successfully run).
        """
        if self._step_id is None:
            raise ValueError("Class attribute _step_id is not set.")

        # Resolved per call: a datetime.now() default in the signature would
        # be evaluated only once, when the method is defined.
        if end_time is None:
            end_time = datetime.now()

        sql_query = """
            SET NOCOUNT ON;
            EXECUTE [log].[uspSetDataMovementStep]
                @StepID = ?
                ,@Status = ?
                ,@EndTime = ?
                ,@UpdatedBy = ?
        """
        params = (self._step_id, status.value, end_time, self.app_name)

        # The returned cursor is not needed; nothing is read back.
        self._support_db_conn.execute_with_retry(sql_query, params)

    def create_log_step_audit(self, event_text: str,
                              record_count: Optional[int] = None,
                              insert_record_count: Optional[int] = None,
                              update_record_count: Optional[int] = None,
                              delete_record_count: Optional[int] = None,
                              stored_procedure_name: Optional[str] = None,
                              custom_procedure_schema: Optional[str] = None,
                              custom_procedure_name: Optional[str] = None,
                              log_database: Optional[str] = None) -> None:
        """Insert a record into log table DataMovementStepAudit.

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - CreatedBy = App name in arguments

        Args:
            event_text: A description of the event, with a maximum length of
                250
            record_count: The total record count in the table after making
                changes
            insert_record_count: The number of records inserted
            update_record_count: The number of records updated
            delete_record_count: The number of records deleted
            stored_procedure_name: Reserved for future use
            custom_procedure_schema: Reserved for future use
            custom_procedure_name: Reserved for future use
            log_database: Reserved for future use
        """
        sql_query = """
            SET NOCOUNT ON;
            EXECUTE [log].[uspCreateDataMovementStepAudit]
                @StepID = ?
                ,@LogDatabase = ?
                ,@RecordCount = ?
                ,@InsertedRecordCount = ?
                ,@UpdatedRecordCount = ?
                ,@DeletedRecordCount = ?
                ,@EventText = ?
                ,@StoredProcedureName = ?
                ,@CustomProcedureSchema = ?
                ,@CustomProcedureName = ?
                ,@CreatedBy = ?
        """
        # Order must match the placeholders in the statement above.
        params = (
            self._step_id, log_database, record_count, insert_record_count,
            update_record_count, delete_record_count, event_text,
            stored_procedure_name, custom_procedure_schema,
            custom_procedure_name, self.app_name
        )

        # The returned cursor is not needed; nothing is read back.
        self._support_db_conn.execute_with_retry(sql_query, params)

    def create_log_step_event(self, event_log_description: str) -> None:
        """Insert a record to log table DataMovementStepEvent

        The following (in addition to the method arguments) are used as
        parameters in the stored procedure call:
            - Step ID = Step ID from most recent create_log_step
            - CreatedBy = App name in arguments

        Args:
            event_log_description: Description of the event
        """
        sql_query = """
            SET NOCOUNT ON;
            EXECUTE [log].[uspCreateDataMovementStepEventLog]
                @StepID = ?
                ,@EventLogDescription = ?
                ,@CreatedBy = ?
        """
        params = (self._step_id, event_log_description, self.app_name)

        # The returned cursor is not needed; nothing is read back.
        self._support_db_conn.execute_with_retry(sql_query, params)
Comments
Post a Comment