diff --git a/cloud-box-job/dependency-reduced-pom.xml b/cloud-box-job/dependency-reduced-pom.xml new file mode 100644 index 0000000..0cd2bee --- /dev/null +++ b/cloud-box-job/dependency-reduced-pom.xml @@ -0,0 +1,142 @@ + + + + iot-root-cloud-model-hw-formatter + com.qniao + 0.0.1-SNAPSHOT + + 4.0.0 + cloud-box-job + + new-job + + + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.CloudBoxEventJob + + + + + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-api + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-core + 2.17.2 + runtime + + + org.apache.flink + flink-table-common + 1.15.0 + provided + + + icu4j + com.ibm.icu + + + + + org.apache.flink + flink-table-runtime + 1.15.0 + provided + + + flink-cep + org.apache.flink + + + + + org.apache.flink + flink-table-planner_2.12 + 1.15.0 + provided + + + commons-compiler + org.codehaus.janino + + + janino + org.codehaus.janino + + + + + org.apache.flink + flink-table-api-java-bridge + 1.15.0 + provided + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + + 1.8 + 2.17.2 + 1.15.0 + ${target.java.version} + UTF-8 + ${target.java.version} + + diff --git a/cloud-box-job/pom.xml b/cloud-box-job/pom.xml new file mode 100644 index 0000000..b2589cf --- /dev/null +++ b/cloud-box-job/pom.xml @@ -0,0 +1,273 @@ + + + + iot-root-cloud-model-hw-formatter + com.qniao + 0.0.1-SNAPSHOT + + 4.0.0 + + cloud-box-job + + + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + + + + + + com.qniao + root-cloud-event + 0.0.1-SNAPSHOT + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + commons-logging + commons-logging + 1.2 + + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-state-event-generator-job + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + cn.hutool + hutool-all + 5.8.4 + + + + + com.ctrip.framework.apollo + apollo-client + 2.0.1 + + + com.ctrip.framework.apollo + apollo-core + 2.0.1 + + + + + org.apache.flink + flink-table-common + 1.15.0 + provided + + + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_2.12 + 1.15.0 + provided + + + + + org.apache.flink + flink-table-api-java + 1.15.0 + + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala_2.12 + 1.15.0 + + + + + org.apache.flink + flink-table-api-scala-bridge_2.12 + 1.15.0 + + + + com.alibaba + druid + 1.1.12 + + + + mysql + mysql-connector-java + 8.0.29 + + + + com.alibaba.fastjson2 + fastjson2 + 2.0.7 + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.17.3 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.CloudBoxEventJob + + + + + + + + new-job + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + \ No newline at end of file diff --git a/cloud-box-job/src/main/java/com/qniao/iot/Body.java b/cloud-box-job/src/main/java/com/qniao/iot/Body.java new file mode 100644 index 0000000..7d6dc9f --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/Body.java @@ -0,0 +1,43 @@ +package com.qniao.iot; + +import lombok.Data; + +import java.io.Serializable; +import java.math.BigDecimal; + +@Data +public class Body { + + private static final long serialVersionUID = 1L; + + private String create_time; + + private Integer data_source; + + private String data_timestamp; + + // 0开机数据 1关机数据 3生产数据 + private Integer data_type; + + private String docId; + + private Long id; + + private Long mac; + + private Long quantity; + + private BigDecimal runningDuration; + + private BigDecimal runningHour; + + private Integer space_of_time; + + private Long total_production; + + private BigDecimal waitingDuration; + + private BigDecimal waitingHour; + + private Long currentTime; +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java new file mode 100644 index 0000000..57c16aa --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java @@ -0,0 +1,17 @@ +package com.qniao.iot; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import lombok.Data; + +import java.io.Serializable; +import java.math.BigDecimal; + +@Data +public class CloudBoxDataHistoryEvent implements Serializable { + + private static final long serialVersionUID = 1L; + + private String body; + + private String eventKey; +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java new file mode 100644 index 0000000..5dce9b2 --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java @@ -0,0 +1,32 @@ +package com.qniao.iot; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class CloudBoxDataHistoryEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public CloudBoxDataHistoryEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, CloudBoxDataHistoryEvent.class); + } + + @Override + public boolean isEndOfStream(CloudBoxDataHistoryEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(CloudBoxDataHistoryEvent.class); + } +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java new file mode 100644 index 0000000..3197ae5 --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java @@ -0,0 +1,107 @@ +package com.qniao.iot; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; +import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.*; + + +public class CloudBoxEventJob { + + public static void main(String[] args) throws Exception { + + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + env.getConfig().setAutoWatermarkInterval(1000L); + env.setParallelism(1); + /*Map offsets = new HashMap<>(); + TopicPartition topicPartition = new TopicPartition("data-message-channel-qn", 0); + offsets.put(topicPartition, 5872534L);*/ + KafkaSource source = KafkaSource.builder() + .setBootstrapServers("172.19.14.225:9092") + .setTopics("data-message-channel-qn") + .setGroupId("cloud_box_event_job") + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new CloudBoxDataHistoryEventDeserializationSchema()) + .build(); + + SingleOutputStreamOperator fromSource = env + .fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), + "cloudBoxDataHistoryEvent fromSource") + .filter((FilterFunction) value -> { + String eventKey = value.getEventKey(); + return StrUtil.isNotEmpty(eventKey) && eventKey.equals("qn_cloud_box_data_history"); + }); + + fromSource.print(); + + SingleOutputStreamOperator flatMap = fromSource + .flatMap(new RichFlatMapFunction() { + @Override + public void flatMap(CloudBoxDataHistoryEvent event, Collector out) { + String body = event.getBody(); + if (StrUtil.isNotEmpty(body)) { + Body bean = JSONUtil.toBean(body, Body.class); + bean.setCurrentTime(LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli()); + out.collect(bean); + } + } + }).name("cloudBoxDataHistoryEvent flatmap"); + SingleOutputStreamOperator> toMysql = flatMap + .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)) + .withTimestampAssigner(((body, recordTimestamp) -> body.getCurrentTime()))) + .keyBy(Body::getData_type) + .window(TumblingEventTimeWindows.of(Time.seconds(2))) + .process(new ProcessWindowFunction, Integer, TimeWindow>() { + @Override + public void process(Integer aLong, ProcessWindowFunction, Integer, TimeWindow>.Context context, + Iterable elements, Collector> out) { + List list = ListUtil.toList(elements); + if (list.size() > 0) { + out.collect(list); + } + } + }).name("to mysql"); + + toMysql.addSink(new SinkMysqlFunc()).name("sink to mysql"); + + env.execute("cloud box event job"); + } +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java new file mode 100644 index 0000000..bcaee02 --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java @@ -0,0 +1,61 @@ +package com.qniao.iot; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.FileUtils; + +import java.io.File; +import java.util.List; +import java.util.Map; + + +public class CloudBoxEventJob1 { + + private static EsRestClientService esRestClientService = new EsRestClientService(); + + public static void main(String[] args) throws Exception { + + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // 查询数据searchResponse + String scrollId = null; + DataSet> dataSet = null; + List> dataList = null; + + int count = 0; + + while (!"none".equals(scrollId)) { + + Map map = esRestClientService.queryDeviceListPage(scrollId); + if (map.get("tupleList") instanceof List) + dataList = (List>) map.get("tupleList"); + scrollId = map.get("scrollId").toString(); + + if (dataList == null || dataList.size() < 10000 || count > 3) + break; + + // 导入数据 + DataSet> dataSetTemp = env.fromCollection(dataList); + if (dataSet == null) { + dataSet = dataSetTemp; + } else { + dataSet = dataSet.union(dataSetTemp); + } + ++count; + } + // 分组计算规则 + dataSet = dataSet.groupBy(0).sum(2); + + + //dataSet.print(); + + String output = "C:\\Users\\10499\\Downloads\\1223.txt"; + FileUtils.deleteFileOrDirectory(new File(output)); + dataSet.writeAsText(output); + + env.execute("read es"); + } +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java b/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java new file mode 100644 index 0000000..c01b64f --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java @@ -0,0 +1,45 @@ +package com.qniao.iot; + +import com.alibaba.druid.pool.DruidDataSource; + +public class DruidDataSourceUtil { + + public static DruidDataSource getInstance(){ + + DruidDataSource dataSource = new DruidDataSource(); + //设置连接参数 + dataSource.setUrl("jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false"); + dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); + dataSource.setUsername("qn_cloudprint"); + dataSource.setPassword("qncloudprint5682"); + + //配置初始化大小、最小、最大 + dataSource.setInitialSize(2); + dataSource.setMinIdle(2); + dataSource.setMaxActive(100); + + dataSource.setRemoveAbandoned(true); + //超时时间;单位为秒。180秒=3分钟 + dataSource.setRemoveAbandonedTimeout(180); + + //配置一个连接在池中最小生存的时间,单位是毫秒 + dataSource.setMinEvictableIdleTimeMillis(300000); + //配置获取连接等待超时的时间单位毫秒 + dataSource.setMaxWait(60000); + //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + dataSource.setTimeBetweenEvictionRunsMillis(60000); + //防止过期 + dataSource.setValidationQuery("SELECT 'x'"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(false); + dataSource.setTestOnReturn(false); + + //是否缓存preparedStatement + dataSource.setPoolPreparedStatements(false); + dataSource.setMaxOpenPreparedStatements(100); + //asyncInit是1.1.4中新增加的配置,如果有initialSize数量较多时,打开会加快应用启动时间 + dataSource.setAsyncInit(true); + return dataSource; + } + +} diff --git a/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java b/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java new file mode 100644 index 0000000..4321632 --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java @@ -0,0 +1,126 @@ +package com.qniao.iot; + +import com.alibaba.fastjson2.JSONObject; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.elasticsearch.action.search.*; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * 阿里云服务器搭建的ES服务 + * + * @author lizixian + * @date 2020/3/16 10:41 + */ +public class EsRestClientService { + + private String host = "120.79.137.137:9200"; + private String scheme = "http"; + private String index = "qn_cloud_box_data_history"; + private RestClientBuilder builder = null; + private RestHighLevelClient client = null; + + public void init() { + String[] nodeIpInfos = host.split(":"); + builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme)) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(10 * 60 * 1000); + requestConfigBuilder.setSocketTimeout(10 * 60 * 1000); + requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000); + return requestConfigBuilder; + }); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215")); + builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)); + client = new RestHighLevelClient(builder); + } + + /** + * 分页查询应设备应用安装列表-使用游标 + * + * @author lizixian + * @date 2020/5/10 18:01 + */ + public Map queryDeviceListPage(String scrollId) { + String brand = "CH"; + + //设置查询数量 + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + + sourceBuilder.size(10000); + BoolQueryBuilder bool = QueryBuilders.boolQuery(); + + // 平台 +// bool.must(QueryBuilders.termQuery("brand", brand)); + + sourceBuilder.query(bool);//查询条件 + return queryDeviceListPageResult(sourceBuilder, scrollId); + } + + private Map queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId) { + SearchRequest searchRequest = new SearchRequest(index) + .scroll("2m") + .source(sourceBuilder); + if (client == null) { + init(); + } + Map resultMap = new HashMap<>(5); + List> tupleList = new ArrayList<>(); + try { + SearchResponse response = null; + + if (scrollId != null) { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m"); + response = client.scroll(scrollRequest, RequestOptions.DEFAULT); + } else { + response = client.search(searchRequest, RequestOptions.DEFAULT); + } + + int s = response.status().getStatus(); + if (s == RestStatus.OK.getStatus()) { + SearchHit[] hits = response.getHits().getHits(); + scrollId = response.getScrollId(); + System.out.println("*********************查询es结果"); + if (hits != null) { + for (SearchHit hit : hits) { + System.out.println("*********************查询es结果:" + hit.getSourceAsString()); + JSONObject json = JSONObject.parseObject(hit.getSourceAsString()); + tupleList.add(new Tuple3<>(json.getString("mac"), json.getString("data_source"), 1)); + } + } + } else { + //清除滚屏 + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId);//也可以选择setScrollIds()将多个scrollId一起使用 + ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + boolean succeeded = clearScrollResponse.isSucceeded(); + } + resultMap.put("scrollId", scrollId); + resultMap.put("tupleList", tupleList); + } catch (IOException e) { + e.printStackTrace(); + } + return resultMap; + } +} \ No newline at end of file diff --git a/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java b/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java new file mode 100644 index 0000000..7c02dad --- /dev/null +++ b/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java @@ -0,0 +1,97 @@ +package com.qniao.iot; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.druid.pool.DruidDataSource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.text.SimpleDateFormat; +import java.util.List; + +public class SinkMysqlFunc extends RichSinkFunction> { + + private DruidDataSource dataSource = null; + + /** + * 初始化方法 在invoke前执行 + * + * @param parameters + * @throws Exception + */ + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + // 获取数据库连接池配置 此处省略 + if (dataSource == null) { + dataSource = DruidDataSourceUtil.getInstance(); + } + } + + @Override + public void invoke(List values, Context context) throws Exception { + + if (values.size() != 0) { + Connection connection = dataSource.getConnection(); + PreparedStatement ps = connection.prepareStatement(getSql()); + for (Body body : values) { + if (body != null) { + ps.setInt(1, body.getData_source()); + ps.setLong(2, body.getMac()); + Integer dataType = body.getData_type(); + ps.setInt(3, dataType == 1 ? 0 : 1); + ps.setInt(4, dataType == 2 ? 1 : 0); + Long totalProduction = body.getTotal_production(); + ps.setLong(5, totalProduction == null ? 0 : totalProduction); + Long quantity = body.getQuantity(); + ps.setLong(6, quantity == null ? 0L : quantity); + ps.setBigDecimal(7, body.getRunningDuration()); + ps.setBigDecimal(8, body.getWaitingDuration()); + ps.setBigDecimal(9, null); + ps.setBigDecimal(10, null); + String createTimeStr = body.getCreate_time(); + Date createDate = null; + if (StrUtil.isNotEmpty(createTimeStr)) { + long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime(); + createDate = new Date(time); + } + ps.setDate(11, createDate); + Long id = body.getId(); + ps.setString(12, id == null ? "0" : StrUtil.toString(id)); + ps.setString(13, body.getDocId()); + ps.addBatch(); + } + } + ps.executeBatch(); + if (connection != null) { + connection.close(); + } + if (ps != null) { + ps.close(); + } + } + } + + @Override + public void close() throws Exception { + super.close(); + //关闭连接和释放资源 + if (dataSource != null) { + dataSource.close(); + } + } + + private String getSql() { + + return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + + " curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" + + " report_time,event_id, doc_id)\n" + + "values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)"; + } + +} diff --git a/cloud-box-job/src/main/resources/log4j2.properties b/cloud-box-job/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/cloud-box-job/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/cloud-box-job/src/test/java/Test.java b/cloud-box-job/src/test/java/Test.java new file mode 100644 index 0000000..d20a176 --- /dev/null +++ b/cloud-box-job/src/test/java/Test.java @@ -0,0 +1,12 @@ +import cn.hutool.json.JSONUtil; +import com.qniao.iot.Body; +import org.apache.kafka.common.protocol.types.Field; + +public class Test { + + public static void main(String[] args) { + + String s = "{\"create_time\":\"2022-07-12 12:56:15\",\"data_source\":0,\"data_timestamp\":\"2022-07-12 12:56:15\",\"data_type\":2,\"docId\":\"86119304081409802286021474\",\"id\":744217778243899392,\"mac\":861193040814098,\"quantity\":28,\"space_of_time\":60,\"total_production\":21474}"; + System.out.println(JSONUtil.toBean(s, Body.class)); + } +} diff --git a/pom.xml b/pom.xml index ebe32b0..1ff66bf 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,7 @@ under the License. root-cloud-event root-cloud-mocker root-cloud-statistics + cloud-box-job pom diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/Body.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/Body.java new file mode 100644 index 0000000..94eaafd --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/Body.java @@ -0,0 +1,43 @@ +package com.qniao.iot.rc; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import lombok.Data; + +import java.io.Serializable; +import java.math.BigDecimal; + +@Data +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class Body implements Serializable { + + private static final long serialVersionUID = 1L; + + private String createTime; + + private Integer dataSource; + + private String dataTimestamp; + + // 0开机数据 1关机数据 3生产数据 + private Integer dataType; + + private String docId; + + private Long id; + + private Long mac; + + private Long quantity; + + private BigDecimal runningDuration; + + private BigDecimal runningHour; + + private Integer spaceOfTime; + + private Long totalProduction; + + private BigDecimal waitingDuration; + + private BigDecimal waitingHour; +} diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java new file mode 100644 index 0000000..a5cf154 --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java @@ -0,0 +1,15 @@ +package com.qniao.iot.rc; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class CloudBoxDataHistoryEvent implements Serializable { + + private static final long serialVersionUID = 1L; + + private Body body; + + private String eventKey; +} diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSerialization.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSerialization.java new file mode 100644 index 0000000..305e1f5 --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSerialization.java @@ -0,0 +1,27 @@ +package com.qniao.iot.rc; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + +public class CloudBoxEventSerialization { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private final String topic; + + public CloudBoxEventSerialization(String topic) { + this.topic = topic; + } + + public ProducerRecord serialize( + final CloudBoxDataHistoryEvent message, @Nullable final Long timestamp) { + try { + //if topic is null, default topic will be used + return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + message, e); + } + } +} diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSourceMocker.java new file mode 100644 index 0000000..f305896 --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSourceMocker.java @@ -0,0 +1,95 @@ +package com.qniao.iot.rc; + +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.Db; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Future; + +public class CloudBoxEventSourceMocker { + // 延迟:毫秒 + public static final long DELAY = 500; + + public static void main(String[] args) throws Exception { + // 创建kafka配置属性 + Properties kafkaProps = createKafkaProperties(); + + // 创建Kafka消息的生产者 + KafkaProducer producer = new KafkaProducer<>(kafkaProps); + + String topic = "data-message-channel-qn"; + + // 循环发送事件 + while (true) { + + CloudBoxDataHistoryEvent event = new CloudBoxDataHistoryEvent(); + Body body = new Body(); + + body.setDataSource(1); + body.setCreateTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))); + body.setDataTimestamp(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))); + body.setId(RandomUtil.randomLong(99999999)); + body.setMac(RandomUtil.randomLong(9999999999999999L)); + body.setDataType(1); + body.setQuantity(RandomUtil.randomLong(9999)); + body.setRunningDuration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500))); + body.setRunningHour(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500))); + body.setTotalProduction(RandomUtil.randomLong(999999)); + event.setBody(body); + ProducerRecord record = new CloudBoxEventSerialization(topic).serialize( + event, + null); + + Future send = producer.send(record); + System.out.println(send.get()); + + Thread.sleep(DELAY); + } + } + + private static Integer generateWorkingSta(Integer workingSta, Integer pwrSta) { + + if(pwrSta.equals(0)) { + return 0; + }else { + return workingSta; + } + } + + private static Properties createKafkaProperties() { + Properties kafkaProps = new Properties(); + // 本地环境 + //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093"); + // 测试环境 + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092"); + // 正式环境 + //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + + /*kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1"); + // 添加认证配置 + kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + kafkaProps.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + +*/ + + return kafkaProps; + } +} diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index 18c58fc..d23d198 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -2,9 +2,12 @@ package com.qniao.iot.rc; import cn.hutool.core.util.RandomUtil; import cn.hutool.db.Db; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -12,6 +15,7 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.Future; public class RootCloudIotDataEventSourceMocker { // 延迟:毫秒 @@ -58,7 +62,8 @@ public class RootCloudIotDataEventSourceMocker { event, null); - producer.send(record); + Future send = producer.send(record); + System.out.println(send.get()); Thread.sleep(DELAY); } @@ -75,12 +80,24 @@ public class RootCloudIotDataEventSourceMocker { private static Properties createKafkaProperties() { Properties kafkaProps = new Properties(); + // 本地环境 + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093"); // 测试环境 - //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092"); // 正式环境 - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + + kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1"); + // 添加认证配置 + kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + kafkaProps.put("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + + + return kafkaProps; } } diff --git a/root-cloud-statistics/src/main/resources/META-INF/app.properties b/root-cloud-statistics/src/main/resources/META-INF/app.properties index 313a4c2..8eb297e 100644 --- a/root-cloud-statistics/src/main/resources/META-INF/app.properties +++ b/root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -1,5 +1,5 @@ app.id=root-cloud-model-hw-formatter -# ???? 8.135.8.221 -# ???? 47.112.164.224 +# test 8.135.8.221 +# prod 47.112.164.224 apollo.meta=http://47.112.164.224:5000 \ No newline at end of file