mirror of
				https://github.com/go-gitea/gitea
				synced 2025-11-04 05:18:25 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			301 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			301 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package couchbase
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
	"sync"
 | 
						|
)
 | 
						|
 | 
						|
type PersistTo uint8
 | 
						|
 | 
						|
const (
 | 
						|
	PersistNone   = PersistTo(0x00)
 | 
						|
	PersistMaster = PersistTo(0x01)
 | 
						|
	PersistOne    = PersistTo(0x02)
 | 
						|
	PersistTwo    = PersistTo(0x03)
 | 
						|
	PersistThree  = PersistTo(0x04)
 | 
						|
	PersistFour   = PersistTo(0x05)
 | 
						|
)
 | 
						|
 | 
						|
type ObserveTo uint8
 | 
						|
 | 
						|
const (
 | 
						|
	ObserveNone           = ObserveTo(0x00)
 | 
						|
	ObserveReplicateOne   = ObserveTo(0x01)
 | 
						|
	ObserveReplicateTwo   = ObserveTo(0x02)
 | 
						|
	ObserveReplicateThree = ObserveTo(0x03)
 | 
						|
	ObserveReplicateFour  = ObserveTo(0x04)
 | 
						|
)
 | 
						|
 | 
						|
type JobType uint8
 | 
						|
 | 
						|
const (
 | 
						|
	OBSERVE = JobType(0x00)
 | 
						|
	PERSIST = JobType(0x01)
 | 
						|
)
 | 
						|
 | 
						|
type ObservePersistJob struct {
 | 
						|
	vb                 uint16
 | 
						|
	vbuuid             uint64
 | 
						|
	hostname           string
 | 
						|
	jobType            JobType
 | 
						|
	failover           uint8
 | 
						|
	lastPersistedSeqNo uint64
 | 
						|
	currentSeqNo       uint64
 | 
						|
	resultChan         chan *ObservePersistJob
 | 
						|
	errorChan          chan *OPErrResponse
 | 
						|
}
 | 
						|
 | 
						|
type OPErrResponse struct {
 | 
						|
	vb     uint16
 | 
						|
	vbuuid uint64
 | 
						|
	err    error
 | 
						|
	job    *ObservePersistJob
 | 
						|
}
 | 
						|
 | 
						|
var ObservePersistPool = NewPool(1024)
 | 
						|
var OPJobChan = make(chan *ObservePersistJob, 1024)
 | 
						|
var OPJobDone = make(chan bool)
 | 
						|
 | 
						|
var wg sync.WaitGroup
 | 
						|
 | 
						|
