diff --git a/integrations/testlogger.go b/integrations/testlogger.go index 9498ad655b..ff408b314c 100644 --- a/integrations/testlogger.go +++ b/integrations/testlogger.go @@ -121,7 +121,7 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() { fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), slowFlush) } }) - if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil { + if err := queue.GetManager().FlushAll(context.Background(), 2*time.Minute); err != nil { t.Errorf("Flushing queues failed with error %v", err) } timer.Stop() diff --git a/modules/queue/manager.go b/modules/queue/manager.go index a88933191a..23e96155a9 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -9,6 +9,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync" "time" @@ -169,7 +170,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error for { select { case <-ctx.Done(): - return ctx.Err() + mqs := m.ManagedQueues() + nonEmptyQueues := []string{} + for _, mq := range mqs { + if !mq.IsEmpty() { + nonEmptyQueues = append(nonEmptyQueues, mq.Name) + } + } + if len(nonEmptyQueues) > 0 { + return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", ")) + } + return nil default: } mqs := m.ManagedQueues()