Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
187364d86d
7 changed files with 287 additions and 151 deletions
  1. 7
      pom.xml
  2. 2
      src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java
  3. 4
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  4. 287
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  5. 9
      src/test/java/Demo2.java
  6. 25
      src/test/java/RootCloudIotDataEventSerialization.java
  7. 104
      src/test/java/SourceMockerDemo.java

7
pom.xml

@ -104,6 +104,13 @@
<version>1.2.31</version>
</dependency>
<!--mysql数据库驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>ddd-event</artifactId>

2
src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java

@ -43,7 +43,7 @@ public class DeviceMonitoringData {
private Long currJobDuration;
/**
* 数据实际采样时间
* 数据实际采样时间单位豪秒
*/
private Long reportTime;

4
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -19,7 +19,7 @@ public class DeviceTotalData {
private Long theDayJobCount;
/**
* 当天作业时长
* 当天作业时长单位秒
*/
private Long theDayJobDuration;
@ -29,7 +29,7 @@ public class DeviceTotalData {
private Long jobTotal;
/**
* 累计作业时长
* 累计作业时长单位秒
*/
private Long jobDurationTotal;

287
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -3,6 +3,7 @@ package com.qniao.iot.gizwits;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
@ -89,9 +90,10 @@ public class IotMonitoringDataJob {
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("120.25.199.30:19092")
.setTopics("machine_iot_data_received_event")
.setTopics("test")
//.setTopics("machine_iot_data_received_event")
.setGroupId("123")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
@ -103,7 +105,8 @@ public class IotMonitoringDataJob {
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null && value.getDataSource() != null);
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L);
// mac分组并进行工作时长的集合操作
@ -163,6 +166,7 @@ public class IotMonitoringDataJob {
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
@ -171,120 +175,125 @@ public class IotMonitoringDataJob {
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent);
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState == null) {
lastedDeviceState = deviceTotalDataStat.value();
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastOnData = receivedEvent;
}
if (lastWorkingStat == null) {
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
assert lastedDeviceState != null;
Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
} else {
// 直接通过两个消息的时间差进行计算
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
} else {
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
}
nowDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault()));
} else {
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
} else {
if (machineWorkingStat.equals(1)) {
// 工作中
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
// 转为秒
workingDuration = workingDuration /1000;
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
// 如果关机
onDataState.update(nowDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
}
} else {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
if (machineWorkingStat.equals(1)) {
// 工作中
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
if (lastWaitJobData != null) {
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()),
ZoneId.systemDefault());
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a);
} else {
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
// 待机
lastWaitJobDataState.update(receivedEvent);
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a);
} else {
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if (lastOffData != null) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
lastOnDataState.update(receivedEvent);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobDataState.update(receivedEvent);
}
}
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
if (lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
} else {
data.setLastBootTime(lastOnData.getReportTime() * 1000);
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(onData.getLastBootTime().atZone(ZoneOffset.systemDefault()).toEpochSecond() * 1000);
out.collect(data);
}
out.collect(data);
}
}
@ -304,7 +313,7 @@ public class IotMonitoringDataJob {
DeviceTotalData value = deviceTotalDataStat.value();
DeviceTotalData data = new DeviceTotalData();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
LocalDate localDate = new Date(reportTime).toLocalDate();
if (value == null) {
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null);
@ -318,9 +327,10 @@ public class IotMonitoringDataJob {
LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
data.setLastReportTime(deviceMonitoringData.getReportTime());
} else {
// es中也没有直接从老接口拿
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
}
value = data;
}
@ -330,25 +340,27 @@ public class IotMonitoringDataJob {
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
if (deviceMonitoringData != null) {
data.setJobTotal(deviceMonitoringData.getAccJobCount());
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
data.setCurrLocalDate(localDate);
data.setLastBootTime(LocalDateTime.ofInstant(Instant
value.setJobTotal(deviceMonitoringData.getAccJobCount());
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
value.setCurrLocalDate(localDate);
value.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
value.setLastReportTime(deviceMonitoringData.getReportTime());
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
data.setJobTotal(value.getJobTotal());
data.setJobDurationTotal(value.getJobDurationTotal());
data.setTheDayJobDuration(0L);
data.setTheDayJobCount(0L);
data.setCurrLocalDate(localDate);
data.setLastBootTime(value.getLastBootTime());
value.setJobTotal(value.getJobTotal());
value.setJobDurationTotal(value.getJobDurationTotal());
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate);
value.setLastBootTime(value.getLastBootTime());
value.setLastReportTime(value.getLastReportTime());
}
deviceTotalDataStat.update(data);
}
return data;
deviceTotalDataStat.update(value);
return value;
}
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
@ -363,7 +375,7 @@ public class IotMonitoringDataJob {
if (data == null) {
break;
}
Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records");
Object records = JSONUtil.getByPath(JSONUtil.parse(data), "records");
if (records == null) {
break;
}
@ -373,21 +385,21 @@ public class IotMonitoringDataJob {
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = (Long) mac;
Long iotMac = Long.parseLong((String)mac);
if (iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
deviceTotalData.setJobTotal((Long) productionTotal);
deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal))));
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
deviceTotalData.setJobDurationTotal(((Long) workTotalTotal) * 3600);
deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600);
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss"));
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
deviceTotalData.setLastBootTime(lastBootTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime/1000);
deviceTotalData.setLastReportTime(reportTime);
break;
}
}
@ -400,27 +412,11 @@ public class IotMonitoringDataJob {
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime/1000);
deviceTotalData.setLastReportTime(reportTime);
}
return deviceTotalData;
}
private String[] getIndicesList() throws IOException {
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();
List<String> indicesList = new ArrayList<>();
for (String key : indices) {
if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) {
indicesList.add(key);
}
}
return ArrayUtil.toArray(indicesList, String.class);
}
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) {
try {
@ -455,7 +451,6 @@ public class IotMonitoringDataJob {
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
machineIotDataReceivedEventDataStream.print();
// 写入es

9
src/test/java/Demo2.java

@ -6,12 +6,17 @@ public class Demo2 {
public static void main(String[] args) {
String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D");
/* String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(s), "data");
data = JSONUtil.getByPath(JSONUtil.parse(data), "records");
System.out.println(data);
System.out.println(data);*/
Long a = 2314L;
a = a/1000;
System.out.println(a);
}
}

25
src/test/java/RootCloudIotDataEventSerialization.java

@ -0,0 +1,25 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
public class RootCloudIotDataEventSerialization {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final String topic;
public RootCloudIotDataEventSerialization(String topic) {
this.topic = topic;
}
public ProducerRecord<String, byte[]> serialize(
final MachineIotDataReceivedEvent message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
}
}

104
src/test/java/SourceMockerDemo.java

@ -0,0 +1,104 @@
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.Db;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class SourceMockerDemo {
// 延迟毫秒
public static final long DELAY = 1000;
public static void main(String[] args) throws Exception {
// 创建kafka配置属性
Properties kafkaProps = createKafkaProperties();
// 创建Kafka消息的生产者
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);
String topic = "test";
// 电源状态0断电 1有电
List<Integer> pwrStaList = Arrays.asList(0, 1);
// 数据源0智慧云 1根云
List<Integer> dataSource = Arrays.asList(0, 1);
// 设备工作状态0停机 1工作 2待机
List<Integer> accStaList = Arrays.asList(0, 1, 2);
Long currJobDuration = 231L;
long accJobCount = 2314234L;
// 循环发送事件
while (true) {
MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent();
event.setId(RandomUtil.randomLong(999999999999999L));
event.setMachineIotMac(861193040814171L);
event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0));
event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0));
// 递增每次加一个随机数
event.setCurrJobDuration(currJobDuration = currJobDuration + RandomUtil.randomLong(99L));
// 递增每次加一个随机数
event.setCurrWaitingDuration(0L);
event.setIgStat(0);
event.setReceivedTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000);
event.setReportTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000);
// 递增加一个随机数
event.setCurrJobCount(currJobDuration = currJobDuration + RandomUtil.randomLong(99L));
// 基础值加CurrJobCount
event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(99L));
event.setDataSource(1);
// 递增随机加一个数
event.setCurrStoppingDuration(0L);
if(event.getMachinePwrStat().equals(0)) {
event.setMachineWorkingStat(0);
}else {
event.setMachineWorkingStat(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getMachinePwrStat()));
}
ProducerRecord<String, byte[]> record = new RootCloudIotDataEventSerialization(topic).serialize(
event,
null);
Future<RecordMetadata> send = producer.send(record);
System.out.println(send.get());
Thread.sleep(DELAY);
}
}
private static Integer generateWorkingSta(Integer workingSta, Integer pwrSta) {
if(pwrSta.equals(0)) {
return 0;
}else {
return workingSta;
}
}
private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties();
// 正式环境
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
return kafkaProps;
}
}
Loading…
Cancel
Save