Rename columns using column names from a mapping table

The idea is to read source-to-target column name pairs from a SQL Server control table, turn them into a Python dictionary, and then use that dictionary to alias every column of the source DataFrame.
try:
    # Pull the source -> target column-name pairs for this table from the mapping table.
    querySel = f"select SourceFieldName, TargetFieldName from ctl.MappingTable where tableName = '{table_name1}'"
    mappingDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("query", querySel) \
        .option("user", username) \
        .option("password", password) \
        .load()
    display(mappingDF)
except Exception as error:
    print("Connector read failed", error)
# Build a Python dict of {source column name: target column name}.
mappingDict = {row['SourceFieldName']: row['TargetFieldName'] for row in mappingDF.collect()}
print(mappingDict)
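The same dictionary can also be built through the RDD API; this is just an equivalent sketch, not required for the rest of the post.

    # Equivalent sketch: pair up the two columns and collect them as a map.
    mappingDict = mappingDF.rdd.map(lambda r: (r.SourceFieldName, r.TargetFieldName)).collectAsMap()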
try:
    # Expose the source DataFrame as a temp view, then build a SELECT that
    # aliases every source column to its target name from the mapping dict.
    data_df1.createOrReplaceTempView("data_df1_view")
    query_val = "select "
    cnt = 0
    for i in data_df1.schema.fields:
        field_name = i.name
        # Raises KeyError (caught below) if a column has no entry in the mapping table.
        fieldToBe = mappingDict[field_name]
        if cnt > 0:
            query_val = query_val + " , "
        query_val = query_val + field_name + " as " + fieldToBe
        cnt = cnt + 1
    query_val = query_val + " from data_df1_view"
    print(query_val)
    query_result_df = spark.sql(query_val)
    display(query_result_df)
except Exception as e:
    print('exception ', e)
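The same rename can also be done without building a SQL string, by selecting each column with an alias through the DataFrame API. This is a sketch of an alternative using the same mappingDict and data_df1 from above; columns with no mapping entry keep their original names here, which differs slightly from the SQL version.

    from pyspark.sql.functions import col

    # Alternative sketch: alias each column directly via the DataFrame API,
    # falling back to the original name if a column has no mapping entry.
    renamed_df = data_df1.select(
        [col(f.name).alias(mappingDict.get(f.name, f.name)) for f in data_df1.schema.fields]
    )
    display(renamed_df)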