Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
bfa6e9d7c6
1 changed files with 3 additions and 12 deletions
  1. 15
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

15
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -42,6 +42,7 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.io.IOException;
@ -53,17 +54,7 @@ import java.util.stream.Collectors;
public class IotMachineEventGeneratorJob {
/*private final static String SQL = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" +
"from (select id, status from qn_machine where is_delete = 0) qm\n" +
" left join (select machine_id, equipment_information_id\n" +
" from qn_machine_binding_cloud_box\n" +
" where is_delete = 0) qmbcb ON qm.id = qmbcb.machine_id\n" +
" left join (select id, mac from qn_equipment_information where is_delete = 0) qei\n" +
" on qei.id = qmbcb.equipment_information_id";*/
private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status\n" +
"from qn_machine_realtime_state\n" +
"where is_delete = 0";
private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status from qn_machine_realtime_state where is_delete = 0";
public static void main(String[] args) throws Exception {
@ -75,7 +66,7 @@ public class IotMachineEventGeneratorJob {
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.latest())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();

Loading…
Cancel
Save