Straight to the point, here is the exception:
2021-08-05 20:06:12 ERROR:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
    at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
The exception: Could not forward element to next operator. Anyone seasoned in Flink development knows the usual cause of this exception: null fields are not allowed in the stream records.
Reviewing the code showed that none of the records passed through the stream (POJO/JSONObject) could ever contain null values, and the element reported in the exception parsed fine and could be forwarded downstream normally. Stranger still, after the job restarted from a checkpoint, everything went back to normal.
So for this round, all POJOs and JSONObjects were converted to Strings for transmission, and the log output was improved so the problem could be pinpointed if it recurred (this step turned out to be crucial).
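As a hedged sketch of that workaround (the class and field names here are illustrative, not the production job's): each record is serialized to a plain JSON String before being emitted downstream, and the full payload is logged so a failing element can be identified after the fact. The production code used fastjson's serialization; this standard-library-only version hand-rolls the JSON.

```java
// Illustrative sketch, NOT the production code: serialize each record to a
// String before handing it to collect(), and log the raw payload so a
// failing element can be located later. Field names (EventId, @timestamp)
// mirror the ES mapping shown later in the article.
public class EventToStringExample {

    // Minimal stand-in for the job's DataSyncBurringPointDto
    static final class Event {
        final String eventId;
        final long time;
        Event(String eventId, long time) { this.eventId = eventId; this.time = time; }
    }

    // Serialize one record; in the real job this String is what flows
    // downstream and is parsed again inside the ES sink.
    static String toJson(Event e) {
        return String.format("{\"EventId\":\"%s\",\"@timestamp\":%d}", e.eventId, e.time);
    }

    public static void main(String[] args) {
        Event e = new Event("click", 1629072000000L);
        // Logging the full payload is what made the second failure debuggable.
        System.out.println(toJson(e));
    }
}
```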
The job ran stably for 10 days. Just as I thought the problem was solved, it struck yet again. Fortunately, with the improved logging in place, the concrete exception could now be located:
2021-08-20 07:06:48 ERROR: PascalAlarm:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:180)
    at com.ddmc.streaming.filter.DataSyncBurringPointV2FlatMap.flatMap(DataSyncBurringPointV2FlatMap.java:47)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
Caused by: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    ... 22 common frames omitted
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-3 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.lang.Thread.run(Thread.java:748)
So the Flink write to ES produced a SocketTimeoutException. Yet the ES cluster monitoring showed neither reads nor writes were heavy at that point in time, so why would a timeout occur?
The ES sink code is as follows:
private static ElasticsearchSink<String> getEsSinkFunc(String hostName, int port, String schema) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost(hostName, port, schema));
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
            httpHosts, new ElasticsearchSinkFunction<String>() {
        @Override
        public void process(String value, RuntimeContext ctx, RequestIndexer indexer) {
            try {
                DataSyncBurringPointDto element = JSON.parseObject(value, DataSyncBurringPointDto.class);
                indexer.add(createIndexRequest(element));
            } catch (IOException e) {
                log.error("ES sink Error:{}", e.getMessage());
                e.printStackTrace();
            }
        }

        private IndexRequest createIndexRequest(DataSyncBurringPointDto element) throws IOException {
            return Requests.indexRequest()
                    .index(Configurations.getString("es.index.prefix") + "-" + element.getDate())
                    .type("_doc")
                    .id(getESId(element))
                    .source(XContentFactory
                            .jsonBuilder()
                            .startObject()
                            .field("EventId", element.getEventId())
                            // ... further fields elided in the original ...
                            .field("@timestamp", element.getTime())
                            .endObject());
        }
    });
    esSinkBuilder.setBulkFlushMaxActions(Configurations.getInteger("es.v2.max.bulk.flush"));
    esSinkBuilder.setBulkFlushInterval(Configurations.getInteger("es.v2.bulk.flush.interval"));
    esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
    esSinkBuilder.setRestClientFactory(new RestClientFactory() {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            // set the ES username and password
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            Configurations.getString("es.sensors.untreated.name"),
                            Configurations.getString("es.sensors.untreated.passwd")));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.disableAuthCaching();
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    return httpClientBuilder;
                }
            });
            restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder
                            .setConnectionRequestTimeout(1000 * 60 * 2)
                            .setSocketTimeout(1000 * 60 * 2);
                }
            });
        }
    });
    return esSinkBuilder.build();
}
Since the data had essentially been ruled out and the ES cluster was under no pressure (apart from brief old-generation GC pauses), the investigation shifted to the sink itself. Because Flink writes to ES over HTTP, inspecting the parameters of the HttpAsyncClientBuilder class turned up:
private ConnectionKeepAliveStrategy keepAliveStrategy;
The ES sink uses the default connection keep-alive of -1, i.e. connections never expire.
The suspicion: during an old GC pause on the ES cluster, the long-lived HTTP connections went dead, yet no new connections were created, so flushing data to ES timed out.
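To make that -1 concrete, here is a standard-library-only model (an illustration, not the actual Apache HttpClient source) of the keep-alive decision: the default strategy yields the server's Keep-Alive timeout in milliseconds, or -1, meaning "reuse the pooled connection indefinitely", when the response carries no Keep-Alive header.

```java
// Illustrative model (NOT the Apache HttpClient source) of the keep-alive
// decision that bit us: with no Keep-Alive response header the default
// strategy returns -1, so the pooled connection is reused forever, even
// after the server side has silently died.
public class KeepAliveModel {

    // keepAliveTimeoutSeconds: the "timeout" value of a "Keep-Alive: timeout=N"
    // response header, or null when the header is absent.
    static long keepAliveMillis(Integer keepAliveTimeoutSeconds) {
        if (keepAliveTimeoutSeconds == null) {
            return -1L; // no header: keep the connection indefinitely (the default we hit)
        }
        return keepAliveTimeoutSeconds * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(keepAliveMillis(null)); // header absent: never expire
        System.out.println(keepAliveMillis(30));   // "timeout=30" becomes 30000 ms
    }
}
```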
The fix: add a custom KeepAliveStrategy, here with the keep-alive duration set to 5 minutes:
esSinkBuilder.setRestClientFactory(new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // set the ES username and password
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(
                        Configurations.getString("es.sensors.untreated.name"),
                        Configurations.getString("es.sensors.untreated.passwd")));
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                // keep connections alive for 5 minutes instead of the default -1 (never expire)
                httpClientBuilder.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
                return httpClientBuilder;
            }
        });
        restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                return requestConfigBuilder
                        .setConnectionRequestTimeout(1000 * 60 * 2)
                        .setSocketTimeout(1000 * 60 * 2);
            }
        });
    }
});
With that, the problem that had haunted me for a month finally seems to be resolved... Recording it here for now, and hoping it does not come back!
Reference:
https://blog.csdn.net/yi_master/article/details/80595372