Fix queue test (#30646) (#30650)

Backport #30553 and #30646
This commit is contained in:
wxiaoguang 2024-04-23 16:30:32 +08:00 committed by GitHub
parent d95408bd5d
commit 32f895f2d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 43 additions and 19 deletions

View File

@ -8,7 +8,7 @@ import (
"time" "time"
) )
const ( var (
backoffBegin = 50 * time.Millisecond backoffBegin = 50 * time.Millisecond
backoffUpper = 2 * time.Second backoffUpper = 2 * time.Second
) )
@ -18,6 +18,14 @@ type (
backoffFuncErr func() (retry bool, err error) backoffFuncErr func() (retry bool, err error)
) )
func mockBackoffDuration(d time.Duration) func() {
oldBegin, oldUpper := backoffBegin, backoffUpper
backoffBegin, backoffUpper = d, d
return func() {
backoffBegin, backoffUpper = oldBegin, oldUpper
}
}
func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) { func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) {
d := begin d := begin
for { for {

View File

@ -63,6 +63,8 @@ func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushCh
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum" // TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later // The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary. // So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
// This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
// so no need to hugely refactor at the moment.
q.workerNumMu.Lock() q.workerNumMu.Lock()
noWorker := q.workerNum == 0 noWorker := q.workerNum == 0
if full || noWorker { if full || noWorker {
@ -136,6 +138,14 @@ func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
return true return true
} }
func resetIdleTicker(t *time.Ticker, dur time.Duration) {
t.Reset(dur)
select {
case <-t.C:
default:
}
}
// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items. // doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) { func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
wp.wg.Add(1) wp.wg.Add(1)
@ -146,8 +156,6 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
log.Debug("Queue %q starts new worker", q.GetName()) log.Debug("Queue %q starts new worker", q.GetName())
defer log.Debug("Queue %q stops idle worker", q.GetName()) defer log.Debug("Queue %q stops idle worker", q.GetName())
atomic.AddInt32(&q.workerStartedCounter, 1) // Only increase counter, used for debugging
t := time.NewTicker(workerIdleDuration) t := time.NewTicker(workerIdleDuration)
defer t.Stop() defer t.Stop()
@ -169,11 +177,7 @@ func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
} }
q.doWorkerHandle(batch) q.doWorkerHandle(batch)
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered // reset the idle ticker, and drain the tick after reset in case a tick is already triggered
t.Reset(workerIdleDuration) resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
select {
case <-t.C:
default:
}
case <-t.C: case <-t.C:
q.workerNumMu.Lock() q.workerNumMu.Lock()
keepWorking = q.workerNum <= 1 // keep the last worker running keepWorking = q.workerNum <= 1 // keep the last worker running

View File

@ -40,8 +40,6 @@ type WorkerPoolQueue[T any] struct {
workerMaxNum int workerMaxNum int
workerActiveNum int workerActiveNum int
workerNumMu sync.Mutex workerNumMu sync.Mutex
workerStartedCounter int32
} }
type flushType chan struct{} type flushType chan struct{}

View File

@ -5,8 +5,10 @@ package queue
import ( import (
"context" "context"
"slices"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -250,22 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)() defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
defer mockBackoffDuration(5 * time.Millisecond)()
var q *WorkerPoolQueue[int]
var handledCount atomic.Int32
var hasOnlyOneWorkerRunning atomic.Bool
handler := func(items ...int) (unhandled []int) { handler := func(items ...int) (unhandled []int) {
time.Sleep(50 * time.Millisecond) handledCount.Add(int32(len(items)))
// make each work have different duration, and check the active worker number periodically
var activeNums []int
for i := 0; i < 5-items[0]%2; i++ {
time.Sleep(workerIdleDuration * 2)
activeNums = append(activeNums, q.GetWorkerActiveNumber())
}
// When the queue never becomes empty, the existing workers should keep working
// It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
// If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
hasOnlyOneWorkerRunning.Store(true)
}
return nil return nil
} }
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
stop := runWorkerPoolQueue(q) stop := runWorkerPoolQueue(q)
for i := 0; i < 20; i++ { for i := 0; i < 100; i++ {
assert.NoError(t, q.Push(i)) assert.NoError(t, q.Push(i))
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
assert.EqualValues(t, 2, q.GetWorkerNumber()) assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
assert.EqualValues(t, 2, q.GetWorkerActiveNumber()) assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
// when the queue never becomes empty, the existing workers should keep working
assert.EqualValues(t, 2, q.workerStartedCounter)
stop() stop()
} }