mirror of
				https://github.com/go-gitea/gitea
				synced 2025-11-03 21:08:25 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			272 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			272 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
 | 
						|
// All rights reserved.
 | 
						|
//
 | 
						|
// Use of this source code is governed by a BSD-style license that can be
 | 
						|
// found in the LICENSE file.
 | 
						|
 | 
						|
package leveldb
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sync/atomic"
 | 
						|
 | 
						|
	"github.com/syndtr/goleveldb/leveldb/journal"
 | 
						|
	"github.com/syndtr/goleveldb/leveldb/storage"
 | 
						|
)
 | 
						|
 | 
						|
// Logging.
 | 
						|
 | 
						|
type dropper struct {
 | 
						|
	s  *session
 | 
						|
	fd storage.FileDesc
 | 
						|
}
 | 
						|
 | 
						|
func (d dropper) Drop(err error) {
 | 
						|
	if e, ok := err.(*journal.ErrCorrupted); ok {
 | 
						|
		d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
 | 
						|
	} else {
 | 
						|
		d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
 | 
						|
func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
 | 
						|
 | 
						|
// File utils.
 | 
						|
 | 
						|
func (s *session) newTemp() storage.FileDesc {
 | 
						|
	num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
 | 
						|
	return storage.FileDesc{Type: storage.TypeTemp, Num: num}
 | 
						|
}
 | 
						|
 | 
						|
func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
 | 
						|
	ref += s.fileRef[fd.Num]
 | 
						|
	if ref > 0 {
 | 
						|
		s.fileRef[fd.Num] = ref
 | 
						|
	} else if ref == 0 {
 | 
						|
		delete(s.fileRef, fd.Num)
 | 
						|
	} else {
 | 
						|
		panic(fmt.Sprintf("negative ref: %v", fd))
 | 
						|
	}
 | 
						|
	return ref
 | 
						|
}
 | 
						|
 | 
						|
// Session state.
 | 
						|
 | 
						|
// Get current version. This will incr version ref, must call
 | 
						|
// version.release (exactly once) after use.
 | 
						|
func (s *session) version() *version {
 | 
						|
	s.vmu.Lock()
 | 
						|
	defer s.vmu.Unlock()
 | 
						|
	s.stVersion.incref()
 | 
						|
	return s.stVersion
 | 
						|
}
 | 
						|
 | 
						|
func (s *session) tLen(level int) int {
 | 
						|
	s.vmu.Lock()
 | 
						|
	defer s.vmu.Unlock()
 | 
						|
	return s.stVersion.tLen(level)
 | 
						|
}
 | 
						|
 | 
						|
// Set current version to v.
 | 
						|
func (s *session) setVersion(v *version) {
 | 
						|
	s.vmu.Lock()
 | 
						|
	defer s.vmu.Unlock()
 | 
						|
	// Hold by session. It is important to call this first before releasing
 | 
						|
	// current version, otherwise the still used files might get released.
 | 
						|
	v.incref()
 | 
						|
	if s.stVersion != nil {
 | 
						|
		// Release current version.
 | 
						|
		s.stVersion.releaseNB()
 | 
						|
	}
 | 
						|
	s.stVersion = v
 | 
						|
}
 | 
						|
 | 
						|
// Get current unused file number.
 | 
						|
func (s *session) nextFileNum() int64 {
 | 
						|
	return atomic.LoadInt64(&s.stNextFileNum)
 | 
						|
}
 | 
						|
 | 
						|
// Set current unused file number to num.
 | 
						|
func (s *session) setNextFileNum(num int64) {
 | 
						|
	atomic.StoreInt64(&s.stNextFileNum, num)
 | 
						|
}
 | 
						|
 | 
						|
// Mark file number as used.
 | 
						|
func (s *session) markFileNum(num int64) {
 | 
						|
	nextFileNum := num + 1
 | 
						|
	for {
 | 
						|
		old, x := s.stNextFileNum, nextFileNum
 | 
						|
		if old > x {
 | 
						|
			x = old
 | 
						|
		}
 | 
						|
		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Allocate a file number.
 | 
						|
func (s *session) allocFileNum() int64 {
 | 
						|
	return atomic.AddInt64(&s.stNextFileNum, 1) - 1
 | 
						|
}
 | 
						|
 | 
						|
// Reuse given file number.
 | 
						|
func (s *session) reuseFileNum(num int64) {
 | 
						|
	for {
 | 
						|
		old, x := s.stNextFileNum, num
 | 
						|
		if old != x+1 {
 | 
						|
			x = old
 | 
						|
		}
 | 
						|
		if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Set compaction ptr at given level; need external synchronization.
 | 
						|
func (s *session) setCompPtr(level int, ik internalKey) {
 | 
						|
	if level >= len(s.stCompPtrs) {
 | 
						|
		newCompPtrs := make([]internalKey, level+1)
 | 
						|
		copy(newCompPtrs, s.stCompPtrs)
 | 
						|
		s.stCompPtrs = newCompPtrs
 | 
						|
	}
 | 
						|
	s.stCompPtrs[level] = append(internalKey{}, ik...)
 | 
						|
}
 | 
						|
 | 
						|
// Get compaction ptr at given level; need external synchronization.
 | 
						|
func (s *session) getCompPtr(level int) internalKey {
 | 
						|
	if level >= len(s.stCompPtrs) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return s.stCompPtrs[level]
 | 
						|
}
 | 
						|
 | 
						|
// Manifest related utils.
 | 
						|
 | 
						|
// Fill given session record obj with current states; need external
 | 
						|
// synchronization.
 | 
						|
func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
 | 
						|
	r.setNextFileNum(s.nextFileNum())
 | 
						|
 | 
						|
	if snapshot {
 | 
						|
		if !r.has(recJournalNum) {
 | 
						|
			r.setJournalNum(s.stJournalNum)
 | 
						|
		}
 | 
						|
 | 
						|
		if !r.has(recSeqNum) {
 | 
						|
			r.setSeqNum(s.stSeqNum)
 | 
						|
		}
 | 
						|
 | 
						|
		for level, ik := range s.stCompPtrs {
 | 
						|
			if ik != nil {
 | 
						|
				r.addCompPtr(level, ik)
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		r.setComparer(s.icmp.uName())
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Mark if record has been committed, this will update session state;
 | 
						|
// need external synchronization.
 | 
						|
func (s *session) recordCommited(rec *sessionRecord) {
 | 
						|
	if rec.has(recJournalNum) {
 | 
						|
		s.stJournalNum = rec.journalNum
 | 
						|
	}
 | 
						|
 | 
						|
	if rec.has(recPrevJournalNum) {
 | 
						|
		s.stPrevJournalNum = rec.prevJournalNum
 | 
						|
	}
 | 
						|
 | 
						|
	if rec.has(recSeqNum) {
 | 
						|
		s.stSeqNum = rec.seqNum
 | 
						|
	}
 | 
						|
 | 
						|
	for _, r := range rec.compPtrs {
 | 
						|
		s.setCompPtr(r.level, internalKey(r.ikey))
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Create a new manifest file; need external synchronization.
 | 
						|
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
 | 
						|
	fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
 | 
						|
	writer, err := s.stor.Create(fd)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	jw := journal.NewWriter(writer)
 | 
						|
 | 
						|
	if v == nil {
 | 
						|
		v = s.version()
 | 
						|
		defer v.release()
 | 
						|
	}
 | 
						|
	if rec == nil {
 | 
						|
		rec = &sessionRecord{}
 | 
						|
	}
 | 
						|
	s.fillRecord(rec, true)
 | 
						|
	v.fillRecord(rec)
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		if err == nil {
 | 
						|
			s.recordCommited(rec)
 | 
						|
			if s.manifest != nil {
 | 
						|
				s.manifest.Close()
 | 
						|
			}
 | 
						|
			if s.manifestWriter != nil {
 | 
						|
				s.manifestWriter.Close()
 | 
						|
			}
 | 
						|
			if !s.manifestFd.Zero() {
 | 
						|
				s.stor.Remove(s.manifestFd)
 | 
						|
			}
 | 
						|
			s.manifestFd = fd
 | 
						|
			s.manifestWriter = writer
 | 
						|
			s.manifest = jw
 | 
						|
		} else {
 | 
						|
			writer.Close()
 | 
						|
			s.stor.Remove(fd)
 | 
						|
			s.reuseFileNum(fd.Num)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	w, err := jw.Next()
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = rec.encode(w)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = jw.Flush()
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = s.stor.SetMeta(fd)
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// Flush record to disk.
 | 
						|
func (s *session) flushManifest(rec *sessionRecord) (err error) {
 | 
						|
	s.fillRecord(rec, false)
 | 
						|
	w, err := s.manifest.Next()
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = rec.encode(w)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = s.manifest.Flush()
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if !s.o.GetNoSync() {
 | 
						|
		err = s.manifestWriter.Sync()
 | 
						|
		if err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
	s.recordCommited(rec)
 | 
						|
	return
 | 
						|
}
 |