Stream Computing (Streaming) and Batch Computing (Batch)
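A batch query reads a dataset that is complete when the job starts; a streaming query treats the same source as unbounded and keeps incorporating data as it arrives. In the DataFrame API the difference is mostly the entry point, as in this minimal sketch (it assumes the `spark` session and `schema` defined in the example below):

```python
# Batch: the directory is read once; the DataFrame is bounded
batch_df = spark.read.schema(schema).json("data/streaming_input")

# Streaming: the same directory is treated as an unbounded source;
# files that appear later are picked up on subsequent triggers
stream_df = spark.readStream.schema(schema).json("data/streaming_input")

print(batch_df.isStreaming)   # False
print(stream_df.isStreaming)  # True
```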
Spark Streaming and Spark Structured Streaming
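Spark Streaming is the older micro-batch API built on DStreams and RDDs; Structured Streaming (Spark 2.0+) expresses the same computations through the DataFrame/Dataset API and the SQL engine, which is what the example below uses. For contrast, a rough sketch of the legacy DStream style (Spark 2.x/3.x; the host and port are placeholders, and a socket server must be listening there):

```python
# Legacy Spark Streaming (DStream) style, shown only for contrast
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, batchDuration=5)  # 5-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)              # placeholder source
lines.count().pprint()                                       # print per-batch record counts
ssc.start()
ssc.awaitTermination()
```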
Worked Example
```python
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
import time


# spark.sql.shuffle.partitions: number of partitions used for shuffles
# spark.default.parallelism: default number of parallel tasks
# note: the master URL must be set with .master(), not .config("master", ...)
spark = SparkSession.builder \
    .appName("structured streaming") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.default.parallelism", "4") \
    .master("local[4]") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
```

```python
# Define the schema of the streaming DataFrame
user_id = T.StructField("user_id", T.IntegerType())
quantity = T.StructField("quantity", T.IntegerType())
price = T.StructField("price", T.IntegerType())
order_time = T.StructField("order_time", T.StringType())
schema = T.StructType([user_id, quantity, price, order_time])
```
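As a side note, `schema()` also accepts a DDL-formatted string, which is often more compact; a sketch of an equivalent definition:

```python
# Equivalent schema expressed as a DDL string
schema_ddl = "user_id INT, quantity INT, price INT, order_time STRING"
# spark.readStream.schema(schema_ddl).json(input_dir) would behave the same
```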
```python
import os
import shutil
import time
import random
import datetime
import json

path = "./data/streaming_input"
# Remove the input directory if it already exists
if os.path.exists(path):
    shutil.rmtree(path)
# Create a fresh input directory
os.makedirs(path)

# Write one random JSON order every 5 seconds, 100 files in total
for i in range(100):
    user_id = random.choice(range(1, 4))
    quantity = random.choice(range(10, 100))
    price = random.choice(range(100, 1000))
    order_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    data = {
        "user_id": user_id,
        "quantity": quantity,
        "price": price,
        "order_time": order_time
    }

    file = os.path.join(path, str(i) + ".json")
    with open(file, "w") as f:
        json.dump(data, f)
    time.sleep(5)
```
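Note that this loop blocks for roughly 500 seconds (100 files × 5 s), so if the generator and the streaming query live in the same script, the generator has to run concurrently; a minimal sketch using a background thread (the `generate_orders` wrapper is hypothetical and would contain the loop above):

```python
import threading

def generate_orders():
    ...  # the file-writing loop above, moved into a function body

# Daemon thread: keeps producing input files while the streaming
# queries below run in the main thread
producer = threading.Thread(target=generate_orders, daemon=True)
producer.start()
```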

```python
# Input directory of the stream, i.e. the streaming data source
input_dir = "data/streaming_input"

df = spark.readStream.schema(schema).json(input_dir)
df.printSchema()
'''
root
 |-- user_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- order_time: string (nullable = true)
'''
# A query with a streaming source must be executed with start();
# calling an action such as df.show() here raises an AnalysisException
# df.show()

# Output directory of the streaming query
output_dir = "data/streaming_output"
# Checkpoint directory: lets the query restart without reprocessing data
checkpoint = "data/checkpoint"

stream = df.writeStream.format('csv') \
    .option('checkpointLocation', checkpoint) \
    .option('path', output_dir) \
    .start()

# Let the query run for 20 seconds
time.sleep(20)

# Stop the streaming query
stream.stop()
```
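The file sink only supports append mode, so each micro-batch adds new CSV part files under `output_dir`. The result can be verified afterwards with an ordinary batch read, reusing the schema from above (the files are written without a header):

```python
# Batch-read the CSV files produced by the streaming query
result = spark.read.schema(schema).csv(output_dir)
result.show(5)
print(result.count(), "rows written so far")
```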
```python
# A streaming aggregation: total quantity per user
total = df.groupBy('user_id').sum('quantity')
# outputMode('complete') re-emits the full result table on every trigger
stream = total.writeStream.outputMode('complete').format('console').start()

# Let the query run for 20 seconds
time.sleep(20)

# Stop the streaming query
stream.stop()
```
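Complete mode keeps the entire result table in state indefinitely. For time-based aggregations, Structured Streaming can bound that state with an event-time watermark instead; a sketch that groups the same orders into 30-second windows (the window and watermark durations are arbitrary choices):

```python
# Event-time windowed aggregation with a watermark, so expired state can be dropped
windowed = (df
    .withColumn("ts", F.col("order_time").cast("timestamp"))
    .withWatermark("ts", "1 minute")
    .groupBy(F.window("ts", "30 seconds"), "user_id")
    .agg(F.sum("quantity").alias("total_quantity")))

stream = windowed.writeStream.outputMode("update").format("console").start()
time.sleep(20)
stream.stop()
```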
```python
# The memory sink stores the results in an in-memory table that can be
# queried with Spark SQL under the name given by queryName
stream = df.writeStream.queryName('hive_table').outputMode('append').format('memory').start()

# Give the query a moment to process some input before querying the table
time.sleep(20)

sql = 'select user_id, sum(quantity) from hive_table group by user_id'
total = spark.sql(sql)
total.show()

# Stop the streaming query manually
stream.stop()
```
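When none of the built-in sinks fits (for example, writing each micro-batch to a real Hive table or a JDBC database), `foreachBatch` hands every micro-batch to an ordinary batch function; a minimal sketch (the target table name `orders_sink` is hypothetical):

```python
# foreachBatch: run arbitrary batch logic on each micro-batch
def write_batch(batch_df, batch_id):
    # batch_df is a plain (non-streaming) DataFrame here
    batch_df.write.mode("append").saveAsTable("orders_sink")  # hypothetical table

stream = df.writeStream.foreachBatch(write_batch) \
    .option("checkpointLocation", "data/checkpoint_foreach") \
    .start()
time.sleep(20)
stream.stop()
```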