Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
f119b23a99
2 changed files with 8 additions and 21 deletions
  1. 4
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java
  2. 25
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

4
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java

@ -1,13 +1,9 @@
package com.qniao.iot.machine.event.generator.job;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor

25
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -2,13 +2,8 @@ package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.lang.Tuple;
import cn.hutool.db.Db;
import cn.hutool.db.handler.RsHandler;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
import com.qniao.iot.machine.command.PowerOnMachineCommand;
@ -24,8 +19,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@ -47,17 +40,15 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.List;
public class IotMachineEventGeneratorJob {
@ -83,7 +74,6 @@ public class IotMachineEventGeneratorJob {
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() {
@ -161,14 +151,15 @@ public class IotMachineEventGeneratorJob {
}
private static DeviceState getDeviceStateListJson(ValueState<DeviceState> deviceState,
Long machineIotMac) throws IOException, SQLException {
Long machineIotMac) throws IOException, SQLException {
// 获取最新设备状态
DeviceState deviceStateListJson = deviceState.value();
if (deviceStateListJson == null) {
deviceStateListJson = Db.use().query(SQL, (RsHandler<DeviceState>) rs -> new DeviceState(rs.getLong(1),
rs.getLong(2),
rs.getInt(3), System.currentTimeMillis()), machineIotMac);
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
if(CollUtil.isNotEmpty(list)) {
deviceStateListJson = list.get(0);
}
if (deviceStateListJson != null) {
// 如果是空的并且在表中都没找到说明是没有被记录的设备不用管
deviceState.update(deviceStateListJson);

Loading…
Cancel
Save