当前位置:   article > 正文

Spark3.x入门到精通-阶段五(SparkStreaming详解原理&java&scala双语实战)_spark3 支持java语法嘛

spark3 支持java语法嘛

SparkStreaming

简介

Spark Streaming 是 Spark 的一个子模块,用于快速构建可扩展,高吞吐量,高容错的流处理程序。具有以下特点:

  • 通过高级 API 构建应用程序,简单易用;
  • 支持多种语言,如 Java,Scala 和 Python;
  • 良好的容错性,Spark Streaming 支持快速从失败中恢复丢失的操作状态;
  • 能够和 Spark 其他模块无缝集成,将流处理与批处理完美结合;
  • Spark Streaming 可以从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,也支持自定义数据源。

DStream

Spark Streaming 提供称为离散流 (DStream) 的高级抽象,用于表示连续的数据流。 DStream 可以从来自 Kafka,Flume 和 Kinesis 等数据源的输入数据流创建,也可以由其他 DStream 转化而来。在内部,DStream 表示为一系列 RDD。 

Spark & Storm & Flink 

storm 和 Flink 都是真正意义上的流计算框架,但 Spark Streaming 只是将数据流进行极小粒度的拆分,拆分为多个批处理,使得其能够得到接近于流处理的效果,但其本质上还是批处理(或微批处理)。

开始使用

hello world

java版本

  1. public class JavaHelloWord {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. //当使用Local模式启动Spark时,master URL必须为"local[n]",且"n"的值必须大于"receivers"的数量:
  5. //否则会出现Expecting 1 replicas with only 0 处理
  6. sparkConf.setMaster("local[2]")
  7. .setAppName("JavaHelloWord");
  8. //Durations.seconds(1)表示每一秒拉去一次数据
  9. //每一秒就是一个batch,每一个batch里面会有多个rdd
  10. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  11. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9999);
  12. //对于RDD里面每一个元素处理,这里的每一个元素是一行数据,其实中间的操作就是对于RDD里面元素的处理
  13. JavaDStream<String> wordDStream = socketTextStream.flatMap(new FlatMapFunction<String, String>() {
  14. @Override
  15. public Iterator<String> call(String s) throws Exception {
  16. System.out.println(s);
  17. return Arrays.asList(s.split(" ")).iterator();
  18. }
  19. });
  20. JavaPairDStream<String, Integer> wordCountDStream = wordDStream.mapToPair(new PairFunction<String, String, Integer>() {
  21. @Override
  22. public Tuple2<String, Integer> call(String word) throws Exception {
  23. return new Tuple2<String, Integer>(word, 1);
  24. }
  25. });
  26. JavaPairDStream<String, Integer> resultDStream = wordCountDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
  27. @Override
  28. public Integer call(Integer v1, Integer v2) throws Exception {
  29. return v1 + v2;
  30. }
  31. });
  32. resultDStream.foreachRDD(
  33. //这里对于每一batch所有的RDD进行分布式的发送到集群其他机器上面去执行
  34. rdd->{
  35. rdd.foreach(new VoidFunction<Tuple2<String, Integer>>() {
  36. //对于RDD里面每一个分区里面的数据进行分布式计算
  37. @Override
  38. public void call(Tuple2<String, Integer> result) throws Exception {
  39. System.out.println(result);
  40. }
  41. });
  42. }
  43. );
  44. javaStreamingContext.start();
  45. javaStreamingContext.awaitTermination();
  46. javaStreamingContext.close();
  47. }
  48. }

打包后执行(下面是client提交,就会在提交的机器上面启动Driver,那么打印的时候就会直接在提交的窗口打印数据)

  1. bin/spark-submit \
  2. --master yarn \
  3. --deploy-mode client \
  4. --driver-memory 512m \
  5. --executor-memory 512m \
  6. --executor-cores 1 \
  7. --num-executors 2 \
  8. --queue default \
  9. --class com.zhang.one.javasparkstreaming.JavaHelloWord \
  10. /home/bigdata/spark/spark-yarn/original-sparkstart-1.0-SNAPSHOT.jar

