Data processing pipeline: Kafka -> Spark Streaming -> HBase
I have been doing data processing recently, but so far through the Java API; now I want to do the processing with Spark. This post records a simple implementation. A production setup is certainly not this simple, and I will point out some things that can be optimized later.
Environment:

- JDK: 1.8
- HBase (CDH): 1.2.0-cdh5.16.1
- Scala: 2.11.8
- Kafka: KAFKA-3.1.0-1.3.1.0
Key parts of the pom.xml:

```xml
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <name>cloudera</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.16.1</version>
    </dependency>
    <!-- Scala dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Kafka dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.16.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.2.0-cdh5.16.1</version>
        <type>pom</type>
    </dependency>
    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>
```
The Spark Streaming job reads from Kafka, parses each record and writes it to HBase:

```scala
package cn.zhangyu

import cn.zhangyu.utils.JsonUitls
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions.mapAsScalaMap

object KafkaStreaming2Hbase extends Logging {

  def main(args: Array[String]): Unit = {
    // create the StreamingContext with a 5-second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaStreaming2Hbase").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop001:9092,hadoop002:9092,hadoop003:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // topics to subscribe to
    val topics = Array("test")

    // create the direct stream
    val stream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // column family
    val family = Bytes.toBytes("cf1")

    stream.foreachRDD(rdd => {
      rdd.foreachPartition(partitions => {
        // create the table connection inside each partition,
        // otherwise it would have to be serialized and shipped to the executors
        val table = createTable()
        try {
          partitions.foreach(row => {
            print("----------------" + row.value())
            val map = JsonUitls.parseJson(row.value())
            print("map" + map)
            // parseJson returns null when id or libid is missing -- skip those records
            if (map != null) {
              // convert the java.util.Map into a Scala map
              val javaMap = mapAsScalaMap(map)
              println("java Map----------" + javaMap)
              println("id=====" + javaMap.get("id").get.toString)
              javaMap.foreach(x => {
                // the id field is used as the rowkey
                val put = new Put(Bytes.toBytes(javaMap.get("id").get.toString))
                x._1 match {
                  case "flag" =>
                    put.addImmutable(family, Bytes.toBytes("flag"), Bytes.toBytes(javaMap.get("flag").get.toString))
                    table.put(put)
                  case "libid" =>
                    put.addImmutable(family, Bytes.toBytes("libid"), Bytes.toBytes(javaMap.get("libid").get.toString))
                    table.put(put)
                  case "idOpType" =>
                    put.addImmutable(family, Bytes.toBytes("idOpType"), Bytes.toBytes(javaMap.get("idOpType").get.toString))
                    table.put(put)
                  case "ATTR" =>
                    put.addImmutable(family, Bytes.toBytes("ATTR"), Bytes.toBytes(javaMap.get("ATTR").get.toString))
                    table.put(put)
                  case _ => println("x:" + x)
                }
              })
            }
          })
        } catch {
          case e: Exception => logError("Failed to write to HBase: " + e.getMessage)
        } finally {
          table.close()
        }
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def createTable(): Table = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    conn.getTable(TableName.valueOf("aaabbb"))
  }
}
```
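The job writes to an HBase table named aaabbb with column family cf1 and simply assumes that the table already exists. If it does not, here is a minimal sketch of creating it with the HBase 1.x Admin API; the CreateHbaseTable object name is my own, and the ZooKeeper quorum is the one used above:

```scala
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}

object CreateHbaseTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("aaabbb")

    // create table "aaabbb" with column family "cf1" if it does not exist yet
    if (!admin.tableExists(tableName)) {
      val desc = new HTableDescriptor(tableName)
      desc.addFamily(new HColumnDescriptor("cf1"))
      admin.createTable(desc)
    }

    admin.close()
    conn.close()
  }
}
```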
The JSON parsing utility keeps id, libid, flag and idOpType as top-level fields and packs everything else into an ATTR JSON string:

```java
package cn.zhangyu.utils;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class JsonUitls {

    static Logger logger = LoggerFactory.getLogger(JsonUitls.class);

    public static Map<String, Object> parseJson(String s) {
        // e.g. {"id":"test","age":18,"name":"lisi","sex":"gender","libid":"001","idOpType":"1","flag":"1"}
        Map<String, Object> map = new HashMap<>();
        Map<String, Object> attrMap = new HashMap<String, Object>();
        if (s != null && !s.isEmpty()) {
            map = JSONObject.parseObject(s, new TypeReference<Map<String, Object>>() {});
        }

        // iterate over the fields coming from Kafka
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            // everything except id, libid, flag and idOpType goes into the ATTR map
            if (!"id".equals(entry.getKey()) && !"libid".equals(entry.getKey())
                    && !"flag".equals(entry.getKey()) && !"idOpType".equals(entry.getKey())) {
                String key = entry.getKey();
                attrMap.put(key, entry.getValue());
            }
        }

        // serialize the remaining attributes back into a JSON string
        String attrString = new JSONObject(attrMap).toJSONString();

        Map<String, Object> result = new HashMap<String, Object>();
        Object id = map.get("id");
        Object libid = map.get("libid");
        if (id == null || libid == null) {
            logger.error("id is null or libid is null");
            return null;
        }
        result.put("id", id.toString());
        result.put("libid", libid.toString());
        result.put("flag", map.get("flag") == null ? "0" : map.get("flag"));
        result.put("idOpType", map.get("idOpType") == null ? "1" : map.get("idOpType"));
        result.put("ATTR", attrString);
        return result;
    }
}
```
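As a quick sanity check of what parseJson returns, here is a small hypothetical Scala driver fed with the sample record from the comment above; the output shown in the comment is only illustrative, since HashMap ordering is not defined:

```scala
import cn.zhangyu.utils.JsonUitls

object JsonUitlsDemo {
  def main(args: Array[String]): Unit = {
    val json = """{"id":"test","age":18,"name":"lisi","sex":"gender","libid":"001","idOpType":"1","flag":"1"}"""
    val result = JsonUitls.parseJson(json)
    // id/libid/flag/idOpType stay as top-level keys, everything else is packed into ATTR,
    // e.g. {id=test, libid=001, flag=1, idOpType=1, ATTR={"sex":"gender","name":"lisi","age":18}}
    println(result)
  }
}
```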
The example is quite simple. The program can be started locally, or submitted with spark-submit:
```bash
spark-submit \
  --class cn.zhangyu.KafkaStreaming2Hbase \
  --master local[2] \
  /home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
  args0 args1 args2 ....   # application arguments go last, separated by spaces
```
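Note that the main method shown above ignores its arguments; if args0 args1 ... are meant to configure the job, they have to be read explicitly. A minimal sketch of that pattern, with a purely hypothetical argument layout (brokers, topic, HBase table name):

```scala
object ArgsExample {
  def main(args: Array[String]): Unit = {
    // hypothetical layout: args0 = brokers, args1 = topic, args2 = HBase table name
    if (args.length < 3) {
      System.err.println("Usage: KafkaStreaming2Hbase <brokers> <topic> <tableName>")
      System.exit(1)
    }
    val Array(brokers, topic, tableName) = args.take(3)
    // these values would then replace the hard-coded broker list, "test" topic
    // and "aaabbb" table in the job above
    println(s"brokers=$brokers, topic=$topic, table=$tableName")
  }
}
```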
This example actually has quite a few problems; I will briefly point out a couple here:
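Two obvious candidates from the code above: every column value gets its own Put and its own table.put call, so nothing is batched; and because enable.auto.commit is set to false while nothing commits offsets manually, the consumer position is never persisted and a restart falls back to auto.offset.reset. For the second point, a minimal sketch of manual offset commits with the spark-streaming-kafka-0-10 API, assuming stream is the DStream created in the job above:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// `stream` is the DStream returned by KafkaUtils.createDirectStream above
stream.foreachRDD { rdd =>
  // capture this batch's Kafka offset ranges before any processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { partition =>
    // ... write the records to HBase exactly as in the job above ...
  }

  // commit the offsets back to Kafka only after the batch has been written
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```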