|
|
|
@ -125,12 +125,8 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
System.out.println("收到事件数据-------------------------:" + event); |
|
|
|
// 获取最新设备状态 |
|
|
|
DeviceState lastedDeviceState = deviceState.value(); |
|
|
|
|
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
|
|
|
|
/*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange(out, newState, null, event);*/ |
|
|
|
|
|
|
|
if (deviceStatus == null) { |
|
|
|
out.collect(null); |
|
|
|
} else { |
|
|
|
@ -179,7 +175,6 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
} |
|
|
|
//创建es 请求 |
|
|
|
IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); |
|
|
|
//用 requestIndexer 发送最后的请求 |
|
|
|
requestIndexer.add(indexRequest); |
|
|
|
} |
|
|
|
); |
|
|
|
@ -189,7 +184,7 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|
|
|
//论缓冲操作的数量或大小如何都要刷新的时间间隔 |
|
|
|
esSinkBuilder.setBulkFlushInterval(5000L); |
|
|
|
|
|
|
|
// 客户端创建配置回调,配置账号密码 |
|
|
|
esSinkBuilder.setRestClientFactory( |
|
|
|
restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
@ -228,17 +223,11 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
public String computeExchange(BaseCommand command) { |
|
|
|
|
|
|
|
// 交换机名称 |
|
|
|
/*if(command != null) { |
|
|
|
System.out.println("发送消息:------------------" + command.toString()); |
|
|
|
return "flink_test_exchange"; |
|
|
|
} |
|
|
|
return "";*/ |
|
|
|
System.out.println("发送消息:------------------" + command.toString()); |
|
|
|
return "flink_test_exchange"; |
|
|
|
} |
|
|
|
})).name("PowerOffMachineCommand Sink"); |
|
|
|
|
|
|
|
|
|
|
|
// 直接发队列 |
|
|
|
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); |
|
|
|
} |
|
|
|
|