Skip to content

Commit 5fab82b

Browse files
authored
Merge pull request #51 from hrko/main
Enhance locking with atomic acquisition and refresh
2 parents faad0c5 + 530d8ae commit 5fab82b

File tree

3 files changed

+284
-38
lines changed

3 files changed

+284
-38
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.39.6
1010
github.com/caddyserver/caddy/v2 v2.8.1
1111
github.com/caddyserver/certmagic v0.21.2
12+
github.com/google/uuid v1.6.0
1213
)
1314

1415
require (
@@ -30,7 +31,6 @@ require (
3031
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3132
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
3233
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect
33-
github.com/google/uuid v1.6.0 // indirect
3434
github.com/jmespath/go-jmespath v0.4.0 // indirect
3535
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
3636
github.com/libdns/libdns v0.2.2 // indirect

storage.go

Lines changed: 170 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import (
77
"fmt"
88
"io/fs"
99
"log"
10+
"sync"
1011
"time"
1112

13+
"github.com/google/uuid"
14+
1215
"github.com/aws/aws-sdk-go-v2/aws"
1316
"github.com/aws/aws-sdk-go-v2/config"
1417
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
@@ -23,6 +26,8 @@ const (
2326
contentsAttribute = "Contents"
2427
primaryKeyAttribute = "PrimaryKey"
2528
lastUpdatedAttribute = "LastUpdated"
29+
lockIdAttribute = "LockID"
30+
expiresAtAttribute = "ExpiresAt"
2631
lockTimeoutMinutes = caddy.Duration(5 * time.Minute)
2732
lockPollingInterval = caddy.Duration(5 * time.Second)
2833
)
@@ -62,6 +67,18 @@ type Storage struct {
6267

6368
// LockPollingInterval - [optional] how often to check whether the lock has been released. Default: 5 seconds
6469
LockPollingInterval caddy.Duration `json:"lock_polling_interval,omitempty"`
70+
71+
// LockRefreshInterval - [optional] how often to refresh the lock. Default: LockTimeout / 3
72+
LockRefreshInterval caddy.Duration `json:"lock_refresh_interval,omitempty"`
73+
74+
locks *sync.Map // map[string]*LockHandle
75+
}
76+
77+
// LockHandle describes a lock that has been acquired through Storage.Lock.
type LockHandle struct {
	// Key is the key the lock was acquired for.
	Key string
	// LockID is the UUID identifying this particular acquisition of the lock.
	LockID string
	// cancelFunc cancels the periodic refresh of the lock expiration.
	cancelFunc context.CancelFunc
}
6683

6784
// initConfig initializes configuration for table name and AWS client
@@ -76,6 +93,9 @@ func (s *Storage) initConfig(ctx context.Context) error {
7693
if s.LockPollingInterval == 0 {
7794
s.LockPollingInterval = lockPollingInterval
7895
}
96+
if s.LockRefreshInterval == 0 {
97+
s.LockRefreshInterval = s.LockTimeout / 3
98+
}
7999

80100
// Initialize AWS Client if needed
81101
if s.Client == nil {
@@ -92,6 +112,11 @@ func (s *Storage) initConfig(ctx context.Context) error {
92112
o.EndpointOptions.DisableHTTPS = s.AwsDisableSSL
93113
})
94114
}
115+
116+
if s.locks == nil {
117+
s.locks = &sync.Map{}
118+
}
119+
95120
return nil
96121
}
97122

@@ -245,6 +270,11 @@ func (s *Storage) Stat(ctx context.Context, key string) (certmagic.KeyInfo, erro
245270
}, nil
246271
}
247272

273+
// generateLockItemPrimaryKey returns the primary key of the DynamoDB item
// that represents the lock for key: the key itself prefixed with "LOCK-".
func generateLockItemPrimaryKey(key string) string {
	const lockItemPrefix = "LOCK-"
	return lockItemPrefix + key
}
277+
248278
// Lock acquires the lock for key, blocking until the lock
249279
// can be obtained or an error is returned. Note that, even
250280
// after acquiring a lock, an idempotent operation may have
@@ -267,43 +297,124 @@ func (s *Storage) Lock(ctx context.Context, key string) error {
267297
return err
268298
}
269299

270-
lockKey := fmt.Sprintf("LOCK-%s", key)
300+
lockKey := generateLockItemPrimaryKey(key)
301+
lockID := uuid.NewString()
302+
expiresAt := time.Now().Add(time.Duration(s.LockTimeout)).Unix()
271303

272-
// Check for existing lock
273304
for {
274-
existing, err := s.getItem(ctx, lockKey)
275-
isErrNotExists := errors.Is(err, fs.ErrNotExist)
276-
if err != nil && !isErrNotExists {
277-
return err
305+
// Acquire lock if it doesn't exist or expired
306+
input := &dynamodb.PutItemInput{
307+
TableName: aws.String(s.Table),
308+
Item: map[string]types.AttributeValue{
309+
primaryKeyAttribute: &types.AttributeValueMemberS{Value: lockKey},
310+
lockIdAttribute: &types.AttributeValueMemberS{Value: lockID},
311+
expiresAtAttribute: &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", expiresAt)},
312+
},
313+
ConditionExpression: aws.String(fmt.Sprintf("attribute_not_exists(%s) OR %s < :now", primaryKeyAttribute, expiresAtAttribute)),
314+
ExpressionAttributeValues: map[string]types.AttributeValue{
315+
":now": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", time.Now().Unix())},
316+
},
278317
}
279318

280-
// if lock doesn't exist or is empty, break to create a new one
281-
if isErrNotExists || existing.Contents == "" {
282-
break
283-
}
319+
_, err := s.Client.PutItem(ctx, input)
284320

285-
// Lock exists, check if expired or sleep 5 seconds and check again
286-
expires, err := time.Parse(time.RFC3339, existing.Contents)
287-
if err != nil {
288-
return err
289-
}
290-
if time.Now().After(expires) {
291-
if err := s.Unlock(ctx, key); err != nil {
292-
return err
321+
// Lock acquired successfully
322+
if err == nil {
323+
lockCtx, cancel := context.WithCancel(ctx)
324+
lockHandle := &LockHandle{
325+
Key: key,
326+
LockID: lockID,
327+
cancelFunc: cancel,
293328
}
294-
break
329+
s.locks.Store(key, lockHandle)
330+
331+
// Start periodic refresh
332+
go s.keepLockFresh(lockCtx, lockHandle)
333+
334+
return nil
295335
}
296336

337+
// Lock not acquired, retry
297338
select {
298339
case <-time.After(time.Duration(s.LockPollingInterval)):
340+
continue
299341
case <-ctx.Done():
300342
return ctx.Err()
301343
}
302344
}
345+
}
346+
347+
// keepLockFresh periodically updates the lock expiration
348+
// to prevent it from expiring while the critical section
349+
// is still running.
350+
func (s *Storage) keepLockFresh(ctx context.Context, handle *LockHandle) {
351+
ticker := time.NewTicker(time.Duration(s.LockRefreshInterval))
352+
defer ticker.Stop()
353+
354+
for {
355+
select {
356+
case <-ticker.C:
357+
timeout := time.Duration(s.LockRefreshInterval) / 2
358+
err := s.updateLockExpiration(ctx, handle, timeout)
359+
if err != nil {
360+
// The critical section should be aborted if lock refresh fails.
361+
// However, there is no way to notify the critical section to abort,
362+
// so we just log the error and stop refreshing the lock.
363+
log.Printf("failed to update lock expiration for key %s: %v", handle.Key, err)
364+
return
365+
}
366+
case <-ctx.Done():
367+
return // Unlock or external cancellation
368+
}
369+
}
370+
}
371+
372+
// updateLockExpiration updates the lock expiration atomically.
373+
func (s *Storage) updateLockExpiration(ctx context.Context, handle *LockHandle, timeout time.Duration) error {
374+
lockKey := generateLockItemPrimaryKey(handle.Key)
375+
newExpiresAt := time.Now().Add(time.Duration(s.LockTimeout)).Unix()
376+
377+
// Check LockID in ConditionExpression and update only the lock created by itself
378+
input := &dynamodb.UpdateItemInput{
379+
TableName: aws.String(s.Table),
380+
Key: map[string]types.AttributeValue{
381+
primaryKeyAttribute: &types.AttributeValueMemberS{Value: lockKey},
382+
},
383+
UpdateExpression: aws.String(fmt.Sprintf("SET %s = :newExpiresAt", expiresAtAttribute)),
384+
ConditionExpression: aws.String(fmt.Sprintf("%s = :lockID", lockIdAttribute)),
385+
ExpressionAttributeValues: map[string]types.AttributeValue{
386+
":newExpiresAt": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", newExpiresAt)},
387+
":lockID": &types.AttributeValueMemberS{Value: handle.LockID},
388+
},
389+
}
303390

304-
// lock doesn't exist, create it
305-
contents := []byte(time.Now().Add(time.Duration(s.LockTimeout)).Format(time.RFC3339))
306-
return s.Store(ctx, lockKey, contents)
391+
timeoutTimer := time.NewTimer(timeout)
392+
defer timeoutTimer.Stop()
393+
for {
394+
_, err := s.Client.UpdateItem(ctx, input)
395+
if err == nil {
396+
return nil
397+
}
398+
399+
// Do not retry if lock deleted or acquired by another process
400+
var ccfe *types.ConditionalCheckFailedException
401+
if errors.As(err, &ccfe) {
402+
return fmt.Errorf("failed to update lock expiration: lock may have been deleted or updated by another process")
403+
}
404+
405+
// Retry in case of network error or other transient issues
406+
delay := min(timeout/10, time.Second) // delay should be smaller enough than timeout
407+
delayTimer := time.NewTimer(delay)
408+
defer delayTimer.Stop()
409+
select {
410+
case <-delayTimer.C:
411+
continue
412+
case <-timeoutTimer.C:
413+
return fmt.Errorf("failed to update lock expiration: timeout")
414+
case <-ctx.Done():
415+
return nil
416+
}
417+
}
307418
}
308419

309420
// Unlock releases the lock for key. This method must ONLY be
@@ -315,9 +426,44 @@ func (s *Storage) Unlock(ctx context.Context, key string) error {
315426
return err
316427
}
317428

318-
lockKey := fmt.Sprintf("LOCK-%s", key)
429+
lockKey := generateLockItemPrimaryKey(key)
430+
431+
handle, ok := s.locks.LoadAndDelete(key)
432+
if !ok {
433+
// this line is not reached in normal operation, but it's here for safety
434+
return nil
435+
}
436+
lockHandle, _ := handle.(*LockHandle)
437+
438+
// Stop periodic refresh of lock expiration
439+
lockHandle.cancelFunc()
319440

320-
return s.Delete(ctx, lockKey)
441+
// Delete lock only if it was created by itself
442+
input := &dynamodb.DeleteItemInput{
443+
TableName: aws.String(s.Table),
444+
Key: map[string]types.AttributeValue{
445+
primaryKeyAttribute: &types.AttributeValueMemberS{Value: lockKey},
446+
},
447+
ConditionExpression: aws.String(fmt.Sprintf("%s = :lockID", lockIdAttribute)),
448+
ExpressionAttributeValues: map[string]types.AttributeValue{
449+
":lockID": &types.AttributeValueMemberS{Value: lockHandle.LockID},
450+
},
451+
}
452+
453+
_, err := s.Client.DeleteItem(ctx, input)
454+
455+
if err != nil {
456+
var ccfe *types.ConditionalCheckFailedException
457+
if errors.As(err, &ccfe) {
458+
// Lock already deleted or updated by another process, so this process is not the owner anymore.
459+
// This should not be considered an error according to the `Unlock` interface definition.
460+
// ref. https://github.com/caddyserver/certmagic/blob/2134b61d5db3cf61d9255725219ab6591541c19f/storage.go#L147-L148
461+
return nil
462+
}
463+
return err
464+
}
465+
466+
return nil
321467
}
322468

323469
func (s *Storage) getItem(ctx context.Context, key string) (Item, error) {

0 commit comments

Comments
 (0)