File Streams
In the file stream scenario, you write a Spark Streaming program that continuously monitors a directory in a file system. Whenever a new file appears there, Spark Streaming automatically reads its contents and processes them with user-defined logic.

Socket Streams
Spark Streaming can also listen on a socket port, receive the data arriving on it, and process it accordingly.
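Below is a minimal sketch of the socket case in spark-shell, assuming some data source (for example, nc -lk 9999) is writing lines to port 9999 on localhost; the host and port are illustrative and not part of the original example:

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))
scala> val lines = ssc.socketTextStream("localhost", 9999)  // receive text lines over TCP
scala> lines.flatMap(_.split(" ")).map(x => (x,1)).reduceByKey(_ + _).print()
scala> ssc.start()

The numbered steps that follow walk through the file stream case in spark-shell.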
1. Create a directory named logfile:
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
2. In another terminal, start spark-shell and enter the following statements one by one:
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2108d503

scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@63cf0da6

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@761b4581

scala> val wordCounts = words.map(x => (x,1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@515576b0

scala> wordCounts.print()

scala> ssc.start()

Once started, the stream prints a (for now empty) batch every 20 seconds:

-------------------------------------------
Time: 1592364620000 ms
-------------------------------------------
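One lifecycle note that the transcript does not show: a StreamingContext cannot be restarted once it has been stopped, and in spark-shell you usually want to keep the underlying SparkContext alive. To stop just the streaming computation, you can call:

scala> ssc.stop(stopSparkContext = false)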
3. Back in the terminal from step 1, create a file log.txt under the logfile directory with the following content:
spark sql
spark streaming
spark MLlib
4. Return to the spark-shell terminal; when the next batch fires, the word-count results appear:

-------------------------------------------
Time: 1592364620000 ms
-------------------------------------------
(spark,3)
(sql,1)
(streaming,1)
(MLlib,1)
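For reference, the same file stream word count can also be packaged as a standalone application instead of being typed into spark-shell. The sketch below is illustrative (the object name FileStreamWordCount is invented here); unlike the shell, a standalone app must build its own StreamingContext from a SparkConf and block on awaitTermination:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWordCount {
  def main(args: Array[String]): Unit = {
    // local[2] gives the local driver enough threads to run tasks and print output
    val conf = new SparkConf().setAppName("FileStreamWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(20))  // 20-second batches, as in the shell demo
    val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()  // keep the app running until it is stopped externally
  }
}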