diff --git a/go.mod b/go.mod index f5c189893f..69695fa178 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/go-git/go-billy/v5 v5.5.0 github.com/go-git/go-git/v5 v5.12.0 github.com/go-ldap/ldap/v3 v3.4.6 + github.com/go-redsync/redsync/v4 v4.13.0 github.com/go-sql-driver/mysql v1.8.1 github.com/go-swagger/go-swagger v0.31.0 github.com/go-testfixtures/testfixtures/v3 v3.11.0 @@ -218,7 +219,9 @@ require ( github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.16 // indirect diff --git a/go.sum b/go.sum index f1780fada7..510ef8479d 100644 --- a/go.sum +++ b/go.sum @@ -342,6 +342,14 @@ github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58= github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= +github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA= +github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= @@ -397,6 +405,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -449,10 +459,15 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE= github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -674,6 +689,8 @@ github.com/quasoft/websspi v1.1.2/go.mod h1:HmVdl939dQ0WIXZhyik+ARdI03M6bQzaSEKc github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.6.0 h1:NLck+Rab3AOTHw21CGRpvQpgTrAU4sgdCswqGtlhGRA= github.com/redis/go-redis/v9 v9.6.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo= +github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rhysd/actionlint v1.7.1 h1:WJaDzyT1StBWVKGSsZPYnbV0HF9Y9/vD6KFdZQL42qE= @@ -765,6 +782,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= diff --git a/modules/globallock/globallock.go b/modules/globallock/globallock.go new file mode 100644 index 0000000000..707d169f05 --- /dev/null +++ b/modules/globallock/globallock.go @@ -0,0 +1,66 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "sync" +) + +var ( + defaultLocker Locker + initOnce sync.Once + initFunc = func() { + // TODO: read the setting and initialize the default locker. + // Before implementing this, don't use it. + } // define initFunc as a variable to make it possible to change it in tests +) + +// DefaultLocker returns the default locker. +func DefaultLocker() Locker { + initOnce.Do(func() { + initFunc() + }) + return defaultLocker +} + +// Lock tries to acquire a lock for the given key, it uses the default locker. +// Read the documentation of Locker.Lock for more information about the behavior. +func Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { + return DefaultLocker().Lock(ctx, key) +} + +// TryLock tries to acquire a lock for the given key, it uses the default locker. +// Read the documentation of Locker.TryLock for more information about the behavior. +func TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { + return DefaultLocker().TryLock(ctx, key) +} + +// LockAndDo tries to acquire a lock for the given key and then calls the given function. +// It uses the default locker, and it will return an error if failed to acquire the lock. +func LockAndDo(ctx context.Context, key string, f func(context.Context) error) error { + ctx, release, err := Lock(ctx, key) + if err != nil { + return err + } + defer release() + + return f(ctx) +} + +// TryLockAndDo tries to acquire a lock for the given key and then calls the given function. +// It uses the default locker, and it will return false if failed to acquire the lock. +func TryLockAndDo(ctx context.Context, key string, f func(context.Context) error) (bool, error) { + ok, ctx, release, err := TryLock(ctx, key) + if err != nil { + return false, err + } + defer release() + + if !ok { + return false, nil + } + + return true, f(ctx) +} diff --git a/modules/globallock/globallock_test.go b/modules/globallock/globallock_test.go new file mode 100644 index 0000000000..88a555c86f --- /dev/null +++ b/modules/globallock/globallock_test.go @@ -0,0 +1,96 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "os" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLockAndDo(t *testing.T) { + t.Run("redis", func(t *testing.T) { + url := "redis://127.0.0.1:6379/0" + if os.Getenv("CI") == "" { + // Make it possible to run tests against a local redis instance + url = os.Getenv("TEST_REDIS_URL") + if url == "" { + t.Skip("TEST_REDIS_URL not set and not running in CI") + return + } + } + + oldDefaultLocker := defaultLocker + oldInitFunc := initFunc + defer func() { + defaultLocker = oldDefaultLocker + initFunc = oldInitFunc + if defaultLocker == nil { + initOnce = sync.Once{} + } + }() + + initOnce = sync.Once{} + initFunc = func() { + defaultLocker = NewRedisLocker(url) + } + + testLockAndDo(t) + require.NoError(t, defaultLocker.(*redisLocker).Close()) + }) + t.Run("memory", func(t *testing.T) { + oldDefaultLocker := defaultLocker + oldInitFunc := initFunc + defer func() { + defaultLocker = oldDefaultLocker + initFunc = oldInitFunc + if defaultLocker == nil { + initOnce = sync.Once{} + } + }() + + initOnce = sync.Once{} + initFunc = func() { + defaultLocker = NewMemoryLocker() + } + + testLockAndDo(t) + }) +} + +func testLockAndDo(t *testing.T) { + const concurrency = 1000 + + ctx := context.Background() + count := 0 + wg := sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + err := LockAndDo(ctx, "test", func(ctx context.Context) error { + count++ + + // It's impossible to acquire the lock inner the function + ok, err := TryLockAndDo(ctx, "test", func(ctx context.Context) error { + assert.Fail(t, "should not acquire the lock") + return nil + }) + assert.False(t, ok) + assert.NoError(t, err) + + return nil + }) + require.NoError(t, err) + }() + } + + wg.Wait() + + assert.Equal(t, concurrency, count) +} diff --git a/modules/globallock/locker.go b/modules/globallock/locker.go new file mode 100644 index 0000000000..b0764cd71c --- /dev/null +++ b/modules/globallock/locker.go @@ -0,0 +1,60 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "fmt" +) + +type Locker interface { + // Lock tries to acquire a lock for the given key, it blocks until the lock is acquired or the context is canceled. + // + // Lock returns a new context which should be used in the following code. + // The new context will be canceled when the lock is released or lost - yes, it's possible to lose a lock. + // For example, it lost the connection to the redis server while holding the lock. + // If it fails to acquire the lock, the returned context will be the same as the input context. + // + // Lock returns a ReleaseFunc to release the lock, it cannot be nil. + // It's always safe to call this function even if it fails to acquire the lock, and it will do nothing in that case. + // And it's also safe to call it multiple times, but it will only release the lock once. + // That's why it's called ReleaseFunc, not UnlockFunc. + // But be aware that it's not safe to not call it at all; it could lead to a memory leak. + // So a recommended pattern is to use defer to call it: + // ctx, release, err := locker.Lock(ctx, "key") + // if err != nil { + // return err + // } + // defer release() + // The ReleaseFunc will return the original context which was used to acquire the lock. + // It's useful when you want to continue to do something after releasing the lock. + // At that time, the ctx will be canceled, and you can use the returned context by the ReleaseFunc to continue: + // ctx, release, err := locker.Lock(ctx, "key") + // if err != nil { + // return err + // } + // defer release() + // doSomething(ctx) + // ctx = release() + // doSomethingElse(ctx) + // Please ignore it and use `defer release()` instead if you don't need this, to avoid forgetting to release the lock. + // + // Lock returns an error if failed to acquire the lock. + // Be aware that even the context is not canceled, it's still possible to fail to acquire the lock. + // For example, redis is down, or it reached the maximum number of tries. + Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) + + // TryLock tries to acquire a lock for the given key, it returns immediately. + // It follows the same pattern as Lock, but it doesn't block. + // And if it fails to acquire the lock because it's already locked, not other reasons like redis is down, + // it will return false without any error. + TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) +} + +// ReleaseFunc is a function that releases a lock. +// It returns the original context which was used to acquire the lock. +type ReleaseFunc func() context.Context + +// ErrLockReleased is used as context cause when a lock is released +var ErrLockReleased = fmt.Errorf("lock released") diff --git a/modules/globallock/locker_test.go b/modules/globallock/locker_test.go new file mode 100644 index 0000000000..15a3c65bb0 --- /dev/null +++ b/modules/globallock/locker_test.go @@ -0,0 +1,211 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "os" + "sync" + "testing" + "time" + + "github.com/go-redsync/redsync/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLocker(t *testing.T) { + t.Run("redis", func(t *testing.T) { + url := "redis://127.0.0.1:6379/0" + if os.Getenv("CI") == "" { + // Make it possible to run tests against a local redis instance + url = os.Getenv("TEST_REDIS_URL") + if url == "" { + t.Skip("TEST_REDIS_URL not set and not running in CI") + return + } + } + oldExpiry := redisLockExpiry + redisLockExpiry = 5 * time.Second // make it shorter for testing + defer func() { + redisLockExpiry = oldExpiry + }() + + locker := NewRedisLocker(url) + testLocker(t, locker) + testRedisLocker(t, locker.(*redisLocker)) + require.NoError(t, locker.(*redisLocker).Close()) + }) + t.Run("memory", func(t *testing.T) { + locker := NewMemoryLocker() + testLocker(t, locker) + testMemoryLocker(t, locker.(*memoryLocker)) + }) +} + +func testLocker(t *testing.T, locker Locker) { + t.Run("lock", func(t *testing.T) { + parentCtx := context.Background() + ctx, release, err := locker.Lock(parentCtx, "test") + defer release() + + assert.NotEqual(t, parentCtx, ctx) // new context should be returned + assert.NoError(t, err) + + func() { + parentCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + ctx, release, err := locker.Lock(parentCtx, "test") + defer release() + + assert.Error(t, err) + assert.Equal(t, parentCtx, ctx) // should return the same context + }() + + release() + assert.Error(t, ctx.Err()) + + func() { + _, release, err := locker.Lock(context.Background(), "test") + defer release() + + assert.NoError(t, err) + }() + }) + + t.Run("try lock", func(t *testing.T) { + parentCtx := context.Background() + ok, ctx, release, err := locker.TryLock(parentCtx, "test") + defer release() + + assert.True(t, ok) + assert.NotEqual(t, parentCtx, ctx) // new context should be returned + assert.NoError(t, err) + + func() { + parentCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + ok, ctx, release, err := locker.TryLock(parentCtx, "test") + defer release() + + assert.False(t, ok) + assert.NoError(t, err) + assert.Equal(t, parentCtx, ctx) // should return the same context + }() + + release() + assert.Error(t, ctx.Err()) + + func() { + ok, _, release, _ := locker.TryLock(context.Background(), "test") + defer release() + + assert.True(t, ok) + }() + }) + + t.Run("wait and acquired", func(t *testing.T) { + ctx := context.Background() + _, release, err := locker.Lock(ctx, "test") + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + started := time.Now() + _, release, err := locker.Lock(context.Background(), "test") // should be blocked for seconds + defer release() + assert.Greater(t, time.Since(started), time.Second) + assert.NoError(t, err) + }() + + time.Sleep(2 * time.Second) + release() + + wg.Wait() + }) + + t.Run("continue after release", func(t *testing.T) { + ctx := context.Background() + + ctxBeforeLock := ctx + ctx, release, err := locker.Lock(ctx, "test") + + require.NoError(t, err) + assert.NoError(t, ctx.Err()) + assert.NotEqual(t, ctxBeforeLock, ctx) + + ctxBeforeRelease := ctx + ctx = release() + + assert.NoError(t, ctx.Err()) + assert.Error(t, ctxBeforeRelease.Err()) + + // so it can continue with ctx to do more work + }) + + t.Run("multiple release", func(t *testing.T) { + ctx := context.Background() + + _, release1, err := locker.Lock(ctx, "test") + require.NoError(t, err) + + release1() + + _, release2, err := locker.Lock(ctx, "test") + defer release2() + require.NoError(t, err) + + // Call release1 again, + // it should not panic or block, + // and it shouldn't affect the other lock + release1() + + ok, _, release3, err := locker.TryLock(ctx, "test") + defer release3() + require.NoError(t, err) + // It should be able to acquire the lock; + // otherwise, it means the lock has been released by release1 + assert.False(t, ok) + }) +} + +// testMemoryLocker does specific tests for memoryLocker +func testMemoryLocker(t *testing.T, locker *memoryLocker) { + // nothing to do +} + +// testRedisLocker does specific tests for redisLocker +func testRedisLocker(t *testing.T, locker *redisLocker) { + defer func() { + // This case should be tested at the end. + // Otherwise, it will affect other tests. + t.Run("close", func(t *testing.T) { + assert.NoError(t, locker.Close()) + _, _, err := locker.Lock(context.Background(), "test") + assert.Error(t, err) + }) + }() + + t.Run("failed extend", func(t *testing.T) { + ctx, release, err := locker.Lock(context.Background(), "test") + defer release() + require.NoError(t, err) + + // It simulates that there are some problems with extending like network issues or redis server down. + v, ok := locker.mutexM.Load("test") + require.True(t, ok) + m := v.(*redisMutex) + _, _ = m.mutex.Unlock() // release it to make it impossible to extend + + select { + case <-time.After(redisLockExpiry + time.Second): + t.Errorf("lock should be expired") + case <-ctx.Done(): + var errTaken *redsync.ErrTaken + assert.ErrorAs(t, context.Cause(ctx), &errTaken) + } + }) +} diff --git a/modules/globallock/memory_locker.go b/modules/globallock/memory_locker.go new file mode 100644 index 0000000000..fb1fc79bd0 --- /dev/null +++ b/modules/globallock/memory_locker.go @@ -0,0 +1,80 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "sync" + "time" +) + +type memoryLocker struct { + locks sync.Map +} + +var _ Locker = &memoryLocker{} + +func NewMemoryLocker() Locker { + return &memoryLocker{} +} + +func (l *memoryLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { + originalCtx := ctx + + if l.tryLock(key) { + ctx, cancel := context.WithCancelCause(ctx) + releaseOnce := sync.Once{} + return ctx, func() context.Context { + releaseOnce.Do(func() { + l.locks.Delete(key) + cancel(ErrLockReleased) + }) + return originalCtx + }, nil + } + + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx, func() context.Context { return originalCtx }, ctx.Err() + case <-ticker.C: + if l.tryLock(key) { + ctx, cancel := context.WithCancelCause(ctx) + releaseOnce := sync.Once{} + return ctx, func() context.Context { + releaseOnce.Do(func() { + l.locks.Delete(key) + cancel(ErrLockReleased) + }) + return originalCtx + }, nil + } + } + } +} + +func (l *memoryLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { + originalCtx := ctx + + if l.tryLock(key) { + ctx, cancel := context.WithCancelCause(ctx) + releaseOnce := sync.Once{} + return true, ctx, func() context.Context { + releaseOnce.Do(func() { + cancel(ErrLockReleased) + l.locks.Delete(key) + }) + return originalCtx + }, nil + } + + return false, ctx, func() context.Context { return originalCtx }, nil +} + +func (l *memoryLocker) tryLock(key string) bool { + _, loaded := l.locks.LoadOrStore(key, struct{}{}) + return !loaded +} diff --git a/modules/globallock/redis_locker.go b/modules/globallock/redis_locker.go new file mode 100644 index 0000000000..34b2fabfb3 --- /dev/null +++ b/modules/globallock/redis_locker.go @@ -0,0 +1,154 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package globallock + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "code.gitea.io/gitea/modules/nosql" + + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v9" +) + +const redisLockKeyPrefix = "gitea:globallock:" + +// redisLockExpiry is the default expiry time for a lock. +// Define it as a variable to make it possible to change it in tests. +var redisLockExpiry = 30 * time.Second + +type redisLocker struct { + rs *redsync.Redsync + + mutexM sync.Map + closed atomic.Bool + extendWg sync.WaitGroup +} + +var _ Locker = &redisLocker{} + +func NewRedisLocker(connection string) Locker { + l := &redisLocker{ + rs: redsync.New( + goredis.NewPool( + nosql.GetManager().GetRedisClient(connection), + ), + ), + } + + l.extendWg.Add(1) + l.startExtend() + + return l +} + +func (l *redisLocker) Lock(ctx context.Context, key string) (context.Context, ReleaseFunc, error) { + return l.lock(ctx, key, 0) +} + +func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, context.Context, ReleaseFunc, error) { + ctx, f, err := l.lock(ctx, key, 1) + + var ( + errTaken *redsync.ErrTaken + errNodeTaken *redsync.ErrNodeTaken + ) + if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) { + return false, ctx, f, nil + } + return err == nil, ctx, f, err +} + +// Close closes the locker. +// It will stop extending the locks and refuse to acquire new locks. +// In actual use, it is not necessary to call this function. +// But it's useful in tests to release resources. +// It could take some time since it waits for the extending goroutine to finish. +func (l *redisLocker) Close() error { + l.closed.Store(true) + l.extendWg.Wait() + return nil +} + +type redisMutex struct { + mutex *redsync.Mutex + cancel context.CancelCauseFunc +} + +func (l *redisLocker) lock(ctx context.Context, key string, tries int) (context.Context, ReleaseFunc, error) { + if l.closed.Load() { + return ctx, func() context.Context { return ctx }, fmt.Errorf("locker is closed") + } + + originalCtx := ctx + + options := []redsync.Option{ + redsync.WithExpiry(redisLockExpiry), + } + if tries > 0 { + options = append(options, redsync.WithTries(tries)) + } + mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...) + if err := mutex.LockContext(ctx); err != nil { + return ctx, func() context.Context { return originalCtx }, err + } + + ctx, cancel := context.WithCancelCause(ctx) + + l.mutexM.Store(key, &redisMutex{ + mutex: mutex, + cancel: cancel, + }) + + releaseOnce := sync.Once{} + return ctx, func() context.Context { + releaseOnce.Do(func() { + l.mutexM.Delete(key) + + // It's safe to ignore the error here, + // if it failed to unlock, it will be released automatically after the lock expires. + // Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out. + _, _ = mutex.Unlock() + + cancel(ErrLockReleased) + }) + return originalCtx + }, nil +} + +func (l *redisLocker) startExtend() { + if l.closed.Load() { + l.extendWg.Done() + return + } + + toExtend := make([]*redisMutex, 0) + l.mutexM.Range(func(_, value any) bool { + m := value.(*redisMutex) + + // Extend the lock if it is not expired. + // Although the mutex will be removed from the map before it is released, + // it still can be expired because of a failed extension. + // If it happens, the cancel function should have been called, + // so it does not need to be extended anymore. + if time.Now().After(m.mutex.Until()) { + return true + } + + toExtend = append(toExtend, m) + return true + }) + for _, v := range toExtend { + if ok, err := v.mutex.Extend(); !ok { + v.cancel(err) + } + } + + time.AfterFunc(redisLockExpiry/2, l.startExtend) +}