Posts

Showing posts from October, 2022

Write Data to SQL Server from Synapse

    df_op_sensor_sql = spark.createDataFrame(data=list_data_consolidated, schema=list_schema_consolidated)

    # Adding data to the SQL Server table
    df_op_sensor_sql.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", target_tablename) \
        .option("user", username) \
        .option("password", password) \
        .save()

Read SQL Server data using a Synapse notebook

    # Fetch the connection details from Azure Key Vault through the linked service
    serverName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-server", "lsidpkeyvault")
    databaseName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-database", "lsidpkeyvault")
    userName = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-user", "lsidpkeyvault")
    password = mssparkutils.credentials.getSecret("kvidpresources", "rio-assurance-db-password", "lsidpkeyvault")

    # Build the JDBC URL for SQL Server
    server_name = "jdbc:sqlserver://" + serverName
    database_name = databaseName
    url = server_name + ";" + "databaseName=" + database_name + ";"
    ...
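The preview ends after the JDBC URL is assembled. A minimal sketch of the read itself, using the same connector the other posts use; dbo.SensorReference is a placeholder table name:

    df_ref = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", "dbo.SensorReference") \
        .option("user", userName) \
        .option("password", password) \
        .load()
    display(df_ref)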

Rename columns using a column-name mapping table

    try:
        querySel = f"Select SourceFieldName, TargetFieldName from ctl.MappingTable where tableName='{table_name1}'"
        # Read the source-to-target column mapping from the control table
        mappingDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("query", querySel) \
            .option("user", username) \
            .option("password", password).load()
        display(mappingDF)
    except ValueError as error:
        print("Connector read failed", error)

    # Build a {SourceFieldName: TargetFieldName} dictionary from the mapping rows
    mappingDict = {row.asDict()['SourceFieldName']: row.asDict()['TargetFieldName'] for row in mappingDF.colle...
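The preview cuts off while mappingDict is being built from the collected mapping rows. A minimal sketch of the renaming step itself, assuming mappingDict is the completed {source: target} dictionary; df stands in for whichever DataFrame is being renamed:

    from pyspark.sql.functions import col

    # Rename every column that appears in the mapping; unmapped columns keep their original name
    renamed_df = df.select([col(c).alias(mappingDict.get(c, c)) for c in df.columns])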

Flatten a JSON string using explode

Reference: Using PySpark to Read and Flatten JSON data with an enforced schema – Ben Alex Keen

    # Parse the JSON string carried in the event body against the enforced schema
    data_df1 = spark.read.json(df.rdd.map(lambda row: row.body), schema=sensor_schema)

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

    sensor_schema = StructType(fields=[
        StructField('accelerometer_x', StringType(), True),
        StructField('accelerometer_y', StringType(), True),
        StructField('accelerometer_z', StringType(), True),
        StructField('gyroscope', StructType(fields=[
            StructField('gyroscope_x', StringType(), True),
            StructField('gyroscope_y', StringType(), True),
            Stru...
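The explode step the title refers to is not in the preview. A minimal sketch, assuming the parsed frame exposes gyroscope as an array of structs (it appears as an array in the simulated events further down); the alias names are illustrative:

    from pyspark.sql.functions import explode, col

    # One output row per gyroscope reading; nested fields are lifted with dot notation
    flat_df = data_df1 \
        .withColumn("gyro", explode(col("gyroscope"))) \
        .select(
            col("accelerometer_x"),
            col("accelerometer_y"),
            col("accelerometer_z"),
            col("gyro.gyroscope_x").alias("gyroscope_x"),
            col("gyro.gyroscope_y").alias("gyroscope_y"),
        )
    flat_df.show(truncate=False)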

Run a PySpark file as a Python script

    #!/usr/bin/env python
    # coding: utf-8

    # ## Call DeltaLake and SqlServer

    # In[ ]:
    import json
    import jsonpointer
    from pyspark.sql.functions import lit, col
    from pyspark.sql import SparkSession
    import sys
    from notebookutils import mssparkutils
    from pyspark import SparkContext, SparkConf

    spark = SparkSession.builder.appName("pattern1").getOrCreate()
    conf = SparkConf().setAppName("pattern1").set("spark.hadoop.validateOutputSpecs", "false")

    # Adding to delta lake
    def write2deltatable(df_ip_sensor, target_table):
        try:
            df_ip_sensor_delta = df_ip_sensor.select(df_ip_sensor["body"])
            df_ip_sensor_delta.write.mode("append").format("delta").saveAsTable(target_table)
        except Exception as e:
            print(e)

    # Process dataframe to add data to the SQL Server table
    def write2sqltable(df_ip_senso...
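The "# In[ ]:" markers above are what a notebook export (for example via jupyter nbconvert) leaves behind. To run the resulting file outside the notebook UI it helps to give it an explicit entry point; a minimal, illustrative sketch appended to the script above, in which the main() wrapper, the argument handling, and the source table name are assumptions rather than part of the original post:

    # Illustrative only: e.g. spark-submit pattern1.py <delta_table> <sql_table>
    def main():
        delta_table = sys.argv[1]   # hypothetical target Delta table
        sql_table = sys.argv[2]     # hypothetical target SQL Server table
        df_ip_sensor = spark.sql("SELECT * FROM default.sensor_staging")  # hypothetical source table
        write2deltatable(df_ip_sensor, delta_table)
        write2sqltable(df_ip_sensor, sql_table)

    if __name__ == "__main__":
        main()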

Create an Event Publisher for an Event Hub

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    import pytz
    from azure.eventhub import EventHubProducerClient, EventData

    # This script simulates the production of events for 10 devices.
    devices = ["20100", "20101", "20102"]

    # Create a producer client to produce and publish events to the event hub.
    # Create the simulated sensor payloads
    data = [
        {"seqn": "1645599504046", "device-id": devices[0], "barometer-pressure": "9",
         "temperature": "0", "heart-rate": "0", "Strap-id": "", "timestamp": "1645599504046",
         "postTimestamp": "1645599504046.055", "accelerometer_x": "36", "accelerometer_y": "50",
         "accelerometer_z": "46",
         "gyroscope": [{"accelerometer_y": "6", "gyroscope_y": "10", "gyroscope_z": "8"}, {...
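The preview stops before the producer client is actually created. A minimal sketch of the publishing step with the azure-eventhub v5 client; the environment-variable names are placeholders, and sending everything in one batch is a simplification:

    # Placeholders: set these to the Event Hubs namespace connection string and hub name
    connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
    eventhub_name = os.environ["EVENTHUB_NAME"]

    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=eventhub_name)

    try:
        batch = producer.create_batch()
        for record in data:
            # Each simulated reading is serialised to JSON and added to the batch
            batch.add(EventData(json.dumps(record)))
        producer.send_batch(batch)
    finally:
        producer.close()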

Read Events from Event Hub and Write Them to SQL Server and Delta Lake

    import json
    import jsonpointer
    from pyspark.sql.functions import lit, col

    # Adding to delta lake
    def write2deltatable(df1, tbl_name):
        try:
            df_final = df1.select(df1["body"])
            df_final.write.mode("append").format("delta").saveAsTable(tbl_name)
        except Exception as e:
            print(e)

    # Process dataframe to add data to the SQL Server table
    def write2sqltable(df1, tbl_name):
        try:
            # query fetch json pointer and target schema
            # system...
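The preview ends inside write2sqltable, and the reading side is not shown at all. A minimal sketch of how these two helpers could be wired to an Event Hub stream with foreachBatch, assuming the azure-eventhubs-spark connector is available on the pool and that sc is the notebook's SparkContext handle; the connection string, checkpoint path, and table names are placeholders:

    from pyspark.sql.functions import col

    # Placeholder: Event Hub connection string (with ;EntityPath=<hub> appended)
    conn_str = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=<hub>"
    eh_conf = {
        "eventhubs.connectionString":
            sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str)
    }

    stream_df = spark.readStream.format("eventhubs").options(**eh_conf).load()

    def process_batch(batch_df, batch_id):
        # Keep the payload as a string so write2deltatable can append the raw body
        batch_df = batch_df.withColumn("body", col("body").cast("string"))
        write2deltatable(batch_df, "default.sensor_raw")   # placeholder Delta table
        write2sqltable(batch_df, "dbo.SensorReadings")     # placeholder SQL table

    query = (stream_df.writeStream
             .foreachBatch(process_batch)
             .option("checkpointLocation", "/tmp/checkpoints/sensor")  # placeholder path
             .start())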