diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 99d4d36..89a2ea1 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -24,6 +24,7 @@ import com.qniao.iot.rc.constant.ConfigConstant; import com.qniao.iot.rc.constant.DataSource; import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; +import com.qniao.iot.rc.until.SnowFlake; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; @@ -59,8 +60,14 @@ import java.util.Objects; */ public class RootCloudIotDataFormatterJob { + static SnowFlake snowflake = new SnowFlake( + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) + ); + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() @@ -104,9 +111,10 @@ public class RootCloudIotDataFormatterJob { private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { + MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); if (Objects.nonNull(event)) { - machineIotDataReceivedEvent.setId((long) (event.get__assetId__() + System.currentTimeMillis()).hashCode()); + machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java index b8878c8..253f9b4 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java @@ -12,5 +12,9 @@ public interface ConfigConstant { String SINK_KAFKA_TOPICS = "sink.kafka.topics"; + String SNOW_FLAKE_DATACENTER_ID = "snow-flake.datacenter.id"; + + String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; + String SINK_OSS_PATH = "sink.oss.path"; } diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/until/SnowFlake.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/until/SnowFlake.java new file mode 100644 index 0000000..1e7085c --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/until/SnowFlake.java @@ -0,0 +1,103 @@ +package com.qniao.iot.rc.until; + +/** + * @description: Twitter的分布式自增ID雪花算法snowflake + * @author: zp + * @date: 2019-10-29 10:05 + */ +public class SnowFlake { + + /** + * 起始的时间戳 + */ + private final static long START_STMP = 1480166465631L; + + /** + * 每一部分占用的位数 + */ + private final static long SEQUENCE_BIT = 12; //序列号占用的位数 + private final static long MACHINE_BIT = 5; //机器标识占用的位数 + private final static long DATACENTER_BIT = 5;//数据中心占用的位数 + + /** + * 每一部分的最大值 + */ + private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT); + private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); + private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; + + private long datacenterId = 1L; //数据中心 + private long machineId = 1L; //机器标识 + private long sequence = 0L; //序列号 + private long lastStmp = -1L;//上一次时间戳 + +// public SnowFlake(){ +// } + + public SnowFlake(long datacenterId, + long machineId) { + if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { + throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); + } + this.datacenterId = datacenterId; + this.machineId = machineId; + } + + /** + * 产生下一个ID + * + * @return + */ + public synchronized Long nextId() { + long currStmp = getNewstmp(); + if (currStmp < lastStmp) { + throw new RuntimeException("Clock moved backwards. Refusing to generate id"); + } + + if (currStmp == lastStmp) { + //相同毫秒内,序列号自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currStmp = getNextMill(); + } + } else { + //不同毫秒内,序列号置为0 + sequence = 0L; + } + + lastStmp = currStmp; + + return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分 + | datacenterId << DATACENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //序列号部分 + } + + private long getNextMill() { + long mill = getNewstmp(); + while (mill <= lastStmp) { + mill = getNewstmp(); + } + return mill; + } + + private long getNewstmp() { + return System.currentTimeMillis(); + } + + public static void main(String[] args) { + SnowFlake s = new SnowFlake(1, 1); + System.out.println(s.nextId()); + } +} \ No newline at end of file