赞
踩

Flink内置了一些Sink, 除此之外的Sink需要用户自定义!

hadoop+zookeeper+kafka
添加Kafka Connector依赖
<!-- Flink Kafka connector: provides the FlinkKafkaProducer sink used by the job below. -->
<!-- NOTE(review): the "_2.11" suffix presumably selects the Scala 2.11 build; confirm it matches the other Flink artifacts in the project. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<!-- fastjson: used by the job below (JSON.toJSONString) to turn each WaterSensor into a JSON string. -->
<!-- NOTE(review): 1.2.75 is an old release; later fastjson releases reportedly contain security fixes, so verify before reusing this version. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
import com.alibaba.fastjson.JSON; import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; public class Flink_Sink_Kafka { public static void main(String[] args) throws Exception { //1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.读取端口数据并转化为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop102", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] split = value.split(","); return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])); } }); //3.将数据转换为字符串写入Kafka Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); waterSensorDS.map(new MapFunction<WaterSensor, String>() { @Override public String map(WaterSensor value) throws Exception { return JSON.toJSONString(value); } }).addSink(new FlinkKafkaProducer<String>("test",new SimpleStringSchema(),properties)); //4.执行任务 env.execute(); } }
/**
 * JavaBean describing one sensor reading.
 *
 * <p>It has a public no-argument constructor, an all-arguments constructor and
 * a getter/setter pair for every field, so that callers can build it with
 * {@code new WaterSensor(id, ts, vc)} and getter-based JSON serializers can
 * see its properties.
 */
public class WaterSensor {
    /** Sensor identifier. */
    private String id;
    /** NOTE(review): presumably the reading's timestamp; confirm the unit with the data producer. */
    private Long ts;
    /** NOTE(review): presumably the measured water level; confirm with the data producer. */
    private Integer vc;

    /** Creates an empty reading; all fields are {@code null}. */
    public WaterSensor() {
    }

    /**
     * Creates a fully populated reading.
     *
     * @param id sensor identifier
     * @param ts value for the {@code ts} field
     * @param vc value for the {@code vc} field
     */
    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    /** @return the sensor identifier, possibly {@code null} */
    public String getId() {
        return id;
    }

    /** @param id the sensor identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the {@code ts} field, possibly {@code null} */
    public Long getTs() {
        return ts;
    }

    /** @param ts the new {@code ts} value */
    public void setTs(Long ts) {
        this.ts = ts;
    }

    /** @return the {@code vc} field, possibly {@code null} */
    public Integer getVc() {
        return vc;
    }

    /** @param vc the new {@code vc} value */
    public void setVc(Integer vc) {
        this.vc = vc;
    }

    /** Two readings are equal when all three fields are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WaterSensor)) {
            return false;
        }
        WaterSensor other = (WaterSensor) o;
        // Fully qualified so the class needs no import line of its own.
        return java.util.Objects.equals(id, other.id)
                && java.util.Objects.equals(ts, other.ts)
                && java.util.Objects.equals(vc, other.vc);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(id, ts, vc);
    }

    @Override
    public String toString() {
        return "WaterSensor(id=" + id + ", ts=" + ts + ", vc=" + vc + ")";
    }
}
在集群上启动kafka的消费者,查看接收到的数据
kafka/bin 目录下输入命令:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
结果显示:
kafka/bin 目录下输入命令:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
结果显示:

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。