当前位置:   article > 正文

Flink从入门到真香(17、使用flink table api 输出到文件和kafka)

flink写出到文件

对于流式查询,需要声明如何在表和外部连接器之间进行转换
与外部系统交换的消息类型,由更新模式(update model)指定,下面3种,能使用那种模式取决于输出的目标,比如如果输出到文件你就没法用更新和撤回模式,因为不知道,只能追加,但是如果换成mysql就都可以用

  1. 追加模式(Append)--文件系统只支持追加模式
    表只做插入操作,和外部连接器只交换插入(insert)消息
  2. 撤回模式(Retract)--先删除再插入,实现更新操作
    表和外部连接器交换添加(Add)和撤回(Retract)消息
    插入操作(insert)编码为add消息;删除(delete)编码为retract消息;更新(update)编码为上一条的retract和下一条的add消息
  3. 更新插入模式(upsert)
    更新和插入都被编码为upsert消息;删除编码为delete消息

栗子1-从一个文件读出来,做一波操作写到另一个文件

  1. /**
  2. *
  3. * @author mafei
  4. * @date 2020/11/22
  5. */
  6. package com.mafei.apitest.tabletest
  7. import org.apache.flink.streaming.api.scala._
  8. import org.apache.flink.table.api.DataTypes
  9. import org.apache.flink.table.api.scala._
  10. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
  11. object FileOutputTest {
  12. def main(args: Array[String]): Unit = {
  13. //1 、创建环境
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. val tableEnv = StreamTableEnvironment.create(env)
  17. //2、读取文件
  18. val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
  19. tableEnv.connect(new FileSystem().path(filePath))
  20. .withFormat(new Csv()) //因为txt里头是以,分割的跟csv一样,所以可以用oldCsv
  21. .withSchema(new Schema() //这个表结构要跟你txt中的内容对的上
  22. .field("id", DataTypes.STRING())
  23. .field("timestamp", DataTypes.BIGINT())
  24. .field("temper", DataTypes.DOUBLE())
  25. ).createTemporaryTable("inputTable")
  26. val sensorTable = tableEnv.from("inputTable")
  27. //做简单转换
  28. val simpleTramsformTable = sensorTable
  29. .select("id,temper")
  30. .filter("id='sensor1'")
  31. //聚合转换
  32. val aggTable = sensorTable
  33. .groupBy('id)
  34. .select('id, 'id.count as 'count)
  35. //直接打印输出效果:
  36. simpleTramsformTable.toAppendStream[(String, Double)].print("simpleTramsformTable: ")
  37. //聚合的结果就不能用toAppendStream 因为他实现的是后面再来一条数据,表中就会增加一条,但是聚合的不是,是要更新之前的结果
  38. aggTable.toRetractStream[(String, Long)].print("aggTable")
  39. /**
  40. * 输出的效果:
  41. * aggTable> (true,(sensor1,1))
  42. * simpleTramsformTable: > (sensor1,1.0)
  43. * aggTable> (true,(sensor2,1))
  44. * aggTable> (true,(sensor3,1))
  45. * aggTable> (true,(sensor4,1))
  46. * aggTable> (false,(sensor4,1)) //false代表重新计算了
  47. * aggTable> (true,(sensor4,2))
  48. * aggTable> (false,(sensor4,2))
  49. * aggTable> (true,(sensor4,3))
  50. */
  51. // 输出到文件中
  52. val outputPath = "/opt/java2020_study/maven/flink1/src/main/resources/output.txt"
  53. tableEnv.connect(new FileSystem().path(outputPath))
  54. .withFormat(new Csv())
  55. .withSchema(
  56. new Schema()
  57. .field("id", DataTypes.STRING())
  58. .field("temper", DataTypes.DOUBLE())
  59. )
  60. .createTemporaryTable("outputTable")
  61. simpleTramsformTable.insertInto("outputTable")
  62. env.execute("file ouput")
  63. }
  64. }

代码结构及运行效果

Flink从入门到真香(17、使用flink table api 输出到文件和kafka)

第二个栗子, 从kakfa的一个topic读出来,写到另一个topic里头

  1. /**
  2. *
  3. * @author mafei
  4. * @date 2020/11/23
  5. */
  6. package com.mafei.apitest.tabletest
  7. import org.apache.flink.streaming.api.scala._
  8. import org.apache.flink.table.api.DataTypes
  9. import org.apache.flink.table.api.scala._
  10. import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}
  11. object KafkaOutputTest {
  12. def main(args: Array[String]): Unit = {
  13. //1 、创建环境
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. val tableEnv = StreamTableEnvironment.create(env)
  17. //2、从kafka中读取数据
  18. tableEnv.connect(
  19. new Kafka()
  20. .version("0.11")
  21. .topic("sourceTopic")
  22. .startFromLatest()
  23. .property("zookeeper.connect", "localhost:2181")
  24. .property("bootstrap.servers", "localhost:9092")
  25. ).withFormat(new Csv())
  26. .withSchema(new Schema() // 这个表结构要跟你kafka中的内容对的上
  27. .field("id", DataTypes.STRING())
  28. .field("timestamp", DataTypes.BIGINT())
  29. .field("temperature", DataTypes.DOUBLE())
  30. )
  31. .createTemporaryTable("kafkaInputTable")
  32. val sensorTable = tableEnv.from("kafkaInputTable")
  33. //做简单转换
  34. val simpleTramsformTable = sensorTable
  35. .select("id,temperature")
  36. .filter("id='sensor1'")
  37. tableEnv.connect(
  38. new Kafka()
  39. .version("0.11")
  40. .topic("sinkTopic")
  41. .startFromLatest()
  42. .property("zookeeper.connect", "localhost:2181")
  43. .property("bootstrap.servers", "localhost:9092")
  44. ).withFormat(new Csv())
  45. .withSchema(new Schema() //这个表结构要跟你kafka中的内容对的上
  46. .field("id", DataTypes.STRING())
  47. .field("temper", DataTypes.DOUBLE())
  48. )
  49. .createTemporaryTable("kafkaOutputTable")
  50. simpleTramsformTable.insertInto("kafkaOutputTable")
  51. env.execute("kafka sink test by table api")
  52. }
  53. }

这时候就可以起2个窗口,一个窗口往"sourceTopic" 这个topic里面写,Flink程序会从这个topic里面读出来写到"sinkTopic" 这个topic里面,再起一个consumer的命令行去消费这个topic就可以看到效果了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43904
推荐阅读
相关标签
  

闽ICP备14008679号