5 changed files with 44 additions and 94 deletions
Split View
Diff Options
-
6root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
-
6root-cloud-statistics/dependency-reduced-pom.xml
-
18root-cloud-statistics/pom.xml
-
18root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
-
90root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java
@ -1,90 +0,0 @@ |
|||
package com.qniao.iot.rc; |
|||
|
|||
import cn.hutool.core.collection.ListUtil; |
|||
import cn.hutool.core.util.StrUtil; |
|||
import cn.hutool.json.JSONUtil; |
|||
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|||
import org.apache.flink.api.common.functions.FilterFunction; |
|||
import org.apache.flink.api.common.functions.RichFlatMapFunction; |
|||
import org.apache.flink.connector.kafka.source.KafkaSource; |
|||
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|||
import org.apache.flink.streaming.api.CheckpointingMode; |
|||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|||
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; |
|||
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; |
|||
import org.apache.flink.streaming.api.windowing.time.Time; |
|||
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; |
|||
import org.apache.flink.util.Collector; |
|||
import org.apache.kafka.clients.consumer.ConsumerConfig; |
|||
|
|||
import java.time.Duration; |
|||
import java.time.LocalDateTime; |
|||
import java.time.ZoneOffset; |
|||
import java.util.*; |
|||
|
|||
|
|||
public class CloudBoxEventJob { |
|||
|
|||
public static void main(String[] args) throws Exception { |
|||
|
|||
|
|||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|||
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|||
env.getConfig().setAutoWatermarkInterval(1000L); |
|||
env.setParallelism(1); |
|||
/*Map<TopicPartition, Long> offsets = new HashMap<>(); |
|||
TopicPartition topicPartition = new TopicPartition("data-message-channel-qn", 0); |
|||
offsets.put(topicPartition, 5872534L);*/ |
|||
KafkaSource<CloudBoxDataHistoryEvent> source = KafkaSource.<CloudBoxDataHistoryEvent>builder() |
|||
.setBootstrapServers("172.19.14.225:9092") |
|||
.setTopics("data-message-channel-qn") |
|||
.setGroupId("cloud_box_event_job") |
|||
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") |
|||
.setStartingOffsets(OffsetsInitializer.earliest()) |
|||
.setValueOnlyDeserializer(new CloudBoxDataHistoryEventDeserializationSchema()) |
|||
.build(); |
|||
|
|||
SingleOutputStreamOperator<CloudBoxDataHistoryEvent> fromSource = env |
|||
.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), |
|||
"cloudBoxDataHistoryEvent fromSource") |
|||
.filter((FilterFunction<CloudBoxDataHistoryEvent>) value -> { |
|||
String eventKey = value.getEventKey(); |
|||
return StrUtil.isNotEmpty(eventKey) && eventKey.equals("qn_cloud_box_data_history"); |
|||
}); |
|||
|
|||
fromSource.print(); |
|||
|
|||
SingleOutputStreamOperator<Body> flatMap = fromSource |
|||
.flatMap(new RichFlatMapFunction<CloudBoxDataHistoryEvent, Body>() { |
|||
@Override |
|||
public void flatMap(CloudBoxDataHistoryEvent event, Collector<Body> out) { |
|||
String body = event.getBody(); |
|||
if (StrUtil.isNotEmpty(body)) { |
|||
Body bean = JSONUtil.toBean(body, Body.class); |
|||
bean.setCurrentTime(LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli()); |
|||
out.collect(bean); |
|||
} |
|||
} |
|||
}).name("cloudBoxDataHistoryEvent flatmap"); |
|||
SingleOutputStreamOperator<List<Body>> toMysql = flatMap |
|||
.assignTimestampsAndWatermarks(WatermarkStrategy.<Body>forBoundedOutOfOrderness(Duration.ofSeconds(1)) |
|||
.withTimestampAssigner(((body, recordTimestamp) -> body.getCurrentTime()))) |
|||
.keyBy(Body::getData_type) |
|||
.window(TumblingEventTimeWindows.of(Time.seconds(2))) |
|||
.process(new ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>() { |
|||
@Override |
|||
public void process(Integer aLong, ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>.Context context, |
|||
Iterable<Body> elements, Collector<List<Body>> out) { |
|||
List<Body> list = ListUtil.toList(elements); |
|||
if (list.size() > 0) { |
|||
out.collect(list); |
|||
} |
|||
} |
|||
}).name("to mysql"); |
|||
|
|||
toMysql.addSink(new SinkMysqlFunc()).name("sink to mysql"); |
|||
|
|||
env.execute("cloud box event job"); |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save