下面用cluster提交(启动的Driver在集群的随机一台启动)

  1. bin/spark-submit \
  2. --master yarn \
  3. --deploy-mode cluster \
  4. --driver-memory 512m \
  5. --executor-memory 512m \
  6. --executor-cores 1 \
  7. --num-executors 2 \
  8. --queue default \
  9. --class com.zhang.one.javasparkstreaming.JavaHelloWord \
  10. /home/bigdata/spark/spark-yarn/original-sparkstart-1.0-SNAPSHOT.jar

下面是查看输出数据的步骤

 

 

scala版本

  1. object ScalaStreaingWc {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setAppName("ScalaStreaingWc")
  5. .setMaster("local[2]")
  6. val sparkStreaming = new StreamingContext(conf, Seconds(1))
  7. sparkStreaming.socketTextStream("master",9999)
  8. .flatMap(_.split(" "))
  9. .map((_,1))
  10. .reduceByKey(_+_)
  11. .foreachRDD(rdd=>rdd.foreach(println))
  12. sparkStreaming.start()
  13. sparkStreaming.awaitTermination()
  14. sparkStreaming.stop()
  15. }
  16. }

SparkStreaming用Kafka作为数据源

Receiver模式(由于这种模式在spark3.x没有了所以就不写代码了)

关于上面为什么必须用local[*]图解,因为sparkstreaming会一直占用资源,必须有足够资源的前提它才能启动,下面以Reciver模式原理图解

Direct模式

java&scala双语实战

  • 有点kafka的partition和RDD的partiton有一一对应的关系
  • 能够保证spark仅仅消费一次kafka的数据

现在kafka创建对应的topic 

 ./kafka-topics.sh --bootstrap-server master:9092 --create --topic spark-streaming-topic --partitions 2 --replication-factor 2

创建一个生产者

./kafka-console-producer.sh --broker-list master:9092 --topic spark-streaming-topic

java版本 

  1. public class JavaKafkaDirectStream {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. //对应kafka里面的分区一一对应
  5. sparkConf.setMaster("local[2]")
  6. .setAppName("JavaKafkaDirectStream");
  7. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  8. Map<String, Object> params = new HashMap<>();
  9. params.put("bootstrap.servers","master:9092");
  10. params.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  11. params.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  12. params.put("group.id","spark-streaming-group");
  13. params.put("auto.offset.reset","latest");
  14. params.put("enable.auto.commit",true);
  15. List<String> topics = Arrays.asList("spark-streaming-topic");
  16. JavaInputDStream<ConsumerRecord<String, String>> kafkaDStream = KafkaUtils.createDirectStream(
  17. javaStreamingContext,
  18. LocationStrategies.PreferConsistent(),
  19. ConsumerStrategies.Subscribe(topics, params));
  20. JavaDStream<String> kafkaValueDStream = kafkaDStream.map((Function<ConsumerRecord<String, String>, String>) v1 -> v1.value());
  21. JavaDStream<String> woldDStream = kafkaValueDStream.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
  22. JavaPairDStream<String, Integer> wordCountDStream = woldDStream.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<String, Integer>(s, 1));
  23. JavaPairDStream<String, Integer> resultDStream = wordCountDStream.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
  24. resultDStream.foreachRDD(rdd->rdd.foreach((VoidFunction<Tuple2<String, Integer>>) result -> System.out.println(result)));
  25. javaStreamingContext.start();
  26. javaStreamingContext.awaitTermination();
  27. javaStreamingContext.stop();
  28. }
  29. }

