当前位置:   article > 正文

Flink Table API 输出到MySql报错:Required context properties mismatch.

required context properties mismatch.


代码:

  1. package com.tabletest
  2. import com.apitest.SensorReading
  3. import org.apache.flink.streaming.api.scala._
  4. import org.apache.flink.table.api.scala._
  5. import org.apache.flink.table.api.{DataTypes, Table}
  6. /**
  7. * @Description Reads sensor records from a CSV file as a DataStream, converts the stream to a Table, and writes query results into MySQL through a JDBC sink table registered via DDL.
  8. * @Author 海若
  9. * @Date 2019/1/1 1:51
  10. * @Version 1.0
  11. */
  12. object MySqlOutPutTest {
  13. def main(args: Array[String]): Unit = {
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 1. Create the table execution environment on top of the streaming environment
  17. val tableEnv = StreamTableEnvironment.create(env)
  18. // 2. Read the source file as a DataStream[String]
  19. val filePath: String = "F:\\IDEA_HAIRUO_BIGDATA\\FlinkTutorial\\src\\main\\resources\\sources"
  20. val inputStream: DataStream[String] = env.readTextFile(filePath)
  21. // Map each comma-separated line into a SensorReading(id, timestamp, temperature) case class
  22. val dataStream: DataStream[SensorReading] = inputStream
  23. .map(data => {
  24. val dataArray = data.split(",")
  25. SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
  26. })
  27. // 3. Convert the stream into a Table, renaming 'temperature -> 'temp and 'timestamp -> 'ts
  28. val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)
  29. // 4. Table transformations
  30. // 4.1 Simple projection + filter on a single sensor id
  31. val resultTable: Table = sensorTable
  32. .select('id, 'temp)
  33. .filter('id === "sensor_1")
  34. // 4.2 Aggregation: row count per sensor id
  35. val aggResultTable: Table = sensorTable
  36. .groupBy('id)
  37. .select('id, 'id.count as 'count)
  38. // 5. Register a JDBC sink table via DDL; needs the flink-jdbc connector on the classpath (this is the cause of the article's error)
  39. val sinkDDL: String =
  40. """|create table jdbcOutputTable (
  41. | id varchar(20) not null,
  42. | temp double not null
  43. |) with (
  44. | 'connector.type' = 'jdbc',
  45. | 'connector.url' = 'jdbc:mysql://hadoop102:3306/FlinkTest',
  46. | 'connector.table' = 'sensor_count',
  47. | 'connector.driver' = 'com.mysql.jdbc.Driver',
  48. | 'connector.username' = 'root',
  49. | 'connector.password' = '123456'
  50. |)""".stripMargin
  51. tableEnv.sqlUpdate(sinkDDL) // execute the DDL to register the sink table
  52. resultTable.insertInto("jdbcOutputTable")
  53. // aggResultTable.insertInto("jdbcOutputTable") // NOTE(review): the grouped result emits updates; presumably requires an upsert-capable sink — confirm before enabling
  54. env.execute("MySql output test job")
  55. }
  56. }

报错:

  1. Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
  2. the classpath.
  3. Reason: Required context properties mismatch.
  4. The following properties are requested:
  5. connector.driver=com.mysql.jdbc.Driver
  6. connector.password=123456
  7. connector.table=sensor_count
  8. connector.type=jdbc
  9. connector.url=jdbc:mysql://hadoop102:3306/FlinkTest
  10. connector.username=root
  11. schema.0.data-type=VARCHAR(2147483647)
  12. schema.0.name=id
  13. schema.1.data-type=DOUBLE
  14. schema.1.name=temp
  15. The following factories have been considered:
  16. org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
  17. org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
  18. org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
  19. org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSinkFactory
  20. org.apache.flink.table.sinks.CsvBatchTableSinkFactory
  21. org.apache.flink.table.sinks.CsvAppendTableSinkFactory
  22. at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
  23. at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
  24. at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
  25. at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
  26. at org.apache.flink.table.planner.StreamPlanner.getTableSink(StreamPlanner.scala:447)
  27. at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:144)
  28. at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
  29. at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
  30. at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  31. at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  32. at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  33. at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  34. at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  35. at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  36. at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  37. at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  38. at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
  39. at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  40. at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
  41. at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
  42. at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
  43. at com.atguigu.tabletest.MySqlOutPutTest$.main(MySqlOutPutTest.scala:60)
  44. at com.atguigu.tabletest.MySqlOutPutTest.main(MySqlOutPutTest.scala)

错误分析:主要看这一句

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.

意思是 Flink 在 classpath 中找不到能够匹配 'connector.type' = 'jdbc' 等属性的 TableSinkFactory。从报错里"已考虑的工厂"列表可以看到,当前 classpath 上只有 Kafka、Elasticsearch 和 CSV 的工厂,没有 JDBC 的工厂,说明缺少 JDBC 连接器依赖。


解决办法:

在 POM 文件中添加 flink-jdbc 连接器依赖即可(它提供了 JDBC 的 TableSink 工厂类 JDBCTableSourceSinkFactory)。同时注意:DDL 中指定了 'connector.driver' = 'com.mysql.jdbc.Driver',因此 classpath 中还需要有 mysql-connector-java 驱动包。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-jdbc_2.11</artifactId>
  4. <version>1.10.0</version>
  5. </dependency>


最终成功将Flink流转换成的表的查询结果写入Mysql数据库

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/278808
推荐阅读
相关标签
  

闽ICP备14008679号