当前位置:   article > 正文

KafkaStreams_kafka streams csdn

kafka streams csdn

介绍

  • Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务。
  • 在这之前kafka也没有提供数据处理的服务。
  • 大家的流处理计算主要是还是依赖于Spark Streaming,Flink等流式处理框架。
  • 但是他们都离不开Kafka的消息中转,所以Kafka于0.10.0.0版本推出了自己的流处理框架,Kafka Streams。
  • Kafka的定位也正式变成为了Apache Kafka® is a distributed streaming platform,分布式流处理平台
  • 目前KafkaStreams在实际中用的不多
  • 流式数据计算方面用的更多的还是SparkStreaming和Flink,他们更专业

需求

  • 从teststream主题接收数据,并做单词统计
  • 如:
    • 输入 kafka kafka spark spark spark ,
    • 得出kafka 2 spark 3

代码演示

准备主题

kafka-topics.sh --zookeeper node01:2181 --create --topic teststream --replication-factor 2 --partitions 3
  • 1

启动控制台生产者发送单词

kafka-console-producer.sh --broker-list node01:9092 --topic teststream
  • 1
package cn.hanjiaxiaozhi.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author hanjiaxiaozhi
 * Date 2020/7/11 16:18
 * Desc
 * 从teststream主题接收数据,并做单词统计
 * 如:
 * 输入 kafka kafka spark spark spark ,
 * 得出kafka 2   spark 3
 */
public class WordCount {
    public static void main(String[] args) {
        //1.准备参数
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyWordCount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        //2.进行流数据处理
        StreamsBuilder builder = new StreamsBuilder();
        //接收到一行行的数据,如kafka kafka spark spark spark
        KStream<String, String> textLines = builder.stream("teststream");
        //对上面的数据按照单词进行切分
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                //按照单词进行分组,这样相同的单词就到同一个组中了
                .groupBy((key, word) -> word)
                //对各个组内的单词进行计数,并存在counts-store中
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        //3.输出统计结果
        wordCounts.foreach((k,v)-> System.out.println(k+" : "+v));

        //4.创建并启动流程序
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/749896
推荐阅读
相关标签
  

闽ICP备14008679号