A class that performs writes to ADLS and SQL Server targets
import json
import jsonpointer
from urllib.parse import urlparse
from delta.tables import DeltaTable
from notebookutils import mssparkutils  # available in Synapse notebooks

def get_abfss_path(linked_service, container_name, path):
    # Build an abfss:// URI for the storage account behind the given linked service.
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    return f"abfss://{container_name}@{parse_result.netloc}/{path}"

def set_abfss_config(spark, linked_service):
    # Point Spark at the linked service's token provider for this storage account.
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    spark.conf.set(f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName", linked_service)
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{parse_result.netloc}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
class ProcessingPattern1(UodaBaseClass):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute_business_logic(self):
        self.logger.info("Starting execution of business logic")
        for target in [t for t in self.data_movement_config['targets'] if t['type'] == "ADLS_Table"]:
            set_abfss_config(self.spark, target['linkedService'])
        eh_conn_string = mssparkutils.credentials.getSecretWithLS(
            self.data_movement_config["source"]["keyVaultLinkedService"],
            self.data_movement_config["source"]["keyVaultSecretName"]
        )
        set_abfss_config(self.spark, self.data_movement_config["source"]["checkpointLinkedService"])
        eh_checkpoint_location = get_abfss_path(
            self.data_movement_config["source"]["checkpointLinkedService"],
            self.data_movement_config["source"]["checkpointContainer"],
            self.data_movement_config["source"]["checkpointPath"]
        )
        self.create_log_step("Starting data movement from EventHub to ADLS/RDBMS in ODS")
        json_schema_cur = self._support_db_conn.execute_with_retry(
            f"""
            SELECT
                source_pointer,
                src_schema_name,
                tgt_schema_name,
                table_name
            FROM
                ctrl.map_all_jsonpointer
            WHERE
                event_name='{self.data_movement_config["source"]["eventHub"]}'
            """
        )
        self.json_schema = list(json_schema_cur.fetchall())
        del json_schema_cur
        try:
            self.read_from_eventhub(eh_conn_string, eh_checkpoint_location)
        except Exception as e:
            self.update_log_step(DataMovementStepStatus.FAILED)
            self.create_log_step_event(f'Data movement failed. Exception message is {e}')
            raise
    def write_to_adls(self, df, linked_service, container_name, path):
        try:
            df.show()  # debug: preview the microbatch contents
            path = get_abfss_path(linked_service, container_name, path)
            df_count = df.count()
            df = df.dropDuplicates()
            dedup_count = df.count()
            if DeltaTable.isDeltaTable(self.spark, path):
                delta_table = DeltaTable.forPath(self.spark, path)
                # NOTE: the condition assumes a seqn field is addressable inside body; with body
                # cast to string upstream, it may need get_json_object or a parsed struct instead.
                delta_table.alias("t").merge(
                    df.alias("u"),
                    "u.body.seqn = t.body.seqn") \
                    .whenMatchedUpdateAll() \
                    .whenNotMatchedInsertAll() \
                    .execute()
                hist = delta_table.history(1).first()['operationMetrics']
                self.create_log_step_audit(
                    event_text=f"Microbatch merged to {linked_service} {path}. Input count={df_count}, without duplicates={dedup_count}",
                    record_count=dedup_count,
                    insert_record_count=hist['numTargetRowsInserted'],
                    update_record_count=hist['numTargetRowsUpdated'],
                    delete_record_count=hist['numTargetRowsDeleted'],
                )
                self.logger.info(f"Microbatch merged to {linked_service} {path}. " \
                    f"Input count={df_count}, without duplicates={dedup_count}, " \
                    f"insert_record_count={hist['numTargetRowsInserted']}, " \
                    f"update_record_count={hist['numTargetRowsUpdated']}, " \
                    f"delete_record_count={hist['numTargetRowsDeleted']}."
                )
            else:
                self.logger.info(f"Delta table not found at {path}, creating a new table.")
                df.write \
                    .format("delta") \
                    .save(path)
                self.create_log_step_audit(
                    event_text=f"Microbatch written to new table {linked_service} {path}. Input count={df_count}, without duplicates={dedup_count}",
                    record_count=dedup_count,
                    insert_record_count=dedup_count,
                )
                self.logger.info(f"Microbatch created new table at {linked_service} {path}. " \
                    f"Input count={df_count}, without duplicates={dedup_count}, " \
                    f"insert_record_count={dedup_count}."
                )
        except Exception as e:
            self.logger.error(f"Exception occurred while writing to ADLS {linked_service} {path}. Exception is {e}")
            raise
    def write_to_rdbms(self, df, linked_service, schema_name, table_name):
        try:
            # Flatten each event body into a row using the JSON pointers from the control table.
            target_rows = list()
            target_schema = [tgt_schema_name for _, _, tgt_schema_name, _ in self.json_schema]
            for json_data in df.collect():
                json_body = json.loads(json_data['body'])
                tmp_rows = list()
                for source_pointer, _, _, _ in self.json_schema:
                    pointer = jsonpointer.JsonPointer(source_pointer)
                    tmp_rows.append(pointer.resolve(json_body))
                target_rows.append(tmp_rows)
            df_target = self.spark.createDataFrame(
                data=target_rows,
                schema=target_schema
            )
            rdbms_linked_service = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
            jdbc_url = f"jdbc:sqlserver://{rdbms_linked_service['Endpoint']};" \
                       f"database={rdbms_linked_service['Database']};"
            full_table_name = f"{schema_name}.{table_name}"
            # df_target.write \
            #     .format("com.microsoft.sqlserver.jdbc.spark") \
            #     .mode("append") \
            #     .option("url", jdbc_url) \
            #     .option("dbtable", full_table_name) \
            #     .option("accessToken", rdbms_linked_service['AuthKey']) \
            #     .save()
            df_target.createOrReplaceTempView("updates")
            df_existing = self.spark.read \
                .format("com.microsoft.sqlserver.jdbc.spark") \
                .option("url", jdbc_url) \
                .option("dbtable", full_table_name) \
                .option("accessToken", rdbms_linked_service['AuthKey']) \
                .load()
            df_existing.createOrReplaceTempView("existing")
            cols = df_existing.schema.names
            # TODO - update with the right column names, and replace seqn.
            # NOTE: the MERGE below is T-SQL (DECLARE/OUTPUT) and is a placeholder; it would have to
            # be executed on the SQL Server side rather than through Spark SQL.
            pk = "seqn"
            output = self.spark.sql(f"""
                DECLARE @SummaryOfChanges TABLE(Change VARCHAR(20));
                MERGE existing AS Target
                USING (SELECT Col1,Col2 FROM updates) AS Source
                ON (Target.seqn = Source.seqn)
                WHEN MATCHED THEN
                    UPDATE SET Target.Col2 = Source.Col2
                WHEN NOT MATCHED BY TARGET THEN
                    INSERT (Col1,Col2) VALUES (Col1,Col2)
                OUTPUT $action INTO @SummaryOfChanges;
                SELECT Change, COUNT(*) AS CountPerChange
                FROM @SummaryOfChanges
                GROUP BY Change;
            """)
            inserted_count = df_target.count()
            # self.create_log_step_audit(
            #     event_text=f"Batch inserted to {linked_service} {full_table_name}",
            #     insert_record_count=inserted_count,
            # )
            self.logger.info(f"Wrote {inserted_count} rows to {linked_service} {full_table_name}")
        except Exception as e:
            self.logger.error(f"Exception occurred while writing to RDBMS {linked_service} {schema_name}.{table_name}. Exception message is {e}")
            raise
    def foreach_batch_function(self, df_event_input, epoch_id):
        try:
            self.logger.info(f"Batch:{epoch_id} Record Count:{df_event_input.count()}")
            df = df_event_input.select(["body"])
            for target in self.data_movement_config['targets']:
                target_type = target['type']
                if target_type == "ADLS_Table":
                    self.write_to_adls(df, target['linkedService'], target['containerName'], target['path'])
                # elif target_type == "RDBMS_UPSERT_SQLSERVER":
                #     self.write_to_rdbms(df, target['linkedService'], target['schemaName'], target['tableName'])
                else:
                    self.logger.error(f"Target type {target_type} not supported.")
        except Exception as e:
            self.logger.error(f"Batch writing failed. Exception message is {e}")
            raise
    def read_from_eventhub(self, eh_conn_string, eh_checkpoint_location):
        self.logger.info("Started reading from EventHub")
        ehConf = {
            'eventhubs.connectionString': self.spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn_string)
        }
        try:
            df_event_input_raw = self.spark.readStream \
                .format("eventhubs") \
                .options(**ehConf) \
                .load()
            df_event_input = df_event_input_raw \
                .withColumn("body", df_event_input_raw["body"].cast("string"))
            df_event_input.writeStream \
                .outputMode("append") \
                .option("checkpointLocation", eh_checkpoint_location) \
                .foreachBatch(self.foreach_batch_function) \
                .start() \
                .awaitTermination()
        except Exception as e:
            self.logger.error(f"Read from EventHub failed. Exception message is {e}")
            raise
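To round this out, here is a rough sketch of how the pattern might be wired up. The configuration keys mirror the ones read in execute_business_logic, but every value below is a placeholder, and the way UodaBaseClass accepts its constructor arguments is an assumption, not something shown above:

# Hypothetical wiring; all values are placeholders and the constructor
# signature of UodaBaseClass is assumed here.
data_movement_config = {
    "source": {
        "eventHub": "eh-orders",
        "keyVaultLinkedService": "ls_keyvault",
        "keyVaultSecretName": "eh-orders-connection-string",
        "checkpointLinkedService": "ls_adls_checkpoint",
        "checkpointContainer": "checkpoints",
        "checkpointPath": "eh-orders"
    },
    "targets": [
        {
            "type": "ADLS_Table",
            "linkedService": "ls_adls_ods",
            "containerName": "ods",
            "path": "orders/delta"
        }
    ]
}

pattern = ProcessingPattern1(spark=spark, data_movement_config=data_movement_config)
pattern.execute_business_logic()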