From 04ee12dd9825c00f699ac7688437d53bb46bd03c Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 22 Aug 2022 19:01:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../power/IotDevicePowerOnAndOffDataJob.java | 281 ++++++++++-------- .../device/power/constant/ConfigConstant.java | 21 +- .../main/resources/META-INF/app.properties | 4 +- 3 files changed, 176 insertions(+), 130 deletions(-) diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index 968fd39..229e285 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -1,7 +1,6 @@ package com.qniao.iot.device.power; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.device.power.config.ApolloConfig; @@ -9,12 +8,17 @@ import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.utils.SnowFlake; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqDeserializationSchema; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.state.*; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -22,19 +26,22 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; -import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; -import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.*; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; @@ -48,9 +55,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; -import java.time.*; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; @@ -73,7 +82,7 @@ public class IotDevicePowerOnAndOffDataJob { }) .setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); return requestConfigBuilder; })); @@ -88,19 +97,39 @@ public class IotDevicePowerOnAndOffDataJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 - RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() - .setHost("127.0.0.1") - .setPort(5672) - .setUserName("admin") - .setPassword("admin") - .setVirtualHost("datastream") + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") + .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); + // 设备数据源转换 - DataStreamSource streamSource = env.addSource(new RMQSource<>(connectionConfig, - "iotDevicePowerOnAndOffDataEvent", true, new MachineIotDataReceivedEventRabbitMqDeserializationSchema())); + DataStreamSource dataStreamSource = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + + // 数据过滤 + SingleOutputStreamOperator streamOperator = dataStreamSource + .filter((FilterFunction) value -> { + Long reportTime = value.getReportTime(); + if(reportTime != null + && value.getDataSource() != null && value.getMachinePwrStat() != null) { + String reportTimeStr = StrUtil.toString(reportTime); + if(reportTimeStr.length() == 10) { + // 机智云那边的设备可能是秒或毫秒 + reportTime = reportTime * 1000; + } + long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + // 晚30分钟的数据就不要了 + return nowTime - reportTime <= (30*60*1000); + } + return false; + }); - SingleOutputStreamOperator streamOperator = streamSource + SingleOutputStreamOperator outputStreamOperator = streamOperator .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -121,63 +150,73 @@ public class IotDevicePowerOnAndOffDataJob { Collector out) throws Exception { IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); - Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); - Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); - Long accJobCount = event.getAccJobCount(); - Long currJobCount = event.getCurrJobCount(); - Integer dataSource = event.getDataSource(); - Integer machinePwrStat = event.getMachinePwrStat(); - IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); - powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - powerOnAndOffDataEvent.setReportTime(event.getReportTime()); - powerOnAndOffDataEvent.setReceivedTime(LocalDateTime - .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - if(MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { - if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - // 上次是关机,但是这次是开机,说明周期产能从新开始 - if(dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); - }else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(currJobCount); - } - } - }else { - Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); - // 直接累加 - if(dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); - }else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); - } - } - // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 - if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { - if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { - // 只有开机时间不为空 - if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); - }else { - powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); + Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); + Long reportTime = event.getReportTime(); + if(reportTime > lastReportTime) { + Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); + Integer machinePwrStat = event.getMachinePwrStat(); + Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); + Integer machineWorkingStat = event.getMachineWorkingStat(); + if(!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) + || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { + Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); + Long accJobCount = event.getAccJobCount(); + Long currJobCount = event.getCurrJobCount(); + Integer dataSource = event.getDataSource(); + IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); + powerOnAndOffDataEvent.setId(snowflake.nextId()); + powerOnAndOffDataEvent.setDataSource(event.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); + powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); + powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); + powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); + powerOnAndOffDataEvent.setReportTime(event.getReportTime()); + powerOnAndOffDataEvent.setReceivedTime(LocalDateTime + .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + // 上次是关机,但是这次是开机,说明周期产能从新开始 + if (dataSource == 1) { + // 根云 + powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); + } else { + // 机智云 + powerOnAndOffDataEvent.setCurrJobCount(currJobCount); + } + } + } else { + Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); + // 直接累加 + if (dataSource == 1) { + // 根云 + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); + } else { + // 机智云 + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); + } } - powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); - out.collect(powerOnAndOffDataEvent); - } else { - // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 - if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); - powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); - out.collect(powerOnAndOffDataEvent); + // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 + if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { + if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { + // 只有开机时间不为空 + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); + } else { + powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); + } + powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); + out.collect(powerOnAndOffDataEvent); + } else { + // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); + powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); + out.collect(powerOnAndOffDataEvent); + } + } } } } @@ -186,7 +225,7 @@ public class IotDevicePowerOnAndOffDataJob { private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException { IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); - if(iotDevicePowerOnAndOffDataEvent == null) { + if (iotDevicePowerOnAndOffDataEvent == null) { iotDevicePowerOnAndOffDataEvent = getByEs(event); } return iotDevicePowerOnAndOffDataEvent; @@ -220,9 +259,10 @@ public class IotDevicePowerOnAndOffDataJob { MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); - if(deviceMonitoringData != null) { + if (deviceMonitoringData != null) { powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); + powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount()); powerOnAndOffDataEvent.setCurrJobCount(0L); powerOnAndOffDataEvent.setCurrJobDuration(0L); Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); @@ -237,9 +277,10 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); } powerOnAndOffDataEvent.setReportTime(reportTime); - }else { + } else { powerOnAndOffDataEvent.setDataSource(event.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); + powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); Integer machinePwrStat = event.getMachinePwrStat(); @@ -262,14 +303,14 @@ public class IotDevicePowerOnAndOffDataJob { private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { - try{ + try { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder bool = new BoolQueryBuilder(); BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); searchSourceBuilder.size(1); searchSourceBuilder.sort("reportTime", SortOrder.DESC); searchSourceBuilder.query(boolQueryBuilder); - SearchRequest request = new SearchRequest("iot_device_monitoring_data"); + SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX)); request.source(searchSourceBuilder); // 执行请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); @@ -281,16 +322,16 @@ public class IotDevicePowerOnAndOffDataJob { return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); } } - }catch (Exception e) { + } catch (Exception e) { log.error("获取 machine_iot_data_received_event 索引数据异常"); } return null; } }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); - sinkEs(streamOperator); + sinkEs(outputStreamOperator); - env.execute("device_monitoring_data"); + env.execute("iot_device_power_on_and_off_data"); } private static void sinkEs(SingleOutputStreamOperator dataStream) { @@ -370,45 +411,45 @@ public class IotDevicePowerOnAndOffDataJob { CreateIndexRequest request = new CreateIndexRequest(indicesName); // 字段映射 String mappersStr = "{\n" + - " \"properties\": {\n" + - " \"accJobCount\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"accJobCountDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currJobCount\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currJobDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"dataSource\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"lastBootTime\": {\n" + - " \"type\": \"date\"\n" + - " },\n" + - " \"machineIotMac\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"machinePwrStat\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"machineWorkingStat\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"receivedTime\": {\n" + - " \"type\": \"date\"\n" + - " },\n" + - " \"reportTime\": {\n" + - " \"type\": \"date\"\n" + + " \"properties\": {\n" + + " \"machineWorkingStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machineIotMac\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"machinePwrStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machinePowerOnTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"machinePowerOffTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"currJobDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"receivedTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"id\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"accJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"dataSource\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"reportTime\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + " }\n" + - " }\n" + - "}"; + " }"; request.mapping(mappersStr, XContentType.JSON); // 设置索引别名 request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX))); diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java index 2679b21..fa1b96e 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java @@ -2,6 +2,7 @@ package com.qniao.iot.device.power.constant; public interface ConfigConstant { + String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; @@ -22,19 +23,23 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; - String ES_HOST_NAME = "es.host.name"; + String ES_CONNECT_TIMEOUT = "es.connect.timeout"; - String ES_POST = "es.post"; + String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; - String ES_SCHEME = "es.scheme"; + String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; - String ES_USER_NAME = "es.user.name"; + String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String ES_PASSWORD = "es.password"; + String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; - String ES_CONNECT_TIMEOUT = "es.connect.timeout"; + String SOURCE_RABBITMQ_USER_NAME = "source.rabbitmq.userName"; - String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; + String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; + String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtualHost"; + + String DATA_ELASTICSEARCH_INDEX = "data.elasticsearch.index"; + + String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; } diff --git a/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties index d10864b..6a3c421 100644 --- a/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties +++ b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties @@ -1,5 +1,5 @@ -app.id=iot-device-monitoring-data +app.id=iot-device-power-on-and-off-data # test 8.135.8.221 # prod 47.112.164.224 -apollo.meta=http://47.112.164.224:5000 \ No newline at end of file +apollo.meta=http://8.135.8.221:5000 \ No newline at end of file