Spark Delta Lake

rm -r dp-203 -f

git clone https://github.com/MicrosoftLearning/dp-203-azure-data-engineer dp-203

cd dp-203/Allfiles/labs/07

./setup.ps1

    %%pyspark
    df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/products/products.csv', format='csv'
    ## If header exists uncomment line below
    ##, header=True
    )
    display(df.limit(10))

    %%pyspark
    df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/products/products.csv', format='csv'
    ## If header exists uncomment line below
    , header=True
    )
    display(df.limit(10))

    delta_table_path = "/delta/products-delta"
    df.write.format("delta").save(delta_table_path)

  1. On the files tab, use the ↑ icon in the toolbar to return to the root of the files container, and note that a new folder named delta has been created. Open this folder and the products-delta table it contains, where you should see the parquet format file(s) containing the data.
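If you prefer to inspect the folder from the notebook rather than the files tab, the following is a minimal sketch using mssparkutils (assuming the notebook is attached to the workspace's default linked storage, so the relative path below resolves to the files container):

    # List the contents of the Delta table folder (sketch; path assumes the
    # workspace's default linked storage account)
    from notebookutils import mssparkutils

    for item in mssparkutils.fs.ls('/delta/products-delta'):
        # Expect one or more .parquet data files plus a _delta_log folder
        print(item.name, item.size)

The _delta_log folder holds the JSON transaction log that Delta Lake uses to track versions of the table.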

    from delta.tables import *
    from pyspark.sql.functions import *
    # Create a DeltaTable object
    deltaTable = DeltaTable.forPath(spark, delta_table_path)
    # Update the table (reduce price of product 771 by 10%)
    deltaTable.update(
        condition = "ProductID == 771",
        set = { "ListPrice": "ListPrice * 0.9" })
    # View the updated data as a dataframe
    deltaTable.toDF().show(10)

    new_df = spark.read.format("delta").load(delta_table_path)
    new_df.show(10)

    new_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
    new_df.show(10)

    deltaTable.history(10).show(20, False, True)

    spark.sql("CREATE DATABASE AdventureWorks")
    spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
    spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

This code creates a new database named AdventureWorks and then creates an external table named ProductsExternal in that database, based on the path to the parquet files you defined previously. It then displays a description of the table’s properties. Note that the Location property is the path you specified.
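If you only need the Location value rather than the full description, one option (a sketch that simply reuses the DESCRIBE EXTENDED output shown above) is to filter the result DataFrame:

    # Pull just the Location row out of the DESCRIBE EXTENDED output (sketch)
    from pyspark.sql.functions import col

    desc_df = spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal")
    desc_df.filter(col("col_name") == "Location").show(truncate=False)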

 

    %%sql
    USE AdventureWorks;
    SELECT * FROM ProductsExternal;

 

    df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
    spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

This code creates a managed table named ProductsManaged based on the DataFrame you originally loaded from the products.csv file (before you updated the price of product 771). You do not specify a path for the parquet files used by the table - this is managed for you in the Hive metastore and shown in the Location property in the table description (in the files/synapse/workspaces/synapsexxxxxxx/warehouse path).
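To confirm where the managed table’s files actually live, you can also try Delta Lake’s DESCRIBE DETAIL command, which returns the location as an ordinary column. This is a sketch and assumes the Delta Lake version in your Spark runtime supports DESCRIBE DETAIL:

    # DESCRIBE DETAIL returns one row of table metadata, including 'location'
    # and 'numFiles' (sketch; assumes the runtime's Delta Lake supports it)
    detail_df = spark.sql("DESCRIBE DETAIL AdventureWorks.ProductsManaged")
    detail_df.select("location", "numFiles").show(truncate=False)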

    %%sql
    USE AdventureWorks;
    SELECT * FROM ProductsManaged;

 

    %%sql
    USE AdventureWorks;
    SHOW TABLES;

    %%sql
    USE AdventureWorks;
    DROP TABLE IF EXISTS ProductsExternal;
    DROP TABLE IF EXISTS ProductsManaged;

  1. Return to the files tab and view the files/delta/products-delta folder. Note that the data files still exist in this location. Dropping the external table has removed the table from the metastore, but left the data files intact.
  2. View the files/synapse/workspaces/synapsexxxxxxx/warehouse folder, and note that there is no folder for the ProductsManaged table data. Dropping a managed table removes the table from the metastore and also deletes the table’s data files (see the sketch after this list for a way to verify this from the notebook).
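The same check can be made in code. A minimal sketch, assuming the default linked storage and the synapsexxxxxxx placeholder used throughout the lab:

    # Verify the drop behaviour from the notebook (sketch; the warehouse path
    # below uses the lab's synapsexxxxxxx placeholder and is an assumption)
    from notebookutils import mssparkutils

    # External table: the Delta folder should still contain its data files
    print([f.name for f in mssparkutils.fs.ls('/delta/products-delta')])

    # Managed table: no folder for ProductsManaged should remain under the
    # Hive warehouse path
    warehouse_path = '/synapse/workspaces/synapsexxxxxxx/warehouse'
    print([f.name for f in mssparkutils.fs.ls(warehouse_path)])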

    %%sql
    USE AdventureWorks;
    CREATE TABLE Products
    USING DELTA
    LOCATION '/delta/products-delta';

    %%sql
    USE AdventureWorks;
    SELECT * FROM Products;

    from notebookutils import mssparkutils
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    # Create a folder
    inputPath = '/data/'
    mssparkutils.fs.mkdirs(inputPath)
    # Create a stream that reads data from the folder, using a JSON schema
    jsonSchema = StructType([
        StructField("device", StringType(), False),
        StructField("status", StringType(), False)
    ])
    iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
    # Write some event data to the folder
    device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
    mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
    print("Source stream created...")

Ensure the message Source stream created… is printed. The code you just ran has created a streaming data source based on a folder to which some data has been saved, representing readings from hypothetical IoT devices.
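Before wiring the stream to a sink, you can sanity-check the source with a couple of quick calls. This sketch uses only the iotstream DataFrame created above:

    # A streaming DataFrame reports isStreaming == True and carries the JSON schema
    print("Is streaming:", iotstream.isStreaming)
    iotstream.printSchema()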

    # Write the stream to a delta table
    delta_stream_table_path = '/delta/iotdevicedata'
    checkpointpath = '/delta/checkpoint'
    deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
    print("Streaming to delta sink...")

    # Read the data in delta format into a dataframe
    df = spark.read.format("delta").load(delta_stream_table_path)
    display(df)

    # create a catalog table based on the streaming sink
    spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

    %%sql
    SELECT * FROM IotDeviceData;

 

    # Add more data to the source stream
    more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
    mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

    %%sql
    SELECT * FROM IotDeviceData;

 

    deltastream.stop()

    -- This is auto-generated code
    SELECT
        TOP 100 *
    FROM
        OPENROWSET(
            BULK 'https://datalakexxxxxxx.dfs.core.windows.net/files/delta/products-delta/',
            FORMAT = 'DELTA'
        ) AS [result]

    USE AdventureWorks;
    SELECT * FROM Products;

Run the code and observe that you can also use the serverless SQL pool to query Delta Lake data in catalog tables that are defined in the Spark metastore.
