From 0b81f592581f6425dc69397f24b2971d70e53970 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Sat, 6 May 2023 09:40:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/qniao/iot/IotMonitoringDataJob.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index d7638cd..9b5ada6 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -13,6 +13,7 @@ import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.utils.DeviceMonitoringDataRabbitMqSerializationSchema; import com.rabbitmq.client.AMQP; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -84,6 +85,8 @@ public class IotMonitoringDataJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // 设置任务重启策略 + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10))); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()