当前位置:   article > 正文

Flink1.13中tableAPI对接数据源的source&sink_flink tableenv.connect已过时

flink tableenv.connect已过时

目录

过时的文件source

推荐的文件source

过时的文件Sink

推荐的文件Sink

过时的kafka-Source

推荐的kafka-Source

过时的kafka-Sink

推荐的kafka-Sink


在学习过程中,我遇到了文件(file)和 Kafka 数据源的读写(source 和 sink)问题。我使用的 Flink 版本是 1.13.3,发现 tableEnv.connect() 方法已经过时(deprecated),于是去官网查看了推荐的方式,即通过 executeSql 执行建表 DDL。下面用代码分别演示过时的写法和推荐的写法:

参考官方文档:CSV 格式:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/csv/ ;Kafka 连接器:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/

 本地数据源:

  1. id,timeStamp,temperature
  2. sensor_1,1,1
  3. sensor_1,2,2
  4. sensor_1,3,32
  5. sensor_1,4,4
  6. sensor_1,5,5
  7. sensor_1,22,6
  8. sensor_1,23,99
  9. sensor_2,25,13
  10. sensor_2,32,40.2
  11. sensor_3,40,42

过时的文件source

添加依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-csv</artifactId>
  4. <version>1.13.3</version>
  5. </dependency>
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 表的元数据
  5. Schema schema = new Schema()
  6. .field("id", DataTypes.STRING())
  7. .field("timeStamp", DataTypes.BIGINT())
  8. .field("temperature", DataTypes.FLOAT());
  9. //step-2 连接文件,并创建一个临时表,其实就是一个动态表
  10. tEnv.connect(new FileSystem()
  11. .path("..\\resources\\sensor.txt"))
  12. .withSchema(schema)//定义表结构
  13. //fieldDelimiter是一行数据根据逗号划分字段
  14. //lineDelimiter每行数据根据换行符划分一条数据
  15. .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
  16. .createTemporaryTable("sensor");//创建临时表
  17. //step-3 做成表对象,然后对动态表进行查询
  18. Table resTable = tEnv.from("sensor")
  19. .groupBy($("id"))
  20. .select($("id"), $("id").count().as("cnt"));
  21. tEnv.toRetractStream(resTable, Row.class).print("cnt");
  22. env.execute();

推荐的文件source

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 编写数据source的sql语句
  5. String sourceDDL =
  6. "create table sensor_source(id STRING," +
  7. "`timeStamp` BIGINT," +
  8. "temperature DOUBLE" +
  9. ") with (" +
  10. "'connector'='filesystem'," +
  11. "'format'='csv'," +
  12. "'csv.field-delimiter'=','," + //每个字段按,切分且这里不用指定每行数据的切分,默认一行一行读
  13. "'path'='..\\resources\\sensor.txt'" +
  14. ")";
  15. tEnv.executeSql(sourceDDL);
  16. Table resTable = tEnv.from("sensor_source")
  17. .groupBy($("id"))
  18. .select($("id"), $("id").count().as("cnt"));
  19. tEnv.toRetractStream(resTable, Row.class).print("cnt");
  20. env.execute();

实现结果

  1. cnt> (true,+I[sensor_1, 1])
  2. cnt> (false,-U[sensor_1, 1])
  3. cnt> (true,+U[sensor_1, 2])
  4. cnt> (false,-U[sensor_1, 2])
  5. cnt> (true,+U[sensor_1, 3])
  6. cnt> (false,-U[sensor_1, 3])
  7. cnt> (true,+U[sensor_1, 4])
  8. cnt> (false,-U[sensor_1, 4])
  9. cnt> (true,+U[sensor_1, 5])
  10. cnt> (false,-U[sensor_1, 5])
  11. cnt> (true,+U[sensor_1, 6])
  12. cnt> (false,-U[sensor_1, 6])
  13. cnt> (true,+U[sensor_1, 7])
  14. cnt> (true,+I[sensor_2, 1])
  15. cnt> (false,-U[sensor_2, 1])
  16. cnt> (true,+U[sensor_2, 2])
  17. cnt> (true,+I[sensor_3, 1])

过时的文件Sink

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 建造数据源
  5. DataStreamSource<SensorReading> waterSensorStream =
  6. env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
  7. new SensorReading("sensor_1", 2000L, 20D),
  8. new SensorReading("sensor_2", 3000L, 30D),
  9. new SensorReading("sensor_1", 4000L, 40D),
  10. new SensorReading("sensor_1", 5000L, 50D),
  11. new SensorReading("sensor_2", 6000L, 60D));
  12. //step-2 处理数据
  13. Table resTable = tEnv.fromDataStream(waterSensorStream)
  14. .where($("id").isEqual("sensor_1"))
  15. .select($("id"), $("timeStamp"), $("temperature"));
  16. //step-3 配置元数据
  17. Schema schema = new Schema()
  18. .field("id", DataTypes.STRING())
  19. .field("timeStamp", DataTypes.BIGINT())
  20. .field("temperature", DataTypes.DOUBLE());
  21. tEnv.connect(new FileSystem().path("output/sink.txt"))
  22. .withFormat(new Csv().fieldDelimiter(','))
  23. .withSchema(schema)
  24. .createTemporaryTable("sensor_sink");
  25. resTable.executeInsert("sensor_sink");
  26. /*
  27. * explain
  28. * env.execute()方法会去分析代码,生成一些 graph,但是我们代码中没有调用算子,所以会报错,可以直接不用
  29. * */

推荐的文件Sink

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 建造数据源
  5. DataStreamSource<SensorReading> waterSensorStream =
  6. env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
  7. new SensorReading("sensor_1", 2000L, 20D),
  8. new SensorReading("sensor_2", 3000L, 30D),
  9. new SensorReading("sensor_1", 4000L, 40D),
  10. new SensorReading("sensor_1", 5000L, 50D),
  11. new SensorReading("sensor_2", 6000L, 60D));
  12. //step-2 处理数据
  13. Table resTable = tEnv.fromDataStream(waterSensorStream)
  14. .where($("id").isEqual("sensor_1"))
  15. .select($("id"), $("timeStamp"), $("temperature"));
  16. tEnv.executeSql(
  17. "create table file_sink(" +
  18. "`id` STRING," +
  19. "`timestamp` BIGINT," +
  20. "`temp` DOUBLE" +
  21. ")with(" +
  22. "'connector' = 'filesystem'," +
  23. "'format'='csv'," + //可以填写json
  24. //explain 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
  25. "'csv.ignore-parse-errors' = 'true'," +
  26. //explain 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。
  27. "'csv.allow-comments' = 'true'," +
  28. "'path'='output/sink_new.txt'" +
  29. ")");
  30. resTable.executeInsert("file_sink");

实现结果

过时的kafka-Source

添加依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_2.12</artifactId>
  4. <version>1.13.3</version>
  5. </dependency>
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 表的元数据
  5. Schema schema = new Schema()
  6. .field("id", DataTypes.STRING())
  7. .field("timeStamp", DataTypes.BIGINT())
  8. .field("temperature", DataTypes.FLOAT());
  9. //接收来自sensor主题的消息
  10. tEnv.connect(new Kafka()
  11. .version("universal") //kafka通用版本
  12. .topic("sensor")
  13. .startFromLatest()
  14. .property("group.id", "bigdata")
  15. .property("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092"))
  16. .withFormat(new Json())
  17. .withSchema(schema)
  18. .createTemporaryTable("sensor");
  19. Table table = tEnv.from("sensor")
  20. .groupBy($("id"))
  21. .select($("id"), $("id").count().as("cnt"));
  22. tEnv.toRetractStream(table, Row.class).print();
  23. env.execute();

推荐的kafka-Source

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. tEnv.executeSql(
  5. "create table kafka_source(" +
  6. "`id` STRING," +
  7. "`timestamp` BIGINT," +
  8. "`temp` DOUBLE" +
  9. ")with(" +
  10. "'connector' = 'kafka'," +
  11. "'topic' = 'sensor'," +
  12. "'properties.bootstrap.servers' = 'hadoop102:9092'," +
  13. "'properties.group.id' = 'bigdata'," +
  14. "'format' = 'json'," + //接收到的kafka都是json格式
  15. "'json.ignore-parse-errors' = 'true'" +
  16. ")");
  17. Table table = tEnv.from("kafka_source")
  18. .groupBy($("id"))
  19. .select($("id"), $("id").count().as("cnt"));
  20. tEnv.toRetractStream(table, Row.class).print();
  21. env.execute();

实现结果

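这里注意:Kafka 发送的数据必须是 JSON 格式,并且字段名要与建表语句中的列名一致,例如 {"id":"sensor_1","timestamp":1,"temp":1.0}。由于开启了 'json.ignore-parse-errors' = 'true',无法解析的消息不会报错,而是被直接跳过。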

  1. (true,+I[sensor_1, 1])
  2. (false,-U[sensor_1, 1])
  3. (true,+U[sensor_1, 2])
  4. (true,+I[sensor_2, 1])

过时的kafka-Sink

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 建造数据源
  5. DataStreamSource<SensorReading> waterSensorStream =
  6. env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
  7. new SensorReading("sensor_1", 2000L, 20D),
  8. new SensorReading("sensor_2", 3000L, 30D),
  9. new SensorReading("sensor_1", 4000L, 40D),
  10. new SensorReading("sensor_1", 5000L, 50D),
  11. new SensorReading("sensor_2", 6000L, 60D));
  12. //step-2 处理数据
  13. Table sensorTable = tEnv.fromDataStream(waterSensorStream);
  14. Table resultTable = sensorTable
  15. .where($("id").isEqual("sensor_1"))
  16. .select($("id"), $("timeStamp"), $("temperature"));
  17. //step-3 配置元数据
  18. Schema schema = new Schema()
  19. .field("id", DataTypes.STRING())
  20. .field("timeStamp", DataTypes.BIGINT())
  21. .field("temperature", DataTypes.DOUBLE());
  22. tEnv.connect(new Kafka()
  23. .version("universal")
  24. .topic("sensor")
  25. .sinkPartitionerRoundRobin() // 分区轮询
  26. .property("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092"))
  27. .withFormat(new Json())
  28. .withSchema(schema)
  29. .createTemporaryTable("kafka_sink");
  30. resultTable.executeInsert("kafka_sink");

推荐的kafka-Sink

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  4. //step-1 建造数据源
  5. DataStreamSource<SensorReading> waterSensorStream =
  6. env.fromElements(new SensorReading("sensor_1", 1000L, 10D),
  7. new SensorReading("sensor_1", 2000L, 20D),
  8. new SensorReading("sensor_2", 3000L, 30D),
  9. new SensorReading("sensor_1", 4000L, 40D),
  10. new SensorReading("sensor_1", 5000L, 50D),
  11. new SensorReading("sensor_2", 6000L, 60D));
  12. //step-2 处理数据
  13. Table sensorTable = tEnv.fromDataStream(waterSensorStream);
  14. Table resultTable = sensorTable
  15. .where($("id").isEqual("sensor_1"))
  16. .select($("id"), $("timeStamp"), $("temperature"));
  17. tEnv.executeSql(
  18. "create table kafka_sink(" +
  19. "`id` STRING," +
  20. "`timestamp` BIGINT," +
  21. "`temp` DOUBLE" +
  22. ")with(" +
  23. "'connector' = 'kafka'," +
  24. "'topic' = 'sensor'," +
  25. "'properties.bootstrap.servers' = 'hadoop102:9092'," +
  26. "'properties.group.id' = 'bigdata'," +
  27. "'format' = 'json'," +
  28. "'json.ignore-parse-errors' = 'true'" +
  29. ")");
  30. /*
  31. * Conclusion
  32. * 对于source和sink上面的配置文件是没有办法影响的,source和sink的配置文件都是一样的;
  33. * 影响source和sink的就是下面调用的executeInsert("kafka_sink")和
  34. * tEnv.from("kafka_source"),这才是影响source和sink的方法。
  35. * */
  36. resultTable.executeInsert("kafka_sink");

实现结果

至此,实验结束,当作一个学习的记录吧。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44158
推荐阅读
相关标签
  

闽ICP备14008679号