赞
踩
package cn.edu.tju.demo3; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.*; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class Test49 { private static String HOST_NAME = "xx.xx.xx.xx"; private static int PORT = 9999; private static String DELIMITER ="\n"; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> socketDataInfo = env.socketTextStream(HOST_NAME, PORT, DELIMITER); SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() { @Override public DataInfo map(String value) throws Exception { String[] stringList = value.split(","); DataInfo dataInfo = new DataInfo(Long.parseLong( stringList[0]), stringList[1], Double.parseDouble(stringList[2])); return dataInfo; } }); Table dataTable = tableEnv.fromDataStream(dataInfoStream,"ts,info,val"); tableEnv.registerFunction("myTableFunction", new MyTableFunction()); Table resultTable = dataTable.select("ts,info,val") .joinLateral("myTableFunction(val) as(a,b)") .select("ts,info,a,b"); tableEnv.createTemporaryView("dataInfo", dataTable); Table resultTableSql = tableEnv.sqlQuery( "select ts,info,val,a,b from dataInfo,LATERAL TABLE(myTableFunction(val)) as res (a,b)" ); tableEnv.toAppendStream(resultTable, Row.class).print(); tableEnv.toAppendStream(resultTableSql, Row.class).print("sql"); env.execute("my job"); } public static class DataInfo{ private long ts; private String info; private double val; public long getTs() { return ts; } public void setTs(long ts) { this.ts = ts; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public double getVal() { return val; } public void setVal(double val) { this.val = val; } @Override public String toString() { return "DataInfo{" + "ts=" + ts + ", info='" + info + '\'' + ", val='" + val + '\'' + '}'; } public DataInfo(long ts, String info, double val) { this.ts = ts; this.info = info; this.val = val; } public DataInfo() { } } //自定义表函数,必须定义一个public的 名字为eval方法 public static class MyTableFunction extends TableFunction<Tuple2<Double, Double>>{ public void eval(double d){ collect(new Tuple2<>(d, Math.floor(d))); } } }
nc -lk 9999
输入:
1690000001,ffff,87.12
执行结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。