Browse Source

feat;清洗根云数据格式

hph_优化版本
lizhongkang@qniao.cn 3 years ago
parent
commit
6f0156a20a
4 changed files with 162 additions and 1 deletions
  1. 27
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  2. 17
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java
  3. 98
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java
  4. 21
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java

27
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -18,15 +18,21 @@
package com.qniao.iot.rc;
import com.qniao.iot.rc.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema;
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@ -57,6 +63,7 @@ public class RootCloudIotDataFormatterJob {
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build();
// 发送到OSS存储
DataStream<RootCloudIotDataReceiptedEvent> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
String outputPath = "oss://qn-flink-test/test";
StreamingFileSink<RootCloudIotDataReceiptedEvent> sink = StreamingFileSink.forRowFormat(
@ -65,6 +72,24 @@ public class RootCloudIotDataFormatterJob {
).build();
ds.addSink(sink);
// 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = ds
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) MachineIotDataReceivedEvent::transform)
.name("Transform MachineIotDataReceivedEvent");
// 转换后的格式发送到kafka
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("machine_iot_data_received_event")
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema())
.build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
).name("MachineIotDataReceivedEvent Sink");
env.execute("Kafka Job");
}
}

17
root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java

@ -0,0 +1,17 @@
package com.qniao.iot.rc.constant;
/**
* @author Lzk
* @date 2022/7/2
**/
public interface DataSource {
/**
* 树根云
*/
Integer ROOT_CLOUD = 1;
/**
* 机智云
*/
Integer TACT_CLOUD = 0;
}

98
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java

@ -0,0 +1,98 @@
package com.qniao.iot.rc.event;
import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent;
import com.qniao.iot.rc.constant.DataSource;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.Objects;
/**
* @author Lzk
* @date 2022/7/2
**/
@Data
public class MachineIotDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 唯一标识
*/
private Long id;
/**
* 数据来源
*/
private Integer dataSource;
/**
* 设备物联地址(云盒物理标识)
*/
private Long machineIotMac;
/**
* 机器电源状态
*/
private Integer machinePwrStat;
/**
* 机器工作状态
*/
private Integer machineWorkingStat;
/**
* 累加作业总数
*/
private Long accJobCount;
/**
* 当前作业计数
*/
private Long currJobCount;
/**
* 当前作业时长
*/
private Long currJobDuration;
/**
* 当前待机时长
*/
private Long currWaitingDuration;
/**
* 当前停机时长
*/
private Long currStoppingDuration;
/**
* 计数开关状态
*/
private Integer igStat;
/**
* 数据采样时间
*/
private Long reportTime;
public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
if (Objects.nonNull(event)) {
machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode());
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta());
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
machineIotDataReceivedEvent.setIgStat(event.getIG_sta());
machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total());
machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count());
machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue());
machineIotDataReceivedEvent.setCurrStoppingDuration(StringUtils.isBlank(event.getStoping_duration()) ? null : Long.valueOf(event.getStoping_duration()));
machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue());
machineIotDataReceivedEvent.setReportTime(System.currentTimeMillis());
}
return machineIotDataReceivedEvent;
}
}

21
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java

@ -0,0 +1,21 @@
package com.qniao.iot.rc.event;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Lzk
*/
public class MachineIotDataReceivedEventSerializationSchema implements SerializationSchema<MachineIotDataReceivedEvent> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public byte[] serialize(MachineIotDataReceivedEvent event) {
try {
return OBJECT_MAPPER.writeValueAsBytes(event);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + event, e);
}
}
}
Loading…
Cancel
Save