diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 9da6f81..b2b47f5 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -125,12 +125,8 @@ public class RootCloudIotDataFormatterJob { System.out.println("收到事件数据-------------------------:" + event); // 获取最新设备状态 DeviceState lastedDeviceState = deviceState.value(); - Integer deviceStatus = getDeviceStatus(event); - /*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); - collDeviceStatusChange(out, newState, null, event);*/ - if (deviceStatus == null) { out.collect(null); } else { @@ -179,7 +175,6 @@ public class RootCloudIotDataFormatterJob { } //创建es 请求 IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); - //用 requestIndexer 发送最后的请求 requestIndexer.add(indexRequest); } ); @@ -189,7 +184,7 @@ public class RootCloudIotDataFormatterJob { esSinkBuilder.setBulkFlushMaxSizeMb(5); //论缓冲操作的数量或大小如何都要刷新的时间间隔 esSinkBuilder.setBulkFlushInterval(5000L); - + // 客户端创建配置回调,配置账号密码 esSinkBuilder.setRestClientFactory( restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); @@ -228,17 +223,11 @@ public class RootCloudIotDataFormatterJob { public String computeExchange(BaseCommand command) { // 交换机名称 - /*if(command != null) { - System.out.println("发送消息:------------------" + command.toString()); - return "flink_test_exchange"; - } - return "";*/ System.out.println("发送消息:------------------" + command.toString()); return "flink_test_exchange"; } })).name("PowerOffMachineCommand Sink"); - // 直接发队列 // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); }