From 92d15afd188c12cffb7b697bf16dcefb354434d6 Mon Sep 17 00:00:00 2001 From: wxiaoguang Date: Sun, 7 Aug 2022 21:26:31 +0800 Subject: [PATCH] database-filesystem, demo log tailing in term --- models/dbfs/dbfile.go | 362 ++++++++++++++++++++++++++++++++++++ models/dbfs/dbfs.go | 74 ++++++++ models/dbfs/dbfs_test.go | 154 +++++++++++++++ models/dbfs/main_test.go | 28 +++ modules/bots/bots.go | 2 +- routers/web/dev/termdemo.go | 51 +++++ routers/web/web.go | 5 + templates/dev/termdemo.tmpl | 27 +++ 8 files changed, 702 insertions(+), 1 deletion(-) create mode 100644 models/dbfs/dbfile.go create mode 100644 models/dbfs/dbfs.go create mode 100644 models/dbfs/dbfs_test.go create mode 100644 models/dbfs/main_test.go create mode 100644 routers/web/dev/termdemo.go create mode 100644 templates/dev/termdemo.tmpl diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go new file mode 100644 index 0000000000..ebaa4db80c --- /dev/null +++ b/models/dbfs/dbfile.go @@ -0,0 +1,362 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package dbfs + +import ( + "context" + "errors" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/models/db" +) + +var defaultFileBlockSize int64 = 32 * 1024 + +type File interface { + io.ReadWriteCloser + io.Seeker +} + +type file struct { + ctx context.Context + metaID int64 + fullPath string + blockSize int64 + + allowRead bool + allowWrite bool + offset int64 +} + +var _ File = (*file)(nil) + +func (f *file) readAt(fileMeta *FileMeta, offset int64, p []byte) (n int, err error) { + if f.offset >= fileMeta.FileSize { + return 0, io.EOF + } + + blobPos := int(offset % f.blockSize) + blobOffset := offset - int64(blobPos) + blobRemaining := int(f.blockSize) - blobPos + needRead := len(p) + if needRead > blobRemaining { + needRead = blobRemaining + } + if blobOffset+int64(blobPos)+int64(needRead) > fileMeta.FileSize { + needRead = int(fileMeta.FileSize - blobOffset - int64(blobPos)) + } + if needRead <= 0 { + return 0, io.EOF + } + var fileData FileData + ok, err := db.GetEngine(f.ctx).Where("meta_id = ? AND blob_offset = ?", f.metaID, blobOffset).Get(&fileData) + if err != nil { + return 0, err + } + blobData := fileData.BlobData + if !ok { + blobData = nil + } + + canCopy := len(blobData) - blobPos + if canCopy <= 0 { + canCopy = 0 + } + realRead := needRead + if realRead > canCopy { + realRead = canCopy + } + if realRead > 0 { + copy(p[:realRead], fileData.BlobData[blobPos:blobPos+realRead]) + } + for i := realRead; i < needRead; i++ { + p[i] = 0 + } + return needRead, nil +} + +func (f *file) Read(p []byte) (n int, err error) { + if f.metaID == 0 || !f.allowRead { + return 0, os.ErrInvalid + } + + fileMeta, err := findFileMetaById(f.ctx, f.metaID) + if err != nil { + return 0, err + } + n, err = f.readAt(fileMeta, f.offset, p) + f.offset += int64(n) + return n, err +} + +func (f *file) Write(p []byte) (n int, err error) { + if f.metaID == 0 || !f.allowWrite { + return 0, os.ErrInvalid + } + + fileMeta, err := findFileMetaById(f.ctx, f.metaID) + if err != nil { + return 0, err + } + + needUpdateSize := false + written := 0 + for len(p) > 0 { + blobPos := int(f.offset % f.blockSize) + blobOffset := f.offset - int64(blobPos) + blobRemaining := int(f.blockSize) - blobPos + needWrite := len(p) + if needWrite > blobRemaining { + needWrite = blobRemaining + } + buf := make([]byte, f.blockSize) + readBytes, err := f.readAt(fileMeta, blobOffset, buf) + if err != nil && !errors.Is(err, io.EOF) { + return written, err + } + copy(buf[blobPos:blobPos+needWrite], p[:needWrite]) + if blobPos+needWrite > readBytes { + buf = buf[:blobPos+needWrite] + } else { + buf = buf[:readBytes] + } + + fileData := FileData{ + MetaID: fileMeta.ID, + BlobOffset: blobOffset, + BlobData: buf, + } + if res, err := db.GetEngine(f.ctx).Exec("UPDATE file_data SET revision=revision+1, blob_data=? WHERE meta_id=? AND blob_offset=?", buf, fileMeta.ID, blobOffset); err != nil { + return written, err + } else if updated, err := res.RowsAffected(); err != nil { + return written, err + } else if updated == 0 { + if _, err = db.GetEngine(f.ctx).Insert(&fileData); err != nil { + return written, err + } + } + written += needWrite + f.offset += int64(needWrite) + if f.offset > fileMeta.FileSize { + fileMeta.FileSize = f.offset + needUpdateSize = true + } + p = p[needWrite:] + } + + fileMetaUpdate := FileMeta{ + ModifyTimestamp: timeToFileTimestamp(time.Now()), + } + if needUpdateSize { + fileMetaUpdate.FileSize = f.offset + } + if _, err := db.GetEngine(f.ctx).ID(fileMeta.ID).Update(fileMetaUpdate); err != nil { + return written, err + } + return written, nil +} + +func (f *file) Seek(n int64, whence int) (int64, error) { + if f.metaID == 0 { + return 0, os.ErrInvalid + } + + newOffset := f.offset + switch whence { + case io.SeekStart: + newOffset = n + case io.SeekCurrent: + newOffset += n + case io.SeekEnd: + size, err := f.size() + if err != nil { + return f.offset, err + } + newOffset = size + n + default: + return f.offset, os.ErrInvalid + } + if newOffset < 0 { + return f.offset, os.ErrInvalid + } + f.offset = newOffset + return newOffset, nil +} + +func (f *file) Close() error { + return nil +} + +func timeToFileTimestamp(t time.Time) int64 { + return t.UnixMicro() +} + +func fileTimestampToTime(t int64) time.Time { + return time.UnixMicro(t) +} + +func (f *file) loadMetaByPath() (*FileMeta, error) { + var fileMeta FileMeta + if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { + return nil, err + } else if ok { + f.metaID = fileMeta.ID + f.blockSize = fileMeta.BlockSize + return &fileMeta, nil + } + return nil, nil +} + +func (f *file) open(flag int) (err error) { + // see os.OpenFile for flag values + if flag&os.O_WRONLY != 0 { + f.allowWrite = true + } else if flag&os.O_RDWR != 0 { + f.allowRead = true + f.allowWrite = true + } else /* O_RDONLY */ { + f.allowRead = true + } + + if f.allowWrite { + if flag&os.O_CREATE != 0 { + if flag&os.O_EXCL != 0 { + // file must not exist. + if f.metaID != 0 { + return os.ErrExist + } + } else { + // create a new file if none exists. + if f.metaID == 0 { + if err = f.createEmpty(); err != nil { + return err + } + } + } + } + if flag&os.O_TRUNC != 0 { + if err = f.truncate(); err != nil { + return err + } + } + if flag&os.O_APPEND != 0 { + if _, err = f.Seek(0, io.SeekEnd); err != nil { + return err + } + } + return nil + } + + // read only mode + if f.metaID == 0 { + return os.ErrNotExist + } + return nil +} + +func (f *file) createEmpty() error { + if f.metaID != 0 { + return os.ErrExist + } + now := time.Now() + _, err := db.GetEngine(f.ctx).Insert(&FileMeta{ + FullPath: f.fullPath, + BlockSize: f.blockSize, + CreateTimestamp: timeToFileTimestamp(now), + ModifyTimestamp: timeToFileTimestamp(now), + }) + if err != nil { + return err + } + if _, err = f.loadMetaByPath(); err != nil { + return err + } + return nil +} + +func (f *file) truncate() error { + if f.metaID == 0 { + return os.ErrNotExist + } + return db.WithTx(func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Exec("UPDATE file_meta SET file_size = 0 WHERE id = ?", f.metaID); err != nil { + return err + } + if _, err := db.GetEngine(ctx).Delete(&FileData{MetaID: f.metaID}); err != nil { + return err + } + return nil + }, f.ctx) +} + +func (f *file) renameTo(newPath string) error { + if f.metaID == 0 { + return os.ErrNotExist + } + newPath = buildPath(newPath) + return db.WithTx(func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Exec("UPDATE file_meta SET full_path = ? WHERE id = ?", newPath, f.metaID); err != nil { + return err + } + return nil + }, f.ctx) +} + +func (f *file) delete() error { + if f.metaID == 0 { + return os.ErrNotExist + } + return db.WithTx(func(ctx context.Context) error { + if _, err := db.GetEngine(ctx).Delete(&FileMeta{ID: f.metaID}); err != nil { + return err + } + if _, err := db.GetEngine(ctx).Delete(&FileData{MetaID: f.metaID}); err != nil { + return err + } + return nil + }, f.ctx) +} + +func (f *file) size() (int64, error) { + if f.metaID == 0 { + return 0, os.ErrNotExist + } + fileMeta, err := findFileMetaById(f.ctx, f.metaID) + if err != nil { + return 0, err + } + return fileMeta.FileSize, nil +} + +func findFileMetaById(ctx context.Context, metaID int64) (*FileMeta, error) { + var fileMeta FileMeta + if ok, err := db.GetEngine(ctx).Where("id = ?", metaID).Get(&fileMeta); err != nil { + return nil, err + } else if ok { + return &fileMeta, nil + } + return nil, nil +} + +func buildPath(path string) string { + path = filepath.Clean(path) + path = strings.ReplaceAll(path, "\\", "/") + path = strings.TrimPrefix(path, "/") + return strconv.Itoa(strings.Count(path, "/")) + ":" + path +} + +func newDbFile(ctx context.Context, path string) (*file, error) { + path = buildPath(path) + f := &file{ctx: ctx, fullPath: path, blockSize: defaultFileBlockSize} + if _, err := f.loadMetaByPath(); err != nil { + return nil, err + } + return f, nil +} diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go new file mode 100644 index 0000000000..9f231a7e15 --- /dev/null +++ b/models/dbfs/dbfs.go @@ -0,0 +1,74 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package dbfs + +import ( + "context" + "os" + + "code.gitea.io/gitea/models/db" +) + +type FileMeta struct { + ID int64 `xorm:"pk autoincr"` + FullPath string `xorm:"VARCHAR(500) UNIQUE NOT NULL"` + BlockSize int64 `xorm:"BIGINT NOT NULL"` + FileSize int64 `xorm:"BIGINT NOT NULL"` + CreateTimestamp int64 `xorm:"BIGINT NOT NULL"` + ModifyTimestamp int64 `xorm:"BIGINT NOT NULL"` +} + +type FileData struct { + ID int64 `xorm:"pk autoincr"` + Revision int64 `xorm:"BIGINT NOT NULL"` + MetaID int64 `xorm:"BIGINT index(meta_offset) NOT NULL"` + BlobOffset int64 `xorm:"BIGINT index(meta_offset) NOT NULL"` + BlobSize int64 `xorm:"BIGINT NOT NULL"` + BlobData []byte `xorm:"BLOB NOT NULL"` +} + +func init() { + db.RegisterModel(new(FileMeta)) + db.RegisterModel(new(FileData)) +} + +func OpenFile(ctx context.Context, name string, flag int) (File, error) { + f, err := newDbFile(ctx, name) + if err != nil { + return nil, err + } + err = f.open(flag) + if err != nil { + _ = f.Close() + return nil, err + } + return f, nil +} + +func Open(ctx context.Context, name string) (File, error) { + return OpenFile(ctx, name, os.O_RDONLY) +} + +func Create(ctx context.Context, name string) (File, error) { + return OpenFile(ctx, name, os.O_RDWR|os.O_CREATE|os.O_TRUNC) +} + +func Rename(ctx context.Context, oldPath, newPath string) error { + f, err := newDbFile(ctx, oldPath) + if err != nil { + return err + } + defer f.Close() + return f.renameTo(newPath) +} + +func Remove(ctx context.Context, name string) error { + f, err := newDbFile(ctx, name) + if err != nil { + return err + } + defer f.Close() + return f.delete() +} diff --git a/models/dbfs/dbfs_test.go b/models/dbfs/dbfs_test.go new file mode 100644 index 0000000000..2c3e0f8c64 --- /dev/null +++ b/models/dbfs/dbfs_test.go @@ -0,0 +1,154 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package dbfs + +import ( + "bufio" + "io" + "os" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" + + _ "github.com/mattn/go-sqlite3" +) + +func changeDefaultFileBlockSize(n int64) (restore func()) { + old := defaultFileBlockSize + defaultFileBlockSize = n + return func() { + defaultFileBlockSize = old + } +} + +func TestDbfsBasic(t *testing.T) { + defer changeDefaultFileBlockSize(4)() + + assert.NoError(t, unittest.PrepareTestDatabase()) + + // test basic write/read + f, err := OpenFile(db.DefaultContext, "test.txt", os.O_RDWR|os.O_CREATE) + assert.NoError(t, err) + + n, err := f.Write([]byte("0123456789")) // blocks: 0123 4567 89 + assert.NoError(t, err) + assert.EqualValues(t, 10, n) + + _, err = f.Seek(0, io.SeekStart) + assert.NoError(t, err) + + buf, err := io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, 10, n) + assert.EqualValues(t, "0123456789", string(buf)) + + // write some new data + _, err = f.Seek(1, io.SeekStart) + assert.NoError(t, err) + _, err = f.Write([]byte("bcdefghi")) // blocks: 0bcd efgh i9 + assert.NoError(t, err) + + // read from offset + buf, err = io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, "9", string(buf)) + + // read all + _, err = f.Seek(0, io.SeekStart) + assert.NoError(t, err) + buf, err = io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, "0bcdefghi9", string(buf)) + + // write to new size + _, err = f.Seek(-1, io.SeekEnd) + assert.NoError(t, err) + _, err = f.Write([]byte("JKLMNOP")) // blocks: 0bcd efgh iJKL MNOP + assert.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + assert.NoError(t, err) + buf, err = io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP", string(buf)) + + // write beyond EOF and fill with zero + _, err = f.Seek(5, io.SeekCurrent) + assert.NoError(t, err) + _, err = f.Write([]byte("xyzu")) // blocks: 0bcd efgh iJKL MNOP 0000 0xyz u + assert.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + assert.NoError(t, err) + buf, err = io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00\x00\x00xyzu", string(buf)) + + // write to the block with zeros + _, err = f.Seek(-6, io.SeekCurrent) + assert.NoError(t, err) + _, err = f.Write([]byte("ABCD")) // blocks: 0bcd efgh iJKL MNOP 000A BCDz u + assert.NoError(t, err) + _, err = f.Seek(0, io.SeekStart) + assert.NoError(t, err) + buf, err = io.ReadAll(f) + assert.NoError(t, err) + assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00ABCDzu", string(buf)) + + assert.NoError(t, f.Close()) + + // test rename + err = Rename(db.DefaultContext, "test.txt", "test2.txt") + assert.NoError(t, err) + + _, err = OpenFile(db.DefaultContext, "test.txt", os.O_RDONLY) + assert.Error(t, err) + + f, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + // test remove + err = Remove(db.DefaultContext, "test2.txt") + assert.NoError(t, err) + + _, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) + assert.Error(t, err) +} + +func TestDbfsReadWrite(t *testing.T) { + defer changeDefaultFileBlockSize(4)() + + assert.NoError(t, unittest.PrepareTestDatabase()) + + f1, err := OpenFile(db.DefaultContext, "test.log", os.O_RDWR|os.O_CREATE) + assert.NoError(t, err) + defer f1.Close() + + f2, err := OpenFile(db.DefaultContext, "test.log", os.O_RDONLY) + assert.NoError(t, err) + defer f2.Close() + + _, err = f1.Write([]byte("line 1\n")) + assert.NoError(t, err) + + f2r := bufio.NewReader(f2) + + line, err := f2r.ReadString('\n') + assert.NoError(t, err) + assert.EqualValues(t, "line 1\n", line) + _, err = f2r.ReadString('\n') + assert.ErrorIs(t, err, io.EOF) + + _, err = f1.Write([]byte("line 2\n")) + assert.NoError(t, err) + + line, err = f2r.ReadString('\n') + assert.NoError(t, err) + assert.EqualValues(t, "line 2\n", line) + _, err = f2r.ReadString('\n') + assert.ErrorIs(t, err, io.EOF) +} diff --git a/models/dbfs/main_test.go b/models/dbfs/main_test.go new file mode 100644 index 0000000000..9d63807f8e --- /dev/null +++ b/models/dbfs/main_test.go @@ -0,0 +1,28 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package dbfs + +import ( + "path/filepath" + "testing" + + "code.gitea.io/gitea/models/unittest" + "code.gitea.io/gitea/modules/setting" + + _ "code.gitea.io/gitea/models" + _ "code.gitea.io/gitea/models/repo" + _ "code.gitea.io/gitea/models/user" +) + +func init() { + setting.SetCustomPathAndConf("", "", "") + setting.LoadForTest() +} + +func TestMain(m *testing.M) { + unittest.MainTest(m, &unittest.TestOptions{ + GiteaRootPath: filepath.Join("..", ".."), + }) +} diff --git a/modules/bots/bots.go b/modules/bots/bots.go index a83732e06e..6880a6e73d 100644 --- a/modules/bots/bots.go +++ b/modules/bots/bots.go @@ -25,7 +25,7 @@ func DetectWorkflows(commit *git.Commit, event webhook.HookEventType) (git.Entri return nil, nil, err } - entries, err := tree.ListEntriesRecursive() + entries, err := tree.ListEntriesRecursiveFast() if err != nil { return nil, nil, err } diff --git a/routers/web/dev/termdemo.go b/routers/web/dev/termdemo.go new file mode 100644 index 0000000000..d2408725e4 --- /dev/null +++ b/routers/web/dev/termdemo.go @@ -0,0 +1,51 @@ +package dev + +import ( + "fmt" + "io" + "net/http" + "os" + "sync" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/dbfs" + "code.gitea.io/gitea/modules/context" +) + +var demoLogWriterOnce sync.Once + +func TermDemo(ctx *context.Context) { + demoLogWriterOnce.Do(func() { + go func() { + f, _ := dbfs.OpenFile(db.DefaultContext, "termdemo.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND) + count := 0 + for { + count++ + s := fmt.Sprintf("\x1B[1;3;31mDemo Log\x1B[0m, count=%d\r\n", count) + _, _ = f.Write([]byte(s)) + time.Sleep(time.Second) + } + }() + }) + + cmd := ctx.FormString("cmd") + if cmd == "tail" { + offset := ctx.FormInt64("offset") + f, _ := dbfs.OpenFile(db.DefaultContext, "termdemo.log", os.O_RDONLY) + if offset == -1 { + _, _ = f.Seek(0, io.SeekEnd) + } else { + _, _ = f.Seek(offset, io.SeekStart) + } + buf, _ := io.ReadAll(f) + offset, _ = f.Seek(0, io.SeekCurrent) + ctx.JSON(http.StatusOK, map[string]interface{}{ + "offset": offset, + "content": string(buf), + }) + return + } + + ctx.HTML(http.StatusOK, "dev/termdemo") +} diff --git a/routers/web/web.go b/routers/web/web.go index a1f601b9c4..1b2b53e5e2 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -28,6 +28,7 @@ import ( "code.gitea.io/gitea/modules/web/routing" "code.gitea.io/gitea/routers/web/admin" "code.gitea.io/gitea/routers/web/auth" + "code.gitea.io/gitea/routers/web/dev" "code.gitea.io/gitea/routers/web/events" "code.gitea.io/gitea/routers/web/explore" "code.gitea.io/gitea/routers/web/feed" @@ -659,6 +660,10 @@ func RegisterRoutes(m *web.Route) { m.Post("/{username}", reqSignIn, context_service.UserAssignmentWeb(), user.Action) + if !setting.IsProd { + m.Any("/dev/termdemo", dev.TermDemo) + } + reqRepoAdmin := context.RequireRepoAdmin() reqRepoCodeWriter := context.RequireRepoWriter(unit.TypeCode) canEnableEditor := context.CanEnableEditor() diff --git a/templates/dev/termdemo.tmpl b/templates/dev/termdemo.tmpl new file mode 100644 index 0000000000..60b314b4ed --- /dev/null +++ b/templates/dev/termdemo.tmpl @@ -0,0 +1,27 @@ + + + + + + + + +
+ + + + +