From 830ae614560b0c504c00d693b63d9889bac1a2d8 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Tue, 19 Feb 2019 22:39:39 +0800 Subject: [PATCH] Refactor issue indexer (#5363) --- Gopkg.lock | 9 + models/issue.go | 18 +- models/issue_comment.go | 2 + models/issue_indexer.go | 168 +++++++------ models/issue_list.go | 16 +- models/models.go | 14 -- models/unit_tests.go | 4 + modules/indexer/issues/bleve.go | 250 ++++++++++++++++++++ modules/indexer/issues/bleve_test.go | 88 +++++++ modules/indexer/issues/indexer.go | 36 +++ modules/indexer/issues/queue.go | 11 + modules/indexer/issues/queue_channel.go | 56 +++++ modules/indexer/issues/queue_disk.go | 104 ++++++++ modules/notification/indexer/indexer.go | 62 ++++- modules/setting/indexer.go | 55 +++++ modules/setting/setting.go | 11 +- routers/api/v1/repo/issue.go | 3 +- routers/init.go | 4 +- routers/repo/issue.go | 7 +- vendor/github.com/lunny/levelqueue/LICENSE | 19 ++ vendor/github.com/lunny/levelqueue/error.go | 12 + vendor/github.com/lunny/levelqueue/queue.go | 214 +++++++++++++++++ 22 files changed, 1046 insertions(+), 117 deletions(-) create mode 100644 modules/indexer/issues/bleve.go create mode 100644 modules/indexer/issues/bleve_test.go create mode 100644 modules/indexer/issues/indexer.go create mode 100644 modules/indexer/issues/queue.go create mode 100644 modules/indexer/issues/queue_channel.go create mode 100644 modules/indexer/issues/queue_disk.go create mode 100644 modules/setting/indexer.go create mode 100644 vendor/github.com/lunny/levelqueue/LICENSE create mode 100644 vendor/github.com/lunny/levelqueue/error.go create mode 100644 vendor/github.com/lunny/levelqueue/queue.go diff --git a/Gopkg.lock b/Gopkg.lock index fa2a58a1a3..3ef6f552ed 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -616,6 +616,14 @@ pruneopts = "NUT" revision = "e3534c89ef969912856dfa39e56b09e58c5f5daf" +[[projects]] + branch = "master" + digest = "1:3ea59a5ada4bbac04da58e6177ca63da8c377a3143b48fca584408bf415fdafb" + name = "github.com/lunny/levelqueue" + packages = ["."] + pruneopts = "NUT" + revision = "02b525a4418e684a7786215296984e364746806f" + [[projects]] digest = "1:1e6a29ed1f189354030e3371f63ec58aacbc2bf232fd104c6e0d41174ac5af48" name = "github.com/lunny/log" @@ -1270,6 +1278,7 @@ "github.com/lafriks/xormstore", "github.com/lib/pq", "github.com/lunny/dingtalk_webhook", + "github.com/lunny/levelqueue", "github.com/markbates/goth", "github.com/markbates/goth/gothic", "github.com/markbates/goth/providers/bitbucket", diff --git a/models/issue.go b/models/issue.go index 8ce8658fee..835c6cf9fc 100644 --- a/models/issue.go +++ b/models/issue.go @@ -183,12 +183,21 @@ func (issue *Issue) LoadPullRequest() error { } func (issue *Issue) loadComments(e Engine) (err error) { + return issue.loadCommentsByType(e, CommentTypeUnknown) +} + +// LoadDiscussComments loads discuss comments +func (issue *Issue) LoadDiscussComments() error { + return issue.loadCommentsByType(x, CommentTypeComment) +} + +func (issue *Issue) loadCommentsByType(e Engine, tp CommentType) (err error) { if issue.Comments != nil { return nil } issue.Comments, err = findComments(e, FindCommentsOptions{ IssueID: issue.ID, - Type: CommentTypeUnknown, + Type: tp, }) return err } @@ -681,7 +690,6 @@ func updateIssueCols(e Engine, issue *Issue, cols ...string) error { if _, err := e.ID(issue.ID).Cols(cols...).Update(issue); err != nil { return err } - UpdateIssueIndexerCols(issue.ID, cols...) return nil } @@ -1217,6 +1225,12 @@ func getIssuesByIDs(e Engine, issueIDs []int64) ([]*Issue, error) { return issues, e.In("id", issueIDs).Find(&issues) } +func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) { + var ids = make([]int64, 0, 10) + err := e.Table("issue").Where("repo_id = ?", repoID).Find(&ids) + return ids, err +} + // GetIssuesByIDs return issues with the given IDs. func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) { return getIssuesByIDs(x, issueIDs) diff --git a/models/issue_comment.go b/models/issue_comment.go index 1b02918cb7..c3654460ff 100644 --- a/models/issue_comment.go +++ b/models/issue_comment.go @@ -1035,6 +1035,7 @@ func UpdateComment(doer *User, c *Comment, oldContent string) error { if err := c.LoadIssue(); err != nil { return err } + if err := c.Issue.LoadAttributes(); err != nil { return err } @@ -1093,6 +1094,7 @@ func DeleteComment(doer *User, comment *Comment) error { if err := comment.LoadIssue(); err != nil { return err } + if err := comment.Issue.LoadAttributes(); err != nil { return err } diff --git a/models/issue_indexer.go b/models/issue_indexer.go index 48c0b9f246..d02b7164da 100644 --- a/models/issue_indexer.go +++ b/models/issue_indexer.go @@ -7,25 +7,60 @@ package models import ( "fmt" - "code.gitea.io/gitea/modules/indexer" + "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" ) -// issueIndexerUpdateQueue queue of issue ids to be updated -var issueIndexerUpdateQueue chan int64 +var ( + // issueIndexerUpdateQueue queue of issue ids to be updated + issueIndexerUpdateQueue issues.Queue + issueIndexer issues.Indexer +) // InitIssueIndexer initialize issue indexer -func InitIssueIndexer() { - indexer.InitIssueIndexer(populateIssueIndexer) - issueIndexerUpdateQueue = make(chan int64, setting.Indexer.UpdateQueueLength) - go processIssueIndexerUpdateQueue() +func InitIssueIndexer() error { + var populate bool + switch setting.Indexer.IssueType { + case "bleve": + issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath) + exist, err := issueIndexer.Init() + if err != nil { + return err + } + populate = !exist + default: + return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType) + } + + var err error + switch setting.Indexer.IssueIndexerQueueType { + case setting.LevelQueueType: + issueIndexerUpdateQueue, err = issues.NewLevelQueue( + issueIndexer, + setting.Indexer.IssueIndexerQueueDir, + setting.Indexer.IssueIndexerQueueBatchNumber) + if err != nil { + return err + } + case setting.ChannelQueueType: + issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber) + default: + return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType) + } + + go issueIndexerUpdateQueue.Run() + + if populate { + go populateIssueIndexer() + } + + return nil } // populateIssueIndexer populate the issue indexer with issue data -func populateIssueIndexer() error { - batch := indexer.IssueIndexerBatch() +func populateIssueIndexer() { for page := 1; ; page++ { repos, _, err := SearchRepositoryByName(&SearchRepoOptions{ Page: page, @@ -35,98 +70,79 @@ func populateIssueIndexer() error { Collaborate: util.OptionalBoolFalse, }) if err != nil { - return fmt.Errorf("Repositories: %v", err) + log.Error(4, "SearchRepositoryByName: %v", err) + continue } if len(repos) == 0 { - return batch.Flush() + return } + for _, repo := range repos { - issues, err := Issues(&IssuesOptions{ + is, err := Issues(&IssuesOptions{ RepoIDs: []int64{repo.ID}, IsClosed: util.OptionalBoolNone, IsPull: util.OptionalBoolNone, }) if err != nil { - return err + log.Error(4, "Issues: %v", err) + continue } - if err = IssueList(issues).LoadComments(); err != nil { - return err + if err = IssueList(is).LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments: %v", err) + continue } - for _, issue := range issues { - if err := issue.update().AddToFlushingBatch(batch); err != nil { - return err - } + for _, issue := range is { + UpdateIssueIndexer(issue) } } } } -func processIssueIndexerUpdateQueue() { - batch := indexer.IssueIndexerBatch() - for { - var issueID int64 - select { - case issueID = <-issueIndexerUpdateQueue: - default: - // flush whatever updates we currently have, since we - // might have to wait a while - if err := batch.Flush(); err != nil { - log.Error(4, "IssueIndexer: %v", err) - } - issueID = <-issueIndexerUpdateQueue - } - issue, err := GetIssueByID(issueID) - if err != nil { - log.Error(4, "GetIssueByID: %v", err) - } else if err = issue.update().AddToFlushingBatch(batch); err != nil { - log.Error(4, "IssueIndexer: %v", err) - } - } -} - -func (issue *Issue) update() indexer.IssueIndexerUpdate { - comments := make([]string, 0, 5) +// UpdateIssueIndexer add/update an issue to the issue indexer +func UpdateIssueIndexer(issue *Issue) { + var comments []string for _, comment := range issue.Comments { if comment.Type == CommentTypeComment { comments = append(comments, comment.Content) } } - return indexer.IssueIndexerUpdate{ - IssueID: issue.ID, - Data: &indexer.IssueIndexerData{ - RepoID: issue.RepoID, - Title: issue.Title, - Content: issue.Content, - Comments: comments, - }, - } + issueIndexerUpdateQueue.Push(&issues.IndexerData{ + ID: issue.ID, + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: comments, + }) } -// updateNeededCols whether a change to the specified columns requires updating -// the issue indexer -func updateNeededCols(cols []string) bool { - for _, col := range cols { - switch col { - case "name", "content": - return true - } +// DeleteRepoIssueIndexer deletes repo's all issues indexes +func DeleteRepoIssueIndexer(repo *Repository) { + var ids []int64 + ids, err := getIssueIDsByRepoID(x, repo.ID) + if err != nil { + log.Error(4, "getIssueIDsByRepoID failed: %v", err) + return } - return false + + if len(ids) <= 0 { + return + } + + issueIndexerUpdateQueue.Push(&issues.IndexerData{ + IDs: ids, + IsDelete: true, + }) } -// UpdateIssueIndexerCols update an issue in the issue indexer, given changes -// to the specified columns -func UpdateIssueIndexerCols(issueID int64, cols ...string) { - updateNeededCols(cols) -} - -// UpdateIssueIndexer add/update an issue to the issue indexer -func UpdateIssueIndexer(issueID int64) { - select { - case issueIndexerUpdateQueue <- issueID: - default: - go func() { - issueIndexerUpdateQueue <- issueID - }() +// SearchIssuesByKeyword search issue ids by keywords and repo id +func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) { + var issueIDs []int64 + res, err := issueIndexer.Search(keyword, repoID, 1000, 0) + if err != nil { + return nil, err } + for _, r := range res.Hits { + issueIDs = append(issueIDs, r.ID) + } + return issueIDs, nil } diff --git a/models/issue_list.go b/models/issue_list.go index 7e4c264643..a1aab488fc 100644 --- a/models/issue_list.go +++ b/models/issue_list.go @@ -4,7 +4,11 @@ package models -import "fmt" +import ( + "fmt" + + "github.com/go-xorm/builder" +) // IssueList defines a list of issues type IssueList []*Issue @@ -338,7 +342,7 @@ func (issues IssueList) loadAttachments(e Engine) (err error) { return nil } -func (issues IssueList) loadComments(e Engine) (err error) { +func (issues IssueList) loadComments(e Engine, cond builder.Cond) (err error) { if len(issues) == 0 { return nil } @@ -354,6 +358,7 @@ func (issues IssueList) loadComments(e Engine) (err error) { rows, err := e.Table("comment"). Join("INNER", "issue", "issue.id = comment.issue_id"). In("issue.id", issuesIDs[:limit]). + Where(cond). Rows(new(Comment)) if err != nil { return err @@ -479,5 +484,10 @@ func (issues IssueList) LoadAttachments() error { // LoadComments loads comments func (issues IssueList) LoadComments() error { - return issues.loadComments(x) + return issues.loadComments(x, builder.NewCond()) +} + +// LoadDiscussComments loads discuss comments +func (issues IssueList) LoadDiscussComments() error { + return issues.loadComments(x, builder.Eq{"comment.type": CommentTypeComment}) } diff --git a/models/models.go b/models/models.go index daef7c07e8..b8fe588b5a 100644 --- a/models/models.go +++ b/models/models.go @@ -12,7 +12,6 @@ import ( "net/url" "os" "path" - "path/filepath" "strings" "code.gitea.io/gitea/modules/log" @@ -158,19 +157,6 @@ func LoadConfigs() { DbCfg.SSLMode = sec.Key("SSL_MODE").MustString("disable") DbCfg.Path = sec.Key("PATH").MustString("data/gitea.db") DbCfg.Timeout = sec.Key("SQLITE_TIMEOUT").MustInt(500) - - sec = setting.Cfg.Section("indexer") - setting.Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/issues.bleve")) - if !filepath.IsAbs(setting.Indexer.IssuePath) { - setting.Indexer.IssuePath = path.Join(setting.AppWorkPath, setting.Indexer.IssuePath) - } - setting.Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false) - setting.Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/repos.bleve")) - if !filepath.IsAbs(setting.Indexer.RepoPath) { - setting.Indexer.RepoPath = path.Join(setting.AppWorkPath, setting.Indexer.RepoPath) - } - setting.Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20) - setting.Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024) } // parsePostgreSQLHostPort parses given input in various forms defined in diff --git a/models/unit_tests.go b/models/unit_tests.go index 28cd91215e..f87dd7ee96 100644 --- a/models/unit_tests.go +++ b/models/unit_tests.go @@ -44,6 +44,10 @@ func MainTest(m *testing.M, pathToGiteaRoot string) { fatalTestError("Error creating test engine: %v\n", err) } + if err = InitIssueIndexer(); err != nil { + fatalTestError("Error InitIssueIndexer: %v\n", err) + } + setting.AppURL = "https://try.gitea.io/" setting.RunUser = "runuser" setting.SSH.Port = 3000 diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go new file mode 100644 index 0000000000..36279198b8 --- /dev/null +++ b/modules/indexer/issues/bleve.go @@ -0,0 +1,250 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "fmt" + "os" + "strconv" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/analyzer/custom" + "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/mapping" + "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" +) + +const ( + issueIndexerAnalyzer = "issueIndexer" + issueIndexerDocType = "issueIndexerDocType" + issueIndexerLatestVersion = 1 +) + +// indexerID a bleve-compatible unique identifier for an integer id +func indexerID(id int64) string { + return strconv.FormatInt(id, 36) +} + +// idOfIndexerID the integer id associated with an indexer id +func idOfIndexerID(indexerID string) (int64, error) { + id, err := strconv.ParseInt(indexerID, 36, 64) + if err != nil { + return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err) + } + return id, nil +} + +// numericEqualityQuery a numeric equality query for the given value and field +func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { + f := float64(value) + tru := true + q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) + q.SetField(field) + return q +} + +func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery { + q := bleve.NewMatchPhraseQuery(matchPhrase) + q.FieldVal = field + q.Analyzer = analyzer + return q +} + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +const maxBatchSize = 16 + +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) + } + + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err + } + + return index, nil +} + +// BleveIndexerData an update to the issue indexer +type BleveIndexerData IndexerData + +// Type returns the document type, for bleve's mapping.Classifier interface. +func (i *BleveIndexerData) Type() string { + return issueIndexerDocType +} + +// createIssueIndexer create an issue indexer if one does not already exist +func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) { + mapping := bleve.NewIndexMapping() + docMapping := bleve.NewDocumentMapping() + + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) + + textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.Store = false + textFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("Title", textFieldMapping) + docMapping.AddFieldMappingsAt("Content", textFieldMapping) + docMapping.AddFieldMappingsAt("Comments", textFieldMapping) + + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + return nil, err + } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{ + "type": custom.Name, + "char_filters": []string{}, + "tokenizer": unicode.Name, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, + }); err != nil { + return nil, err + } + + mapping.DefaultAnalyzer = issueIndexerAnalyzer + mapping.AddDocumentMapping(issueIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + + index, err := bleve.New(path, mapping) + if err != nil { + return nil, err + } + + if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{ + Version: latestVersion, + }); err != nil { + return nil, err + } + return index, nil +} + +var ( + _ Indexer = &BleveIndexer{} +) + +// BleveIndexer implements Indexer interface +type BleveIndexer struct { + indexDir string + indexer bleve.Index +} + +// NewBleveIndexer creates a new bleve local indexer +func NewBleveIndexer(indexDir string) *BleveIndexer { + return &BleveIndexer{ + indexDir: indexDir, + } +} + +// Init will initial the indexer +func (b *BleveIndexer) Init() (bool, error) { + var err error + b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion) + if err != nil { + return false, err + } + if b.indexer != nil { + return true, nil + } + + b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion) + return false, err +} + +// Index will save the index data +func (b *BleveIndexer) Index(issues []*IndexerData) error { + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, issue := range issues { + if err := batch.Index(indexerID(issue.ID), struct { + RepoID int64 + Title string + Content string + Comments []string + }{ + RepoID: issue.RepoID, + Title: issue.Title, + Content: issue.Content, + Comments: issue.Comments, + }); err != nil { + return err + } + } + return batch.Flush() +} + +// Delete deletes indexes by ids +func (b *BleveIndexer) Delete(ids ...int64) error { + batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize) + for _, id := range ids { + if err := batch.Delete(indexerID(id)); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for issues by given conditions. +// Returns the matching issue IDs +func (b *BleveIndexer) Search(keyword string, repoID int64, limit, start int) (*SearchResult, error) { + indexerQuery := bleve.NewConjunctionQuery( + numericEqualityQuery(repoID, "RepoID"), + bleve.NewDisjunctionQuery( + newMatchPhraseQuery(keyword, "Title", issueIndexerAnalyzer), + newMatchPhraseQuery(keyword, "Content", issueIndexerAnalyzer), + newMatchPhraseQuery(keyword, "Comments", issueIndexerAnalyzer), + )) + search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false) + + result, err := b.indexer.Search(search) + if err != nil { + return nil, err + } + + var ret = SearchResult{ + Hits: make([]Match, 0, len(result.Hits)), + } + for _, hit := range result.Hits { + id, err := idOfIndexerID(hit.ID) + if err != nil { + return nil, err + } + ret.Hits = append(ret.Hits, Match{ + ID: id, + RepoID: repoID, + }) + } + return &ret, nil +} diff --git a/modules/indexer/issues/bleve_test.go b/modules/indexer/issues/bleve_test.go new file mode 100644 index 0000000000..720266e3b5 --- /dev/null +++ b/modules/indexer/issues/bleve_test.go @@ -0,0 +1,88 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexAndSearch(t *testing.T) { + dir := "./bleve.index" + indexer := NewBleveIndexer(dir) + defer os.RemoveAll(dir) + + _, err := indexer.Init() + assert.NoError(t, err) + + err = indexer.Index([]*IndexerData{ + { + ID: 1, + RepoID: 2, + Title: "Issue search should support Chinese", + Content: "As title", + Comments: []string{ + "test1", + "test2", + }, + }, + { + ID: 2, + RepoID: 2, + Title: "CJK support could be optional", + Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default", + Comments: []string{ + "LGTM", + "Good idea", + }, + }, + }) + assert.NoError(t, err) + + var ( + keywords = []struct { + Keyword string + IDs []int64 + }{ + { + Keyword: "search", + IDs: []int64{1}, + }, + { + Keyword: "test1", + IDs: []int64{1}, + }, + { + Keyword: "test2", + IDs: []int64{1}, + }, + { + Keyword: "support", + IDs: []int64{1, 2}, + }, + { + Keyword: "chinese", + IDs: []int64{1, 2}, + }, + { + Keyword: "help", + IDs: []int64{}, + }, + } + ) + + for _, kw := range keywords { + res, err := indexer.Search(kw.Keyword, 2, 10, 0) + assert.NoError(t, err) + + var ids = make([]int64, 0, len(res.Hits)) + for _, hit := range res.Hits { + ids = append(ids, hit.ID) + } + assert.EqualValues(t, kw.IDs, ids) + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go new file mode 100644 index 0000000000..c31006d0dd --- /dev/null +++ b/modules/indexer/issues/indexer.go @@ -0,0 +1,36 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +// IndexerData data stored in the issue indexer +type IndexerData struct { + ID int64 + RepoID int64 + Title string + Content string + Comments []string + IsDelete bool + IDs []int64 +} + +// Match represents on search result +type Match struct { + ID int64 `json:"id"` + RepoID int64 `json:"repo_id"` + Score float64 `json:"score"` +} + +// SearchResult represents search results +type SearchResult struct { + Hits []Match +} + +// Indexer defines an inteface to indexer issues contents +type Indexer interface { + Init() (bool, error) + Index(issue []*IndexerData) error + Delete(ids ...int64) error + Search(kw string, repoID int64, limit, start int) (*SearchResult, error) +} diff --git a/modules/indexer/issues/queue.go b/modules/indexer/issues/queue.go new file mode 100644 index 0000000000..6f4ee4c13a --- /dev/null +++ b/modules/indexer/issues/queue.go @@ -0,0 +1,11 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +// Queue defines an interface to save an issue indexer queue +type Queue interface { + Run() error + Push(*IndexerData) +} diff --git a/modules/indexer/issues/queue_channel.go b/modules/indexer/issues/queue_channel.go new file mode 100644 index 0000000000..99a90ad499 --- /dev/null +++ b/modules/indexer/issues/queue_channel.go @@ -0,0 +1,56 @@ +// Copyright 2018 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "time" + + "code.gitea.io/gitea/modules/setting" +) + +// ChannelQueue implements +type ChannelQueue struct { + queue chan *IndexerData + indexer Indexer + batchNumber int +} + +// NewChannelQueue create a memory channel queue +func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue { + return &ChannelQueue{ + queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength), + indexer: indexer, + batchNumber: batchNumber, + } +} + +// Run starts to run the queue +func (c *ChannelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, c.batchNumber) + for { + select { + case data := <-c.queue: + datas = append(datas, data) + if len(datas) >= c.batchNumber { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + case <-time.After(time.Millisecond * 100): + i++ + if i >= 3 && len(datas) > 0 { + c.indexer.Index(datas) + // TODO: save the point + datas = make([]*IndexerData, 0, c.batchNumber) + } + } + } +} + +// Push will push the indexer data to queue +func (c *ChannelQueue) Push(data *IndexerData) { + c.queue <- data +} diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go new file mode 100644 index 0000000000..97e9a3d965 --- /dev/null +++ b/modules/indexer/issues/queue_disk.go @@ -0,0 +1,104 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package issues + +import ( + "encoding/json" + "time" + + "code.gitea.io/gitea/modules/log" + "github.com/lunny/levelqueue" +) + +var ( + _ Queue = &LevelQueue{} +) + +// LevelQueue implements a disk library queue +type LevelQueue struct { + indexer Indexer + queue *levelqueue.Queue + batchNumber int +} + +// NewLevelQueue creates a ledis local queue +func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) { + queue, err := levelqueue.Open(dataDir) + if err != nil { + return nil, err + } + + return &LevelQueue{ + indexer: indexer, + queue: queue, + batchNumber: batchNumber, + }, nil +} + +// Run starts to run the queue +func (l *LevelQueue) Run() error { + var i int + var datas = make([]*IndexerData, 0, l.batchNumber) + for { + bs, err := l.queue.RPop() + if err != nil { + log.Error(4, "RPop: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + i++ + if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) { + l.indexer.Index(datas) + datas = make([]*IndexerData, 0, l.batchNumber) + i = 0 + } + + if len(bs) <= 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + var data IndexerData + err = json.Unmarshal(bs, &data) + if err != nil { + log.Error(4, "Unmarshal: %v", err) + time.Sleep(time.Millisecond * 100) + continue + } + + log.Trace("LedisLocalQueue: task found: %#v", data) + + if data.IsDelete { + if data.ID > 0 { + if err = l.indexer.Delete(data.ID); err != nil { + log.Error(4, "indexer.Delete: %v", err) + } + } else if len(data.IDs) > 0 { + if err = l.indexer.Delete(data.IDs...); err != nil { + log.Error(4, "indexer.Delete: %v", err) + } + } + time.Sleep(time.Millisecond * 10) + continue + } + + datas = append(datas, &data) + time.Sleep(time.Millisecond * 10) + } +} + +// Push will push the indexer data to queue +func (l *LevelQueue) Push(data *IndexerData) { + bs, err := json.Marshal(data) + if err != nil { + log.Error(4, "Marshal: %v", err) + return + } + err = l.queue.LPush(bs) + if err != nil { + log.Error(4, "LPush: %v", err) + } +} diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 3fd3352188..66d483c017 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,6 +6,7 @@ package indexer import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" ) @@ -25,38 +26,83 @@ func NewNotifier() base.Notifier { func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, issue *models.Issue, comment *models.Comment) { if comment.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(issue.ID) + if issue.Comments == nil { + if err := issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } else { + issue.Comments = append(issue.Comments, comment) + } + + models.UpdateIssueIndexer(issue) } } func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) { - models.UpdateIssueIndexer(pr.Issue.ID) + models.UpdateIssueIndexer(pr.Issue) } func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) { if c.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(c.IssueID) + var found bool + if c.Issue.Comments != nil { + for i := 0; i < len(c.Issue.Comments); i++ { + if c.Issue.Comments[i].ID == c.ID { + c.Issue.Comments[i] = c + found = true + break + } + } + } + + if !found { + if err := c.Issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } + + models.UpdateIssueIndexer(c.Issue) } } func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models.Comment) { if comment.Type == models.CommentTypeComment { - models.UpdateIssueIndexer(comment.IssueID) + var found bool + if comment.Issue.Comments != nil { + for i := 0; i < len(comment.Issue.Comments); i++ { + if comment.Issue.Comments[i].ID == comment.ID { + comment.Issue.Comments = append(comment.Issue.Comments[:i], comment.Issue.Comments[i+1:]...) + found = true + break + } + } + } + + if !found { + if err := comment.Issue.LoadDiscussComments(); err != nil { + log.Error(4, "LoadComments failed: %v", err) + return + } + } + // reload comments to delete the old comment + models.UpdateIssueIndexer(comment.Issue) } } func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { - models.DeleteRepoFromIndexer(repo) + models.DeleteRepoIssueIndexer(repo) } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) { - models.UpdateIssueIndexer(issue.ID) + models.UpdateIssueIndexer(issue) } diff --git a/modules/setting/indexer.go b/modules/setting/indexer.go new file mode 100644 index 0000000000..245ebb0496 --- /dev/null +++ b/modules/setting/indexer.go @@ -0,0 +1,55 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package setting + +import ( + "path" + "path/filepath" +) + +// enumerates all the indexer queue types +const ( + LevelQueueType = "levelqueue" + ChannelQueueType = "channel" +) + +var ( + // Indexer settings + Indexer = struct { + IssueType string + IssuePath string + RepoIndexerEnabled bool + RepoPath string + UpdateQueueLength int + MaxIndexerFileSize int64 + IssueIndexerQueueType string + IssueIndexerQueueDir string + IssueIndexerQueueBatchNumber int + }{ + IssueType: "bleve", + IssuePath: "indexers/issues.bleve", + IssueIndexerQueueType: LevelQueueType, + IssueIndexerQueueDir: "indexers/issues.queue", + IssueIndexerQueueBatchNumber: 20, + } +) + +func newIndexerService() { + sec := Cfg.Section("indexer") + Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/issues.bleve")) + if !filepath.IsAbs(Indexer.IssuePath) { + Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath) + } + Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false) + Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve")) + if !filepath.IsAbs(Indexer.RepoPath) { + Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath) + } + Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20) + Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024) + Indexer.IssueIndexerQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType) + Indexer.IssueIndexerQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue")) + Indexer.IssueIndexerQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20) +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 5f65570540..4c016f3489 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -179,15 +179,6 @@ var ( DBConnectRetries int DBConnectBackoff time.Duration - // Indexer settings - Indexer struct { - IssuePath string - RepoIndexerEnabled bool - RepoPath string - UpdateQueueLength int - MaxIndexerFileSize int64 - } - // Repository settings Repository = struct { AnsiCharset string @@ -1214,6 +1205,7 @@ func NewContext() { IsInputFile: sec.Key("IS_INPUT_FILE").MustBool(false), }) } + sec = Cfg.Section("U2F") U2F.TrustedFacets, _ = shellquote.Split(sec.Key("TRUSTED_FACETS").MustString(strings.TrimRight(AppURL, "/"))) U2F.AppID = sec.Key("APP_ID").MustString(strings.TrimRight(AppURL, "/")) @@ -1240,4 +1232,5 @@ func NewServices() { newRegisterMailService() newNotifyMailService() newWebhookService() + newIndexerService() } diff --git a/routers/api/v1/repo/issue.go b/routers/api/v1/repo/issue.go index d339d8f0b7..b13af33548 100644 --- a/routers/api/v1/repo/issue.go +++ b/routers/api/v1/repo/issue.go @@ -13,7 +13,6 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/context" - "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" @@ -78,7 +77,7 @@ func ListIssues(ctx *context.APIContext) { var labelIDs []int64 var err error if len(keyword) > 0 { - issueIDs, err = indexer.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword) + issueIDs, err = models.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword) } if splitted := strings.Split(ctx.Query("labels"), ","); len(splitted) > 0 { diff --git a/routers/init.go b/routers/init.go index 4da786cc00..1da21a351b 100644 --- a/routers/init.go +++ b/routers/init.go @@ -90,7 +90,9 @@ func GlobalInit() { // Booting long running goroutines. cron.NewContext() - models.InitIssueIndexer() + if err := models.InitIssueIndexer(); err != nil { + log.Fatal(4, "Failed to initialize issue indexer: %v", err) + } models.InitRepoIndexer() models.InitSyncMirrors() models.InitDeliverHooks() diff --git a/routers/repo/issue.go b/routers/repo/issue.go index 6783d279b5..1843e00144 100644 --- a/routers/repo/issue.go +++ b/routers/repo/issue.go @@ -23,7 +23,6 @@ import ( "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/context" - "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/markup/markdown" "code.gitea.io/gitea/modules/notification" @@ -147,7 +146,11 @@ func issues(ctx *context.Context, milestoneID int64, isPullOption util.OptionalB var issueIDs []int64 if len(keyword) > 0 { - issueIDs, err = indexer.SearchIssuesByKeyword(repo.ID, keyword) + issueIDs, err = models.SearchIssuesByKeyword(repo.ID, keyword) + if err != nil { + ctx.ServerError("issueIndexer.Search", err) + return + } if len(issueIDs) == 0 { forceEmpty = true } diff --git a/vendor/github.com/lunny/levelqueue/LICENSE b/vendor/github.com/lunny/levelqueue/LICENSE new file mode 100644 index 0000000000..4a5a4ea0ff --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2019 Lunny Xiao + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/lunny/levelqueue/error.go b/vendor/github.com/lunny/levelqueue/error.go new file mode 100644 index 0000000000..d639c5d496 --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/error.go @@ -0,0 +1,12 @@ +// Copyright 2019 Lunny Xiao. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import "errors" + +var ( + // ErrNotFound means no element in queue + ErrNotFound = errors.New("no key found") +) diff --git a/vendor/github.com/lunny/levelqueue/queue.go b/vendor/github.com/lunny/levelqueue/queue.go new file mode 100644 index 0000000000..0b2bef6c84 --- /dev/null +++ b/vendor/github.com/lunny/levelqueue/queue.go @@ -0,0 +1,214 @@ +// Copyright 2019 Lunny Xiao. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "bytes" + "encoding/binary" + "sync" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Queue defines a queue struct +type Queue struct { + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 +} + +// Open opens a queue object or create it if not exist +func Open(dataDir string) (*Queue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + + var queue = &Queue{ + db: db, + } + queue.low, err = queue.readID(lowKey) + if err == leveldb.ErrNotFound { + queue.low = 1 + err = db.Put(lowKey, id2bytes(1), nil) + } + if err != nil { + return nil, err + } + + queue.high, err = queue.readID(highKey) + if err == leveldb.ErrNotFound { + err = db.Put(highKey, id2bytes(0), nil) + } + if err != nil { + return nil, err + } + + return queue, nil +} + +func (queue *Queue) readID(key []byte) (int64, error) { + bs, err := queue.db.Get(key, nil) + if err != nil { + return 0, err + } + return bytes2id(bs) +} + +var ( + lowKey = []byte("low") + highKey = []byte("high") +) + +func (queue *Queue) highincrement() (int64, error) { + id := queue.high + 1 + queue.high = id + err := queue.db.Put(highKey, id2bytes(queue.high), nil) + if err != nil { + queue.high = queue.high - 1 + return 0, err + } + return id, nil +} + +func (queue *Queue) highdecrement() (int64, error) { + queue.high = queue.high - 1 + err := queue.db.Put(highKey, id2bytes(queue.high), nil) + if err != nil { + queue.high = queue.high + 1 + return 0, err + } + return queue.high, nil +} + +func (queue *Queue) lowincrement() (int64, error) { + queue.low = queue.low + 1 + err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + if err != nil { + queue.low = queue.low - 1 + return 0, err + } + return queue.low, nil +} + +func (queue *Queue) lowdecrement() (int64, error) { + queue.low = queue.low - 1 + err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + if err != nil { + queue.low = queue.low + 1 + return 0, err + } + return queue.low, nil +} + +// Len returns the length of the queue +func (queue *Queue) Len() int64 { + queue.lowLock.Lock() + queue.highLock.Lock() + l := queue.high - queue.low + 1 + queue.highLock.Unlock() + queue.lowLock.Unlock() + return l +} + +func id2bytes(id int64) []byte { + var buf = make([]byte, 8) + binary.PutVarint(buf, id) + return buf +} + +func bytes2id(b []byte) (int64, error) { + return binary.ReadVarint(bytes.NewReader(b)) +} + +// RPush pushes a data from right of queue +func (queue *Queue) RPush(data []byte) error { + queue.highLock.Lock() + id, err := queue.highincrement() + if err != nil { + queue.highLock.Unlock() + return err + } + err = queue.db.Put(id2bytes(id), data, nil) + queue.highLock.Unlock() + return err +} + +// LPush pushes a data from left of queue +func (queue *Queue) LPush(data []byte) error { + queue.highLock.Lock() + id, err := queue.lowdecrement() + if err != nil { + queue.highLock.Unlock() + return err + } + err = queue.db.Put(id2bytes(id), data, nil) + queue.highLock.Unlock() + return err +} + +// RPop pop a data from right of queue +func (queue *Queue) RPop() ([]byte, error) { + queue.highLock.Lock() + currentID := queue.high + + res, err := queue.db.Get(id2bytes(currentID), nil) + if err != nil { + queue.highLock.Unlock() + if err == leveldb.ErrNotFound { + return nil, ErrNotFound + } + return nil, err + } + + _, err = queue.highdecrement() + if err != nil { + queue.highLock.Unlock() + return nil, err + } + + err = queue.db.Delete(id2bytes(currentID), nil) + queue.highLock.Unlock() + if err != nil { + return nil, err + } + return res, nil +} + +// LPop pop a data from left of queue +func (queue *Queue) LPop() ([]byte, error) { + queue.lowLock.Lock() + currentID := queue.low + + res, err := queue.db.Get(id2bytes(currentID), nil) + if err != nil { + queue.lowLock.Unlock() + if err == leveldb.ErrNotFound { + return nil, ErrNotFound + } + return nil, err + } + + _, err = queue.lowincrement() + if err != nil { + return nil, err + } + + err = queue.db.Delete(id2bytes(currentID), nil) + queue.lowLock.Unlock() + if err != nil { + return nil, err + } + return res, nil +} + +// Close closes the queue +func (queue *Queue) Close() error { + err := queue.db.Close() + queue.db = nil + return err +}