1
1
mirror of https://github.com/go-gitea/gitea synced 2025-08-04 16:48:37 +00:00

Automatically pause queue if index service is unavailable (#15066)

* Handle keyword search error when issue indexer service is not available

* Implement automatic disabling and resume of code indexer queue
This commit is contained in:
Lauris BH
2022-01-27 10:30:51 +02:00
committed by GitHub
parent 2649eddcf0
commit 8038610a42
28 changed files with 522 additions and 151 deletions

View File

@@ -5,6 +5,7 @@
package issues
import (
"context"
"fmt"
"os"
"strconv"
@@ -186,6 +187,15 @@ func (b *BleveIndexer) Init() (bool, error) {
return false, err
}
// SetAvailabilityChangeCallback does nothing
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}
// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
}
// Close will close the bleve indexer
func (b *BleveIndexer) Close() {
if b.indexer != nil {
@@ -229,7 +239,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error {
// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
var repoQueriesP []*query.NumericRangeQuery
for _, repoID := range repoIDs {
repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID"))
@@ -249,7 +259,7 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int)
search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
search.SortBy([]string{"-_score"})
result, err := b.indexer.Search(search)
result, err := b.indexer.SearchInContext(ctx, search)
if err != nil {
return nil, err
}

View File

@@ -5,6 +5,7 @@
package issues
import (
"context"
"os"
"testing"
@@ -84,7 +85,7 @@ func TestBleveIndexAndSearch(t *testing.T) {
}
for _, kw := range keywords {
res, err := indexer.Search(kw.Keyword, []int64{2}, 10, 0)
res, err := indexer.Search(context.TODO(), kw.Keyword, []int64{2}, 10, 0)
assert.NoError(t, err)
ids := make([]int64, 0, len(res.Hits))

View File

@@ -4,33 +4,47 @@
package issues
import "code.gitea.io/gitea/models"
import (
"context"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/models/db"
)
// DBIndexer implements Indexer interface to use database's like search
type DBIndexer struct{}
// Init dummy function
func (db *DBIndexer) Init() (bool, error) {
func (i *DBIndexer) Init() (bool, error) {
return false, nil
}
// SetAvailabilityChangeCallback dummy function
func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}
// Ping checks if database is available
func (i *DBIndexer) Ping() bool {
return db.GetEngine(db.DefaultContext).Ping() != nil
}
// Index dummy function
func (db *DBIndexer) Index(issue []*IndexerData) error {
func (i *DBIndexer) Index(issue []*IndexerData) error {
return nil
}
// Delete dummy function
func (db *DBIndexer) Delete(ids ...int64) error {
func (i *DBIndexer) Delete(ids ...int64) error {
return nil
}
// Close dummy function
func (db *DBIndexer) Close() {
func (i *DBIndexer) Close() {
}
// Search dummy function
func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start)
if err != nil {
return nil, err
}

View File

@@ -8,9 +8,12 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
@@ -20,8 +23,12 @@ var _ Indexer = &ElasticSearchIndexer{}
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerName string
client *elastic.Client
indexerName string
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
}
type elasticLogger struct {
@@ -56,10 +63,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er
return nil, err
}
return &ElasticSearchIndexer{
indexer := &ElasticSearchIndexer{
client: client,
indexerName: indexerName,
}, nil
available: true,
stopTimer: make(chan struct{}),
}
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
indexer.checkAvailability()
case <-indexer.stopTimer:
ticker.Stop()
return
}
}
}()
return indexer, nil
}
const (
@@ -93,10 +117,10 @@ const (
// Init will initialize the indexer
func (b *ElasticSearchIndexer) Init() (bool, error) {
ctx := context.Background()
ctx := graceful.GetManager().HammerContext()
exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
if err != nil {
return false, err
return false, b.checkError(err)
}
if !exists {
@@ -104,7 +128,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
if err != nil {
return false, err
return false, b.checkError(err)
}
if !createIndex.Acknowledged {
return false, errors.New("init failed")
@@ -115,6 +139,20 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}
// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
defer b.lock.RUnlock()
return b.available
}
// Index will save the index data
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
if len(issues) == 0 {
@@ -131,8 +169,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
"content": issue.Content,
"comments": issue.Comments,
}).
Do(context.Background())
return err
Do(graceful.GetManager().HammerContext())
return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -154,8 +192,8 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
Do(context.Background())
return err
Do(graceful.GetManager().HammerContext())
return b.checkError(err)
}
// Delete deletes indexes by ids
@@ -166,8 +204,8 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Delete().
Index(b.indexerName).
Id(fmt.Sprintf("%d", ids[0])).
Do(context.Background())
return err
Do(graceful.GetManager().HammerContext())
return b.checkError(err)
}
reqs := make([]elastic.BulkableRequest, 0)
@@ -182,13 +220,13 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
Do(context.Background())
return err
Do(graceful.GetManager().HammerContext())
return b.checkError(err)
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
query := elastic.NewBoolQuery()
query = query.Must(kwQuery)
@@ -205,9 +243,9 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
Query(query).
Sort("_score", false).
From(start).Size(limit).
Do(context.Background())
Do(ctx)
if err != nil {
return nil, err
return nil, b.checkError(err)
}
hits := make([]Match, 0, limit)
@@ -225,4 +263,51 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st
}
// Close implements indexer
func (b *ElasticSearchIndexer) Close() {}
func (b *ElasticSearchIndexer) Close() {
select {
case <-b.stopTimer:
default:
close(b.stopTimer)
}
}
func (b *ElasticSearchIndexer) checkError(err error) error {
var opErr *net.OpError
if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
return err
}
b.setAvailability(false)
return err
}
func (b *ElasticSearchIndexer) checkAvailability() {
if b.Ping() {
return
}
// Request cluster state to check if elastic is available again
_, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
if err != nil {
b.setAvailability(false)
return
}
b.setAvailability(true)
}
func (b *ElasticSearchIndexer) setAvailability(available bool) {
b.lock.Lock()
defer b.lock.Unlock()
if b.available == available {
return
}
b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}

