commit 2b99f536a039deadfd5a73bfb77e2983a8126b5e Author: 1049970895@qniao.cn <1049970895> Date: Tue Aug 2 15:59:30 2022 +0800 first commit diff --git a/iot-gizwits-statistics/dependency-reduced-pom.xml b/iot-gizwits-statistics/dependency-reduced-pom.xml new file mode 100644 index 0000000..f6098ad --- /dev/null +++ b/iot-gizwits-statistics/dependency-reduced-pom.xml @@ -0,0 +1,108 @@ + + + + iot-gizwits-model-formatter + org.example + 1.0-SNAPSHOT + + 4.0.0 + iot-gizwits-statistics + + + + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.gizwits.GizWitsIotDataFormatterJob + + + + + + + + + + + archiva.general + Gizwits General Repo + https://archiva.gizwits.com/repository/general/ + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-api + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-core + 2.17.2 + runtime + + + com.alibaba + druid + 1.1.12 + test + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + + 1.8 + 2.17.2 + 1.15.0 + UTF-8 + ${target.java.version} + ${target.java.version} + + diff --git a/iot-gizwits-statistics/pom.xml b/iot-gizwits-statistics/pom.xml new file mode 100644 index 0000000..eeef5dd --- /dev/null +++ b/iot-gizwits-statistics/pom.xml @@ -0,0 +1,254 @@ + + + + iot-gizwits-model-formatter + org.example + 1.0-SNAPSHOT + + 4.0.0 + + iot-gizwits-statistics + + + 8 + 8 + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + + + + + + com.qniao + root-cloud-event + 0.0.1-SNAPSHOT + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + commons-logging + commons-logging + 1.2 + + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-state-event-generator-job + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + + + + cn.hutool + hutool-all + 5.8.4 + + + + + com.ctrip.framework.apollo + apollo-client + 2.0.1 + + + com.ctrip.framework.apollo + apollo-core + 2.0.1 + + + com.alibaba + druid + 1.1.12 + test + + + + mysql + mysql-connector-java + 8.0.29 + + + + com.gizwits.noti + noti-client + 1.9.0-RELEASE + + + + com.alibaba + fastjson + 1.2.31 + + + + com.qniao + ddd-event + 0.0.1-SNAPSHOT + + + + io.netty + netty-all + 4.1.42.Final + + + + com.aliyun.oss + aliyun-sdk-oss + 2.8.3 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.gizwits.GizWitsIotDataFormatterJob + + + + + + + + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + + + + archiva.general + Gizwits General Repo + https://archiva.gizwits.com/repository/general/ + + + \ No newline at end of file diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/DeviceStatus.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/DeviceStatus.java new file mode 100644 index 0000000..0cf14be --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/DeviceStatus.java @@ -0,0 +1,170 @@ +package com.qniao.iot.gizwits; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + + +/** + * 设备执行表 + * + * @author qniao + * @date 2020/10/31 + */ +@Data +public class DeviceStatus implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * id + */ + @JSONField( + name = "id" + ) + private Long id; + + /** + * 消息交互id + */ + @JSONField( + name = "delivery_id" + ) + private String deliveryId; + + /** + * 个数 单位为个。 序号为1 采集状态上报,一包数据一共采集8次设备状态,然后再上报,该序号为1 + */ + @JSONField( + name = "count" + ) + private Long count; + + /** + * 时间戳 序号为1 1970年1月1日起的秒数 + */ + @JSONField( + name = "timestamp",format = "yyyy-MM-dd HH:mm:ss" + ) + private Date timestamp; + + /** + * 设备mac编码 + */ + @JSONField( + name = "mac" + ) + private String mac; + + /** + * 持续时间 单位为秒, 序号为1, 当前设备采集数据的时间与上一次采集数据的时间差 + */ + @JSONField( + name = "duration" + ) + private Long duration; + + /** + * 累计个数 序号为1 累计个数。 采集状态上报,一包数据一共采集8次设备状态,然后再上报 + */ + @JSONField( + name = "total" + ) + private Long total; + + /** + * 消息类型 + */ + @JSONField( + name = "event_type" + ) + private String eventType; + + /** + * 产品key + */ + @JSONField( + name = "product_key" + ) + private String productKey; + + /** + * 状态 序号为1 240程序重启 241上电事件 0 设备为关状态 1 设备为开状态 + */ + @JSONField( + name = "state" + ) + private Integer state; + + /** + * cmd + */ + @JSONField( + name = "cmd" + ) + private String cmd; + + /** + * 消息编码 + */ + @JSONField( + name = "msg_id" + ) + private String msgId; + + /** + * 设备did + */ + @JSONField( + name = "did" + ) + private String did; + + /** + * 消息创建时间 + */ + @JSONField( + name = "created_at",format = "yyyy-MM-dd HH:mm:ss" + ) + private Date createdAt; + + /** + * 当前转速 多少转每分钟,设备当前转速,单位为n/min + */ + @JSONField( + name = "cur_speed" + ) + private Integer curSpeed; + + /** + * 当前状态 240程序重启 241上电事件 0 设备为关状态 1 设备为开状态 + */ + @JSONField( + name = "cur_state" + ) + private Integer curState; + + /** + * 信号强度 当前设备的信号强度 + */ + @JSONField( + name = "signal" + ) + private Integer signal; + + /** + * 上传序号 用于上传记录及应答,设备端只要收到该次服务下发的sn值一致时,才会重新上报下一次采集数据。 + */ + @JSONField( + name = "sn" + ) + private Long sn; + + @JSONField( + name = "partition_column" + ) + private Integer partitionColumn; + +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java new file mode 100644 index 0000000..90717ba --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qniao.iot.gizwits; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.CharsetUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.gizwits.noti.noticlient.bean.resp.NotiRespPushEvents; +import com.gizwits.noti.noticlient.bean.resp.body.OffLineEventBody; +import com.gizwits.noti.noticlient.bean.resp.body.OnLineEventBody; +import com.gizwits.noti.noticlient.util.CommandUtils; +import com.qniao.iot.gizwits.config.ApolloConfig; +import com.qniao.iot.gizwits.constant.ConfigConstant; +import com.qniao.iot.gizwits.source.GizWitsIotSource; +import com.qniao.iot.gizwits.util.DataParsingUtils; +import com.qniao.iot.gizwits.util.SnowFlake; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema; +import com.qniao.iot.rc.constant.DataSource; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.*; + +/** + * 机智云设备数据转换 + * @author hph + */ +public class GizWitsIotDataFormatterJob { + + static SnowFlake snowflake = new SnowFlake( + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) + ); + + public static void main(String[] args) throws Exception { + + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + DataStreamSource streamSource = env.addSource(new GizWitsIotSource()); + + // 把机智云的数据转成我们自己的格式 + SingleOutputStreamOperator transformDs = streamSource + .flatMap(new RichFlatMapFunction() { + @Override + public void flatMap(JSONObject value, Collector out) { + List receivedEvents = transform(value); + if(CollUtil.isNotEmpty(receivedEvents)) { + receivedEvents.forEach(out::collect); + } + } + }).name("Transform MachineIotDataReceivedEvent"); + + //kafka 认证配置,暂时注释,后续可能需要放开 + /*Properties kafkaProducerConfig = new Properties(); + kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); + kafkaProducerConfig.setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); + */ + + + // 写入kafka + transformDs.sinkTo( + KafkaSink.builder() + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) + //.setKafkaProducerConfig(kafkaProducerConfig) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) + .setValueSerializationSchema(new MachineIotDataReceivedEventKafkaSerializationSchema()) + .build() + ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build() + ).name("machineIotDataReceivedEvent kafka Sink"); + + // 发送到OSS存储 + String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); + + StreamingFileSink sink = StreamingFileSink.forRowFormat( + new Path(outputPath), + new SimpleStringEncoder(CharsetUtil.UTF_8) + ).withBucketAssigner(new BucketAssigner() { + @Override + public String getBucketId(String element, Context context) { + + MachineIotDataReceivedEvent receivedEvent = JSONUtil.toBean(element, MachineIotDataReceivedEvent.class); + // 指定以日期的格式生成桶目录 + Long receivedTime = receivedEvent.getReceivedTime(); + // 获取设备状态目录名称 + String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat()); + return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac(); + } + + private String getDeviceStatusStr(Integer machineWorkingStat) { + + if(machineWorkingStat == 1){ + return "deviceWorking"; + }else if(machineWorkingStat == 2){ + return "deviceWaiting"; + }else { + return "deviceOff"; + } + } + + @Override + public SimpleVersionedSerializer getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + }).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); + transformDs.map(new RichMapFunction() { + @Override + public String map(MachineIotDataReceivedEvent value) { + return JSONUtil.toJsonStr(value); + } + }).addSink(sink); + + env.execute("gizwits iot data formatter job"); + } + + + private static List transform(com.alibaba.fastjson.JSONObject event) { + + List receivedEventList = new ArrayList<>(); + MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); + if (ObjectUtil.isNotEmpty(event)) { + String pushEventCode = CommandUtils.getPushEventCode(event); + if(NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) { + // 设备下线 + OffLineEventBody offLineEventBody = CommandUtils.parsePushEvent(event, OffLineEventBody.class); + String mac = offLineEventBody.getMac(); + if(StrUtil.isNotEmpty(mac)) { + machineIotDataReceivedEvent.setId(snowflake.nextId()); + machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); + machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); + machineIotDataReceivedEvent.setMachinePwrStat(0); + machineIotDataReceivedEvent.setMachineWorkingStat(0); + machineIotDataReceivedEvent.setCurrJobCount(0L); + machineIotDataReceivedEvent.setCurrJobDuration(0L); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); + machineIotDataReceivedEvent.setReportTime(offLineEventBody.getCreatedAt()); + receivedEventList.add(machineIotDataReceivedEvent); + return receivedEventList; + } + }else if(NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) { + // 设备上线 + OnLineEventBody onLineEventBody = CommandUtils.parsePushEvent(event, OnLineEventBody.class); + String mac = onLineEventBody.getMac(); + if(StrUtil.isNotEmpty(mac)) { + machineIotDataReceivedEvent.setId(snowflake.nextId()); + machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); + machineIotDataReceivedEvent.setMachinePwrStat(1); + machineIotDataReceivedEvent.setMachineWorkingStat(2); + machineIotDataReceivedEvent.setCurrJobCount(0L); + machineIotDataReceivedEvent.setCurrJobDuration(0L); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); + machineIotDataReceivedEvent.setReportTime(onLineEventBody.getCreatedAt()); + receivedEventList.add(machineIotDataReceivedEvent); + return receivedEventList; + } + }else if(NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) { + // 设备生产数据 + List> mapList = DataParsingUtils.deviceStatusKvParsing(event); + mapList.forEach(e ->{ + JSONObject j = (JSONObject) JSON.toJSON(e); + DeviceStatus deviceStatus = j.toJavaObject(DeviceStatus.class); + machineIotDataReceivedEvent.setId(snowflake.nextId()); + machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac())); + machineIotDataReceivedEvent.setMachinePwrStat(1); + machineIotDataReceivedEvent.setMachineWorkingStat(1); + machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getCount()); + machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration()); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setAccJobCount(deviceStatus.getTotal()); + machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); + machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime()); + receivedEventList.add(machineIotDataReceivedEvent); + }); + return receivedEventList; + } + } + return receivedEventList; + } +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsProperties.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsProperties.java new file mode 100644 index 0000000..cd11e8c --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsProperties.java @@ -0,0 +1,73 @@ +package com.qniao.iot.gizwits; + +import com.gizwits.noti.noticlient.OhMyNotiClient; +import com.gizwits.noti.noticlient.bean.req.body.LoginReqCommandBody; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + + +@Data +public class GizWitsProperties { + + public GizWitsProperties() { + this.automaticConfirmation = true; + Item item = new Item(); + this.itemList.add(item); + } + + /** + * snoti 服务器 host + */ + private String host = "snoti.gizwits.com"; + + /** + * snoti 服务器 port + */ + private Integer port = 2017; + + /** + * 自动确认 + * 默认为true + *

