# Call ADI (Azure Document Intelligence) from an Azure Function
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from warnings import filterwarnings
import pyodbc
import sqlalchemy
from sqlalchemy import create_engine, text
import json
from azure.storage.blob import BlobServiceClient
import pandas as pd
import os
from IPython.display import display
from constants import STORED_PROCEDURES
from connection import get_engine,get_connection_details,var_blob_connection_string,var_container_name,var_subfolder_path,var_endpoint,var_key,var_formUrl
#from connection import get_engine,get_connection_details
def _fn_mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id):
    """Set the document's extraction status to 3 (failed) on the given connection."""
    with conn.cursor() as cursor:
        cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?", Asset_Fund_Val_Doc_Map_Id, 3)
        conn.commit()


def fn_extract_document_data(Asset_Fund_Val_Doc_Map_Id, Asset_Valuation_Id):
    """Analyze a mapped document with Azure Document Intelligence and load the results.

    Workflow:
      1. Abort if an extraction for this mapping is already running (status 1),
         otherwise mark it "in progress" (status 1).
      2. Run the custom ADI model named in the mapping row against the document
         URL and persist the raw analysis JSON to blob storage.
      3. Re-read the JSON, flatten the model's ``valueObject`` fields into a
         one-row DataFrame, stage it in ``#audited_financials``, and execute
         the document-type-specific load/backtesting stored procedures.
      4. Mark the mapping "completed" (status 2); on any failure mark it
         "failed" (status 3) and re-raise.

    Args:
        Asset_Fund_Val_Doc_Map_Id: Key of the document/valuation mapping row.
        Asset_Valuation_Id: Key of the asset valuation the document belongs to.

    Raises:
        RuntimeError: If an extraction is already in progress, the analyzed
            payload cannot be parsed, or any database/storage step fails.
        ValueError: Re-raised when a query returns no usable rows.
    """
    filterwarnings("ignore", category=UserWarning, message='.*pandas only supports SQLAlchemy connectable.*')
    engine = get_engine()
    conn = get_connection_details()
    try:
        # --- Concurrency guard: refuse to start while another run is active.
        # BUG FIX: params was (x) — a bare scalar, not a tuple; pandas/pyodbc
        # expect a sequence of parameters, so pass a real 1-tuple.
        doc_stat = pd.read_sql("EXEC dbo.sp_Get_Data_Extraction_Status ? ", conn,
                               params=(Asset_Fund_Val_Doc_Map_Id,))
        extraction_stat = doc_stat['Data_Extraction_Status'].values[0]
        print(f"extraction_status {extraction_stat}")
        if extraction_stat == 1:
            raise RuntimeError("Document extraction is in progress now, please try after some time")

        doc_details = pd.read_sql("EXEC dbo.sp_Get_Document_Details ? ,?", conn,
                                  params=(Asset_Fund_Val_Doc_Map_Id, Asset_Valuation_Id))

        # Mark this mapping as "in progress" (1) before the long-running analysis.
        with conn.cursor() as cursor:
            cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?", Asset_Fund_Val_Doc_Map_Id, 1)
            conn.commit()

        blob_connection_string = var_blob_connection_string
        container_name = var_container_name
        subfolder_path = var_subfolder_path
        file_name = doc_details['Document_Path'].values[0]
        model_id = doc_details['Model_Name'].values[0]
        formUrl = f"{var_formUrl}{file_name}"
        # BUG FIX: the original uploaded to os.path.join(subfolder, name) but
        # later downloaded f"{subfolder}{name}.json"; os.path.join uses '\\'
        # on Windows, so the two paths could disagree. Blob names always use
        # '/', so build one path and use it for both upload and download.
        result_blob_path = f"{subfolder_path}{file_name}.json"

        # --- Run the custom-model analysis against the document URL.
        document_intelligence_client = DocumentIntelligenceClient(
            endpoint=var_endpoint, credential=AzureKeyCredential(var_key)
        )
        # Make sure the document's type is one the custom model can analyze.
        poller = document_intelligence_client.begin_analyze_document(
            model_id, AnalyzeDocumentRequest(url_source=formUrl)
        )
        result = poller.result()
        result_json_str = json.dumps(result.as_dict())

        # One client/container pair serves both the upload and the download
        # (the original constructed a second BlobServiceClient needlessly).
        blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        container_client.get_blob_client(result_blob_path).upload_blob(result_json_str, overwrite=True)
        print(f"JSON content successfully uploaded to {result_blob_path} in container {container_name}.")

        ######################## Save to Table #######################
        print(result_blob_path)
        download_stream = container_client.get_blob_client(result_blob_path).download_blob()
        data = json.loads(download_stream.readall())

        try:
            Document_Type = doc_details['Document_Type'].values[0]
            # Flatten every valueObject in the model output into one flat dict:
            # prefer 'content' when the model extracted a value, otherwise keep
            # the key with None so the DataFrame column still exists.
            extracted_data = {}
            for document in data['documents']:
                if Document_Type == 'Audited Financial Statement':
                    afs_data = document['fields']['AFSData']['valueArray']
                elif Document_Type == 'Management Account':
                    afs_data = document['fields']['MA_Dataset']['valueArray']
                else:
                    afs_data = []
                for item in afs_data:
                    for key, value in item['valueObject'].items():
                        if 'content' in value:
                            extracted_data[key] = value['content']
                        elif key not in extracted_data:
                            extracted_data[key] = None
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred, Please check the File you Upload : {e}")

        df_afs = pd.DataFrame([extracted_data])
        display(df_afs)
        # NOTE(review): '#audited_financials' is a session-scoped temp table;
        # this assumes the stored procedures below see the same session via
        # connection pooling — confirm, otherwise the staged rows vanish.
        df_afs.to_sql('#audited_financials', engine, if_exists='replace', index=False)

        # --- Resolve the document-type-specific stored procedures.
        if Document_Type == 'Audited Financial Statement':
            proc_name = STORED_PROCEDURES['LOAD_AFS_DOCUMENT']
            proc_backtesting = STORED_PROCEDURES['LOAD_AFS_BACKTESTING']
            proc_budget_back = STORED_PROCEDURES['LOAD_BUDGET_BACK_AFS']
        elif Document_Type == 'Management Account':
            proc_name = STORED_PROCEDURES['LOAD_MA_DOCUMENT']
            proc_backtesting = STORED_PROCEDURES['LOAD_MA_BACKTESTING']
            proc_budget_back = STORED_PROCEDURES['LOAD_BUDGET_BACK_MA']
        else:
            # BUG FIX: the original fell through with proc_name = '' and then
            # executed "EXEC  :param1,:param2", surfacing only an opaque SQL
            # syntax error. Fail with an explicit message instead (still ends
            # up as a RuntimeError with status 3, as before).
            raise RuntimeError(f"Unsupported Document_Type: {Document_Type!r}")
        proc_asset_val = STORED_PROCEDURES['LOAD_ASSET_VAL_MAP']

        # Execute the MERGE statements in a single transaction. Note the
        # parameter order differs between the document proc (map-id first)
        # and the other three (valuation-id first).
        with engine.begin() as connection:
            connection.execute(text(f"EXEC {proc_name} :param1,:param2"),
                               {'param1': Asset_Fund_Val_Doc_Map_Id, 'param2': Asset_Valuation_Id})
            connection.execute(text(f"EXEC {proc_backtesting} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})
            connection.execute(text(f"EXEC {proc_budget_back} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})
            connection.execute(text(f"EXEC {proc_asset_val} :param1,:param2"),
                               {'param1': Asset_Valuation_Id, 'param2': Asset_Fund_Val_Doc_Map_Id})

        # Mark the mapping as "completed" (2).
        with conn.cursor() as cursor:
            cursor.execute("EXEC dbo.sp_Update_Data_Extraction_Status ?, ?", Asset_Fund_Val_Doc_Map_Id, 2)
            conn.commit()
    except pyodbc.Error as e:
        print(f"Error: Unable to connect to the database. {e}")
        _fn_mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise RuntimeError(f"An unexpected error occurred: {e}")
    except ValueError as e:
        # Specific handling when a query returns no rows.
        print(f"Program execution stopped due to an error: {e}")
        _fn_mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise  # Re-raise the exception if needed
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        _fn_mark_extraction_failed(conn, Asset_Fund_Val_Doc_Map_Id)
        raise RuntimeError(f"An unexpected error occurred: {e}")