Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
fdf95085de
2 changed files with 18 additions and 10 deletions
  1. 20
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  2. 8
      src/test/java/Demo2.java

20
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -10,6 +10,7 @@ import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
@ -68,8 +69,6 @@ public class IotMonitoringDataJob {
return requestConfigBuilder; return requestConfigBuilder;
})); }));
private static final ReentrantLock lock = new ReentrantLock(true);
/** /**
* 当前索引日期后缀 * 当前索引日期后缀
*/ */
@ -107,9 +106,19 @@ public class IotMonitoringDataJob {
@Override @Override
public void open(Configuration parameters) { public void open(Configuration parameters) {
// 设置10分钟的过期时间
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<DeviceTotalData> deviceTotalDataValue = new ValueStateDescriptor<>("deviceTotalData",
TypeInformation.of(DeviceTotalData.class));
// 设置状态值的过期时间为了解决手动插入数据但是状态值不同步的问题
deviceTotalDataValue.enableTimeToLive(ttlConfig);
// 必须在 open 生命周期初始化 // 必须在 open 生命周期初始化
deviceTotalDataStat = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
deviceTotalDataStat = getRuntimeContext().getState(deviceTotalDataValue);
} }
@ -448,7 +457,6 @@ public class IotMonitoringDataJob {
try { try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (!exists) { if (!exists) {
lock.lock();
// 创建索引 // 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName); CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射 // 字段映射
@ -506,8 +514,6 @@ public class IotMonitoringDataJob {
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
}finally {
lock.unlock();
} }
} }
} }

8
src/test/java/Demo2.java

@ -5,6 +5,7 @@ import cn.hutool.json.JSONUtil;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
public class Demo2 { public class Demo2 {
@ -22,8 +23,9 @@ public class Demo2 {
System.out.println(reportDate);*/ System.out.println(reportDate);*/
Long a = null;
long b = a;
System.out.println(b);
LocalDate reportDate = new java.util.Date(1662471808657L)
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
System.out.println(indexDateSuffix);
} }
} }
Loading…
Cancel
Save