概述

我们都知道,使用Scala或者Java写代码的时候可以配置Flink Checkpoint:

1
2
3
4
5
6
7
val env = StreamExecutionEnvironment.getExecutionEnvironment
.enableCheckpointing(5 * 60 * 1000)

val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)
checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

但这对于其他并不善于写代码的同事来说是很麻烦的事情,难以维护。我们使用Flink Sql + Zeppelin不就是想尽可能地干掉代码,使用纯SQL+配置吗?

好在Flink已经支持了Checkpoint相关配置,接下来开始介绍。

Zeppelin配置Flink Checkpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
%flink.conf
# 开启Checkpoint,指定两次checkpoint开始调度之间的间隔,单位毫秒
# 当然,还会受到checkpoint并发数和min-pause影响
execution.checkpointing.interval 120000
# 开始下次Checkpoint时距离上一次Checkpoint完成后的最小时间间隔,单位毫秒
execution.checkpointing.min-pause 60000
# 如果某次Checkpoint超过此阈值还没完成,则将进行中的Checkpoint干掉作废,单位毫秒
execution.checkpointing.timeout 60000
# 当Cancel该job时也保留 Checkpoint,用于作业手动重启
# 此模式下我们必须在Cancel后需要手动删除Checkpoint文件。
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
# 从Checkpoint或Savepoint恢复时使用
# execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1

这样就配置好了Checkpoint。

关于Checkpoint更多详细配置请参考Flink官网
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html

从Checkpoint恢复

  1. 如果要准备修改作业需要重启,则先在Flink UI中记录下最新的Checkpoint地址:

  2. 随后在Zeppelin中暂停该任务:

  1. 修改完成后,请务必在%flink.conf中加入以下配置

    1
    execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1

    当然,路径换成我们刚才记录的。

  2. 重启该Notebook的Flink Interpreter,随后重新运行%flink.conf使得新配置生效。

  3. 最后,在Zeppelin重新提交该Flink任务,可观察到该任务从Checkpoint恢复:

本文章转载于 Apache Zeppelin 原创 蔡聘 侵删

本文章仅用于个人记录学习 转载请注明原作者