diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 579f41c..0cd4b26 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -42,6 +42,7 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; @@ -53,17 +54,7 @@ import java.util.stream.Collectors; public class IotMachineEventGeneratorJob { - /*private final static String SQL = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" + - "from (select id, status from qn_machine where is_delete = 0) qm\n" + - " left join (select machine_id, equipment_information_id\n" + - " from qn_machine_binding_cloud_box\n" + - " where is_delete = 0) qmbcb ON qm.id = qmbcb.machine_id\n" + - " left join (select id, mac from qn_equipment_information where is_delete = 0) qei\n" + - " on qei.id = qmbcb.equipment_information_id";*/ - - private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status\n" + - "from qn_machine_realtime_state\n" + - "where is_delete = 0"; + private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status from qn_machine_realtime_state where is_delete = 0"; public static void main(String[] args) throws Exception { @@ -75,7 +66,7 @@ public class IotMachineEventGeneratorJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.latest()) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build();