diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index 2c2c5c8..e296ef7 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -33,7 +33,7 @@ public class MachineIotDataReceivedEvent implements Serializable { private Integer machinePwrStat; /** - * 机器工作状态(0未工作 1工作中) + * 机器工作状态(0未工作 1工作中 2待机中) */ private Integer machineWorkingStat; diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index 44bf105..bb2b07f 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -107,6 +107,18 @@ mysql mysql-connector-java + + + + com.ctrip.framework.apollo + apollo-client + 2.0.1 + + + com.ctrip.framework.apollo + apollo-core + 2.0.1 + diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java new file mode 100644 index 0000000..57ad6de --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java @@ -0,0 +1,24 @@ +package com.qniao.iot.machine.event.generator.config; + +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigService; + +public class ApolloConfig { + + private static final Config config = ConfigService.getAppConfig(); + + public static String get(String key, String defaultValue) { + + return config.getProperty(key, defaultValue); + } + + public static String get(String key) { + + return config.getProperty(key, null); + } + + public static Integer getInt(String key) { + + return config.getIntProperty(key, null); + } +} diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java new file mode 100644 index 0000000..85f2366 --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java @@ -0,0 +1,38 @@ +package com.qniao.iot.machine.event.generator.constant; + +public interface ConfigConstant { + + String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; + + String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; + + String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; + + String SINK_RABBITMQ_HOST = "sink.rabbitmq.host"; + + String SINK_RABBITMQ_PORT = "sink.rabbitmq.port"; + + String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost"; + + String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName"; + + String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; + + String SINK_RABBITMQ_ROUTING_KEY = "sink.rabbitmq.routingKey"; + + String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; + + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; + + String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; + + String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName"; + + String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password"; + + String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout"; + + String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme"; + + String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; +} diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index af6fc91..1833a7a 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -1,6 +1,8 @@ package com.qniao.iot.machine.event.generator.job; +import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; import cn.hutool.json.JSON; @@ -13,18 +15,19 @@ import com.qniao.iot.machine.command.StopMachineWorkingCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; +import com.qniao.iot.machine.event.generator.config.ApolloConfig; +import com.qniao.iot.machine.event.generator.constant.ConfigConstant; import com.rabbitmq.client.AMQP; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; @@ -41,8 +44,6 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; -import javax.sql.DataSource; -import java.io.IOException; import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; @@ -59,23 +60,22 @@ public class IotMachineEventGeneratorJob { public static void main(String[] args) throws Exception { - - final ParameterTool params = ParameterTool.fromArgs(args); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers(params.get("source.bootstrap.servers")) - .setTopics("machine_iot_data_received_event") - .setGroupId("root_cloud_iot_data_etl") + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); - // 设备数据分组 - DataStream commandDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source") - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + DataStreamSource dataStreamSource = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + + + DataStream commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { private ValueState deviceState; @@ -93,7 +93,8 @@ public class IotMachineEventGeneratorJob { deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); } deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); + .getState(new ValueStateDescriptor<>("deviceState", + TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); } @Override @@ -102,13 +103,15 @@ public class IotMachineEventGeneratorJob { // 获取最新设备状态 JSON deviceStateListJson = deviceState.value(); - DeviceState lastedDeviceState = deviceStateListJson.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); + DeviceState lastedDeviceState = deviceStateListJson + .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); Integer deviceStatus = getDeviceStatus(event); if (deviceStatus != null) { deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); if (lastedDeviceState != null) { - DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); + DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), + deviceStatus, event.getReportTime()); collDeviceStatusChange(out, newState, lastedDeviceState, event); } deviceState.update(deviceStateListJson); @@ -121,7 +124,7 @@ public class IotMachineEventGeneratorJob { sinkRabbitMq(commandDataStream); // 写入es - sinkEs(commandDataStream); + sinkEs(dataStreamSource); env.execute("Kafka Job"); } @@ -131,72 +134,48 @@ public class IotMachineEventGeneratorJob { // rabbitmq配置(测试环境) RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() - .setHost("8.135.8.221") - .setVirtualHost("/") - .setUserName("qniao") - .setPassword("Qianniao2020") - .setPort(5672).build(); + .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) + .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) + .setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) + .setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) + .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build(); // 发送相应的指令到rabbitmq的交换机 commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions() { - @Override - public String computeRoutingKey(BaseCommand command) { - return "machine-iot-data-received-event"; - } - - @Override - public AMQP.BasicProperties computeProperties(BaseCommand command) { - return null; - } + @Override + public String computeRoutingKey(BaseCommand command) { + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_ROUTING_KEY); + } - @Override - public String computeExchange(BaseCommand command) { + @Override + public AMQP.BasicProperties computeProperties(BaseCommand command) { + return null; + } - // 交换机名称 - return "flink_test_exchange"; - } - })).name("commandDataStream to rabbitmq Sink"); + @Override + public String computeExchange(BaseCommand command) { - // 直接发队列 - // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); + // 交换机名称 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); + } + })).name("commandDataStream to rabbitmq Sink"); } - private static void sinkEs(DataStream commandDataStream) { + private static void sinkEs(DataStreamSource dataStream) { List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); - ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, - (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> { - - HashMap map = new HashMap<>(); - if (command instanceof PowerOnMachineCommand) { - // 设备开机数据 - PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand) command; - map.put("id", powerOnMachineCommand.getId().toString()); - map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); - } - if (command instanceof PowerOffMachineCommand) { - // 设备关机数据 - PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand) command; - map.put("id", powerOffMachineCommand.getId().toString()); - map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); - } - if (command instanceof StopMachineWorkingCommand) { - // 设备待机数据 - StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand) command; - map.put("id", stopMachineWorkingCommand.getId().toString()); - map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString()); - } - if (command instanceof StartMachineWorkingCommand) { - // 设备工作数据 - StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand) command; - map.put("id", startMachineWorkingCommand.getId().toString()); - map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString()); - } + httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, + (ElasticsearchSinkFunction) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> { + //创建es 请求 - IndexRequest indexRequest = Requests.indexRequest().index("machine-iot-data-received-event").source(map); + IndexRequest indexRequest = Requests.indexRequest() + .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX)) + .source(BeanUtil.beanToMap(machineIotDataReceivedEvent)); requestIndexer.add(indexRequest); } ); @@ -204,25 +183,27 @@ public class IotMachineEventGeneratorJob { esSinkBuilder.setBulkFlushMaxActions(10); //刷新前缓冲区的最大数据大小(以MB为单位) esSinkBuilder.setBulkFlushMaxSizeMb(5); - //论缓冲操作的数量或大小如何都要刷新的时间间隔 + //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 esSinkBuilder.setBulkFlushInterval(5000L); // 客户端创建配置回调,配置账号密码 esSinkBuilder.setRestClientFactory( restClientBuilder -> { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qn56521")); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(3000); + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); return requestConfigBuilder; }); } ); //数据流添加sink - commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); + dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); } @@ -235,7 +216,7 @@ public class IotMachineEventGeneratorJob { && Integer.valueOf("1").equals(event.getMachineWorkingStat())) { return 1; } else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) - && Integer.valueOf("0").equals(event.getMachineWorkingStat())) { + && Integer.valueOf("2").equals(event.getMachineWorkingStat())) { return 2; } return null; @@ -256,14 +237,5 @@ public class IotMachineEventGeneratorJob { // 设备开始工作 out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } - - - /*if (newState.getStatus() == 1 || newState.getStatus() == 2) { - System.out.println("设备开机。相关事件:" + event.toString()); - out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } else if (newState.getStatus() == 0) { - System.out.println("设备关机。相关事件:" + event.toString()); - out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - }*/ } } diff --git a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties new file mode 100644 index 0000000..ef428d4 --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties @@ -0,0 +1,3 @@ +app.id=machine-state-event-generator + +apollo.meta=http://8.135.8.221:5000 \ No newline at end of file