func (b *Bucket) StartOPPollers(maxWorkers int) {
 | 
						|
 | 
						|
	for i := 0; i < maxWorkers; i++ {
 | 
						|
		go b.OPJobPoll()
 | 
						|
		wg.Add(1)
 | 
						|
	}
 | 
						|
	wg.Wait()
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
 | 
						|
 | 
						|
	numNodes := len(b.Nodes())
 | 
						|
	if int(nPersist) > numNodes || int(nObserve) > numNodes {
 | 
						|
		return fmt.Errorf("Not enough healthy nodes in the cluster")
 | 
						|
	}
 | 
						|
 | 
						|
	if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
 | 
						|
		return fmt.Errorf("Not enough replicas in the cluster")
 | 
						|
	}
 | 
						|
 | 
						|
	if EnableMutationToken == false {
 | 
						|
		return fmt.Errorf("Mutation Tokens not enabled ")
 | 
						|
	}
 | 
						|
 | 
						|
	b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
 | 
						|
	b.RLock()
 | 
						|
	ds := b.ds
 | 
						|
	b.RUnlock()
 | 
						|
 | 
						|
	if ds == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	nj := 0 // total number of jobs
 | 
						|
	resultChan := make(chan *ObservePersistJob, 10)
 | 
						|
	errChan := make(chan *OPErrResponse, 10)
 | 
						|
 | 
						|
	nodes := b.GetNodeList(vb)
 | 
						|
	if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
 | 
						|
		return fmt.Errorf("Not enough healthy nodes in the cluster"), false
 | 
						|
	}
 | 
						|
 | 
						|
	logging.Infof("Node list %v", nodes)
 | 
						|
 | 
						|
	if ds.Observe >= ObserveReplicateOne {
 | 
						|
		// create a job for each host
 | 
						|
		for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
 | 
						|
			opJob := ObservePersistPool.Get()
 | 
						|
			opJob.vb = vb
 | 
						|
			opJob.vbuuid = vbuuid
 | 
						|
			opJob.jobType = OBSERVE
 | 
						|
			opJob.hostname = nodes[i]
 | 
						|
			opJob.resultChan = resultChan
 | 
						|
			opJob.errorChan = errChan
 | 
						|
 | 
						|
			OPJobChan <- opJob
 | 
						|
			nj++
 | 
						|
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if ds.Persist >= PersistMaster {
 | 
						|
		for i := PersistMaster; i < ds.Persist+1; i++ {
 | 
						|
			opJob := ObservePersistPool.Get()
 | 
						|
			opJob.vb = vb
 | 
						|
			opJob.vbuuid = vbuuid
 | 
						|
			opJob.jobType = PERSIST
 | 
						|
			opJob.hostname = nodes[i]
 | 
						|
			opJob.resultChan = resultChan
 | 
						|
			opJob.errorChan = errChan
 | 
						|
 | 
						|
			OPJobChan <- opJob
 | 
						|
			nj++
 | 
						|
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ok := true
 | 
						|
	for ok {
 | 
						|
		select {
 | 
						|
		case res := <-resultChan:
 | 
						|
			jobDone := false
 | 
						|
			if res.failover == 0 {
 | 
						|
				// no failover
 | 
						|
				if res.jobType == PERSIST {
 | 
						|
					if res.lastPersistedSeqNo >= seqNo {
 | 
						|
						jobDone = true
 | 
						|
					}
 | 
						|
 | 
						|
				} else {
 | 
						|
					if res.currentSeqNo >= seqNo {
 | 
						|
						jobDone = true
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				if jobDone == true {
 | 
						|
					nj--
 | 
						|
					ObservePersistPool.Put(res)
 | 
						|
				} else {
 | 
						|
					// requeue this job
 | 
						|
					OPJobChan <- res
 | 
						|
				}
 | 
						|
 | 
						|
			} else {
 | 
						|
				// Not currently handling failover scenarios TODO
 | 
						|
				nj--
 | 
						|
				ObservePersistPool.Put(res)
 | 
						|
				failover = true
 | 
						|
			}
 | 
						|
 | 
						|
			if nj == 0 {
 | 
						|
				// done with all the jobs
 | 
						|
				ok = false
 | 
						|
				close(resultChan)
 | 
						|
				close(errChan)
 | 
						|
			}
 | 
						|
 | 
						|
		case Err := <-errChan:
 | 
						|
			logging.Errorf("Error in Observe/Persist %v", Err.err)
 | 
						|
			err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
 | 
						|
			nj--
 | 
						|
			ObservePersistPool.Put(Err.job)
 | 
						|
			if nj == 0 {
 | 
						|
				close(resultChan)
 | 
						|
				close(errChan)
 | 
						|
				ok = false
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) OPJobPoll() {
 | 
						|
 | 
						|
	ok := true
 | 
						|
	for ok == true {
 | 
						|
		select {
 | 
						|
		case job := <-OPJobChan:
 | 
						|
			pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
 | 
						|
			if pool == nil {
 | 
						|
				errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
 | 
						|
				errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
 | 
						|
				errRes.job = job
 | 
						|
				job.errorChan <- errRes
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			conn, err := pool.Get()
 | 
						|
			if err != nil {
 | 
						|
				errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
 | 
						|
				errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
 | 
						|
				errRes.job = job
 | 
						|
				job.errorChan <- errRes
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			res, err := conn.ObserveSeq(job.vb, job.vbuuid)
 | 
						|
			if err != nil {
 | 
						|
				errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
 | 
						|
				errRes.err = fmt.Errorf("Command failed %v", err)
 | 
						|
				errRes.job = job
 | 
						|
				job.errorChan <- errRes
 | 
						|
				continue
 | 
						|
 | 
						|
			}
 | 
						|
			pool.Return(conn)
 | 
						|
			job.lastPersistedSeqNo = res.LastPersistedSeqNo
 | 
						|
			job.currentSeqNo = res.CurrentSeqNo
 | 
						|
			job.failover = res.Failover
 | 
						|
 | 
						|
			job.resultChan <- job
 | 
						|
		case <-OPJobDone:
 | 
						|
			logging.Infof("Observe Persist Poller exitting")
 | 
						|
			ok = false
 | 
						|
		}
 | 
						|
	}
 | 
						|
	wg.Done()
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) GetNodeList(vb uint16) []string {
 | 
						|
 | 
						|
	vbm := b.VBServerMap()
 | 
						|
	if len(vbm.VBucketMap) < int(vb) {
 | 
						|
		logging.Infof("vbmap smaller than vblist")
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	nodes := make([]string, len(vbm.VBucketMap[vb]))
 | 
						|
	for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
 | 
						|
		n := vbm.VBucketMap[vb][i]
 | 
						|
		if n < 0 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		node := b.getMasterNode(n)
 | 
						|
		if len(node) > 1 {
 | 
						|
			nodes[i] = node
 | 
						|
		}
 | 
						|
		continue
 | 
						|
 | 
						|
	}
 | 
						|
	return nodes
 | 
						|
}
 | 
						|
 | 
						|
//pool of ObservePersist Jobs
 | 
						|
type OPpool struct {
 | 
						|
	pool chan *ObservePersistJob
 | 
						|
}
 | 
						|
 | 
						|
// NewPool creates a new pool of jobs
 | 
						|
func NewPool(max int) *OPpool {
 | 
						|
	return &OPpool{
 | 
						|
		pool: make(chan *ObservePersistJob, max),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Borrow a Client from the pool.
 | 
						|
func (p *OPpool) Get() *ObservePersistJob {
 | 
						|
	var o *ObservePersistJob
 | 
						|
	select {
 | 
						|
	case o = <-p.pool:
 | 
						|
	default:
 | 
						|
		o = &ObservePersistJob{}
 | 
						|
	}
 | 
						|
	return o
 | 
						|
}
 | 
						|
 | 
						|
// Return returns a Client to the pool.
 | 
						|
func (p *OPpool) Put(o *ObservePersistJob) {
 | 
						|
	select {
 | 
						|
	case p.pool <- o:
 | 
						|
	default:
 | 
						|
		// let it go, let it go...
 | 
						|
	}
 | 
						|
}
 |