当前位置:   article > 正文

PySpark —— 流计算_pyspark stream

pyspark stream

1.流计算基本概念

  • Structured Streaming: 结构化流计算

流计算(Streaming)和批计算(Batch)

  • 批计算(或批处理),处理离线的数据,单次处理数据量大,速度较慢
  • 流计算(或流处理),处理在线的实时数据,单次处理数据量小,速度较快

Spark Streaming 和 Spark Structured Streaming

  • Spark2.0前期版本主推的流计算模块为Spark Streaming,目前主推Structured Streaming
  • Spark Streaming 建立在RDD上,其数据结构为DStream
  • Structured Streaming 建立在SparkSQL上,其数据结构为DataFrame,大部分API支持流计算,另外SparkSQL也具备自动优化,所以性能更佳

案例说明

  • 先通过make_streaming_data.py构造虚拟的实时交易数据
  • 再通过流计算来处理实时的交易数据
  1. from pyspark.sql import SparkSession
  2. from pyspark.sql import types as T
  3. from pyspark.sql import functions as F
  4. import time
  5. # partitions分区数量
  6. # parallelism并行的任务数量
  7. spark = SparkSession.builder \
  8. .appName("structured streaming") \
  9. .config("spark.sql.shuffle.partitions", "4") \
  10. .config("spark.default.parallelism", "4") \
  11. .config("master", "local[4]") \
  12. .enableHiveSupport() \
  13. .getOrCreate()
  14. sc = spark.sparkContext

2.定义表架构

  1. # 定义流计算的DataFrame表架构
  2. user_id = T.StructField("user_id", T.IntegerType())
  3. quantity = T.StructField("quantity", T.IntegerType())
  4. price = T.StructField("price", T.IntegerType())
  5. order_time = T.StructField("order_time", T.StringType())
  6. schema = T.StructType([user_id, quantity, price, order_time])

3.make_streaming_data

  1. import os
  2. import shutil
  3. import time
  4. import random
  5. import datetime
  6. import json
  7. path = "./data/streaming_input"
  8. # 如存在则删除
  9. if os.path.exists(path):
  10. shutil.rmtree(path)
  11. # 创建目录
  12. os.makedirs(path)
  13. for i in range(100):
  14. user_id = random.choice(range(1, 4))
  15. quantity = random.choice(range(10, 100))
  16. price = random.choice(range(100, 1000))
  17. order_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
  18. data = {
  19. "user_id": user_id,
  20. "quantity": quantity,
  21. "price": price,
  22. "order_time": order_time
  23. }
  24. file = os.path.join(path, str(i)+".json")
  25. with open(file, "w") as f:
  26. json.dump(data, f)
  27. time.sleep(5)

4.读取流数据

  1. # 流计算的输入目录,即流数据源
  2. input_dir = "data/streaming_input"
  3. df = spark.readStream.schema(schema).json(input_dir)
  4. df.printSchema()
  5. '''
  6. root
  7. |-- user_id: integer (nullable = true)
  8. |-- quantity: integer (nullable = true)
  9. |-- price: integer (nullable = true)
  10. |-- order_time: string (nullable = true)
  11. '''
  12. # 必须使用start()执行具有流数据源的查询
  13. # df.show()

5.输出到文件

  • 必须设置检查点目录checkpointLocation
  • 在代码 make_streaming_data 没执行完之前运行,输出的数据会随之变化
  1. # 流计算的输出目录
  2. output_dir = "data/streaming_output"
  3. # 流计算的检查点目录:避免数据重复处理
  4. checkpoint = "data/checkpoint"
  5. stream = df.writeStream.format('csv').option('checkpointLocation', checkpoint).option('path', output_dir).start()
  6. # 运行 20 秒
  7. time.sleep(20)
  8. # 终止流计算
  9. stream.stop()

6.输出到控制台

  • Jupyter Notebook(命令行窗口)
  1. total = df.groupBy('user_id').sum('quantity')
  2. # outputMode 调整输出模式 complete 完整数据
  3. stream = total.writeStream.outputMode('complete').format('console').start()
  4. # 运行 20 秒
  5. time.sleep(20)
  6. # 终止流计算
  7. stream.stop()

7.输出到内存

  1. stream = df.writeStream.queryName('hive_table').outputMode('append').format('memory').start()
  2. sql = 'select user_id, sum(quantity) from hive_table group by user_id'
  3. total = spark.sql(sql)
  4. total.show()
  1. # 手动终止流计算
  2. stream.stop()
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/709936
推荐阅读
相关标签
  

闽ICP备14008679号