diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index 68e8ce0..ce1e330 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -1,6 +1,5 @@ package com.qniao.iot.rc; -import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.RandomUtil; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -8,7 +7,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; -import javax.swing.plaf.ListUI; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index 723ac16..d9ec5fc 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -18,7 +18,6 @@ under the License. --> - com.qniao java-dependency @@ -140,7 +139,13 @@ under the License. mysql mysql-connector-java - 8.0.29 + + + + com.qniao + printing-packaing-factory-service-entity + 0.0.1-SNAPSHOT + compile diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java index 80e2fff..74d4818 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java @@ -9,7 +9,10 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class DeviceState { - private Long id; + /** + * 机器标识 + */ + private Long machineId; /** * 设备物联地址(云盒物理标识) @@ -29,7 +32,7 @@ public class DeviceState { @Override public String toString() { return "设备状态:{" + - "id='" + id + '\'' + + "machineId='" + machineId + '\'' + ", status='" + status + ", updateTime='" + updateTime + '\'' + diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 1798ee8..65e5a3f 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,43 +18,25 @@ package com.qniao.iot.rc; -import cn.hutool.db.Db; -import cn.hutool.db.Entity; -import com.fasterxml.jackson.databind.util.JSONPObject; -import com.qniao.domain.BaseCommand; -import com.qniao.iot.machine.command.PowerOffMachineCommand; -import com.qniao.iot.machine.command.PowerOnMachineCommand; -import com.qniao.iot.machine.command.StartMachineWorkingCommand; -import com.qniao.iot.machine.command.StopMachineWorkingCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob; -import com.qniao.iot.rc.command.BaseCommandSerializationSchema; import com.qniao.iot.rc.constant.DataSource; +import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.tools.json.JSONUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; import java.math.BigDecimal; -import java.sql.SQLException; -import java.util.*; +import java.util.Objects; /** * Skeleton for a Flink DataStream Job. @@ -92,59 +74,23 @@ public class RootCloudIotDataFormatterJob { .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform MachineIotDataReceivedEvent"); - // 设备数据分组 - DataStream commandDataStream = transformDs.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { - - private ValueState deviceState; - - @Override - public void open(Configuration parameters) throws SQLException { - // 必须在 open 生命周期初始化 - // TODO 获取所有设备的最新状态 - //Db.use().findAll(new Entity(), Object.class); - deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", DeviceState.class)); - } - - - - @Override - public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - - // 获取最新设备状态 - DeviceState lastedDeviceState = deviceState.value(); - Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus == null) { - out.collect(null); - } else { - if (lastedDeviceState == null) { - deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); - } else { - DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); - DeviceState oldState = deviceState.value(); - collDeviceStatusChange(out, newState, oldState, event); - deviceState.update(newState); - } - } - } - }).filter((FilterFunction) Objects::nonNull).name("keyBy stream"); - - - // 写入rabbitmq - IotMachineEventGeneratorJob.sinkRabbitMq(commandDataStream); - - // 写入es - IotMachineEventGeneratorJob.sinkEs(commandDataStream); + // 写入kafka + transformDs.sinkTo( + KafkaSink.builder() + .setBootstrapServers(params.get("sink.bootstrap.servers")) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic("machine_iot_data_received_event") + .setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) + .build() + ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build() + ).name("MachineIotDataReceivedEvent Sink"); env.execute("Kafka Job"); } - - - private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); @@ -153,7 +99,8 @@ public class RootCloudIotDataFormatterJob { machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); - machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); + // 工作状态 + machineIotDataReceivedEvent.setMachineWorkingStat(event.getACC_sta()); machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); @@ -167,28 +114,4 @@ public class RootCloudIotDataFormatterJob { } return machineIotDataReceivedEvent; } - - - private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { - - // 设备状态 - return event.getMachineWorkingStat(); - } - - private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { - - if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { - // 设备开机 - out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { - // 设备关机 - out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { - // 设备开始待机 - out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { - // 设备开始工作 - out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } - } } diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java index f1fc662..9c2d8dc 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java @@ -1,8 +1,6 @@ package com.qniao.iot.rc.command; import com.qniao.domain.BaseCommand; -import com.qniao.iot.machine.command.PowerOffMachineCommand; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java index 47b8373..4b933cb 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java @@ -1,9 +1,9 @@ package com.qniao.iot.rc.event; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.api.common.serialization.SerializationSchema; /** diff --git a/root-cloud-statistics/src/main/resources/db.setting b/root-cloud-statistics/target/classes/db.setting similarity index 54% rename from root-cloud-statistics/src/main/resources/db.setting rename to root-cloud-statistics/target/classes/db.setting index 467c2bf..632fa24 100644 --- a/root-cloud-statistics/src/main/resources/db.setting +++ b/root-cloud-statistics/target/classes/db.setting @@ -1,9 +1,8 @@ ## db.setting文件 -url = jdbc:mysql://localhost:3306/test +url = jdbc:mysql://8.135.8.221:3306/cloud-print-cloud-factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false user = root -pass = 123456 - +pass = qniaothreetwoonego # 是否在日志中显示执行的SQL showSql = true