Recently I wanted to compute some statistics over message data. My original plan was to consume the Kafka messages, do the counting myself and store the results in a database (the statistics are fairly simple, so I did not consider Apache Storm). Then I remembered that Kafka already provides Kafka Streams, so I started looking into it. The example below is very simple; it only makes a small change to the example that ships with Kafka.
Because we are using Kafka Streams, the dependency to add is kafka-streams, not the kafka-clients artifact we used before.
My Kafka broker is installed on Windows 10 (for convenient testing: at the office I can connect directly to the Kafka cluster, but for development I run everything locally first, so I installed Kafka on Windows 10). The version is kafka_2.12-1.0.0.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.2</version>
</dependency>
The code of the official example is the TemperatureDemo class under streams/examples in the Apache Kafka source tree.
In the official example, the temperature value is sent to the topic directly. I changed that slightly: the messages sent to the topic are JSON containing both temperature and humidity, for example {"temp":19, "humidity": 25}.
Note: the code below only changes the data format handled by the official example; everything else is the same. After starting the program, send messages of the form {"temp":19, "humidity": 25} to the topic iot-temperature and you can see it in action (a small producer sketch for sending such test messages follows).
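For testing, a couple of readings can be pushed from a small Java producer. The sketch below is my own addition (the class name TemperatureProducerDemo is arbitrary); it assumes the broker on localhost:9092 used above and sends null-keyed JSON values, which is what the streams application expects.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TemperatureProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send a few readings; the key is null, just like in the streams example
            producer.send(new ProducerRecord<>("iot-temperature", "{\"temp\":19, \"humidity\": 25}"));
            producer.send(new ProducerRecord<>("iot-temperature", "{\"temp\":23, \"humidity\": 30}"));
            producer.flush();
        }
    }
}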
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
// in the 1.0.x API the windowed (de)serializers live in the internals package
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class TemperatureDemo {

    // threshold used for filtering max temperature values
    private static final int TEMPERATURE_THRESHOLD = 20;
    // window size within which the filtering is applied
    private static final int TEMPERATURE_WINDOW_SIZE = 5;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();

        // read the sensor readings sent by the devices from the topic iot-temperature
        KStream<String, String> source = builder.stream("iot-temperature");

        KStream<Windowed<String>, String> max = source
            // temperature values are sent without a key (null), so in order
            // to group and reduce them, a key is needed ("temp" has been chosen)
            .selectKey(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String value) {
                    return "temp";
                }
            })
            .groupByKey()
            .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
            .reduce(new Reducer<String>() {
                @Override
                public String apply(String value1, String value2) {
                    System.out.println("value1=" + value1 + ", value2=" + value2);
                    // both values are JSON messages such as {"temp":19, "humidity": 25};
                    // keep the one with the higher temperature so the result stays valid JSON
                    Integer temp1 = JSON.parseObject(value1).getInteger("temp");
                    Integer temp2 = JSON.parseObject(value2).getInteger("temp");
                    return temp1 > temp2 ? value1 : value2;
                }
            })
            .toStream()
            // the filter condition: temperature greater than TEMPERATURE_THRESHOLD (20)
            .filter(new Predicate<Windowed<String>, String>() {
                @Override
                public boolean test(Windowed<String> key, String value) {
                    System.out.println("key=" + key + ", value=" + value);
                    JSONObject json = JSON.parseObject(value);
                    Integer temperature = json.getInteger("temp");
                    return temperature > TEMPERATURE_THRESHOLD;
                }
            });

        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        // need to override key serde to Windowed<String> type
        max.to("iot-temperature-max", Produced.with(windowedSerde, Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
The results of the stream are written to the topic iot-temperature-max, so we can inspect that topic's data. Only readings whose temperature is greater than TEMPERATURE_THRESHOLD (20) are stored in that topic.
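To take a quick look at that topic from Java, a minimal consumer such as the sketch below works (again my own addition; the class name MaxTemperatureConsumerDemo is arbitrary). The record keys are written with the windowed serde, so the sketch reads them as raw bytes and only prints the JSON values.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MaxTemperatureConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-temperature-max-viewer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("iot-temperature-max"));
            while (true) {
                // poll(long) is the variant available in the 1.0.x client
                ConsumerRecords<byte[], String> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], String> record : records) {
                    // the value is the JSON reading with the max temperature in the window
                    System.out.println(record.value());
                }
            }
        }
    }
}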