Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
33ec95bde8
3 changed files with 43 additions and 21 deletions
  1. 5
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java
  2. 55
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
  3. 4
      iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

5
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java

@ -24,6 +24,11 @@ public class DeviceState {
*/
private Integer status;
/**
* 计算单位
*/
private Integer countUnit;
/**
* 发生时间
*/

55
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -2,6 +2,7 @@ package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
@ -45,7 +46,6 @@ import org.elasticsearch.client.Requests;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@ -54,15 +54,17 @@ import java.util.List;
public class IotMachineEventGeneratorJob {
private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status " +
"from qn_machine_realtime_state where iot_mac = ? and is_delete = 0";
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN qn_machine_list qml ON qmrs.iot_mac = qml.example_id\n" +
"where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0\n" +
" and qml.is_delete = 0";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据
// env.setParallelism(1);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
@ -100,7 +102,7 @@ public class IotMachineEventGeneratorJob {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, event.getReportTime());
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange1(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
@ -134,7 +136,7 @@ public class IotMachineEventGeneratorJob {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, event.getReportTime());
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
@ -229,14 +231,15 @@ public class IotMachineEventGeneratorJob {
ElasticsearchSink.Builder<MachineIotDataReceivedEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<MachineIotDataReceivedEvent>) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> {
// 按日期进行分
// 按日期进行分
LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + indexDateSuffix)
.source(BeanUtil.beanToMap(machineIotDataReceivedEvent));
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.source(BeanUtil.beanToMap(machineIotDataReceivedEvent))
.id(StrUtil.toString(machineIotDataReceivedEvent.getId()));
requestIndexer.add(indexRequest);
}
);
@ -287,29 +290,33 @@ public class IotMachineEventGeneratorJob {
DeviceState newState,
DeviceState oldState, MachineIotDataReceivedEvent event) {
Integer countUnit = newState.getCountUnit();
countUnit = countUnit == null ? 1 : countUnit;
Long currJobCount = event.getCurrJobCount();
currJobCount = currJobCount == null ? 0 : currJobCount * countUnit;
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) {
// 设备开机
PowerOnMachineCommand powerOnMachineCommand = new PowerOnMachineCommand(newState.getMachineId(),
newState.getMachineIotMac(), event.getCurrJobCount());
powerOnMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
newState.getMachineIotMac(), currJobCount);
powerOnMachineCommand.setTimestamp(event.getReportTime());
out.collect(powerOnMachineCommand);
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) {
// 设备关机
PowerOffMachineCommand powerOffMachineCommand = new PowerOffMachineCommand(newState.getMachineId(),
newState.getMachineIotMac(), event.getCurrJobCount());
powerOffMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
newState.getMachineIotMac(), currJobCount);
powerOffMachineCommand.setTimestamp(event.getReportTime());
out.collect(powerOffMachineCommand);
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) {
// 设备开始待机
StopMachineWorkingCommand stopMachineWorkingCommand = new StopMachineWorkingCommand(newState.getMachineId(),
newState.getMachineIotMac(), event.getCurrJobCount());
stopMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
newState.getMachineIotMac(), currJobCount);
stopMachineWorkingCommand.setTimestamp(event.getReportTime());
out.collect(stopMachineWorkingCommand);
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) {
// 设备开始工作
StartMachineWorkingCommand startMachineWorkingCommand = new StartMachineWorkingCommand(newState.getMachineId(),
newState.getMachineIotMac(), event.getCurrJobCount());
startMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
newState.getMachineIotMac(), currJobCount);
startMachineWorkingCommand.setTimestamp(event.getReportTime());
out.collect(startMachineWorkingCommand);
}
}
@ -318,6 +325,16 @@ public class IotMachineEventGeneratorJob {
DeviceState newState,
DeviceState oldState, MachineIotDataReceivedEvent event) {
Integer countUnit = newState.getCountUnit();
countUnit = countUnit == null ? 1 : countUnit;
Long currJobCount = event.getCurrJobCount();
if (currJobCount != null) {
event.setCurrJobCount(currJobCount * countUnit);
}
Long accJobCount = event.getAccJobCount();
if (accJobCount != null) {
event.setAccJobCount(accJobCount * countUnit);
}
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) {
// 设备开机
out.collect(event);

4
iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

@ -1,6 +1,6 @@
app.id=machine-state-event-generator
# ???? 8.135.8.221
# ???? 47.112.164.224
# test 8.135.8.221
# pro 47.112.164.224
apollo.meta=http://47.112.164.224:5000
Loading…
Cancel
Save