|
|
|
@ -128,8 +128,8 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
|
|
|
|
DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange(out, newState, null, event); |
|
|
|
/*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange(out, newState, null, event);*/ |
|
|
|
|
|
|
|
if (deviceStatus == null) { |
|
|
|
out.collect(null); |
|
|
|
@ -139,10 +139,10 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
System.out.println("初始化设备状态" + deviceState.value().toString()); |
|
|
|
} else { |
|
|
|
/*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
DeviceState oldState = deviceState.value(); |
|
|
|
collDeviceStatusChange(out, newState, oldState, event); |
|
|
|
deviceState.update(newState);*/ |
|
|
|
deviceState.update(newState); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -150,13 +150,19 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
|
|
|
|
// 写入rabbitmq |
|
|
|
// sinkRabbitMq(commandDataStream); |
|
|
|
sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
sinkEs(commandDataStream); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkEs(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); |
|
|
|
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>( |
|
|
|
httpHosts, |
|
|
|
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
HashMap<String, String> map = new HashMap<>(); |
|
|
|
@ -164,14 +170,12 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; |
|
|
|
map.put("id", powerOnMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
map.put("timestamp", powerOnMachineCommand.getTimestamp().toString()); |
|
|
|
} |
|
|
|
|
|
|
|
if(command instanceof PowerOffMachineCommand) { |
|
|
|
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; |
|
|
|
map.put("id", powerOffMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
map.put("timestamp", powerOffMachineCommand.getTimestamp().toString()); |
|
|
|
} |
|
|
|
//创建es 请求 |
|
|
|
IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); |
|
|
|
@ -195,8 +199,6 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
); |
|
|
|
//数据流添加sink |
|
|
|
commandDataStream.addSink(esSinkBuilder.build()).name("BaseCommand sink"); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|