当前位置:   article > 正文

Flink Table API & SQL 编程_flink1.16 table api demo

flink1.16 table api demo

目录

Maven依赖

DEMO1 消费Kafka主题数据,生产主题数据

DEMO2 Table与DataStream的互相转换

DEMO3 时间窗口

sql-client的使用

执行建表语句

filesystem数据源

kafka数据源

执行查询语句



Maven依赖

  <dependencies>
    <!-- Artifact IDs for Flink 1.16: Scala 2.11 builds were dropped in Flink 1.15, Scala-free
         modules have no Scala suffix since 1.15, and the former "blink" planner is published
         as flink-table-planner since 1.14. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- json -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- csv -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

DEMO1 消费Kafka主题数据,生产主题数据

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.table.api.Table;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. /**
  5. * Description: 从Kafka主题kafkaInput读取数据,筛选后将指定数据写入kafkaOutput
  6. * 数据样式:
  7. * {"id":"1","user_id":"1001","status":"1"}
  8. * {"id":"2","user_id":"1002","status":"1"}
  9. * {"id":"3","user_id":"1003","status":"1"}
  10. * {"id":"4","user_id":"1004","status":"1"}
  11. * {"id":"5","user_id":"1005","status":"0"}
  12. */
  13. public class Demo01 {
  14. public static void main(String[] args) throws Exception {
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  17. tableEnv.executeSql("CREATE TABLE kafka_input(id BIGINT,user_id BIGINT," +
  18. "status STRING) " +
  19. "WITH ('connector' = 'kafka','topic' = 'kafkaInput'," +
  20. "'properties.bootstrap.servers' = 'vm01:9092'," +
  21. "'properties.group.id' = 'testGroup'," +
  22. "'scan.startup.mode' = 'latest-offset'," +
  23. "'format' = 'json')");
  24. Table table = tableEnv.sqlQuery("SELECT * FROM kafka_input WHERE status='1'");
  25. tableEnv.executeSql("CREATE TABLE kafka_output(id BIGINT,user_id BIGINT," +
  26. "status STRING)" +
  27. " WITH ('connector'='kafka','topic'='kafkaOutput'," +
  28. "'properties.bootstrap.servers' = 'vm01:9092'," +
  29. "'format'='json'," +
  30. "'sink.partitioner'='round-robin')");
  31. tableEnv.executeSql("INSERT INTO kafka_output SELECT * FROM " + table);
  32. // 会报错:No operators defined in streaming topology. Cannot execute.但不影响程序运行
  33. // tableEnv.execute("Run TableapiKafkaDemo");
  34. }
  35. }

DEMO2 Table与DataStream的互相转换

  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.Table;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. public class Demo02 {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  9. // kafka table -> stream
  10. // table2Stream(tEnv);
  11. // stream -> table -> stream
  12. stream2Table(env, tEnv);
  13. env.execute();
  14. }
  15. /**
  16. * kafka table -> stream print()
  17. * 数据格式:1,Jordan,male
  18. * @param tEnv StreamTableEnvironment
  19. */
  20. private static void table2Stream(StreamTableEnvironment tEnv) {
  21. String createTableSql = "CREATE TABLE user_info(" +
  22. "id INT," +
  23. "name STRING," +
  24. "gender STRING" +
  25. ")WITH(" +
  26. "'connector'='kafka'," +
  27. "'topic'='user_info'," +
  28. "'properties.bootstrap.servers'='vm01:9092'," +
  29. "'properties.group.id'='testGroup'," +
  30. "'scan.startup.mode'='latest-offset'," +
  31. "'format'='csv'" +
  32. ")";
  33. tEnv.executeSql(createTableSql);
  34. Table table = tEnv.sqlQuery("SELECT * FROM user_info");
  35. tEnv.toDataStream(table).print();
  36. }
  37. /**
  38. * stream -> table -> stream
  39. *
  40. * @param env StreamExecutionEnvironment
  41. * @param tEnv StreamTableEnvironment
  42. */
  43. private static void stream2Table(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
  44. DataStream<String> dataStream = env.fromElements("Jordan", "Alice", "Tom");
  45. Table inputTable = tEnv.fromDataStream(dataStream);
  46. tEnv.createTemporaryView("INPUT_TABLE", inputTable);
  47. Table resultTable = tEnv.sqlQuery("SELECT UPPER(f0) FROM INPUT_TABLE");
  48. resultTable.printSchema();
  49. tEnv.toDataStream(resultTable).print();
  50. }
  51. }

