Browse Source

更新

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
40004867b9
9 changed files with 367 additions and 152 deletions
  1. 2
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  2. 45
      root-cloud-statistics/pom.xml
  3. 38
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java
  4. 266
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  5. 23
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java
  6. 17
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java
  7. 102
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java
  8. 1
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java
  9. 25
      root-cloud-statistics/target/classes/log4j2.properties

2
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@ -48,7 +48,7 @@ public class RootCloudIotDataEventSourceMocker {
private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:9092");
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.19.124.230:9092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
return kafkaProps;

45
root-cloud-statistics/pom.xml

@ -18,11 +18,16 @@ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.qniao</groupId>
<artifactId>java-dependency</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.qniao</groupId>
<artifactId>root-cloud-statistics</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>root-cloud-statistics</name>
@ -83,7 +88,7 @@ under the License.
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<!--<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
@ -92,7 +97,41 @@ under the License.
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>-->
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-constant</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.5</version>
</dependency>
</dependencies>
<build>

38
root-cloud-statistics/src/main/java/com/qniao/iot/rc/DeviceState.java

@ -0,0 +1,38 @@
package com.qniao.iot.rc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DeviceState {
// Surrogate identifier taken from the originating IoT event.
private Long id;
/**
 * Device IoT address (physical identifier of the cloud box).
 */
private Long machineIotMac;
/**
 * State: 0 = powered off, 1 = producing, 2 = standby.
 */
private Integer status;
/**
 * Time the state change occurred (epoch millis).
 */
private Long updateTime;
/**
 * Log-friendly dump of the state. Fixes the original's unbalanced quote
 * characters and includes machineIotMac so state transitions can be traced
 * back to a device.
 */
@Override
public String toString() {
return "设备状态:{" +
"id='" + id + '\'' +
", machineIotMac='" + machineIotMac + '\'' +
", status='" + status + '\'' +
", updateTime='" + updateTime + '\'' +
'}';
}
}

266
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -18,23 +18,56 @@
package com.qniao.iot.rc;
import com.qniao.iot.rc.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema;
import com.fasterxml.jackson.databind.util.JSONPObject;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
import com.qniao.iot.machine.command.PowerOnMachineCommand;
import com.qniao.iot.machine.command.StartMachineWorkingCommand;
import com.qniao.iot.machine.command.StopMachineWorkingCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.command.BaseCommandSerializationSchema;
import com.qniao.iot.rc.constant.DataSource;
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.tools.json.JSONUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
/**
* Skeleton for a Flink DataStream Job.
@ -53,6 +86,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
public class RootCloudIotDataFormatterJob {
/**
 * Job entry point: reads RootCloud IoT payloads from Kafka, transforms them
 * into internal events, archives them to OSS, republishes them to Kafka,
 * derives device power commands from per-device state, and indexes the
 * commands into Elasticsearch.
 */
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// NOTE(review): the next line is a diff-hunk marker captured by the page
// scrape; the real file's lines covering Kafka source construction are not
// visible here.
@ -68,30 +102,204 @@ public class RootCloudIotDataFormatterJob {
// Transform RootCloud payloads into our own internal event format.
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source")
// NOTE(review): the two .map(...) lines below are the removed (old) and
// added (new) sides of the same diff hunk; only the second exists in the
// real file.
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) MachineIotDataReceivedEvent::transform)
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform MachineIotDataReceivedEvent");
// Sink 1: archive raw transformed events to OSS object storage.
String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data";
StreamingFileSink<MachineIotDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<MachineIotDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
// Sink 2: republish the transformed events to a Kafka topic.
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(params.get("sink.bootstrap.servers"))
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("machine_iot_data_received_event")
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema())
.build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
).name("MachineIotDataReceivedEvent Sink");
// Group events per device and derive power/working commands from state.
DataStream<BaseCommand> commandDataStream = transformDs.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() {
// Per-key (per-device) last known state, kept in Flink keyed state.
private ValueState<DeviceState> deviceState;
@Override
public void open(Configuration parameters) {
// State handles must be initialised in the open() lifecycle hook.
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState", DeviceState.class));
}
@Override
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long,
MachineIotDataReceivedEvent, BaseCommand>.Context ctx, Collector<BaseCommand> out) throws Exception {
System.out.println("收到事件数据-------------------------:" + event);
// Read the latest recorded state for this device.
DeviceState lastedDeviceState = deviceState.value();
// NOTE(review): getDeviceStatus unboxes Integer flags; a null
// machinePwrStat/machineWorkingStat would NPE here — confirm
// upstream guarantees non-null flags.
Integer deviceStatus = getDeviceStatus(event);
DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime());
// NOTE(review): invoked before the deviceStatus null check below,
// so newState.getStatus() may be null inside this call.
collDeviceStatusChange(out, newState, null, event);
if (deviceStatus == null) {
// NOTE(review): collecting null will likely NPE in downstream
// serializers/sinks — consider skipping the element instead.
out.collect(null);
} else {
if (lastedDeviceState == null) {
// TODO: optimise later by seeding from the device's latest state in the database.
deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()));
System.out.println("初始化设备状态" + deviceState.value().toString());
} else {
/*DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime());
DeviceState oldState = deviceState.value();
collDeviceStatusChange(out, newState, oldState, event);
deviceState.update(newState);*/
}
}
}
}).name("");
//sinkRabbitMq(commandDataStream);
// Sink 3: index the derived commands into Elasticsearch.
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http"));
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> {
HashMap<String, String> map = new HashMap<>();
if(command instanceof PowerOnMachineCommand) {
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command;
map.put("id", powerOnMachineCommand.getId().toString());
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString());
map.put("timestamp", powerOnMachineCommand.getTimestamp().toString());
}
if(command instanceof PowerOffMachineCommand) {
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command;
map.put("id", powerOffMachineCommand.getId().toString());
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString());
map.put("timestamp", powerOffMachineCommand.getTimestamp().toString());
}
// Build the ES index request.
IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map);
// requestIndexer queues the request for the bulk processor.
requestIndexer.add(indexRequest);
}
);
/* Bulk-flush parameters must be configured explicitly. */
// Maximum number of buffered actions before a flush.
esSinkBuilder.setBulkFlushMaxActions(10);
// Maximum buffered data size (MB) before a flush.
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// Interval after which a flush happens regardless of action count or size.
esSinkBuilder.setBulkFlushInterval(5000L);
esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> {
// NOTE(review): hard-coded ES credentials — move to configuration/secrets.
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "qn56521"));
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
});
// Attach the ES sink to the command stream.
commandDataStream.addSink(esSinkBuilder.build());
env.execute("Kafka Job");
}
/**
 * Wires the command stream to a RabbitMQ exchange. Currently unused — the
 * call site in main() is commented out.
 *
 * @param commandDataStream stream of device power/working commands to publish
 */
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
// RabbitMQ configuration for the test environment.
// NOTE(review): hard-coded connection credentials — move to configuration.
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("8.135.8.221")
.setVirtualHost("/")
.setUserName("qniao")
.setPassword("Qianniao2020")
.setPort(5672).build();
// Publish each derived command to the RabbitMQ exchange.
commandDataStream.addSink(new RMQSink<>(connectionConfig, new BaseCommandSerializationSchema(), new RMQSinkPublishOptions<BaseCommand>() {
@Override
public String computeRoutingKey(BaseCommand command) {
// Fixed routing key for all commands.
return "flink";
}
@Override
public AMQP.BasicProperties computeProperties(BaseCommand command) {
// No special AMQP message properties.
return null;
}
@Override
public String computeExchange(BaseCommand command) {
// Exchange name.
/*if(command != null) {
System.out.println("发送消息:------------------" + command.toString());
return "flink_test_exchange";
}
return "";*/
// NOTE(review): command may be null (main() collects null for an
// unknown status), in which case toString() below would NPE.
System.out.println("发送消息:------------------" + command.toString());
return "flink_test_exchange";
}
})).name("PowerOffMachineCommand Sink");
// Alternative: publish straight to a queue instead of an exchange.
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12");
}
/**
 * Maps a raw RootCloud payload onto the internal event type.
 *
 * @param event raw payload from RootCloud, may be null
 * @return a populated event, or an empty event when the input is null
 */
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent target = new MachineIotDataReceivedEvent();
if (event == null) {
return target;
}
// Synthetic id: hash of assetId concatenated with the receive timestamp.
target.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode());
target.setMachineIotMac(Long.valueOf(event.get__assetId__()));
target.setDataSource(DataSource.ROOT_CLOUD);
target.setMachinePwrStat(event.getPWR_sta());
target.setMachineWorkingStat(event.getWorking_sta());
target.setIgStat(event.getIG_sta());
target.setAccJobCount(event.getACC_count_total());
target.setCurrJobCount(event.getACC_count());
if (event.getRunning_duration() != null) {
target.setCurrJobDuration(event.getRunning_duration().longValue());
}
String stopping = event.getStoping_duration();
if (StringUtils.isNotBlank(stopping)) {
// Stopping duration arrives as a decimal string; truncate to whole units.
target.setCurrStoppingDuration(new BigDecimal(stopping).longValue());
}
if (event.getWaiting_duration() != null) {
target.setCurrWaitingDuration(event.getWaiting_duration().longValue());
}
target.setReportTime(System.currentTimeMillis());
return target;
}
/**
 * Derives the coarse device status from the raw power/working flags.
 *
 * @param event incoming IoT data event
 * @return 0 = powered off, 1 = producing, 2 = standby, or {@code null} when
 *         the flags are absent or form an unknown combination
 */
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) {
Integer pwr = event.getMachinePwrStat();
Integer working = event.getMachineWorkingStat();
// Null guard: the original unboxed these Integers directly and would NPE
// when a payload omitted either flag.
if (pwr == null) {
return null;
}
if (pwr == 0) {
return 0;
}
if (pwr == 1 && working != null) {
if (working == 1) {
return 1;
}
if (working == 0) {
return 2;
}
}
return null;
}
/**
 * Emits a power command derived from the device's new state.
 *
 * NOTE(review): the full transition logic (comparing oldState against
 * newState to distinguish power-on/off from standby/resume) is currently
 * disabled upstream; this simplified version fires on every event based
 * solely on the new state. {@code oldState} is kept in the signature for
 * that future use and is currently unused.
 *
 * @param out      Flink collector receiving the derived command
 * @param newState state computed from the current event (status may be null)
 * @param oldState previously recorded state; currently unused, may be null
 * @param event    the event that triggered the state evaluation
 */
