Browse Source

更新

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
9e51b83a16
2 changed files with 12 additions and 8 deletions
  1. 14
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  2. 6
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

14
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@ -15,7 +15,7 @@ import java.util.Properties;
public class RootCloudIotDataEventSourceMocker { public class RootCloudIotDataEventSourceMocker {
// 延迟毫秒 // 延迟毫秒
public static final long DELAY = 1000;
public static final long DELAY = 5000;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// 创建kafka配置属性 // 创建kafka配置属性
@ -26,7 +26,9 @@ public class RootCloudIotDataEventSourceMocker {
String topic = "root_cloud_iot_report_data_event"; String topic = "root_cloud_iot_report_data_event";
String sql = "select mac from qn_equipment_information where is_delete = 0";
String sql = "select iot_mac\n" +
"from qn_machine_realtime_state\n" +
"where is_delete = 0";
// 设备标识 // 设备标识
List<String> assetIdList = Db.use().query(sql, String.class); List<String> assetIdList = Db.use().query(sql, String.class);
// 电源状态0断电 1有电 // 电源状态0断电 1有电
@ -42,7 +44,11 @@ public class RootCloudIotDataEventSourceMocker {
event.setACC_count(RandomUtil.randomLong(100L)); event.setACC_count(RandomUtil.randomLong(100L));
event.setACC_count_total(RandomUtil.randomLong(500L)); event.setACC_count_total(RandomUtil.randomLong(500L));
event.setPWR_sta(RandomUtil.randomEles(pwrStaList, 1).get(0)); event.setPWR_sta(RandomUtil.randomEles(pwrStaList, 1).get(0));
event.setWorking_sta(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getPWR_sta()));
if(event.getPWR_sta().equals(0)) {
event.setWorking_sta(0);
}else {
event.setWorking_sta(generateWorkingSta(RandomUtil.randomEles(accStaList, 1).get(0), event.getPWR_sta()));
}
event.setStoping_duration(RandomUtil.randomNumbers(100)); event.setStoping_duration(RandomUtil.randomNumbers(100));
event.setRunning_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(1250))); event.setRunning_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(1250)));
event.setWaiting_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500))); event.setWaiting_duration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500)));
@ -67,7 +73,7 @@ public class RootCloudIotDataEventSourceMocker {
private static Properties createKafkaProperties() { private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties(); Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "8.135.8.221:9092");
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
return kafkaProps; return kafkaProps;

6
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -58,20 +58,18 @@ public class RootCloudIotDataFormatterJob {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据
env.setParallelism(1);
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder() KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setStartingOffsets(OffsetsInitializer.earliest())
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build(); .build();
// 把树根的数据转成我们自己的格式 // 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source")
.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source")
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) .map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform MachineIotDataReceivedEvent"); .name("Transform MachineIotDataReceivedEvent");

Loading…
Cancel
Save