From fdf95085de2d644378115a674fa0cd55847589de Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Wed, 7 Sep 2022 00:14:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/IotMonitoringDataJob.java | 20 ++++++++++++------- src/test/java/Demo2.java | 8 +++++--- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 03417db..59c8871 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -10,6 +10,7 @@ import com.qniao.iot.machine.command.MachineOutputCommand; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.state.*; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.CheckpointingMode; @@ -68,8 +69,6 @@ public class IotMonitoringDataJob { return requestConfigBuilder; })); - private static final ReentrantLock lock = new ReentrantLock(true); - /** * 当前索引日期后缀 */ @@ -107,9 +106,19 @@ public class IotMonitoringDataJob { @Override public void open(Configuration parameters) { + // 设置10分钟的过期时间 + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .build(); + + ValueStateDescriptor deviceTotalDataValue = new ValueStateDescriptor<>("deviceTotalData", + TypeInformation.of(DeviceTotalData.class)); + // 设置状态值的过期时间,为了解决手动插入数据但是状态值不同步的问题 + deviceTotalDataValue.enableTimeToLive(ttlConfig); + // 必须在 open 生命周期初始化 - deviceTotalDataStat = getRuntimeContext() - .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); + deviceTotalDataStat = getRuntimeContext().getState(deviceTotalDataValue); } @@ -448,7 +457,6 @@ public class IotMonitoringDataJob { try { boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); if (!exists) { - lock.lock(); // 创建索引 CreateIndexRequest request = new CreateIndexRequest(indicesName); // 字段映射 @@ -506,8 +514,6 @@ public class IotMonitoringDataJob { } } catch (Exception e) { e.printStackTrace(); - }finally { - lock.unlock(); } } } diff --git a/src/test/java/Demo2.java b/src/test/java/Demo2.java index e5fc9b8..73be48b 100644 --- a/src/test/java/Demo2.java +++ b/src/test/java/Demo2.java @@ -5,6 +5,7 @@ import cn.hutool.json.JSONUtil; import java.time.LocalDate; import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; public class Demo2 { @@ -22,8 +23,9 @@ public class Demo2 { System.out.println(reportDate);*/ - Long a = null; - long b = a; - System.out.println(b); + LocalDate reportDate = new java.util.Date(1662471808657L) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); + String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); + System.out.println(indexDateSuffix); } }