Call Azure Document Intelligence (ADI) from an Azure Function

 from azure.core.credentials import AzureKeyCredential

from azure.ai.documentintelligence import DocumentIntelligenceClient

from azure.ai.documentintelligence.models import AnalyzeDocumentRequest

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

from warnings import filterwarnings

import pyodbc

import sqlalchemy

from sqlalchemy import create_engine, text

import json

from azure.storage.blob import BlobServiceClient

import pandas as pd

import os

from IPython.display import display

from constants import STORED_PROCEDURES

from connection import get_engine,get_connection_details,var_blob_connection_string,var_container_name,var_subfolder_path,var_endpoint,var_key,var_formUrl

#from connection import get_engine,get_connection_details


def fn_extract_document_data(Asset_Fund_Val_Doc_Map_Id, Asset_Valuation_Id):
    """Run Azure Document Intelligence on a stored document and load the results into SQL.

    Workflow:
      1. Check (via dbo.sp_Get_Data_Extraction_Status) that no extraction is
         already running for this document mapping; refuse to start if so.
      2. Mark the mapping as "in progress" (status 1), analyze the document
         with the custom Document Intelligence model, and persist the raw
         analysis JSON to blob storage.
      3. Download that JSON back, flatten it into a one-row DataFrame, stage
         it in the #audited_financials temp table, and run the
         document-type-specific MERGE stored procedures.
      4. Mark the mapping as "completed" (status 2), or "failed" (status 3)
         if any step after the in-progress check errors out.

    Parameters
    ----------
    Asset_Fund_Val_Doc_Map_Id : int
        Key of the document/valuation mapping row being processed.
    Asset_Valuation_Id : int
        Key of the valuation the document belongs to.

    Raises
    ------
    RuntimeError
        If an extraction is already in progress, the document type is not
        supported, the downloaded JSON lacks the expected field layout, or
        any database/storage error occurs (original error text included).
    ValueError
        Re-raised unchanged, e.g. when a query returns no rows.
    """
    filterwarnings("ignore", category=UserWarning,
                   message='.*pandas only supports SQLAlchemy connectable.*')

    engine = get_engine()
    conn = get_connection_details()

    # --- Guard: refuse to start when an extraction is already running. ---
    # This check deliberately happens BEFORE the main try/except so that the
    # "in progress" error cannot reach the failure handlers below and
    # overwrite the running extraction's status 1 with 3 (Failure).
    try:
        # Trailing comma makes params a real one-element tuple (the original
        # bare parentheses were a no-op and passed a scalar).
        doc_stat = pd.read_sql("EXEC dbo.sp_Get_Data_Extraction_Status ? ",
                               conn, params=(Asset_Fund_Val_Doc_Map_Id,))
        extraction_stat = doc_stat['Data_Extraction_Status'].values[0]
    except pyodbc.Error as e:
        print(f"Error: Unable to connect to the database. {e}")
        raise RuntimeError(f"An unexpected error occurred: {e}")

    print(f"extraction_status {extraction_stat}")
    if extraction_stat == 1:
        raise RuntimeError("Document extraction is in progress now, please try after some time")

    try:
        doc_details = pd.read_sql("EXEC dbo.sp_Get_Document_Details ? ,?", conn,
                                  params=(Asset_Fund_Val_Doc_Map_Id, Asset_Valuation_Id))

        # Mark extraction as in progress (1) before any long-running work.
        with conn.cursor() as cursor:
            cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?",
                           Asset_Fund_Val_Doc_Map_Id, 1)
            conn.commit()

        blob_connection_string = var_blob_connection_string
        container_name = var_container_name
        subfolder_path = var_subfolder_path

        file_name = doc_details['Document_Path'].values[0]
        model_id = doc_details['Model_Name'].values[0]
        formUrl = f"{var_formUrl}{file_name}"

        # One blob path used for BOTH the upload and the later download.
        # Blob names always use forward slashes, so the path is built with an
        # f-string rather than os.path.join (which emits backslashes on
        # Windows and previously made upload and download paths disagree).
        # Assumes var_subfolder_path ends with '/' — TODO confirm.
        full_blob_path = f"{subfolder_path}{file_name}.json"

        # --- Analyze the document with the custom model. ---
        document_intelligence_client = DocumentIntelligenceClient(
            endpoint=var_endpoint, credential=AzureKeyCredential(var_key)
        )
        # Make sure the document's type is included in the list of document
        # types the custom model can analyze.
        poller = document_intelligence_client.begin_analyze_document(
            model_id, AnalyzeDocumentRequest(url_source=formUrl)
        )
        result = poller.result()

        # --- Persist the raw analysis result to blob storage as JSON. ---
        result_json_str = json.dumps(result.as_dict())
        blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
        blob_client = blob_service_client.get_blob_client(container=container_name,
                                                          blob=full_blob_path)
        blob_client.upload_blob(result_json_str, overwrite=True)
        print(f"JSON content successfully uploaded to {full_blob_path} in container {container_name}.")

        ######################## Save to Table #######################
        # Round-trip: re-download the JSON we just uploaded so the stored
        # blob copy is the single source of truth for what gets loaded.
        container_client = blob_service_client.get_container_client(container_name)
        print(full_blob_path)
        download_stream = container_client.get_blob_client(full_blob_path).download_blob()
        data = json.loads(download_stream.readall())

        Document_Type = doc_details['Document_Type'].values[0]
        extracted_data = _parse_document_fields(data, Document_Type)

        # One-row DataFrame from the flattened key/value pairs.
        df_afs = pd.DataFrame([extracted_data])
        display(df_afs)

        # Stage into the temp table the MERGE procedures read from.
        # NOTE(review): #audited_financials is a session-scoped temp table;
        # this relies on the engine handing the stored-procedure calls below
        # the same pooled connection — verify pool configuration.
        df_afs.to_sql('#audited_financials', engine, if_exists='replace', index=False)

        # Resolve the stored procedures for this document type.
        if Document_Type == 'Audited Financial Statement':
            proc_name = STORED_PROCEDURES['LOAD_AFS_DOCUMENT']
            proc_backtesting = STORED_PROCEDURES['LOAD_AFS_BACKTESTING']
            proc_budget_back = STORED_PROCEDURES['LOAD_BUDGET_BACK_AFS']
        elif Document_Type == 'Management Account':
            proc_name = STORED_PROCEDURES['LOAD_MA_DOCUMENT']
            proc_backtesting = STORED_PROCEDURES['LOAD_MA_BACKTESTING']
            proc_budget_back = STORED_PROCEDURES['LOAD_BUDGET_BACK_MA']
        else:
            # The original fell through with proc_name = '' and crashed later
            # with a NameError on proc_backtesting; fail fast instead.
            raise RuntimeError(f"Unsupported Document_Type: {Document_Type}")

        proc_asset_val = STORED_PROCEDURES['LOAD_ASSET_VAL_MAP']

        # Execute the MERGE statements in a single transaction.
        with engine.begin() as connection:
            connection.execute(text(f"EXEC {proc_name} :param1,:param2"),
                               {'param1': Asset_Fund_Val_Doc_Map_Id, 'param2': Asset_Valuation_Id})
            connection.execute(text(f"EXEC {proc_backtesting} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})
            connection.execute(text(f"EXEC {proc_budget_back} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})
            connection.execute(text(f"EXEC {proc_asset_val} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})

        # Mark extraction as completed (2).
        with conn.cursor() as cursor:
            cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?",
                           Asset_Fund_Val_Doc_Map_Id, 2)
            conn.commit()

    except pyodbc.Error as e:
        print(f"Error: Unable to connect to the database. {e}")
        _mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise RuntimeError(f"An unexpected error occurred: {e}")

    except ValueError as e:
        # Specific handling when a query returns no rows (.values[0] fails).
        print(f"Program execution stopped due to an error: {e}")
        _mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise  # Re-raise the exception unchanged

    except Exception as e:
        # Any other failure: record it, flag the row, and surface the cause.
        print(f"An unexpected error occurred: {e}")
        _mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise RuntimeError(f"An unexpected error occurred: {e}")


def _parse_document_fields(data, Document_Type):
    """Flatten a Document Intelligence analyze result into {field: content}.

    Walks data['documents'][*]['fields'][<dataset key>]['valueArray'] where the
    dataset key depends on Document_Type ('AFSData' for Audited Financial
    Statement, 'MA_Dataset' for Management Account; other types yield {}).
    Later rows overwrite earlier ones for the same field; a field seen without
    'content' is recorded as None unless a value was already captured.

    Raises RuntimeError if the JSON does not have the expected layout.
    """
    extracted_data = {}
    try:
        for document in data['documents']:
            if Document_Type == 'Audited Financial Statement':
                afs_data = document['fields']['AFSData']['valueArray']
            elif Document_Type == 'Management Account':
                afs_data = document['fields']['MA_Dataset']['valueArray']
            else:
                afs_data = []
            for item in afs_data:
                for key, value in item['valueObject'].items():
                    if 'content' in value:
                        extracted_data[key] = value['content']
                    elif key not in extracted_data:
                        extracted_data[key] = None
    except Exception as e:
        # Malformed/unexpected model output — surface a user-facing message.
        raise RuntimeError(f"An unexpected error occurred, Please check the File you Upload : {e}")
    return extracted_data


def _mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id):
    """Set Data_Extraction_Status to 3 (Failure) for the mapping row."""
    with conn.cursor() as cursor:
        cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?",
                       Asset_Fund_Val_Doc_Map_Id, 3)
        conn.commit()

    

Comments

Popular posts from this blog

Introduction To Oracle10g

Insert

Except