Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
1165bc2afb
4 changed files with 79 additions and 24 deletions
  1. 3
      .gitignore
  2. 88
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java
  3. 1
      src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java
  4. 11
      src/test/java/Demo1.java

3
.gitignore

@ -0,0 +1,3 @@
/target/*
/.idea/*
/*.iml

88
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -1,5 +1,6 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.json.JSONUtil;
@ -163,8 +164,8 @@ public class GizWitsIotMonitoringDataJob {
List<MachineIotDataReceivedEvent> receivedEventList = new ArrayList<>();
EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder,
receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList());
List<Tuple2<Long, Long>> tuple3List = statistics(receivedEventList);
//List<Tuple2<Long, Long>> tuple3List = statistics(receivedEventList);
return null;
}
/**
@ -172,35 +173,80 @@ public class GizWitsIotMonitoringDataJob {
* @param receivedEventList
* @return 时长数量
*/
private List<Tuple2<Long, Long>> statistics(List<MachineIotDataReceivedEvent> receivedEventList) {
Map<String, Long> map = new HashMap<>();
MachineIotDataReceivedEvent firstEvent;
Integer nextPwrStat;
ArrayList<Integer> nextWorkingStatList;
boolean isHasWaitingWork = false;
private List<Map<String, Long>> statistics(List<MachineIotDataReceivedEvent> receivedEventList) {
List<Map<String, Long>> mapList = new ArrayList<>();
MachineIotDataReceivedEvent startEvent = null;
List<Integer> nextPwrStatList = ListUtil.toList(0, 1);
ArrayList<Integer> nextWorkingStatList = ListUtil.toList(0, 1, 2);
Map<String, Long> workingJobMap = new HashMap<>(2);
Map<String, Long> map = new HashMap<>(2);
MachineIotDataReceivedEvent waitJobEvent = null;
// 一个工作周期期间是否待机过
boolean isHadWaitJob = false;
for (int i = 0; i < receivedEventList.size(); i++) {
MachineIotDataReceivedEvent receivedEvent = receivedEventList.get(i);
firstEvent = receivedEvent;
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long reportTime = receivedEvent.getReportTime();
map.clear();
if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) {
if (i == 0) {
Instant instant = Instant.ofEpochMilli(reportTime * 1000);
ZoneId zone = ZoneId.systemDefault();
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, zone);
LocalDateTime startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
long l = Duration.between(startTime, localDateTime).get(SECONDS);
map.put("currJobDuration", l);
map.put("currJobCount", receivedEvent.getCurrJobCount());
nextPwrStat = 1;
LocalDateTime startTime;
if(startEvent == null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault());
startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
}else {
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(startEvent.getReportTime() * 1000),
ZoneId.systemDefault());
}
if(machinePwrStat.equals(0)) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
if(isHadWaitJob) {
map.put("currJobDuration", workingJobMap.get("currJobDuration"));
}else {
map.put("currJobDuration", l);
}
map.put("currJobCount", receivedEvent.getCurrJobCount());
mapList.add(map);
nextPwrStatList = ListUtil.toList(1);
workingJobMap.clear();
mapList.add(map);
}else {
if(machineWorkingStat.equals(1)) {
// 工作中只计算工作时长
if(isHadWaitJob) {
// 如果前面的消息是待机消息
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000),
ZoneId.systemDefault());
// 状态重置
waitJobEvent = null;
isHadWaitJob = false;
}
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
workingJobMap.put("currJobDuration", l + workingJobMap.get("currJobDuration"));
}
if(machineWorkingStat.equals(2)) {
// 待机中
isHadWaitJob = true;
waitJobEvent = receivedEvent;
}
}
}else {
startEvent = receivedEvent;
}
}
return mapList;
}
private String[] getIndicesList() throws IOException {

1
src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java

@ -86,6 +86,5 @@ public class EsRestClientUtil {
} catch (IOException e) {
e.printStackTrace();
}
return tupleList;
}
}

11
src/test/java/Demo1.java

@ -1,11 +1,18 @@
import java.sql.Date;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
public class Demo1 {
public static void main(String[] args) {
LocalDate localDate = new Date(1659088698L * 1000).toLocalDate();
System.out.println(localDate);
Map<String, Long> workingJobMap = new HashMap<>();
workingJobMap.put("231231", 2325443523L);
workingJobMap.put("23rwer31", 2325443523L);
workingJobMap.put("2231f3f231", 2325443523L);
workingJobMap.clear();
workingJobMap.put("231232222231", 2325443523L);
System.out.println(workingJobMap.get("231232222231"));
}
}
Loading…
Cancel
Save