diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 08a0221..d20b001 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,15 +18,21 @@ package com.qniao.iot.rc; +import com.qniao.iot.rc.event.MachineIotDataReceivedEvent; +import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; @@ -57,6 +63,7 @@ public class RootCloudIotDataFormatterJob { .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); + // 发送到OSS存储 DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; StreamingFileSink sink = StreamingFileSink.forRowFormat( @@ -65,6 +72,24 @@ public class RootCloudIotDataFormatterJob { ).build(); ds.addSink(sink); + // 把树根的数据转成我们自己的格式 + SingleOutputStreamOperator transformDs = ds + .map((MapFunction) MachineIotDataReceivedEvent::transform) + .name("Transform MachineIotDataReceivedEvent"); + + // 转换后的格式发送到kafka + transformDs.sinkTo( + KafkaSink.builder() + .setBootstrapServers("kafka:9092") + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic("machine_iot_data_received_event") + .setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) + .build() + ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build() + ).name("MachineIotDataReceivedEvent Sink"); + env.execute("Kafka Job"); } } diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java new file mode 100644 index 0000000..0478908 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java @@ -0,0 +1,17 @@ +package com.qniao.iot.rc.constant; + +/** + * @author Lzk + * @date 2022/7/2 + **/ + +public interface DataSource { + /** + * 树根云 + */ + Integer ROOT_CLOUD = 1; + /** + * 机智云 + */ + Integer TACT_CLOUD = 0; +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java new file mode 100644 index 0000000..60b5e20 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java @@ -0,0 +1,98 @@ +package com.qniao.iot.rc.event; + +import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; +import com.qniao.iot.rc.constant.DataSource; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.Objects; + +/** + * @author Lzk + * @date 2022/7/2 + **/ +@Data +public class MachineIotDataReceivedEvent implements Serializable { + + private static final long serialVersionUID = 1L; + /** + * 唯一标识 + */ + private Long id; + + /** + * 数据来源 + */ + private Integer dataSource; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 机器电源状态 + */ + private Integer machinePwrStat; + + /** + * 机器工作状态 + */ + private Integer machineWorkingStat; + + /** + * 累加作业总数 + */ + private Long accJobCount; + + /** + * 当前作业计数 + */ + private Long currJobCount; + + /** + * 当前作业时长 + */ + private Long currJobDuration; + + /** + * 当前待机时长 + */ + private Long currWaitingDuration; + + /** + * 当前停机时长 + */ + private Long currStoppingDuration; + + /** + * 计数开关状态 + */ + private Integer igStat; + + /** + * 数据采样时间 + */ + private Long reportTime; + + public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { + + MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); + if (Objects.nonNull(event)) { + machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode()); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); + machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); + machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); + machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); + machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); + machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); + machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); + machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue()); + machineIotDataReceivedEvent.setCurrStoppingDuration(StringUtils.isBlank(event.getStoping_duration()) ? null : Long.valueOf(event.getStoping_duration())); + machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue()); + machineIotDataReceivedEvent.setReportTime(System.currentTimeMillis()); + } + return machineIotDataReceivedEvent; + } +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java new file mode 100644 index 0000000..46f9464 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java @@ -0,0 +1,21 @@ +package com.qniao.iot.rc.event; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author Lzk + */ +public class MachineIotDataReceivedEventSerializationSchema implements SerializationSchema { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(MachineIotDataReceivedEvent event) { + try { + return OBJECT_MAPPER.writeValueAsBytes(event); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + event, e); + } + } +}