赞
踩
- <dependency>
-
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>1.15.2</version>
-
- </dependency>
- public class Demo01KafkaSource {
- public static void main(String[] args) throws Exception{
- //构建环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //构建kafka source 环境
-
- KafkaSource<String> source = KafkaSource.<String>builder()
- //指定broker列表
- .setBootstrapServers("master:9092,node1:9092,node2:9092")
- //指定topic
- .setTopics("bigdata")
- //指定消费组
- .setGroupId("my-group")
- //指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据
- .setStartingOffsets(OffsetsInitializer.earliest())
- //读取数据格式:
- .setValueOnlyDeserializer(new SimpleStringSchema())
- .build();
- //使用kafka数据源
- DataStreamSource<String> kafkaSourceDS = env.
- fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
-
- kafkaSourceDS.print();
- //启动flink
- env.execute();
-
- }
- }

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
- public class Demo02KafkaSink {
- public static void main(String[] args) throws Exception{
- //构建flink的环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //读取数据文件:
- DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");
-
- //创建kafka sink
- KafkaSink<String> sink = KafkaSink.<String>builder()
- //指定flink broker列表
- .setBootstrapServers("master:9092,node1:9092,node2:9092")
- //指定数据的格式:
- .setRecordSerializer(KafkaRecordSerializationSchema.builder()
- //指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic
- .setTopic("student")
- //指定数据的格式
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- //指定数据处理的语义:
- .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .build();
- //执行flink
- studentDS.sinkTo(sink);
- //构建flink环境
- env.execute();
- }
- }

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。