当前位置:   article > 正文

Flink入门第十三课:从DataStream api过渡到使用Table api&Flink SQL读取数据源计算后写出_datastream 和 flink sql

datastream 和 flink sql

1、基于DataStream api的Table api&Flink SQL

  1. 1、使用Table api必须要添加Flink或者Blink的计划器。
  2. 2、Flink1.10(含)之前默认使用flink计划器,flink1.11(含)之后默认使用blink计划器。
  3. <!--flink 1.11及之后使用的都是blink的计划器,这儿引入的也是blink的-->
  4. <dependency>
  5. <groupId>org.apache.flink</groupId>
  6. <artifactId>flink-table-planner-blink_2.12</artifactId>
  7. <version>${flink.version}</version>
  8. </dependency>
  9. <!--flink 1.10(含)之前默认的计划器-->
  10. <dependency>
  11. <groupId>org.apache.flink</groupId>
  12. <artifactId>flink-table-planner_2.12</artifactId>
  13. <version>${flink.version}</version>
  1. package com.atguigu.GTable_api_Flink_sql;
  2. import com.atguigu.Zbeans.SensorReading;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.table.api.EnvironmentSettings;
  7. import org.apache.flink.table.api.Table;
  8. import org.apache.flink.table.api.java.StreamTableEnvironment;
  9. import org.apache.flink.types.Row;
  10. /**
  11. * Table api&Flink sql第一课:基于dataStream的Table api&Flink sql
  12. * 1、需要引入flink的计划器的依赖
  13. * flink1.10(含)之前默认使用flink计划器,flink1.11之后使用blink的计划器
  14. * 2、需要创建表的执行环境。
  15. * 3、Table api实际上是基于DSL语法来处理数据的,每次操作都返回一个Table对象。
  16. * 4、tableEnv.fromDataStream得到的表,必须注册成视图才能使用SQL api.
  17. * SQL api的操作后同样返回一张表。
  18. * 5、输出表对象必须转换成DataStream对象才行,Row对象导包导的是flink.types.Row包,别导错了。
  19. *
  20. */
  21. public class AFirstExample {
  22. public static void main(String[] args) throws Exception{
  23. //加载环境
  24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. env.setParallelism(1);
  26. EnvironmentSettings settings = EnvironmentSettings.newInstance()
  27. .useBlinkPlanner() //使用blink计划器 useOldPlanner()使用的就是flink的计划器,也需要依赖
  28. .inStreamingMode() //流模式
  29. .build();
  30. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
  31. //读取数据包装成pojo类型
  32. DataStreamSource<String> inputStream = env.readTextFile("G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt");
  33. SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
  34. String[] fields = line.split(",");
  35. return new SensorReading(new String(fields[0]), new Long(fields[1]), new Double(fields[2]));
  36. });
  37. //基于流创建一张表
  38. Table inputTable = tableEnv.fromDataStream(dataStream);
  39. //table api处理数据
  40. Table resultTable1 = inputTable.select("id,temperature")
  41. .where("id = 'sensor_1'");
  42. //flink sql 处理数据
  43. //fromDataStream得到的表,必须注册成视图才能使用SQL api
  44. tableEnv.createTemporaryView("sensor",inputTable);
  45. Table resultTable2 = tableEnv.sqlQuery("select id,temperature from sensor where id='sensor_1'");
  46. //转成流输出,Row导包导的是flink.types.Row,别导错了
  47. //如果Table api或Flink sql有聚合操作,且聚合操作非内窗口聚合,则需要使用toRetractStream
  48. //toRetractStream会将一次更新转换成一次删除和一次新增,删除在数据头部添加false,新增添加true.
  49. tableEnv.toAppendStream(resultTable1,Row.class).print("table api处理数据");
  50. tableEnv.toAppendStream(resultTable2,Row.class).print("flink sql处理数据");
  51. //执行
  52. env.execute("基于DataStream的table api和flink sql");
  53. }
  54. }

2、纯粹的Table api&Flink SQL

