A record of troubleshooting a Flink ElasticsearchSink SocketTimeoutException

Troubleshooting and fixing the ElasticsearchSink SocketTimeoutException

1. Problem record

Without further ado, here is the exception:

2021-08-05 20:06:12 ERROR:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
	at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


2. First investigation

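The exception is "Could not forward element to next operator". Anyone familiar with Flink development will recognize the usual suspect: a null field in a stream record, which is not allowed. Note, however, that this exception is only a wrapper thrown when a chained downstream operator fails, and the log above contains no "Caused by" section pointing at the real cause.
Reviewing the code showed that none of the stream data being passed along (POJO/JSONObject) contained null values, and the element reported in the exception could be parsed and forwarded downstream without any problem. Stranger still, the job went back to normal after restarting from a checkpoint.
So as a first step, all POJOs and JSONObjects were converted to String for transfer between operators, and the log output was improved so that the problem could be pinpointed if it recurred (this step turned out to be really important).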
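
To illustrate why fuller logging matters here, the following is a minimal sketch of a helper that walks an exception's cause chain, so that the innermost cause is logged instead of only the outer wrapper. The class name CauseChain and its methods are hypothetical and use only the JDK; this is not the logging code of the original job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): inspects the cause chain of a wrapped exception. */
final class CauseChain {

    private static final int MAX_DEPTH = 64; // guard against cyclic cause chains

    private CauseChain() {}

    /** Returns the deepest cause of t, or t itself when it has no cause. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        int depth = 0;
        while (current.getCause() != null
                && current.getCause() != current
                && depth++ < MAX_DEPTH) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns one "ClassName: message" entry per level, outermost first. */
    static List<String> describe(Throwable t) {
        List<String> lines = new ArrayList<>();
        Throwable current = t;
        int depth = 0;
        while (current != null && depth++ < MAX_DEPTH) {
            lines.add(current.getClass().getName() + ": " + current.getMessage());
            Throwable next = current.getCause();
            current = (next == current) ? null : next;
        }
        return lines;
    }
}
```

Passing the exception object itself to the logger (rather than only its message) has a similar effect, since the logger then prints every "Caused by" level.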

3. The problem recurs

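The job ran stably for 10 days. Just when I thought the problem was solved, it showed up yet again. Fortunately, thanks to the improved logging, the specific exception could now be identified: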

2021-08-20 07:06:48 ERROR:PascalAlarm: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
	at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	... 22 common frames omitted
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-3 [ACTIVE]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.lang.Thread.run(Thread.java:748)

4. Locating the problem

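Writing from Flink to ES produced a SocketTimeoutException, yet ES cluster monitoring showed that neither read nor write volume was high at that moment. So why did the request time out?
The ES sink code is as follows: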

 private static ElasticsearchSink<String> getEsSinkFunc(String hostName, int port, String schema) {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(hostName, port, schema));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink
                .Builder<String>(httpHosts, new ElasticsearchSinkFunction<String>() {

            @Override
            public void process(String value, RuntimeContext ctx, RequestIndexer indexer) {
                try {
                    DataSyncBurringPointDto element = JSON.parseObject(value, DataSyncBurringPointDto.class);
                    indexer.add(createIndexRequest(element));
                } catch (IOException e) {
                    // Pass the exception itself so the full stack trace is logged
                    log.error("ES sink error while building the index request", e);
                }
            }

            private IndexRequest createIndexRequest(DataSyncBurringPointDto element) throws IOException {
                return Requests.indexRequest()
                        .index(Configurations.getString("es.index.prefix")+"-"+element.getDate())
                        .type("_doc")
                        .id(getESId(element))
                        .source(XContentFactory
                                .jsonBuilder()
                                .startObject()
                                .field("EventId",element.getEventId())
                                // ... (remaining fields omitted in the original)
                                .field("@timestamp", element.getTime())
                                .endObject()
                        );
            }
        });
        esSinkBuilder.setBulkFlushMaxActions(Configurations.getInteger("es.v2.max.bulk.flush"));
        esSinkBuilder.setBulkFlushInterval(Configurations.getInteger("es.v2.bulk.flush.interval"));
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                // Set the ES username and password
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(Configurations.getString("es.sensors.untreated.name"),
                                Configurations.getString("es.sensors.untreated.passwd")));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        httpClientBuilder.disableAuthCaching();
                        httpClientBuilder.setDefaultCredentialsProvider( credentialsProvider);
                        return httpClientBuilder;
                    }
                });
                restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    @Override
                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                        return requestConfigBuilder
                                .setConnectionRequestTimeout(1000 * 60 * 2)
                                .setSocketTimeout(1000 * 60 * 2);
                    }
                });
            }
        });
        return esSinkBuilder.build();
    }

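At this point it was fairly certain that the data was fine and that the ES cluster was not under pressure (apart from brief old-generation GC pauses), so the investigation focused on the sink. Because Flink writes to ES over HTTP, I went through the parameters of the HttpAsyncClientBuilder class and found:
private ConnectionKeepAliveStrategy keepAliveStrategy;
The ES sink does not set this strategy, so the default keep-alive duration applies: -1, meaning the connection never expires. (Apache HttpClient's default strategy only returns a finite value when the server's response carries a Keep-Alive header with a timeout parameter.)
My suspicion is that an old-generation GC pause on the ES cluster left the long-lived HTTP connection dead, and since no new connection is created to replace it, flushing data to ES timed out.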
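
Where the -1 comes from can be sketched roughly as follows. This is a simplified, JDK-only stand-in for the decision a default keep-alive strategy makes, not Apache HttpClient's actual code; the class KeepAliveHint and the method keepAliveMillis are hypothetical names, and the header is assumed to be passed in as a plain string.

```java
/** Simplified stand-in for a default keep-alive decision; not Apache HttpClient code. */
final class KeepAliveHint {

    private KeepAliveHint() {}

    /**
     * Parses a Keep-Alive header value such as "timeout=5, max=100".
     * Returns the timeout in milliseconds, or -1 when no usable timeout is
     * present (null header, missing parameter, or non-numeric value).
     * A result of -1 is read as "keep the connection indefinitely".
     */
    static long keepAliveMillis(String headerValue) {
        if (headerValue == null) {
            return -1L;
        }
        for (String element : headerValue.split(",")) {
            String[] kv = element.trim().split("=", 2);
            if (kv.length == 2 && kv[0].trim().equalsIgnoreCase("timeout")) {
                try {
                    // The header carries seconds; callers expect milliseconds.
                    return Long.parseLong(kv[1].trim()) * 1000L;
                } catch (NumberFormatException ignored) {
                    // Unparseable value: keep looking, then fall back to -1.
                }
            }
        }
        return -1L;
    }
}
```

Under this model, a server that sends no Keep-Alive timeout leaves the client with -1, which matches the "never expires" behaviour described above.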

5. The fix

Add a custom KeepAliveStrategy; here the keep-alive duration is set to 5 minutes:

 esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                // Set the ES username and password
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(Configurations.getString("es.sensors.untreated.name"),
                                Configurations.getString("es.sensors.untreated.passwd")));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        httpClientBuilder.disableAuthCaching();
                        httpClientBuilder.setDefaultCredentialsProvider( credentialsProvider);
                        httpClientBuilder.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
                        return httpClientBuilder;
                    }
                });
                restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    @Override
                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                        return requestConfigBuilder
                                .setConnectionRequestTimeout(1000 * 60 * 2)
                                .setSocketTimeout(1000 * 60 * 2);
                    }
                });
            }
        });
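
What a finite keep-alive changes can be sketched with a small model of an idle pooled connection. This is a hedged illustration under the assumption that the pool stops reusing a connection once its keep-alive window has passed and treats a non-positive duration as "no expiry"; IdleConnectionModel is a hypothetical class, not the pool implementation used by the ES REST client.

```java
/** Simplified model of an idle pooled connection; not the real connection pool. */
final class IdleConnectionModel {

    private final long expiryAtMillis;

    /**
     * @param releasedAtMillis time the connection was returned to the pool
     * @param keepAliveMillis  keep-alive duration; values <= 0 mean "never expires"
     */
    IdleConnectionModel(long releasedAtMillis, long keepAliveMillis) {
        this.expiryAtMillis = keepAliveMillis > 0
                ? releasedAtMillis + keepAliveMillis
                : Long.MAX_VALUE;
    }

    /** True while the connection may still be handed out again. */
    boolean isReusableAt(long nowMillis) {
        return nowMillis < expiryAtMillis;
    }
}
```

With a 5-minute keep-alive, a connection that has been idle for 5 minutes or more is no longer reused in this model and a fresh one has to be opened, whereas with -1 a stale connection can be handed out indefinitely.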

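With that, the problem that had plagued me for a month seems to be solved... I'm recording it here for now, and hoping it does not come back!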

Reference:
https://blog.csdn.net/yi_master/article/details/80595372