scala版本

  1. object ScalaKafkaDirectStream {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setAppName("ScalaKafkaDirectStream").setMaster("local[2]")
  4. val streamingContext = new StreamingContext(sparkConf, Seconds(5))
  5. val kafkaParams = Map[String, Object](
  6. /*
  7. * 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。
  8. * 不过建议至少提供两个 broker 的信息作为容错。
  9. */
  10. "bootstrap.servers" -> "master:9092",
  11. /*键的序列化器*/
  12. "key.deserializer" -> classOf[StringDeserializer],
  13. /*值的序列化器*/
  14. "value.deserializer" -> classOf[StringDeserializer],
  15. /*消费者所在分组的 ID*/
  16. "group.id" -> "spark-streaming-group",
  17. /*
  18. * 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  19. * latest: 在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
  20. * earliest: 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  21. */
  22. "auto.offset.reset" -> "latest",
  23. /*是否自动提交*/
  24. "enable.auto.commit" -> (true: java.lang.Boolean)
  25. )
  26. /*可以同时订阅多个主题*/
  27. val topics = Array("spark-streaming-topic")
  28. val stream = KafkaUtils.createDirectStream[String, String](
  29. streamingContext,
  30. /*位置策略*/
  31. /**
  32. * 位置策略
  33. * Spark Streaming 中提供了如下三种位置策略,用于指定 Kafka 主题分区与 Spark 执行程序 Executors 之间的分配关系:
  34. PreferConsistent : 它将在所有的 Executors 上均匀分配分区;
  35. PreferBrokers : 当 Spark 的 Executor 与 Kafka Broker 在同一机器上时可以选择该选项,它优先将该 Broker 上的首领分区分配给该机器上的 Executor;
  36. PreferFixed : 可以指定主题分区与特定主机的映射关系,显示地将分区分配到特定的主机.
  37. */
  38. PreferConsistent,
  39. /*订阅主题*/
  40. /**
  41. *
  42. * @param 需要订阅的主题的集合
  43. * @param Kafka 消费者参数
  44. * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的
  45. * def Subscribe[K, V](
  46. topics: ju.Collection[jl.String],
  47. kafkaParams: ju.Map[String, Object],
  48. offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }
  49. */
  50. Subscribe[String, String](topics, kafkaParams)
  51. )
  52. /*打印输入流*/
  53. stream.map(_.value())
  54. .flatMap(_.split(" "))
  55. .map((_,1))
  56. .reduceByKey(_+_)
  57. .foreachRDD(rdd=>rdd.foreach(println))
  58. streamingContext.start()
  59. streamingContext.awaitTermination()
  60. }
  61. }

常用的transform使用

updateStateByKey

  • 对于每一个key维护一个状态
  • 注意使用它的时候必须开始checkpoint(因为如果数据丢失了,还可以在磁盘中恢复数据)

java版本

  1. public class JavaUpdateStateByKey {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. sparkConf.setMaster("local[2]")
  5. .setAppName("JavaUpdateStateByKey");
  6. //解决权限问题
  7. System.setProperty("HADOOP_USER_NAME","bigdata");
  8. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  9. javaStreamingContext.checkpoint("hdfs://master:8020/sparkstreamingcheckpoing");
  10. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9998);
  11. JavaDStream<String> wordDStream = socketTextStream.flatMap(new FlatMapFunction<String, String>() {
  12. @Override
  13. public Iterator<String> call(String s) throws Exception {
  14. return Arrays.asList(s.split(" ")).iterator();
  15. }
  16. });
  17. //装换成key,value操作的
  18. JavaPairDStream<String, Integer> wordCountDStream = wordDStream.mapToPair(new PairFunction<String, String, Integer>() {
  19. @Override
  20. public Tuple2<String, Integer> call(String s) throws Exception {
  21. return new Tuple2<String, Integer>(s, 1);
  22. }
  23. });
  24. //对于每一个key维护一个状态
  25. wordCountDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
  26. @Override
  27. public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) throws Exception {
  28. Integer initData=0;
  29. //如果有state了,就拿出这个state的值
  30. if(state.isPresent()){
  31. initData=state.get();
  32. }
  33. //这里对于每一batch的新的数据对于state进行累加
  34. for (Integer newValue : newValues) {
  35. initData+=newValue;
  36. }
  37. return Optional.of(initData);
  38. }
  39. }).foreachRDD(rdd->rdd.foreach((VoidFunction<Tuple2<String, Integer>>) result -> System.out.println(result)));
  40. javaStreamingContext.start();
  41. javaStreamingContext.awaitTermination();
  42. javaStreamingContext.stop();
  43. }
  44. }