读取文件、kafka计算后输出到文件、kafka

  1. package com.atguigu.GTable_api_Flink_sql;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.DataTypes;
  4. import org.apache.flink.table.api.EnvironmentSettings;
  5. import org.apache.flink.table.api.Table;
  6. import org.apache.flink.table.api.java.StreamTableEnvironment;
  7. import org.apache.flink.table.descriptors.Csv;
  8. import org.apache.flink.table.descriptors.Kafka;
  9. import org.apache.flink.table.descriptors.Schema;
  10. /**
  11. * 使用纯粹的Table api&Flink sql来完成一个Test
  12. * 从文件、kafka读取数据,输出到文件或kafka。
  13. * 直接读取数据源再通过createTemporaryTable得到的表:
  14. * Flink sql中直接使用,table api则还需要先tableEnv.from("inputTable")才可以使用DSL语法进行操作。
  15. * 写出到文件和kafka都不支持非窗口内聚合操作,不能用聚合结果表调用insertInto进行输出。
  16. * 除非聚合是窗口内的聚合。
  17. * 若要突破这一限制,则可以输出到ES或者MySQL。
  18. * 从kafka读取数据输入到kafka时的注意事项:
  19. * kafka连接器依赖中artifactId如果是flink-connector-kafka_2.12,则为通用连接器,
  20. * 无论是读取kafka还是写入kafka,version方法中版本应该写“universal”。
  21. * kafka连接器依赖中artifactId如果是flink-connector-kafka-0.11_2.12,则为0.11版的连接器。
  22. * 无论是读取kafka还是写入kafka,version方法中版本应该写“0.11”,当然也可以写0.10,高版本兼容低版本。
  23. * 启动kafka:
  24. * cd /opt/apps/kafka_2.11-0.11.0.3/bin/ && zkServer.sh start && kafka-server-start.sh ../config/server.properties
  25. * 启动生产者:
  26. * kafka-console-producer.sh -broker-list Linux001:9092 --topic topic_producer
  27. * 启动消费者:
  28. * kafka-console-consumer.sh --bootstrap-server Linux001:9092 --topic topic_consumer
  29. */
  30. public class BPureTableApi {
  31. public static void main(String[] args) throws Exception{
  32. //加载环境
  33. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34. EnvironmentSettings settings = EnvironmentSettings.newInstance()
  35. .useBlinkPlanner()
  36. .inStreamingMode().build();
  37. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
  38. // /**
  39. // * 读取文件注册成表(flink1.12已经废弃了这种方式)
  40. // * 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
  41. // */
  42. // String inpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt";
  43. // tableEnv.connect(new FileSystem().path(inpath))
  44. // .withFormat(new Csv()) //是需要加入csv依赖的
  45. // .withSchema(new Schema() //字段名可以更改,顺序不能改变
  46. // .field("id", DataTypes.STRING())
  47. // .field("timestamp",DataTypes.BIGINT())
  48. // .field("temperature",DataTypes.DOUBLE())
  49. // ).createTemporaryTable("inputTable");
  50. /**
  51. * 读取kafka数据
  52. * 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
  53. * flink table api连接kafka的属性配置中zookeeper.connect和bootstrap.servers都要配置,简直神奇
  54. */
  55. tableEnv.connect(new Kafka()
  56. .version("universal") //kafka通用连接器版本
  57. .topic("topic_producer")
  58. .property("zookeeper.connect","192.168.149.131:2181")
  59. .property("bootstrap.servers","192.168.149.131:9092")
  60. )
  61. .withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
  62. .withSchema(new Schema() //字段名可以更改,顺序不能改变
  63. .field("id", DataTypes.STRING())
  64. .field("timestamp",DataTypes.BIGINT())
  65. .field("temperature",DataTypes.DOUBLE())
  66. ).createTemporaryTable("inputTable");
  67. //使用Table api操作
  68. Table inputTable=tableEnv.from("inputTable");
  69. Table Ttable=inputTable.select("id,temperature")
  70. .where("id = 'sensor_1'");
  71. //使用Flink sql操作
  72. Table Stable=tableEnv.sqlQuery("select id,avg(temperature) as avg_temp from inputTable group by id");
  73. // /**
  74. // * 将结果数据注册成表,然后输出到文件中
  75. // * withschema的字段要和Table api&Flink sql查询字段一致
  76. // */
  77. // String outpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor_out.txt";
  78. // tableEnv.connect(new FileSystem().path(outpath))
  79. // .withFormat(new Csv()) //是需要加入csv依赖的
  80. // .withSchema(new Schema() //字段名可以更改,顺序不能改变
  81. // .field("id", DataTypes.STRING())
  82. // .field("temp",DataTypes.DOUBLE()))
  83. // .createTemporaryTable("outputTable");
  84. /**
  85. * 将结果数据注册成表,然后写入到kafka中
  86. * withschema的字段要和Table api&Flink sql查询字段一致
  87. */
  88. tableEnv.connect(new Kafka()
  89. .version("universal") //kafka通用连接器版本
  90. .topic("topic_consumer")
  91. .property("zookeeper.connect","192.168.149.131:2181")
  92. .property("bootstrap.servers","192.168.149.131:9092")
  93. )
  94. .withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
  95. .withSchema(new Schema() //字段名可以更改,顺序不能改变
  96. .field("id", DataTypes.STRING())
  97. .field("temp",DataTypes.DOUBLE())
  98. ).createTemporaryTable("outputTable");
  99. /**
  100. * 写出到文件,不支持非窗口内的聚合操作,不能用聚合结果表调用insertInto。除非聚合是窗口内聚合。
  101. * 写出到Kafka,不支持非窗口内的聚合操作,不能用聚合结果表调用insertInto。除非聚合是窗口内聚合。
  102. */
  103. Ttable.insertInto("outputTable");
  104. //执行
  105. env.execute("测试纯粹的Table api & Flink sql");
  106. }
  107. }

