From 40004867b988f10ab887489f8fc072b1d53c9f90 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 12 Jul 2022 20:48:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rc/RootCloudIotDataEventSourceMocker.java | 2 +- root-cloud-statistics/pom.xml | 45 ++- .../java/com/qniao/iot/rc/DeviceState.java | 38 +++ .../iot/rc/RootCloudIotDataFormatterJob.java | 266 ++++++++++++++++-- .../BaseCommandSerializationSchema.java | 23 ++ .../com/qniao/iot/rc/constant/DataSource.java | 17 -- .../rc/event/MachineIotDataReceivedEvent.java | 102 ------- ...tDataReceivedEventSerializationSchema.java | 1 + .../target/classes/log4j2.properties | 25 ++ 9 files changed, 367 insertions(+), 152 deletions(-) create mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java create mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java delete mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java delete mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java create mode 100644 root-cloud-statistics/target/classes/log4j2.properties diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index da71a91..efa66e2 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -48,7 +48,7 @@ public class RootCloudIotDataEventSourceMocker { private static Properties createKafkaProperties() { Properties kafkaProps = new Properties(); - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:9092"); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.124.230:9092"); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); return kafkaProps; diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index a9569d8..f4ae707 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -18,11 +18,16 @@ under the License. --> + + + com.qniao + java-dependency + 0.0.1-SNAPSHOT + + 4.0.0 - com.qniao root-cloud-statistics - 0.0.1-SNAPSHOT jar root-cloud-statistics @@ -83,7 +88,7 @@ under the License. ${log4j.version} runtime - + + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + + + + + diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java new file mode 100644 index 0000000..80e2fff --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java @@ -0,0 +1,38 @@ +package com.qniao.iot.rc; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DeviceState { + + private Long id; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 状态: 0:关机 1:生产中 2:待机 + */ + private Integer status; + + /** + * 发生时间 + */ + private Long updateTime; + + @Override + public String toString() { + return "设备状态:{" + + "id='" + id + '\'' + + ", status='" + status + + ", updateTime='" + updateTime + + '\'' + + '}'; + } +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 4d75a9b..f653971 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,23 +18,56 @@ package com.qniao.iot.rc; -import com.qniao.iot.rc.event.MachineIotDataReceivedEvent; -import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; +import com.fasterxml.jackson.databind.util.JSONPObject; +import com.qniao.domain.BaseCommand; +import com.qniao.iot.machine.command.PowerOffMachineCommand; +import com.qniao.iot.machine.command.PowerOnMachineCommand; +import com.qniao.iot.machine.command.StartMachineWorkingCommand; +import com.qniao.iot.machine.command.StopMachineWorkingCommand; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.rc.command.BaseCommandSerializationSchema; +import com.qniao.iot.rc.constant.DataSource; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.tools.json.JSONUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.flink.util.Collector; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClientBuilder; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; /** * Skeleton for a Flink DataStream Job. @@ -53,6 +86,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -68,30 +102,204 @@ public class RootCloudIotDataFormatterJob { // 把树根的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source") - .map((MapFunction) MachineIotDataReceivedEvent::transform) + .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform MachineIotDataReceivedEvent"); - // 发送到OSS存储 - String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( - new Path(outputPath), - new SimpleStringEncoder("UTF-8") - ).build(); - transformDs.addSink(sink); - - // 再发送到kafka队列中 - transformDs.sinkTo( - KafkaSink.builder() - .setBootstrapServers(params.get("sink.bootstrap.servers")) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setTopic("machine_iot_data_received_event") - .setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) - .build() - ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .build() - ).name("MachineIotDataReceivedEvent Sink"); + // 设备数据分组 + DataStream commandDataStream = transformDs.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + .process(new KeyedProcessFunction() { + + private ValueState deviceState; + + @Override + public void open(Configuration parameters) { + // 必须在 open 生命周期初始化 + deviceState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("deviceState", DeviceState.class)); + } + + @Override + public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + + System.out.println("收到事件数据-------------------------:" + event); + // 获取最新设备状态 + DeviceState lastedDeviceState = deviceState.value(); + + Integer deviceStatus = getDeviceStatus(event); + + DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); + collDeviceStatusChange(out, newState, null, event); + + if (deviceStatus == null) { + out.collect(null); + } else { + if (lastedDeviceState == null) { + // TODO 后续优化,先从数据库中获取设备最新的状态 + deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + System.out.println("初始化设备状态" + deviceState.value().toString()); + } else { + /*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); + DeviceState oldState = deviceState.value(); + collDeviceStatusChange(out, newState, oldState, event); + deviceState.update(newState);*/ + } + } + } + }).name(""); + + + //sinkRabbitMq(commandDataStream); + + // 写入es + List httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( + httpHosts, + (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> { + + HashMap map = new HashMap<>(); + if(command instanceof PowerOnMachineCommand) { + PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; + map.put("id", powerOnMachineCommand.getId().toString()); + map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); + map.put("timestamp", powerOnMachineCommand.getTimestamp().toString()); + } + + if(command instanceof PowerOffMachineCommand) { + PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; + map.put("id", powerOffMachineCommand.getId().toString()); + map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); + map.put("timestamp", powerOffMachineCommand.getTimestamp().toString()); + } + //创建es 请求 + IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); + //用 requestIndexer 发送最后的请求 + requestIndexer.add(indexRequest); + } + ); + /* 必须设置flush参数 */ + //刷新前缓冲的最大动作量 + esSinkBuilder.setBulkFlushMaxActions(10); + //刷新前缓冲区的最大数据大小(以MB为单位) + esSinkBuilder.setBulkFlushMaxSizeMb(5); + //论缓冲操作的数量或大小如何都要刷新的时间间隔 + esSinkBuilder.setBulkFlushInterval(5000L); + esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials("elastic", "qn56521")); + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.disableAuthCaching(); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }); + }); + //数据流添加sink + commandDataStream.addSink(esSinkBuilder.build()); env.execute("Kafka Job"); } + + private static void sinkRabbitMq(DataStream commandDataStream) { + + // rabbitmq配置(测试环境) + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost("8.135.8.221") + .setVirtualHost("/") + .setUserName("qniao") + .setPassword("Qianniao2020") + .setPort(5672).build(); + + // 发送相应的指令到rabbitmq的交换机 + commandDataStream.addSink(new RMQSink<>(connectionConfig, new BaseCommandSerializationSchema(), new RMQSinkPublishOptions() { + + @Override + public String computeRoutingKey(BaseCommand command) { + return "flink"; + } + + @Override + public AMQP.BasicProperties computeProperties(BaseCommand command) { + return null; + } + + @Override + public String computeExchange(BaseCommand command) { + + // 交换机名称 + /*if(command != null) { + System.out.println("发送消息:------------------" + command.toString()); + return "flink_test_exchange"; + } + return "";*/ + System.out.println("发送消息:------------------" + command.toString()); + return "flink_test_exchange"; + } + })).name("PowerOffMachineCommand Sink"); + + + // 直接发队列 + // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); + } + + private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { + + MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); + if (Objects.nonNull(event)) { + machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode()); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); + machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); + machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); + machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); + machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); + machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); + machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); + machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue()); + if (StringUtils.isNotBlank(event.getStoping_duration())) { + BigDecimal stoppingDuration = new BigDecimal(event.getStoping_duration()); + machineIotDataReceivedEvent.setCurrStoppingDuration(stoppingDuration.longValue()); + } + machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue()); + machineIotDataReceivedEvent.setReportTime(System.currentTimeMillis()); + } + return machineIotDataReceivedEvent; + } + + + private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { + + if (event.getMachinePwrStat() == 0) { + return 0; + } else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 1) { + return 1; + } else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 0) { + return 2; + } + + return null; + } + + private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { + + /*if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { + System.out.println("设备开机。相关事件:" + event.toString()); + out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { + System.out.println("设备关机。相关事件:" + event.toString()); + out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { + System.out.println("设备开始待机。相关事件:" + event.toString()); + out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { + System.out.println("设备开始工作。相关事件:" + event.toString()); + out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + }*/ + if (newState.getStatus() == 1 || newState.getStatus() == 2) { + System.out.println("设备开机。相关事件:" + event.toString()); + out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (newState.getStatus() == 0) { + System.out.println("设备关机。相关事件:" + event.toString()); + out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } + } } diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java new file mode 100644 index 0000000..f1fc662 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java @@ -0,0 +1,23 @@ +package com.qniao.iot.rc.command; + +import com.qniao.domain.BaseCommand; +import com.qniao.iot.machine.command.PowerOffMachineCommand; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + + +public class BaseCommandSerializationSchema implements SerializationSchema { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(BaseCommand command) { + try { + return OBJECT_MAPPER.writeValueAsBytes(command); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + command, e); + } + } +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java deleted file mode 100644 index 0478908..0000000 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.qniao.iot.rc.constant; - -/** - * @author Lzk - * @date 2022/7/2 - **/ - -public interface DataSource { - /** - * 树根云 - */ - Integer ROOT_CLOUD = 1; - /** - * 机智云 - */ - Integer TACT_CLOUD = 0; -} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java deleted file mode 100644 index 2fee742..0000000 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.qniao.iot.rc.event; - -import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; -import com.qniao.iot.rc.constant.DataSource; -import lombok.Data; -import org.apache.commons.lang3.StringUtils; - -import java.io.Serializable; -import java.math.BigDecimal; -import java.util.Objects; - -/** - * @author Lzk - * @date 2022/7/2 - **/ -@Data -public class MachineIotDataReceivedEvent implements Serializable { - - private static final long serialVersionUID = 1L; - /** - * 唯一标识 - */ - private Long id; - - /** - * 数据来源 - */ - private Integer dataSource; - - /** - * 设备物联地址(云盒物理标识) - */ - private Long machineIotMac; - - /** - * 机器电源状态 - */ - private Integer machinePwrStat; - - /** - * 机器工作状态 - */ - private Integer machineWorkingStat; - - /** - * 累加作业总数 - */ - private Long accJobCount; - - /** - * 当前作业计数 - */ - private Long currJobCount; - - /** - * 当前作业时长 - */ - private Long currJobDuration; - - /** - * 当前待机时长 - */ - private Long currWaitingDuration; - - /** - * 当前停机时长 - */ - private Long currStoppingDuration; - - /** - * 计数开关状态 - */ - private Integer igStat; - - /** - * 数据采样时间 - */ - private Long reportTime; - - public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { - - MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); - if (Objects.nonNull(event)) { - machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode()); - machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); - machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); - machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); - machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); - machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); - machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); - machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); - machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue()); - if (StringUtils.isNotBlank(event.getStoping_duration())) { - BigDecimal stoppingDuration = new BigDecimal(event.getStoping_duration()); - machineIotDataReceivedEvent.setCurrStoppingDuration(stoppingDuration.longValue()); - } - machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue()); - machineIotDataReceivedEvent.setReportTime(System.currentTimeMillis()); - } - return machineIotDataReceivedEvent; - } -} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java index 82971b5..47b8373 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java @@ -1,5 +1,6 @@ package com.qniao.iot.rc.event; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.SerializationSchema; diff --git a/root-cloud-statistics/target/classes/log4j2.properties b/root-cloud-statistics/target/classes/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/root-cloud-statistics/target/classes/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n