diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java index cca3f6d..c358f1c 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java @@ -24,6 +24,11 @@ public class DeviceState { */ private Integer status; + /** + * 计算单位 + */ + private Integer countUnit; + /** * 发生时间 */ diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 6a1c6ed..4c67e93 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,6 +2,7 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; @@ -45,7 +46,6 @@ import org.elasticsearch.client.Requests; import java.io.IOException; import java.sql.SQLException; import java.time.LocalDate; -import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -54,15 +54,17 @@ import java.util.List; public class IotMachineEventGeneratorJob { - private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status " + - "from qn_machine_realtime_state where iot_mac = ? and is_delete = 0"; + private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + + "from qn_machine_realtime_state qmrs\n" + + " LEFT JOIN qn_machine_list qml ON qmrs.iot_mac = qml.example_id\n" + + "where qmrs.iot_mac = ?\n" + + " and qmrs.is_delete = 0\n" + + " and qml.is_delete = 0"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 - // env.setParallelism(1); KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) @@ -100,7 +102,7 @@ public class IotMachineEventGeneratorJob { // 更新状态 if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, event.getReportTime()); + deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); collDeviceStatusChange1(out, newState, lastedDeviceState, event); this.deviceState.update(newState); } @@ -134,7 +136,7 @@ public class IotMachineEventGeneratorJob { // 更新状态 if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, event.getReportTime()); + deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); collDeviceStatusChange(out, newState, lastedDeviceState, event); this.deviceState.update(newState); } @@ -229,14 +231,15 @@ public class IotMachineEventGeneratorJob { ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> { - // 按日期进行分割 + // 按日期进行分片 LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); - String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + indexDateSuffix) - .source(BeanUtil.beanToMap(machineIotDataReceivedEvent)); + .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) + .source(BeanUtil.beanToMap(machineIotDataReceivedEvent)) + .id(StrUtil.toString(machineIotDataReceivedEvent.getId())); requestIndexer.add(indexRequest); } ); @@ -287,29 +290,33 @@ public class IotMachineEventGeneratorJob { DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { + Integer countUnit = newState.getCountUnit(); + countUnit = countUnit == null ? 1 : countUnit; + Long currJobCount = event.getCurrJobCount(); + currJobCount = currJobCount == null ? 0 : currJobCount * countUnit; if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 PowerOnMachineCommand powerOnMachineCommand = new PowerOnMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount()); - powerOnMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + newState.getMachineIotMac(), currJobCount); + powerOnMachineCommand.setTimestamp(event.getReportTime()); out.collect(powerOnMachineCommand); } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { // 设备关机 PowerOffMachineCommand powerOffMachineCommand = new PowerOffMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount()); - powerOffMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + newState.getMachineIotMac(), currJobCount); + powerOffMachineCommand.setTimestamp(event.getReportTime()); out.collect(powerOffMachineCommand); } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { // 设备开始待机 StopMachineWorkingCommand stopMachineWorkingCommand = new StopMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount()); - stopMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + newState.getMachineIotMac(), currJobCount); + stopMachineWorkingCommand.setTimestamp(event.getReportTime()); out.collect(stopMachineWorkingCommand); } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { // 设备开始工作 StartMachineWorkingCommand startMachineWorkingCommand = new StartMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount()); - startMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + newState.getMachineIotMac(), currJobCount); + startMachineWorkingCommand.setTimestamp(event.getReportTime()); out.collect(startMachineWorkingCommand); } } @@ -318,6 +325,16 @@ public class IotMachineEventGeneratorJob { DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { + Integer countUnit = newState.getCountUnit(); + countUnit = countUnit == null ? 1 : countUnit; + Long currJobCount = event.getCurrJobCount(); + if (currJobCount != null) { + event.setCurrJobCount(currJobCount * countUnit); + } + Long accJobCount = event.getAccJobCount(); + if (accJobCount != null) { + event.setAccJobCount(accJobCount * countUnit); + } if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 out.collect(event); diff --git a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties index 01661b3..22ba09b 100644 --- a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties +++ b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties @@ -1,6 +1,6 @@ app.id=machine-state-event-generator -# ???? 8.135.8.221 -# ???? 47.112.164.224 +# test 8.135.8.221 +# pro 47.112.164.224 apollo.meta=http://47.112.164.224:5000 \ No newline at end of file