diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 7bab372..9911716 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -31,6 +31,7 @@ import com.qniao.iot.rc.until.SnowFlake; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; @@ -113,11 +114,19 @@ public class RootCloudIotDataFormatterJob { .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); + + // 把树根的数据转成我们自己的格式 + //DataStreamSource streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source"); + // 把树根的数据转成我们自己的格式 - DataStreamSource streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source"); + SingleOutputStreamOperator transformDs = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") + .map((MapFunction) RootCloudIotDataFormatterJob::transform) + .name("Transform MachineIotDataReceivedEvent"); + // 数据过滤 - SingleOutputStreamOperator streamOperator = streamSource + /*SingleOutputStreamOperator streamOperator = streamSource .filter(new RichFilterFunction() { @Override public boolean filter(RootCloudIotDataReceiptedEvent value) { @@ -138,10 +147,10 @@ public class RootCloudIotDataFormatterJob { } return false; } - }).name("machine iot data received event filter operator"); + }).name("machine iot data received event filter operator");*/ // 分组操作 - SingleOutputStreamOperator outputStreamOperator = streamOperator + /*SingleOutputStreamOperator outputStreamOperator = streamOperator .keyBy(RootCloudIotDataReceiptedEvent::get__assetId__) .process(new KeyedProcessFunction() { @@ -204,7 +213,7 @@ public class RootCloudIotDataFormatterJob { if (reportTime - lastReportTime <= 30 * 60 * 1000) { receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); // 单位是秒 - receivedEvent.setCurrJobDuration((reportTime - lastReportTime)/3600); + receivedEvent.setCurrJobDuration((reportTime - lastReportTime) / 3600); } } receivedEvent.setCurrWaitingDuration(0L); @@ -271,7 +280,7 @@ public class RootCloudIotDataFormatterJob { } return null; } - }).name("machine iot data received event keyBy"); + }).name("machine iot data received event keyBy");*/ Properties kafkaProducerConfig = new Properties(); @@ -281,7 +290,7 @@ public class RootCloudIotDataFormatterJob { "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); // 写入kafka - outputStreamOperator.sinkTo( + transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) //.setKafkaProducerConfig(kafkaProducerConfig) @@ -341,7 +350,7 @@ public class RootCloudIotDataFormatterJob { .build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); - outputStreamOperator.map(new RichMapFunction() { + transformDs.map(new RichMapFunction() { @Override public String map(MachineIotDataReceivedEvent value) { return JSONUtil.toJsonStr(value); @@ -350,4 +359,38 @@ public class RootCloudIotDataFormatterJob { env.execute("root cloud iot data formatter job"); } + + private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { + + MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); + machineIotDataReceivedEvent.setId(snowflake.nextId()); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); + machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); + machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); + machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); + machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); + machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); + // 当天的工作产能 + machineIotDataReceivedEvent.setJobCountOfTheDay(event.getACC_count()); + // 当天的工作时长 + machineIotDataReceivedEvent + .setJobDurationOfTheDay(event.getRunning_duration() == null ? 0L : event.getRunning_duration().longValue()); + String stopDuration = event.getStoping_duration(); + if (stopDuration != null) { + machineIotDataReceivedEvent.setCurrStoppingDuration(Long.parseLong(stopDuration)); + } + machineIotDataReceivedEvent + .setCurrWaitingDuration(event.getWaiting_duration() == null ? 0L : event.getWaiting_duration().longValue()); + machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); + Long timestamp = event.get__timestamp__(); + if (timestamp != null) { + String timestampStr = StrUtil.toString(timestamp); + if (timestampStr.length() == 10) { + // 秒转为毫秒 + timestampStr = timestampStr + "000"; + } + machineIotDataReceivedEvent.setReportTime(Long.parseLong(timestampStr)); + } + return machineIotDataReceivedEvent; + } }