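Flink 1.13: The StateBackend and CheckpointStorage Split

Author: 马育民 • 2025-08-12 11:27

https://cloud.tencent.com/developer/article/1945549

# Example

### Common setup code

```
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Enable checkpointing with a 3-minute interval
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
```

### Configuring checkpoints

#### RocksDB state backend, checkpoints on the local file system

```
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;

// ############# RocksDB ####################
// Working state lives in embedded RocksDB; `true` enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Alternative HDFS target: hdfs://192.168.18.199:9000/sea-flink/ck
// Store checkpoints on the local machine. The two calls below are equivalent; use either one.
env.getCheckpointConfig().setCheckpointStorage("file:///home/sea/Desktop/xx");
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///home/sea/Desktop/xx"));
```

#### HashMap state backend, checkpoints on HDFS

```
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;

// HashMapStateBackend: working state is kept on the JVM heap
// Access HDFS as the "hadoop" user
System.setProperty("HADOOP_USER_NAME", "hadoop");
env.setStateBackend(new HashMapStateBackend());
// Store checkpoints on HDFS. The two calls below are equivalent; use either one.
env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.18.199:9000/sea-flink/ck");
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.18.199:9000/sea-flink/ck"));
```

### Common follow-up code

```
import java.time.Duration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Minimum pause between checkpoints: 4 minutes
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4));
// Checkpoint timeout: 10 minutes
checkpointConfig.setCheckpointTimeout(Duration.ofMinutes(10L).toMillis());
// Allow only one checkpoint in flight at a time
checkpointConfig.setMaxConcurrentCheckpoints(1);
// Keep the externalized checkpoint when the job is cancelled
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.enableUnalignedCheckpoints();
```

Original source: http://www.malaoshi.top/show_1GW1f4jJK1IH.html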
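The example above combines a 3-minute checkpoint interval with a 4-minute minimum pause, and it may not be obvious how the two interact. The minimum pause is measured from the end of the previous checkpoint, so it puts a lower bound on the effective interval. The sketch below is plain Java, not Flink code. It uses a deliberately simplified model in which the next checkpoint starts at whichever is later: the configured interval, or the previous checkpoint's duration plus the minimum pause. The class name `CheckpointTimingSketch`, the method `nextStart`, and the sample durations are made up for illustration.

```java
import java.time.Duration;

public class CheckpointTimingSketch {

    /**
     * Simplified model (an assumption, not Flink's scheduler): returns the earliest
     * start of the next checkpoint, measured from the start of the previous one.
     */
    public static Duration nextStart(Duration interval, Duration minPause, Duration lastDuration) {
        // The pause is counted from the moment the previous checkpoint finished.
        Duration afterPause = lastDuration.plus(minPause);
        // The later of the two constraints wins.
        return interval.compareTo(afterPause) >= 0 ? interval : afterPause;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(3); // value passed to enableCheckpointing above
        Duration minPause = Duration.ofMinutes(4); // value passed to setMinPauseBetweenCheckpoints above

        // Hypothetical checkpoint durations, in seconds.
        for (long seconds : new long[] {0, 30, 60, 300}) {
            Duration took = Duration.ofSeconds(seconds);
            System.out.println("checkpoint took " + took
                    + " -> next start after " + nextStart(interval, minPause, took));
        }
    }
}
```

Under this model the gap between checkpoint starts is never shorter than 4 minutes with the values above, so the 3-minute interval never becomes the limiting factor. If checkpoints every 3 minutes are actually wanted, the minimum pause would need to be set below the interval.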