diff --git a/pom.xml b/pom.xml index 1ec550f..900a0f6 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,13 @@ 1.2.31 + + + mysql + mysql-connector-java + 8.0.29 + + com.qniao ddd-event diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java index 3d23f96..a25e528 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java @@ -43,7 +43,7 @@ public class DeviceMonitoringData { private Long currJobDuration; /** - * 数据实际采样时间 + * 数据实际采样时间(单位豪秒) */ private Long reportTime; diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index de8d1e9..78c009a 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -19,7 +19,7 @@ public class DeviceTotalData { private Long theDayJobCount; /** - * 当天作业时长 + * 当天作业时长(单位秒) */ private Long theDayJobDuration; @@ -29,7 +29,7 @@ public class DeviceTotalData { private Long jobTotal; /** - * 累计作业时长 + * 累计作业时长(单位秒) */ private Long jobDurationTotal; diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 7a03c9f..8510df8 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -3,6 +3,7 @@ package com.qniao.iot.gizwits; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.NumberUtil; import cn.hutool.db.Db; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; @@ -89,9 +90,10 @@ public class IotMonitoringDataJob { // 获取设备数据源 KafkaSource source = KafkaSource.builder() .setBootstrapServers("120.25.199.30:19092") - .setTopics("machine_iot_data_received_event") + .setTopics("test") + //.setTopics("machine_iot_data_received_event") .setGroupId("123") - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -103,7 +105,8 @@ public class IotMonitoringDataJob { // 数据过滤 SingleOutputStreamOperator streamOperator = dataStreamSource - .filter((FilterFunction) value -> value.getReportTime() != null && value.getDataSource() != null); + .filter((FilterFunction) value -> value.getReportTime() != null + && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L); // mac分组并进行工作时长的集合操作 @@ -163,6 +166,7 @@ public class IotMonitoringDataJob { MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); Integer lastWorkingStat = lastWorkingStatState.value(); DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); + // 如果当前消息的时间大于等于上次消息的时间才进行处理 Integer machinePwrStat = receivedEvent.getMachinePwrStat(); Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); lastWorkingStatState.update(machineWorkingStat); @@ -171,120 +175,125 @@ public class IotMonitoringDataJob { // 1树根 0机智云 Integer dataSource = receivedEvent.getDataSource(); // 当前数据 - DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent); + DeviceTotalData nowDeviceState = new DeviceTotalData(); if (lastedDeviceState == null) { - lastedDeviceState = deviceTotalDataStat.value(); + lastedDeviceState = getDeviceTotalData(receivedEvent); lastOnData = receivedEvent; } - if (lastWorkingStat == null) { - lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); - } - if (onData == null) { - onData = lastedDeviceState; - onDataState.update(onData); - } - LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); - Long a; - if (machinePwrStat.equals(0)) { - if (lastWorkingStat == 1) { - // 如果上次是工作中,那就进行累加 - assert lastedDeviceState != null; - Long lastReportTime = lastedDeviceState.getLastReportTime(); - if (lastReportTime == null) { - // 如果上次的消息时间为空,那么不进行计算 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); - } else { - // 直接通过两个消息的时间差进行计算 - Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - } - if(dataSource == 1) { - // 树根 - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) { + if (lastWorkingStat == null) { + lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); + } + if (onData == null) { + onData = lastedDeviceState; + onDataState.update(onData); + } + LocalDate localDate = new Date(reportTime).toLocalDate(); + Long a; + if (machinePwrStat.equals(0)) { + if (lastWorkingStat == 1) { + // 如果上次是工作中,那就进行累加 + Long lastReportTime = lastedDeviceState.getLastReportTime(); + if (lastReportTime == null) { + // 如果上次的消息时间为空,那么不进行计算 + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); + } else { + // 直接通过两个消息的时间差进行计算(毫秒) + Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + // 转为秒 + workingDuration = workingDuration / 1000; + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + if(dataSource == 1) { + // 树根 + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + }else { + // 机智云 + Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + } + } + nowDeviceState.setCurrLocalDate(localDate); + if (lastOnData != null) { + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault())); + } else { + nowDeviceState.setLastBootTime(onData.getLastBootTime()); + } + nowDeviceState.setLastReportTime(reportTime); + deviceTotalDataStat.update(nowDeviceState); + lastOffDataState.update(receivedEvent); + // 关机后将待机数据清除 + lastWaitJobDataState.update(null); }else { - // 机智云 - Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + nowDeviceState = lastedDeviceState; } - nowDeviceState.setCurrLocalDate(localDate); - if (lastOnData != null) { - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); - } else { + } else { + if (machineWorkingStat.equals(1)) { + // 工作中 + Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + // 转为秒 + workingDuration = workingDuration /1000; + if(dataSource == 1) { + // 树根 + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + }else { + // 机智云 + Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + } + nowDeviceState.setCurrLocalDate(localDate); nowDeviceState.setLastBootTime(onData.getLastBootTime()); - } - nowDeviceState.setLastReportTime(reportTime); - deviceTotalDataStat.update(nowDeviceState); - // 如果关机 - onDataState.update(nowDeviceState); - lastOffDataState.update(receivedEvent); - // 关机后将待机数据清除 - lastWaitJobDataState.update(null); - } - } else { - if (lastOffData != null) { - lastOnData = receivedEvent; - } - if (machineWorkingStat.equals(1)) { - // 工作中 - Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); - if(dataSource == 1) { - // 树根 - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + if (lastWaitJobData != null) { + LocalDateTime lastWaitJobTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()), + ZoneId.systemDefault()); + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime), + ZoneId.systemDefault()); + a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a); + } else { + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + } + nowDeviceState.setLastReportTime(reportTime); + deviceTotalDataStat.update(nowDeviceState); }else { - // 机智云 - Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + // 待机 + lastWaitJobDataState.update(receivedEvent); + nowDeviceState = lastedDeviceState; } - nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(onData.getLastBootTime()); - if (lastWaitJobData != null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - LocalDateTime lastWaitJobTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a); - } else { - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + if (lastOffData != null) { + // 如果上次是关机消息,那么这次就是开机消息 + // 记录本次开机作为上次开机时间 + lastOnDataState.update(receivedEvent); + // 记录一个周期的开机时间 + onDataState.update(nowDeviceState); } - nowDeviceState.setLastReportTime(reportTime); - deviceTotalDataStat.update(nowDeviceState); } - if (machineWorkingStat.equals(2)) { - // 待机 - lastWaitJobDataState.update(receivedEvent); - } - } - // 如果上次是待机,并且这次也是待机,那么就不需要发送了 - if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(nowDeviceState.getJobTotal()); - data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - if (lastOnData == null) { - data.setLastBootTime(reportTime * 1000); - } else { - data.setLastBootTime(lastOnData.getReportTime() * 1000); + // 如果上次是待机,并且这次也是待机,那么就不需要发送了 + if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(nowDeviceState.getJobTotal()); + data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setLastBootTime(onData.getLastBootTime().atZone(ZoneOffset.systemDefault()).toEpochSecond() * 1000); + out.collect(data); } - out.collect(data); } } @@ -304,7 +313,7 @@ public class IotMonitoringDataJob { DeviceTotalData value = deviceTotalDataStat.value(); DeviceTotalData data = new DeviceTotalData(); Long reportTime = event.getReportTime(); - LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + LocalDate localDate = new Date(reportTime).toLocalDate(); if (value == null) { // 从es中获取 DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null); @@ -318,9 +327,10 @@ public class IotMonitoringDataJob { LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); data.setLastBootTime(ldt); + data.setLastReportTime(deviceMonitoringData.getReportTime()); } else { // es中也没有,直接从老接口拿 - data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); + data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); } value = data; } @@ -330,25 +340,27 @@ public class IotMonitoringDataJob { DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); if (deviceMonitoringData != null) { - data.setJobTotal(deviceMonitoringData.getAccJobCount()); - data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); - data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); - data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); - data.setCurrLocalDate(localDate); - data.setLastBootTime(LocalDateTime.ofInstant(Instant + value.setJobTotal(deviceMonitoringData.getAccJobCount()); + value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); + value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); + value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); + value.setCurrLocalDate(localDate); + value.setLastBootTime(LocalDateTime.ofInstant(Instant .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); + value.setLastReportTime(deviceMonitoringData.getReportTime()); } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 - data.setJobTotal(value.getJobTotal()); - data.setJobDurationTotal(value.getJobDurationTotal()); - data.setTheDayJobDuration(0L); - data.setTheDayJobCount(0L); - data.setCurrLocalDate(localDate); - data.setLastBootTime(value.getLastBootTime()); + value.setJobTotal(value.getJobTotal()); + value.setJobDurationTotal(value.getJobDurationTotal()); + value.setTheDayJobDuration(0L); + value.setTheDayJobCount(0L); + value.setCurrLocalDate(localDate); + value.setLastBootTime(value.getLastBootTime()); + value.setLastReportTime(value.getLastReportTime()); } - deviceTotalDataStat.update(data); } - return data; + deviceTotalDataStat.update(value); + return value; } private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) { @@ -363,7 +375,7 @@ public class IotMonitoringDataJob { if (data == null) { break; } - Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records"); + Object records = JSONUtil.getByPath(JSONUtil.parse(data), "records"); if (records == null) { break; } @@ -373,21 +385,21 @@ public class IotMonitoringDataJob { } for (Object o : objects.toArray()) { Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); - Long iotMac = (Long) mac; + Long iotMac = Long.parseLong((String)mac); if (iotMac.equals(machineIotMac)) { deviceTotalData = new DeviceTotalData(); Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); - deviceTotalData.setJobTotal((Long) productionTotal); + deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal)))); Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); - deviceTotalData.setJobDurationTotal(((Long) workTotalTotal) * 3600); + deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600); Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); LocalDateTime lastBootTime = LocalDateTime - .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss")); + .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); deviceTotalData.setLastBootTime(lastBootTime); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); deviceTotalData.setCurrLocalDate(LocalDate.now()); - deviceTotalData.setLastReportTime(reportTime/1000); + deviceTotalData.setLastReportTime(reportTime); break; } } @@ -400,27 +412,11 @@ public class IotMonitoringDataJob { deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); deviceTotalData.setCurrLocalDate(LocalDate.now()); - deviceTotalData.setLastReportTime(reportTime/1000); + deviceTotalData.setLastReportTime(reportTime); } return deviceTotalData; } - - private String[] getIndicesList() throws IOException { - - GetAliasesRequest request = new GetAliasesRequest(); - GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); - Map> map = getAliasesResponse.getAliases(); - Set indices = map.keySet(); - List indicesList = new ArrayList<>(); - for (String key : indices) { - if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { - indicesList.add(key); - } - } - return ArrayUtil.toArray(indicesList, String.class); - } - private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) { try { @@ -455,7 +451,6 @@ public class IotMonitoringDataJob { } }).name("machineIotDataReceivedEventDataStream keyBy stream"); - machineIotDataReceivedEventDataStream.print(); // 写入es diff --git a/src/test/java/Demo2.java b/src/test/java/Demo2.java index 369b133..5e0d16e 100644 --- a/src/test/java/Demo2.java +++ b/src/test/java/Demo2.java @@ -6,12 +6,17 @@ public class Demo2 { public static void main(String[] args) { - String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D"); + /* String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D"); Object data = JSONUtil.getByPath(JSONUtil.parse(s), "data"); data = JSONUtil.getByPath(JSONUtil.parse(data), "records"); - System.out.println(data); + System.out.println(data);*/ + + Long a = 2314L; +a = a/1000; + + System.out.println(a); } } diff --git a/src/test/java/RootCloudIotDataEventSerialization.java b/src/test/java/RootCloudIotDataEventSerialization.java new file mode 100644 index 0000000..e3c9f61 --- /dev/null +++ b/src/test/java/RootCloudIotDataEventSerialization.java @@ -0,0 +1,25 @@ +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + +public class RootCloudIotDataEventSerialization { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private final String topic; + + public RootCloudIotDataEventSerialization(String topic) { + this.topic = topic; + } + + public ProducerRecord serialize( + final MachineIotDataReceivedEvent message, @Nullable final Long timestamp) { + try { + //if topic is null, default topic will be used + return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + message, e); + } + } +} diff --git a/src/test/java/SourceMockerDemo.java b/src/test/java/SourceMockerDemo.java new file mode 100644 index 0000000..8f00dde --- /dev/null +++ b/src/test/java/SourceMockerDemo.java @@ -0,0 +1,104 @@ +import cn.hutool.core.util.RandomUtil; +import cn.hutool.db.Db; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Future; + +public class SourceMockerDemo { + // 延迟:毫秒 + public static final long DELAY = 1000; + + public static void main(String[] args) throws Exception { + // 创建kafka配置属性 + Properties kafkaProps = createKafkaProperties(); + + // 创建Kafka消息的生产者 + KafkaProducer producer = new KafkaProducer<>(kafkaProps); + + String topic = "test"; + + // 电源状态(0断电 1有电) + List pwrStaList = Arrays.asList(0, 1); + + // 数据源(0智慧云 1根云) + List dataSource = Arrays.asList(0, 1); + + // 设备工作状态(0停机 1工作 2待机) + List accStaList = Arrays.asList(0, 1, 2); + Long currJobDuration = 231L; + + long accJobCount = 2314234L; + + // 循环发送事件 + while (true) { + + MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent(); + event.setId(RandomUtil.randomLong(999999999999999L)); + event.setMachineIotMac(861193040814171L); + event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0)); + event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0)); + // 递增每次加一个随机数 + event.setCurrJobDuration(currJobDuration = currJobDuration + RandomUtil.randomLong(99L)); + // 递增每次加一个随机数 + event.setCurrWaitingDuration(0L); + event.setIgStat(0); + event.setReceivedTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000); + event.setReportTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000); + + // 递增加一个随机数 + event.setCurrJobCount(currJobDuration = currJobDuration + RandomUtil.randomLong(99L)); + // 基础值加CurrJobCount + event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(99L)); + event.setDataSource(1); + // 递增随机加一个数 + event.setCurrStoppingDuration(0L); + + if(event.getMachinePwrStat().equals(0)) { + event.setMachineWorkingStat(0); + }else { + event.setMachineWorkingStat(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getMachinePwrStat())); + } + ProducerRecord record = new RootCloudIotDataEventSerialization(topic).serialize( + event, + null); + + Future send = producer.send(record); + System.out.println(send.get()); + + Thread.sleep(DELAY); + } + } + + private static Integer generateWorkingSta(Integer workingSta, Integer pwrSta) { + + if(pwrSta.equals(0)) { + return 0; + }else { + return workingSta; + } + } + + private static Properties createKafkaProperties() { + + Properties kafkaProps = new Properties(); + // 正式环境 + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + return kafkaProps; + } +}