赞
踩
- 1、使用Table api必须要添加Flink或者Blink的计划器。
- 2、Flink1.10(含)之前默认使用flink计划器,flink1.11(含)之后默认使用blink计划器。
- <!--flink 1.11及之后使用的都是blink的计划器,这儿引入的也是blink的-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!--flink 1.10(含)之前默认的计划器-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>${flink.version}</version>
- package com.atguigu.GTable_api_Flink_sql;
-
- import com.atguigu.Zbeans.SensorReading;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
-
- /**
- * Table api&Flink sql第一课:基于dataStream的Table api&Flink sql
- * 1、需要引入flink的计划器的依赖
- * flink1.10(含)之前默认使用flink计划器,flink1.11之后使用blink的计划器
- * 2、需要创建表的执行环境。
- * 3、Table api实际上是基于DSL语法来处理数据的,每次操作都返回一个Table对象。
- * 4、tableEnv.fromDataStream得到的表,必须注册成视图才能使用SQL api.
- * SQL api的操作后同样返回一张表。
- * 5、输出表对象必须转换成DataStream对象才行,Row对象导包导的是flink.types.Row包,别导错了。
- *
- */
- public class AFirstExample {
- public static void main(String[] args) throws Exception{
- //加载环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner() //使用blink计划器 useOldPlanner()使用的就是flink的计划器,也需要依赖
- .inStreamingMode() //流模式
- .build();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
-
- //读取数据包装成pojo类型
- DataStreamSource<String> inputStream = env.readTextFile("G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt");
- SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
- String[] fields = line.split(",");
- return new SensorReading(new String(fields[0]), new Long(fields[1]), new Double(fields[2]));
- });
-
- //基于流创建一张表
- Table inputTable = tableEnv.fromDataStream(dataStream);
-
- //table api处理数据
- Table resultTable1 = inputTable.select("id,temperature")
- .where("id = 'sensor_1'");
-
- //flink sql 处理数据
- //fromDataStream得到的表,必须注册成视图才能使用SQL api
- tableEnv.createTemporaryView("sensor",inputTable);
- Table resultTable2 = tableEnv.sqlQuery("select id,temperature from sensor where id='sensor_1'");
-
- //转成流输出,Row导包导的是flink.types.Row,别导错了
- //如果Table api或Flink sql有聚合操作,且聚合操作非内窗口聚合,则需要使用toRetractStream
- //toRetractStream会将一次更新转换成一次删除和一次新增,删除在数据头部添加false,新增添加true.
- tableEnv.toAppendStream(resultTable1,Row.class).print("table api处理数据");
- tableEnv.toAppendStream(resultTable2,Row.class).print("flink sql处理数据");
-
- //执行
- env.execute("基于DataStream的table api和flink sql");
- }
- }
- package com.atguigu.GTable_api_Flink_sql;
-
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.DataTypes;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.table.descriptors.Csv;
- import org.apache.flink.table.descriptors.Kafka;
- import org.apache.flink.table.descriptors.Schema;
-
-
-
- /**
- * 使用纯粹的Table api&Flink sql来完成一个Test
- * 从文件、kafka读取数据,输出到文件或kafka。
- * 直接读取数据源再通过createTemporaryTable得到的表:
- * Flink sql中直接使用,table api则还需要先tableEnv.from("inputTable")才可以使用DSL语法进行操作。
- * 写出到文件和kafka都不支持非窗口内聚合操作,不能用聚合结果表调用insertInto进行输出。
- * 除非聚合是窗口内的聚合。
- * 若要突破这一限制,则可以输出到ES或者MySQL。
- * 从kafka读取数据输入到kafka时的注意事项:
- * kafka连接器依赖中artifactId如果是flink-connector-kafka_2.12,则为通用连接器,
- * 无论是读取kafka还是写入kafka,version方法中版本应该写“universal”。
- * kafka连接器依赖中artifactId如果是flink-connector-kafka-0.11_2.12,则为0.11版的连接器。
- * 无论是读取kafka还是写入kafka,version方法中版本应该写“0.11”,当然也可以写0.10,高版本兼容低版本。
- * 启动kafka:
- * cd /opt/apps/kafka_2.11-0.11.0.3/bin/ && zkServer.sh start && kafka-server-start.sh ../config/server.properties
- * 启动生产者:
- * kafka-console-producer.sh -broker-list Linux001:9092 --topic topic_producer
- * 启动消费者:
- * kafka-console-consumer.sh --bootstrap-server Linux001:9092 --topic topic_consumer
- */
- public class BPureTableApi {
- public static void main(String[] args) throws Exception{
- //加载环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode().build();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //创建表的执行环境
-
- // /**
- // * 读取文件注册成表(flink1.12已经废弃了这种方式)
- // * 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
- // */
- // String inpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor.txt";
- // tableEnv.connect(new FileSystem().path(inpath))
- // .withFormat(new Csv()) //是需要加入csv依赖的
- // .withSchema(new Schema() //字段名可以更改,顺序不能改变
- // .field("id", DataTypes.STRING())
- // .field("timestamp",DataTypes.BIGINT())
- // .field("temperature",DataTypes.DOUBLE())
- // ).createTemporaryTable("inputTable");
- /**
- * 读取kafka数据
- * 这样注册的表在Flink sql中可以直接使用,但在table api中使用经过tableEnv.from("inputTable")才行
- * flink table api连接kafka的属性配置中zookeeper.connect和bootstrap.servers都要配置,简直神奇
- */
- tableEnv.connect(new Kafka()
- .version("universal") //kafka通用连接器版本
- .topic("topic_producer")
- .property("zookeeper.connect","192.168.149.131:2181")
- .property("bootstrap.servers","192.168.149.131:9092")
- )
- .withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
- .withSchema(new Schema() //字段名可以更改,顺序不能改变
- .field("id", DataTypes.STRING())
- .field("timestamp",DataTypes.BIGINT())
- .field("temperature",DataTypes.DOUBLE())
- ).createTemporaryTable("inputTable");
-
-
- //使用Table api操作
- Table inputTable=tableEnv.from("inputTable");
- Table Ttable=inputTable.select("id,temperature")
- .where("id = 'sensor_1'");
-
- //使用Flink sql操作
- Table Stable=tableEnv.sqlQuery("select id,avg(temperature) as avg_temp from inputTable group by id");
-
- // /**
- // * 将结果数据注册成表,然后输出到文件中
- // * withschema的字段要和Table api&Flink sql查询字段一致
- // */
- // String outpath="G:\\SoftwareInstall\\idea\\project\\UserBehaviorAnalysis\\BasicKnowledge\\src\\main\\resources\\sensor_out.txt";
- // tableEnv.connect(new FileSystem().path(outpath))
- // .withFormat(new Csv()) //是需要加入csv依赖的
- // .withSchema(new Schema() //字段名可以更改,顺序不能改变
- // .field("id", DataTypes.STRING())
- // .field("temp",DataTypes.DOUBLE()))
- // .createTemporaryTable("outputTable");
-
- /**
- * 将结果数据注册成表,然后写入到kafka中
- * withschema的字段要和Table api&Flink sql查询字段一致
- */
- tableEnv.connect(new Kafka()
- .version("universal") //kafka通用连接器版本
- .topic("topic_consumer")
- .property("zookeeper.connect","192.168.149.131:2181")
- .property("bootstrap.servers","192.168.149.131:9092")
- )
- .withFormat(new Csv()) //解析格式,有些格式是需要导依赖的
- .withSchema(new Schema() //字段名可以更改,顺序不能改变
- .field("id", DataTypes.STRING())
- .field("temp",DataTypes.DOUBLE())
- ).createTemporaryTable("outputTable");
-
- /**
- * 写出到文件,不支持非窗口内的聚合操作,不能用聚合结果表调用insertInto。除非聚合是窗口内聚合。
- * 写出到Kafka,不支持非窗口内的聚合操作,不能用聚合结果表调用insertInto。除非聚合是窗口内聚合。
- */
- Ttable.insertInto("outputTable");
- //执行
- env.execute("测试纯粹的Table api & Flink sql");
-
- }
- }
- 1、Flink支持三种输出模式
- 追加(append)模式:
- 只支持插入。
- 对应方法toAppendStream。
- 撤回(retract)模式:
- 支持插入、删除、更新。插入和删除都很单纯。
- 更新则会转换成一次撤回和一次插入,撤回的消息添加false前缀,插入的消息添加true前缀。
- 对应方法toRetractStream。
- 更新插入(upsert)模式:
- 支持插入、删除、更新。删除很单纯。
- 插入和更新都是upsert,需要指定key来判断当前写入操作是插入还是更新。
-
- 只有外部系统支持retract或upsert模式,才可以将聚合操作写出。
- 这样的系统有ES\MySQL\Oracle等。
- 依赖
- <!--Elasticsearch-connector连接器-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
- <version>${flink.version}</version>
- </dependency>
- tableEnv.connect(new Elasticsearch()
- .version("6") //ES版本
- .host("localhost",9200,"http")
- .index("id")
- .documentType("temp")
- )
- .inUpsertMode()
- .withFormat(new Json()) //解析格式,有些格式是需要导依赖的
- .withSchema(new Schema()
- .field("id",DataTypes.STRING())
- .field("temp",DataTypes.DOUBLE())
- )
- .createTemporaryTable("outputTable");
-
- aggtable.insertInto("outputTable");
- 依赖
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-jdbc_2.12</artifactId>
- <version>1.10.1</version>
- </dependency>
- String sinkDDL = "create table outputTable("+
- "id varchar(20) not null,"+
- "temp double(10,2) not null"+
- ") with ("+
- "'connector.type'='jdbc',"+
- "'connector.url'='jdbc:mysql://localhost:3306/test',"+
- "'connector.table'='id_count',"+
- "'connector.driver'='com.mysql.jdbc.Driver',"+
- "'connector.username'='root',"+
- "'connector.password'='123456')";
- tableEnv.sqlUpdate(sinkDDL);
- aggTable.insertInto("outputTable");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。