Overall project workflow:
python /home/jackie/project_0907/generate_log.py
[jackie@hadoop102:project_0907]$ crontab -e
*/1 * * * * /home/jackie/project_0907/lgl.sh
# standalone ZooKeeper
/opt/module/zookeeper/bin/zkServer.sh start
# start the Kafka broker
/opt/module/kafka/bin/kafka-server-start.sh \
-daemon /opt/module/kafka/config/server.properties
# start Flume
/opt/module/flume/bin/flume-ng agent \
--conf /opt/module/flume/conf/ \
--conf-file /home/jackie/project_0907/streaming_project.conf \
--name exec-memory-kafka \
-Dflume.root.logger=INFO,console
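The streaming_project.conf referenced above is not shown in this excerpt. Given the agent name exec-memory-kafka, it is presumably an exec source -> memory channel -> Kafka sink pipeline along the lines of the sketch below; the tailed log path and the broker port are assumptions, and the sink property names follow the Flume 1.6 style (newer versions use kafka.bootstrap.servers / kafka.topic instead):

exec-memory-kafka.sources = exec-source
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sinks = kafka-sink

exec-memory-kafka.sources.exec-source.type = exec
# assumed path of the file written by generate_log.py
exec-memory-kafka.sources.exec-source.command = tail -F /home/jackie/project_0907/access.log
exec-memory-kafka.sources.exec-source.channels = memory-channel

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop102:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel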
Simulate a consumer and print the messages to the console
# consume
[jackie@hadoop102:kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic streamingtopic
// 1. Initialize the Spark configuration.
//    For local testing:  val sparkConf = new SparkConf().setAppName("StreamCount").setMaster("local[*]")
//    When packaging for spark-submit, drop setMaster("local[*]"):
val sparkConf = new SparkConf().setAppName("CourseClickCount")

// 2. Initialize the StreamingContext (the real-time analysis environment), batch interval 60s
val streamingContext = new StreamingContext(sparkConf, Seconds(60))

// 3. Pull data from Kafka
val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
  streamingContext,
  "hadoop102:2181",
  "lzou",
  Map("streamingtopic" -> 1)
)

kafkaDStream.map(_._2).count().print()

// Sample output:
/*
10.63.98.87 2020-09-15 18:09:01 "GET /class/143.html HTTP/1.1" 404 -
143.55.98.124 2020-09-15 18:09:01 "GET /class/112.html HTTP/1.1" 200 http://www.baidu.com/s?wd=Hadoop基础
132.143.98.156 2020-09-15 18:09:01 "GET /class/128.html HTTP/1.1" 200 -
10.132.63.124 2020-09-15 18:09:01 "GET /class/128.html HTTP/1.1" 200 -
187.10.132.72 2020-09-15 18:09:01 "GET /course/list HTTP/1.1" 200 http://www.baidu.com/s?wd=Spark Streaming
*/
val cleanData: DStream[ClickLog] = kafkaDStream.map(_._2).map(line => {
  val fields: Array[String] = line.split("\t")
  val url: String = fields(2).split(" ")(1)
  var courseId = 0
  // e.g. 10.55.187.87 2020-09-09 14:03:01 "GET /class/112.html HTTP/1.1" 404 -
  if (url.startsWith("/class")) {
    val courseIdHTML: String = url.split("/")(2)
    courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
  }
  ClickLog(fields(0), DateUtils.parseToMinute(fields(1)), courseId, fields(3).toInt, fields(4))
}).filter(clickLog => clickLog.courseId != 0)

cleanData.print()

// Sample output:
/**
 * ClickLog(30.87.55.46,20200915211301,143,404,-)
 * ClickLog(156.187.132.98,20200915211301,141,404,-)
 * ClickLog(30.55.63.167,20200915211301,143,500,http://search.yahoo.com/search?p=大数据面试)
 */
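ClickLog and DateUtils are referenced above but not defined in this excerpt. A minimal sketch, assuming the field order used in the map (ip, time, courseId, statusCode, referer) and the yyyyMMddHHmmss timestamps seen in the sample output; the project's actual definitions may differ:

case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

object DateUtils {
  import java.text.SimpleDateFormat

  // "2020-09-15 18:09:01" -> "20200915180901"
  def parseToMinute(time: String): String = {
    val source = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val target = new SimpleDateFormat("yyyyMMddHHmmss")
    target.format(source.parse(time))
  }
}

Creating the formatters inside the method sidesteps SimpleDateFormat's lack of thread safety; FastDateFormat from commons-lang3 is a common alternative.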
cleanData.map(x => {
(x.time.substring(0, 8) + "_" + x.courseId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val buffer: ListBuffer[CourseClickCount] = new ListBuffer[CourseClickCount]
partition.foreach(pair => {
buffer.append(CourseClickCount(pair._1, pair._2))
})
CourseClickCountDAO.save(buffer)
})
})
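CourseClickCount and CourseClickCountDAO are likewise external to this excerpt. Since the project runs HBase (it appears in the component list and in the cleanup commands below), a plausible minimal sketch is an HBase counter keyed by the yyyyMMdd_courseId row key; the table, column family, and qualifier names here are assumptions, not the project's actual schema:

case class CourseClickCount(dayCourse: String, clickCount: Long)

object CourseClickCountDAO {
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.ConnectionFactory
  import org.apache.hadoop.hbase.util.Bytes
  import scala.collection.mutable.ListBuffer

  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop102")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("course_clickcount"))  // assumed table name
    try {
      list.foreach { item =>
        // incrementColumnValue keeps the count additive across 60s micro-batches
        table.incrementColumnValue(
          Bytes.toBytes(item.dayCourse),   // row key: yyyyMMdd_courseId
          Bytes.toBytes("info"),           // assumed column family
          Bytes.toBytes("click_count"),    // assumed qualifier
          item.clickCount)
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}

Opening a connection per call keeps the sketch short; a real DAO would reuse a singleton connection across batches.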
// ClickLog(10.46.187.63,20200915212801,143,404,http://www.baidu.com/s?wd=Hadoop基础)
cleanData.map(x => {
  val referer: String = x.referer.replaceAll("//", "/")
  val splits: Array[String] = referer.split("/")
  var host = ""
  if (splits.length > 2) {
    host = splits(1)
  }
  (host, x.courseId, x.time)
}).filter(_._1 != "").map(x => {
  (x._3.substring(0, 8) + "_" + x._1 + "_" + x._2, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val list: ListBuffer[CourseSearchClickCount] = new ListBuffer[CourseSearchClickCount]
    partition.foreach(pair => {
      list.append(CourseSearchClickCount(pair._1, pair._2))
    })
    CourseSearchClickCountDAO.save(list)
  })
})
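CourseSearchClickCount follows the same pattern with a day_searchHost_courseId row key, and its DAO can mirror the HBase increment sketch above. Note also that the excerpt never starts the context; a Spark Streaming job only begins consuming once the StreamingContext is started and awaited:

case class CourseSearchClickCount(daySearchCourse: String, clickCount: Long)

// 4. Start the streaming job and block until it terminates
streamingContext.start()
streamingContext.awaitTermination()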
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.1.1</version>
      <configuration>
        <archive>
          <manifest>
            <!-- main class -->
            <mainClass>com.spark.SparkStreaming_Kafka</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
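With the assembly plugin bound to the package phase, mvn clean package should produce target/spark-streaming-project-1.0-SNAPSHOT-jar-with-dependencies.jar, the fat jar submitted below.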
Start HDFS and ZooKeeper first
# start Spark
[jackie@hadoop102:spark]$ sbin/start-all.sh
# start spark-shell
[jackie@hadoop102 spark]$ bin/spark-shell
[jackie@hadoop102:spark]$ bin/spark-submit --master local[5] \
> --name CourseClickCount \
> --class com.spark.SparkStreaming_Kafka \
> /home/jackie/project_0907/spark-streaming-project-1.0-SNAPSHOT-jar-with-dependencies.jar
Spark Web UI : http://hadoop102:4040/
Full component stack: HDFS, ZK, Flume, Kafka, HBase, Spark
Solution: clear HBase's stale metadata (the /hbase znode in ZooKeeper and the /hbase directory on HDFS):
[jackie@hadoop102:zookeeper]$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] rmr /hbase
[jackie@hadoop102:~]$ hadoop fs -rm -r /hbase