

I Followed Flink's Official Example, Yet the Job Failed on Submission


Background: Following Flink's official example, I built a FlinkSQL ETL application. It consumes data from Kafka, processes it, and sinks the results into ClickHouse.

Problem: The project's pom.xml clearly declares the connector dependencies, and the job runs fine locally. But after packaging and submitting the application to Flink, it fails with an error saying a connector dependency is missing.

Error logs

When the Kafka connector dependency is placed before the ClickHouse connector dependency in pom.xml, the following error is reported:


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.report'.

Table options are:

'connector'='clickhouse'
'database-name'='demo_test'
'password'=''
'sink.batch-size'='1'
'sink.flush-interval'='1000'
'sink.max-retries'='3'
'table-name'='report'
'url'='clickhouse://127.0.0.1:8123/'
'username'=''
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.report'.

Table options are:

'connector'='clickhouse'
'database-name'='demo_test'
'password'=''
'sink.batch-size'='1'
'sink.flush-interval'='1000'
'sink.max-retries'='3'
'table-name'='report'
'url'='clickhouse://127.0.0.1:8123/'
'username'=''
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:156)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:362)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
        at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98)
        at com.hundsun.light.dem.analysis.job.KafkaToClickhouseJob.execute(KafkaToClickhouseJob.java:81)
        at com.hundsun.light.dem.analysis.job.KafkaToClickhouseJob.main(KafkaToClickhouseJob.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
        ... 8 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='clickhouse'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:367)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:354)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:152)
        ... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
print
upsert-kafka
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:235)
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:363)
        ... 33 more


When the Kafka connector dependency is placed after the ClickHouse connector dependency in pom.xml, the following error is reported:


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.dem_event'.

Table options are:

'connector'='kafka'
'format'='json'
'json.fail-on-missing-field'='false'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='127.0.0.1:9092'
'properties.group.id'='test'
'scan.startup.mode'='group-offsets'
'topic'='testTopic'
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:360)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1058)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.dem_event'.

Table options are:

'connector'='kafka'
'format'='json'
'json.fail-on-missing-field'='false'
'json.ignore-parse-errors'='true'
'properties.bootstrap.servers'='127.0.0.1:9092'
'properties.group.id'='test'
'scan.startup.mode'='group-offsets'
'topic'='testTopic'
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:254)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
        at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:50)
        at com.hundsun.light.dem.analysis.job.KafkaToClickhouseJob.lambda$execute$1(KafkaToClickhouseJob.java:79)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at com.hundsun.light.dem.analysis.job.KafkaToClickhouseJob.execute(KafkaToClickhouseJob.java:77)
        at com.hundsun.light.dem.analysis.job.KafkaToClickhouseJob.main(KafkaToClickhouseJob.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
        ... 8 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:367)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:354)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
        ... 40 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
clickhouse
datagen
filesystem
print
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:235)
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:363)
        ... 42 more

Connectors

Kafka connector: the application uses the Kafka connector to consume data from Kafka. The dependency is:


 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka_2.11</artifactId>
     <version>1.12.0</version>
 </dependency>


ClickHouse connector: the application uses the ClickHouse connector to sink the processed data. The dependency is:


<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.12.0</version>
</dependency>

Packaging plugin used by the application (taken from Flink's official example demo):
... ...

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>spendreport.FraudDetectionJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
... ...

Problem analysis

The project runs fine locally but reports missing dependencies when packaged and submitted to Flink, so the problem likely lies in the packaging step. The packaging log explicitly shows that both the Kafka and ClickHouse connector dependencies are included, and unzipping the built JAR also reveals the class files of both connectors. So the Kafka and ClickHouse connector dependencies were indeed shaded into the submitted application JAR.


... 

[INFO] Including org.apache.flink:flink-connector-base:jar:1.12.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-7:jar:7.1-12.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.12.0 in the shaded jar.  // Kafka connector dependency
[INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
[INFO] Including com.aliyun:flink-connector-clickhouse:jar:1.12.0 in the shaded jar.  // ClickHouse connector dependency
[INFO] Including org.apache.flink:flink-json:jar:1.12.0 in the shaded jar.
[INFO] Including com.alibaba:fastjson:jar:1.2.76 in the shaded jar.

...

The submitted application JAR does contain the required Kafka and ClickHouse connector dependencies. So why does Flink still report a missing connector when the job is submitted? Where does the problem lie?
Comparing the two error logs, the failure pattern is identical: only the connector whose dependency comes first in the project's pom.xml is loaded as an available factory. What exactly is this factory? The logs show it is a factory class implementing the org.apache.flink.table.factories.DynamicTableFactory interface.


# Kafka connector dependency listed before the ClickHouse connector dependency in pom.xml
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
print
upsert-kafka

# ClickHouse connector dependency listed before the Kafka connector dependency in pom.xml
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath

Available factory identifiers are:

blackhole
clickhouse
datagen
filesystem

From the analysis above, the error stems from not finding a factory that implements org.apache.flink.table.factories.DynamicTableFactory. Let's look at the Flink source code.

  • DynamicTableFactory source code

DynamicTableFactory is an interface that extends the Factory interface:

public interface DynamicTableFactory extends Factory {

    /** Provides catalog and session information describing the dynamic table to be accessed. */
    interface Context {

        /** Returns the identifier of the table in the {@link Catalog}. */
        ObjectIdentifier getObjectIdentifier();

        /**
         * Returns the resolved table information received from the {@link Catalog}.
         *
         * <p>The {@link ResolvedCatalogTable} forwards the metadata from the catalog but offers a
         * validated {@link ResolvedSchema}. The original metadata object is available via {@link
         * ResolvedCatalogTable#getOrigin()}.
         *
         * <p>In most cases, a factory is interested in the following two characteristics:
         *
         * <pre>{@code
         * // get the physical data type to initialize the connector
         * context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()
         *
         * // get primary key information if the connector supports upserts
         * context.getCatalogTable().getResolvedSchema().getPrimaryKey()
         * }</pre>
         *
         * <p>Other characteristics such as metadata columns or watermarks will be pushed down into
         * the created {@link DynamicTableSource} or {@link DynamicTableSink} during planning
         * depending on the implemented ability interfaces.
         */
        ResolvedCatalogTable getCatalogTable();

        /** Gives read-only access to the configuration of the current session. */
        ReadableConfig getConfiguration();

        /**
         * Returns the class loader of the current session.
         *
         * <p>The class loader is in particular useful for discovering further (nested) factories.
         */
        ClassLoader getClassLoader();

        /** Whether the table is temporary. */
        boolean isTemporary();
    }
}

  • Factory interface source code
package org.apache.flink.table.factories;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import java.util.Set;

/**
 * Base interface for all kind of factories that create object instances from a list of key-value
 * pairs in Flink's Table & SQL API.
 *
 * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
 *
 * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
 * Classes that implement this interface can be added to {@code
 * META_INF/services/org.apache.flink.table.factories.Factory} in JAR files.
 *
 * <p>Every factory declares a set of required and optional options. This information will not be
 * used during discovery but is helpful when generating documentation and performing validation. A
 * factory may discover further (nested) factories, the options of the nested factories must not be
 * declared in the sets of this factory.
 *
 * <p>It is the responsibility of each factory to perform validation before returning an instance.
 *
 * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
 *
 * <ul>
 *   <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an
 *       example.
 *   <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to
 *       split words.
 *   <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define
 *       such a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
 *   <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do
 *       {@code sink.partitioner} instead of {@code sink.sink-partitioner}) to <b>keep the keys
 *       short</b>.
 *   <li>Key names which can be templated, e.g. to refer to a specific column, should be listed
 *       using '#' as the placeholder symbol. For example, use {@code fields.#.min}.
 * </ul>
 */
@PublicEvolving
public interface Factory {

    /**
     * Returns a unique identifier among same factory interfaces.
     *
     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
     * kafka}). If multiple factories exist for different versions, a version should be appended
     * using "-" (e.g. {@code elasticsearch-7}).
     */
    String factoryIdentifier();

    /**
     * Returns a set of {@link ConfigOption} that an implementation of this factory requires in
     * addition to {@link #optionalOptions()}.
     *
     * <p>See the documentation of {@link Factory} for more information.
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in
     * addition to {@link #requiredOptions()}.
     *
     * <p>See the documentation of {@link Factory} for more information.
     */
    Set<ConfigOption<?>> optionalOptions();
}


The Javadoc on the interface makes it clear that org.apache.flink.table.factories.Factory is the base interface for all kinds of factories that create object instances from a list of key-value pairs in Flink's Table & SQL API.
In the SQL API, that key-value list is simply the WITH (...) clause we write when defining a table:

-- example WITH clause
CREATE TABLE `tableName` (...) WITH (
    'connector'='kafka',
    'format'='json',
    'json.fail-on-missing-field'='false',
    'json.ignore-parse-errors'='true',
    'properties.bootstrap.servers'='127.0.0.1:9092',
    'properties.group.id'='test',
    'scan.startup.mode'='group-offsets',
    'topic'='testTopic'
)
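
For context, the sketch below shows how such a FlinkSQL job is typically wired together: register the source and sink tables via executeSql, then submit the insert through a StatementSet (the same addInsertSql/execute calls that appear in the stack traces above). Table names, schemas, and option values are illustrative only, not the actual KafkaToClickhouseJob code.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaToClickhouseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Source table: the 'kafka' option triggers a lookup of the Kafka table factory.
        tEnv.executeSql(
                "CREATE TABLE dem_event (id STRING, event_time TIMESTAMP(3)) WITH ("
                        + " 'connector'='kafka', 'topic'='testTopic',"
                        + " 'properties.bootstrap.servers'='127.0.0.1:9092',"
                        + " 'properties.group.id'='test', 'format'='json')");

        // Sink table: the 'clickhouse' option triggers a lookup of the ClickHouse table factory.
        tEnv.executeSql(
                "CREATE TABLE report (id STRING, event_time TIMESTAMP(3)) WITH ("
                        + " 'connector'='clickhouse', 'url'='clickhouse://127.0.0.1:8123/',"
                        + " 'database-name'='demo_test', 'table-name'='report')");

        // Translating the INSERT is when FactoryUtil tries to discover both factories via SPI,
        // which is exactly where the errors above are thrown.
        StatementSet statementSet = tEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO report SELECT id, event_time FROM dem_event");
        statementSet.execute();
    }
}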

The Javadoc also explains that implementations of the factory interface are discovered through Java's Service Provider Interface (SPI): the fully qualified class names of the implementations are listed in the META-INF/services/org.apache.flink.table.factories.Factory file of each connector JAR. So it is now clear how factory implementations are found when factories are instantiated.
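
To make the SPI mechanism concrete, here is a minimal sketch (plain ServiceLoader usage, assuming flink-table-common and the connector JARs are on the classpath) that prints the factoryIdentifier() of every Factory implementation the classpath exposes. Conceptually, this is the list printed under "Available factory identifiers are:" in the errors above.

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class ListAvailableFactories {
    public static void main(String[] args) {
        // ServiceLoader reads every META-INF/services/org.apache.flink.table.factories.Factory
        // file visible on the classpath and instantiates the classes listed in them.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}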

Next, back to the JAR of the failing job: let's inspect its META-INF/services/org.apache.flink.table.factories.Factory file. The results:

  • With the ClickHouse connector dependency placed before the Kafka connector dependency in pom.xml, META-INF/services/org.apache.flink.table.factories.Factory contains:

com.aliyun.flink.connector.clickhouse.table.ClickHouseDynamicTableFactory

There is no factory entry for the Kafka connector.

  • With the Kafka connector dependency placed before the ClickHouse connector dependency in pom.xml, META-INF/services/org.apache.flink.table.factories.Factory contains:

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

There is no factory entry for the ClickHouse connector.
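
If you want to reproduce this check without unzipping the whole JAR, a small sketch like the one below reads the services entry directly from the shaded artifact (the JAR path is a placeholder for your own build output):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class DumpFactoryServicesFile {
    public static void main(String[] args) throws Exception {
        // Placeholder path; pass your shaded job JAR as the first argument.
        String jarPath = args.length > 0 ? args[0] : "target/my-flink-job.jar";
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(
                    "META-INF/services/org.apache.flink.table.factories.Factory");
            if (entry == null) {
                System.out.println("No Factory services file in " + jarPath);
                return;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                reader.lines().forEach(System.out::println);
            }
        }
    }
}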

At this point the cause of the error is clear: the META-INF/services/org.apache.flink.table.factories.Factory file in the packaged job JAR never contains the factory entries of whichever connector is declared second in pom.xml, so at submission time Flink cannot find that connector's factory and the job fails.

Fixing the problem

Why does the META-INF/services/org.apache.flink.table.factories.Factory file end up incomplete? Because the META-INF/services/org.apache.flink.table.factories.Factory files contributed by the different dependency artifacts are not merged. The fix therefore lies in the packaging.
The project is packaged with the maven-shade-plugin, which has to aggregate the class files and resource files of multiple artifacts. For resource aggregation the shade plugin provides a rich set of Transformer classes; among them, ServicesResourceTransformer is the one that merges SPI service files.

<!-- The JDK's service discovery mechanism is based on the META-INF/services/ directory.
     If multiple implementations of the same interface need to be merged, use this Transformer. -->
<configuration>
    <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
</configuration>

Modify the packaging configuration in the project's pom.xml as follows, then rebuild and resubmit the job; the error is resolved.

...
<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>spendreport.FraudDetectionJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
... 
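
After rebuilding with the ServicesResourceTransformer in place, the META-INF/services/org.apache.flink.table.factories.Factory file in the shaded JAR should contain the factory entries of both connectors, i.e. the union of the two partial files shown earlier:

com.aliyun.flink.connector.clickhouse.table.ClickHouseDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory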

Summary

This problem touches on quite a few topics. I have recorded the troubleshooting process in detail as a summary, and I hope it helps anyone who runs into the same issue.
