lizhongkang@qniao.cn 3 years ago
parent
commit
f33f4379dc
1 changed files with 14 additions and 20 deletions
  1. 34
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

34
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -73,10 +73,7 @@ import java.util.Properties;
*/
public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
);
static SnowFlake snowflake = new SnowFlake(1L, 1L);
public static void main(String[] args) throws Exception {
@ -85,9 +82,9 @@ public class RootCloudIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<AIRootCloudWarningDataReceivedEvent> source = KafkaSource.<AIRootCloudWarningDataReceivedEvent>builder()
.setBootstrapServers("kq.qniao.cn:19092")
.setTopics("ai_root_cloud_warning_report_data_event")
.setGroupId("ai_root_cloud_warning_data_etl")
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
.setProperty("sasl.jaas.config",
@ -112,7 +109,8 @@ public class RootCloudIotDataFormatterJob {
.build();
// 写入Rabbit
transformDs.addSink(new RMQSink<>(connectionConfig, new AIWarningDataReceivedEventSerializationSchema(),
transformDs.addSink(new RMQSink<>(connectionConfig,
new AIWarningDataReceivedEventSerializationSchema(),
new RMQSinkPublishOptions<AIWarningDataReceivedEvent>() {
@Override
@ -130,25 +128,21 @@ public class RootCloudIotDataFormatterJob {
// 交换机名称
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
})
})).name("AIWarningDataStream to rabbitmq Sink");
).name("AIWarningDataStream to rabbitmq Sink");
// // 发送到OSS存储
// String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
// StreamingFileSink<AIWarningDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
// new Path(outputPath),
// new SimpleStringEncoder<AIWarningDataReceivedEvent>("UTF-8")
// ).build();
// transformDs.addSink(sink);
// 发送到OSS存储
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink<AIWarningDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<AIWarningDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
env.execute("ai root cloud waring data formatter job");
}
private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) {
System.out.println("********" + event);
AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent();
if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) {
aiWaringDataReceivedEvent.setId(snowflake.nextId());

Loading…
Cancel
Save