private static void collDeviceStatusChange(Collector<BaseCommand> out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) {
Integer status = newState.getStatus();
// Null guard: main() calls this before its own deviceStatus null check, so
// status can be null here; the original unboxed comparison would NPE.
if (status == null) {
return;
}
if (status == 1 || status == 2) {
System.out.println("设备开机。相关事件:" + event.toString());
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount()));
} else if (status == 0) {
System.out.println("设备关机。相关事件:" + event.toString());
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount()));
}
}
}

23
root-cloud-statistics/src/main/java/com/qniao/iot/rc/command/BaseCommandSerializationSchema.java

@ -0,0 +1,23 @@
package com.qniao.iot.rc.command;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
public class BaseCommandSerializationSchema implements SerializationSchema<BaseCommand> {

/** Shared, thread-safe Jackson mapper; created once to avoid per-record cost. */
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
 * Serializes a command to its JSON byte representation.
 *
 * @param command the command to serialize
 * @return UTF-8 JSON bytes for {@code command}
 * @throws IllegalArgumentException if Jackson cannot serialize the command
 */
@Override
public byte[] serialize(BaseCommand command) {
try {
return OBJECT_MAPPER.writeValueAsBytes(command);
} catch (JsonProcessingException cause) {
throw new IllegalArgumentException("Could not serialize record: " + command, cause);
}
}
}

