
Flink local mode: RocksDB checkpoint fails with RocksDBException: Failed to create a NewWriteableFile


Environment

Development machine: Windows 10

Java: 8

Flink: 1.16.2

Hudi: 0.14.1

Symptoms

While upgrading Flink + Hudi, a local test run threw an exception. The error was:

```
2024-06-27 17:13:16.659 [bucket_assigner (1/2)#0] WARN org.apache.flink.runtime.taskmanager.Task - bucket_assigner (1/2)#0 (be7ca740282c17e0d01630e88b5f5905_eae0775fbc9b695162a640f9f20e7bb9_0_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator bucket_assigner (1/2)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1243)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator bucket_assigner (1/2)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1286)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1231)
    ... 22 more
Caused by: org.rocksdb.RocksDBException: Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳÕҲ»µ½ָ¶
    at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
    at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:172)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:158)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:78)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
    ... 33 more
```

 

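Right after the job started, the logs looked fine.

After a while the error showed up. It clearly failed while taking a checkpoint. Looking closer, it is a RocksDB exception, and the key line is this one: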

Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳÕҲ»µ½ָ¶

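Creating a file in the temp directory failed, and the error message is mojibake: ϵͳÕҲ»µ½ָ¶ (I can't read it; can any Windows expert explain?). My first guess was a permissions problem, so I restarted IDEA as administrator and ran Flink locally again. No luck: exactly the same error.

I never hit this with 1.13.5, so why does the new version suddenly do this? T_T (I also suspect IDEA itself. If I find the time, I should try setting up a Flink cluster on Windows 10 and running the job there.)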

 

Solution

Since the path is on the C: drive, I still suspected permissions, so the idea was to replace it with a different path.
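Permissions are one suspect. Another explanation worth checking, offered here as an unverified assumption rather than the confirmed cause: the path may simply be too long. Classic Win32 file APIs limit paths to MAX_PATH = 260 characters (including the terminating NUL) unless long-path support is enabled, and exceeding it typically surfaces as a "path not found" style error. The JDK-only sketch below measures the path copied from the log; the class and method names are made up for illustration.

```java
// Hypothetical helper: checks whether the path from the RocksDB error exceeds
// the classic Win32 MAX_PATH limit (an assumption about the root cause).
final class PathLengthCheck {
    // Classic Win32 MAX_PATH: 260 characters including the terminating NUL,
    // so the longest usable path is 259 characters
    static final int MAX_PATH = 260;

    // Path copied verbatim from the RocksDB error message in the log
    static final String FAILING_PATH =
            "C:\\Users\\Administrator\\AppData\\Local\\Temp\\"
                    + "minicluster_76d0223f7cd18f01de3a67a01eee7fa2\\tm_0\\tmp\\"
                    + "job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_"
                    + "eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_"
                    + "10af1fed-db05-4b12-bd81-457078114132\\chk-1.tmp/MANIFEST-000004";

    // True if the path leaves no room for the NUL terminator within MAX_PATH
    static boolean exceedsMaxPath(String path) {
        return path.length() >= MAX_PATH;
    }

    public static void main(String[] args) {
        System.out.println("length = " + FAILING_PATH.length()); // length = 263
        System.out.println("exceeds MAX_PATH: " + exceedsMaxPath(FAILING_PATH)); // exceeds MAX_PATH: true
    }
}
```

It reports 263 characters, a few over the limit. If this hypothesis holds, anything that shortens the directory prefix would make the error go away, regardless of permissions.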

First, follow the stack trace into org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend#snapshot:

```java
// Takes a checkpoint snapshot of the keyed state
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {
    // flush everything into db before taking a snapshot
    writeBatchWrapper.flush();
    /*
     * checkpointSnapshotStrategy is the checkpoint strategy. It has a field instanceBasePath,
     * whose value is the C:\Users\Administrator\AppData\Local\Temp\minicluster.... path.
     * So all we need is a way to point instanceBasePath at some other directory.
     */
    return new SnapshotStrategyRunner<>(
                    checkpointSnapshotStrategy.getDescription(),
                    checkpointSnapshotStrategy,
                    cancelStreamRegistry,
                    ASYNCHRONOUS)
            .snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
}
```

Tracing where instanceBasePath comes from leads up to EmbeddedRocksDBStateBackend#createKeyedStateBackend:

```java
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry,
        double managedMemoryFraction)
        throws IOException {
    // first, make sure that the RocksDB JNI library is loaded
    // we do this explicitly here to have better error handling
    String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
    ensureRocksDBIsLoaded(tempDir);
    // replace all characters that are not legal for filenames with underscore
    String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
    // Initialization
    lazyInitializeForJob(env, fileCompatibleIdentifier);
    // The path of the RocksDB instance is built here
    File instanceBasePath =
            new File(
                    getNextStoragePath(),
                    "job_"
                            + jobId
                            + "_op_"
                            + fileCompatibleIdentifier
                            + "_uuid_"
                            + UUID.randomUUID());
    // .......
}

private void lazyInitializeForJob(
        Environment env, @SuppressWarnings("unused") String operatorIdentifier)
        throws IOException {
    // .......
    // This shows the path is taken from Flink's env, so look for a config option that controls it
    // initialize the paths where the local RocksDB files should be stored
    if (localRocksDbDirectories == null) {
        initializedDbBasePaths = new File[] {env.getTaskManagerInfo().getTmpWorkingDirectory()};
    } else {
        // ......
    }
    // .......
}
```
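To see why these directory names get so long, here is a stand-alone sketch of the naming scheme in createKeyedStateBackend. It reuses the replaceAll regex and the job_/_op_/_uuid_ pattern from the excerpt above, but takes the base directory as a parameter instead of calling getNextStoragePath(). The raw operator identifier KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9_(1/2) is my assumption, reconstructed from the __1_2_ fragment in the logged path.

```java
import java.io.File;
import java.util.UUID;

// Hypothetical stand-alone sketch of how createKeyedStateBackend names the
// RocksDB instance directory; storagePath stands in for getNextStoragePath().
final class InstancePathSketch {
    // Same regex as the Flink code: every character that is not a letter,
    // digit or '-' becomes '_'
    static String fileCompatible(String operatorIdentifier) {
        return operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
    }

    // Builds <storagePath>/job_<jobId>_op_<fileCompatibleIdentifier>_uuid_<random UUID>
    static File instanceBasePath(File storagePath, String jobId, String operatorIdentifier) {
        return new File(
                storagePath,
                "job_" + jobId + "_op_" + fileCompatible(operatorIdentifier)
                        + "_uuid_" + UUID.randomUUID());
    }

    public static void main(String[] args) {
        // Assumed raw identifier, reconstructed from the "__1_2_" part of the logged path
        String operatorId = "KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9_(1/2)";
        // prints KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2_
        System.out.println(fileCompatible(operatorId));
        File dir = instanceBasePath(new File("tmp"), "e2784be18f17fbba96d46813105438ef", operatorId);
        // Length of the directory name alone, without any parent path (141)
        System.out.println(dir.getName().length());
    }
}
```

The directory name alone is 141 characters before any parent directory is prepended. Note also the else branch in lazyInitializeForJob: when localRocksDbDirectories is set, the working directory is not used for RocksDB files, so configuring explicit RocksDB local directories (for example the state.backend.rocksdb.localdir option) may be an alternative fix. I have not verified that for this case.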

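Now we just need to find out how this working directory is set.

Following tmpWorkingDirectory further up leads to org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager. Because local mode runs on a MiniCluster, the workingDirectory is set in MiniCluster's start method.

It is clearly read from the config option process.working-dir. If process.working-dir is not set, it defaults to the value of the system property java.io.tmpdir.

Fix: in the Flink program, call configuration.setString("process.working-dir", "D:/workspace/temp") and pass that configuration to the execution environment (e.g. StreamExecutionEnvironment.getExecutionEnvironment(configuration)). Alternatively, when launching from the command line, add -Dprocess.working-dir=xxxxx.

There is also a small pitfall in Flink's table config settings; I'll write that up next time.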

For reference, the relevant excerpt of MiniCluster#start:

```java
public void start() throws Exception {
    synchronized (lock) {
        // .......
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService =
                miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
        try {
            // workingDirectory is created here; generateWorkingDirectoryFile derives it
            // from PROCESS_WORKING_DIR_BASE (config key process.working-dir)
            workingDirectory =
                    WorkingDirectory.create(
                            ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                    configuration,
                                    Optional.of(PROCESS_WORKING_DIR_BASE),
                                    "minicluster_" + ResourceID.generate()));
            // ......
        } // ...... (rest of the try statement and of start() elided)
    }
}
```
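The lookup in start() boils down to "use process.working-dir if it is set, otherwise a temp directory". Below is a simplified JDK-only model of that behavior, not Flink's actual code: a plain Map stands in for Flink's Configuration, the fallback is java.io.tmpdir as described above, and the tm_0/tmp suffix is copied from the path in the log. All class and method names are hypothetical.

```java
import java.io.File;
import java.util.Collections;
import java.util.Map;

// Simplified, hypothetical model of the MiniCluster working-directory lookup.
// Flink's real logic lives in ClusterEntrypointUtils.generateWorkingDirectoryFile.
final class WorkingDirSketch {
    static final String PROCESS_WORKING_DIR = "process.working-dir";

    // Use process.working-dir when it is set, otherwise fall back to java.io.tmpdir
    static File workingDirectory(Map<String, String> conf, String name) {
        String base = conf.getOrDefault(PROCESS_WORKING_DIR, System.getProperty("java.io.tmpdir"));
        return new File(base, name);
    }

    // Per the logged path, RocksDB instance directories land under <working dir>/tm_0/tmp
    static File taskManagerTmp(File workingDir) {
        return new File(new File(workingDir, "tm_0"), "tmp");
    }

    public static void main(String[] args) {
        String name = "minicluster_76d0223f7cd18f01de3a67a01eee7fa2";
        Map<String, String> defaults = Collections.emptyMap();
        Map<String, String> custom =
                Collections.singletonMap(PROCESS_WORKING_DIR, "D:/workspace/temp");
        // Without the option: <java.io.tmpdir>/minicluster_.../tm_0/tmp
        System.out.println(taskManagerTmp(workingDirectory(defaults, name)));
        // With the option: D:/workspace/temp/minicluster_.../tm_0/tmp
        System.out.println(taskManagerTmp(workingDirectory(custom, name)));
    }
}
```

With D:/workspace/temp as the base, everything before job_ in the RocksDB path shrinks from 96 characters in the logged path to 72, which is why pointing process.working-dir at a short directory also shortens every file path RocksDB creates.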

 
