当前位置:   article > 正文

Flink—kafkaSink_flink kafkasink

flink kafkasink

Flink—kafkaSink

1.开发流程

在这里插入图片描述

特别说明:

Flink内置了一些Sink, 除此之外的Sink需要用户自定义!

在这里插入图片描述

2.环境介绍

hadoop+zookeeper+kafka

添加Kafka Connector依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.代码

import com.alibaba.fastjson.JSON;
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class Flink_Sink_Kafka {
    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.读取端口数据并转化为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, WaterSensor>() {

                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new WaterSensor(split[0],
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]));
                    }
                });


        //3.将数据转换为字符串写入Kafka
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        waterSensorDS.map(new MapFunction<WaterSensor, String>() {

            @Override
            public String map(WaterSensor value) throws Exception {
                return JSON.toJSONString(value);
            }
        }).addSink(new FlinkKafkaProducer<String>("test",new SimpleStringSchema(),properties));

        //4.执行任务
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
public class WaterSensor {

    private String id;
    private Long ts;
    private Integer vc;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在集群上启动kafka的消费者,查看接收到的数据

kafka/bin 目录下输入命令:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
  • 1

结果显示:

目录下输入命令:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
  • 1

结果显示:

在这里插入图片描述

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/article/detail/44093
推荐阅读
相关标签
  

闽ICP备14008679号