Calling Signal R from Azure function

 Connection.py

Inorder to run it locally you can change the Verify=False.And while pushing to Azure function or to git make it True

import base64
import functools
import azure.functions as func
import hashlib
import hmac
import json
import logging
import os
import time
import httpx
import requests
import sqlalchemy
import pyodbc

# Get the Azure Function logger
logger = logging.getLogger(__name__)
logger.info(f"__name__{__name__}")
def get_engine():

   
    connection_string = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    f'SERVER={os.environ["DB_SERVER"]};'
    f'DATABASE={os.environ["DB_SCHEMA"]};'
    f'UID={os.environ["DB_USER"]};'
    f'PWD={os.environ["DB_PWD"]};'
    'Trusted_Connection=no;'
    )
    connection_url = sqlalchemy.engine.URL.create(
        "mssql+pyodbc",
        query=dict(odbc_connect=connection_string)
    )
    return sqlalchemy.create_engine(connection_url, fast_executemany=True)

def get_connection_details():
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};\
            SERVER=%s;\
                DATABASE=%s;\
                    UID=%s;PWD=%s' %(os.environ["DB_SERVER"], os.environ["DB_SCHEMA"], os.environ["DB_USER"], os.environ["DB_PWD"])
        )
    return conn

var_blob_connection_string=os.environ["adi_blob_connection_string"]
var_container_name=os.environ["adi_container_name"]
var_subfolder_path=os.environ["adi_subfolder_path"]
var_endpoint=os.environ["adi_endpoint"]
var_key=os.environ["adi_key"]
var_formUrl=os.environ["adi_formUrl"]  

async def generate_restapi_token(connection_string: str, hub_name: str, lifetime_minutes: int = 60):
    """Generates a JWT token specifically for REST API calls"""
    # Parse connection string
    parts = dict(pair.split('=', 1) for pair in connection_string.split(';') if '=' in pair)
   
    endpoint = parts['Endpoint'].rstrip('/')
    access_key = parts['AccessKey']
   
    # Ensure proper endpoint format
    if not endpoint.startswith('https://'):
        endpoint = f'https://{endpoint}'
   
    # REST API audience (DIFFERENT from client audience)
    audience = f"{endpoint}/api/v1/hubs/{hub_name.lower()}"
   
    # CORRECT expiration format (Unix epoch in seconds)
    expiration = int(time.time()) + (lifetime_minutes * 60)
   
    # Create token payload
    payload = {
        "aud": audience,  # Must match the REST API endpoint
        "exp": expiration  # Must be Unix time
    }
   
    # Create and sign token
    header = {"alg": "HS256", "typ": "JWT"}
    encoded_header = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b'=').decode()
    encoded_payload = base64.urlsafe_b64encode(json.dumps(payload).encode()).rstrip(b'=').decode()
   
    signing_input = f"{encoded_header}.{encoded_payload}"
    signature = hmac.new(
        access_key.encode('utf-8'),
        signing_input.encode('utf-8'),
        hashlib.sha256
    ).digest()
   
    encoded_signature = base64.urlsafe_b64encode(signature).rstrip(b'=').decode()
   
    return {
        "token": f"{encoded_header}.{encoded_payload}.{encoded_signature}",
        "audience": audience,
        "expires_at": expiration
    }
async def send_restapi_message(hub_name: str, message: str, group_name: str = None):    
    connection_string = os.getenv("azuresignalrconnectionstring")
    # Generate token with correct REST API audience
    token_info = await generate_restapi_token(connection_string, hub_name)
   
    # Prepare request
    headers = {
        "Authorization": f"Bearer {token_info['token']}",
        "Content-Type": "application/json"
    }
   
    payload = {
        "target": "ReceiveNotification",  # Must match .NET hub method
        "arguments": [message]       # Must match method parameters
    }
   
    if group_name:
        payload["groups"] = [group_name]
   
    # Send to the EXACT audience URL
       
    async with httpx.AsyncClient(verify=False) as client: # Point to your CA cert
        try:
            response = await client.post(
                token_info['audience'],
                headers=headers,
                json=payload
            )
            response.raise_for_status()
            logging.info(f"Async call successful: {response.status_code}")
            logging.info(response.text)
        except httpx.RequestError as exc:
            logging.info(f"An error occurred while requesting {exc.request.url!r}: {exc}")
        except httpx.HTTPStatusError as exc:
            logging.info(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}: {exc.response.text}")

   
    if response.status_code != 202:
        raise Exception(f"API request failed: {response.status_code} - {response.text}")
   
    return response.json()

def notify_and_handle_errors(route_func):
    @functools.wraps(route_func)
    async def wrapper(req: func.HttpRequest, *args, **kwargs):
        logging.info('Processing request...')

        # Notify start (non-blocking)
        try:
            await send_restapi_message(
                hub_name="notificationhub",
                message='process started',
                group_name="valuator-group"
            )
        except Exception as e:
            logging.warning(f"Start notification failed: {e}")

        try:
            # Execute main route logic
            result_json = await route_func(req, *args, **kwargs)

            # Notify completion (non-blocking)
            try:
                await send_restapi_message(
                    hub_name="notificationhub",
                    message='process completed',
                    group_name="valuator-group"
                )
            except Exception as e:
                logging.warning(f"Completion notification failed: {e}")

            return func.HttpResponse(
                json.dumps({"status": "success", "result": result_json}),
                mimetype="application/json",
                status_code=200
            )

        except (RuntimeError, ValueError, KeyError, TypeError) as e:
            logging.error(f"Client error: {e}")
            return func.HttpResponse(
                json.dumps({"status": "error", "message": str(e)}),
                mimetype="application/json",
                status_code=400
            )

        except Exception as e:
            logging.error(f"Server error: {e}")
            return func.HttpResponse(
                json.dumps({"status": "error", "message": str(e)}),
                mimetype="application/json",
                status_code=500
            )

    return wrapper

Comments

Popular posts from this blog

Introduction To Oracle10g

Insert

Except