diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
index ec75f56..ac7c04b 100644
--- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
+++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
@@ -15,7 +15,7 @@ import java.util.Properties;
 public class RootCloudIotDataEventSourceMocker {
 
     // 延迟:毫秒
-    public static final long DELAY = 1000;
+    public static final long DELAY = 5000;
 
     public static void main(String[] args) throws Exception {
         // 创建kafka配置属性
@@ -26,7 +26,9 @@ public class RootCloudIotDataEventSourceMocker {
 
         String topic = "root_cloud_iot_report_data_event";
 
-        String sql = "select mac from qn_equipment_information where is_delete = 0";
+        String sql = "select iot_mac\n" +
+                "from qn_machine_realtime_state\n" +
+                "where is_delete = 0";
         // 设备标识
         List assetIdList = Db.use().query(sql, String.class);
         // 电源状态(0断电 1有电)
@@ -42,7 +44,11 @@
             event.setACC_count(RandomUtil.randomLong(100L));
             event.setACC_count_total(RandomUtil.randomLong(500L));
             event.setPWR_sta(RandomUtil.randomEles(pwrStaList, 1).get(0));
-            event.setWorking_sta(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getPWR_sta()));
+            if(event.getPWR_sta().equals(0)) {
+                event.setWorking_sta(0);
+            }else {
+                event.setWorking_sta(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getPWR_sta()));
+            }
             event.setStoping_duration(RandomUtil.randomNumbers(100));
             event.setRunning_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(1250)));
             event.setWaiting_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500)));
@@ -67,7 +73,7 @@
 
     private static Properties createKafkaProperties() {
         Properties kafkaProps = new Properties();
-        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.135.8.221:9092");
+        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
         return kafkaProps;
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index 2f82878..3ab4748 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -58,20 +58,18 @@ public class RootCloudIotDataFormatterJob {
 
     public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据
-        env.setParallelism(1);
         env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
 
         KafkaSource source = KafkaSource.builder()
                 .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
                 .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
                 .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
-                .setStartingOffsets(OffsetsInitializer.earliest())
+                .setStartingOffsets(OffsetsInitializer.latest())
                 .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
                 .build();
 
         // 把树根的数据转成我们自己的格式
         SingleOutputStreamOperator transformDs = env
-                .fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source")
+                .fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source")
                 .map((MapFunction) RootCloudIotDataFormatterJob::transform)
                 .name("Transform MachineIotDataReceivedEvent");