+ * 当true时, + * 通过使用 {@link OhMyNotiClient#receiveMessage()} 方法接受消息时会自动ack + * 当false时, + * 通过使用 {@link OhMyNotiClient#receiveMessage()} 方法接受消息时需要手动调用 + * 注意, 如果此时没有手动回复ack. 当达到了 preFetch{@link LoginReqCommandBody#getPrefetchCount()}时, + * snoti服务端不会再推送消息. + */ + private Boolean automaticConfirmation; + + /** + * 登陆信息 + */ + private List itemList = new ArrayList<>(1); + + @Data + @NoArgsConstructor + public static class Item { + + /** + * 机智云产品key + */ + private String productKey = "816caf9e2b2141be916f204214461df4"; + + /** + * 机智云snoti auth id + */ + private String authId = "+A6DBSboRBCeCFlHwPi3Kw"; + + /** + * 机智云snoti auth secret + */ + private String authSecret = "TeUMuwQNTmy+JiWrLAuMHQ"; + + /** + * 机智云snoti sub key + */ + private String subKey = "e3f9719ff3a045f8b5c8c6b1c5284108"; + } +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/command/BaseCommandSerializationSchema.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/command/BaseCommandSerializationSchema.java new file mode 100644 index 0000000..6ffe542 --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/command/BaseCommandSerializationSchema.java @@ -0,0 +1,21 @@ +package com.qniao.iot.gizwits.command; + +import com.qniao.domain.BaseCommand; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + + +public class BaseCommandSerializationSchema implements SerializationSchema { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(BaseCommand command) { + try { + return OBJECT_MAPPER.writeValueAsBytes(command); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + command, e); + } + } +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java new file mode 100644 index 0000000..87c057e --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java @@ -0,0 +1,19 @@ +package com.qniao.iot.gizwits.config; + +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigService; + +public class ApolloConfig { + + private static final Config config = ConfigService.getAppConfig(); + + public static String get(String key, String defaultValue) { + + return config.getProperty(key, defaultValue); + } + + public static String get(String key) { + + return config.getProperty(key, null); + } +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java new file mode 100644 index 0000000..af528a1 --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java @@ -0,0 +1,14 @@ +package com.qniao.iot.gizwits.constant; + +public interface ConfigConstant { + + String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; + + String SINK_KAFKA_TOPICS = "sink.kafka.topics"; + + String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; + + String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; + + String SINK_OSS_PATH = "sink.oss.path"; +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/source/GizWitsIotSource.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/source/GizWitsIotSource.java new file mode 100644 index 0000000..63ffc76 --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/source/GizWitsIotSource.java @@ -0,0 +1,102 @@ +package com.qniao.iot.gizwits.source; + +import com.alibaba.fastjson.JSONObject; +import com.gizwits.noti.noticlient.OhMyNotiClient; +import com.gizwits.noti.noticlient.OhMyNotiClientImpl; +import com.gizwits.noti.noticlient.bean.req.NotiReqPushEvents; +import com.gizwits.noti.noticlient.bean.req.body.AuthorizationData; +import com.gizwits.noti.noticlient.config.SnotiCallback; +import com.gizwits.noti.noticlient.config.SnotiConfig; +import com.gizwits.noti.noticlient.enums.ProtocolType; +import com.google.common.base.Preconditions; +import com.qniao.iot.gizwits.GizWitsProperties; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class GizWitsIotSource extends RichSourceFunction { + + private static Boolean RUNNING = true; + + private static OhMyNotiClient client; + + private static final GizWitsProperties PROPERTIES = new GizWitsProperties(); + + @Override + public void open(Configuration parameters) { + + log.info("开始启动gizwits客户端..."); + + client = new OhMyNotiClientImpl() + .addLoginAuthorizes(getSnotiLoginCredential()) + .setCallback(getSnotiCallback()) + .setSnotiConfig(getGizWitsConfig()); + + //启动client + client.doStart(); + + log.info("启动gizwits客户端完成"); + } + + @Override + public void run(SourceContext ctx) { + while (RUNNING) { + JSONObject message = client.receiveMessage(); + ctx.collect(message); + } + } + + @Override + public void cancel() { + RUNNING = false; + } + + private AuthorizationData[] getSnotiLoginCredential() { + List itemList = PROPERTIES.getItemList(); + Preconditions.checkArgument(!CollectionUtils.isEmpty(itemList), "未配置gizwits登陆信息, gizwits初始化失败"); + return itemList.stream() + .map(it -> new AuthorizationData() + //监听所有推送事件 + .setProtocolType(ProtocolType.V2) + .addEvents(NotiReqPushEvents.values()) + .setSubkey(it.getSubKey()) + .setAuth_id(it.getAuthId()) + .setAuth_secret(it.getAuthSecret()) + .setProduct_key(it.getProductKey())) + .toArray(AuthorizationData[]::new); + } + + private SnotiCallback getSnotiCallback() { + return new SnotiCallback() { + + @Override + public void loginFailed(String errorMessage) { + log.warn("gizwits登录失败, 请检查登录参数是否有效!!! errorMsg\n {}", errorMessage); + } + + @Override + public void disconnected() { + log.warn("gizwits客户端连接断开, 即将尝试重连..."); + } + + @Override + public void reload(AuthorizationData... authorizationData) { + log.info("gizwits重载登录信息[{}]...", Stream.of(authorizationData).map(AuthorizationData::toString) + .collect(Collectors.joining(","))); + } + }; + } + + private SnotiConfig getGizWitsConfig() { + return new SnotiConfig() + .setAutomaticConfirmation(PROPERTIES.getAutomaticConfirmation()) + .setHost(PROPERTIES.getHost()) + .setPort(PROPERTIES.getPort()); + } +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java new file mode 100644 index 0000000..1d983bd --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java @@ -0,0 +1,260 @@ +package com.qniao.iot.gizwits.util; + + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang3.StringUtils; + +import java.math.BigDecimal; +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.*; + +/** + * Created by ZGF on 2020/10/30. + */ +public class DataParsingUtils { + + private static final String CREATED_AT = "created_at"; + + private static final String TIMESTAMP = "timestamp"; + + private static final String LATITUDE = "latitude"; + + private static final String LONGITUDE = "longitude"; + + private static final String KEEP_ALIVE = "keep_alive"; + + private static String[][] getCols(){ + String[][] colsarray = new String[8][5]; + for(int i=0;i<8;i++){ + int p = i+1; + colsarray[i]=new String[]{TIMESTAMP+"_"+p,"state_"+p,"duration_"+p,"count_"+p,"total_"+p}; + } + return colsarray; + } + + private static String getColMapping(String colName){ + return colName.split("_")[0]; + } + + + /** + * 字符串时间转换为时间戳 + * @param string + * @return + */ + public static Long stringToTimestamp(String string){ + if(StringUtils.isNotBlank(string)) { + char[] d = string.toCharArray(); + String datatime = String.valueOf(d[6] + "" + d[7]); + datatime = datatime + String.valueOf(d[4] + "" + d[5]); + datatime = datatime + String.valueOf(d[2] + "" + d[3]); + datatime = datatime + String.valueOf(d[0] + "" + d[1]); + return Long.parseLong(datatime, 16); + } + return null; + } + + public static LocalDateTime getDateLocalDateTime(String string){ + return Instant.ofEpochSecond(stringToTimestamp(string)).atOffset(ZoneOffset.of("+08:00")).toLocalDateTime(); + } + + public static LocalDateTime getDateLocalDateTime(Long times){ + return Instant.ofEpochSecond(times).atOffset(ZoneOffset.of("+08:00")).toLocalDateTime(); + } + + + + + public static String timesTampDateFormat(String string){ + Long timesTamp = stringToTimestamp(string); + if(timesTamp != null) { + + return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timesTamp); + } + return null; + } + + public static String timesTampDateFormat(Long timesTamp){ + if(timesTamp != null) { + return new SimpleDateFormat("yyyy-MM-dd").format(new Date(timesTamp*1000L)); + } + return null; + } + + public static List> deviceStatusKvParsing(JSONObject message){ + String[] row = new String[]{"cmd",CREATED_AT,"delivery_id","did","event_type","mac","msg_id","product_key"}; + String[] cl = new String[]{"sn","signal","cur_state","cur_speed"}; + List> list = new ArrayList<>(); + String[][] cols = getCols(); + Map jsonObject = (Map) message.get("data"); + if(jsonObject != null){ + for(String[] c: cols){ + Map map =new HashMap<>(); + boolean bool = false; + for(String col : c){ + if(jsonObject.containsKey(col)){ + bool = true; + if(col.contains(TIMESTAMP)){ + LocalDateTime localDateTime = getDateLocalDateTime(jsonObject.get(col).toString()); + map.put(getColMapping(col),localDateTime); + map.put("partition_column",localDateTime.getMonthValue()); + }else { + map.put(getColMapping(col), jsonObject.get(col)); + } + } + } + if(bool) { + list.add(map); + } + } + + for(Map map : list){ + for(String s : cl){ + if(jsonObject.containsKey(s)){ + map.put(s,jsonObject.get(s)); + } + } + } + + for(Map map : list){ + for(String s :row){ + if(message.containsKey(s)){ + if(s.equals(CREATED_AT)) { + BigDecimal bigDecimal = new BigDecimal(String.valueOf(message.get(s))); + map.put(s,getDateLocalDateTime(bigDecimal.longValue())); + }else{ + map.put(s, message.get(s)); + } + } + } + } + } + + return list; + } + + public static JSONObject parsingOfflineMessage(JSONObject message){ + + if(message.containsKey("data")){ + JSONObject o = message.getJSONObject("data"); + if(o.containsKey("duration")){ + message.put("duration",o.get("duration")); + } + if(o.containsKey("reason")){ + message.put("reason",o.get("reason")); + } + if(o.containsKey("heartbeat")){ + JSONObject jb = o.getJSONObject("heartbeat"); + if(jb.containsKey("min")){ + message.put("min",jb.get("min")); + } + if(jb.containsKey("avg")){ + message.put("avg",jb.get("avg")); + } + if(jb.containsKey("last")){ + message.put("last",jb.get("last")); + } + if(jb.containsKey("max")){ + message.put("max",jb.get("max")); + } + if(jb.containsKey("count")){ + message.put("count",jb.get("count")); + } + } + } + return message; + } + + public static JSONObject parsingOnlineMessage(JSONObject message){ + if(message.containsKey("data")){ + JSONObject o = message.getJSONObject("data"); + if(o.containsKey(KEEP_ALIVE)){ + message.put(KEEP_ALIVE,o.get(KEEP_ALIVE)); + } + } + return message; + } + + public static JSONObject parsingMessage(JSONObject message){ + if(message != null){ + if(message.containsKey(CREATED_AT)){ + BigDecimal bigDecimal = new BigDecimal(String.valueOf(message.get(CREATED_AT))); + message.put(CREATED_AT,getDateLocalDateTime(bigDecimal.longValue())); + } + + if(message.containsKey(LATITUDE)){ + DecimalFormat df = new DecimalFormat("#.000000"); + message.put(LATITUDE,df.format(new Double(String.valueOf(message.get(LATITUDE))))); + } + + if(message.containsKey(LONGITUDE)){ + DecimalFormat df = new DecimalFormat("#.000000"); + message.put(LONGITUDE,df.format(new Double(String.valueOf(message.get(LONGITUDE))))); + } + } + return message; + } + + /* public static void main(String[] args) { + String str = "{\n" + + "\t\"event_type\": \"device_offline\",\n" + + "\t\"product_key\": \"816caf9e2b2141be916f204214461df4\",\n" + + "\t\"data\": {\n" + + "\t\t\"duration\": 13280,\n" + + "\t\t\"reason\": \"no_heartbeat\",\n" + + "\t\t\"heartbeat\": {\n" + + "\t\t\t\"min\": 48,\n" + + "\t\t\t\"avg\": 50,\n" + + "\t\t\t\"last\": 130,\n" + + "\t\t\t\"max\": 52,\n" + + "\t\t\t\"count\": 263\n" + + "\t\t}\n" + + "\t},\n" + + "\t\"delivery_id\": 29,\n" + + "\t\"ip\": \"39.144.3.158\",\n" + + "\t\"created_at\": 1604289336.56731605530,\n" + + "\t\"cmd\": \"event_push\",\n" + + "\t\"msg_id\": \"jJqZWskgSAODEG6x1ksSjw\",\n" + + "\t\"mac\": \"861193040025935\",\n" + + "\t\"did\": \"hSOUIuLIYsPpaydiFFIA7M\"\n" + + "}"; + JSONObject j = JSON.parseObject(str); + JSONObject x = parsingMessage(j); + System.out.println(x); + + DeviceOffline deviceOffline = x.toJavaObject(DeviceOffline.class); + System.out.println(JSON.toJSONString(deviceOffline)); + + + String str2 = "{\n" + + "\t\"country\": \"China\",\n" + + "\t\"data\": {\n" + + "\t\t\"keep_alive\": 130\n" + + "\t},\n" + + "\t\"city\": \"Unkown\",\n" + + "\t\"delivery_id\": 72,\n" + + "\t\"ip\": \"39.144.7.209\",\n" + + "\t\"latitude\": 34.7725000000000008527,\n" + + "\t\"created_at\": 1604295582.08462190628,\n" + + "\t\"mac\": \"861193040025935\",\n" + + "\t\"event_type\": \"device_online\",\n" + + "\t\"product_key\": \"816caf9e2b2141be916f204214461df4\",\n" + + "\t\"cmd\": \"event_push\",\n" + + "\t\"msg_id\": \"FZ3AUG7/Sge7BohizNzJ/g\",\n" + + "\t\"region\": \"Unkown\",\n" + + "\t\"did\": \"hSOUIuLIYsPpaydiFFIA7M\",\n" + + "\t\"longitude\": 113.726600000000004798\n" + + "}"; + JSONObject j2 = JSON.parseObject(str2); + JSONObject x2 = parsingMessage(j2); + System.out.println(x2); + + DeviceOnline deviceOnline = x.toJavaObject(DeviceOnline.class); + System.out.println(JSON.toJSONString(deviceOnline)); + + }*/ +} diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/SnowFlake.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/SnowFlake.java new file mode 100644 index 0000000..ff2d44c --- /dev/null +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/SnowFlake.java @@ -0,0 +1,101 @@ +package com.qniao.iot.gizwits.util; + +/** + * 雪花ID算法 + */ +public class SnowFlake { + + /** + * 起始的时间戳 + */ + private final static long START_STMP = 1480166465631L; + + /** + * 每一部分占用的位数 + */ + private final static long SEQUENCE_BIT = 12; //序列号占用的位数 + private final static long MACHINE_BIT = 5; //机器标识占用的位数 + private final static long DATACENTER_BIT = 5;//数据中心占用的位数 + + /** + * 每一部分的最大值 + */ + private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT); + private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); + private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; + + private long datacenterId = 1L; //数据中心 + private long machineId = 1L; //机器标识 + private long sequence = 0L; //序列号 + private long lastStmp = -1L;//上一次时间戳 + +// public SnowFlake(){ +// } + + public SnowFlake(long datacenterId, + long machineId) { + if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { + throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); + } + this.datacenterId = datacenterId; + this.machineId = machineId; + } + + /** + * 产生下一个ID + * + * @return + */ + public synchronized Long nextId() { + long currStmp = getNewstmp(); + if (currStmp < lastStmp) { + throw new RuntimeException("Clock moved backwards. Refusing to generate id"); + } + + if (currStmp == lastStmp) { + //相同毫秒内,序列号自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currStmp = getNextMill(); + } + } else { + //不同毫秒内,序列号置为0 + sequence = 0L; + } + + lastStmp = currStmp; + + return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分 + | datacenterId << DATACENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //序列号部分 + } + + private long getNextMill() { + long mill = getNewstmp(); + while (mill <= lastStmp) { + mill = getNewstmp(); + } + return mill; + } + + private long getNewstmp() { + return System.currentTimeMillis(); + } + + public static void main(String[] args) { + SnowFlake s = new SnowFlake(1, 1); + System.out.println(s.nextId()); + } +} \ No newline at end of file diff --git a/iot-gizwits-statistics/src/main/resources/META-INF/app.properties b/iot-gizwits-statistics/src/main/resources/META-INF/app.properties new file mode 100644 index 0000000..94b0185 --- /dev/null +++ b/iot-gizwits-statistics/src/main/resources/META-INF/app.properties @@ -0,0 +1,5 @@ +app.id=iot-gizwits-model-formatter + +# test 8.135.8.221 +# prod 47.112.164.224 +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file diff --git a/iot-gizwits-statistics/src/main/resources/log4j2.properties b/iot-gizwits-statistics/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/iot-gizwits-statistics/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java b/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java new file mode 100644 index 0000000..3a4ff79 --- /dev/null +++ b/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java @@ -0,0 +1,15 @@ +package com.qniao; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +public class Test1 { + + public static void main(String[] args) { + Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli(); + System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + } +} diff --git a/iot-gizwits-statistics/src/test/java/com/qniao/TestOss.java b/iot-gizwits-statistics/src/test/java/com/qniao/TestOss.java new file mode 100644 index 0000000..00e12fb --- /dev/null +++ b/iot-gizwits-statistics/src/test/java/com/qniao/TestOss.java @@ -0,0 +1,25 @@ +package com.qniao; + +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.model.ObjectMetadata; + +import java.io.ByteArrayInputStream; + +public class TestOss { + + public static void main(String[] args) { + + OSSClient ossClient = new OSSClient("oss-cn-shenzhen.aliyuncs.com", "LTAINmC91NqIGN38", "Hh10dQPjq1jMLLSpbDAR05ZzR3nXsU"); + + String str = "weqgwrgwefqwefwefqerwegwefwefgweghjtyjtyjergyjrrfwgrth"; + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(str.length()); + objectMetadata.setCacheControl("no-cache"); + objectMetadata.setHeader("Pragma", "no-cache"); + objectMetadata.setContentType("application/json"); + objectMetadata.setContentEncoding("utf-8"); + objectMetadata.setContentDisposition("inline;filename=" + "213.json"); + ossClient.putObject("qn-data-lake", "gizwits-model-reported-data/213.json", new ByteArrayInputStream(str.getBytes()),objectMetadata); + ossClient.shutdown(); + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..27f7dbe --- /dev/null +++ b/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + org.example + iot-gizwits-model-formatter + pom + 1.0-SNAPSHOT + + iot-gizwits-statistics + + + + 8 + 8 + + + \ No newline at end of file