赞
踩
Flink之流处理API之Sink
Flink没有类似于spark中foreach方法,让用户进行迭代的操作。虽有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。

pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.10.1</version>
</dependency>
java代码
package com.zch.apitest.sink; import com.zch.apitest.beans.SensorReading; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; /** * Author: zhaoHui * Date: 2022/01/22 * Time: 14:27 * Description: */ public class SinkTest1_Kafka { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取文件 DataStream<String> inputStream = env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt"); SingleOutputStreamOperator<String> dataStream = inputStream.map(lines -> { String[] split = lines.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])).toString(); }); dataStream.addSink(new FlinkKafkaProducer<String>("zhaohui01:9092","sinktest",new SimpleStringSchema())); dataStream.print(); env.execute(); } }
启动zookeeper
zookeeper-server-start.sh config/zookeeper.properties
启动kafka服务
kafka-server-start.sh config/server.properties
新建kafka生产者console
kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
新建kafka消费者console
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest
运行Flink程序,在kafka生产者console输入数据,查看kafka消费者console的输出结果
输入(kafka生产者console)
>sensor_1,1547718199,35.8
>sensor_6,1547718201,15.4
输出(kafka消费者console)
SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
这里Flink的作用相当于pipeline了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。