from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("Enterprise Data Warehouse")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
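# Equivalent, more idiomatic initialization in PySpark 2.0+:
# spark = SparkSession.builder.appName("Enterprise Data Warehouse").getOrCreate()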
# Read the sales order data
sales_orders = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sales_orders.csv")
# Read the sales order detail data
sales_order_details = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sales_order_details.csv")
# Read the product data
products = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("products.csv")
# Read the customer data
customers = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("customers.csv")
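# Note: without "inferSchema", CSV columns all load as strings and the SUM
# aggregation below would depend on implicit string-to-double casting; with it,
# numeric fields such as total_amount arrive as proper numeric columns.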
# Clean and preprocess the sales order data
sales_orders = sales_orders.dropDuplicates()  # remove duplicate rows
sales_orders = sales_orders.dropna()  # remove rows with missing values
# Clean and preprocess the sales order detail data
sales_order_details = sales_order_details.dropDuplicates()  # remove duplicate rows
sales_order_details = sales_order_details.dropna()  # remove rows with missing values
# Clean and preprocess the product data
products = products.dropDuplicates()  # remove duplicate rows
products = products.dropna()  # remove rows with missing values
# Clean and preprocess the customer data
customers = customers.dropDuplicates()  # remove duplicate rows
customers = customers.dropna()  # remove rows with missing values
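# Note: dropna() with no arguments drops a row if ANY column is null; pass
# subset=["order_id", ...] to restrict the check to key columns instead.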
# Join the sales orders with their order details
order_details = sales_orders.join(sales_order_details, on="order_id")
# Join the order details with the product data
products_info = order_details.join(products, on="product_id")
# Join the result with the customer data
customers_info = products_info.join(customers, on="customer_id")
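# Note: join() defaults to an inner join, so customers_info only keeps order
# lines that match an existing product and customer record.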
# Aggregate and analyze the data with Spark SQL
customers_info.createOrReplaceTempView("customers_info")
result = spark.sql("""
    SELECT
        customer_id,
        customer_name,
        SUM(total_amount) AS total_sales_amount,
        COUNT(DISTINCT order_id) AS total_orders,
        COUNT(DISTINCT product_id) AS total_products
    FROM customers_info
    GROUP BY customer_id, customer_name
    ORDER BY total_sales_amount DESC
""")
# Show the result
result.show()
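# For reference, a sketch of the same aggregation expressed with the
# DataFrame API, equivalent to the SQL query above:
from pyspark.sql import functions as F
result_df = (customers_info
    .groupBy("customer_id", "customer_name")
    .agg(F.sum("total_amount").alias("total_sales_amount"),
         F.countDistinct("order_id").alias("total_orders"),
         F.countDistinct("product_id").alias("total_products"))
    .orderBy(F.col("total_sales_amount").desc()))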
The code above uses Spark SQL to implement the enterprise data warehouse build in four steps: load the source tables from CSV, clean and deduplicate them, join them into a single wide table, and aggregate the result per customer.
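To make this a reusable warehouse layer rather than a one-off report, the aggregate would normally be persisted as well. A minimal sketch, assuming a hypothetical output path warehouse/customer_sales_summary and Parquet as the storage format:

# Persist the per-customer summary (output path is hypothetical)
result.write.mode("overwrite").parquet("warehouse/customer_sales_summary")
# Release cluster resources when the job is finished
spark.stop()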