First, create the input topic teststream (3 partitions, replication factor 2):

kafka-topics.sh --zookeeper node01:2181 --create --topic teststream --replication-factor 2 --partitions 3
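Note: the --zookeeper option was removed from kafka-topics.sh in Kafka 3.0. On newer clusters the equivalent command talks to the broker directly (assuming it listens on node01:9092):

kafka-topics.sh --bootstrap-server node01:9092 --create --topic teststream --replication-factor 2 --partitions 3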
Then start a console producer to feed test lines into the topic:

kafka-console-producer.sh --broker-list node01:9092 --topic teststream
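With the Streams application below running, type a line such as the following into the producer:

kafka kafka spark spark spark

The application should then print the updated counts, e.g. kafka : 2 and spark : 3.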
package cn.hanjiaxiaozhi.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author hanjiaxiaozhi
 * Date 2020/7/11 16:18
 * Desc
 * Consumes records from the teststream topic and counts words.
 * For example, the input "kafka kafka spark spark spark"
 * yields kafka 2 and spark 3.
 */
public class WordCount {
    public static void main(String[] args) {
        // 1. Prepare the configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyWordCount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Build the stream processing topology
        StreamsBuilder builder = new StreamsBuilder();
        // Each record value is a line of text, e.g. "kafka kafka spark spark spark"
        KStream<String, String> textLines = builder.stream("teststream");
        KTable<String, Long> wordCounts = textLines
                // Split each line into individual words
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                // Group by word so identical words end up in the same group
                .groupBy((key, word) -> word)
                // Count each group, backed by the "counts-store" state store
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        // 3. Print the running counts. Note: KTable has no foreach method,
        // so the changelog must first be converted to a KStream.
        wordCounts.toStream().foreach((k, v) -> System.out.println(k + " : " + v));

        // 4. Create and start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
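Printing with System.out works for a demo, but the results are easier to inspect with standard tooling if the counts are also written back to Kafka. A minimal sketch of that variant (the output topic name wordcount-output is an assumption, not part of the original post; it needs one extra import):

import org.apache.kafka.streams.kstream.Produced;

// Publish the running counts to an output topic instead of stdout;
// "wordcount-output" is an assumed topic name.
wordCounts.toStream()
        .to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

The counts can then be read back with the console consumer, telling it how to deserialize the Long values:

kafka-console-consumer.sh --bootstrap-server node01:9092 --topic wordcount-output --from-beginning --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer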