Browse Source

代码完成,待测试

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
e1ed012067
3 changed files with 223 additions and 75 deletions
  1. 275
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  2. 9
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java
  3. 14
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java

275
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -19,6 +19,7 @@
package com.qniao.iot.rc; package com.qniao.iot.rc;
import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.config.ApolloConfig; import com.qniao.iot.rc.config.ApolloConfig;
@ -27,11 +28,16 @@ import com.qniao.iot.rc.constant.DataSource;
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema;
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema;
import com.qniao.iot.rc.until.SnowFlake; import com.qniao.iot.rc.until.SnowFlake;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
@ -41,48 +47,52 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.security.Provider;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.*;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
/** /**
* Skeleton for a Flink DataStream Job.
*
* <p>For a tutorial how to write a Flink application, check the
* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*
* @author Lzk * @author Lzk
*/ */
@Slf4j
public class RootCloudIotDataFormatterJob { public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake( static SnowFlake snowflake = new SnowFlake(
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
Long.parseLong(ApolloConfig.getStr(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
Long.parseLong(ApolloConfig.getStr(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
); );
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -91,9 +101,9 @@ public class RootCloudIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder() KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUPID))
/*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") /*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
.setProperty("sasl.jaas.config", .setProperty("sasl.jaas.config",
@ -104,32 +114,184 @@ public class RootCloudIotDataFormatterJob {
.build(); .build();
// 把树根的数据转成我们自己的格式 // 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source")
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform MachineIotDataReceivedEvent");
// Attach the Kafka source; no event-time watermarks are needed by the operators below.
DataStreamSource<RootCloudIotDataReceiptedEvent> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source");
// Data filtering: normalize second-precision timestamps to milliseconds and drop
// records missing any field that the keyed processing below dereferences.
SingleOutputStreamOperator<RootCloudIotDataReceiptedEvent> streamOperator = streamSource
        .filter(new RichFilterFunction<RootCloudIotDataReceiptedEvent>() {
            @Override
            public boolean filter(RootCloudIotDataReceiptedEvent value) {
                Long reportTime = value.get__timestamp__();
                if (reportTime != null) {
                    String reportTimeStr = StrUtil.toString(reportTime);
                    // A 10-digit epoch value is in seconds; scale it to milliseconds in place.
                    if (reportTimeStr.length() == 10) {
                        value.set__timestamp__(reportTime * 1000);
                    }
                }
                // Keep only records whose required fields are all present; the process
                // function below unboxes these values and would otherwise NPE.
                return value.getWorking_sta() != null
                        && value.get__assetId__() != null
                        && value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null;
            }
        }).name("machine iot data received event filter operator");
// Group by machine (asset id) and compute per-record deltas against the previous
// event for the same machine, bootstrapping state from Elasticsearch on first sight.
SingleOutputStreamOperator<MachineIotDataReceivedEvent> outputStreamOperator = streamOperator
        .keyBy(RootCloudIotDataReceiptedEvent::get__assetId__)
        .process(new KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>() {
            // NOTE(review): RestHighLevelClient is not Serializable, so building it in a
            // field initializer makes this anonymous function fail serialization when the
            // job graph is submitted to a cluster. It should be a transient field created
            // in open() and closed in close(); it is currently never closed. TODO confirm.
            private final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
                    RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_HOST),
                            ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_POST),
                            ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_SCHEME)))
                            .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                                // Basic-auth credentials for the ES cluster, from Apollo config.
                                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                                credentialsProvider.setCredentials(AuthScope.ANY,
                                        new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_USER_NAME),
                                                ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_PASSWORD)));
                                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }).setRequestConfigCallback(requestConfigBuilder -> {
                                // ES connect timeout (presumably milliseconds — confirm config units).
                                requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_CONNECT_TIMEOUT));
                                return requestConfigBuilder;
                            }));
            // Last emitted event for the current key; used to compute count/duration deltas.
            private ValueState<MachineIotDataReceivedEvent> eventValueState;
            @Override
            public void open(Configuration parameters) {
                eventValueState = getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("machineIotDataReceivedEventState",
                                TypeInformation.of(MachineIotDataReceivedEvent.class)));
            }
            @Override
            public void processElement(RootCloudIotDataReceiptedEvent value,
                                       KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>.Context ctx,
                                       Collector<MachineIotDataReceivedEvent> out) throws IOException {
                // Data cleansing: resolve the previous event (Flink state, then ES, then a
                // synthetic baseline for a never-before-seen machine).
                Long machineIotMac = Long.valueOf(value.get__assetId__());
                MachineIotDataReceivedEvent lastReceivedEvent = eventValueState.value();
                if (lastReceivedEvent == null) {
                    lastReceivedEvent = getMachineIotDataReceivedEvent(machineIotMac, value);
                }
                Integer pwrSta = value.getPWR_sta();
                Integer workingSta = value.getWorking_sta();
                Long accCount = value.getACC_count();
                Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat();
                Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat();
                MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent();
                receivedEvent.setId(snowflake.nextId());
                receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
                receivedEvent.setMachineIotMac(machineIotMac);
                receivedEvent.setMachinePwrStat(pwrSta);
                if (pwrSta == 1 && accCount != null && accCount == 0) {
                    // Powered on but with zero output: report the machine as standby (2).
                    receivedEvent.setMachineWorkingStat(2);
                } else {
                    receivedEvent.setMachineWorkingStat(workingSta);
                }
                receivedEvent.setAccJobCount(accCount);
                // NOTE(review): lastPwrStat/lastWorkingStat may come from an ES-restored
                // document and are not visibly guaranteed non-null there; `lastPwrStat == 1`
                // unboxes and could NPE. Verify the ES document schema.
                if ((pwrSta == 1 && workingSta == 1)
                        || (lastPwrStat == 1 && lastWorkingStat == 1)) {
                    // Compute deltas only when this or the previous sample was "working".
                    Long lastReportTime = lastReceivedEvent.getReportTime();
                    Long reportTime = value.get__timestamp__();
                    // Skip the delta when the gap to the previous message exceeds 30 minutes.
                    if (reportTime - lastReportTime <= 30 * 60 * 1000) {
                        receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount());
                        receivedEvent.setCurrJobDuration(reportTime - lastReportTime);
                    }
                }
                receivedEvent.setCurrWaitingDuration(0L);
                receivedEvent.setCurrStoppingDuration(0L);
                receivedEvent.setIgStat(value.getIG_sta());
                receivedEvent.setReportTime(value.get__timestamp__());
                receivedEvent.setReceivedTime(System.currentTimeMillis());
                // Persist this event as the new "previous" for the key, then emit it.
                eventValueState.update(receivedEvent);
                out.collect(receivedEvent);
            }
            /**
             * Returns the previous event for the machine: the latest document from ES if one
             * exists, otherwise a freshly initialized baseline event built from the current
             * record (the machine has never been seen before).
             */
            private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac,
                                                                               RootCloudIotDataReceiptedEvent event) {
                MachineIotDataReceivedEvent receivedEvent = getFromEs(machineIotMac);
                if (receivedEvent == null) {
                    // Nothing in ES means this is a new machine; seed a zeroed baseline.
                    receivedEvent = new MachineIotDataReceivedEvent();
                    receivedEvent.setId(snowflake.nextId());
                    receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
                    receivedEvent.setMachineIotMac(machineIotMac);
                    receivedEvent.setMachinePwrStat(event.getPWR_sta());
                    receivedEvent.setMachineWorkingStat(event.getWorking_sta());
                    receivedEvent.setAccJobCount(event.getACC_count());
                    receivedEvent.setCurrJobCount(0L);
                    receivedEvent.setCurrJobDuration(0L);
                    receivedEvent.setCurrWaitingDuration(0L);
                    receivedEvent.setCurrStoppingDuration(0L);
                    receivedEvent.setIgStat(event.getIG_sta());
                    receivedEvent.setReportTime(event.get__timestamp__());
                    receivedEvent.setReceivedTime(System.currentTimeMillis());
                }
                return receivedEvent;
            }
            /**
             * Fetches the most recent event for the machine from Elasticsearch, or null when
             * the index does not exist, no document matches, or the lookup fails.
             */
            private MachineIotDataReceivedEvent getFromEs(Long machineIotMac) {
                try {
                    // termQuery accepts boolean/int/double/string; the mac is matched as-is.
                    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                    searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
                    // Newest document first; only a single hit is needed.
                    searchSourceBuilder.sort("reportTime", SortOrder.DESC);
                    searchSourceBuilder.size(1);
                    // Build the search request against the configured (bare) index name.
                    SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX));
                    searchRequest.source(searchSourceBuilder);
                    String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
                    // NOTE(review): the existence check targets the month-suffixed index
                    // "<index>_yyyyMM" while the search above targets the bare index name —
                    // these look inconsistent; verify against the actual index layout
                    // (alias vs. monthly indices) before relying on this lookup.
                    GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX) + "_" + nowDate);
                    // Only query when the index exists, to avoid an index_not_found error.
                    boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
                    if (exists) {
                        // Run the query and inspect the response.
                        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                        // Accept only an OK response that actually returned at least one hit.
                        if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
                            SearchHits hits = searchResponse.getHits();
                            SearchHit reqHit = hits.getHits()[0];
                            return JSONUtil.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
                        }
                    }
                } catch (Exception e) {
                    // Best-effort lookup: on any ES failure, log and fall back to "not found".
                    log.error("获取es数据异常", e);
                }
                return null;
            }
        }).name("machine iot data received event keyBy");
