Rename columns using a column-name mapping table

The control table ctl.MappingTable holds SourceFieldName/TargetFieldName pairs for each table. The snippets below read that mapping over the SQL Server Spark connector, turn it into a dictionary, and use the dictionary to build a SELECT ... AS query that renames every column of the source DataFrame.

try:
    # Read the source -> target column mapping for this table from the control schema
    querySel = f"select SourceFieldName, TargetFieldName from ctl.MappingTable where tableName = '{table_name1}'"
    mappingDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("query", querySel) \
        .option("user", username) \
        .option("password", password) \
        .load()
    display(mappingDF)
except Exception as error:
    print("Connector read failed", error)


# Build a source-name -> target-name dictionary from the mapping rows
mappingDict = {row['SourceFieldName']: row['TargetFieldName'] for row in mappingDF.collect()}

print(mappingDict)
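For a hypothetical mapping with rows such as (cust_id, CustomerId) and (ord_dt, OrderDate), the printed dictionary would look like:

# Example output shape (hypothetical column names)
# {'cust_id': 'CustomerId', 'ord_dt': 'OrderDate'}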

try:
    data_df1.createOrReplaceTempView("data_df1_view")
    # Build "select <source> as <target>, ..." from the mapping dictionary
    select_parts = []
    for field in data_df1.schema.fields:
        field_name = field.name
        fieldToBe = mappingDict[field_name]   # raises KeyError if a column has no mapping entry
        select_parts.append(field_name + " as " + fieldToBe)
    query_val = "select " + ", ".join(select_parts) + " from data_df1_view"
    print(query_val)
    query_result_df = spark.sql(query_val)
    display(query_result_df)
except Exception as e:
    print("exception", e)
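The same rename can also be done with the DataFrame API, without a temp view or generated SQL. A minimal sketch under the same assumptions (data_df1 and mappingDict already exist); unmapped columns keep their original names:

from pyspark.sql import functions as F

# Alias every column through the mapping dictionary
renamed_df = data_df1.select([F.col(c).alias(mappingDict.get(c, c)) for c in data_df1.columns])
display(renamed_df)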
