Posts

Showing posts from November, 2022

Base Class

 """UODa Base Class """ import json from typing import Optional, Dict, List, Tuple, Union import argparse from json.decoder import JSONDecodeError from abc import ABC, abstractmethod from enum import Enum from datetime import datetime import sys from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.conf import SparkConf # This is not available in local environments. from notebookutils import mssparkutils  # noqa from .util import DatabaseConnection, Logger class DataMovementStepStatus(Enum):     """DataMovementStep statuses to be used with stored procedures.     """     SUCCESSFUL = 'SUCCESSFUL'     FAILED = 'FAILED'     IN_PROGRESS = 'IN_PROGRESS' class UodaBaseClass(ABC):     """The foundational building block to extract configuration information      from the SupportDB and manage logging.     Is used as an Abstract Base Class. The subclass MUST implement th...

Read data from a database table and access it as a tuple

Call the method like this:

self.data_movement_config, self._data_movement_name = \
    self._read_config()

This invokes the _read_config method, and the returned tuple is unpacked into the two variables data_movement_config and data_movement_name.

def _read_config(self) -> Tuple[Dict, str]:
    """Read configuration from SupportDB.

    Returns:
        A tuple with the dictionary of the JSON configuration for a given
        and the data movement name.
    """
    sql_query = f"""
        SELECT
            ConfigurationJSON,
            DataMovementName
        FROM
            ctrl.DataMo...
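A self-contained sketch of the same read-and-unpack pattern, assuming a plain pyodbc connection and a placeholder table name; the real class goes through its DatabaseConnection helper, and the full table name is truncated in the snippet above.

import json
from typing import Dict, Tuple

import pyodbc  # assumed driver for the sketch; the real code uses DatabaseConnection


def read_config(conn: pyodbc.Connection, data_movement_id: int) -> Tuple[Dict, str]:
    """Return (configuration dict, data movement name) for one data movement."""
    sql_query = """
        SELECT ConfigurationJSON, DataMovementName
        FROM ctrl.SomeConfigTable          -- placeholder table name
        WHERE DataMovementID = ?
    """
    row = conn.cursor().execute(sql_query, data_movement_id).fetchone()
    # ConfigurationJSON is stored as text, so decode it into a dict first.
    return json.loads(row.ConfigurationJSON), row.DataMovementName


# The caller unpacks the returned tuple into two variables:
# config, name = read_config(conn, 42)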

Access base class contents from a derived class

Define the methods inside the base class.
Call the base class __init__ method from the derived class using super().__init__().
Access the methods or values of the base class using the self keyword.

Example (a runnable sketch of the full pattern follows the snippet):

from typing import Optional, Dict, List, Tuple, Union
import argparse


class computer:
    def __init__(self, input_args: Optional[List[str]]):
        # self.a = a
        # self.b = b
        # self.c = c
        # print(f'base class {a}')
        # print(f'base class {b}')
        # print(f'base class {c}')
        # self.add(self.a, self.b)
        self._parse_...
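A minimal runnable sketch of the three steps above, assuming a simple laptop subclass of the computer class; the attribute and method names are illustrative only.

from typing import List, Optional


class computer:
    def __init__(self, input_args: Optional[List[str]] = None):
        # State stored on self is reachable from derived classes.
        self.input_args = input_args or []

    def describe(self) -> str:
        return f"args: {self.input_args}"


class laptop(computer):
    def __init__(self, input_args: Optional[List[str]] = None):
        # Run the base class initialiser first.
        super().__init__(input_args)
        # Base class attributes and methods are accessed through self.
        print(self.describe())


laptop(['--name', 'demo'])   # prints: args: ['--name', 'demo']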

Parse command-line arguments in Python (incoming arguments captured under *)

from typing import Optional, Dict, List, Tuple, Union
import argparse


class computer:
    def __init__(self, input_args: Optional[List[str]]):
        # self.a = a
        # self.b = b
        # self.c = c
        # print(f'base class {a}')
        # print(f'base class {b}')
        # print(f'base class {c}')
        # self.add(self.a, self.b)
        self._parse_arguments(input_args)

    # def add(self, a, b):
    #     print(a + b)

    def ...
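A minimal sketch of how _parse_arguments can be completed with argparse, assuming two made-up flags; passing None lets argparse fall back to sys.argv[1:], while passing an explicit list makes the class easy to test.

import argparse
import sys
from typing import List, Optional


class computer:
    def __init__(self, input_args: Optional[List[str]] = None):
        self._parse_arguments(input_args)

    def _parse_arguments(self, input_args: Optional[List[str]]) -> None:
        parser = argparse.ArgumentParser(description="demo")
        # Illustrative flags; the real notebook defines its own.
        parser.add_argument("--data-movement-name", required=True)
        parser.add_argument("--debug", action="store_true")
        # With input_args=None, argparse reads sys.argv[1:] instead.
        self.args = parser.parse_args(input_args)


if __name__ == "__main__":
    c = computer(sys.argv[1:])   # e.g. python demo.py --data-movement-name sales --debug
    print(c.args.data_movement_name, c.args.debug)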

Read and write to a Delta table using an explicit path

First, use the methods below; they are mandatory.

def set_abfss_config(spark: SparkSession, linked_service: str):
    """Set the Spark configuration to access storage accounts using linked
    services.

    This function only supports managed identity for authentication. Once this
    configuration has been set, the storage account can be accessed using the
    abfss:// URI prefix.

    See https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-secure-credentials-with-tokenlibrary  # noqa E501

    Args:
        spark: A current SparkSession.
    ...
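A usage sketch, assuming the set_abfss_config and get_abfss_path helpers shown further down this page, and made-up linked service, container, and folder names.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Wire up managed-identity auth for the storage account behind the linked service.
set_abfss_config(spark, "ls_datalake")                               # hypothetical linked service
path = get_abfss_path("ls_datalake", "curated", "sales/orders")      # hypothetical container/folder

# Write a DataFrame to the Delta table at that explicit path...
df = spark.range(5)
df.write.format("delta").mode("overwrite").save(path)

# ...and read it straight back by path.
spark.read.format("delta").load(path).show()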

Pattern 3: read using a path

from pyspark.sql.functions import col, lit
import pyspark.sql.functions as F
import sys
import traceback
from delta.tables import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime
from pyspark.sql.types import Row
from uoda.base import UodaBaseClass, DataMovementStepStatus
from uoda.util import DatabaseConnection
import json
import jsonpointer
from urllib.parse import urlparse
from delta.tables import DeltaTable


class ProcessingPattern3(UodaBaseClass):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    ...
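A sketch of the overall shape such a subclass might take, assuming the parsed configuration is exposed as self.data_movement_config (as the earlier base class snippet suggests) and that it carries source/target path keys; the key names, join key, and method names are illustrative, not the actual pattern-3 implementation.

from pyspark.sql import DataFrame, SparkSession
from delta.tables import DeltaTable

from uoda.base import UodaBaseClass, DataMovementStepStatus


class ProcessingPattern3(UodaBaseClass):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def read_source(self, spark: SparkSession) -> DataFrame:
        # Hypothetical config keys; the real JSON layout is defined in SupportDB.
        src = self.data_movement_config["source"]
        return spark.read.format("delta").load(src["path"])

    def merge_into_target(self, spark: SparkSession, updates: DataFrame) -> None:
        # Open the target Delta table by explicit path and merge the updates in.
        tgt = DeltaTable.forPath(spark, self.data_movement_config["target"]["path"])
        (tgt.alias("t")
            .merge(updates.alias("u"), "t.Id = u.Id")   # illustrative join key
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())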

A class that performs writes to ADLS and SQL Server targets

def get_abfss_path(linked_service, container_name, path):
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    return f"abfss://{container_name}@{parse_result.netloc}/{path}"


def set_abfss_config(spark, linked_service):
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    spark.conf.set(f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName", linked_service)
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{parse_result.netloc}",
                   "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")


class ProcessingPattern1(UodaBaseClas...
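A usage sketch for the two targets, assuming the helpers above plus made-up linked service, container, JDBC URL, and table names; in practice the credentials would come from a secret store rather than literals.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumnRenamed("id", "OrderId")

# 1) Write to ADLS as a Delta table at an explicit abfss:// path.
set_abfss_config(spark, "ls_datalake")                               # hypothetical linked service
target_path = get_abfss_path("ls_datalake", "curated", "orders")     # hypothetical names
df.write.format("delta").mode("append").save(target_path)

# 2) Write the same DataFrame to a SQL Server target over JDBC.
(df.write
   .format("jdbc")
   .option("url", "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb")  # placeholder
   .option("dbtable", "dbo.Orders")      # placeholder table
   .option("user", "etl_user")           # placeholder credentials; use a
   .option("password", "***")            # credential provider in real code
   .mode("append")
   .save())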