DEMO3 时间窗口

建表时定义一个TIMESTAMP(3)类型的列作为事件时间属性(即时间窗口所依据的时间列),并通过 WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND 定义水位线:水位线比已观察到的最大 row_time 滞后5秒,即允许最多5秒的乱序数据。之后即可正常开窗并使用聚合函数。

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.table.api.Slide;
  3. import org.apache.flink.table.api.Table;
  4. import org.apache.flink.table.api.Tumble;
  5. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  6. import static org.apache.flink.table.api.Expressions.$;
  7. import static org.apache.flink.table.api.Expressions.lit;
  8. /**
  9. * Kafka数据格式:1,jordan,79,2022-03-05 00:00:30
  10. */
  11. public class Demo03 {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  15. String createTableSql = "CREATE TABLE user_info(" +
  16. "id INT," +
  17. "name STRING," +
  18. "score INT," +
  19. "row_time TIMESTAMP(3)," +
  20. "WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND" +
  21. ")WITH(" +
  22. "'connector'='kafka'," +
  23. "'topic'='t_score'," +
  24. "'properties.bootstrap.servers'='vm01:9092'," +
  25. "'properties.group.id'='testGroup'," +
  26. "'scan.startup.mode'='latest-offset'," +
  27. "'format'='csv'" +
  28. ")";
  29. tEnv.executeSql(createTableSql);
  30. Table table = tEnv.sqlQuery("SELECT * FROM user_info");
  31. Table resultTable = table.window(Tumble.over(lit(10).seconds()).on($("row_time")).as("w"))
  32. // table.window(Slide.over(lit(10).seconds()).every(lit(20).seconds()).on($("row_time")).as("w"))
  33. .groupBy($("name"), $("w"))
  34. .select($("name"), $("score").avg(), $("w").end().as("hour"));
  35. resultTable.printSchema();
  36. tEnv.toDataStream(resultTable).print();
  37. env.execute("Demo03");
  38. }
  39. }

sql-client的使用

  # Start the Flink cluster
  "${FLINK_HOME}/bin/start-cluster.sh"
  # Start the interactive SQL client
  "${FLINK_HOME}/bin/sql-client.sh"

此时可以交互式执行sql语句。

此外,可以通过使用以下方式执行sql脚本

  # ~/file.sql must be a valid SQL script; -f can also be written as --file
  "${FLINK_HOME}/bin/sql-client.sh" -f ~/file.sql

执行建表语句

如果需要使用基于事件时间的时间窗口,则必须在建表语句中定义watermark(基于处理时间的窗口则不需要)。

filesystem数据源

  -- CSV file source; bidtime is the event-time column, with a watermark 10 minutes behind it.
  CREATE TABLE bid (
      bidtime TIMESTAMP(3),
      price   DECIMAL(10, 2),
      item    STRING,
      WATERMARK FOR bidtime AS bidtime - INTERVAL '10' MINUTES
  ) WITH (
      'connector' = 'filesystem',
      'path'      = '/root/data/bid.csv',
      'format'    = 'csv'
  );

kafka数据源

  -- CSV-formatted Kafka source; row_time is the event-time column, with a watermark 5 seconds behind it.
  CREATE TABLE user_info (
      id       INT,
      name     STRING,
      score    INT,
      row_time TIMESTAMP(3),
      WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
  ) WITH (
      'connector'                    = 'kafka',
      'topic'                        = 't_score',
      'properties.bootstrap.servers' = 'vm01:9092',
      'properties.group.id'          = 'testGroup',
      'scan.startup.mode'            = 'latest-offset',
      'format'                       = 'csv'
  );

执行查询语句

一般的查询语句与通用SQL类似,对于内置函数,可以通过SHOW FUNCTIONS进行查看。

下面演示的是窗口表值函数(Windowing TVF,此处为滚动窗口TUMBLE)的使用。

  -- Queries the bid table created in the "filesystem data source" section.
  -- Flink SQL comments start with "--"; a line starting with "#" is not valid Flink SQL.
  SELECT window_start, window_end, SUM(price)
  FROM TABLE(
      TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43841
推荐阅读
相关标签
  

闽ICP备14008679号