scala版本

  1. object ScalaUpdateStateByKey {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setMaster("local[2]")
  5. .setAppName("ScalaUpdateStateByKey")
  6. //解决权限问题(主要一点要在得到sparkStreaming前配置)
  7. System.setProperty("HADOOP_USER_NAME", "bigdata")
  8. val sparkStreaming = new StreamingContext(conf, Seconds(1))
  9. sparkStreaming.checkpoint("hdfs://master:8020/scalasparkstreamingcheckpoing");
  10. sparkStreaming.socketTextStream("master",9998)
  11. .flatMap(_.split(" "))
  12. .map((_,1))
  13. .updateStateByKey((values:Seq[Int],state:Option[Int])=>{
  14. var res: Int = state.getOrElse(0)
  15. for (elem <- values) {
  16. res=res+elem
  17. }
  18. Option(res)
  19. }).foreachRDD(rdd=>rdd.foreach(println))
  20. sparkStreaming.start()
  21. sparkStreaming.awaitTermination()
  22. sparkStreaming.stop()
  23. }
  24. }

transform

使用它是因为DStream只能和DStream执行join操作,当我们想和其他的RDD使用的时候就需要用到它

下面我们写一个事实黑名单点击过滤案例

java版本实现

  1. public class JavaBlackList {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. sparkConf.setMaster("local[2]")
  5. .setAppName("JavaHelloWord");
  6. //定义要过滤的黑名单的数据
  7. List<Tuple2<String, Boolean>> blackList = new ArrayList<>();
  8. blackList.add(new Tuple2<>("zhangsan",true));
  9. //得到每一批的数据的时候,我们为了测试把批次间隔调大一点
  10. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
  11. JavaPairRDD<String, Boolean> blackListRDD = javaStreamingContext.sparkContext().parallelizePairs(blackList);
  12. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9998);
  13. JavaPairDStream<String, String> socketTupleUserMsg = socketTextStream.mapToPair(new PairFunction<String, String, String>() {
  14. @Override
  15. public Tuple2<String, String> call(String s) throws Exception {
  16. String[] socketTextStringItem = s.split(" ");
  17. //因为假的输入的是name msg的形式
  18. String userName = socketTextStringItem[0];
  19. String msg = socketTextStringItem[1];
  20. return new Tuple2<String, String>(userName, msg);
  21. }
  22. });
  23. //第一个参数是装换成的RDD,第二个是要返回什么RDD
  24. socketTupleUserMsg.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
  25. @Override
  26. public JavaRDD<String> call(JavaPairRDD<String, String> socketKeyValue) throws Exception {
  27. //对于事实得到的用户数据和黑名单的数据进行左外连接的操作,
  28. JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> allUser = socketKeyValue.leftOuterJoin(blackListRDD);
  29. //过滤出黑名单的数据
  30. JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filterUser = allUser.filter(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {
  31. @Override
  32. public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> v1) throws Exception {
  33. //这里是Optional的特殊处理,isPresent表示这个Optional是否有值
  34. //get()如果有值,并且左外连接以后匹配到了名字那么就是黑名单的数据我们直接过滤掉
  35. if (v1._2._2.isPresent() && v1._2._2.get()) {
  36. return false;
  37. }
  38. return true;
  39. }
  40. });
  41. //对于连接的数据,我们只要之前输入的数据就行,那么就通过map进行装换处理
  42. return filterUser.map(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {
  43. @Override
  44. public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> v1) throws Exception {
  45. return v1._2._1;
  46. }
  47. });
  48. }
  49. //得到最后过滤出来的结果
  50. }).foreachRDD(rdd->rdd.foreach(new VoidFunction<String>() {
  51. @Override
  52. public void call(String res) throws Exception {
  53. System.out.println(res);
  54. }
  55. }));
  56. javaStreamingContext.start();
  57. javaStreamingContext.awaitTermination();
  58. javaStreamingContext.stop();
  59. }
  60. }