17
root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/DataSource.java

@ -1,17 +0,0 @@
package com.qniao.iot.rc.constant;
/**
* @author Lzk
* @date 2022/7/2
**/
// NOTE(review): constant-interface anti-pattern — consider a final class or
// enum if this type is ever reworked (callers reference DataSource.ROOT_CLOUD).
public interface DataSource {
/**
 * RootCloud (树根云) data source.
 */
Integer ROOT_CLOUD = 1;
/**
 * TactCloud (机智云) data source.
 */
Integer TACT_CLOUD = 0;
}

102
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEvent.java

@ -1,102 +0,0 @@
package com.qniao.iot.rc.event;
import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent;
import com.qniao.iot.rc.constant.DataSource;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Objects;
/**
* @author Lzk
* @date 2022/7/2
**/
@Data
public class MachineIotDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Unique identifier.
 */
private Long id;
/**
 * Data source (see {@code DataSource}: 1 = RootCloud, 0 = TactCloud).
 */
private Integer dataSource;
/**
 * Device IoT address (physical identifier of the cloud box).
 */
private Long machineIotMac;
/**
 * Machine power state.
 */
private Integer machinePwrStat;
/**
 * Machine working state.
 */
private Integer machineWorkingStat;
/**
 * Accumulated total job count.
 */
private Long accJobCount;
/**
 * Current job count.
 */
private Long currJobCount;
/**
 * Current job duration.
 */
private Long currJobDuration;
/**
 * Current standby (waiting) duration.
 */
private Long currWaitingDuration;
/**
 * Current stoppage duration.
 */
private Long currStoppingDuration;
/**
 * Counting-switch (IG) state.
 */
private Integer igStat;
/**
 * Data sampling time (epoch millis).
 */
private Long reportTime;
/**
 * Converts a raw RootCloud payload into this internal event type.
 *
 * @param event raw payload received from RootCloud, may be null
 * @return populated event (empty if the input was null)
 */
public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
if (Objects.nonNull(event)) {
// Synthetic id: hash of assetId concatenated with the receive time —
// NOTE(review): not guaranteed unique; collisions are possible.
machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode());
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta());
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
machineIotDataReceivedEvent.setIgStat(event.getIG_sta());
machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total());
machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count());
machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue());
if (StringUtils.isNotBlank(event.getStoping_duration())) {
// Stopping duration arrives as a decimal string; truncate to whole units.
BigDecimal stoppingDuration = new BigDecimal(event.getStoping_duration());
machineIotDataReceivedEvent.setCurrStoppingDuration(stoppingDuration.longValue());
}
machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue());
machineIotDataReceivedEvent.setReportTime(System.currentTimeMillis());
}
return machineIotDataReceivedEvent;
}
}

1
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java

@ -1,5 +1,6 @@
package com.qniao.iot.rc.event;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;

25
root-cloud-statistics/target/classes/log4j2.properties

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading…
Cancel
Save