diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index ce1e330..8b7009d 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -26,23 +26,25 @@ public class RootCloudIotDataEventSourceMocker { String topic = "root_cloud_iot_report_data_event"; // 设备标识 - List assetIdList = Arrays.asList("10000","20000","30000","40000","5000","6000"); - // 电源状态 - List pwrStaList = Arrays.asList(0, 1); + List assetIdList = Arrays.asList("10000","20000","30000","40000","50000","60000"); + // 电源状态(0断电 1有电) + List pwrStaList = Arrays.asList(1); // 设备工作状态(0停机 1工作 2待机) List accStaList = Arrays.asList(0, 1, 2); + + // 循环发送事件 while (true) { RootCloudIotDataReceiptedEvent event = new RootCloudIotDataReceiptedEvent(); event.set__assetId__(RandomUtil.randomEles(assetIdList, 1).get(0)); - event.setACC_count(50L); - event.setACC_count_total(500L); + event.setACC_count(RandomUtil.randomLong(100L)); + event.setACC_count_total(RandomUtil.randomLong(500L)); event.setPWR_sta(RandomUtil.randomEles(pwrStaList, 1).get(0)); - event.setWorking_sta(RandomUtil.randomEles(accStaList, 1).get(0)); - event.setStoping_duration("100"); - event.setRunning_duration(new BigDecimal(1250)); - event.setWaiting_duration(new BigDecimal(500)); + event.setWorking_sta(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getPWR_sta())); + event.setStoping_duration(RandomUtil.randomNumbers(100)); + event.setRunning_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(1250))); + event.setWaiting_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500))); ProducerRecord record = new RootCloudIotDataEventSerialization(topic).serialize( event, null); @@ -53,6 +55,15 @@ public class RootCloudIotDataEventSourceMocker { } } + private static Integer generateWorkingSta(Integer workingSta, Integer pwrSta) { + + if(pwrSta.equals(0)) { + return 0; + }else { + return workingSta; + } + } + private static Properties createKafkaProperties() { Properties kafkaProps = new Properties(); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.124.230:9092"); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 2a8588d..67fa675 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -59,15 +59,15 @@ public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { - final ParameterTool params = ParameterTool.fromArgs(args); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 + env.setParallelism(1); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) - .setStartingOffsets(OffsetsInitializer.earliest()) + .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); @@ -102,8 +102,7 @@ public class RootCloudIotDataFormatterJob { machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); - // 工作状态 - machineIotDataReceivedEvent.setMachineWorkingStat(event.getACC_sta()); + machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); diff --git a/root-cloud-statistics/src/main/resources/META-INF/app.properties b/root-cloud-statistics/src/main/resources/META-INF/app.properties index 5acbd94..996faec 100644 --- a/root-cloud-statistics/src/main/resources/META-INF/app.properties +++ b/root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -1,3 +1,3 @@ -app.id=iot-root-cloud-model-hw-formatter +app.id=root-cloud-model-hw-formatter apollo.meta=http://8.135.8.221:5000 \ No newline at end of file diff --git a/root-cloud-statistics/target/classes/META-INF/app.properties b/root-cloud-statistics/target/classes/META-INF/app.properties index beda063..996faec 100644 --- a/root-cloud-statistics/target/classes/META-INF/app.properties +++ b/root-cloud-statistics/target/classes/META-INF/app.properties @@ -1,3 +1,3 @@ -app.id=ztb-supply-chain-service +app.id=root-cloud-model-hw-formatter apollo.meta=http://8.135.8.221:5000 \ No newline at end of file