赞
踩
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.9.3</version>
</dependency>

def main(args: Array[String]): Unit = { //创建flink环境对象 val env = StreamExecutionEnvironment.getExecutionEnvironment //获取数据 val inputStream = env.readTextFile("..\\sensor.txt") //map为javabean对象 val dataStream = inputStream .map( data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) } // 基于 env 创建 tableEnv val settings: EnvironmentSettings = EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() .build() //创建表的环境 val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) //方式一: //从一条流创建一张表 val fTable: Table = tableEnv.fromDataStream(dataStream) // 从表里选取特定的数据 val selectedTable: Table = fTable.select('id, 'temperature) .filter("id = 'sensor_1'") //转化为DataStream val selectedStream: DataStream[(String, Double)] = selectedTable .toAppendStream[(String, Double)] //打印输出 selectedStream.print() //方式 二 //注册一张表 val rTable: Table = tableEnv.registerDataStream("t_SensorReading",dataStream) //sql var senTable:Table = rTable.sql("select * from t_SensorReading") //转化为DataStream val senStream: DataStream[(String, Double)] = senTable.toRetractStream(Row.class) //输出 senStream.print() env.execute("table test")
TableEnvironment 组成部分如下:
Flink 1.9 中保留了 5 个 TableEnvironment,在实现上是 5 个面向用户的接口,在接口底层进行了不同的实现。
5 个接口包括一个 TableEnvironment 接口,两个 BatchTableEnvironment 接口,两个 StreamTableEnvironment 接口
org/apache/flink/table/api/TableEnvironment.java
org/apache/flink/table/api/java/BatchTableEnvironment.java
org/apache/flink/table/api/scala/BatchTableEnvironment.scala
org/apache/flink/table/api/java/StreamTableEnvironment.java
org/apache/flink/table/api/scala/StreamTableEnvironment.scala

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
TableDataStream 或 DataSet 转换成 TableExecutionEnvironment 或 StreamExecutionEnvironment 的引用通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 中创建。
如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table
tableEnv.fromDataStream(dataStream)
或者根据字段顺序单独命名
tableEnv.fromDataStream(dataStream,’id,’timestamp .......)
最后的动态表可以转换为流进行输出
table.toAppendStream[(String,String)]
如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table
tableEnv.registerDataStrem("t_table",dataStream);
最后转换为留输出
tableEnv.toRetractStream(table,Row.class);
//创建视图
tableEnv.createTemporaryView("t_table",dataStream);
//转换成流
tableEnv.toAppendStream(table, Row.class).print();
//创建零时表
tableEnv.createTemporaryTable("inputTable",dataStream);
//转换成流
tableEnv.toAppendStream(inputTable, Row.class).print();
Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
Event Time事件时间
为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
在DataStream 转换成 Table,schema 的定义期间,使用.rowtime 可以定义事件时间属性。注意,必须在转换的数据流中分配时间戳和 watermark。
在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:
1、作为新字段追加到 schema
2、替换现有字段
SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。
WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。
DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。
GROUP BY 是对数据进行分组操作。
UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。
JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。