赞
踩
目录
<dependencies>
    <!-- Runtime client: lets a main() method execute/submit the job (e.g. from the IDE). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table API <-> DataStream bridge (provides bridge.java.StreamTableEnvironment). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Blink planner that translates and executes Table API / SQL programs. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Scala DataStream API; the planner expects it on the classpath. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Java DataStream API (StreamExecutionEnvironment, DataStream). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Common Table API types shared by connectors and formats. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Kafka connector ('connector' = 'kafka'). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JSON format ('format' = 'json'). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- CSV format ('format' = 'csv'). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
- /**
- * Description: 从Kafka主题kafkaInput读取数据,筛选后将指定数据写入kafkaOutput
- * 数据样式:
- * {"id":"1","user_id":"1001","status":"1"}
- * {"id":"2","user_id":"1002","status":"1"}
- * {"id":"3","user_id":"1003","status":"1"}
- * {"id":"4","user_id":"1004","status":"1"}
- * {"id":"5","user_id":"1005","status":"0"}
- */
- public class Demo01 {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- tableEnv.executeSql("CREATE TABLE kafka_input(id BIGINT,user_id BIGINT," +
- "status STRING) " +
- "WITH ('connector' = 'kafka','topic' = 'kafkaInput'," +
- "'properties.bootstrap.servers' = 'vm01:9092'," +
- "'properties.group.id' = 'testGroup'," +
- "'scan.startup.mode' = 'latest-offset'," +
- "'format' = 'json')");
- Table table = tableEnv.sqlQuery("SELECT * FROM kafka_input WHERE status='1'");
- tableEnv.executeSql("CREATE TABLE kafka_output(id BIGINT,user_id BIGINT," +
- "status STRING)" +
- " WITH ('connector'='kafka','topic'='kafkaOutput'," +
- "'properties.bootstrap.servers' = 'vm01:9092'," +
- "'format'='json'," +
- "'sink.partitioner'='round-robin')");
- tableEnv.executeSql("INSERT INTO kafka_output SELECT * FROM " + table);
- // 会报错:No operators defined in streaming topology. Cannot execute.但不影响程序运行
- // tableEnv.execute("Run TableapiKafkaDemo");
- }
- }

- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-
- public class Demo02 {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- // kafka table -> stream
- // table2Stream(tEnv);
- // stream -> table -> stream
- stream2Table(env, tEnv);
- env.execute();
- }
-
- /**
- * kafka table -> stream print()
- * 数据格式:1,Jordan,male
- * @param tEnv StreamTableEnvironment
- */
- private static void table2Stream(StreamTableEnvironment tEnv) {
- String createTableSql = "CREATE TABLE user_info(" +
- "id INT," +
- "name STRING," +
- "gender STRING" +
- ")WITH(" +
- "'connector'='kafka'," +
- "'topic'='user_info'," +
- "'properties.bootstrap.servers'='vm01:9092'," +
- "'properties.group.id'='testGroup'," +
- "'scan.startup.mode'='latest-offset'," +
- "'format'='csv'" +
- ")";
- tEnv.executeSql(createTableSql);
- Table table = tEnv.sqlQuery("SELECT * FROM user_info");
- tEnv.toDataStream(table).print();
- }
-
- /**
- * stream -> table -> stream
- *
- * @param env StreamExecutionEnvironment
- * @param tEnv StreamTableEnvironment
- */
- private static void stream2Table(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
- DataStream<String> dataStream = env.fromElements("Jordan", "Alice", "Tom");
- Table inputTable = tEnv.fromDataStream(dataStream);
- tEnv.createTemporaryView("INPUT_TABLE", inputTable);
- Table resultTable = tEnv.sqlQuery("SELECT UPPER(f0) FROM INPUT_TABLE");
- resultTable.printSchema();
- tEnv.toDataStream(resultTable).print();
- }
- }

