当前位置:   article > 正文

Flink笔记-sink写入kafka_flink sink kafka

flink sink kafka
  1. public class KafkaSinkStream {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(3);
  5. DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer010<String>("dwd", new SimpleStringSchema(), KafkaUtils.comsumerProps()));
  6. // kafka配置
  7. Properties props = new Properties();
  8. props.put("bootstrap.servers", "192.168.3.160:9092");
  9. // flink写入kafka
  10. sourceStream.addSink(new FlinkKafkaProducer010<String>("out", new SimpleStringSchema(), props));
  11. env.execute();
  12. }
  13. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44236
推荐阅读
相关标签
  

闽ICP备14008679号