|
|
|
@ -62,11 +62,13 @@ public class IotMachineEventGeneratorJob { |
|
|
|
|
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
// 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 |
|
|
|
env.setParallelism(1); |
|
|
|
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) |
|
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
|
.setStartingOffsets(OffsetsInitializer.latest()) |
|
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
@ -75,6 +77,50 @@ public class IotMachineEventGeneratorJob { |
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); |
|
|
|
|
|
|
|
|
|
|
|
DataStream<MachineIotDataReceivedEvent> commandDataStream1 = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() { |
|
|
|
|
|
|
|
private ValueState<JSON> deviceState; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) throws SQLException { |
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
// 获取所有设备的最新状态 |
|
|
|
List<DeviceState> deviceStateList = Db.use().query(sql, DeviceState.class); |
|
|
|
Map<Long, DeviceState> allMachineMap = new HashMap<>(); |
|
|
|
if (CollUtil.isNotEmpty(deviceStateList)) { |
|
|
|
allMachineMap = deviceStateList.stream() |
|
|
|
.collect(Collectors.toMap(DeviceState::getMachineIotMac, |
|
|
|
deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); |
|
|
|
} |
|
|
|
deviceState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("deviceState1", |
|
|
|
TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long, |
|
|
|
MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx, Collector<MachineIotDataReceivedEvent> out) throws Exception { |
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
JSON deviceStateListJson = deviceState.value(); |
|
|
|
DeviceState lastedDeviceState = deviceStateListJson |
|
|
|
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
if (deviceStatus != null) { |
|
|
|
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), |
|
|
|
new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
if (lastedDeviceState != null) { |
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), |
|
|
|
deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange1(out, newState, lastedDeviceState, event); |
|
|
|
} |
|
|
|
deviceState.update(deviceStateListJson); |
|
|
|
} |
|
|
|
} |
|
|
|
}).name("commandDataStream1 keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
DataStream<BaseCommand> commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { |
|
|
|
|
|
|
|
@ -85,7 +131,6 @@ public class IotMachineEventGeneratorJob { |
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
// 获取所有设备的最新状态 |
|
|
|
List<DeviceState> deviceStateList = Db.use().query(sql, DeviceState.class); |
|
|
|
System.out.println("已获取到设备最新状态数据:" + deviceStateList.size()); |
|
|
|
Map<Long, DeviceState> allMachineMap = new HashMap<>(); |
|
|
|
if (CollUtil.isNotEmpty(deviceStateList)) { |
|
|
|
allMachineMap = deviceStateList.stream() |
|
|
|
@ -124,7 +169,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
sinkEs(dataStreamSource); |
|
|
|
sinkEs(commandDataStream1); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
@ -163,7 +208,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
})).name("commandDataStream to rabbitmq Sink"); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkEs(DataStreamSource<MachineIotDataReceivedEvent> dataStream) { |
|
|
|
private static void sinkEs(DataStream<MachineIotDataReceivedEvent> dataStream) { |
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|
|
|
@ -238,4 +283,22 @@ public class IotMachineEventGeneratorJob { |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private static void collDeviceStatusChange1(Collector<MachineIotDataReceivedEvent> out, DeviceState newState, DeviceState oldState, |
|
|
|
MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
// 设备开机 |
|
|
|
out.collect(event); |
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
// 设备关机 |
|
|
|
out.collect(event); |
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
// 设备开始待机 |
|
|
|
out.collect(event); |
|
|
|
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { |
|
|
|
// 设备开始工作 |
|
|
|
out.collect(event); |
|
|
|
} |
|
|
|
} |
|
|
|
} |