Properties kafkaProducerConfig = new Properties(); Properties kafkaProducerConfig = new Properties();
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProducerConfig.setProperty("sasl.jaas.config", kafkaProducerConfig.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
// 写入kafka // 写入kafka
transformDs.sinkTo(
outputStreamOperator.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder() KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
//.setKafkaProducerConfig(kafkaProducerConfig) //.setKafkaProducerConfig(kafkaProducerConfig)
.setRecordSerializer( .setRecordSerializer(
KafkaRecordSerializationSchema.builder() KafkaRecordSerializationSchema.builder()
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))
.setTopic(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_TOPICS))
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) .setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema())
.build() .build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build() .build()
).name("MachineIotDataReceivedEvent Sink");
).name("machine iot data received event Sink");
// 发送到OSS存储 // 发送到OSS存储
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
String outputPath = ApolloConfig.getStr(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
new Path(outputPath), new Path(outputPath),
new SimpleStringEncoder<String>(CharsetUtil.UTF_8) new SimpleStringEncoder<String>(CharsetUtil.UTF_8)
@ -148,9 +310,9 @@ public class RootCloudIotDataFormatterJob {
private String getDeviceStatusStr(Integer machineWorkingStat) { private String getDeviceStatusStr(Integer machineWorkingStat) {
if(machineWorkingStat == null) {
if (machineWorkingStat == null) {
return "deviceOff"; return "deviceOff";
}else {
} else {
if (machineWorkingStat == 1) { if (machineWorkingStat == 1) {
return "deviceWorking"; return "deviceWorking";
} else if (machineWorkingStat == 2) { } else if (machineWorkingStat == 2) {
@ -167,15 +329,15 @@ public class RootCloudIotDataFormatterJob {
} }
}).withRollingPolicy(DefaultRollingPolicy.builder() }).withRollingPolicy(DefaultRollingPolicy.builder()
// 每隔多长时间生成一个文件 // 每隔多长时间生成一个文件
.withRolloverInterval(Duration.ofHours(12))
// 默认60秒,未写入数据处于不活跃状态超时会滚动新文件
.withInactivityInterval(Duration.ofHours(12))
.withRolloverInterval(Duration.ofHours(6))
// 未写入数据处于不活跃状态超时会滚动新文件
.withInactivityInterval(Duration.ofHours(1))
// 设置每个文件的最大大小 ,默认是128M // 设置每个文件的最大大小 ,默认是128M
.withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024)) .withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024))
.build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); .build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
outputStreamOperator.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
@Override @Override
public String map(MachineIotDataReceivedEvent value) { public String map(MachineIotDataReceivedEvent value) {
return JSONUtil.toJsonStr(value); return JSONUtil.toJsonStr(value);
@ -184,37 +346,4 @@ public class RootCloudIotDataFormatterJob {
env.execute("root cloud iot data formatter job"); env.execute("root cloud iot data formatter job");
} }
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
if (Objects.nonNull(event)) {
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
Integer pwrSta = event.getPWR_sta();
machineIotDataReceivedEvent.setMachinePwrStat(pwrSta);
machineIotDataReceivedEvent.setIgStat(event.getIG_sta());
machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total());
Long accCount = event.getACC_count();
machineIotDataReceivedEvent.setCurrJobCount(accCount);
if(pwrSta != null && pwrSta == 1 && accCount != null && accCount == 0) {
// 如果是开机状态并且没有产量就设置为待机
machineIotDataReceivedEvent.setMachineWorkingStat(2);
}else {
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
}
machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue());
if (StringUtils.isNotBlank(event.getStoping_duration())) {
BigDecimal stoppingDuration = new BigDecimal(event.getStoping_duration());
machineIotDataReceivedEvent.setCurrStoppingDuration(stoppingDuration.longValue());
}
machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue());
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
machineIotDataReceivedEvent.setReportTime(event.get__timestamp__());
}
return machineIotDataReceivedEvent;
}
} }

