Browse Source

修改oss sink

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
a050926ab2
1 changed files with 55 additions and 5 deletions
  1. 60
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

60
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -18,6 +18,8 @@
package com.qniao.iot.rc;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.config.ApolloConfig;
import com.qniao.iot.rc.constant.ConfigConstant;
@ -28,6 +30,7 @@ import com.qniao.iot.rc.until.SnowFlake;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
@ -35,10 +38,14 @@ import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -47,6 +54,10 @@ import org.apache.kafka.common.config.SaslConfigs;
import java.math.BigDecimal;
import java.security.Provider;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
@ -98,7 +109,7 @@ public class RootCloudIotDataFormatterJob {
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProducerConfig.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
// 写入kafka
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder()
@ -115,11 +126,50 @@ public class RootCloudIotDataFormatterJob {
// 发送到OSS存储
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink<MachineIotDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<MachineIotDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
new SimpleStringEncoder<String>(CharsetUtil.UTF_8)
).withBucketAssigner(new BucketAssigner<String, String>() {
@Override
public String getBucketId(String element, Context context) {
MachineIotDataReceivedEvent receivedEvent = JSONUtil.toBean(element, MachineIotDataReceivedEvent.class);
// 指定以日期的格式生成桶目录
Long receivedTime = receivedEvent.getReceivedTime();
// 获取设备状态目录名称
String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat());
return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac();
}
private String getDeviceStatusStr(Integer machineWorkingStat) {
if(machineWorkingStat == null) {
return "deviceOff";
}else {
if (machineWorkingStat == 1) {
return "deviceWorking";
} else if (machineWorkingStat == 2) {
return "deviceWaiting";
} else {
return "deviceOff";
}
}
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
@Override
public String map(MachineIotDataReceivedEvent value) {
return JSONUtil.toJsonStr(value);
}
}).addSink(sink);
env.execute("root cloud iot data formatter job");
}

Loading…
Cancel
Save