scala版本实现

  1. object ScalaBlackList {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setMaster("local[2]")
  5. .setAppName("ScalaBlackList")
  6. //定义用户黑名单
  7. val blackUser = List(
  8. ("a", true)
  9. )
  10. val sparkStreaming = new StreamingContext(conf, Seconds(2))
  11. val context: SparkContext = sparkStreaming.sparkContext
  12. val blackUserRDD: RDD[(String, Boolean)] = context.parallelize(blackUser)
  13. val socketDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 9998)
  14. socketDS.map(item => {
  15. val socketText: Array[String] = item.split(" ")
  16. val userName: String = socketText(0)
  17. val msg: String = socketText(1)
  18. (userName, msg)
  19. }).transform(rdd => {
  20. val allUser: RDD[(String, (String, Option[Boolean]))] = rdd.leftOuterJoin(blackUserRDD)
  21. //对于所有的数据进行过滤
  22. allUser.filter(item=>{
  23. //表示在黑名单里面没有数据的时候就是填充一个false作为默认值
  24. !item._2._2.getOrElse(false)
  25. }).map(item=>{
  26. (item._1,item._2._1)
  27. })
  28. }).foreachRDD(rdd=>rdd.foreach(println))
  29. sparkStreaming.start()
  30. sparkStreaming.awaitTermination()
  31. sparkStreaming.stop()
  32. }
  33. }

window(滑动窗口)

案例

实时统计每10秒统计最近60秒的最多的搜索词top3

java版本

  1. public class JavaWinddow {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. sparkConf.setAppName("JavaWinddow")
  5. .setMaster("local[2]");
  6. //每一秒得到一份batch数据,里面其实一堆RDD
  7. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  8. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9998);
  9. JavaPairDStream<String, Integer> wordCount = socketTextStream.mapToPair(new PairFunction<String, String, Integer>() {
  10. @Override
  11. public Tuple2<String, Integer> call(String s) throws Exception {
  12. return new Tuple2<String, Integer>(s, 1);
  13. }
  14. });
  15. //这里使用窗口的特性,每10秒统计最近60秒的数据,也就是上面的数据会一直累积,到达条件以后往下面执行
  16. //第一个是reduceByKey的操作函数,
  17. // 第二个值是窗口大小
  18. //第三个值是滑动距离
  19. JavaPairDStream<String, Integer> reduceWindowValue = wordCount.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
  20. @Override
  21. public Integer call(Integer v1, Integer v2) throws Exception {
  22. return v1 + v2;
  23. }
  24. }, Durations.seconds(60), Durations.seconds(10));
  25. JavaPairDStream<Integer, String> integerStringJavaPairDStream = reduceWindowValue.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  26. @Override
  27. public Tuple2<Integer, String> call(Tuple2<String, Integer> initData) throws Exception {
  28. return new Tuple2<Integer, String>(initData._2, initData._1);
  29. }
  30. });
  31. //装换成RDD使用RDD的操作,对于60秒的数据进行排序
  32. JavaPairDStream<Integer, String> sortDS = integerStringJavaPairDStream.transformToPair(rdd -> rdd.sortByKey(false));
  33. sortDS.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
  34. @Override
  35. public Tuple2<String, Integer> call(Tuple2<Integer, String> res) throws Exception {
  36. return new Tuple2<String, Integer>(res._2, res._1);
  37. }
  38. }).foreachRDD(rdd->rdd.take(3).forEach(
  39. item->{
  40. System.out.println(item);
  41. }
  42. ));
  43. javaStreamingContext.start();
  44. javaStreamingContext.awaitTermination();
  45. javaStreamingContext.stop();
  46. }
  47. }