9
root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java

@ -7,13 +7,18 @@ public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig(); private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
public static String getStr(String key, String defaultValue) {
return config.getProperty(key, defaultValue); return config.getProperty(key, defaultValue);
} }
public static String get(String key) {
public static String getStr(String key) {
return config.getProperty(key, null); return config.getProperty(key, null);
} }
/**
 * Reads an integer property from the Apollo app config.
 *
 * @param key property key
 * @return the configured value, or {@code null} when the key is absent
 *         (Apollo presumably also returns the default on parse failure — confirm)
 */
public static Integer getInt(String key) {
    return config.getIntProperty(key, null);
}
} }

14
root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java

@ -17,4 +17,18 @@ public interface ConfigConstant {
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
String SINK_OSS_PATH = "sink.oss.path"; String SINK_OSS_PATH = "sink.oss.path";
// Elasticsearch connection settings; values are resolved from Apollo config.
String ELASTICSEARCH_INDEX = "elasticsearch.index";
String ELASTICSEARCH_HOST = "elasticsearch.host";
// NOTE(review): key "elasticsearch.post" looks like a typo for "...port" (it is used
// as the ES port); renaming would break already-deployed Apollo configuration, so the
// value is kept as-is — verify and migrate deliberately if desired.
String ELASTICSEARCH_POST = "elasticsearch.post";
String ELASTICSEARCH_SCHEME = "elasticsearch.scheme";
String ELASTICSEARCH_USER_NAME = "elasticsearch.userName";
String ELASTICSEARCH_PASSWORD = "elasticsearch.password";
// ES REST client connect timeout (presumably milliseconds — confirm units at the source).
String ELASTICSEARCH_CONNECT_TIMEOUT = "elasticsearch.connect.timeout";
} }
Loading…
Cancel
Save