Calling Signal R from Azure function
Connection.py
Inorder to run it locally you can change the Verify=False.And while pushing to Azure function or to git make it True
import base64
import functools
import azure.functions as func
import hashlib
import hmac
import json
import logging
import os
import time
import httpx
import requests
import sqlalchemy
import pyodbc
# Get the Azure Function logger
logger = logging.getLogger(__name__)
logger.info(f"__name__{__name__}")
def get_engine():
connection_string = (
'DRIVER={ODBC Driver 17 for SQL Server};'
f'SERVER={os.environ["DB_SERVER"]};'
f'DATABASE={os.environ["DB_SCHEMA"]};'
f'UID={os.environ["DB_USER"]};'
f'PWD={os.environ["DB_PWD"]};'
'Trusted_Connection=no;'
)
connection_url = sqlalchemy.engine.URL.create(
"mssql+pyodbc",
query=dict(odbc_connect=connection_string)
)
return sqlalchemy.create_engine(connection_url, fast_executemany=True)
def get_connection_details():
conn = pyodbc.connect(
'DRIVER={ODBC Driver 17 for SQL Server};\
SERVER=%s;\
DATABASE=%s;\
UID=%s;PWD=%s' %(os.environ["DB_SERVER"], os.environ["DB_SCHEMA"], os.environ["DB_USER"], os.environ["DB_PWD"])
)
return conn
var_blob_connection_string=os.environ["adi_blob_connection_string"]
var_container_name=os.environ["adi_container_name"]
var_subfolder_path=os.environ["adi_subfolder_path"]
var_endpoint=os.environ["adi_endpoint"]
var_key=os.environ["adi_key"]
var_formUrl=os.environ["adi_formUrl"]
async def generate_restapi_token(connection_string: str, hub_name: str, lifetime_minutes: int = 60):
"""Generates a JWT token specifically for REST API calls"""
# Parse connection string
parts = dict(pair.split('=', 1) for pair in connection_string.split(';') if '=' in pair)
endpoint = parts['Endpoint'].rstrip('/')
access_key = parts['AccessKey']
# Ensure proper endpoint format
if not endpoint.startswith('https://'):
endpoint = f'https://{endpoint}'
# REST API audience (DIFFERENT from client audience)
audience = f"{endpoint}/api/v1/hubs/{hub_name.lower()}"
# CORRECT expiration format (Unix epoch in seconds)
expiration = int(time.time()) + (lifetime_minutes * 60)
# Create token payload
payload = {
"aud": audience, # Must match the REST API endpoint
"exp": expiration # Must be Unix time
}
# Create and sign token
header = {"alg": "HS256", "typ": "JWT"}
encoded_header = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b'=').decode()
encoded_payload = base64.urlsafe_b64encode(json.dumps(payload).encode()).rstrip(b'=').decode()
signing_input = f"{encoded_header}.{encoded_payload}"
signature = hmac.new(
access_key.encode('utf-8'),
signing_input.encode('utf-8'),
hashlib.sha256
).digest()
encoded_signature = base64.urlsafe_b64encode(signature).rstrip(b'=').decode()
return {
"token": f"{encoded_header}.{encoded_payload}.{encoded_signature}",
"audience": audience,
"expires_at": expiration
}
async def send_restapi_message(hub_name: str, message: str, group_name: str = None):
connection_string = os.getenv("azuresignalrconnectionstring")
# Generate token with correct REST API audience
token_info = await generate_restapi_token(connection_string, hub_name)
# Prepare request
headers = {
"Authorization": f"Bearer {token_info['token']}",
"Content-Type": "application/json"
}
payload = {
"target": "ReceiveNotification", # Must match .NET hub method
"arguments": [message] # Must match method parameters
}
if group_name:
payload["groups"] = [group_name]
# Send to the EXACT audience URL
async with httpx.AsyncClient(verify=False) as client: # Point to your CA cert
try:
response = await client.post(
token_info['audience'],
headers=headers,
json=payload
)
response.raise_for_status()
logging.info(f"Async call successful: {response.status_code}")
logging.info(response.text)
except httpx.RequestError as exc:
logging.info(f"An error occurred while requesting {exc.request.url!r}: {exc}")
except httpx.HTTPStatusError as exc:
logging.info(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}: {exc.response.text}")
if response.status_code != 202:
raise Exception(f"API request failed: {response.status_code} - {response.text}")
return response.json()
def notify_and_handle_errors(route_func):
@functools.wraps(route_func)
async def wrapper(req: func.HttpRequest, *args, **kwargs):
logging.info('Processing request...')
# Notify start (non-blocking)
try:
await send_restapi_message(
hub_name="notificationhub",
message='process started',
group_name="valuator-group"
)
except Exception as e:
logging.warning(f"Start notification failed: {e}")
try:
# Execute main route logic
result_json = await route_func(req, *args, **kwargs)
# Notify completion (non-blocking)
try:
await send_restapi_message(
hub_name="notificationhub",
message='process completed',
group_name="valuator-group"
)
except Exception as e:
logging.warning(f"Completion notification failed: {e}")
return func.HttpResponse(
json.dumps({"status": "success", "result": result_json}),
mimetype="application/json",
status_code=200
)
except (RuntimeError, ValueError, KeyError, TypeError) as e:
logging.error(f"Client error: {e}")
return func.HttpResponse(
json.dumps({"status": "error", "message": str(e)}),
mimetype="application/json",
status_code=400
)
except Exception as e:
logging.error(f"Server error: {e}")
return func.HttpResponse(
json.dumps({"status": "error", "message": str(e)}),
mimetype="application/json",
status_code=500
)
return wrapper
Comments
Post a Comment