Merge operation in ADLS table
# Upsert the incoming DataFrame `df` into the Delta table stored at `path`.

# Get the Delta table path using the linked service.
path = get_abfss_path(linked_service, container_name, path)

# Row count of the incoming batch, taken BEFORE duplicates are dropped.
# NOTE(review): not used anywhere in the visible code, and count() triggers a
# Spark job -- confirm it is needed further down, otherwise remove it.
df_count = df.count()

# Remove rows that are exact duplicates across all columns.
# NOTE(review): this does not guarantee a single source row per `body.seqn`;
# Delta's merge can fail when several source rows match the same target row.
# Confirm the upstream data is unique on the merge key.
df = df.dropDuplicates()

# Only merge when a Delta table already exists at the path.
# NOTE(review): no branch for a missing table is visible here -- presumably
# the initial write is handled elsewhere; verify.
if DeltaTable.isDeltaTable(self.spark, path):
    delta_table = DeltaTable.forPath(self.spark, path)
    # "t" is the existing target table, "u" the incoming updates; rows are
    # matched on the nested `body.seqn` field.
    (
        delta_table.alias("t")
        .merge(df.alias("u"), "u.body.seqn = t.body.seqn")
        .whenMatchedUpdateAll()      # matched rows: overwrite every column
        .whenNotMatchedInsertAll()   # unmatched rows: insert as new
        .execute()
    )
Comments
Post a Comment