赞
踩
package com.shufang.source import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer object StreamFromKafka { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties: Properties = new Properties() //指定kafka的启动集群 properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") //指定消费者组 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flinkConsumer") //指定key的反序列化类型 properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //指定value的反序列化类型 properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //指定自动消费的策略 properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") println(properties) val kafkastream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]("console-topic", new SimpleStringSchema(), properties) ) kafkastream.print() env.execute("kafkasource") } }
package com.shufang.flink.connectors import com.shufang.flink.bean.People import com.shufang.flink.examples.MyPeopleSource import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer object TestKafkaSink { def main(args: Array[String]): Unit = { //获取环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //获取数据流 val peopleStream: DataStream[People] = env.addSource(new MyPeopleSource()) peopleStream.map(_.toString).addSink( new FlinkKafkaProducer[String]( "localhost:9092", "console-topic", new SimpleStringSchema() ) ) //执行 env.execute("kafka_sink") } }
#通过命令行进行数据的娇艳
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic console-topic
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。