diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index ac7c04b..f9bfd21 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -29,6 +29,8 @@ public class RootCloudIotDataEventSourceMocker { String sql = "select iot_mac\n" + "from qn_machine_realtime_state\n" + "where is_delete = 0"; + + // 设备标识 List assetIdList = Db.use().query(sql, String.class); // 电源状态(0断电 1有电) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 3ab4748..0bfa210 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -27,14 +27,17 @@ import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchem import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import java.math.BigDecimal; import java.util.Objects; @@ -86,6 +89,14 @@ public class RootCloudIotDataFormatterJob { .build() ).name("MachineIotDataReceivedEvent Sink"); + // 发送到OSS存储 + String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data"; + StreamingFileSink sink = StreamingFileSink.forRowFormat( + new Path(outputPath), + new SimpleStringEncoder("UTF-8") + ).build(); + transformDs.addSink(sink); + env.execute("root cloud iot data formatter job"); } diff --git a/root-cloud-statistics/src/main/resources/META-INF/app.properties b/root-cloud-statistics/src/main/resources/META-INF/app.properties index 996faec..1452591 100644 --- a/root-cloud-statistics/src/main/resources/META-INF/app.properties +++ b/root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -1,3 +1,3 @@ app.id=root-cloud-model-hw-formatter -apollo.meta=http://8.135.8.221:5000 \ No newline at end of file +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file diff --git a/root-cloud-statistics/target/classes/META-INF/app.properties b/root-cloud-statistics/target/classes/META-INF/app.properties deleted file mode 100644 index 996faec..0000000 --- a/root-cloud-statistics/target/classes/META-INF/app.properties +++ /dev/null @@ -1,3 +0,0 @@ -app.id=root-cloud-model-hw-formatter - -apollo.meta=http://8.135.8.221:5000 \ No newline at end of file diff --git a/root-cloud-statistics/target/classes/log4j2.properties b/root-cloud-statistics/target/classes/log4j2.properties deleted file mode 100644 index 32c696e..0000000 --- a/root-cloud-statistics/target/classes/log4j2.properties +++ /dev/null @@ -1,25 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -rootLogger.level = INFO -rootLogger.appenderRef.console.ref = ConsoleAppender - -appender.console.name = ConsoleAppender -appender.console.type = CONSOLE -appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n