scala版本

  1. object ScalaWindow {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setAppName("ScalaWindow")
  5. .setMaster("local[2]")
  6. val sparkStreaming = new StreamingContext(conf, Seconds(1))
  7. val socketDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 9998)
  8. val reduceRDD: DStream[(String, Int)] = socketDS.map((_, 1))
  9. .reduceByKeyAndWindow((num1:Int,num2:Int)=>num2+num1, Seconds(60), Seconds(10))
  10. reduceRDD.map(item=>{
  11. (item._2,item._1)
  12. }).transform(rdd=>rdd.sortByKey(false)).foreachRDD(rdd=>rdd.take(3).foreach(println))
  13. sparkStreaming.start()
  14. sparkStreaming.awaitTermination()
  15. sparkStreaming.stop()
  16. }
  17. }

foreachRDD详解

案例

实时处理10秒统计一次最近60秒的搜索按搜索的次数倒叙

  1. public class JavaWinddow {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. sparkConf.setAppName("JavaWinddow")
  5. .setMaster("local[2]");
  6. //每一秒得到一份batch数据,里面其实一堆RDD
  7. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  8. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9998);
  9. JavaPairDStream<String, Integer> wordCount = socketTextStream.mapToPair(new PairFunction<String, String, Integer>() {
  10. @Override
  11. public Tuple2<String, Integer> call(String s) throws Exception {
  12. return new Tuple2<String, Integer>(s, 1);
  13. }
  14. });
  15. //这里使用窗口的特性,每10秒统计最近60秒的数据,也就是上面的数据会一直累积,到达条件以后往下面执行
  16. //第一个是reduceByKey的操作函数,
  17. // 第二个值是窗口大小
  18. //第三个值是滑动距离
  19. JavaPairDStream<String, Integer> reduceWindowValue = wordCount.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
  20. @Override
  21. public Integer call(Integer v1, Integer v2) throws Exception {
  22. return v1 + v2;
  23. }
  24. }, Durations.seconds(60), Durations.seconds(10));
  25. JavaPairDStream<Integer, String> integerStringJavaPairDStream = reduceWindowValue.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  26. @Override
  27. public Tuple2<Integer, String> call(Tuple2<String, Integer> initData) throws Exception {
  28. return new Tuple2<Integer, String>(initData._2, initData._1);
  29. }
  30. });
  31. //装换成RDD使用RDD的操作,对于60秒的数据进行排序
  32. JavaPairDStream<Integer, String> sortDS = integerStringJavaPairDStream.transformToPair(rdd -> rdd.sortByKey(false));
  33. sortDS.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
  34. @Override
  35. public Tuple2<String, Integer> call(Tuple2<Integer, String> res) throws Exception {
  36. return new Tuple2<String, Integer>(res._2, res._1);
  37. }
  38. }).foreachRDD(rdd -> rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
  39. @Override
  40. public void call(Iterator<Tuple2<String, Integer>> res) throws Exception {
  41. //得到连接,每一个连接处理一个分区的数据
  42. Jedis jedis = RedisHelper.getJedis();
  43. Map<String, String> resMap = new HashMap<>();
  44. while (res.hasNext()) {
  45. Tuple2<String, Integer> value = res.next();
  46. String oldValue = resMap.get(value._1);
  47. if (oldValue == null) {
  48. oldValue = "0";
  49. }
  50. Integer newValue = Integer.valueOf(oldValue) + value._2;
  51. resMap.put(value._1, newValue + "");
  52. }
  53. if(!resMap.isEmpty()){
  54. jedis.del("hotWordTop3");
  55. jedis.hset("hotWordTop3", resMap);
  56. }
  57. //把连接还回去
  58. RedisHelper.removeJedis(jedis);
  59. }
  60. }));
  61. javaStreamingContext.start();
  62. javaStreamingContext.awaitTermination();
  63. javaStreamingContext.stop();
  64. }
  65. }

放入redis

  1. <dependency>
  2. <groupId>redis.clients</groupId>
  3. <artifactId>jedis</artifactId>
  4. <version>3.1.0</version>
  5. </dependency>

