commit
2b99f536a0
16 changed files with 1451 additions and 0 deletions
Split View
Diff Options
-
108iot-gizwits-statistics/dependency-reduced-pom.xml
-
254iot-gizwits-statistics/pom.xml
-
170iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/DeviceStatus.java
-
239iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java
-
73iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsProperties.java
-
21iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/command/BaseCommandSerializationSchema.java
-
19iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java
-
14iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
-
102iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/source/GizWitsIotSource.java
-
260iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java
-
101iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/SnowFlake.java
-
5iot-gizwits-statistics/src/main/resources/META-INF/app.properties
-
25iot-gizwits-statistics/src/main/resources/log4j2.properties
-
15iot-gizwits-statistics/src/test/java/com/qniao/Test1.java
-
25iot-gizwits-statistics/src/test/java/com/qniao/TestOss.java
-
20pom.xml
@ -0,0 +1,108 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> |
|||
<parent> |
|||
<artifactId>iot-gizwits-model-formatter</artifactId> |
|||
<groupId>org.example</groupId> |
|||
<version>1.0-SNAPSHOT</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
<artifactId>iot-gizwits-statistics</artifactId> |
|||
<build> |
|||
<plugins> |
|||
<plugin> |
|||
<artifactId>maven-compiler-plugin</artifactId> |
|||
<version>3.1</version> |
|||
<configuration> |
|||
<source>${target.java.version}</source> |
|||
<target>${target.java.version}</target> |
|||
</configuration> |
|||
</plugin> |
|||
<plugin> |
|||
<artifactId>maven-shade-plugin</artifactId> |
|||
<version>3.1.1</version> |
|||
<executions> |
|||
<execution> |
|||
<phase>package</phase> |
|||
<goals> |
|||
<goal>shade</goal> |
|||
</goals> |
|||
<configuration> |
|||
<artifactSet> |
|||
<excludes> |
|||
<exclude>org.apache.flink:flink-shaded-force-shading</exclude> |
|||
<exclude>com.google.code.findbugs:jsr305</exclude> |
|||
<exclude>org.slf4j:*</exclude> |
|||
<exclude>org.apache.logging.log4j:*</exclude> |
|||
</excludes> |
|||
</artifactSet> |
|||
<filters> |
|||
<filter> |
|||
<artifact>*:*</artifact> |
|||
<excludes> |
|||
<exclude>META-INF/*.SF</exclude> |
|||
<exclude>META-INF/*.DSA</exclude> |
|||
<exclude>META-INF/*.RSA</exclude> |
|||
</excludes> |
|||
</filter> |
|||
</filters> |
|||
<transformers> |
|||
<transformer /> |
|||
<transformer> |
|||
<mainClass>com.qniao.iot.gizwits.GizWitsIotDataFormatterJob</mainClass> |
|||
</transformer> |
|||
</transformers> |
|||
</configuration> |
|||
</execution> |
|||
</executions> |
|||
</plugin> |
|||
</plugins> |
|||
</build> |
|||
<repositories> |
|||
<repository> |
|||
<id>archiva.general</id> |
|||
<name>Gizwits General Repo</name> |
|||
<url>https://archiva.gizwits.com/repository/general/</url> |
|||
</repository> |
|||
</repositories> |
|||
<dependencies> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-slf4j-impl</artifactId> |
|||
<version>2.17.2</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-api</artifactId> |
|||
<version>2.17.2</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-core</artifactId> |
|||
<version>2.17.2</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.alibaba</groupId> |
|||
<artifactId>druid</artifactId> |
|||
<version>1.1.12</version> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
</dependencies> |
|||
<distributionManagement> |
|||
<repository> |
|||
<id>maven-releases</id> |
|||
<name>Nexus releases Repository</name> |
|||
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url> |
|||
</repository> |
|||
</distributionManagement> |
|||
<properties> |
|||
<target.java.version>1.8</target.java.version> |
|||
<log4j.version>2.17.2</log4j.version> |
|||
<flink.version>1.15.0</flink.version> |
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|||
<maven.compiler.source>${target.java.version}</maven.compiler.source> |
|||
<maven.compiler.target>${target.java.version}</maven.compiler.target> |
|||
</properties> |
|||
</project> |
|||
@ -0,0 +1,254 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<parent> |
|||
<artifactId>iot-gizwits-model-formatter</artifactId> |
|||
<groupId>org.example</groupId> |
|||
<version>1.0-SNAPSHOT</version> |
|||
</parent> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<artifactId>iot-gizwits-statistics</artifactId> |
|||
|
|||
<properties> |
|||
<maven.compiler.source>8</maven.compiler.source> |
|||
<maven.compiler.target>8</maven.compiler.target> |
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|||
<flink.version>1.15.0</flink.version> |
|||
<target.java.version>1.8</target.java.version> |
|||
<maven.compiler.source>${target.java.version}</maven.compiler.source> |
|||
<maven.compiler.target>${target.java.version}</maven.compiler.target> |
|||
<log4j.version>2.17.2</log4j.version> |
|||
</properties> |
|||
|
|||
<dependencies> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>root-cloud-event</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<!-- Apache Flink dependencies --> |
|||
<!-- These dependencies are provided, because they should not be packaged into the JAR file. --> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-streaming-java</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-clients</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-connector-kafka</artifactId> |
|||
<version>${flink.version}</version> |
|||
</dependency> |
|||
|
|||
<!-- Add logging framework, to produce console output when running in the IDE. --> |
|||
<!-- These dependencies are excluded from the application JAR by default. --> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-slf4j-impl</artifactId> |
|||
<version>${log4j.version}</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-api</artifactId> |
|||
<version>${log4j.version}</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>org.apache.logging.log4j</groupId> |
|||
<artifactId>log4j-core</artifactId> |
|||
<version>${log4j.version}</version> |
|||
<scope>runtime</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>commons-logging</groupId> |
|||
<artifactId>commons-logging</artifactId> |
|||
<version>1.2</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>iot-machine-data-command</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>iot-machine-data-event</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>iot-machine-data-constant</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>iot-machine-state-event-generator-job</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-connector-rabbitmq_2.12</artifactId> |
|||
<version>1.14.5</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>org.apache.flink</groupId> |
|||
<artifactId>flink-connector-elasticsearch7_2.11</artifactId> |
|||
<version>1.14.5</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>cn.hutool</groupId> |
|||
<artifactId>hutool-all</artifactId> |
|||
<version>5.8.4</version> |
|||
</dependency> |
|||
|
|||
<!-- apollo --> |
|||
<dependency> |
|||
<groupId>com.ctrip.framework.apollo</groupId> |
|||
<artifactId>apollo-client</artifactId> |
|||
<version>2.0.1</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.ctrip.framework.apollo</groupId> |
|||
<artifactId>apollo-core</artifactId> |
|||
<version>2.0.1</version> |
|||
</dependency> |
|||
<dependency> |
|||
<groupId>com.alibaba</groupId> |
|||
<artifactId>druid</artifactId> |
|||
<version>1.1.12</version> |
|||
<scope>test</scope> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>mysql</groupId> |
|||
<artifactId>mysql-connector-java</artifactId> |
|||
<version>8.0.29</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.gizwits.noti</groupId> |
|||
<artifactId>noti-client</artifactId> |
|||
<version>1.9.0-RELEASE</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.alibaba</groupId> |
|||
<artifactId>fastjson</artifactId> |
|||
<version>1.2.31</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.qniao</groupId> |
|||
<artifactId>ddd-event</artifactId> |
|||
<version>0.0.1-SNAPSHOT</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>io.netty</groupId> |
|||
<artifactId>netty-all</artifactId> |
|||
<version>4.1.42.Final</version> |
|||
</dependency> |
|||
|
|||
<dependency> |
|||
<groupId>com.aliyun.oss</groupId> |
|||
<artifactId>aliyun-sdk-oss</artifactId> |
|||
<version>2.8.3</version> |
|||
</dependency> |
|||
</dependencies> |
|||
|
|||
<build> |
|||
<plugins> |
|||
|
|||
<!-- Java Compiler --> |
|||
<plugin> |
|||
<groupId>org.apache.maven.plugins</groupId> |
|||
<artifactId>maven-compiler-plugin</artifactId> |
|||
<version>3.1</version> |
|||
<configuration> |
|||
<source>${target.java.version}</source> |
|||
<target>${target.java.version}</target> |
|||
</configuration> |
|||
</plugin> |
|||
|
|||
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> |
|||
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --> |
|||
<plugin> |
|||
<groupId>org.apache.maven.plugins</groupId> |
|||
<artifactId>maven-shade-plugin</artifactId> |
|||
<version>3.1.1</version> |
|||
<executions> |
|||
<!-- Run shade goal on package phase --> |
|||
<execution> |
|||
<phase>package</phase> |
|||
<goals> |
|||
<goal>shade</goal> |
|||
</goals> |
|||
<configuration> |
|||
<artifactSet> |
|||
<excludes> |
|||
<exclude>org.apache.flink:flink-shaded-force-shading</exclude> |
|||
<exclude>com.google.code.findbugs:jsr305</exclude> |
|||
<exclude>org.slf4j:*</exclude> |
|||
<exclude>org.apache.logging.log4j:*</exclude> |
|||
</excludes> |
|||
</artifactSet> |
|||
<filters> |
|||
<filter> |
|||
<!-- Do not copy the signatures in the META-INF folder. |
|||
Otherwise, this might cause SecurityExceptions when using the JAR. --> |
|||
<artifact>*:*</artifact> |
|||
<excludes> |
|||
<exclude>META-INF/*.SF</exclude> |
|||
<exclude>META-INF/*.DSA</exclude> |
|||
<exclude>META-INF/*.RSA</exclude> |
|||
</excludes> |
|||
</filter> |
|||
</filters> |
|||
<transformers> |
|||
<transformer |
|||
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> |
|||
<transformer |
|||
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> |
|||
<mainClass>com.qniao.iot.gizwits.GizWitsIotDataFormatterJob</mainClass> |
|||
</transformer> |
|||
</transformers> |
|||
</configuration> |
|||
</execution> |
|||
</executions> |
|||
</plugin> |
|||
</plugins> |
|||
</build> |
|||
|
|||
<distributionManagement> |
|||
<repository> |
|||
<id>maven-releases</id> |
|||
<name>Nexus releases Repository</name> |
|||
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url> |
|||
</repository> |
|||
</distributionManagement> |
|||
|
|||
<repositories> |
|||
<repository> |
|||
<id>archiva.general</id> |
|||
<name>Gizwits General Repo</name> |
|||
<url>https://archiva.gizwits.com/repository/general/</url> |
|||
</repository> |
|||
</repositories> |
|||
</project> |
|||
@ -0,0 +1,170 @@ |
|||
package com.qniao.iot.gizwits; |
|||
|
|||
import com.alibaba.fastjson.annotation.JSONField; |
|||
import lombok.Data; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.Date; |
|||
|
|||
|
|||
/** |
|||
* 设备执行表 |
|||
* |
|||
* @author qniao |
|||
* @date 2020/10/31 |
|||
*/ |
|||
@Data |
|||
public class DeviceStatus implements Serializable { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** |
|||
* id |
|||
*/ |
|||
@JSONField( |
|||
name = "id" |
|||
) |
|||
private Long id; |
|||
|
|||
/** |
|||
* 消息交互id |
|||
*/ |
|||
@JSONField( |
|||
name = "delivery_id" |
|||
) |
|||
private String deliveryId; |
|||
|
|||
/** |
|||
* 个数 单位为个。 序号为1 采集状态上报,一包数据一共采集8次设备状态,然后再上报,该序号为1 |
|||
*/ |
|||
@JSONField( |
|||
name = "count" |
|||
) |
|||
private Long count; |
|||
|
|||
/** |
|||
* 时间戳 序号为1 1970年1月1日起的秒数 |
|||
*/ |
|||
@JSONField( |
|||
name = "timestamp",format = "yyyy-MM-dd HH:mm:ss" |
|||
) |
|||
private Date timestamp; |
|||
|
|||
/** |
|||
* 设备mac编码 |
|||
*/ |
|||
@JSONField( |
|||
name = "mac" |
|||
) |
|||
private String mac; |
|||
|
|||
/** |
|||
* 持续时间 单位为秒, 序号为1, 当前设备采集数据的时间与上一次采集数据的时间差 |
|||
*/ |
|||
@JSONField( |
|||
name = "duration" |
|||
) |
|||
private Long duration; |
|||
|
|||
/** |
|||
* 累计个数 序号为1 累计个数。 采集状态上报,一包数据一共采集8次设备状态,然后再上报 |
|||
*/ |
|||
@JSONField( |
|||
name = "total" |
|||
) |
|||
private Long total; |
|||
|
|||
/** |
|||
* 消息类型 |
|||
*/ |
|||
@JSONField( |
|||
name = "event_type" |
|||
) |
|||
private String eventType; |
|||
|
|||
/** |
|||
* 产品key |
|||
*/ |
|||
@JSONField( |
|||
name = "product_key" |
|||
) |
|||
private String productKey; |
|||
|
|||
/** |
|||
* 状态 序号为1 240程序重启 241上电事件 0 设备为关状态 1 设备为开状态 |
|||
*/ |
|||
@JSONField( |
|||
name = "state" |
|||
) |
|||
private Integer state; |
|||
|
|||
/** |
|||
* cmd |
|||
*/ |
|||
@JSONField( |
|||
name = "cmd" |
|||
) |
|||
private String cmd; |
|||
|
|||
/** |
|||
* 消息编码 |
|||
*/ |
|||
@JSONField( |
|||
name = "msg_id" |
|||
) |
|||
private String msgId; |
|||
|
|||
/** |
|||
* 设备did |
|||
*/ |
|||
@JSONField( |
|||
name = "did" |
|||
) |
|||
private String did; |
|||
|
|||
/** |
|||
* 消息创建时间 |
|||
*/ |
|||
@JSONField( |
|||
name = "created_at",format = "yyyy-MM-dd HH:mm:ss" |
|||
) |
|||
private Date createdAt; |
|||
|
|||
/** |
|||
* 当前转速 多少转每分钟,设备当前转速,单位为n/min |
|||
*/ |
|||
@JSONField( |
|||
name = "cur_speed" |
|||
) |
|||
private Integer curSpeed; |
|||
|
|||
/** |
|||
* 当前状态 240程序重启 241上电事件 0 设备为关状态 1 设备为开状态 |
|||
*/ |
|||
@JSONField( |
|||
name = "cur_state" |
|||
) |
|||
private Integer curState; |
|||
|
|||
/** |
|||
* 信号强度 当前设备的信号强度 |
|||
*/ |
|||
@JSONField( |
|||
name = "signal" |
|||
) |
|||
private Integer signal; |
|||
|
|||
/** |
|||
* 上传序号 用于上传记录及应答,设备端只要收到该次服务下发的sn值一致时,才会重新上报下一次采集数据。 |
|||
*/ |
|||
@JSONField( |
|||
name = "sn" |
|||
) |
|||
private Long sn; |
|||
|
|||
@JSONField( |
|||
name = "partition_column" |
|||
) |
|||
private Integer partitionColumn; |
|||
|
|||
} |
|||
@ -0,0 +1,239 @@ |
|||
/* |
|||
* Licensed to the Apache Software Foundation (ASF) under one |
|||
* or more contributor license agreements. See the NOTICE file |
|||
* distributed with this work for additional information |
|||
* regarding copyright ownership. The ASF licenses this file |
|||
* to you under the Apache License, Version 2.0 (the |
|||
* "License"); you may not use this file except in compliance |
|||
* with the License. You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0 |
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package com.qniao.iot.gizwits; |
|||
|
|||
import cn.hutool.core.collection.CollUtil; |
|||
import cn.hutool.core.util.CharsetUtil; |
|||
import cn.hutool.core.util.ObjectUtil; |
|||
import cn.hutool.core.util.StrUtil; |
|||
import cn.hutool.json.JSONUtil; |
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.gizwits.noti.noticlient.bean.resp.NotiRespPushEvents; |
|||
import com.gizwits.noti.noticlient.bean.resp.body.OffLineEventBody; |
|||
import com.gizwits.noti.noticlient.bean.resp.body.OnLineEventBody; |
|||
import com.gizwits.noti.noticlient.util.CommandUtils; |
|||
import com.qniao.iot.gizwits.config.ApolloConfig; |
|||
import com.qniao.iot.gizwits.constant.ConfigConstant; |
|||
import com.qniao.iot.gizwits.source.GizWitsIotSource; |
|||
import com.qniao.iot.gizwits.util.DataParsingUtils; |
|||
import com.qniao.iot.gizwits.util.SnowFlake; |
|||
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|||
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema; |
|||
import com.qniao.iot.rc.constant.DataSource; |
|||
import org.apache.flink.api.common.functions.RichFlatMapFunction; |
|||
import org.apache.flink.api.common.functions.RichMapFunction; |
|||
import org.apache.flink.api.common.serialization.BulkWriter; |
|||
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|||
import org.apache.flink.connector.base.DeliveryGuarantee; |
|||
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; |
|||
import org.apache.flink.connector.kafka.sink.KafkaSink; |
|||
import org.apache.flink.core.fs.FSDataOutputStream; |
|||
import org.apache.flink.core.fs.Path; |
|||
import org.apache.flink.core.io.SimpleVersionedSerializer; |
|||
import org.apache.flink.streaming.api.CheckpointingMode; |
|||
import org.apache.flink.streaming.api.datastream.DataStreamSource; |
|||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|||
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; |
|||
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; |
|||
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|||
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; |
|||
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; |
|||
import org.apache.flink.util.Collector; |
|||
import org.apache.kafka.clients.CommonClientConfigs; |
|||
import org.apache.kafka.common.config.SaslConfigs; |
|||
|
|||
import java.io.IOException; |
|||
import java.time.Instant; |
|||
import java.time.LocalDate; |
|||
import java.time.LocalDateTime; |
|||
import java.time.ZoneOffset; |
|||
import java.time.format.DateTimeFormatter; |
|||
import java.util.*; |
|||
|
|||
/** |
|||
* 机智云设备数据转换 |
|||
* @author hph |
|||
*/ |
|||
public class GizWitsIotDataFormatterJob { |
|||
|
|||
static SnowFlake snowflake = new SnowFlake( |
|||
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), |
|||
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) |
|||
); |
|||
|
|||
public static void main(String[] args) throws Exception { |
|||
|
|||
|
|||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|||
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|||
DataStreamSource<com.alibaba.fastjson.JSONObject> streamSource = env.addSource(new GizWitsIotSource()); |
|||
|
|||
// 把机智云的数据转成我们自己的格式 |
|||
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = streamSource |
|||
.flatMap(new RichFlatMapFunction<JSONObject, MachineIotDataReceivedEvent>() { |
|||
@Override |
|||
public void flatMap(JSONObject value, Collector<MachineIotDataReceivedEvent> out) { |
|||
List<MachineIotDataReceivedEvent> receivedEvents = transform(value); |
|||
if(CollUtil.isNotEmpty(receivedEvents)) { |
|||
receivedEvents.forEach(out::collect); |
|||
} |
|||
} |
|||
}).name("Transform MachineIotDataReceivedEvent"); |
|||
|
|||
//kafka 认证配置,暂时注释,后续可能需要放开 |
|||
/*Properties kafkaProducerConfig = new Properties(); |
|||
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|||
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|||
kafkaProducerConfig.setProperty("sasl.jaas.config", |
|||
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); |
|||
*/ |
|||
|
|||
|
|||
// 写入kafka |
|||
transformDs.sinkTo( |
|||
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|||
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|||
//.setKafkaProducerConfig(kafkaProducerConfig) |
|||
.setRecordSerializer( |
|||
KafkaRecordSerializationSchema.builder() |
|||
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) |
|||
.setValueSerializationSchema(new MachineIotDataReceivedEventKafkaSerializationSchema()) |
|||
.build() |
|||
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build() |
|||
).name("machineIotDataReceivedEvent kafka Sink"); |
|||
|
|||
// 发送到OSS存储 |
|||
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); |
|||
|
|||
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( |
|||
new Path(outputPath), |
|||
new SimpleStringEncoder<String>(CharsetUtil.UTF_8) |
|||
).withBucketAssigner(new BucketAssigner<String, String>() { |
|||
@Override |
|||
public String getBucketId(String element, Context context) { |
|||
|
|||
MachineIotDataReceivedEvent receivedEvent = JSONUtil.toBean(element, MachineIotDataReceivedEvent.class); |
|||
// 指定以日期的格式生成桶目录 |
|||
Long receivedTime = receivedEvent.getReceivedTime(); |
|||
// 获取设备状态目录名称 |
|||
String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat()); |
|||
return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) |
|||
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac(); |
|||
} |
|||
|
|||
private String getDeviceStatusStr(Integer machineWorkingStat) { |
|||
|
|||
if(machineWorkingStat == 1){ |
|||
return "deviceWorking"; |
|||
}else if(machineWorkingStat == 2){ |
|||
return "deviceWaiting"; |
|||
}else { |
|||
return "deviceOff"; |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public SimpleVersionedSerializer<String> getSerializer() { |
|||
return SimpleVersionedStringSerializer.INSTANCE; |
|||
} |
|||
}).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); |
|||
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() { |
|||
@Override |
|||
public String map(MachineIotDataReceivedEvent value) { |
|||
return JSONUtil.toJsonStr(value); |
|||
} |
|||
}).addSink(sink); |
|||
|
|||
env.execute("gizwits iot data formatter job"); |
|||
} |
|||
|
|||
|
|||
private static List<MachineIotDataReceivedEvent> transform(com.alibaba.fastjson.JSONObject event) { |
|||
|
|||
List<MachineIotDataReceivedEvent> receivedEventList = new ArrayList<>(); |
|||
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); |
|||
if (ObjectUtil.isNotEmpty(event)) { |
|||
String pushEventCode = CommandUtils.getPushEventCode(event); |
|||
if(NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) { |
|||
// 设备下线 |
|||
OffLineEventBody offLineEventBody = CommandUtils.parsePushEvent(event, OffLineEventBody.class); |
|||
String mac = offLineEventBody.getMac(); |
|||
if(StrUtil.isNotEmpty(mac)) { |
|||
machineIotDataReceivedEvent.setId(snowflake.nextId()); |
|||
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); |
|||
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); |
|||
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); |
|||
machineIotDataReceivedEvent.setMachinePwrStat(0); |
|||
machineIotDataReceivedEvent.setMachineWorkingStat(0); |
|||
machineIotDataReceivedEvent.setCurrJobCount(0L); |
|||
machineIotDataReceivedEvent.setCurrJobDuration(0L); |
|||
machineIotDataReceivedEvent.setCurrStoppingDuration(0L); |
|||
machineIotDataReceivedEvent.setCurrWaitingDuration(0L); |
|||
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); |
|||
machineIotDataReceivedEvent.setReportTime(offLineEventBody.getCreatedAt()); |
|||
receivedEventList.add(machineIotDataReceivedEvent); |
|||
return receivedEventList; |
|||
} |
|||
}else if(NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) { |
|||
// 设备上线 |
|||
OnLineEventBody onLineEventBody = CommandUtils.parsePushEvent(event, OnLineEventBody.class); |
|||
String mac = onLineEventBody.getMac(); |
|||
if(StrUtil.isNotEmpty(mac)) { |
|||
machineIotDataReceivedEvent.setId(snowflake.nextId()); |
|||
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); |
|||
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); |
|||
machineIotDataReceivedEvent.setMachinePwrStat(1); |
|||
machineIotDataReceivedEvent.setMachineWorkingStat(2); |
|||
machineIotDataReceivedEvent.setCurrJobCount(0L); |
|||
machineIotDataReceivedEvent.setCurrJobDuration(0L); |
|||
machineIotDataReceivedEvent.setCurrStoppingDuration(0L); |
|||
machineIotDataReceivedEvent.setCurrWaitingDuration(0L); |
|||
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); |
|||
machineIotDataReceivedEvent.setReportTime(onLineEventBody.getCreatedAt()); |
|||
receivedEventList.add(machineIotDataReceivedEvent); |
|||
return receivedEventList; |
|||
} |
|||
}else if(NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) { |
|||
// 设备生产数据 |
|||
List<Map<String,Object>> mapList = DataParsingUtils.deviceStatusKvParsing(event); |
|||
mapList.forEach(e ->{ |
|||
JSONObject j = (JSONObject) JSON.toJSON(e); |
|||
DeviceStatus deviceStatus = j.toJavaObject(DeviceStatus.class); |
|||
machineIotDataReceivedEvent.setId(snowflake.nextId()); |
|||
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); |
|||
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac())); |
|||
machineIotDataReceivedEvent.setMachinePwrStat(1); |
|||
machineIotDataReceivedEvent.setMachineWorkingStat(1); |
|||
machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getCount()); |
|||
machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration()); |
|||
machineIotDataReceivedEvent.setCurrStoppingDuration(0L); |
|||
machineIotDataReceivedEvent.setCurrWaitingDuration(0L); |
|||
machineIotDataReceivedEvent.setAccJobCount(deviceStatus.getTotal()); |
|||
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); |
|||
machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime()); |
|||
receivedEventList.add(machineIotDataReceivedEvent); |
|||
}); |
|||
return receivedEventList; |
|||
} |
|||
} |
|||
return receivedEventList; |
|||
} |
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
package com.qniao.iot.gizwits; |
|||
|
|||
import com.gizwits.noti.noticlient.OhMyNotiClient; |
|||
import com.gizwits.noti.noticlient.bean.req.body.LoginReqCommandBody; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
|
|||
@Data |
|||
public class GizWitsProperties { |
|||
|
|||
public GizWitsProperties() { |
|||
this.automaticConfirmation = true; |
|||
Item item = new Item(); |
|||
this.itemList.add(item); |
|||
} |
|||
|
|||
/** |
|||
* snoti 服务器 host |
|||
*/ |
|||
private String host = "snoti.gizwits.com"; |
|||
|
|||
/** |
|||
* snoti 服务器 port |
|||
*/ |
|||
private Integer port = 2017; |
|||
|
|||
/** |
|||
* 自动确认 |
|||
* 默认为true |
|||
* <p> |
|||
* 当true时, |
|||
* 通过使用 {@link OhMyNotiClient#receiveMessage()} 方法接受消息时会自动ack |
|||
* 当false时, |
|||
* 通过使用 {@link OhMyNotiClient#receiveMessage()} 方法接受消息时需要手动调用 |
|||
* 注意, 如果此时没有手动回复ack. 当达到了 preFetch{@link LoginReqCommandBody#getPrefetchCount()}时, |
|||
* snoti服务端不会再推送消息. |
|||
*/ |
|||
private Boolean automaticConfirmation; |
|||
|
|||
/** |
|||
* 登陆信息 |
|||
*/ |
|||
private List<Item> itemList = new ArrayList<>(1); |
|||
|
|||
@Data |
|||
@NoArgsConstructor |
|||
public static class Item { |
|||
|
|||
/** |
|||
* 机智云产品key |
|||
*/ |
|||
private String productKey = "816caf9e2b2141be916f204214461df4"; |
|||
|
|||
/** |
|||
* 机智云snoti auth id |
|||
*/ |
|||
private String authId = "+A6DBSboRBCeCFlHwPi3Kw"; |
|||
|
|||
/** |
|||
* 机智云snoti auth secret |
|||
*/ |
|||
private String authSecret = "TeUMuwQNTmy+JiWrLAuMHQ"; |
|||
|
|||
/** |
|||
* 机智云snoti sub key |
|||
*/ |
|||
private String subKey = "e3f9719ff3a045f8b5c8c6b1c5284108"; |
|||
} |
|||
} |
|||
@ -0,0 +1,21 @@ |
|||
package com.qniao.iot.gizwits.command; |
|||
|
|||
import com.qniao.domain.BaseCommand; |
|||
import org.apache.flink.api.common.serialization.SerializationSchema; |
|||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; |
|||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
|||
|
|||
|
|||
public class BaseCommandSerializationSchema implements SerializationSchema<BaseCommand> { |
|||
|
|||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
|||
|
|||
@Override |
|||
public byte[] serialize(BaseCommand command) { |
|||
try { |
|||
return OBJECT_MAPPER.writeValueAsBytes(command); |
|||
} catch (JsonProcessingException e) { |
|||
throw new IllegalArgumentException("Could not serialize record: " + command, e); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
package com.qniao.iot.gizwits.config; |
|||
|
|||
import com.ctrip.framework.apollo.Config; |
|||
import com.ctrip.framework.apollo.ConfigService; |
|||
|
|||
public class ApolloConfig { |
|||
|
|||
private static final Config config = ConfigService.getAppConfig(); |
|||
|
|||
public static String get(String key, String defaultValue) { |
|||
|
|||
return config.getProperty(key, defaultValue); |
|||
} |
|||
|
|||
public static String get(String key) { |
|||
|
|||
return config.getProperty(key, null); |
|||
} |
|||
} |
|||
@ -0,0 +1,14 @@ |
|||
package com.qniao.iot.gizwits.constant; |
|||
|
|||
public interface ConfigConstant { |
|||
|
|||
String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; |
|||
|
|||
String SINK_KAFKA_TOPICS = "sink.kafka.topics"; |
|||
|
|||
String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; |
|||
|
|||
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; |
|||
|
|||
String SINK_OSS_PATH = "sink.oss.path"; |
|||
} |
|||
@ -0,0 +1,102 @@ |
|||
package com.qniao.iot.gizwits.source; |
|||
|
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.gizwits.noti.noticlient.OhMyNotiClient; |
|||
import com.gizwits.noti.noticlient.OhMyNotiClientImpl; |
|||
import com.gizwits.noti.noticlient.bean.req.NotiReqPushEvents; |
|||
import com.gizwits.noti.noticlient.bean.req.body.AuthorizationData; |
|||
import com.gizwits.noti.noticlient.config.SnotiCallback; |
|||
import com.gizwits.noti.noticlient.config.SnotiConfig; |
|||
import com.gizwits.noti.noticlient.enums.ProtocolType; |
|||
import com.google.common.base.Preconditions; |
|||
import com.qniao.iot.gizwits.GizWitsProperties; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.collections.CollectionUtils; |
|||
import org.apache.flink.configuration.Configuration; |
|||
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; |
|||
|
|||
import java.util.List; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.Stream; |
|||
|
|||
@Slf4j |
|||
public class GizWitsIotSource extends RichSourceFunction<JSONObject> { |
|||
|
|||
private static Boolean RUNNING = true; |
|||
|
|||
private static OhMyNotiClient client; |
|||
|
|||
private static final GizWitsProperties PROPERTIES = new GizWitsProperties(); |
|||
|
|||
@Override |
|||
public void open(Configuration parameters) { |
|||
|
|||
log.info("开始启动gizwits客户端..."); |
|||
|
|||
client = new OhMyNotiClientImpl() |
|||
.addLoginAuthorizes(getSnotiLoginCredential()) |
|||
.setCallback(getSnotiCallback()) |
|||
.setSnotiConfig(getGizWitsConfig()); |
|||
|
|||
//启动client |
|||
client.doStart(); |
|||
|
|||
log.info("启动gizwits客户端完成"); |
|||
} |
|||
|
|||
@Override |
|||
public void run(SourceContext<JSONObject> ctx) { |
|||
while (RUNNING) { |
|||
JSONObject message = client.receiveMessage(); |
|||
ctx.collect(message); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void cancel() { |
|||
RUNNING = false; |
|||
} |
|||
|
|||
private AuthorizationData[] getSnotiLoginCredential() { |
|||
List<GizWitsProperties.Item> itemList = PROPERTIES.getItemList(); |
|||
Preconditions.checkArgument(!CollectionUtils.isEmpty(itemList), "未配置gizwits登陆信息, gizwits初始化失败"); |
|||
return itemList.stream() |
|||
.map(it -> new AuthorizationData() |
|||
//监听所有推送事件 |
|||
.setProtocolType(ProtocolType.V2) |
|||
.addEvents(NotiReqPushEvents.values()) |
|||
.setSubkey(it.getSubKey()) |
|||
.setAuth_id(it.getAuthId()) |
|||
.setAuth_secret(it.getAuthSecret()) |
|||
.setProduct_key(it.getProductKey())) |
|||
.toArray(AuthorizationData[]::new); |
|||
} |
|||
|
|||
private SnotiCallback getSnotiCallback() { |
|||
return new SnotiCallback() { |
|||
|
|||
@Override |
|||
public void loginFailed(String errorMessage) { |
|||
log.warn("gizwits登录失败, 请检查登录参数是否有效!!! errorMsg\n {}", errorMessage); |
|||
} |
|||
|
|||
@Override |
|||
public void disconnected() { |
|||
log.warn("gizwits客户端连接断开, 即将尝试重连..."); |
|||
} |
|||
|
|||
@Override |
|||
public void reload(AuthorizationData... authorizationData) { |
|||
log.info("gizwits重载登录信息[{}]...", Stream.of(authorizationData).map(AuthorizationData::toString) |
|||
.collect(Collectors.joining(","))); |
|||
} |
|||
}; |
|||
} |
|||
|
|||
private SnotiConfig getGizWitsConfig() { |
|||
return new SnotiConfig() |
|||
.setAutomaticConfirmation(PROPERTIES.getAutomaticConfirmation()) |
|||
.setHost(PROPERTIES.getHost()) |
|||
.setPort(PROPERTIES.getPort()); |
|||
} |
|||
} |
|||
@ -0,0 +1,260 @@ |
|||
package com.qniao.iot.gizwits.util; |
|||
|
|||
|
|||
import com.alibaba.fastjson.JSONObject; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
|
|||
import java.math.BigDecimal; |
|||
import java.text.DecimalFormat; |
|||
import java.text.SimpleDateFormat; |
|||
import java.time.Instant; |
|||
import java.time.LocalDateTime; |
|||
import java.time.ZoneOffset; |
|||
import java.util.*; |
|||
|
|||
/** |
|||
* Created by ZGF on 2020/10/30. |
|||
*/ |
|||
public class DataParsingUtils { |
|||
|
|||
private static final String CREATED_AT = "created_at"; |
|||
|
|||
private static final String TIMESTAMP = "timestamp"; |
|||
|
|||
private static final String LATITUDE = "latitude"; |
|||
|
|||
private static final String LONGITUDE = "longitude"; |
|||
|
|||
private static final String KEEP_ALIVE = "keep_alive"; |
|||
|
|||
private static String[][] getCols(){ |
|||
String[][] colsarray = new String[8][5]; |
|||
for(int i=0;i<8;i++){ |
|||
int p = i+1; |
|||
colsarray[i]=new String[]{TIMESTAMP+"_"+p,"state_"+p,"duration_"+p,"count_"+p,"total_"+p}; |
|||
} |
|||
return colsarray; |
|||
} |
|||
|
|||
private static String getColMapping(String colName){ |
|||
return colName.split("_")[0]; |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 字符串时间转换为时间戳 |
|||
* @param string |
|||
* @return |
|||
*/ |
|||
public static Long stringToTimestamp(String string){ |
|||
if(StringUtils.isNotBlank(string)) { |
|||
char[] d = string.toCharArray(); |
|||
String datatime = String.valueOf(d[6] + "" + d[7]); |
|||
datatime = datatime + String.valueOf(d[4] + "" + d[5]); |
|||
datatime = datatime + String.valueOf(d[2] + "" + d[3]); |
|||
datatime = datatime + String.valueOf(d[0] + "" + d[1]); |
|||
return Long.parseLong(datatime, 16); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public static LocalDateTime getDateLocalDateTime(String string){ |
|||
return Instant.ofEpochSecond(stringToTimestamp(string)).atOffset(ZoneOffset.of("+08:00")).toLocalDateTime(); |
|||
} |
|||
|
|||
public static LocalDateTime getDateLocalDateTime(Long times){ |
|||
return Instant.ofEpochSecond(times).atOffset(ZoneOffset.of("+08:00")).toLocalDateTime(); |
|||
} |
|||
|
|||
|
|||
|
|||
|
|||
public static String timesTampDateFormat(String string){ |
|||
Long timesTamp = stringToTimestamp(string); |
|||
if(timesTamp != null) { |
|||
|
|||
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timesTamp); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public static String timesTampDateFormat(Long timesTamp){ |
|||
if(timesTamp != null) { |
|||
return new SimpleDateFormat("yyyy-MM-dd").format(new Date(timesTamp*1000L)); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public static List<Map<String,Object>> deviceStatusKvParsing(JSONObject message){ |
|||
String[] row = new String[]{"cmd",CREATED_AT,"delivery_id","did","event_type","mac","msg_id","product_key"}; |
|||
String[] cl = new String[]{"sn","signal","cur_state","cur_speed"}; |
|||
List<Map<String,Object>> list = new ArrayList<>(); |
|||
String[][] cols = getCols(); |
|||
Map<String,Object> jsonObject = (Map<String, Object>) message.get("data"); |
|||
if(jsonObject != null){ |
|||
for(String[] c: cols){ |
|||
Map<String,Object> map =new HashMap<>(); |
|||
boolean bool = false; |
|||
for(String col : c){ |
|||
if(jsonObject.containsKey(col)){ |
|||
bool = true; |
|||
if(col.contains(TIMESTAMP)){ |
|||
LocalDateTime localDateTime = getDateLocalDateTime(jsonObject.get(col).toString()); |
|||
map.put(getColMapping(col),localDateTime); |
|||
map.put("partition_column",localDateTime.getMonthValue()); |
|||
}else { |
|||
map.put(getColMapping(col), jsonObject.get(col)); |
|||
} |
|||
} |
|||
} |
|||
if(bool) { |
|||
list.add(map); |
|||
} |
|||
} |
|||
|
|||
for(Map<String,Object> map : list){ |
|||
for(String s : cl){ |
|||
if(jsonObject.containsKey(s)){ |
|||
map.put(s,jsonObject.get(s)); |
|||
} |
|||
} |
|||
} |
|||
|
|||
for(Map<String,Object> map : list){ |
|||
for(String s :row){ |
|||
if(message.containsKey(s)){ |
|||
if(s.equals(CREATED_AT)) { |
|||
BigDecimal bigDecimal = new BigDecimal(String.valueOf(message.get(s))); |
|||
map.put(s,getDateLocalDateTime(bigDecimal.longValue())); |
|||
}else{ |
|||
map.put(s, message.get(s)); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
return list; |
|||
} |
|||
|
|||
public static JSONObject parsingOfflineMessage(JSONObject message){ |
|||
|
|||
if(message.containsKey("data")){ |
|||
JSONObject o = message.getJSONObject("data"); |
|||
if(o.containsKey("duration")){ |
|||
message.put("duration",o.get("duration")); |
|||
} |
|||
if(o.containsKey("reason")){ |
|||
message.put("reason",o.get("reason")); |
|||
} |
|||
if(o.containsKey("heartbeat")){ |
|||
JSONObject jb = o.getJSONObject("heartbeat"); |
|||
if(jb.containsKey("min")){ |
|||
message.put("min",jb.get("min")); |
|||
} |
|||
if(jb.containsKey("avg")){ |
|||
message.put("avg",jb.get("avg")); |
|||
} |
|||
if(jb.containsKey("last")){ |
|||
message.put("last",jb.get("last")); |
|||
} |
|||
if(jb.containsKey("max")){ |
|||
message.put("max",jb.get("max")); |
|||
} |
|||
if(jb.containsKey("count")){ |
|||
message.put("count",jb.get("count")); |
|||
} |
|||
} |
|||
} |
|||
return message; |
|||
} |
|||
|
|||
public static JSONObject parsingOnlineMessage(JSONObject message){ |
|||
if(message.containsKey("data")){ |
|||
JSONObject o = message.getJSONObject("data"); |
|||
if(o.containsKey(KEEP_ALIVE)){ |
|||
message.put(KEEP_ALIVE,o.get(KEEP_ALIVE)); |
|||
} |
|||
} |
|||
return message; |
|||
} |
|||
|
|||
public static JSONObject parsingMessage(JSONObject message){ |
|||
if(message != null){ |
|||
if(message.containsKey(CREATED_AT)){ |
|||
BigDecimal bigDecimal = new BigDecimal(String.valueOf(message.get(CREATED_AT))); |
|||
message.put(CREATED_AT,getDateLocalDateTime(bigDecimal.longValue())); |
|||
} |
|||
|
|||
if(message.containsKey(LATITUDE)){ |
|||
DecimalFormat df = new DecimalFormat("#.000000"); |
|||
message.put(LATITUDE,df.format(new Double(String.valueOf(message.get(LATITUDE))))); |
|||
} |
|||
|
|||
if(message.containsKey(LONGITUDE)){ |
|||
DecimalFormat df = new DecimalFormat("#.000000"); |
|||
message.put(LONGITUDE,df.format(new Double(String.valueOf(message.get(LONGITUDE))))); |
|||
} |
|||
} |
|||
return message; |
|||
} |
|||
|
|||
/* public static void main(String[] args) { |
|||
String str = "{\n" + |
|||
"\t\"event_type\": \"device_offline\",\n" + |
|||
"\t\"product_key\": \"816caf9e2b2141be916f204214461df4\",\n" + |
|||
"\t\"data\": {\n" + |
|||
"\t\t\"duration\": 13280,\n" + |
|||
"\t\t\"reason\": \"no_heartbeat\",\n" + |
|||
"\t\t\"heartbeat\": {\n" + |
|||
"\t\t\t\"min\": 48,\n" + |
|||
"\t\t\t\"avg\": 50,\n" + |
|||
"\t\t\t\"last\": 130,\n" + |
|||
"\t\t\t\"max\": 52,\n" + |
|||
"\t\t\t\"count\": 263\n" + |
|||
"\t\t}\n" + |
|||
"\t},\n" + |
|||
"\t\"delivery_id\": 29,\n" + |
|||
"\t\"ip\": \"39.144.3.158\",\n" + |
|||
"\t\"created_at\": 1604289336.56731605530,\n" + |
|||
"\t\"cmd\": \"event_push\",\n" + |
|||
"\t\"msg_id\": \"jJqZWskgSAODEG6x1ksSjw\",\n" + |
|||
"\t\"mac\": \"861193040025935\",\n" + |
|||
"\t\"did\": \"hSOUIuLIYsPpaydiFFIA7M\"\n" + |
|||
"}"; |
|||
JSONObject j = JSON.parseObject(str); |
|||
JSONObject x = parsingMessage(j); |
|||
System.out.println(x); |
|||
|
|||
DeviceOffline deviceOffline = x.toJavaObject(DeviceOffline.class); |
|||
System.out.println(JSON.toJSONString(deviceOffline)); |
|||
|
|||
|
|||
String str2 = "{\n" + |
|||
"\t\"country\": \"China\",\n" + |
|||
"\t\"data\": {\n" + |
|||
"\t\t\"keep_alive\": 130\n" + |
|||
"\t},\n" + |
|||
"\t\"city\": \"Unkown\",\n" + |
|||
"\t\"delivery_id\": 72,\n" + |
|||
"\t\"ip\": \"39.144.7.209\",\n" + |
|||
"\t\"latitude\": 34.7725000000000008527,\n" + |
|||
"\t\"created_at\": 1604295582.08462190628,\n" + |
|||
"\t\"mac\": \"861193040025935\",\n" + |
|||
"\t\"event_type\": \"device_online\",\n" + |
|||
"\t\"product_key\": \"816caf9e2b2141be916f204214461df4\",\n" + |
|||
"\t\"cmd\": \"event_push\",\n" + |
|||
"\t\"msg_id\": \"FZ3AUG7/Sge7BohizNzJ/g\",\n" + |
|||
"\t\"region\": \"Unkown\",\n" + |
|||
"\t\"did\": \"hSOUIuLIYsPpaydiFFIA7M\",\n" + |
|||
"\t\"longitude\": 113.726600000000004798\n" + |
|||
"}"; |
|||
JSONObject j2 = JSON.parseObject(str2); |
|||
JSONObject x2 = parsingMessage(j2); |
|||
System.out.println(x2); |
|||
|
|||
DeviceOnline deviceOnline = x.toJavaObject(DeviceOnline.class); |
|||
System.out.println(JSON.toJSONString(deviceOnline)); |
|||
|
|||
}*/ |
|||
} |
|||
@ -0,0 +1,101 @@ |
|||
package com.qniao.iot.gizwits.util; |
|||
|
|||
/** |
|||
* 雪花ID算法 |
|||
*/ |
|||
public class SnowFlake { |
|||
|
|||
/** |
|||
* 起始的时间戳 |
|||
*/ |
|||
private final static long START_STMP = 1480166465631L; |
|||
|
|||
/** |
|||
* 每一部分占用的位数 |
|||
*/ |
|||
private final static long SEQUENCE_BIT = 12; //序列号占用的位数 |
|||
private final static long MACHINE_BIT = 5; //机器标识占用的位数 |
|||
private final static long DATACENTER_BIT = 5;//数据中心占用的位数 |
|||
|
|||
/** |
|||
* 每一部分的最大值 |
|||
*/ |
|||
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT); |
|||
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); |
|||
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); |
|||
|
|||
/** |
|||
* 每一部分向左的位移 |
|||
*/ |
|||
private final static long MACHINE_LEFT = SEQUENCE_BIT; |
|||
private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; |
|||
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; |
|||
|
|||
private long datacenterId = 1L; //数据中心 |
|||
private long machineId = 1L; //机器标识 |
|||
private long sequence = 0L; //序列号 |
|||
private long lastStmp = -1L;//上一次时间戳 |
|||
|
|||
// public SnowFlake(){ |
|||
// } |
|||
|
|||
public SnowFlake(long datacenterId, |
|||
long machineId) { |
|||
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { |
|||
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); |
|||
} |
|||
if (machineId > MAX_MACHINE_NUM || machineId < 0) { |
|||
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); |
|||
} |
|||
this.datacenterId = datacenterId; |
|||
this.machineId = machineId; |
|||
} |
|||
|
|||
/** |
|||
* 产生下一个ID |
|||
* |
|||
* @return |
|||
*/ |
|||
public synchronized Long nextId() { |
|||
long currStmp = getNewstmp(); |
|||
if (currStmp < lastStmp) { |
|||
throw new RuntimeException("Clock moved backwards. Refusing to generate id"); |
|||
} |
|||
|
|||
if (currStmp == lastStmp) { |
|||
//相同毫秒内,序列号自增 |
|||
sequence = (sequence + 1) & MAX_SEQUENCE; |
|||
//同一毫秒的序列数已经达到最大 |
|||
if (sequence == 0L) { |
|||
currStmp = getNextMill(); |
|||
} |
|||
} else { |
|||
//不同毫秒内,序列号置为0 |
|||
sequence = 0L; |
|||
} |
|||
|
|||
lastStmp = currStmp; |
|||
|
|||
return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分 |
|||
| datacenterId << DATACENTER_LEFT //数据中心部分 |
|||
| machineId << MACHINE_LEFT //机器标识部分 |
|||
| sequence; //序列号部分 |
|||
} |
|||
|
|||
private long getNextMill() { |
|||
long mill = getNewstmp(); |
|||
while (mill <= lastStmp) { |
|||
mill = getNewstmp(); |
|||
} |
|||
return mill; |
|||
} |
|||
|
|||
private long getNewstmp() { |
|||
return System.currentTimeMillis(); |
|||
} |
|||
|
|||
public static void main(String[] args) { |
|||
SnowFlake s = new SnowFlake(1, 1); |
|||
System.out.println(s.nextId()); |
|||
} |
|||
} |
|||
@ -0,0 +1,5 @@ |
|||
app.id=iot-gizwits-model-formatter |
|||
|
|||
# test 8.135.8.221 |
|||
# prod 47.112.164.224 |
|||
apollo.meta=http://47.112.164.224:5000 |
|||
@ -0,0 +1,25 @@ |
|||
################################################################################ |
|||
# Licensed to the Apache Software Foundation (ASF) under one |
|||
# or more contributor license agreements. See the NOTICE file |
|||
# distributed with this work for additional information |
|||
# regarding copyright ownership. The ASF licenses this file |
|||
# to you under the Apache License, Version 2.0 (the |
|||
# "License"); you may not use this file except in compliance |
|||
# with the License. You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software |
|||
# distributed under the License is distributed on an "AS IS" BASIS, |
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
# See the License for the specific language governing permissions and |
|||
# limitations under the License. |
|||
################################################################################ |
|||
|
|||
rootLogger.level = INFO |
|||
rootLogger.appenderRef.console.ref = ConsoleAppender |
|||
|
|||
appender.console.name = ConsoleAppender |
|||
appender.console.type = CONSOLE |
|||
appender.console.layout.type = PatternLayout |
|||
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n |
|||
@ -0,0 +1,15 @@ |
|||
package com.qniao; |
|||
|
|||
import java.time.Instant; |
|||
import java.time.LocalDateTime; |
|||
import java.time.ZoneOffset; |
|||
import java.time.format.DateTimeFormatter; |
|||
|
|||
public class Test1 { |
|||
|
|||
public static void main(String[] args) { |
|||
Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli(); |
|||
System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) |
|||
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); |
|||
} |
|||
} |
|||
@ -0,0 +1,25 @@ |
|||
package com.qniao; |
|||
|
|||
import com.aliyun.oss.OSSClient; |
|||
import com.aliyun.oss.model.ObjectMetadata; |
|||
|
|||
import java.io.ByteArrayInputStream; |
|||
|
|||
public class TestOss { |
|||
|
|||
public static void main(String[] args) { |
|||
|
|||
OSSClient ossClient = new OSSClient("oss-cn-shenzhen.aliyuncs.com", "LTAINmC91NqIGN38", "Hh10dQPjq1jMLLSpbDAR05ZzR3nXsU"); |
|||
|
|||
String str = "weqgwrgwefqwefwefqerwegwefwefgweghjtyjtyjergyjrrfwgrth"; |
|||
ObjectMetadata objectMetadata = new ObjectMetadata(); |
|||
objectMetadata.setContentLength(str.length()); |
|||
objectMetadata.setCacheControl("no-cache"); |
|||
objectMetadata.setHeader("Pragma", "no-cache"); |
|||
objectMetadata.setContentType("application/json"); |
|||
objectMetadata.setContentEncoding("utf-8"); |
|||
objectMetadata.setContentDisposition("inline;filename=" + "213.json"); |
|||
ossClient.putObject("qn-data-lake", "gizwits-model-reported-data/213.json", new ByteArrayInputStream(str.getBytes()),objectMetadata); |
|||
ossClient.shutdown(); |
|||
} |
|||
} |
|||
@ -0,0 +1,20 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|||
<modelVersion>4.0.0</modelVersion> |
|||
|
|||
<groupId>org.example</groupId> |
|||
<artifactId>iot-gizwits-model-formatter</artifactId> |
|||
<packaging>pom</packaging> |
|||
<version>1.0-SNAPSHOT</version> |
|||
<modules> |
|||
<module>iot-gizwits-statistics</module> |
|||
</modules> |
|||
|
|||
<properties> |
|||
<maven.compiler.source>8</maven.compiler.source> |
|||
<maven.compiler.target>8</maven.compiler.target> |
|||
</properties> |
|||
|
|||
</project> |
|||
Write
Preview
Loading…
Cancel
Save