Browse Source

重构

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
3bf1157784
1 changed files with 51 additions and 8 deletions
  1. 59
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

59
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -31,6 +31,7 @@ import com.qniao.iot.rc.until.SnowFlake;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringEncoder;
@ -113,11 +114,19 @@ public class RootCloudIotDataFormatterJob {
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build(); .build();
// 把树根的数据转成我们自己的格式
//DataStreamSource<RootCloudIotDataReceiptedEvent> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source");
// 把树根的数据转成我们自己的格式 // 把树根的数据转成我们自己的格式
DataStreamSource<RootCloudIotDataReceiptedEvent> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source");
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source")
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform MachineIotDataReceivedEvent");
// 数据过滤 // 数据过滤
SingleOutputStreamOperator<RootCloudIotDataReceiptedEvent> streamOperator = streamSource
/*SingleOutputStreamOperator<RootCloudIotDataReceiptedEvent> streamOperator = streamSource
.filter(new RichFilterFunction<RootCloudIotDataReceiptedEvent>() { .filter(new RichFilterFunction<RootCloudIotDataReceiptedEvent>() {
@Override @Override
public boolean filter(RootCloudIotDataReceiptedEvent value) { public boolean filter(RootCloudIotDataReceiptedEvent value) {
@ -138,10 +147,10 @@ public class RootCloudIotDataFormatterJob {
} }
return false; return false;
} }
}).name("machine iot data received event filter operator");
}).name("machine iot data received event filter operator");*/
// 分组操作 // 分组操作
SingleOutputStreamOperator<MachineIotDataReceivedEvent> outputStreamOperator = streamOperator
/*SingleOutputStreamOperator<MachineIotDataReceivedEvent> outputStreamOperator = streamOperator
.keyBy(RootCloudIotDataReceiptedEvent::get__assetId__) .keyBy(RootCloudIotDataReceiptedEvent::get__assetId__)
.process(new KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>() { .process(new KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>() {
@ -204,7 +213,7 @@ public class RootCloudIotDataFormatterJob {
if (reportTime - lastReportTime <= 30 * 60 * 1000) { if (reportTime - lastReportTime <= 30 * 60 * 1000) {
receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount());
// 单位是秒 // 单位是秒
receivedEvent.setCurrJobDuration((reportTime - lastReportTime)/3600);
receivedEvent.setCurrJobDuration((reportTime - lastReportTime) / 3600);
} }
} }
receivedEvent.setCurrWaitingDuration(0L); receivedEvent.setCurrWaitingDuration(0L);
@ -271,7 +280,7 @@ public class RootCloudIotDataFormatterJob {
} }
return null; return null;
} }
}).name("machine iot data received event keyBy");
}).name("machine iot data received event keyBy");*/
Properties kafkaProducerConfig = new Properties(); Properties kafkaProducerConfig = new Properties();
@ -281,7 +290,7 @@ public class RootCloudIotDataFormatterJob {
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
// 写入kafka // 写入kafka
outputStreamOperator.sinkTo(
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder() KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
//.setKafkaProducerConfig(kafkaProducerConfig) //.setKafkaProducerConfig(kafkaProducerConfig)
@ -341,7 +350,7 @@ public class RootCloudIotDataFormatterJob {
.build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); .build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
outputStreamOperator.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
@Override @Override
public String map(MachineIotDataReceivedEvent value) { public String map(MachineIotDataReceivedEvent value) {
return JSONUtil.toJsonStr(value); return JSONUtil.toJsonStr(value);
@ -350,4 +359,38 @@ public class RootCloudIotDataFormatterJob {
env.execute("root cloud iot data formatter job"); env.execute("root cloud iot data formatter job");
} }
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta());
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
machineIotDataReceivedEvent.setIgStat(event.getIG_sta());
machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total());
// 当天的工作产能
machineIotDataReceivedEvent.setJobCountOfTheDay(event.getACC_count());
// 当天的工作时长
machineIotDataReceivedEvent
.setJobDurationOfTheDay(event.getRunning_duration() == null ? 0L : event.getRunning_duration().longValue());
String stopDuration = event.getStoping_duration();
if (stopDuration != null) {
machineIotDataReceivedEvent.setCurrStoppingDuration(Long.parseLong(stopDuration));
}
machineIotDataReceivedEvent
.setCurrWaitingDuration(event.getWaiting_duration() == null ? 0L : event.getWaiting_duration().longValue());
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
Long timestamp = event.get__timestamp__();
if (timestamp != null) {
String timestampStr = StrUtil.toString(timestamp);
if (timestampStr.length() == 10) {
// 秒转为毫秒
timestampStr = timestampStr + "000";
}
machineIotDataReceivedEvent.setReportTime(Long.parseLong(timestampStr));
}
return machineIotDataReceivedEvent;
}
} }
Loading…
Cancel
Save