如果没有redis想快点使用的话下面提供docker安装

  1. sudo yum install docker
  2. sudo systemctl start docker
  3. sudo systemctl enable docker

时间同步操作(有些时间不一样会有问题)

  1. sudo yum install -y ntpdate
  2. sudo ntpdate 120.24.81.91
  3. #冲突时使用这个
  4. sudo sudo systemctl stop ntp

镜像加速(由于自己提供的经常过期,可以自己去找下)

  1. sudo systemctl daemon-reload
  2. sudo systemctl restart docker

启动一个redis容器并且开机自启

sudo docker run -itd --name spark-redis -p 6379:6379  --restart=always  redis:6.0

创建一个redis的工具类

  1. public class RedisHelper {
  2. private static JedisPool jedisPool=null;
  3. static {
  4. GenericObjectPoolConfig config = new GenericObjectPoolConfig();
  5. config.setMaxIdle(10);
  6. config.setMaxTotal(20);
  7. config.setMinIdle(5);
  8. jedisPool = new JedisPool(config, "node1");
  9. }
  10. public static Jedis getJedis() {
  11. Jedis resource = jedisPool.getResource();
  12. return resource;
  13. }
  14. public static void removeJedis(Jedis jedis){
  15. jedis.close();
  16. }
  17. }

结合SparkSql实战

案例要求

每10秒统计最近60秒用户点击top3的数据

  1. public class JavaStreamingSql {
  2. public static void main(String[] args) throws InterruptedException {
  3. SparkConf sparkConf = new SparkConf();
  4. sparkConf.setAppName("JavaStreamingSql")
  5. .setMaster("local[2]");
  6. //第一步得到sparkStreaming,我们可以认为DStream就是1s的一个RDD,但是事件上是产生了一个batch的数据
  7. //一个batch里面有很多RDD
  8. JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  9. JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("master", 9998);
  10. //得到的点击数据进行map
  11. JavaPairDStream<String, Integer> wordCountDStream = socketTextStream.mapToPair(new PairFunction<String, String, Integer>() {
  12. //得到的数据是每个用户名
  13. @Override
  14. public Tuple2<String, Integer> call(String userName) throws Exception {
  15. return new Tuple2<String, Integer>(userName, 1);
  16. }
  17. });
  18. //第二步开窗并求聚合操作
  19. JavaPairDStream<String, Integer> reduceByKeyAndWindowDStream = wordCountDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
  20. @Override
  21. public Integer call(Integer v1, Integer v2) throws Exception {
  22. return v1 + v2;
  23. }
  24. }, Durations.seconds(60), Durations.seconds(10));
  25. //第三步根据得到的RDD使用SparkSql进行分析
  26. reduceByKeyAndWindowDStream.foreachRDD(rdd->{
  27. //根据RDD得到Row
  28. JavaRDD<Row> userClickRowCount = rdd.map(new Function<Tuple2<String, Integer>, Row>() {
  29. @Override
  30. public Row call(Tuple2<String, Integer> rddItem) throws Exception {
  31. return RowFactory.create(
  32. rddItem._1,
  33. rddItem._2
  34. );
  35. }
  36. });
  37. //定义Row的元数据
  38. List<StructField> rowMeta = new ArrayList<>();
  39. rowMeta.add(DataTypes.createStructField("userName",DataTypes.StringType,true));
  40. rowMeta.add(DataTypes.createStructField("clickNum",DataTypes.IntegerType,true));
  41. StructType rowMetastructType = DataTypes.createStructType(rowMeta);
  42. //使得Row和structType建立联系
  43. SQLContext sqlContext = new SQLContext(javaStreamingContext.sparkContext());
  44. Dataset<Row> userClickDataFrame = sqlContext.createDataFrame(userClickRowCount, rowMetastructType);
  45. userClickDataFrame.registerTempTable("userClickData");
  46. //这里开始对于每10秒统计最近60秒的数据
  47. Dataset<Row> res = sqlContext.sql(
  48. "select userName,clickNum from " +
  49. "(" +
  50. "select userName,clickNum,row_number() over (order by clickNum desc) as rank from " +
  51. "userClickData" +
  52. ") where rank<=3"
  53. );
  54. res.show();
  55. });
  56. javaStreamingContext.start();
  57. javaStreamingContext.awaitTermination();
  58. javaStreamingContext.stop();
  59. }
  60. }

