mirror of
https://github.com/go-gitea/gitea
synced 2024-11-09 19:54:25 +00:00
fb7b743bd0
This PR implements object storage (LFS, packages, attachments, etc.) for Azure Blob Storage. It depends on the official Azure SDK for Go and supports both the Azure Blob Storage cloud service and the Azurite mock server. Replace #25458 Fix #22527 - [x] CI Tests - [x] integration test, MSSQL integration tests will now be based on azureblob - [x] unit test - [x] CLI Migrate Storage - [x] Documentation for configuration added ------ TODO (other PRs): - [ ] Improve performance of `blob download`. --------- Co-authored-by: yp05327 <576951401@qq.com>
323 lines
8.0 KiB
Go
323 lines
8.0 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/setting"
|
|
"code.gitea.io/gitea/modules/util"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
|
|
)
|
|
|
|
var _ Object = &azureBlobObject{}
|
|
|
|
type azureBlobObject struct {
|
|
blobClient *blob.Client
|
|
Context context.Context
|
|
Name string
|
|
Size int64
|
|
ModTime *time.Time
|
|
offset int64
|
|
}
|
|
|
|
func (a *azureBlobObject) Read(p []byte) (int, error) {
|
|
// TODO: improve the performance, we can implement another interface, maybe implement io.WriteTo
|
|
if a.offset >= a.Size {
|
|
return 0, io.EOF
|
|
}
|
|
count := min(int64(len(p)), a.Size-a.offset)
|
|
|
|
res, err := a.blobClient.DownloadBuffer(a.Context, p, &blob.DownloadBufferOptions{
|
|
Range: blob.HTTPRange{
|
|
Offset: a.offset,
|
|
Count: count,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return 0, convertAzureBlobErr(err)
|
|
}
|
|
a.offset += res
|
|
|
|
return int(res), nil
|
|
}
|
|
|
|
func (a *azureBlobObject) Close() error {
|
|
a.offset = 0
|
|
return nil
|
|
}
|
|
|
|
func (a *azureBlobObject) Seek(offset int64, whence int) (int64, error) {
|
|
switch whence {
|
|
case io.SeekStart:
|
|
case io.SeekCurrent:
|
|
offset += a.offset
|
|
case io.SeekEnd:
|
|
offset = a.Size - offset
|
|
default:
|
|
return 0, errors.New("Seek: invalid whence")
|
|
}
|
|
|
|
if offset > a.Size {
|
|
return 0, errors.New("Seek: invalid offset")
|
|
} else if offset < 0 {
|
|
return 0, errors.New("Seek: invalid offset")
|
|
}
|
|
a.offset = offset
|
|
return a.offset, nil
|
|
}
|
|
|
|
func (a *azureBlobObject) Stat() (os.FileInfo, error) {
|
|
return &azureBlobFileInfo{
|
|
a.Name,
|
|
a.Size,
|
|
*a.ModTime,
|
|
}, nil
|
|
}
|
|
|
|
var _ ObjectStorage = &AzureBlobStorage{}
|
|
|
|
// AzureStorage returns a azure blob storage
|
|
type AzureBlobStorage struct {
|
|
cfg *setting.AzureBlobStorageConfig
|
|
ctx context.Context
|
|
credential *azblob.SharedKeyCredential
|
|
client *azblob.Client
|
|
}
|
|
|
|
func convertAzureBlobErr(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if bloberror.HasCode(err, bloberror.BlobNotFound) {
|
|
return os.ErrNotExist
|
|
}
|
|
var respErr *azcore.ResponseError
|
|
if !errors.As(err, &respErr) {
|
|
return err
|
|
}
|
|
return fmt.Errorf(respErr.ErrorCode)
|
|
}
|
|
|
|
// NewAzureBlobStorage returns a azure blob storage
|
|
func NewAzureBlobStorage(ctx context.Context, cfg *setting.Storage) (ObjectStorage, error) {
|
|
config := cfg.AzureBlobConfig
|
|
|
|
log.Info("Creating Azure Blob storage at %s:%s with base path %s", config.Endpoint, config.Container, config.BasePath)
|
|
|
|
cred, err := azblob.NewSharedKeyCredential(config.AccountName, config.AccountKey)
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
client, err := azblob.NewClientWithSharedKeyCredential(config.Endpoint, cred, &azblob.ClientOptions{})
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
|
|
_, err = client.CreateContainer(ctx, config.Container, &container.CreateOptions{})
|
|
if err != nil {
|
|
// Check to see if we already own this container (which happens if you run this twice)
|
|
if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
|
|
return nil, convertMinioErr(err)
|
|
}
|
|
}
|
|
|
|
return &AzureBlobStorage{
|
|
cfg: &config,
|
|
ctx: ctx,
|
|
credential: cred,
|
|
client: client,
|
|
}, nil
|
|
}
|
|
|
|
func (a *AzureBlobStorage) buildAzureBlobPath(p string) string {
|
|
p = util.PathJoinRelX(a.cfg.BasePath, p)
|
|
if p == "." || p == "/" {
|
|
p = "" // azure uses prefix, so path should be empty as relative path
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (a *AzureBlobStorage) getObjectNameFromPath(path string) string {
|
|
s := strings.Split(path, "/")
|
|
return s[len(s)-1]
|
|
}
|
|
|
|
// Open opens a file
|
|
func (a *AzureBlobStorage) Open(path string) (Object, error) {
|
|
blobClient, err := a.getBlobClient(path)
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
res, err := blobClient.GetProperties(a.ctx, &blob.GetPropertiesOptions{})
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
return &azureBlobObject{
|
|
Context: a.ctx,
|
|
blobClient: blobClient,
|
|
Name: a.getObjectNameFromPath(path),
|
|
Size: *res.ContentLength,
|
|
ModTime: res.LastModified,
|
|
}, nil
|
|
}
|
|
|
|
// Save saves a file to azure blob storage
|
|
func (a *AzureBlobStorage) Save(path string, r io.Reader, size int64) (int64, error) {
|
|
rd := util.NewCountingReader(r)
|
|
_, err := a.client.UploadStream(
|
|
a.ctx,
|
|
a.cfg.Container,
|
|
a.buildAzureBlobPath(path),
|
|
rd,
|
|
// TODO: support set block size and concurrency
|
|
&blockblob.UploadStreamOptions{},
|
|
)
|
|
if err != nil {
|
|
return 0, convertAzureBlobErr(err)
|
|
}
|
|
return int64(rd.Count()), nil
|
|
}
|
|
|
|
// azureBlobFileInfo is the os.FileInfo implementation used for azure blobs.
// The field order matters: values are also built with positional literals.
type azureBlobFileInfo struct {
	name    string    // object name; may contain "/" separators
	size    int64     // length in bytes
	modTime time.Time // modification time
}

// Name returns the last element of the stored name.
func (a azureBlobFileInfo) Name() string { return path.Base(a.name) }

// Size returns the stored length in bytes.
func (a azureBlobFileInfo) Size() int64 { return a.size }

// ModTime returns the stored modification time.
func (a azureBlobFileInfo) ModTime() time.Time { return a.modTime }

// IsDir reports whether the stored name ends with "/".
func (a azureBlobFileInfo) IsDir() bool { return strings.HasSuffix(a.name, "/") }

// Mode always returns os.ModePerm.
func (a azureBlobFileInfo) Mode() os.FileMode { return os.ModePerm }

// Sys always returns nil.
func (a azureBlobFileInfo) Sys() any { return nil }
|
|
|
|
// Stat returns the stat information of the object
|
|
func (a *AzureBlobStorage) Stat(path string) (os.FileInfo, error) {
|
|
blobClient, err := a.getBlobClient(path)
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
res, err := blobClient.GetProperties(a.ctx, &blob.GetPropertiesOptions{})
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
s := strings.Split(path, "/")
|
|
return &azureBlobFileInfo{
|
|
s[len(s)-1],
|
|
*res.ContentLength,
|
|
*res.LastModified,
|
|
}, nil
|
|
}
|
|
|
|
// Delete delete a file
|
|
func (a *AzureBlobStorage) Delete(path string) error {
|
|
blobClient, err := a.getBlobClient(path)
|
|
if err != nil {
|
|
return convertAzureBlobErr(err)
|
|
}
|
|
_, err = blobClient.Delete(a.ctx, nil)
|
|
return convertAzureBlobErr(err)
|
|
}
|
|
|
|
// URL gets the redirect URL to a file. The presigned link is valid for 5 minutes.
|
|
func (a *AzureBlobStorage) URL(path, name string) (*url.URL, error) {
|
|
blobClient, err := a.getBlobClient(path)
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
|
|
startTime := time.Now()
|
|
u, err := blobClient.GetSASURL(sas.BlobPermissions{
|
|
Read: true,
|
|
}, time.Now().Add(5*time.Minute), &blob.GetSASURLOptions{
|
|
StartTime: &startTime,
|
|
})
|
|
if err != nil {
|
|
return nil, convertAzureBlobErr(err)
|
|
}
|
|
|
|
return url.Parse(u)
|
|
}
|
|
|
|
// IterateObjects iterates across the objects in the azureblobstorage
|
|
func (a *AzureBlobStorage) IterateObjects(dirName string, fn func(path string, obj Object) error) error {
|
|
dirName = a.buildAzureBlobPath(dirName)
|
|
if dirName != "" {
|
|
dirName += "/"
|
|
}
|
|
pager := a.client.NewListBlobsFlatPager(a.cfg.Container, &container.ListBlobsFlatOptions{
|
|
Prefix: &dirName,
|
|
})
|
|
for pager.More() {
|
|
resp, err := pager.NextPage(a.ctx)
|
|
if err != nil {
|
|
return convertAzureBlobErr(err)
|
|
}
|
|
for _, object := range resp.Segment.BlobItems {
|
|
blobClient, err := a.getBlobClient(*object.Name)
|
|
if err != nil {
|
|
return convertAzureBlobErr(err)
|
|
}
|
|
object := &azureBlobObject{
|
|
Context: a.ctx,
|
|
blobClient: blobClient,
|
|
Name: *object.Name,
|
|
Size: *object.Properties.ContentLength,
|
|
ModTime: object.Properties.LastModified,
|
|
}
|
|
if err := func(object *azureBlobObject, fn func(path string, obj Object) error) error {
|
|
defer object.Close()
|
|
return fn(strings.TrimPrefix(object.Name, a.cfg.BasePath), object)
|
|
}(object, fn); err != nil {
|
|
return convertAzureBlobErr(err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete delete a file
|
|
func (a *AzureBlobStorage) getBlobClient(path string) (*blob.Client, error) {
|
|
return a.client.ServiceClient().NewContainerClient(a.cfg.Container).NewBlobClient(a.buildAzureBlobPath(path)), nil
|
|
}
|
|
|
|
func init() {
|
|
RegisterStorageType(setting.AzureBlobStorageType, NewAzureBlobStorage)
|
|
}
|