diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 1833a7a..7c08103 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -62,11 +62,13 @@ public class IotMachineEventGeneratorJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 + env.setParallelism(1); KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.earliest()) + .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -75,6 +77,50 @@ public class IotMachineEventGeneratorJob { .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + DataStream commandDataStream1 = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + .process(new KeyedProcessFunction() { + + private ValueState deviceState; + + @Override + public void open(Configuration parameters) throws SQLException { + // 必须在 open 生命周期初始化 + // 获取所有设备的最新状态 + List deviceStateList = Db.use().query(sql, DeviceState.class); + Map allMachineMap = new HashMap<>(); + if (CollUtil.isNotEmpty(deviceStateList)) { + allMachineMap = deviceStateList.stream() + .collect(Collectors.toMap(DeviceState::getMachineIotMac, + deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); + } + deviceState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("deviceState1", + TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); + } + + @Override + public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + + // 获取最新设备状态 + JSON deviceStateListJson = deviceState.value(); + DeviceState lastedDeviceState = deviceStateListJson + .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); + Integer deviceStatus = getDeviceStatus(event); + if (deviceStatus != null) { + deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), + new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + if (lastedDeviceState != null) { + DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), + deviceStatus, event.getReportTime()); + collDeviceStatusChange1(out, newState, lastedDeviceState, event); + } + deviceState.update(deviceStateListJson); + } + } + }).name("commandDataStream1 keyBy stream"); + + DataStream commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -85,7 +131,6 @@ public class IotMachineEventGeneratorJob { // 必须在 open 生命周期初始化 // 获取所有设备的最新状态 List deviceStateList = Db.use().query(sql, DeviceState.class); - System.out.println("已获取到设备最新状态数据:" + deviceStateList.size()); Map allMachineMap = new HashMap<>(); if (CollUtil.isNotEmpty(deviceStateList)) { allMachineMap = deviceStateList.stream() @@ -124,7 +169,7 @@ public class IotMachineEventGeneratorJob { sinkRabbitMq(commandDataStream); // 写入es - sinkEs(dataStreamSource); + sinkEs(commandDataStream1); env.execute("Kafka Job"); } @@ -163,7 +208,7 @@ public class IotMachineEventGeneratorJob { })).name("commandDataStream to rabbitmq Sink"); } - private static void sinkEs(DataStreamSource dataStream) { + private static void sinkEs(DataStream dataStream) { List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), @@ -238,4 +283,22 @@ public class IotMachineEventGeneratorJob { out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } } + + private static void collDeviceStatusChange1(Collector out, DeviceState newState, DeviceState oldState, + MachineIotDataReceivedEvent event) { + + if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { + // 设备开机 + out.collect(event); + } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { + // 设备关机 + out.collect(event); + } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { + // 设备开始待机 + out.collect(event); + } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { + // 设备开始工作 + out.collect(event); + } + } }