赞
踩
跟标量函数一样,表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口 TVF,本质上就是表函数。
类似地,要实现自定义的表函数,需要自定义类来继承抽象类 TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一个泛型参数T 的,这就是表函数返回数据的类型;而 eval()方法没有返回类型,内部也没有 return语句,是通过调用 collect()方法来发送想要输出的行数据的。这和 Hive 中的 UDTF 非常相似。那对于原先输入的整张表来说,又该得到什么呢?一个简单的想法是,就让输入表中的每一行,与它转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了扩展。在 Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来实现的。
在 SQL 中调用表函数,需要使用 LATERAL TABLE(< TableFunction >)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的 Join 操作可以是直接做交叉联结(cross join),在 FROM 后用逗号分隔两个表就可以;也可以是以 ON TRUE 为条件的左联结(LEFT JOIN)。
下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数MySplit,可以将一个字符串转换成(字符串,长度)的二元组。
public class UdfTest_TableFunction { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //1.在创建表的DDL中直接定义时间属性 String creatDDL = "CREATE TABLE clickTable (" + "user_name STRING," + "url STRING," + "ts BIGINT," + "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + //事件时间 FROM_UNIXTIME() 能转换为年月日时分秒这样的格式 转换秒 " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + //watermark 延迟一秒 ")WITH(" + " 'connector' = 'filesystem'," + " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatDDL); //2.注册自定义表函数 tableEnv.createTemporarySystemFunction("MySplit", MySplit.class); //3.调用UDF进行查询转换 Table resultTable = tableEnv.sqlQuery("select user_name,url,word,length " + "from clickTable,LATERAL TABLE(MySplit(url)) AS T(word,length)" ); //4.转换成流打印 tableEnv.toDataStream(resultTable).print(); env.execute(); } //实现自定义的表函数 public static class MySplit extends TableFunction<Tuple2<String,Integer>>{ public void eval(String str){ String[] fields = str.split("\\?"); for (String field : fields){ collect(Tuple2.of(field,field.length())); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。