赞
踩
- public class KafkaSinkStream {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer010<String>("dwd", new SimpleStringSchema(), KafkaUtils.comsumerProps()));
-
- // kafka配置
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.3.160:9092");
-
- // flink写入kafka
- sourceStream.addSink(new FlinkKafkaProducer010<String>("out", new SimpleStringSchema(), props));
-
- env.execute();
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。