From d1fad744aa7b6acc01efcc87a0a1315bec72a18d Mon Sep 17 00:00:00 2001 From: FanOne Date: Sat, 16 Mar 2024 17:46:49 +0800 Subject: [PATCH] =?UTF-8?q?feature=20saga=20=EF=BC=9Asupport=20generate=20?= =?UTF-8?q?id=20by=20Snowflake=20(#670)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../process_ctrl_statemachine_engine.go | 3 +- .../statemachine/engine/sequence/snowflake.go | 116 ++++++++++++++++++ .../engine/sequence/snowflake_test.go | 28 +++++ 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 pkg/saga/statemachine/engine/sequence/snowflake.go create mode 100644 pkg/saga/statemachine/engine/sequence/snowflake_test.go diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go index afec63cbd..aae9fe89a 100644 --- a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go +++ b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go @@ -2,12 +2,13 @@ package engine import ( "context" + "time" + "github.com/pkg/errors" "github.com/seata/seata-go/pkg/saga/statemachine/constant" "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" - "time" ) type ProcessCtrlStateMachineEngine struct { diff --git a/pkg/saga/statemachine/engine/sequence/snowflake.go b/pkg/saga/statemachine/engine/sequence/snowflake.go new file mode 100644 index 000000000..f5622b064 --- /dev/null +++ b/pkg/saga/statemachine/engine/sequence/snowflake.go @@ -0,0 +1,116 @@ +package sequence + +import ( + "fmt" + "sync" + "time" + + "github.com/seata/seata-go/pkg/util/log" +) + +// SnowflakeSeqGenerator snowflake gen ids +// ref: https://en.wikipedia.org/wiki/Snowflake_ID + +var ( + // set the beginning time + epoch = time.Date(2024, time.January, 01, 00, 00, 00, 00, time.UTC).UnixMilli() +) + +const ( + // timestamp occupancy bits + timestampBits = 41 + // dataCenterId occupancy bits + dataCenterIdBits = 5 + // workerId occupancy bits + workerIdBits = 5 + // sequence occupancy bits + seqBits = 12 + + // timestamp max value, just like 2^41-1 = 2199023255551 + timestampMaxValue = -1 ^ (-1 << timestampBits) + // dataCenterId max value, just like 2^5-1 = 31 + dataCenterIdMaxValue = -1 ^ (-1 << dataCenterIdBits) + // workId max value, just like 2^5-1 = 31 + workerIdMaxValue = -1 ^ (-1 << workerIdBits) + // sequence max value, just like 2^12-1 = 4095 + seqMaxValue = -1 ^ (-1 << seqBits) + + // number of workId offsets (seqBits) + workIdShift = 12 + // number of dataCenterId offsets (seqBits + workerIdBits) + dataCenterIdShift = 17 + // number of timestamp offsets (seqBits + workerIdBits + dataCenterIdBits) + timestampShift = 22 + + defaultInitValue = 0 +) + +type SnowflakeSeqGenerator struct { + mu *sync.Mutex + timestamp int64 + dataCenterId int64 + workerId int64 + sequence int64 +} + +// NewSnowflakeSeqGenerator initiates the snowflake generator +func NewSnowflakeSeqGenerator(dataCenterId, workId int64) (r *SnowflakeSeqGenerator, err error) { + if dataCenterId < 0 || dataCenterId > dataCenterIdMaxValue { + err = fmt.Errorf("dataCenterId should between 0 and %d", dataCenterIdMaxValue-1) + return + } + + if workId < 0 || workId > workerIdMaxValue { + err = fmt.Errorf("workId should between 0 and %d", dataCenterIdMaxValue-1) + return + } + + return &SnowflakeSeqGenerator{ + mu: new(sync.Mutex), + timestamp: defaultInitValue - 1, + dataCenterId: dataCenterId, + workerId: workId, + sequence: defaultInitValue, + }, nil +} + +// GenerateId timestamp + dataCenterId + workId + sequence +func (S *SnowflakeSeqGenerator) GenerateId(entity string, ruleName string) string { + S.mu.Lock() + defer S.mu.Unlock() + + now := time.Now().UnixMilli() + + if S.timestamp > now { // Clock callback + log.Errorf("Clock moved backwards. Refusing to generate ID, last timestamp is %d, now is %d", S.timestamp, now) + return "" + } + + if S.timestamp == now { + // generate multiple IDs in the same millisecond, incrementing the sequence number to prevent conflicts + S.sequence = (S.sequence + 1) & seqMaxValue + if S.sequence == 0 { + // sequence overflow, waiting for next millisecond + for now <= S.timestamp { + now = time.Now().UnixMilli() + } + } + } else { + // initialized sequences are used directly at different millisecond timestamps + S.sequence = defaultInitValue + } + tmp := now - epoch + if tmp > timestampMaxValue { + log.Errorf("epoch should between 0 and %d", timestampMaxValue-1) + return "" + } + S.timestamp = now + + // combine the parts to generate the final ID and convert the 64-bit binary to decimal digits. + r := (tmp)<