赞
踩
package com.jin.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Demo job that reads beans from a custom test source, converts each one to its
 * string form and writes the strings to a Kafka topic through {@link KafkaSink}.
 *
 * <p>Author: J<br>
 * Version: 1.0<br>
 * Created: 2023/6/29<br>
 * Description: test
 */
public class FlinkKafkaSink {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run with a parallelism of 1.
        env.setParallelism(1);

        // Add the data source (CustomizeSource is a custom source used for testing)
        // and turn every emitted bean into its string representation.
        SingleOutputStreamOperator<String> mapStream =
                env.addSource(new CustomizeSource()).map(bean -> bean.toString());

        // Producer settings: transaction timeout in milliseconds.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");

        // Build the Kafka sink.
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // Kafka broker address.
                .setBootstrapServers("lx01:9092")
                // Hand the producer settings to the sink; previously the Properties
                // object was populated but never passed on, so it had no effect.
                .setKafkaProducerConfig(properties)
                // Record serialization: target topic and value serializer.
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("tpc-02")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // Delivery semantics.
                // NOTE(review): newer connector releases name this method
                // setDeliveryGuarantee - confirm against the connector version in use.
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                // Transactional ID prefix.
                // NOTE(review): the prefix and the transaction timeout presumably only
                // matter once the guarantee is switched to EXACTLY_ONCE - confirm.
                .setTransactionalIdPrefix("JL-")
                .build();

        // Write the result stream to Kafka and start the job.
        mapStream.sinkTo(kafkaSink);
        env.execute("Kafka Sink");
    }
}
结果数据
[root@lx01 bin]# ./kafka-console-consumer.sh --bootstrap-server lx01:9092 --topic tpc-02
CustomizeBean(name=AAA-274, age=64, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-973, age=45, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-496, age=71, gender=W, hobbit=非遗文化爱好者)
CustomizeBean(name=AAA-263, age=45, gender=M, hobbit=天文知识爱好者)
CustomizeBean(name=AAA-790, age=77, gender=W, hobbit=书法爱好者)
CustomizeBean(name=AAA-806, age=38, gender=M, hobbit=非遗文化爱好者)
CustomizeBean(name=AAA-498, age=58, gender=M, hobbit=篮球运动爱好者)
CustomizeBean(name=AAA-421, age=63, gender=M, hobbit=书法爱好者)
CustomizeBean(name=AAA-938, age=56, gender=W, hobbit=乒乓球运动爱好者)
CustomizeBean(name=AAA-278, age=18, gender=M, hobbit=乒乓球运动爱好者)
CustomizeBean(name=AAA-614, age=74, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-249, age=67, gender=W, hobbit=天文知识爱好者)
CustomizeBean(name=AAA-690, age=72, gender=W, hobbit=网吧战神)
CustomizeBean(name=AAA-413, age=69, gender=M, hobbit=美食爱好者)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。