
Spark Streaming Advanced Topics and Hands-On Cases


1. Stateful operator: updateStateByKey
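
updateStateByKey maintains arbitrary per-key state across batches, here a running count; checkpointing must be enabled so the state can be recovered. A minimal sketch, assuming the same socket source used in the later examples and a placeholder checkpoint path:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object StatefulWordCountApp {

    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      // updateStateByKey requires a checkpoint directory; "." is only a local placeholder,
      // use a reliable location such as HDFS in production
      ssc.checkpoint(".")

      val lines = ssc.socketTextStream("192.168.37.128", 6789)
      val result = lines.flatMap(_.split(" ")).map((_, 1))
        .updateStateByKey[Int](updateFunction _)
      result.print()

      ssc.start()
      ssc.awaitTermination()
    }

    /** Merge the counts of the current batch into the accumulated count of each key. */
    def updateFunction(currentValues: Seq[Int], previousValue: Option[Int]): Option[Int] = {
      Some(currentValues.sum + previousValue.getOrElse(0))
    }
  }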

2. Hands-on: count the cumulative occurrences of each word so far and write the results to MySQL

  • 1. Create the table
  CREATE TABLE `wordcount` (
      `word` VARCHAR(50) NOT NULL,
      `count` INT(11) NOT NULL,
      PRIMARY KEY (`word`)
  )
  COMMENT='word count table'
  COLLATE='utf8mb4_german2_ci'
  ENGINE=InnoDB;
  • 2. Code implementation
  import java.sql.{Connection, DriverManager}

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  /**
   * Use Spark Streaming to do stateful word counting and store the results in MySQL.
   */
  object ForeachRDDApp {

    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val lines = ssc.socketTextStream("192.168.37.128", 6789)
      val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      // ----- error: Task not serializable -----
      // Creating the connection on the driver and using it inside rdd.foreach forces
      // Spark to serialize it to the executors, which fails:
      // result.foreachRDD(rdd => {
      //   val conn = createConnection()
      //   rdd.foreach { record =>
      //     val sql = "INSERT INTO wordcount(word, count) " +
      //       "VALUES('" + record._1 + "', '" + record._2 + "')"
      //     conn.createStatement().execute(sql)
      //   }
      // })

      result.print()

      // Create one connection per partition on the executor side instead
      result.foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          if (partitionOfRecords.nonEmpty) {
            val conn = createConnection()
            partitionOfRecords.foreach { pair =>
              val sql = "INSERT INTO wordcount(word, count) " +
                "VALUES('" + pair._1 + "', '" + pair._2 + "')"
              conn.createStatement().execute(sql)
            }
            conn.close()
          }
        }
      }

      ssc.start()
      ssc.awaitTermination()
    }

    def createConnection(): Connection = {
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection("jdbc:mysql:///spark_stream", "root", "tiger")
    }
  }
  • 3. Remaining issues
    • 1. Existing rows are never updated; every record is written as a new insert.
      • Improvement ideas
        • 1. Before inserting, check whether the word already exists: update it if it does, insert it otherwise (see the sketch after this list).
        • 2. In real projects: use HBase/Redis instead.
    • 2. A connection is created for every RDD partition; a connection pool is recommended.
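
As a sketch of improvement idea 1, with MySQL the existence check can be pushed into the database via INSERT ... ON DUPLICATE KEY UPDATE, relying on the primary key on word. The snippet below is a drop-in replacement for the result.foreachRDD block in ForeachRDDApp above (it reuses result and createConnection()), and it accumulates the per-batch counts produced by reduceByKey:

  result.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      if (partitionOfRecords.nonEmpty) {
        val conn = createConnection()
        // Insert a new row, or add the batch count to the existing one on key conflict
        val sql = "INSERT INTO wordcount(word, count) VALUES(?, ?) " +
          "ON DUPLICATE KEY UPDATE count = count + VALUES(count)"
        val pstmt = conn.prepareStatement(sql)
        partitionOfRecords.foreach { case (word, count) =>
          pstmt.setString(1, word)
          pstmt.setInt(2, count)
          pstmt.executeUpdate()
        }
        pstmt.close()
        conn.close()
      }
    }
  }

Using a PreparedStatement also avoids building SQL by string concatenation, as the original code does.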

3. Window-based statistics

  • window: at a fixed interval, process the data that falls within a given time range.

[Figure: windowed DStream operations; window length = 3 batch intervals, sliding interval = 2 batch intervals]

  • window length: the duration of the window (3 batch intervals in the figure).
  • sliding interval: the interval at which the window operation is performed (2 batch intervals in the figure).
  • Both parameters must be multiples of the batch interval.
  • Every sliding interval, compute over a fixed range of data: for example, every 10 seconds compute the word count of the previous 10 minutes (see the sketch after the list below).
  • In other words, every sliding interval the job processes window length worth of data.
The main window transformations and their meanings:

  • window(windowLength, slideInterval): Return a new DStream which is computed based on windowed batches of the source DStream.
  • countByWindow(windowLength, slideInterval): Return a sliding window count of elements in the stream.
  • reduceByWindow(func, windowLength, slideInterval): Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: by default this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): A more efficient version of the above, in which the reduce value of each window is calculated incrementally from the previous window's value, with invFunc "inverse reducing" the data that leaves the window; checkpointing must be enabled to use it.
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
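
A minimal sketch of the "every 10 seconds, count the previous 10 minutes" example using reduceByKeyAndWindow; the batch interval, host, and port follow the other examples in this post and are only placeholders:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

  object WindowWordCountApp {

    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WindowWordCount")
      // batch interval 5s; the window length and sliding interval below are both multiples of it
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val lines = ssc.socketTextStream("192.168.37.128", 6789)
      // every 10 seconds, count the words seen in the previous 10 minutes
      val windowedCounts = lines.flatMap(_.split(" ")).map((_, 1))
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(10), Seconds(10))
      windowedCounts.print()

      ssc.start()
      ssc.awaitTermination()
    }
  }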

4. Hands-on: blacklist filtering

  • 1. Approach
  1. Access log ===> DStream
     20180808,zs
     20180808,ls
     20180808,ww
     map to (zs, "20180808,zs") (ls, "20180808,ls") (ww, "20180808,ww")
  2. Blacklist (usually stored in a database) ===> RDD
     zs
     ls
     map to (zs: true), (ls: true)
  3. Expected result
     ===> 20180808,ww
  4. left join
     (zs, [<20180808,zs>, <true>])   x  (filtered out)
     (ls, [<20180808,ls>, <true>])   x  (filtered out)
     (ww, [<20180808,ww>, <false>])  ===> kept; take the first element of the value tuple
  • 2. Code implementation
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  /**
   * Blacklist filtering.
   */
  object TransformApp {

    def main(args: Array[String]): Unit = {
      // 1. Create the Spark conf
      val sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("NetworkWordCount")

      // 2. A StreamingContext takes two parameters: a SparkConf and the batch interval
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      val lines = ssc.socketTextStream("192.168.37.128", 6789)

      // 3. Build the blacklist as an RDD of (name, true)
      val blacklist = List("zs", "ls")
      val blackRDD = ssc.sparkContext.parallelize(blacklist)
        .map(x => (x, true))

      /**
       * map(x => (x.split(",")(1), x))          ## split the input "20180808,zs" on "," and key by the name, giving ("zs", "20180808,zs")
       * transform                               ## apply an RDD-to-RDD function to each RDD of the DStream
       * rdd.leftOuterJoin(blackRDD)             ## after the left outer join, records look like ("zs", ("20180808,zs", Some(true)))
       * filter(x => !x._2._2.getOrElse(false))  ## keep only records whose flag is false or absent, i.e. not blacklisted
       * map(x => x._2._1)                       ## convert back to the original format ----> "20180808,ww"
       */
      val result = lines.map(x => (x.split(",")(1), x)).transform { rdd =>
        rdd.leftOuterJoin(blackRDD)
          .filter(x => !x._2._2.getOrElse(false))
          .map(x => x._2._1)
      }
      result.print()

      ssc.start()
      ssc.awaitTermination()
    }
  }
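
To try it out, one option is to run nc -lk 6789 on 192.168.37.128 and type lines such as 20180808,zs, 20180808,ls, and 20180808,ww; only 20180808,ww should be printed, since zs and ls are on the blacklist.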

5. Hands-on: integrating Spark Streaming with Spark SQL
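
A common pattern is to use foreachRDD to get each batch's RDD, convert it to a DataFrame through a shared SparkSession, register a temp view, and run SQL over it. A minimal sketch of that pattern, reusing the same placeholder socket source as the earlier examples:

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object SqlNetworkWordCountApp {

    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val lines = ssc.socketTextStream("192.168.37.128", 6789)
      val words = lines.flatMap(_.split(" "))

      words.foreachRDD { (rdd: RDD[String]) =>
        // reuse one SparkSession built from the RDD's SparkConf
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // turn the RDD of words into a DataFrame, register it, and query it with SQL
        val wordsDF = rdd.toDF("word")
        wordsDF.createOrReplaceTempView("words")
        spark.sql("SELECT word, COUNT(*) AS total FROM words GROUP BY word").show()
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }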

posted @ 2018-10-29 22:29 eat.u