当前位置:   article > 正文

spark核心原理刨析:案例解析(第2天)

spark核心原理刨析:案例解析(第2天)

系列文章目录

  • 1- Spark词频统计案例加强-hdfs读写(掌握)
  • 2- Spark on Yarn 环境—验证案例(操作)
  • 3- spark-submit命令(了解)
  • 4- PySpark程序与Spark交互流程(掌握)
  • 5- 常见面试题(掌握)


前言

本文主要介绍了,
一:
1.spark读写hdfs案例
2.spark链式编程
3.spark排序案例
二:环境验证
验证spark on yarn 环境
三:pyspark程序与spark交互流程
四:整理面试题
1.请列举常见的RDD算子,以及对应功能
2.请列举常见的spark_submit参数,以及对应功能
3.请表述下spark中核心概念
4.请表述下cluster on yarn集群的执行流程


一、1- Spark词频统计案例加强-hdfs读写(掌握)

1、读写HDFS

1- 启动Hadoop集群
start-all.sh

2- 验证Hadoop集群是否运行正常
jps

访问
http://node1:9870
http://node1:8088

3- 上传文件到HDFS
准备一个content.txt文件,内容如下
hello hello spark
hello heima spark

接着将content.txt文件上传到hdfs
hdfs dfs -mkdir /input
hdfs dfs -put content.txt /input/


4- 基于原生入门案例,修改文件路径为 hdfs://node1:8020/...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

代码详解:

# 导包
import os
import time

from pyspark import SparkConf, SparkContext

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    print("Spark加强案例: 从hdfs中读取文件进行WordCount词频统计开始~")
    # - 1.创建SparkContext对象
    conf = SparkConf().setAppName('my_pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    print(sc)  # 默认<SparkContext master=local[*] appName=pyspark-shell>
    # - 2.数据输入
    # 注意: 默认是linux本地路径省略了file:///  如果是hdfs路径必须加hdfs://node1:8020/
    # 注意: 需要修改为hdfs路径
    init_rdd = sc.textFile('hdfs://node1:8020/input/content.txt')
    # - 3.数据处理
    #   - 3.1文本内容切分
    flatmap_rdd = init_rdd.flatMap(lambda line: line.split(' '))
    #   - 3.2数据格式转换
    map_rdd = flatmap_rdd.map(lambda word: (word, 1))
    #   - 3.3分组和聚合
    reduce_rdd = map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # - 4.数据输出
    # 注意: 此时结果也需要利用算子存储到hdfs中
    reduce_rdd.saveAsTextFile('hdfs://node1:8020/day02_output')
    # - 5.释放资源
    time.sleep(100)
    sc.stop()
    print("Spark加强案例: WordCount词频统计结果已经保存到hdfs,程序结束~")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

可能报的错误:
在这里插入图片描述

原因: 输出路径已经存在
解决: 直接删除已经存在的路径即可

该错误需要查看Hadoop的源代码(131行):https://gitee.com/highmoutain/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java
  • 1
  • 2
  • 3
  • 4

2、链式编程实现

链式编程: 就是一种代码简洁的写法。省略中间变量的声明,调用完一个算子后,继续调用其他算子
使用场景:当功能需求比较简单(代码比较少)的时候,推荐使用链式编程,可以让你的代码看起来更加简洁。
另外,算子返回值类型都是相同的时候才能够使用
  • 1
  • 2
  • 3

代码详解:

# 导包
import os

from pyspark import SparkConf, SparkContext

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    print("Spark加强案例: 从hdfs中读取文件进行WordCount词频统计开始~")
    # - 1.创建SparkContext对象
    conf = SparkConf().setAppName('my_pyspark_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    print(sc)  # 默认<SparkContext master=local[*] appName=pyspark-shell>
    
    # - 2.数据输入_处理_输出
    result = sc.textFile('hdfs://node1:8020/input/content.txt') \
        .flatMap(lambda line: line.split(' ')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda agg, curr: agg + curr)
    print(result.collect())
    
    # - 3.释放资源
    sc.stop()
    print("Spark加强案例:程序结束~")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3、对结果排序

sortBy(参数1,参数2):
	参数1:  自定义函数,通过函数指定按照谁来进行排序操作
	参数2:  (可选)boolean类型,表示是否为升序。默认为True,表示升序
	
sortByKey(参数1):
	参数1: 可选的,boolean类型,表示是否为升序。默认为True 表示升序
		
top(N,函数):
	参数N: 取RDD的前N个元素
	参数函数:(可选)如果kv(键值对)类型,默认是根据key进行排序操作,如果想根据其他排序,可以定义函数指定
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

代码详细实现:

# 导包
from pyspark import SparkContext
import os

# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # - 1.创建SparkContext对象
    sc = SparkContext()  # 默认master=local[*],默认appname=pyspark-shell
    print(sc)
    # - 2.数据输入
    # - 3.数据处理
    reduceRDD = sc.textFile('file:///export/data/spark_project/spark_base/content.txt') \
        .flatMap(lambda line: line.split(
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/771000
推荐阅读
相关标签
  

闽ICP备14008679号