View File

@@ -47,9 +47,11 @@ type SearchResult struct {
// Indexer defines an interface to indexer issues contents
type Indexer interface {
Init() (bool, error)
Ping() bool
SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Close()
}
@@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) {
}
iData := make([]*IndexerData, 0, len(data))
unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
@@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) {
}
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
_ = indexer.Delete(indexerData.IDs...)
if err := indexer.Delete(indexerData.IDs...); err != nil {
log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
if indexer.Ping() {
continue
}
// Add back to queue
unhandled = append(unhandled, datum)
}
continue
}
iData = append(iData, indexerData)
}
if len(unhandled) > 0 {
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
}
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
if indexer.Ping() {
return nil
}
// Add back to queue
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
}
return nil
}
@@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
holder.get().SetAvailabilityChangeCallback(func(available bool) {
if !available {
log.Info("Issue index queue paused")
queue.Pause()
} else {
log.Info("Issue index queue resumed")
queue.Resume()
}
})
}
// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
@@ -334,7 +370,7 @@ func DeleteRepoIssueIndexer(repo *repo_model.Repository) {
// SearchIssuesByKeyword search issue ids by keywords and repo id
// WARNNING: You have to ensure user have permission to visit repoIDs' issues
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
func SearchIssuesByKeyword(ctx context.Context, repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
indexer := holder.get()
@@ -342,7 +378,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
log.Error("SearchIssuesByKeyword(): unable to get indexer!")
return nil, fmt.Errorf("unable to get issue indexer")
}
res, err := indexer.Search(keyword, repoIDs, 50, 0)
res, err := indexer.Search(ctx, keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
}
@@ -351,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
}
return issueIDs, nil
}
// IsAvailable checks if issue indexer is available
func IsAvailable() bool {
indexer := holder.get()
if indexer == nil {
log.Error("IsAvailable(): unable to get indexer!")
return false
}
return indexer.Ping()
}

View File

@@ -5,6 +5,7 @@
package issues
import (
"context"
"os"
"path"
"path/filepath"
@@ -56,19 +57,19 @@ func TestBleveSearchIssues(t *testing.T) {
time.Sleep(5 * time.Second)
ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
assert.NoError(t, err)
assert.EqualValues(t, []int64{2}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "first")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "for")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
assert.NoError(t, err)
assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "good")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
}
@@ -79,19 +80,19 @@ func TestDBSearchIssues(t *testing.T) {
setting.Indexer.IssueType = "db"
InitIssueIndexer(true)
ids, err := SearchIssuesByKeyword([]int64{1}, "issue2")
ids, err := SearchIssuesByKeyword(context.TODO(), []int64{1}, "issue2")
assert.NoError(t, err)
assert.EqualValues(t, []int64{2}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "first")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "first")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "for")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "for")
assert.NoError(t, err)
assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)
ids, err = SearchIssuesByKeyword([]int64{1}, "good")
ids, err = SearchIssuesByKeyword(context.TODO(), []int64{1}, "good")
assert.NoError(t, err)
assert.EqualValues(t, []int64{1}, ids)
}