3、输出模式

  1. 1、Flink支持三种输出模式
  2. 追加(append)模式:
  3. 只支持插入。
  4. 对应方法toAppendStream。
  5. 撤回(retract)模式:
  6. 支持插入、删除、更新。插入和删除都很单纯。
  7. 更新则会转换成一次撤回和一次插入,撤回的消息添加false前缀,插入的消息添加true前缀。
  8. 对应方法toRetractStream。
  9. 更新插入(upsert)模式:
  10. 支持插入、删除、更新。删除很单纯。
  11. 插入和更新都是upsert,需要指定key来判断当前写入操作是插入还是更新。
  12. 只有外部系统支持retract或upsert模式,才可以将聚合操作写出。
  13. 这样的系统有ES\MySQL\Oracle等。

4、输出到ES

  1. 依赖
  2. <!--Elasticsearch-connector连接器-->
  3. <dependency>
  4. <groupId>org.apache.flink</groupId>
  5. <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
  6. <version>${flink.version}</version>
  7. </dependency>
  1. tableEnv.connect(new Elasticsearch()
  2. .version("6") //ES版本
  3. .host("localhost",9200,"http")
  4. .index("id")
  5. .documentType("temp")
  6. )
  7. .inUpsertMode()
  8. .withFormat(new Json()) //解析格式,有些格式是需要导依赖的
  9. .withSchema(new Schema()
  10. .field("id",DataTypes.STRING())
  11. .field("temp",DataTypes.DOUBLE())
  12. )
  13. .createTemporaryTable("outputTable");
  14. aggtable.insertInto("outputTable");

 5、输出到MySQL

  1. 依赖
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-jdbc_2.12</artifactId>
  5. <version>1.10.1</version>
  6. </dependency>
  1. String sinkDDL = "create table outputTable("+
  2. "id varchar(20) not null,"+
  3. "temp double(10,2) not null"+
  4. ") with ("+
  5. "'connector.type'='jdbc',"+
  6. "'connector.url'='jdbc:mysql://localhost:3306/test',"+
  7. "'connector.table'='id_count',"+
  8. "'connector.driver'='com.mysql.jdbc.Driver',"+
  9. "'connector.username'='root',"+
  10. "'connector.password'='123456')";
  11. tableEnv.sqlUpdate(sinkDDL);
  12. aggTable.insertInto("outputTable");

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/970700
推荐阅读
相关标签
  

闽ICP备14008679号