|
|
@ -1,7 +1,6 @@ |
|
|
package com.qniao.iot.device.power; |
|
|
package com.qniao.iot.device.power; |
|
|
|
|
|
|
|
|
import cn.hutool.core.bean.BeanUtil; |
|
|
import cn.hutool.core.bean.BeanUtil; |
|
|
import cn.hutool.core.util.NumberUtil; |
|
|
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
import cn.hutool.core.util.StrUtil; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import com.qniao.iot.device.power.config.ApolloConfig; |
|
|
import com.qniao.iot.device.power.config.ApolloConfig; |
|
|
@ -9,12 +8,17 @@ import com.qniao.iot.device.power.constant.ConfigConstant; |
|
|
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; |
|
|
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; |
|
|
import com.qniao.iot.device.power.utils.SnowFlake; |
|
|
import com.qniao.iot.device.power.utils.SnowFlake; |
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqDeserializationSchema; |
|
|
|
|
|
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; |
|
|
import com.qniao.iot.rc.constant.MachinePwrStatusEnum; |
|
|
import com.qniao.iot.rc.constant.MachinePwrStatusEnum; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import org.apache.flink.api.common.state.*; |
|
|
|
|
|
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
|
|
import org.apache.flink.api.common.functions.FilterFunction; |
|
|
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
import org.apache.flink.configuration.Configuration; |
|
|
import org.apache.flink.configuration.Configuration; |
|
|
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource; |
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource; |
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
@ -22,19 +26,22 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; |
|
|
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
|
|
|
import org.apache.flink.util.Collector; |
|
|
import org.apache.flink.util.Collector; |
|
|
import org.apache.http.HttpHost; |
|
|
import org.apache.http.HttpHost; |
|
|
import org.apache.http.auth.AuthScope; |
|
|
import org.apache.http.auth.AuthScope; |
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig; |
|
|
|
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
import org.elasticsearch.action.admin.indices.alias.Alias; |
|
|
import org.elasticsearch.action.admin.indices.alias.Alias; |
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
import org.elasticsearch.action.search.SearchRequest; |
|
|
import org.elasticsearch.action.search.SearchRequest; |
|
|
import org.elasticsearch.action.search.SearchResponse; |
|
|
import org.elasticsearch.action.search.SearchResponse; |
|
|
import org.elasticsearch.client.*; |
|
|
|
|
|
|
|
|
import org.elasticsearch.client.RequestOptions; |
|
|
|
|
|
import org.elasticsearch.client.Requests; |
|
|
|
|
|
import org.elasticsearch.client.RestClient; |
|
|
|
|
|
import org.elasticsearch.client.RestHighLevelClient; |
|
|
import org.elasticsearch.client.indices.CreateIndexRequest; |
|
|
import org.elasticsearch.client.indices.CreateIndexRequest; |
|
|
import org.elasticsearch.client.indices.CreateIndexResponse; |
|
|
import org.elasticsearch.client.indices.CreateIndexResponse; |
|
|
import org.elasticsearch.client.indices.GetIndexRequest; |
|
|
import org.elasticsearch.client.indices.GetIndexRequest; |
|
|
@ -48,9 +55,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; |
|
|
import org.elasticsearch.search.sort.SortOrder; |
|
|
import org.elasticsearch.search.sort.SortOrder; |
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
import java.io.IOException; |
|
|
import java.time.*; |
|
|
|
|
|
|
|
|
import java.time.LocalDate; |
|
|
|
|
|
import java.time.LocalDateTime; |
|
|
|
|
|
import java.time.ZoneId; |
|
|
|
|
|
import java.time.ZoneOffset; |
|
|
import java.time.format.DateTimeFormatter; |
|
|
import java.time.format.DateTimeFormatter; |
|
|
import java.time.temporal.ChronoUnit; |
|
|
|
|
|
import java.util.ArrayList; |
|
|
import java.util.ArrayList; |
|
|
import java.util.List; |
|
|
import java.util.List; |
|
|
|
|
|
|
|
|
@ -73,7 +82,7 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
}) |
|
|
}) |
|
|
.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
// 设置es连接超时时间 |
|
|
// 设置es连接超时时间 |
|
|
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); |
|
|
|
|
|
|
|
|
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); |
|
|
return requestConfigBuilder; |
|
|
return requestConfigBuilder; |
|
|
})); |
|
|
})); |
|
|
|
|
|
|
|
|
@ -88,19 +97,39 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
// 获取设备数据源 |
|
|
// 获取设备数据源 |
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
|
|
|
.setHost("127.0.0.1") |
|
|
|
|
|
.setPort(5672) |
|
|
|
|
|
.setUserName("admin") |
|
|
|
|
|
.setPassword("admin") |
|
|
|
|
|
.setVirtualHost("datastream") |
|
|
|
|
|
|
|
|
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|
|
|
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
|
|
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
|
|
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) |
|
|
|
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|
|
|
|
|
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") |
|
|
|
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|
|
.build(); |
|
|
.build(); |
|
|
|
|
|
|
|
|
// 设备数据源转换 |
|
|
// 设备数据源转换 |
|
|
DataStreamSource<MachineIotDataReceivedEvent> streamSource = env.addSource(new RMQSource<>(connectionConfig, |
|
|
|
|
|
"iotDevicePowerOnAndOffDataEvent", true, new MachineIotDataReceivedEventRabbitMqDeserializationSchema())); |
|
|
|
|
|
|
|
|
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env |
|
|
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); |
|
|
|
|
|
|
|
|
|
|
|
// 数据过滤 |
|
|
|
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource |
|
|
|
|
|
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> { |
|
|
|
|
|
|
|
|
|
|
|
Long reportTime = value.getReportTime(); |
|
|
|
|
|
if(reportTime != null |
|
|
|
|
|
&& value.getDataSource() != null && value.getMachinePwrStat() != null) { |
|
|
|
|
|
String reportTimeStr = StrUtil.toString(reportTime); |
|
|
|
|
|
if(reportTimeStr.length() == 10) { |
|
|
|
|
|
// 机智云那边的设备可能是秒或毫秒 |
|
|
|
|
|
reportTime = reportTime * 1000; |
|
|
|
|
|
} |
|
|
|
|
|
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); |
|
|
|
|
|
// 晚30分钟的数据就不要了 |
|
|
|
|
|
return nowTime - reportTime <= (30*60*1000); |
|
|
|
|
|
} |
|
|
|
|
|
return false; |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> streamOperator = streamSource |
|
|
|
|
|
|
|
|
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = streamOperator |
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>() { |
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>() { |
|
|
|
|
|
|
|
|
@ -121,63 +150,73 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception { |
|
|
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception { |
|
|
|
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); |
|
|
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); |
|
|
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); |
|
|
|
|
|
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); |
|
|
|
|
|
Long accJobCount = event.getAccJobCount(); |
|
|
|
|
|
Long currJobCount = event.getCurrJobCount(); |
|
|
|
|
|
Integer dataSource = event.getDataSource(); |
|
|
|
|
|
Integer machinePwrStat = event.getMachinePwrStat(); |
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
|
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
|
|
|
powerOnAndOffDataEvent.setDataSource(event.getDataSource()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); |
|
|
|
|
|
powerOnAndOffDataEvent.setReportTime(event.getReportTime()); |
|
|
|
|
|
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime |
|
|
|
|
|
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); |
|
|
|
|
|
if(MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { |
|
|
|
|
|
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
// 上次是关机,但是这次是开机,说明周期产能从新开始 |
|
|
|
|
|
if(dataSource == 1) { |
|
|
|
|
|
// 根云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); |
|
|
|
|
|
}else { |
|
|
|
|
|
// 机智云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(currJobCount); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
}else { |
|
|
|
|
|
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); |
|
|
|
|
|
// 直接累加 |
|
|
|
|
|
if(dataSource == 1) { |
|
|
|
|
|
// 根云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); |
|
|
|
|
|
}else { |
|
|
|
|
|
// 机智云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
// 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 |
|
|
|
|
|
if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { |
|
|
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { |
|
|
|
|
|
// 只有开机时间不为空 |
|
|
|
|
|
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); |
|
|
|
|
|
}else { |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); |
|
|
|
|
|
|
|
|
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); |
|
|
|
|
|
Long reportTime = event.getReportTime(); |
|
|
|
|
|
if(reportTime > lastReportTime) { |
|
|
|
|
|
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); |
|
|
|
|
|
Integer machinePwrStat = event.getMachinePwrStat(); |
|
|
|
|
|
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); |
|
|
|
|
|
Integer machineWorkingStat = event.getMachineWorkingStat(); |
|
|
|
|
|
if(!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) |
|
|
|
|
|
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { |
|
|
|
|
|
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); |
|
|
|
|
|
Long accJobCount = event.getAccJobCount(); |
|
|
|
|
|
Long currJobCount = event.getCurrJobCount(); |
|
|
|
|
|
Integer dataSource = event.getDataSource(); |
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
|
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
|
|
|
powerOnAndOffDataEvent.setDataSource(event.getDataSource()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); |
|
|
|
|
|
powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); |
|
|
|
|
|
powerOnAndOffDataEvent.setReportTime(event.getReportTime()); |
|
|
|
|
|
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime |
|
|
|
|
|
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); |
|
|
|
|
|
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { |
|
|
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
// 上次是关机,但是这次是开机,说明周期产能从新开始 |
|
|
|
|
|
if (dataSource == 1) { |
|
|
|
|
|
// 根云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); |
|
|
|
|
|
} else { |
|
|
|
|
|
// 机智云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(currJobCount); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} else { |
|
|
|
|
|
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); |
|
|
|
|
|
// 直接累加 |
|
|
|
|
|
if (dataSource == 1) { |
|
|
|
|
|
// 根云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); |
|
|
|
|
|
} else { |
|
|
|
|
|
// 机智云 |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
|
|
} else { |
|
|
|
|
|
// // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 |
|
|
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
// 开机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); |
|
|
|
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
|
|
|
|
|
// 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 |
|
|
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { |
|
|
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { |
|
|
|
|
|
// 只有开机时间不为空 |
|
|
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); |
|
|
|
|
|
} else { |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); |
|
|
|
|
|
} |
|
|
|
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
|
|
} else { |
|
|
|
|
|
// // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 |
|
|
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
|
|
// 开机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); |
|
|
|
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
@ -186,7 +225,7 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException { |
|
|
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException { |
|
|
|
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); |
|
|
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); |
|
|
if(iotDevicePowerOnAndOffDataEvent == null) { |
|
|
|
|
|
|
|
|
if (iotDevicePowerOnAndOffDataEvent == null) { |
|
|
iotDevicePowerOnAndOffDataEvent = getByEs(event); |
|
|
iotDevicePowerOnAndOffDataEvent = getByEs(event); |
|
|
} |
|
|
} |
|
|
return iotDevicePowerOnAndOffDataEvent; |
|
|
return iotDevicePowerOnAndOffDataEvent; |
|
|
@ -220,9 +259,10 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); |
|
|
MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); |
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
if(deviceMonitoringData != null) { |
|
|
|
|
|
|
|
|
if (deviceMonitoringData != null) { |
|
|
powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); |
|
|
powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); |
|
|
powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); |
|
|
powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount()); |
|
|
powerOnAndOffDataEvent.setCurrJobCount(0L); |
|
|
powerOnAndOffDataEvent.setCurrJobCount(0L); |
|
|
powerOnAndOffDataEvent.setCurrJobDuration(0L); |
|
|
powerOnAndOffDataEvent.setCurrJobDuration(0L); |
|
|
Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); |
|
|
Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); |
|
|
@ -237,9 +277,10 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
} |
|
|
} |
|
|
powerOnAndOffDataEvent.setReportTime(reportTime); |
|
|
powerOnAndOffDataEvent.setReportTime(reportTime); |
|
|
}else { |
|
|
|
|
|
|
|
|
} else { |
|
|
powerOnAndOffDataEvent.setDataSource(event.getDataSource()); |
|
|
powerOnAndOffDataEvent.setDataSource(event.getDataSource()); |
|
|
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); |
|
|
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); |
|
|
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); |
|
|
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); |
|
|
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); |
|
|
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); |
|
|
Integer machinePwrStat = event.getMachinePwrStat(); |
|
|
Integer machinePwrStat = event.getMachinePwrStat(); |
|
|
@ -262,14 +303,14 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
|
|
|
|
|
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { |
|
|
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { |
|
|
|
|
|
|
|
|
try{ |
|
|
|
|
|
|
|
|
try { |
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
BoolQueryBuilder bool = new BoolQueryBuilder(); |
|
|
BoolQueryBuilder bool = new BoolQueryBuilder(); |
|
|
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); |
|
|
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); |
|
|
searchSourceBuilder.size(1); |
|
|
searchSourceBuilder.size(1); |
|
|
searchSourceBuilder.sort("reportTime", SortOrder.DESC); |
|
|
searchSourceBuilder.sort("reportTime", SortOrder.DESC); |
|
|
searchSourceBuilder.query(boolQueryBuilder); |
|
|
searchSourceBuilder.query(boolQueryBuilder); |
|
|
SearchRequest request = new SearchRequest("iot_device_monitoring_data"); |
|
|
|
|
|
|
|
|
SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX)); |
|
|
request.source(searchSourceBuilder); |
|
|
request.source(searchSourceBuilder); |
|
|
// 执行请求 |
|
|
// 执行请求 |
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); |
|
|
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); |
|
|
@ -281,16 +322,16 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); |
|
|
return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}catch (Exception e) { |
|
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
log.error("获取 machine_iot_data_received_event 索引数据异常"); |
|
|
log.error("获取 machine_iot_data_received_event 索引数据异常"); |
|
|
} |
|
|
} |
|
|
return null; |
|
|
return null; |
|
|
} |
|
|
} |
|
|
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); |
|
|
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); |
|
|
|
|
|
|
|
|
sinkEs(streamOperator); |
|
|
|
|
|
|
|
|
sinkEs(outputStreamOperator); |
|
|
|
|
|
|
|
|
env.execute("device_monitoring_data"); |
|
|
|
|
|
|
|
|
env.execute("iot_device_power_on_and_off_data"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static void sinkEs(SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> dataStream) { |
|
|
private static void sinkEs(SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> dataStream) { |
|
|
@ -370,45 +411,45 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
CreateIndexRequest request = new CreateIndexRequest(indicesName); |
|
|
CreateIndexRequest request = new CreateIndexRequest(indicesName); |
|
|
// 字段映射 |
|
|
// 字段映射 |
|
|
String mappersStr = "{\n" + |
|
|
String mappersStr = "{\n" + |
|
|
" \"properties\": {\n" + |
|
|
|
|
|
" \"accJobCount\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"accJobCountDuration\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"currDuration\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"currJobCount\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"currJobDuration\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"dataSource\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"lastBootTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machineIotMac\": {\n" + |
|
|
|
|
|
" \"type\": \"keyword\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machinePwrStat\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machineWorkingStat\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"receivedTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"reportTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
|
|
|
" \"properties\": {\n" + |
|
|
|
|
|
" \"machineWorkingStat\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machineIotMac\": {\n" + |
|
|
|
|
|
" \"type\": \"keyword\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machinePwrStat\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machinePowerOnTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"machinePowerOffTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"currJobDuration\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"currJobCount\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"receivedTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"id\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"accJobCount\": {\n" + |
|
|
|
|
|
" \"type\": \"long\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"dataSource\": {\n" + |
|
|
|
|
|
" \"type\": \"integer\"\n" + |
|
|
|
|
|
" },\n" + |
|
|
|
|
|
" \"reportTime\": {\n" + |
|
|
|
|
|
" \"type\": \"date\"\n" + |
|
|
|
|
|
" }\n" + |
|
|
" }\n" + |
|
|
" }\n" + |
|
|
" }\n" + |
|
|
|
|
|
"}"; |
|
|
|
|
|
|
|
|
" }"; |
|
|
request.mapping(mappersStr, XContentType.JSON); |
|
|
request.mapping(mappersStr, XContentType.JSON); |
|
|
// 设置索引别名 |
|
|
// 设置索引别名 |
|
|
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX))); |
|
|
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX))); |
|
|
|