建表时定义一个 TIMESTAMP(3) 类型的列作为事件时间列(时间窗口依据该列划分),并通过 WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND 声明水位线:水位线比已到达数据的最大事件时间滞后 5 秒,即允许最多 5 秒的乱序数据。之后即可正常开窗并使用聚合函数。
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Slide;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.Tumble;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
- import static org.apache.flink.table.api.Expressions.$;
- import static org.apache.flink.table.api.Expressions.lit;
-
- /**
- * Kafka数据格式:1,jordan,79,2022-03-05 00:00:30
- */
- public class Demo03 {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- String createTableSql = "CREATE TABLE user_info(" +
- "id INT," +
- "name STRING," +
- "score INT," +
- "row_time TIMESTAMP(3)," +
- "WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND" +
- ")WITH(" +
- "'connector'='kafka'," +
- "'topic'='t_score'," +
- "'properties.bootstrap.servers'='vm01:9092'," +
- "'properties.group.id'='testGroup'," +
- "'scan.startup.mode'='latest-offset'," +
- "'format'='csv'" +
- ")";
- tEnv.executeSql(createTableSql);
- Table table = tEnv.sqlQuery("SELECT * FROM user_info");
- Table resultTable = table.window(Tumble.over(lit(10).seconds()).on($("row_time")).as("w"))
- // table.window(Slide.over(lit(10).seconds()).every(lit(20).seconds()).on($("row_time")).as("w"))
- .groupBy($("name"), $("w"))
- .select($("name"), $("score").avg(), $("w").end().as("hour"));
- resultTable.printSchema();
- tEnv.toDataStream(resultTable).print();
-
- env.execute("Demo03");
- }
- }

# Fail early with a clear message if FLINK_HOME is not set
# (otherwise the commands below would silently resolve to /bin/...).
: "${FLINK_HOME:?FLINK_HOME must point to the Flink installation directory}"
# Start the Flink cluster.
"${FLINK_HOME}/bin/start-cluster.sh"
# Start the interactive SQL client.
"${FLINK_HOME}/bin/sql-client.sh"
此时可以交互式执行sql语句。
此外,也可以通过以下方式执行 SQL 脚本:
# ~/file.sql must contain valid Flink SQL statements; -f can also be written as --file.
# The tilde stays unquoted so the shell expands it; FLINK_HOME is quoted to survive spaces.
"${FLINK_HOME}/bin/sql-client.sh" -f ~/file.sql
如果需要使用基于事件时间的窗口,则必须在建表时定义 watermark(基于处理时间的窗口则不需要)。
-- CSV file source; CSV fields map to the columns in declaration order.
CREATE TABLE bid (
    bidtime  TIMESTAMP(3),
    price    DECIMAL(10, 2),
    item     STRING,
    -- Event time is bidtime; the watermark trails it by 10 minutes.
    WATERMARK FOR bidtime AS bidtime - INTERVAL '10' MINUTES
) WITH (
    'connector' = 'filesystem',
    'path'      = '/root/data/bid.csv',
    'format'    = 'csv'
);
-- Kafka source of CSV records (id, name, score, event time), starting at the latest offset.
CREATE TABLE user_info (
    id        INT,
    name      STRING,
    score     INT,
    row_time  TIMESTAMP(3),
    -- Event time is row_time; the watermark trails it by 5 seconds.
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 't_score',
    'properties.bootstrap.servers' = 'vm01:9092',
    'properties.group.id'          = 'testGroup',
    'scan.startup.mode'            = 'latest-offset',
    'format'                       = 'csv'
);
普通查询语句与标准 SQL 基本一致;内置函数可以通过 SHOW FUNCTIONS 查看。
下面演示的是窗口函数的使用。
-- Queries the `bid` table from the section "Executing DDL -> filesystem source".
-- Flink SQL comments start with "--" ("#" is not a comment and would fail to parse).
-- Sums the price per 10-minute tumbling window on the bidtime event-time column.
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。