
Flink Sink: Kafka


Flink Streaming API: Sink

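Unlike Spark, Flink has no foreach-style method that lets users iterate over the data themselves. All output to external systems has to go through a Sink, and a job's final output is attached with a call like the following: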

stream.addSink(new MySink(xxxx)) 

Flink ships ready-made sinks for a number of frameworks. For any other target system, you have to implement a custom sink yourself.
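To show what a custom sink such as MySink is expected to do, here is a framework-free sketch. The interface SimpleSink and the classes CollectingSink and SinkDemo are hypothetical stand-ins, not Flink classes. They only mirror the lifecycle of Flink's RichSinkFunction (open once, invoke once per record, close once at the end), and runPipeline plays the role that stream.addSink(...) plus env.execute() play in a real job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink; NOT a Flink API.
// It mirrors the open/invoke/close lifecycle of Flink's RichSinkFunction.
interface SimpleSink<T> {
    void open();            // called once before the first record
    void invoke(T value);   // called once per record
    void close();           // called once after the last record
}

// A user-defined sink (stand-in for MySink) that collects records in memory.
class CollectingSink implements SimpleSink<String> {
    final List<String> received = new ArrayList<>();
    boolean opened = false;
    boolean closed = false;

    @Override public void open() { opened = true; }
    @Override public void invoke(String value) { received.add(value); }
    @Override public void close() { closed = true; }
}

class SinkDemo {
    // Plays the role of stream.addSink(sink) + env.execute():
    // drives the sink through its lifecycle for every element of the "stream".
    static <T> void runPipeline(List<T> stream, SimpleSink<T> sink) {
        sink.open();
        try {
            for (T value : stream) {
                sink.invoke(value);
            }
        } finally {
            sink.close(); // always release resources, even if invoke fails
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        runPipeline(List.of("a", "b"), sink);
        System.out.println(sink.received); // prints [a, b]
    }
}
```

In an actual Flink job the same shape is expressed by extending RichSinkFunction<T> and overriding open(Configuration), invoke(T, Context) and close(); connections to the external system are usually created in open and released in close rather than per record.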


5.7.1 Kafka

  1. pom dependency

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    
  2. Java code

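    package com.zch.apitest.sink;
    
    import com.zch.apitest.beans.SensorReading;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    /**
     * Author: zhaoHui
     * Date: 2022/01/22
     * Time: 14:27
     * Description: read sensor records from Kafka topic "sensor", convert them to
     * SensorReading strings, and write them to Kafka topic "sinktest".
     */
    public class SinkTest1_Kafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // Kafka consumer settings; the group id is an arbitrary name chosen for this example
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "zhaohui01:9092");
            properties.setProperty("group.id", "consumer-group");
    
            // Read raw lines from the "sensor" topic (fed by the console producer in the steps below).
            // To read from a local file instead, use:
            // env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt")
            DataStream<String> inputStream = env.addSource(
                    new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties));
    
            // Parse "id,timestamp,temperature" and convert each record to its string form
            SingleOutputStreamOperator<String> dataStream = inputStream.map(line -> {
                String[] split = line.split(",");
                return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2])).toString();
            });
    
            // Write every string to the "sinktest" topic
            dataStream.addSink(new FlinkKafkaProducer<String>("zhaohui01:9092", "sinktest", new SimpleStringSchema()));
    
            dataStream.print();
            env.execute();
        }
    }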
    
    1. Start ZooKeeper

      zookeeper-server-start.sh config/zookeeper.properties
      
    2. Start the Kafka broker

      kafka-server-start.sh config/server.properties
      
    3. Start a Kafka console producer on the sensor topic

      kafka-console-producer.sh --broker-list localhost:9092  --topic sensor
      
    4. Start a Kafka console consumer on the sinktest topic

      kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest
      
    5. Run the Flink program, type data into the console producer, and check the output in the console consumer

      Input (console producer)

      >sensor_1,1547718199,35.8
      >sensor_6,1547718201,15.4
      

      Output (console consumer)

      SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
      SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
      

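    Here Flink effectively acts as a pipeline between the two topics: records typed into sensor come out on sinktest, converted to SensorReading strings.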
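The only transformation in that pipeline is the map step: split each comma-separated line and render it as a SensorReading string. The SensorReading bean is not shown in this article, so the class below is a hypothetical reconstruction whose fields and toString() format are inferred from the consumer output above; SensorPipelineDemo and its methods are likewise illustrative names. The sketch runs the same step on the two sample lines with plain Java lists standing in for the sensor and sinktest topics, without Kafka or Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of com.zch.apitest.beans.SensorReading;
// fields and toString() format are inferred from the console output above.
class SensorReading {
    private final String id;
    private final Long timestamp;
    private final Double temperature;

    SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }
}

class SensorPipelineDemo {
    // Same logic as the map() in SinkTest1_Kafka: "id,timestamp,temperature" -> String
    static String toSensorString(String line) {
        String[] split = line.split(",");
        return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2])).toString();
    }

    // Simulates topic "sensor" -> map -> topic "sinktest" on in-memory lists
    static List<String> process(List<String> sensorTopic) {
        List<String> sinktestTopic = new ArrayList<>();
        for (String line : sensorTopic) {
            sinktestTopic.add(toSensorString(line));
        }
        return sinktestTopic;
    }

    public static void main(String[] args) {
        List<String> out = process(List.of("sensor_1,1547718199,35.8", "sensor_6,1547718201,15.4"));
        out.forEach(System.out::println); // the two SensorReading{...} lines shown above
    }
}
```

Keeping the parsing in a plain static method like this makes it easy to unit-test the transformation separately from the Kafka source and sink.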

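Statement: This article was contributed by a user and does not represent the position of the wpsshop blog. Copyright belongs to the original author, and this site assumes no related legal liability. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/article/detail/44218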