输出的结果是 

  1. +--------+--------+
  2. |userName|clickNum|
  3. +--------+--------+
  4. | c| 28|
  5. | a| 19|
  6. | f| 12|
  7. +--------+--------+

scala版本

  1. object ScalaSparkStreamSql {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setAppName("ScalaSparkStreamSql")
  5. .setMaster("local[2]")
  6. val sparkStreaming = new StreamingContext(conf, Seconds(1))
  7. sparkStreaming.socketTextStream("master", 9998)
  8. .map((_, 1))
  9. //每10秒得到最近60秒的数据,60是窗口的大小,10是滑动的长度
  10. .reduceByKeyAndWindow((num1: Int, num2: Int) => {
  11. num1 + num2
  12. }, Seconds(60), Seconds(10))
  13. .foreachRDD(rdd => {
  14. //得到Row
  15. val userClickRow: RDD[Row] = rdd.map(
  16. mapItem => {
  17. Row(mapItem._1, mapItem._2)
  18. }
  19. )
  20. //得到数据
  21. val userClickStructTyoe: StructType = StructType(
  22. Array(
  23. StructField("userName", DataTypes.StringType, true),
  24. StructField("clickNum", DataTypes.IntegerType, true)
  25. )
  26. )
  27. //得到DataFrame
  28. val sparkSession: SparkSession = SparkSession
  29. .builder()
  30. .config(conf)
  31. .getOrCreate()
  32. //开始查询数据
  33. val userClickDataFrame: DataFrame = sparkSession.createDataFrame(userClickRow, userClickStructTyoe)
  34. userClickDataFrame.createTempView("userClickData")
  35. sparkSession.sql(
  36. """
  37. |select userName,clickNum from
  38. |(
  39. | select userName,clickNum,row_number() over (order by clickNum desc) as rank from
  40. | userClickData
  41. |) where rank<=3
  42. |""".stripMargin
  43. ).show()
  44. //后面可以更具结果的数据自己处理保存到redis或者mysql
  45. })
  46. sparkStreaming.start()
  47. sparkStreaming.awaitTermination()
  48. sparkStreaming.stop()
  49. }
  50. }

输出结果

  1. +--------+--------+
  2. |userName|clickNum|
  3. +--------+--------+
  4. | a| 9|
  5. | v| 3|
  6. | c| 3|
  7. +--------+--------+

缓存与持久化机制

  • sparkstreaming默认对窗口的操作,还有upstateBykey开启的持久化的操作
  • 与RDD不同的是它默认都是会要序列化的

Sparkstreaming容错机制

分区容错

 可靠receiver

SparkStreaming架构原理

sparkStreaming架构原理图解

 接收数据的原理

写数据原理

 

SparkStreaming性能调优 

  • 对于receiver模式的处理,可以调高receiver的数量,提高并行度
  • 对于每一批的RDD进行调优,因为RDD的分区和每一个batch里面的block有关,比如batch的间隔是2s,也就是JavaStreamingContext(sparkConf, Durations.seconds(2)),这里就是设置拉去batch的时间,每一批batch里面有若干的block,每一个block产生的时间是200毫秒,每一个block对应RDD的一个分区,这里调优的地方就是如果你的CPU有8核,但是如果你每一批的数据就是5个block的话,那么就是没有充分的利用cpu我们这个时候就可以对于block产生的时间减少
  • 对于Kafka的Direct模式的话,我们只要设置的并行度和分区一样就可以了
  • 使用CMS垃圾回收器,减少gc的时间,提高batch的处理速度
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/824814
推荐阅读
相关标签
  

闽ICP备14008679号