Browse Source

更新

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
b762db6d66
3 changed files with 67 additions and 134 deletions
  1. 6
      iot-device-power-on-and-off-data-job/pom.xml
  2. 174
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
  3. 21
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java

6
iot-device-power-on-and-off-data-job/pom.xml

@ -143,6 +143,12 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

174
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

@ -7,8 +7,10 @@ import com.qniao.iot.device.power.config.ApolloConfig;
import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.constant.ConfigConstant;
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent;
import com.qniao.iot.device.power.utils.SnowFlake; import com.qniao.iot.device.power.utils.SnowFlake;
import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import com.qniao.iot.rc.constant.MachinePwrStatusEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@ -22,12 +24,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
@ -99,41 +104,22 @@ public class IotDevicePowerOnAndOffDataJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源 // 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT))
.setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.build(); .build();
// 设备数据源转换 // 设备数据源转换
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
final DataStream<MachineOutputCommand> dataStreamSource = env
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
false, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> {
Long reportTime = value.getReportTime();
if (reportTime != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null) {
String reportTimeStr = StrUtil.toString(reportTime);
if (reportTimeStr.length() == 10) {
// 机智云那边的设备可能是秒或毫秒
reportTime = reportTime * 1000;
}
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - reportTime <= (30 * 60 * 1000);
}
return false;
});
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>() {
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = dataStreamSource
.keyBy(MachineOutputCommand::getMac)
.process(new KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>() {
private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState; private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState;
@ -145,7 +131,8 @@ public class IotDevicePowerOnAndOffDataJob {
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build(); .build();
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue
= new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class));
// 设置状态值的过期时间为了解决机器关机没有消息的情况 // 设置状态值的过期时间为了解决机器关机没有消息的情况
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig);
@ -156,34 +143,33 @@ public class IotDevicePowerOnAndOffDataJob {
} }
@Override @Override
public void processElement(MachineIotDataReceivedEvent event,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>.Context ctx,
public void processElement(MachineOutputCommand command,
KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>.Context ctx,
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception { Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception {
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event);
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command);
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime();
Long reportTime = event.getReportTime();
Long reportTime = command.getTimestamp();
if (reportTime > lastReportTime) { if (reportTime > lastReportTime) {
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat();
Integer machinePwrStat = event.getMachinePwrStat();
Integer machinePwrStat = command.getMachinePwrStat();
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat();
Integer machineWorkingStat = event.getMachineWorkingStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0)
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) {
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount();
Long accJobCount = event.getAccJobCount();
Long currJobCount = event.getCurrJobCount();
Integer dataSource = event.getDataSource();
Long accJobCount = command.getCurrTotalOutput();
Long currJobCount = command.getCurrCount();
Integer dataSource = command.getDataSource();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId()); powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(event.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount());
powerOnAndOffDataEvent.setReportTime(event.getReportTime());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setReportTime(command.getTimestamp());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
@ -213,7 +199,7 @@ public class IotDevicePowerOnAndOffDataJob {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空 // 只有开机时间不为空
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp());
} }
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
@ -222,7 +208,7 @@ public class IotDevicePowerOnAndOffDataJob {
// // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态 // // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机 // 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent);
} }
@ -232,20 +218,20 @@ public class IotDevicePowerOnAndOffDataJob {
} }
} }
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException {
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException {
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
if (iotDevicePowerOnAndOffDataEvent == null) { if (iotDevicePowerOnAndOffDataEvent == null) {
iotDevicePowerOnAndOffDataEvent = getByEs(event);
iotDevicePowerOnAndOffDataEvent = getByEs(command);
} }
return iotDevicePowerOnAndOffDataEvent; return iotDevicePowerOnAndOffDataEvent;
} }
private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException {
private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询 // 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC); searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
searchSourceBuilder.size(1); searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中 // 创建查询请求对象将查询对象配置到其中
@ -265,76 +251,28 @@ public class IotDevicePowerOnAndOffDataJob {
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
} }
} }
// 如果没有就找清洗后的数据
MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac());
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId()); powerOnAndOffDataEvent.setId(snowflake.nextId());
if (deviceMonitoringData != null) {
powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac());
powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount());
powerOnAndOffDataEvent.setCurrJobCount(0L);
powerOnAndOffDataEvent.setCurrJobDuration(0L);
Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
Long reportTime = deviceMonitoringData.getReportTime();
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
if (machinePwrStat == 0) {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
} else {
powerOnAndOffDataEvent.setDataSource(event.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount());
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
Integer machinePwrStat = event.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
Long reportTime = event.getReportTime();
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
if (machinePwrStat == 0) {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
Integer machinePwrStat = command.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat);
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
Long reportTime = command.getTimestamp();
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
if (machinePwrStat == 0) {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
} }
powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent; return powerOnAndOffDataEvent;
} }
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder bool = new BoolQueryBuilder();
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.size(1);
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.query(boolQueryBuilder);
SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX));
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
if (RestStatus.OK.equals(response.status())) {
SearchHit[] hits = response.getHits().getHits();
if (hits.length > 0) {
SearchHit hit = hits[0];
String sourceAsString = hit.getSourceAsString();
return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class);
}
}
} catch (Exception e) {
log.error("获取 machine_iot_data_received_event 索引数据异常");
}
return null;
}
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
sinkEs(outputStreamOperator); sinkEs(outputStreamOperator);

21
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java

@ -1,14 +1,7 @@
package com.qniao.iot.device.power.constant; package com.qniao.iot.device.power.constant;
public interface ConfigConstant { public interface ConfigConstant {
String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
@ -23,23 +16,19 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port";
String SOURCE_RABBITMQ_USER_NAME = "source.rabbitmq.userName";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtualHost";
String DATA_ELASTICSEARCH_INDEX = "data.elasticsearch.index";
String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";
String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port";
} }
Loading…
Cancel
Save