赞
踩
Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。
Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink的SQL支持,基于实现了SQL标准的Apache Calcite(Apache开源SQL解析工具)。
无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果。
取决于你使用的编程语言,比如这里,我们选择 Scala API 来构建你的 Table API 和 SQL 程序:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>
除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner,我们这里选择使用 blink planner:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>
如果你想实现自定义格式来解析 Kafka 数据,或者自定义函数,使用下面的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.4</version>
<scope>provided</scope>
</dependency>
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Event> stream1 = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStamp; } })); // 创建表执行环境 tableEnv 是写sql的 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将DataStream转换成Table Table table = tableEnv.fromDataStream(stream1); // 直接写sql转换 Table table1 = tableEnv.sqlQuery("select user,url,`timestamp` from " + table); Table table2 = table.select($("user"), $("url")).where($("user").isEqual("vv")); tableEnv.toDataStream(table1).print("result"); tableEnv.toDataStream(table2).print("result"); env.execute(); }
public static void main(String[] args) { // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings setting3 = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment tableEnv3 = TableEnvironment.create(setting3); // 2.1、基于老版本planner进行流处理 EnvironmentSettings settings1 = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build(); TableEnvironment tableEnv1 = TableEnvironment.create(settings1); // 2.2、基于老版本planner进行批处理 ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv2 = BatchTableEnvironment.create(batchEnv); }
public static void main(String[] args) { // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings setting3 = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment tableEnv3 = TableEnvironment.create(setting3); // 2.1、基于老版本planner进行流处理 EnvironmentSettings settings1 = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build(); TableEnvironment tableEnv1 = TableEnvironment.create(settings1); // 2.2、基于老版本planner进行批处理 ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv2 = BatchTableEnvironment.create(batchEnv); String createDDL = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts BIGINT" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; tableEnv.executeSql(createDDL); // 创建一张用于输出的表 String createOutDDL = "CREATE TABLE outTable("+ " user STRING," + " url STRING" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'output/clicks.txt'," + " 'format' = 'csv'" + " )"; tableEnv.executeSql(createOutDDL); }
在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
扩展表标识
// get a TableEnvironment val tEnv: TableEnvironment = ...; tEnv.useCatalog("custom_catalog") tEnv.useDatabase("custom_database") val table: Table = ...; // register the view named 'exampleView' in the catalog named 'custom_catalog' // in the database named 'custom_database' tableEnv.createTemporaryView("exampleView", table) // register the view named 'exampleView' in the catalog named 'custom_catalog' // in the database named 'other_database' tableEnv.createTemporaryView("other_database.exampleView", table) // register the view named 'example.View' in the catalog named 'custom_catalog' // in the database named 'custom_database' tableEnv.createTemporaryView("`example.View`", table) // register the view named 'exampleView' in the catalog named 'other_catalog' // in the database named 'other_database' tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
public static void main(String[] args) { // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); // 1.1、定义环境配置执行创建表的执行环境 EnvironmentSettings setting3 = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build(); TableEnvironment tableEnv3 = TableEnvironment.create(setting3); // 2.1、基于老版本planner进行流处理 EnvironmentSettings settings1 = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build(); TableEnvironment tableEnv1 = TableEnvironment.create(settings1); // 2.2、基于老版本planner进行批处理 ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv2 = BatchTableEnvironment.create(batchEnv); String createDDL = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts BIGINT" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; tableEnv.executeSql(createDDL); Table clickTable = tableEnv.from("clickTable"); Table resultTable = clickTable.where($("").isEqual("Bob")) .select($("user"), $("url")); tableEnv.createTemporaryView("resultTable",resultTable); Table resultTable2 = tableEnv.sqlQuery("select user,url from resultTable"); // 创建一张用于输出的表 String createOutDDL = "CREATE TABLE outTable("+ " user STRING," + " url STRING" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'output/clicks.txt'," + " 'format' = 'csv'" + " )"; tableEnv.executeSql(createOutDDL); resultTable2.executeInsert("outTable"); }
// 转换成流进行输出
tableEnv.toDataStream(table1).print("result");
tableEnv.toDataStream(table2).print("result");
// 聚合转换
tableEnv.createTemporaryView("clickTable",table2);
Table agg = tableEnv.sqlQuery("select user,count(user) from clickTable group by user");
// 更新日志操作的流、要去进行修改
tableEnv.toChangelogStream(agg).print();
public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1、事件事件 // 创建表时指定watermark String createDDL = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts BIGINT," + " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000) )" + " WATERMARK FOR et as et - INTERVAL '1' SECOND" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; // 在流转换为表时进行转换 SingleOutputStreamOperator<Event> stream1 = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStamp; } })); tableEnv.fromDataStream(stream1,$("user"),$("url"),$("timeStamp").as("ts"),$("et").rowtime()); // 2、处理事件 // 创建表时指定PROCTIME String createDDL01 = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts AS PROCTIME()" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; // 在流转换为表时进行转换 SingleOutputStreamOperator<Event> stream2 = env.addSource(new ClickSource()); Table table = tableEnv.fromDataStream(stream2, $("user"), $("url"), $("ts").proctime());
基于时间的操作、需要定义相关的事件和时间数据来源的信息、在TableAPI和SQL中、会给表单单独提供一个逻辑上的时间字段、专门用来在表处理程序中指示时间。
按照时间语义的不同、我们可以把时间属性的定义分为事件事件(event time)和处理时间(Processing time)
在Flink1.13版本开始、Flink开始使用窗口表值函数(Windowing table-valued functions,Windowinng TVFs)来定义窗口。窗口表值函数是Flink定义的多态表函数(PTF)、可以将表进行扩展后返回、表函数(table Function)可以看作是返回一个表的函数。
目前Flink提供以下几个窗口的TVF():
滚动窗口在SQL中的概念与DataStreamAPI中的定义完全一样、是长度固定、时间对齐、无重叠的窗口、一般用于周期性的计算。
在SQL中通过调用TUMBLE()函数就可以声明一个滚动窗口、只有一个核心窗口大小(SIZE)、在SQL中不考虑计数窗口、所以滚动窗口就是滚动时间窗口、参数中还需要将当前时间属性字段传入;另外、窗口TVF本质上是表函数、可以对表进行扩展、所以还应该把当前查询的表作为参数整体传入
TUMBLE(TABLE EventTable,DESCRIPTOR(ts) , INTERVAL '1' HOUR)
这里基于时间字段TS、对表EventTable中数据开了大小为1小时的滚动窗口、窗口将会表里每一行数据、按照TS的值分配到一个指定的窗口。
滑动窗口的使用与滚动窗口类似、可以通过设置滑动步长来控制统计输出的频率、在SQL中通过调用HOP()来声明滑动窗口、除了也要传入表名、时间属性外、还需要传入窗口大小(size)和滑动步长(side)连个参数。
HOP(TABLE EventTable,DESCRIPTOR(ts) ,INTERVAL '5' MINUTES,INTERVAL '1' HOURS)
累计窗口时窗口TVF中新增的窗口功能、它会在一定的统计周期内进行累计计算、累计窗口有两个核心的参数:最大窗口长度(MAX Window Size)和累计步长(step)。所谓最大窗口长度其实就是我们所说的"统计周期"、最终的目的就是统计这段时间内的数据
CUMULATE(TABLE EventTable,DESCRIPTOE(ts),INTERVAL '1' HOURS,INTERVAL '1' DAYS)
在SQL中、一个很常见的功能就是对某一列的多条数据做一个合并统计、得到一个或多个结果值:比如求和、最大最小值、平均值、这种操作叫做聚合查询。Flink中的SQL是流处理和标准SQL结合产物、所以聚合查询也可以分为两种:流处理中特有的聚合(主要指窗口聚合)以及SQL原生的聚合查询方式。
public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1、事件事件 // 创建表时指定watermark String createDDL = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts BIGINT," + " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000) )" + " WATERMARK FOR et as et - INTERVAL '1' SECOND" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; tableEnv.executeSql(createDDL); // 在流转换为表时进行转换 SingleOutputStreamOperator<Event> stream1 = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStamp; } })); tableEnv.fromDataStream(stream1,$("user"),$("url"),$("timeStamp").as("ts"),$("et").rowtime()); // 2、处理事件 // 创建表时指定PROCTIME String createDDL01 = "CREATE TABLE clickTable("+ " user STRING," + " url STRING," + " ts AS PROCTIME()" + " ) WITH (" + " 'connector' = 'filesystem'"+ " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + " )"; // 在流转换为表时进行转换 SingleOutputStreamOperator<Event> stream2 = env.addSource(new ClickSource()); Table table = tableEnv.fromDataStream(stream2, $("user"), $("url"), $("ts").proctime()); // 3.窗口函数 // 3.1、滚动窗口 Table tumbleWindowTable = tableEnv.sqlQuery("select user_name,count(1) as cnt," + " window_end as endT " + " from TABLE(" + " TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" + " )" + "GROUP BY user_name,window_end,window_start" ); tableEnv.toDataStream(tumbleWindowTable).print(); // 3.2、滑动窗口 Table hopWindowTable = tableEnv.sqlQuery("select user,count(1) as cnt,"+ " window_end as endT " + " from TABLE( " + " HOP(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND,INTERVAL '10' SECOND)" + " GROUP BY user,window_end,window_start" ); tableEnv.toDataStream(hopWindowTable).print(); }
按照数据库理论、关系型表的设计往往至少需要满足第三范式(3NF)、表中的列都直接依赖于主键、这样可以避免数据冗余和更新异常、例如商品的订单信息、我们会保存在一个订单表中、而这个表中只有商品ID、详情则需要到"商品表"按照ID去查询、这样的好处是当商品信息发生变化时、只要更新商品表即可、而不需要在订单表中对应这个商品的所有订单进行修改、不过这样一来、我们无法从单独的表中提取想要的数据。
与标准SQL一致、FlinkSQL的常规联结也可以分为内联结(INNER JOIN) 和外联结(OUTER JOIN)、区别在于结果中是否包含不符合条件的行、目前仅支持"等值条件"、作为联结条件、也就是关键字ON后面必须是判断两表中字段相等的逻辑表达式。
内联结用INNER JOIN来定义、会返回两表中符合条件的所有行的组合、也就是所谓的笛卡尔积.
例如之前提到的"订单表"(Order)和"商品表"(Product)的联结查询
SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id
与内联结类似、外联结也会返回符合联结条件的所有行的笛卡尔积。另外,还可以将某一侧中找不到任何匹配的行也单独返回、FlinkSQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN)、分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。例如、订单表中未必会包含商品表中所有的ID、为了将哪些没有任何订单的商品信息也查询出来、我们就可以使用右外联结(RIGHT JOIN)、当然、外联结查询目前也仅支持等值联结条件
SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id
在SQL中、我们可以把一些数据的转换操作包装起来、嵌入到SQL查询中统一调用、这就是函数(Functions)
TableAPI
str.upperCase();
SQL
UPPER(str)
FlinkSQL中函数可以分为两类、一类是SQL中内置的系统函数、直接通过函数名调用就可以了、能够实现一些常用的转换操作、比如我们之前用到的COUNT()、CHAR_LENGTH()、UPPER()、而另一类函数则是用户自定义的函数(UDF)、需要在表环境中注册才能使用。
系统函数也叫做内置函数、是在系统中预先实现好的功能模块、我们可以通过固定的函数名直接调用、实现想要的转换操作、FlinkSQL提供了大量的系统函数、几乎支持所有的标准SQL中的操作、这为我们使用SQL编写流处理程序提供了极大的方便。
FlinkSQL中的系统函数又主要分为两大类:标量函数(Scalar Function)和聚合函数(Aggregate Functions)
所谓的"标量"、是指只有数值大小、没有方向的量、所以标量函数指定是只对输入数据做转换操作、返回一个值的函数、这里的输入数据对应在表中、一般就是一行数据中一个或者多个字段、因此这种操作有点像流处理转换算子中的Map、另外、对于一些没有输入参数、直接可以得到唯一结果的函数、也属于标量函数。
标量函数是最常见、也简单的一类函数、数量非常庞大、很多在标准SQL中也有定义。
聚合函数是以表中多个行作为输入、提取字段进行聚合操作的函数、会将唯一的聚合值作为结果返回、聚合函数应用非常广泛、不论分组聚合、窗口聚合还是开窗(Over)聚合、对数据的聚合操作都可以用相同的函数来定义。
标准SQL中、常见的函数的聚合函数FlinkSQL都是支持、目前也在不断
Flink的TableAPI和SQL提供了多种自定义函数的接口、以抽象类的形式定义、当前UDF主要有一下几类。
(1)、注册函数
tableEnv.createTemporarySystemFunction("MyFunction",MyFunction.class);
(2)、使用TableAPI调用函数
tableEnv.from("MyTable").select(call("MyFunction",$("myField")));
(3)、在SQL中调用函数
tableEnv.sqlQuery("SELECT MyFunction(myFiled) FROM MyTable");
有了TableAPI和SQL、我们就可以使用熟悉的SQL来编写语句进行流处理、Flink为我们提供了一个工具来进行Flink程序的编写、测试和提交、这个工具叫做"SQL客户端"。SQL客户端提供了一个命令行交互界面(CLI)、我们可以在里面非常容易编写SQL进行查询、就像MYSQL一样、整个FLINK应用编写、提交的过程全变成写SQL、不需要写一行Java/Scala代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。