Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
97badf5838
5 changed files with 530 additions and 271 deletions
  1. 2
      pom.xml
  2. 5
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  3. 137
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  4. 443
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java
  5. 214
      src/main/java/com/qniao/iot/gizwits/TestJob.java

2
pom.xml

@ -165,7 +165,7 @@
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.gizwits.GizWitsIotMonitoringDataJob</mainClass>
<mainClass>com.qniao.iot.gizwits.IotMonitoringDataJob</mainClass>
</transformer>
</transformers>
</configuration>

5
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -37,4 +37,9 @@ public class DeviceTotalData {
* 当前日期
*/
private LocalDate currLocalDate;
/**
* 上次消息事件
*/
private Long lastReportTime;
}

src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java → src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -53,7 +53,7 @@ import java.util.*;
import static java.time.temporal.ChronoUnit.SECONDS;
@Slf4j
public class GizWitsIotMonitoringDataJob {
public class IotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost("120.79.137.137", 9200, "http"))
@ -69,15 +69,15 @@ public class GizWitsIotMonitoringDataJob {
return requestConfigBuilder;
}));
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setBootstrapServers("120.25.199.30:19092")
.setTopics("machine_iot_data_received_event")
.setGroupId("123")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
@ -148,8 +148,12 @@ public class GizWitsIotMonitoringDataJob {
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
Long accJobCount = receivedEvent.getAccJobCount();
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent);
if (lastedDeviceState == null) {
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastOnData = receivedEvent;
}
if (lastWorkingStat == null) {
@ -161,24 +165,47 @@ public class GizWitsIotMonitoringDataJob {
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
assert lastedDeviceState != null;
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
assert lastedDeviceState != null;
Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
} else {
// 直接通过两个消息的时间差进行计算
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}
deviceTotalDataStat.update(nowDeviceState);
// 如果关机
onDataState.update(nowDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
}
deviceTotalDataStat.update(lastedDeviceState);
// 如果关机
onDataState.update(lastedDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else {
if (lastOffData != null) {
lastOnData = receivedEvent;
@ -186,10 +213,13 @@ public class GizWitsIotMonitoringDataJob {
assert lastedDeviceState != null;
if (machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration);
//nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
@ -198,35 +228,30 @@ public class GizWitsIotMonitoringDataJob {
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a);
} else {
if(receivedEvent.getDataSource() == 0) {
// 机智云
lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration());
}else {
// 树根
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
}
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
deviceTotalDataStat.update(lastedDeviceState);
deviceTotalDataStat.update(nowDeviceState);
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobDataState.update(receivedEvent);
}
}
if (lastWorkingStat != 1) {
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
if (lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
@ -240,12 +265,12 @@ public class GizWitsIotMonitoringDataJob {
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
DeviceTotalData value = deviceTotalDataStat.value();
DeviceTotalData data = new DeviceTotalData();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
if (value == null) {
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null);
DeviceTotalData data = new DeviceTotalData();
if (deviceMonitoringData != null) {
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setJobTotal(deviceMonitoringData.getAccJobCount());
@ -257,10 +282,9 @@ public class GizWitsIotMonitoringDataJob {
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
} else {
// es中也没有qn_cloud_box_data索引中
// es中也没有直接从老接口
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
}
deviceTotalDataStat.update(data);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (value.getCurrLocalDate().isBefore(localDate)) {
@ -268,7 +292,6 @@ public class GizWitsIotMonitoringDataJob {
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
if (deviceMonitoringData != null) {
DeviceTotalData data = new DeviceTotalData();
data.setJobTotal(deviceMonitoringData.getAccJobCount());
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
@ -281,9 +304,10 @@ public class GizWitsIotMonitoringDataJob {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
data = value;
}
}
return null;
return data;
}
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
@ -295,26 +319,26 @@ public class GizWitsIotMonitoringDataJob {
String result = HttpUtil
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
if(data == null) {
if (data == null) {
break;
}
Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records");
if(records == null) {
if (records == null) {
break;
}
JSONArray objects = JSONUtil.parseArray(records);
if(objects.isEmpty()) {
if (objects.isEmpty()) {
break;
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = (Long)mac;
if(iotMac.equals(machineIotMac)) {
Long iotMac = (Long) mac;
if (iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
deviceTotalData.setJobTotal((Long)productionTotal);
deviceTotalData.setJobTotal((Long) productionTotal);
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600);
deviceTotalData.setJobDurationTotal(((Long) workTotalTotal) * 3600);
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss"));
@ -325,7 +349,7 @@ public class GizWitsIotMonitoringDataJob {
}
}
}
if(deviceTotalData == null) {
if (deviceTotalData == null) {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
@ -337,7 +361,6 @@ public class GizWitsIotMonitoringDataJob {
}
private String[] getIndicesList() throws IOException {
GetAliasesRequest request = new GetAliasesRequest();
@ -384,6 +407,8 @@ public class GizWitsIotMonitoringDataJob {
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
env.execute("iot_monitoring_data_job");
}
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {

443
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java

@ -0,0 +1,443 @@
//package com.qniao.iot.gizwits;
//
//import cn.hutool.core.bean.BeanUtil;
//import cn.hutool.core.util.ArrayUtil;
//import cn.hutool.http.HttpUtil;
//import cn.hutool.json.JSONArray;
//import cn.hutool.json.JSONUtil;
//import com.qniao.iot.gizwits.config.ApolloConfig;
//import com.qniao.iot.gizwits.constant.ConfigConstant;
//import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
//import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//import org.apache.flink.api.common.state.ValueState;
//import org.apache.flink.api.common.state.ValueStateDescriptor;
//import org.apache.flink.api.common.typeinfo.TypeInformation;
//import org.apache.flink.configuration.Configuration;
//import org.apache.flink.connector.kafka.source.KafkaSource;
//import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
//import org.apache.flink.streaming.api.CheckpointingMode;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.datastream.DataStreamSource;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
//import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
//import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
//import org.apache.flink.util.Collector;
//import org.apache.http.HttpHost;
//import org.apache.http.auth.AuthScope;
//import org.apache.http.auth.UsernamePasswordCredentials;
//import org.apache.http.client.CredentialsProvider;
//import org.apache.http.impl.client.BasicCredentialsProvider;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.OffsetResetStrategy;
//import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
//import org.elasticsearch.action.index.IndexRequest;
//import org.elasticsearch.action.search.SearchRequest;
//import org.elasticsearch.action.search.SearchResponse;
//import org.elasticsearch.client.*;
//import org.elasticsearch.cluster.metadata.AliasMetaData;
//import org.elasticsearch.index.query.QueryBuilders;
//import org.elasticsearch.rest.RestStatus;
//import org.elasticsearch.search.SearchHit;
//import org.elasticsearch.search.SearchHits;
//import org.elasticsearch.search.builder.SearchSourceBuilder;
//import org.elasticsearch.search.sort.SortOrder;
//
//import java.io.IOException;
//import java.sql.Date;
//import java.time.*;
//import java.time.format.DateTimeFormatter;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Map;
//import java.util.Set;
//
//import static java.time.temporal.ChronoUnit.SECONDS;
//
///**
// * 备份代码
// */
//@Slf4j
//public class IotMonitoringDataJob_bak {
//
// private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
// .builder(new HttpHost("120.79.137.137", 9200, "http"))
// .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials("elastic", "qnol26215"));
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// })
// .setRequestConfigCallback(requestConfigBuilder -> {
// // 设置es连接超时时间
// requestConfigBuilder.setConnectTimeout(3000);
// return requestConfigBuilder;
// }));
//
// public static void main(String[] args) throws Exception {
//
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// // 获取设备数据源
// KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
// .setBootstrapServers("120.25.199.30:19092")
// .setTopics("machine_iot_data_received_event")
// .setGroupId("123")
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
// .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
// .build();
//
// // 设备数据源转换
// DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
// .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
//
// // mac分组并进行工作时长的集合操作
// DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = dataStreamSource
// .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
// .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
//
// // 最新的设备数据
// private ValueState<DeviceTotalData> deviceTotalDataStat;
//
// // 开机数据
// private ValueState<DeviceTotalData> onDataState;
//
// // 上次的关机数据
// private ValueState<MachineIotDataReceivedEvent> lastOffDataState;
//
// // 上次的开机数据
// private ValueState<MachineIotDataReceivedEvent> lastOnDataState;
//
// // 当前周期的待机数据
// private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
//
// // 上次的状态
// private ValueState<Integer> lastWorkingStatState;
//
// @Override
// public void open(Configuration parameters) {
//
// // 必须在 open 生命周期初始化
// deviceTotalDataStat = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
//
// onDataState = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
//
// lastOffDataState = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
//
// lastOnDataState = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
//
// lastWaitJobDataState = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
//
// lastWorkingStatState = getRuntimeContext()
// .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
// }
//
// @Override
// public void processElement(MachineIotDataReceivedEvent receivedEvent,
// KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
// Collector<DeviceMonitoringData> out) throws Exception {
//
// DeviceTotalData onData = onDataState.value();
// MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
// MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
// MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
// Integer lastWorkingStat = lastWorkingStatState.value();
// DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
// Integer machinePwrStat = receivedEvent.getMachinePwrStat();
// Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
// lastWorkingStatState.update(machineWorkingStat);
// Long reportTime = receivedEvent.getReportTime();
// if (lastedDeviceState == null) {
// lastedDeviceState = getDeviceTotalData(receivedEvent);
// lastOnData = receivedEvent;
// }
// if (lastWorkingStat == null) {
// lastWorkingStat = 0;
// }
// if (onData == null) {
// onData = lastedDeviceState;
// onDataState.update(onData);
// }
// LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
// Long a;
// if (machinePwrStat.equals(0)) {
// assert lastedDeviceState != null;
// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
// lastedDeviceState.setCurrLocalDate(localDate);
// if (lastOnData != null) {
// lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
// } else {
// lastedDeviceState.setLastBootTime(onData.getLastBootTime());
// }
// deviceTotalDataStat.update(lastedDeviceState);
// // 如果关机
// onDataState.update(lastedDeviceState);
// lastOffDataState.update(receivedEvent);
// // 关机后将待机数据清除
// lastWaitJobDataState.update(null);
// } else {
// if (lastOffData != null) {
// lastOnData = receivedEvent;
// }
// assert lastedDeviceState != null;
// if (machineWorkingStat.equals(1)) {
// // 工作中
// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
// lastedDeviceState.setCurrLocalDate(localDate);
// lastedDeviceState.setLastBootTime(onData.getLastBootTime());
// if (lastWaitJobData != null) {
// LocalDateTime localDateTime = LocalDateTime
// .ofInstant(Instant.ofEpochMilli(reportTime * 1000),
// ZoneId.systemDefault());
// LocalDateTime lastWaitJobTime = LocalDateTime
// .ofInstant(Instant.ofEpochMilli(reportTime * 1000),
// ZoneId.systemDefault());
// a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
// } else {
// if(receivedEvent.getDataSource() == 0) {
// // 机智云
// lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration());
// }else {
// // 树根
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
// }
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
// }
// deviceTotalDataStat.update(lastedDeviceState);
// }
// if (machineWorkingStat.equals(2)) {
// // 待机
// lastWaitJobDataState.update(receivedEvent);
// }
// }
// if (lastWorkingStat != 1) {
// DeviceMonitoringData data = new DeviceMonitoringData();
// data.setDataSource(receivedEvent.getDataSource());
// data.setMachineIotMac(receivedEvent.getMachineIotMac());
// data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
// data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
// data.setAccJobCount(lastedDeviceState.getJobTotal());
// data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
// data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
// data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
// data.setReportTime(reportTime);
// if (lastOnData == null) {
// data.setLastBootTime(reportTime * 1000);
// } else {
// data.setLastBootTime(lastOnData.getReportTime() * 1000);
// }
// out.collect(data);
// }
// }
//
// private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
//
// DeviceTotalData value = deviceTotalDataStat.value();
// Long reportTime = event.getReportTime();
// LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
// if (value == null) {
// // 从es中获取
// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null);
// DeviceTotalData data = new DeviceTotalData();
// if (deviceMonitoringData != null) {
// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
// data.setJobTotal(deviceMonitoringData.getAccJobCount());
// // 单位秒
// data.setCurrLocalDate(localDate);
// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
// LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
// data.setLastBootTime(ldt);
// } else {
// // es中也没有qn_cloud_box_data索引中拿
// data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
// }
// deviceTotalDataStat.update(data);
// }
// // 是否日期是当天的否则需要更新当天工作时长和当天工作量
// if (value.getCurrLocalDate().isBefore(localDate)) {
// // 先从es中拿昨天最新的
// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
// LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
// if (deviceMonitoringData != null) {
// DeviceTotalData data = new DeviceTotalData();
// data.setJobTotal(deviceMonitoringData.getAccJobCount());
// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
// data.setCurrLocalDate(localDate);
// data.setLastBootTime(LocalDateTime.ofInstant(Instant
// .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
// deviceTotalDataStat.update(data);
// } else {
// // value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
// value.setTheDayJobDuration(0L);
// value.setTheDayJobCount(0L);
// }
// }
// return null;
// }
//
// private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
//
// DeviceTotalData deviceTotalData = null;
//
// // 通过http去请求之前的接口拿数据
// for (int i = 1; i <= 20; i++) {
// String result = HttpUtil
// .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
// Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
// if(data == null) {
// break;
// }
// Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records");
// if(records == null) {
// break;
// }
// JSONArray objects = JSONUtil.parseArray(records);
// if(objects.isEmpty()) {
// break;
// }
// for (Object o : objects.toArray()) {
// Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
// Long iotMac = (Long)mac;
// if(iotMac.equals(machineIotMac)) {
// deviceTotalData = new DeviceTotalData();
// Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
// deviceTotalData.setJobTotal((Long)productionTotal);
// Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
// deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600);
// Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
// LocalDateTime lastBootTime = LocalDateTime
// .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss"));
// deviceTotalData.setLastBootTime(lastBootTime);
// deviceTotalData.setTheDayJobDuration(0L);
// deviceTotalData.setTheDayJobCount(0L);
// break;
// }
// }
// }
// if(deviceTotalData == null) {
// deviceTotalData = new DeviceTotalData();
// deviceTotalData.setJobTotal(0L);
// deviceTotalData.setJobDurationTotal(0L);
// deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
// deviceTotalData.setTheDayJobDuration(0L);
// deviceTotalData.setTheDayJobCount(0L);
// }
// return deviceTotalData;
// }
//
//
//
// private String[] getIndicesList() throws IOException {
//
// GetAliasesRequest request = new GetAliasesRequest();
// GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT);
// Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
// Set<String> indices = map.keySet();
// List<String> indicesList = new ArrayList<>();
// for (String key : indices) {
// if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) {
// indicesList.add(key);
// }
// }
// return ArrayUtil.toArray(indicesList, String.class);
// }
//
// private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) {
//
// try {
// // 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
// SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
// if (reportTime != null) {
// searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime));
// }
// searchSourceBuilder.sort("reportTime", SortOrder.DESC);
// searchSourceBuilder.size(1);
// // 创建查询请求对象将查询对象配置到其中
// SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
// searchRequest.source(searchSourceBuilder);
// // 执行查询然后处理响应结果
// SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// // 根据状态和数据条数验证是否返回了数据
// if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
// SearchHits hits = searchResponse.getHits();
// SearchHit reqHit = hits.getHits()[0];
// return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
// }
// } catch (IOException e) {
// log.error("获取es数据异常", e);
// }
// return null;
// }
// }).name("machineIotDataReceivedEventDataStream keyBy stream");
//
// // 写入es
// sinkEs(machineIotDataReceivedEventDataStream);
//
// env.execute("iot_monitoring_data_job");
// }
//
// private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
//
// List<HttpHost> httpHosts = new ArrayList<>();
// httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
// ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
// ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
// (ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
//
// LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
// String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
// //创建es 请求
// IndexRequest indexRequest = Requests.indexRequest()
// .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
// .source(BeanUtil.beanToMap(deviceMonitoringData));
// requestIndexer.add(indexRequest);
// }
// );
// //刷新前缓冲的最大动作量
// esSinkBuilder.setBulkFlushMaxActions(10);
// //刷新前缓冲区的最大数据大小以MB为单位
// esSinkBuilder.setBulkFlushMaxSizeMb(5);
// //无论缓冲操作的数量或大小如何都要刷新的时间间隔
// esSinkBuilder.setBulkFlushInterval(5000L);
// // 客户端创建配置回调配置账号密码
// esSinkBuilder.setRestClientFactory(
// restClientBuilder -> {
// restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,
// new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// });
// restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// // 设置es连接超时时间
// requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
// return requestConfigBuilder;
// });
// }
// );
// //数据流添加sink
// dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
// }
//}

214
src/main/java/com/qniao/iot/gizwits/TestJob.java

@ -1,214 +0,0 @@
/*
package com.qniao.iot.gizwits;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.sql.Date;
import java.time.*;
import static java.time.temporal.ChronoUnit.SECONDS;
@Slf4j
public class TestJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("120.25.199.30:19092")
.setTopics("machine_iot_data_received_event")
.setGroupId("123")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
// 设备数据源转换
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source")
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getMachinePwrStat() != null && value.getMachineIotMac() != null
&& value.getDataSource() != null && value.getMachineWorkingStat() != null);
streamOperator.print("输出源数据");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
private ValueState<DeviceTotalData> deviceTotalData;
// 开机数据
private ValueState<DeviceTotalData> onDataState;
// 上次的关机数据
private ValueState<MachineIotDataReceivedEvent> lastOffDataState;
// 上次的开机数据
private ValueState<MachineIotDataReceivedEvent> lastOnDataState;
// 当前周期的待机数据
private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
// 上次的状态
private ValueState<Integer> lastWorkingStatState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalData = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastOffDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastOnDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWaitJobDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalData.value();
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastedDeviceState == null) {
lastedDeviceState = new DeviceTotalData();
lastedDeviceState.setJobTotal(0L);
lastedDeviceState.setJobDurationTotal(0L);
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
lastedDeviceState.setTheDayJobDuration(0L);
lastedDeviceState.setTheDayJobCount(0L);
lastOnData = receivedEvent;
}
if(lastWorkingStat == null) {
lastWorkingStat = 0;
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
}
deviceTotalData.update(lastedDeviceState);
// 如果关机
onDataState.update(lastedDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
if (machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalData.update(lastedDeviceState);
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobDataState.update(receivedEvent);
}
}
if(lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
if(lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
}else {
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
out.collect(data);
}
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
machineIotDataReceivedEventDataStream.print("输出清洗数据");
env.execute();
}
}
*/
Loading…
Cancel
Save