赞
踩
本文主要介绍了,
一:
1.spark读写hdfs案例
2.spark链式编程
3.spark排序案例
二:环境验证
验证spark on yarn 环境
三:pyspark程序与spark交互流程
四:整理面试题
1.请列举常见的RDD算子,以及对应功能
2.请列举常见的spark_submit参数,以及对应功能
3.请表述下spark中核心概念
4.请表述下cluster on yarn集群的执行流程
1- 启动Hadoop集群 start-all.sh 2- 验证Hadoop集群是否运行正常 jps 访问 http://node1:9870 http://node1:8088 3- 上传文件到HDFS 准备一个content.txt文件,内容如下 hello hello spark hello heima spark 接着将content.txt文件上传到hdfs hdfs dfs -mkdir /input hdfs dfs -put content.txt /input/ 4- 基于原生入门案例,修改文件路径为 hdfs://node1:8020/...
# 导包 import os import time from pyspark import SparkConf, SparkContext # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' # 创建main函数 if __name__ == '__main__': print("Spark加强案例: 从hdfs中读取文件进行WordCount词频统计开始~") # - 1.创建SparkContext对象 conf = SparkConf().setAppName('my_pyspark_demo').setMaster('local[*]') sc = SparkContext(conf=conf) print(sc) # 默认<SparkContext master=local[*] appName=pyspark-shell> # - 2.数据输入 # 注意: 默认是linux本地路径省略了file:/// 如果是hdfs路径必须加hdfs://node1:8020/ # 注意: 需要修改为hdfs路径 init_rdd = sc.textFile('hdfs://node1:8020/input/content.txt') # - 3.数据处理 # - 3.1文本内容切分 flatmap_rdd = init_rdd.flatMap(lambda line: line.split(' ')) # - 3.2数据格式转换 map_rdd = flatmap_rdd.map(lambda word: (word, 1)) # - 3.3分组和聚合 reduce_rdd = map_rdd.reduceByKey(lambda agg, curr: agg + curr) # - 4.数据输出 # 注意: 此时结果也需要利用算子存储到hdfs中 reduce_rdd.saveAsTextFile('hdfs://node1:8020/day02_output') # - 5.释放资源 time.sleep(100) sc.stop() print("Spark加强案例: WordCount词频统计结果已经保存到hdfs,程序结束~")
可能报的错误:
原因: 输出路径已经存在
解决: 直接删除已经存在的路径即可
该错误需要查看Hadoop的源代码(131行):https://gitee.com/highmoutain/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java
链式编程: 就是一种代码简洁的写法。省略中间变量的声明,调用完一个算子后,继续调用其他算子
使用场景:当功能需求比较简单(代码比较少)的时候,推荐使用链式编程,可以让你的代码看起来更加简洁。
另外,算子返回值类型都是相同的时候才能够使用
# 导包 import os from pyspark import SparkConf, SparkContext # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' # 创建main函数 if __name__ == '__main__': print("Spark加强案例: 从hdfs中读取文件进行WordCount词频统计开始~") # - 1.创建SparkContext对象 conf = SparkConf().setAppName('my_pyspark_demo').setMaster('local[*]') sc = SparkContext(conf=conf) print(sc) # 默认<SparkContext master=local[*] appName=pyspark-shell> # - 2.数据输入_处理_输出 result = sc.textFile('hdfs://node1:8020/input/content.txt') \ .flatMap(lambda line: line.split(' ')) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda agg, curr: agg + curr) print(result.collect()) # - 3.释放资源 sc.stop() print("Spark加强案例:程序结束~")
sortBy(参数1,参数2):
参数1: 自定义函数,通过函数指定按照谁来进行排序操作
参数2: (可选)boolean类型,表示是否为升序。默认为True,表示升序
sortByKey(参数1):
参数1: 可选的,boolean类型,表示是否为升序。默认为True 表示升序
top(N,函数):
参数N: 取RDD的前N个元素
参数函数:(可选)如果kv(键值对)类型,默认是根据key进行排序操作,如果想根据其他排序,可以定义函数指定
# 导包 from pyspark import SparkContext import os # 绑定指定的python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' if __name__ == '__main__': # - 1.创建SparkContext对象 sc = SparkContext() # 默认master=local[*],默认appname=pyspark-shell print(sc) # - 2.数据输入 # - 3.数据处理 reduceRDD = sc.textFile('file:///export/data/spark_project/spark_base/content.txt') \ .flatMap(lambda line: line.split(
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。