mirror of
				https://github.com/go-gitea/gitea
				synced 2025-10-31 03:18:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			465 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			465 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
 | |
| // All rights reserved.
 | |
| //
 | |
| // Use of this source code is governed by a BSD-style license that can be
 | |
| // found in the LICENSE file.
 | |
| 
 | |
| package leveldb
 | |
| 
 | |
| import (
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/syndtr/goleveldb/leveldb/memdb"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/opt"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/util"
 | |
| )
 | |
| 
 | |
| func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
 | |
| 	wr, err := db.journal.Next()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := db.journal.Flush(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if sync {
 | |
| 		return db.journalWriter.Sync()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
 | |
| 	retryLimit := 3
 | |
| retry:
 | |
| 	// Wait for pending memdb compaction.
 | |
| 	err = db.compTriggerWait(db.mcompCmdC)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	retryLimit--
 | |
| 
 | |
| 	// Create new memdb and journal.
 | |
| 	mem, err = db.newMem(n)
 | |
| 	if err != nil {
 | |
| 		if err == errHasFrozenMem {
 | |
| 			if retryLimit <= 0 {
 | |
| 				panic("BUG: still has frozen memdb")
 | |
| 			}
 | |
| 			goto retry
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Schedule memdb compaction.
 | |
| 	if wait {
 | |
| 		err = db.compTriggerWait(db.mcompCmdC)
 | |
| 	} else {
 | |
| 		db.compTrigger(db.mcompCmdC)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
 | |
| 	delayed := false
 | |
| 	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
 | |
| 	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
 | |
| 	flush := func() (retry bool) {
 | |
| 		mdb = db.getEffectiveMem()
 | |
| 		if mdb == nil {
 | |
| 			err = ErrClosed
 | |
| 			return false
 | |
| 		}
 | |
| 		defer func() {
 | |
| 			if retry {
 | |
| 				mdb.decref()
 | |
| 				mdb = nil
 | |
| 			}
 | |
| 		}()
 | |
| 		tLen := db.s.tLen(0)
 | |
| 		mdbFree = mdb.Free()
 | |
| 		switch {
 | |
| 		case tLen >= slowdownTrigger && !delayed:
 | |
| 			delayed = true
 | |
| 			time.Sleep(time.Millisecond)
 | |
| 		case mdbFree >= n:
 | |
| 			return false
 | |
| 		case tLen >= pauseTrigger:
 | |
| 			delayed = true
 | |
| 			// Set the write paused flag explicitly.
 | |
| 			atomic.StoreInt32(&db.inWritePaused, 1)
 | |
| 			err = db.compTriggerWait(db.tcompCmdC)
 | |
| 			// Unset the write paused flag.
 | |
| 			atomic.StoreInt32(&db.inWritePaused, 0)
 | |
| 			if err != nil {
 | |
| 				return false
 | |
| 			}
 | |
| 		default:
 | |
| 			// Allow memdb to grow if it has no entry.
 | |
| 			if mdb.Len() == 0 {
 | |
| 				mdbFree = n
 | |
| 			} else {
 | |
| 				mdb.decref()
 | |
| 				mdb, err = db.rotateMem(n, false)
 | |
| 				if err == nil {
 | |
| 					mdbFree = mdb.Free()
 | |
| 				} else {
 | |
| 					mdbFree = 0
 | |
| 				}
 | |
| 			}
 | |
| 			return false
 | |
| 		}
 | |
| 		return true
 | |
| 	}
 | |
| 	start := time.Now()
 | |
| 	for flush() {
 | |
| 	}
 | |
| 	if delayed {
 | |
| 		db.writeDelay += time.Since(start)
 | |
| 		db.writeDelayN++
 | |
| 	} else if db.writeDelayN > 0 {
 | |
| 		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
 | |
| 		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
 | |
| 		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
 | |
| 		db.writeDelay = 0
 | |
| 		db.writeDelayN = 0
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| type writeMerge struct {
 | |
| 	sync       bool
 | |
| 	batch      *Batch
 | |
| 	keyType    keyType
 | |
| 	key, value []byte
 | |
| }
 | |
| 
 | |
| func (db *DB) unlockWrite(overflow bool, merged int, err error) {
 | |
| 	for i := 0; i < merged; i++ {
 | |
| 		db.writeAckC <- err
 | |
| 	}
 | |
| 	if overflow {
 | |
| 		// Pass lock to the next write (that failed to merge).
 | |
| 		db.writeMergedC <- false
 | |
| 	} else {
 | |
| 		// Release lock.
 | |
| 		<-db.writeLockC
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // ourBatch is batch that we can modify.
 | |
| func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
 | |
| 	// Try to flush memdb. This method would also trying to throttle writes
 | |
| 	// if it is too fast and compaction cannot catch-up.
 | |
| 	mdb, mdbFree, err := db.flush(batch.internalLen)
 | |
| 	if err != nil {
 | |
| 		db.unlockWrite(false, 0, err)
 | |
| 		return err
 | |
| 	}
 | |
| 	defer mdb.decref()
 | |
| 
 | |
| 	var (
 | |
| 		overflow bool
 | |
| 		merged   int
 | |
| 		batches  = []*Batch{batch}
 | |
| 	)
 | |
| 
 | |
| 	if merge {
 | |
| 		// Merge limit.
 | |
| 		var mergeLimit int
 | |
| 		if batch.internalLen > 128<<10 {
 | |
| 			mergeLimit = (1 << 20) - batch.internalLen
 | |
| 		} else {
 | |
| 			mergeLimit = 128 << 10
 | |
| 		}
 | |
| 		mergeCap := mdbFree - batch.internalLen
 | |
| 		if mergeLimit > mergeCap {
 | |
| 			mergeLimit = mergeCap
 | |
| 		}
 | |
| 
 | |
| 	merge:
 | |
| 		for mergeLimit > 0 {
 | |
| 			select {
 | |
| 			case incoming := <-db.writeMergeC:
 | |
| 				if incoming.batch != nil {
 | |
| 					// Merge batch.
 | |
| 					if incoming.batch.internalLen > mergeLimit {
 | |
| 						overflow = true
 | |
| 						break merge
 | |
| 					}
 | |
| 					batches = append(batches, incoming.batch)
 | |
| 					mergeLimit -= incoming.batch.internalLen
 | |
| 				} else {
 | |
| 					// Merge put.
 | |
| 					internalLen := len(incoming.key) + len(incoming.value) + 8
 | |
| 					if internalLen > mergeLimit {
 | |
| 						overflow = true
 | |
| 						break merge
 | |
| 					}
 | |
| 					if ourBatch == nil {
 | |
| 						ourBatch = db.batchPool.Get().(*Batch)
 | |
| 						ourBatch.Reset()
 | |
| 						batches = append(batches, ourBatch)
 | |
| 					}
 | |
| 					// We can use same batch since concurrent write doesn't
 | |
| 					// guarantee write order.
 | |
| 					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
 | |
| 					mergeLimit -= internalLen
 | |
| 				}
 | |
| 				sync = sync || incoming.sync
 | |
| 				merged++
 | |
| 				db.writeMergedC <- true
 | |
| 
 | |
| 			default:
 | |
| 				break merge
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Release ourBatch if any.
 | |
| 	if ourBatch != nil {
 | |
| 		defer db.batchPool.Put(ourBatch)
 | |
| 	}
 | |
| 
 | |
| 	// Seq number.
 | |
| 	seq := db.seq + 1
 | |
| 
 | |
| 	// Write journal.
 | |
| 	if err := db.writeJournal(batches, seq, sync); err != nil {
 | |
| 		db.unlockWrite(overflow, merged, err)
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Put batches.
 | |
| 	for _, batch := range batches {
 | |
| 		if err := batch.putMem(seq, mdb.DB); err != nil {
 | |
| 			panic(err)
 | |
| 		}
 | |
| 		seq += uint64(batch.Len())
 | |
| 	}
 | |
| 
 | |
| 	// Incr seq number.
 | |
| 	db.addSeq(uint64(batchesLen(batches)))
 | |
| 
 | |
| 	// Rotate memdb if it's reach the threshold.
 | |
| 	if batch.internalLen >= mdbFree {
 | |
| 		db.rotateMem(0, false)
 | |
| 	}
 | |
| 
 | |
| 	db.unlockWrite(overflow, merged, nil)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Write apply the given batch to the DB. The batch records will be applied
 | |
| // sequentially. Write might be used concurrently, when used concurrently and
 | |
| // batch is small enough, write will try to merge the batches. Set NoWriteMerge
 | |
| // option to true to disable write merge.
 | |
| //
 | |
| // It is safe to modify the contents of the arguments after Write returns but
 | |
| // not before. Write will not modify content of the batch.
 | |
| func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
 | |
| 	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// If the batch size is larger than write buffer, it may justified to write
 | |
| 	// using transaction instead. Using transaction the batch will be written
 | |
| 	// into tables directly, skipping the journaling.
 | |
| 	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
 | |
| 		tr, err := db.OpenTransaction()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err := tr.Write(batch, wo); err != nil {
 | |
| 			tr.Discard()
 | |
| 			return err
 | |
| 		}
 | |
| 		return tr.Commit()
 | |
| 	}
 | |
| 
 | |
| 	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
 | |
| 	sync := wo.GetSync() && !db.s.o.GetNoSync()
 | |
| 
 | |
| 	// Acquire write lock.
 | |
| 	if merge {
 | |
| 		select {
 | |
| 		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
 | |
| 			if <-db.writeMergedC {
 | |
| 				// Write is merged.
 | |
| 				return <-db.writeAckC
 | |
| 			}
 | |
| 			// Write is not merged, the write lock is handed to us. Continue.
 | |
| 		case db.writeLockC <- struct{}{}:
 | |
| 			// Write lock acquired.
 | |
| 		case err := <-db.compPerErrC:
 | |
| 			// Compaction error.
 | |
| 			return err
 | |
| 		case <-db.closeC:
 | |
| 			// Closed
 | |
| 			return ErrClosed
 | |
| 		}
 | |
| 	} else {
 | |
| 		select {
 | |
| 		case db.writeLockC <- struct{}{}:
 | |
| 			// Write lock acquired.
 | |
| 		case err := <-db.compPerErrC:
 | |
| 			// Compaction error.
 | |
| 			return err
 | |
| 		case <-db.closeC:
 | |
| 			// Closed
 | |
| 			return ErrClosed
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return db.writeLocked(batch, nil, merge, sync)
 | |
| }
 | |
| 
 | |
| func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
 | |
| 	if err := db.ok(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
 | |
| 	sync := wo.GetSync() && !db.s.o.GetNoSync()
 | |
| 
 | |
| 	// Acquire write lock.
 | |
| 	if merge {
 | |
| 		select {
 | |
| 		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
 | |
| 			if <-db.writeMergedC {
 | |
| 				// Write is merged.
 | |
| 				return <-db.writeAckC
 | |
| 			}
 | |
| 			// Write is not merged, the write lock is handed to us. Continue.
 | |
| 		case db.writeLockC <- struct{}{}:
 | |
| 			// Write lock acquired.
 | |
| 		case err := <-db.compPerErrC:
 | |
| 			// Compaction error.
 | |
| 			return err
 | |
| 		case <-db.closeC:
 | |
| 			// Closed
 | |
| 			return ErrClosed
 | |
| 		}
 | |
| 	} else {
 | |
| 		select {
 | |
| 		case db.writeLockC <- struct{}{}:
 | |
| 			// Write lock acquired.
 | |
| 		case err := <-db.compPerErrC:
 | |
| 			// Compaction error.
 | |
| 			return err
 | |
| 		case <-db.closeC:
 | |
| 			// Closed
 | |
| 			return ErrClosed
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	batch := db.batchPool.Get().(*Batch)
 | |
| 	batch.Reset()
 | |
| 	batch.appendRec(kt, key, value)
 | |
| 	return db.writeLocked(batch, batch, merge, sync)
 | |
| }
 | |
| 
 | |
| // Put sets the value for the given key. It overwrites any previous value
 | |
| // for that key; a DB is not a multi-map. Write merge also applies for Put, see
 | |
| // Write.
 | |
| //
 | |
| // It is safe to modify the contents of the arguments after Put returns but not
 | |
| // before.
 | |
| func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
 | |
| 	return db.putRec(keyTypeVal, key, value, wo)
 | |
| }
 | |
| 
 | |
| // Delete deletes the value for the given key. Delete will not returns error if
 | |
| // key doesn't exist. Write merge also applies for Delete, see Write.
 | |
| //
 | |
| // It is safe to modify the contents of the arguments after Delete returns but
 | |
| // not before.
 | |
| func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
 | |
| 	return db.putRec(keyTypeDel, key, nil, wo)
 | |
| }
 | |
| 
 | |
| func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
 | |
| 	iter := mem.NewIterator(nil)
 | |
| 	defer iter.Release()
 | |
| 	return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
 | |
| 		(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
 | |
| }
 | |
| 
 | |
| // CompactRange compacts the underlying DB for the given key range.
 | |
| // In particular, deleted and overwritten versions are discarded,
 | |
| // and the data is rearranged to reduce the cost of operations
 | |
| // needed to access the data. This operation should typically only
 | |
| // be invoked by users who understand the underlying implementation.
 | |
| //
 | |
| // A nil Range.Start is treated as a key before all keys in the DB.
 | |
| // And a nil Range.Limit is treated as a key after all keys in the DB.
 | |
| // Therefore if both is nil then it will compact entire DB.
 | |
| func (db *DB) CompactRange(r util.Range) error {
 | |
| 	if err := db.ok(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Lock writer.
 | |
| 	select {
 | |
| 	case db.writeLockC <- struct{}{}:
 | |
| 	case err := <-db.compPerErrC:
 | |
| 		return err
 | |
| 	case <-db.closeC:
 | |
| 		return ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	// Check for overlaps in memdb.
 | |
| 	mdb := db.getEffectiveMem()
 | |
| 	if mdb == nil {
 | |
| 		return ErrClosed
 | |
| 	}
 | |
| 	defer mdb.decref()
 | |
| 	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
 | |
| 		// Memdb compaction.
 | |
| 		if _, err := db.rotateMem(0, false); err != nil {
 | |
| 			<-db.writeLockC
 | |
| 			return err
 | |
| 		}
 | |
| 		<-db.writeLockC
 | |
| 		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	} else {
 | |
| 		<-db.writeLockC
 | |
| 	}
 | |
| 
 | |
| 	// Table compaction.
 | |
| 	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
 | |
| }
 | |
| 
 | |
| // SetReadOnly makes DB read-only. It will stay read-only until reopened.
 | |
| func (db *DB) SetReadOnly() error {
 | |
| 	if err := db.ok(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Lock writer.
 | |
| 	select {
 | |
| 	case db.writeLockC <- struct{}{}:
 | |
| 		db.compWriteLocking = true
 | |
| 	case err := <-db.compPerErrC:
 | |
| 		return err
 | |
| 	case <-db.closeC:
 | |
| 		return ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	// Set compaction read-only.
 | |
| 	select {
 | |
| 	case db.compErrSetC <- ErrReadOnly:
 | |
| 	case perr := <-db.compPerErrC:
 | |
| 		return perr
 | |
| 	case <-db.closeC:
 | |
| 		return ErrClosed
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |