赞
踩
代码:
- package com.tabletest
-
- import com.apitest.SensorReading
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.table.api.scala._
- import org.apache.flink.table.api.{DataTypes, Table}
-
-
- /**
- * @Description TODO
- * @Author 海若
- * @Date 2019/1/1 1:51
- * @Version 1.0
- */
object MySqlOutPutTest {
  /**
   * Reads sensor CSV records from a text file, converts the stream into a
   * Table, runs a simple projection/filter plus a grouped aggregation, and
   * writes the simple (append-only) result into MySQL through the legacy
   * JDBC connector registered via DDL.
   *
   * NOTE: requires flink-jdbc on the classpath, otherwise planning fails
   * with NoMatchingTableFactoryException (the error this post is about);
   * the MySQL driver jar is presumably also needed at runtime for
   * 'connector.driver' to load — confirm against the project's pom.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Create the table execution environment (old planner, streaming mode)
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. Read the source file as a stream. The original hard-coded Windows
    //    path is kept as the default, but it can now be overridden via args(0).
    val filePath: String =
      if (args.nonEmpty) args(0)
      else "F:\\IDEA_HAIRUO_BIGDATA\\FlinkTutorial\\src\\main\\resources\\sources"
    val inputStream: DataStream[String] = env.readTextFile(filePath)

    // Map each CSV line into the SensorReading case class.
    // Fields are trimmed so stray whitespace around commas does not make
    // toLong/toDouble throw NumberFormatException.
    val dataStream: DataStream[SensorReading] = inputStream
      .map(line => {
        val fields = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // 3. Convert the stream into a Table, renaming fields to match the sink schema
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    // 4. Table transformations
    // 4.1 Simple projection + filter — append-only, safe for a JDBC append sink
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")
    // 4.2 Grouped aggregation — produces an update (retract) stream.
    //     NOTE(review): 'count is a SQL reserved word; this alias may need to
    //     be renamed (e.g. 'cnt) if this table is ever used in SQL — confirm.
    val aggResultTable: Table = sensorTable
      .groupBy('id)
      .select('id, 'id.count as 'count)

    // 5. Register the JDBC sink table via DDL and write the append-only result
    val sinkDDL: String =
      """|create table jdbcOutputTable (
         | id varchar(20) not null,
         | temp double not null
         |) with (
         | 'connector.type' = 'jdbc',
         | 'connector.url' = 'jdbc:mysql://hadoop102:3306/FlinkTest',
         | 'connector.table' = 'sensor_count',
         | 'connector.driver' = 'com.mysql.jdbc.Driver',
         | 'connector.username' = 'root',
         | 'connector.password' = '123456'
         |)""".stripMargin
    tableEnv.sqlUpdate(sinkDDL) // execute the DDL to register the sink table
    resultTable.insertInto("jdbcOutputTable")
    // The aggregate result cannot go into this append-only JDBC sink — it
    // emits retractions and would fail at runtime, hence it stays disabled:
    // aggResultTable.insertInto("jdbcOutputTable")

    env.execute("MySql output test job")
  }
}
报错:
- Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
- the classpath.
-
- Reason: Required context properties mismatch.
-
- The following properties are requested:
- connector.driver=com.mysql.jdbc.Driver
- connector.password=123456
- connector.table=sensor_count
- connector.type=jdbc
- connector.url=jdbc:mysql://hadoop102:3306/FlinkTest
- connector.username=root
- schema.0.data-type=VARCHAR(2147483647)
- schema.0.name=id
- schema.1.data-type=DOUBLE
- schema.1.name=temp
-
- The following factories have been considered:
- org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
- org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
- org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
- org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSinkFactory
- org.apache.flink.table.sinks.CsvBatchTableSinkFactory
- org.apache.flink.table.sinks.CsvAppendTableSinkFactory
- at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
- at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
- at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
- at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
- at org.apache.flink.table.planner.StreamPlanner.getTableSink(StreamPlanner.scala:447)
- at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:144)
- at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
- at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
- at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
- at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
- at scala.collection.Iterator$class.foreach(Iterator.scala:893)
- at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
- at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
- at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
- at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
- at scala.collection.AbstractTraversable.map(Traversable.scala:104)
- at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
- at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
- at com.atguigu.tabletest.MySqlOutPutTest$.main(MySqlOutPutTest.scala:60)
- at com.atguigu.tabletest.MySqlOutPutTest.main(MySqlOutPutTest.scala)
错误分析:主要看这一句
Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
大概意思是找不到合适的工厂类去创建表
解决办法:
在 POM 文件中添加 flink-jdbc 连接器依赖即可(版本需与项目使用的 Flink 版本一致)。另外注意:运行时 classpath 中还需要 mysql-connector-java 驱动,DDL 中指定的 'connector.driver' = 'com.mysql.jdbc.Driver' 才能被加载。
<!-- Legacy JDBC connector for the Flink Table API.
     Keep the version in sync with the project's Flink version (here 1.10.0)
     and the artifact suffix in sync with the Scala version (here 2.11).
     NOTE: the MySQL driver (mysql-connector-java) must also be on the
     classpath at runtime for 'connector.driver' = 'com.mysql.jdbc.Driver'
     to be loadable. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
最终成功将 Flink 流转换成的表的查询结果写入 MySQL 数据库。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。