Read and write to a Delta table using an explicit path

First, define the helper methods below; they are mandatory before you can read or write by path.

import json
from urllib.parse import urlparse

from notebookutils import mssparkutils
from pyspark.sql import SparkSession


def set_abfss_config(spark: SparkSession, linked_service: str):
    """Set the Spark configuration to access storage accounts using linked
    services.

    This function only supports managed identity for authentication. Once this
    configuration has been set, the storage account can be accessed using the
    abfss:// URI prefix.

    See https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-secure-credentials-with-tokenlibrary  # noqa: E501

    Args:
        spark: A current SparkSession.
        linked_service: The linked service available in the Synapse workspace.
    """
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    spark.conf.set(
        f"spark.storage.synapse.{parse_result.netloc}.linkedServiceName",
        linked_service)
    spark.conf.set(
        f"fs.azure.account.oauth.provider.type.{parse_result.netloc}",
        "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")  # noqa E501

def get_abfss_path(linked_service: str, container_name: str, path: str) -> str:
    """Get the full Azure Blob File Storage Secure URI from a linked service.

    Args:
        linked_service: The linked service available in the Synapse workspace.
        container_name: The container in the storage account.
        path: The path to the blob in the container.

    Returns:
        A fully qualified URI using the abfss:// prefix.
    """
    ls = json.loads(mssparkutils.credentials.getPropertiesAll(linked_service))
    parse_result = urlparse(ls['Endpoint'])
    return f"abfss://{container_name}@{parse_result.netloc}/{path}"

linked_service = 'LS_ADLS_2'
container_name = 'eyuodaadlsfs02'
path = 'MineStar/GDS/Cdc_Delta'

set_abfss_config(spark, linked_service)
_path = get_abfss_path(linked_service, container_name, path)
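
With these values, the helper resolves the container and the linked service endpoint into a single URI. As a rough sketch, assuming the linked service points at a hypothetical storage account named "mystorageaccount" (placeholder, not from this post), _path would look like:

# _path == "abfss://eyuodaadlsfs02@mystorageaccount.dfs.core.windows.net/MineStar/GDS/Cdc_Delta"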
Then read the Delta table from that explicit path into a DataFrame with the same structure as the data we expect:

df_delta_path = spark.read.format("delta").load(_path)
df_delta_path = df_delta_path.withColumn(
    "updatedatetime", df_delta_path.updatedatetime.cast("STRING"))

# Collect the latest update datetime recorded for the table of interest.
_lastestDateTimeAry = df_delta_path.select(
    df_delta_path["updatedatetime"]).filter(
    df_delta_path["tabelname"] == 'ctrl_ADF_RUN_HIST_CT').collect()

# Equivalent Spark SQL approach against a registered table:
# _getParamQuery = f"select cast(updatedatetime as STRING) from default.cdcstatustable where tabelname='{_sourceTableName}'"
# _lastestDateTimeAry = spark.sql(_getParamQuery).collect()
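
collect() returns a list of Row objects, so the value itself would typically be pulled out with something like _lastestDateTimeAry[0][0] (after checking the list is not empty). The title also promises a write; here is a minimal write-back sketch using the same explicit path, assuming a DataFrame named df_to_write that matches the Delta table's schema (that DataFrame is not part of the original code):

# Hypothetical write-back, appending new rows to the Delta table at _path.
df_to_write.write \
    .format("delta") \
    .mode("append") \
    .save(_path)

# Use mode("overwrite") instead if the table contents should be replaced.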
    
