Browse Source

更新

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
a4ec3ba409
5 changed files with 14 additions and 29 deletions
  1. 2
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  2. 11
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  3. 2
      root-cloud-statistics/src/main/resources/META-INF/app.properties
  4. 3
      root-cloud-statistics/target/classes/META-INF/app.properties
  5. 25
      root-cloud-statistics/target/classes/log4j2.properties

2
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@ -29,6 +29,8 @@ public class RootCloudIotDataEventSourceMocker {
String sql = "select iot_mac\n" + String sql = "select iot_mac\n" +
"from qn_machine_realtime_state\n" + "from qn_machine_realtime_state\n" +
"where is_delete = 0"; "where is_delete = 0";
// 设备标识 // 设备标识
List<String> assetIdList = Db.use().query(sql, String.class); List<String> assetIdList = Db.use().query(sql, String.class);
// 电源状态0断电 1有电 // 电源状态0断电 1有电

11
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -27,14 +27,17 @@ import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchem
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Objects; import java.util.Objects;
@ -86,6 +89,14 @@ public class RootCloudIotDataFormatterJob {
.build() .build()
).name("MachineIotDataReceivedEvent Sink"); ).name("MachineIotDataReceivedEvent Sink");
// 发送到OSS存储
String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data";
StreamingFileSink<MachineIotDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<MachineIotDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
env.execute("root cloud iot data formatter job"); env.execute("root cloud iot data formatter job");
} }

2
root-cloud-statistics/src/main/resources/META-INF/app.properties

@ -1,3 +1,3 @@
app.id=root-cloud-model-hw-formatter app.id=root-cloud-model-hw-formatter
apollo.meta=http://8.135.8.221:5000
apollo.meta=http://47.112.164.224:5000

3
root-cloud-statistics/target/classes/META-INF/app.properties

@ -1,3 +0,0 @@
app.id=root-cloud-model-hw-formatter
apollo.meta=http://8.135.8.221:5000

25
root-cloud-statistics/target/classes/log4j2.properties

@ -1,25 +0,0 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading…
Cancel
Save