Development machine: Windows 10
Java: 8
Flink: 1.16.2
Hudi: 0.14.1
While upgrading Flink + Hudi, a local test run failed with the following exception:
- 2024-06-27 17:13:16.659 [bucket_assigner (1/2)#0] WARN org.apache.flink.runtime.taskmanager.Task - bucket_assigner (1/2)#0 (be7ca740282c17e0d01630e88b5f5905_eae0775fbc9b695162a640f9f20e7bb9_0_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator bucket_assigner (1/2)#0.
- at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1243)
- at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
- at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
- at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
- at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
- at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
- at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
- at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
- at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
- at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
- at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
- at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
- at java.lang.Thread.run(Thread.java:748)
- Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator bucket_assigner (1/2)#0. Failure reason: Checkpoint was declined.
- at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
- at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
- at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
- at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
- at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
- at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
- at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
- at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1286)
- at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1274)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1231)
- ... 22 more
- Caused by: org.rocksdb.RocksDBException: Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳÕҲ»µ½ָ¶
- at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
- at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
- at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:172)
- at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:158)
- at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:78)
- at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
- at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593)
- at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
- ... 33 more

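Right after the program started, the logs looked fine.
A little later the error appeared. The checkpoint obviously failed, and a closer look shows the root cause is a RocksDB exception. The key line is this one: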
Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳÕҲ»µ½ָ¶
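Creating a file in the temp directory failed, and the error message is mojibake: ϵͳÕҲ»µ½ָ¶ (I can't read it; can any Windows expert explain?). My first guess was a permission problem, so I restarted IDEA as administrator and ran Flink locally again. No luck: exactly the same error.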
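The garbled tail looks like a GBK-encoded Windows message whose bytes were decoded with the wrong charset. As a rough sketch, the byte values below are reconstructed from the garbled characters (an assumption, they are not taken from the raw log bytes), and the class name `MojibakeDecode` is hypothetical. The last garbled character is only half of a two-byte pair, so it is left out.

```java
import java.nio.charset.Charset;

public class MojibakeDecode {
    // Assumption: bytes reconstructed from the garbled characters in the log message.
    // The dangling final byte of the truncated message is omitted.
    static final byte[] RAW = {
        (byte) 0xCF, (byte) 0xB5, (byte) 0xCD, (byte) 0xB3,
        (byte) 0xD5, (byte) 0xD2, (byte) 0xB2, (byte) 0xBB,
        (byte) 0xB5, (byte) 0xBD, (byte) 0xD6, (byte) 0xB8
    };

    // Decode the bytes with the charset a Chinese-locale Windows console would use.
    static String decodeAsGbk(byte[] raw) {
        return new String(raw, Charset.forName("GBK"));
    }

    public static void main(String[] args) {
        System.out.println(decodeAsGbk(RAW));
    }
}
```

On a JDK that ships the GBK charset this should print 系统找不到指, the beginning of the Windows message "The system cannot find the ... specified". The message is cut off in the log, so whether it refers to a path or a file cannot be recovered from it.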
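This never happened with version 1.13.5, so why does the new version break? T_T (I also suspect IDEA itself; when I have time I should set up a Flink cluster on Win10 and run the job there.)
The path on drive C still looks like a permission problem to me, so the plan is to replace it with a different path.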
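One alternative hypothesis, not verified in this post: the failing path is about 263 characters long, which is above the traditional Windows `MAX_PATH` limit of 260 (terminating NUL included). A minimal sketch to check this is below. `PathLengthCheck` and `withBase` are hypothetical names, and the second path assumes the directory layout under the base directory stays the same when the base changes.

```java
public class PathLengthCheck {
    // Traditional Windows MAX_PATH limit; the count includes the terminating NUL.
    static final int WINDOWS_MAX_PATH = 260;

    static final String OLD_BASE = "C:\\Users\\Administrator\\AppData\\Local\\Temp";

    // The path from the RocksDBException in the log, copied verbatim.
    static final String FAILING_PATH =
            OLD_BASE
            + "\\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\\tm_0\\tmp"
            + "\\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_"
            + "eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132"
            + "\\chk-1.tmp/MANIFEST-000004";

    // True when the path does not fit into MAX_PATH together with its NUL terminator.
    static boolean exceedsMaxPath(String path) {
        return path.length() >= WINDOWS_MAX_PATH;
    }

    // Assumption: only the base directory changes, the rest of the layout is identical.
    static String withBase(String newBase) {
        return newBase + FAILING_PATH.substring(OLD_BASE.length());
    }

    public static void main(String[] args) {
        String shorter = withBase("D:\\workspace\\temp");
        System.out.println(FAILING_PATH.length() + " chars, too long: " + exceedsMaxPath(FAILING_PATH));
        System.out.println(shorter.length() + " chars, too long: " + exceedsMaxPath(shorter));
    }
}
```

If this hypothesis holds, any sufficiently short base directory would avoid the error, regardless of permissions.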
First, follow the code from the stack trace into org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend#snapshot:
- // take a checkpoint snapshot of the keyed state
- public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
- final long checkpointId,
- final long timestamp,
- @Nonnull final CheckpointStreamFactory streamFactory,
- @Nonnull CheckpointOptions checkpointOptions)
- throws Exception {
-
- // flush everything into db before taking a snapshot
- writeBatchWrapper.flush();
- /*
- checkpointSnapshotStrategy is the checkpoint strategy; its instanceBasePath field holds the C:\Users\Administrator\AppData\Local\Temp\minicluster.... value,
- so all we need is a way to point instanceBasePath at a different path
- */
- return new SnapshotStrategyRunner<>(
- checkpointSnapshotStrategy.getDescription(),
- checkpointSnapshotStrategy,
- cancelStreamRegistry,
- ASYNCHRONOUS)
- .snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
- }

Tracing upward from the snapshot method leads to the createKeyedStateBackend method of EmbeddedRocksDBStateBackend:
- public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
- Environment env,
- JobID jobID,
- String operatorIdentifier,
- TypeSerializer<K> keySerializer,
- int numberOfKeyGroups,
- KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry,
- TtlTimeProvider ttlTimeProvider,
- MetricGroup metricGroup,
- @Nonnull Collection<KeyedStateHandle> stateHandles,
- CloseableRegistry cancelStreamRegistry,
- double managedMemoryFraction)
- throws IOException {
- // first, make sure that the RocksDB JNI library is loaded
- // we do this explicitly here to have better error handling
- String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
- ensureRocksDBIsLoaded(tempDir);
-
- // replace all characters that are not legal for filenames with underscore
- String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
- // initialize
- lazyInitializeForJob(env, fileCompatibleIdentifier);
- // the path of the RocksDB instance is generated here
- File instanceBasePath =
- new File(
- getNextStoragePath(),
- "job_"
- + jobId
- + "_op_"
- + fileCompatibleIdentifier
- + "_uuid_"
- + UUID.randomUUID());
- .......
- }
-
- private void lazyInitializeForJob(
- Environment env, @SuppressWarnings("unused") String operatorIdentifier)
- throws IOException {
- .......
- // this shows the directory comes from Flink's env, so let's look for a configuration option
- // initialize the paths where the local RocksDB files should be stored
- if (localRocksDbDirectories == null) {
- initializedDbBasePaths = new File[] {env.getTaskManagerInfo().getTmpWorkingDirectory()};
- } else {
- ......
- }
- .......
- }
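To see how the directory name in the error message comes out of this code, here is a small stand-alone sketch of the naming logic. It reuses the regex and string concatenation from the excerpt above. The class and method names are hypothetical, and the operator identifier `KeyedProcessOperator_..._(1/2)` is an assumption inferred from the path in the log.

```java
import java.io.File;
import java.util.UUID;

public class InstancePathSketch {
    // Same replacement as in createKeyedStateBackend: every character that is not
    // a letter, digit or '-' becomes an underscore.
    static String toFileCompatible(String operatorIdentifier) {
        return operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
    }

    // Mirrors the concatenation used for instanceBasePath in the excerpt.
    static File instanceBasePath(File storageDir, String jobId, String operatorIdentifier, UUID uuid) {
        return new File(
                storageDir,
                "job_" + jobId + "_op_" + toFileCompatible(operatorIdentifier) + "_uuid_" + uuid);
    }

    public static void main(String[] args) {
        File dir = instanceBasePath(
                new File("tmp"),
                "e2784be18f17fbba96d46813105438ef",
                // Assumed identifier format, reconstructed from the logged path.
                "KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9_(1/2)",
                UUID.fromString("10af1fed-db05-4b12-bd81-457078114132"));
        System.out.println(dir.getName());
    }
}
```

The printed directory name matches the `job_..._uuid_...` segment of the failing path, which confirms that only the part in front of it (the storage directory) is something a configuration option could change.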

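Now we only need to find where this working directory is set.
Following the tmpWorkingDirectory property further up leads to org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager. Because local mode uses MiniCluster, the place where workingDirectory is set is the start method of MiniCluster.
The value is clearly read from the configuration option process.working-dir. If process.working-dir is not set, it falls back to the value of the system property java.io.tmpdir.
Solution: in the Flink program, call configuration.setString("process.working-dir", "D:/workspace/temp"), or when starting from the command line add -D process.working-dir=xxxxx.
There are also a few small pitfalls in the Flink table config settings; I'll write those up next time.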
- public void start() throws Exception {
- synchronized (lock) {
- .......
- final Configuration configuration = miniClusterConfiguration.getConfiguration();
- final boolean useSingleRpcService =
- miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
-
- try {
- // workingDirectory is created here; generateWorkingDirectoryFile builds it from PROCESS_WORKING_DIR_BASE
- workingDirectory =
- WorkingDirectory.create(
- ClusterEntrypointUtils.generateWorkingDirectoryFile(
- configuration,
- Optional.of(PROCESS_WORKING_DIR_BASE),
- "minicluster_" + ResourceID.generate()));
- ......
- }
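The lookup described above (use process.working-dir if it is set, otherwise fall back to java.io.tmpdir, then append a per-process subdirectory) can be modeled with plain Java. This is a simplified sketch, not Flink's implementation: the `Map` stands in for Flink's `Configuration`, and `WorkingDirSketch`, `resolveWorkingDir` and the `minicluster_demo` id are hypothetical.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class WorkingDirSketch {
    static final String WORKING_DIR_KEY = "process.working-dir";

    // Simplified model: configured base directory if present, otherwise the JVM temp
    // directory, with a per-process subdirectory appended.
    static File resolveWorkingDir(Map<String, String> config, String processId) {
        String base = config.getOrDefault(WORKING_DIR_KEY, System.getProperty("java.io.tmpdir"));
        return new File(base, processId);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Nothing configured: ends up under java.io.tmpdir, as in the failing run.
        System.out.println(resolveWorkingDir(config, "minicluster_demo"));

        // With the override from the solution: ends up under the chosen directory.
        config.put(WORKING_DIR_KEY, "D:/workspace/temp");
        System.out.println(resolveWorkingDir(config, "minicluster_demo"));
    }
}
```

In this model the override only moves the base directory. Everything below it (`minicluster_...`, `tm_0`, `tmp`, the `job_...` directory) keeps the same shape, which matches the path seen in the log.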
