mirror of
				https://github.com/go-gitea/gitea
				synced 2025-11-04 05:18:25 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1475 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			1475 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
package couchbase
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"bytes"
 | 
						|
	"crypto/tls"
 | 
						|
	"crypto/x509"
 | 
						|
	"encoding/base64"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"math/rand"
 | 
						|
	"net/http"
 | 
						|
	"net/url"
 | 
						|
	"runtime"
 | 
						|
	"sort"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
	"unsafe"
 | 
						|
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
 | 
						|
	"github.com/couchbase/gomemcached"        // package name is 'gomemcached'
 | 
						|
	"github.com/couchbase/gomemcached/client" // package name is 'memcached'
 | 
						|
)
 | 
						|
 | 
						|
// HTTP settings shared by REST and view operations.
var (
	// MaxIdleConnsPerHost is the idle-connection limit given to HTTPTransport.
	MaxIdleConnsPerHost = 256

	// ClientTimeOut is the request timeout configured on HTTPClient.
	ClientTimeOut = 10 * time.Second

	// HTTPTransport is the transport used by HTTPClient.
	HTTPTransport = &http.Transport{MaxIdleConnsPerHost: MaxIdleConnsPerHost}

	// HTTPClient is the client to use for REST and view operations.
	HTTPClient = &http.Client{Transport: HTTPTransport, Timeout: ClientTimeOut}
)

// Connection pool settings.
var (
	// PoolSize is the size of each connection pool (per host).
	PoolSize = 64

	// PoolOverflow is the number of overflow connections allowed in a
	// pool.
	PoolOverflow = 16

	// AsynchronousCloser turns on asynchronous closing for overflow
	// connections.
	AsynchronousCloser = false
)

// Connection feature switches.
var (
	// TCPKeepalive enables or disables TCP keepalive.
	TCPKeepalive = false

	// TCPKeepaliveInterval is the TCP keepalive interval in seconds.
	// The default is 30 minutes.
	TCPKeepaliveInterval = 30 * 60

	// EnableMutationToken enables MutationToken.
	EnableMutationToken = false

	// EnableDataType enables the Data Type response.
	EnableDataType = false

	// EnableXattr enables Xattr.
	EnableXattr = false

	// EnableCollections enables Collections.
	EnableCollections = false
)

// TLS settings used when connecting to an ssl port.
var (
	// skipVerify decides whether certificate verification is skipped.
	skipVerify = true

	// certFile, keyFile and rootFile hold the file names set through
	// SetCertFile, SetKeyFile and SetRootFile.
	certFile = ""
	keyFile  = ""
	rootFile = ""
)
 | 
						|
 | 
						|
// SetSkipVerify sets the package-level flag that decides whether
// certificate verification is skipped when connecting to an ssl port.
func SetSkipVerify(skip bool) {
	skipVerify = skip
}
 | 
						|
 | 
						|
// SetCertFile records the certificate file name. doHTTPRequest passes it to
// ClientConfigForX509 when certificate verification is not skipped.
func SetCertFile(cert string) {
	certFile = cert
}
 | 
						|
 | 
						|
// SetKeyFile records the key file name. doHTTPRequest passes it to
// ClientConfigForX509 when certificate verification is not skipped.
func SetKeyFile(cert string) {
	keyFile = cert
}
 | 
						|
 | 
						|
// SetRootFile records the root (CA) certificate file name. doHTTPRequest
// passes it to ClientConfigForX509 when certificate verification is not
// skipped.
func SetRootFile(cert string) {
	rootFile = cert
}
 | 
						|
 | 
						|
// Allow applications to speciify the Poolsize and Overflow
 | 
						|
func SetConnectionPoolParams(size, overflow int) {
 | 
						|
 | 
						|
	if size > 0 {
 | 
						|
		PoolSize = size
 | 
						|
	}
 | 
						|
 | 
						|
	if overflow > 0 {
 | 
						|
		PoolOverflow = overflow
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// DisableOverflowConnections turns off overflow connections by setting
// PoolOverflow to zero.
func DisableOverflowConnections() {
	PoolOverflow = 0
}
 | 
						|
 | 
						|
// EnableAsynchronousCloser toggles the asynchronous overflow closer by
// storing closer in AsynchronousCloser.
func EnableAsynchronousCloser(closer bool) {
	AsynchronousCloser = closer
}
 | 
						|
 | 
						|
// Allow TCP keepalive parameters to be set by the application
 | 
						|
func SetTcpKeepalive(enabled bool, interval int) {
 | 
						|
 | 
						|
	TCPKeepalive = enabled
 | 
						|
 | 
						|
	if interval > 0 {
 | 
						|
		TCPKeepaliveInterval = interval
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// AuthHandler is a callback that gets the auth username and password
// for the given bucket.
//
// GetCredentials returns three strings. The BucketAuth implementation in
// this file returns the name, the sasl password and the bucket, in that
// order.
type AuthHandler interface {
	GetCredentials() (string, string, string)
}
 | 
						|
 | 
						|
// AuthWithSaslHandler is an AuthHandler that also gets the sasl
// credentials used for memcached.
type AuthWithSaslHandler interface {
	AuthHandler
	GetSaslCredentials() (string, string)
}
 | 
						|
 | 
						|
// MultiBucketAuthHandler is a kind of AuthHandler that may perform
// different auth for different buckets. ForBucket returns the AuthHandler
// to use for the named bucket.
type MultiBucketAuthHandler interface {
	AuthHandler
	ForBucket(bucket string) AuthHandler
}
 | 
						|
 | 
						|
// HTTPAuthHandler is a kind of AuthHandler that performs more general
// auth for outgoing http requests than is possible via a simple
// GetCredentials() call (i.e. digest auth or different auth per
// different destinations).
type HTTPAuthHandler interface {
	AuthHandler
	SetCredsForRequest(req *http.Request) error
}
 | 
						|
 | 
						|
// RestPool represents a single pool returned from the pools REST API.
type RestPool struct {
	Name         string `json:"name"`
	StreamingURI string `json:"streamingUri"`
	URI          string `json:"uri"`
}
 | 
						|
 | 
						|
// Pools represents the collection of pools as returned from the REST API.
type Pools struct {
	ComponentsVersion     map[string]string `json:"componentsVersion,omitempty"`
	ImplementationVersion string            `json:"implementationVersion"`
	IsAdmin               bool              `json:"isAdminCreds"` // decoded from the "isAdminCreds" key
	UUID                  string            `json:"uuid"`
	Pools                 []RestPool        `json:"pools"`
}
 | 
						|
 | 
						|
// A Node is a computer in a cluster running the couchbase software.
type Node struct {
	ClusterCompatibility int                `json:"clusterCompatibility"`
	ClusterMembership    string             `json:"clusterMembership"`
	CouchAPIBase         string             `json:"couchApiBase"` // nodes with an empty value are skipped for REST queries
	Hostname             string             `json:"hostname"`
	InterestingStats     map[string]float64 `json:"interestingStats,omitempty"`
	MCDMemoryAllocated   float64            `json:"mcdMemoryAllocated"`
	MCDMemoryReserved    float64            `json:"mcdMemoryReserved"`
	MemoryFree           float64            `json:"memoryFree"`
	MemoryTotal          float64            `json:"memoryTotal"`
	OS                   string             `json:"os"`
	Ports                map[string]int     `json:"ports"`
	Services             []string           `json:"services"`
	Status               string             `json:"status"`        // compared against "healthy" elsewhere in this file
	Uptime               int                `json:"uptime,string"` // carried as a quoted string in the JSON
	Version              string             `json:"version"`
	ThisNode             bool               `json:"thisNode,omitempty"`
}
 | 
						|
 | 
						|
// A Pool of nodes and buckets.
type Pool struct {
	BucketMap map[string]*Bucket
	Nodes     []Node

	// BucketURL is decoded from the "buckets" key of the pool JSON.
	BucketURL map[string]string `json:"buckets"`

	// client is the Client this pool belongs to; its auth handler is used
	// for bucket REST requests.
	client *Client
}
 | 
						|
 | 
						|
// VBucketServerMap is a mapping of vbuckets to nodes.
//
// VBucketMap is indexed by vbucket number. Each row holds indexes into
// ServerList, and the code in this file treats the first entry of a row as
// that vbucket's master.
type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}
 | 
						|
 | 
						|
// DurablitySettings holds the durability settings for a bucket as a
// PersistTo value and an ObserveTo value.
type DurablitySettings struct {
	Persist PersistTo
	Observe ObserveTo
}
 | 
						|
 | 
						|
// Bucket is the primary entry point for most data operations.
// Bucket is a locked data structure. All access to its fields should be done using read or write locking,
// as appropriate.
//
// Some access methods require locking, but rely on the caller to do so. These are appropriate
// for calls from methods that have already locked the structure. Methods like this
// take a boolean parameter "bucketLocked".
type Bucket struct {
	sync.RWMutex
	AuthType               string             `json:"authType"`
	Capabilities           []string           `json:"bucketCapabilities"`
	CapabilitiesVersion    string             `json:"bucketCapabilitiesVer"`
	Type                   string             `json:"bucketType"`
	Name                   string             `json:"name"`
	NodeLocator            string             `json:"nodeLocator"`
	Quota                  map[string]float64 `json:"quota,omitempty"`
	Replicas               int                `json:"replicaNumber"`
	Password               string             `json:"saslPassword"`
	URI                    string             `json:"uri"`
	StreamingURI           string             `json:"streamingUri"`
	LocalRandomKeyURI      string             `json:"localRandomKeyUri,omitempty"`
	UUID                   string             `json:"uuid"`
	ConflictResolutionType string             `json:"conflictResolutionType,omitempty"`
	DDocs                  struct {
		URI string `json:"uri"`
	} `json:"ddocs,omitempty"`
	BasicStats  map[string]interface{} `json:"basicStats,omitempty"`
	Controllers map[string]interface{} `json:"controllers,omitempty"`

	// These are used for JSON IO, but aren't used for processing
	// since they need to be swapped out safely.
	VBSMJson  VBucketServerMap `json:"vBucketServerMap"`
	NodesJSON []Node           `json:"nodes"`

	// The unsafe.Pointer fields below hold pointers of the type named in
	// the trailing comment; accessors in this file cast them back.
	pool             *Pool
	connPools        unsafe.Pointer // *[]*connectionPool
	vBucketServerMap unsafe.Pointer // *VBucketServerMap
	nodeList         unsafe.Pointer // *[]Node
	commonSufix      string
	ah               AuthHandler        // auth handler
	ds               *DurablitySettings // Durablity Settings for this bucket
	closed           bool
}
 | 
						|
 | 
						|
// PoolServices is all the bucket-independent services in a pool.
type PoolServices struct {
	Rev          int             `json:"rev"`
	NodesExt     []NodeServices  `json:"nodesExt"`
	Capabilities json.RawMessage `json:"clusterCapabilities"` // kept as raw, undecoded JSON
}
 | 
						|
 | 
						|
// NodeServices is all the bucket-independent services running on
// a node (given by Hostname).
type NodeServices struct {
	Services map[string]int `json:"services,omitempty"`
	Hostname string         `json:"hostname"`
	ThisNode bool           `json:"thisNode"`
}
 | 
						|
 | 
						|
// BucketNotFoundError is an error value that carries the name of a bucket
// that could not be found.
type BucketNotFoundError struct {
	bucket string
}

// Error implements the error interface and names the missing bucket.
func (e *BucketNotFoundError) Error() string {
	return fmt.Sprintf("No bucket named %s", e.bucket)
}
 | 
						|
 | 
						|
// BucketAuth is an AuthHandler backed by a fixed name, sasl password and
// bucket.
type BucketAuth struct {
	name    string
	saslPwd string
	bucket  string
}

// newBucketAuth builds a BucketAuth from the given name, password and
// bucket.
func newBucketAuth(name string, pass string, bucket string) *BucketAuth {
	auth := new(BucketAuth)
	auth.name = name
	auth.saslPwd = pass
	auth.bucket = bucket
	return auth
}

// GetCredentials returns the name, the sasl password and the bucket, in
// that order.
func (ba *BucketAuth) GetCredentials() (string, string, string) {
	return ba.name, ba.saslPwd, ba.bucket
}
 | 
						|
 | 
						|
// VBServerMap returns the current VBucketServerMap.
 | 
						|
func (b *Bucket) VBServerMap() *VBucketServerMap {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
	ret := (*VBucketServerMap)(b.vBucketServerMap)
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error) {
 | 
						|
	vbmap := b.VBServerMap()
 | 
						|
	servers := vbmap.ServerList
 | 
						|
	if addrs == nil {
 | 
						|
		addrs = vbmap.ServerList
 | 
						|
	}
 | 
						|
 | 
						|
	m := make(map[string][]uint16)
 | 
						|
	for _, addr := range addrs {
 | 
						|
		m[addr] = make([]uint16, 0)
 | 
						|
	}
 | 
						|
	for vbno, idxs := range vbmap.VBucketMap {
 | 
						|
		if len(idxs) == 0 {
 | 
						|
			return nil, fmt.Errorf("vbmap: No KV node no for vb %d", vbno)
 | 
						|
		} else if idxs[0] < 0 || idxs[0] >= len(servers) {
 | 
						|
			return nil, fmt.Errorf("vbmap: Invalid KV node no %d for vb %d", idxs[0], vbno)
 | 
						|
		}
 | 
						|
		addr := servers[idxs[0]]
 | 
						|
		if _, ok := m[addr]; ok {
 | 
						|
			m[addr] = append(m[addr], uint16(vbno))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return m, nil
 | 
						|
}
 | 
						|
 | 
						|
// true if node is not on the bucket VBmap
 | 
						|
func (b *Bucket) checkVBmap(node string) bool {
 | 
						|
	vbmap := b.VBServerMap()
 | 
						|
	servers := vbmap.ServerList
 | 
						|
 | 
						|
	for _, idxs := range vbmap.VBucketMap {
 | 
						|
		if len(idxs) == 0 {
 | 
						|
			return true
 | 
						|
		} else if idxs[0] < 0 || idxs[0] >= len(servers) {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
		if servers[idxs[0]] == node {
 | 
						|
			return false
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) GetName() string {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
	ret := b.Name
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// Nodes returns the current list of nodes servicing this bucket.
 | 
						|
func (b *Bucket) Nodes() []Node {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
	ret := *(*[]Node)(b.nodeList)
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// return the list of healthy nodes
 | 
						|
func (b *Bucket) HealthyNodes() []Node {
 | 
						|
	nodes := []Node{}
 | 
						|
 | 
						|
	for _, n := range b.Nodes() {
 | 
						|
		if n.Status == "healthy" && n.CouchAPIBase != "" {
 | 
						|
			nodes = append(nodes, n)
 | 
						|
		}
 | 
						|
		if n.Status != "healthy" { // log non-healthy node
 | 
						|
			logging.Infof("Non-healthy node; node details:")
 | 
						|
			logging.Infof("Hostname=%v, Status=%v, CouchAPIBase=%v, ThisNode=%v", n.Hostname, n.Status, n.CouchAPIBase, n.ThisNode)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nodes
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) getConnPools(bucketLocked bool) []*connectionPool {
 | 
						|
	if !bucketLocked {
 | 
						|
		b.RLock()
 | 
						|
		defer b.RUnlock()
 | 
						|
	}
 | 
						|
	if b.connPools != nil {
 | 
						|
		return *(*[]*connectionPool)(b.connPools)
 | 
						|
	} else {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) replaceConnPools(with []*connectionPool) {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
 | 
						|
	old := b.connPools
 | 
						|
	b.connPools = unsafe.Pointer(&with)
 | 
						|
	if old != nil {
 | 
						|
		for _, pool := range *(*[]*connectionPool)(old) {
 | 
						|
			if pool != nil {
 | 
						|
				pool.Close()
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) getConnPool(i int) *connectionPool {
 | 
						|
 | 
						|
	if i < 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	p := b.getConnPools(false /* not already locked */)
 | 
						|
	if len(p) > i {
 | 
						|
		return p[i]
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) getConnPoolByHost(host string, bucketLocked bool) *connectionPool {
 | 
						|
	pools := b.getConnPools(bucketLocked)
 | 
						|
	for _, p := range pools {
 | 
						|
		if p != nil && p.host == host {
 | 
						|
			return p
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Given a vbucket number, returns a memcached connection to it.
 | 
						|
// The connection must be returned to its pool after use.
 | 
						|
func (b *Bucket) getConnectionToVBucket(vb uint32) (*memcached.Client, *connectionPool, error) {
 | 
						|
	for {
 | 
						|
		vbm := b.VBServerMap()
 | 
						|
		if len(vbm.VBucketMap) < int(vb) {
 | 
						|
			return nil, nil, fmt.Errorf("go-couchbase: vbmap smaller than vbucket list: %v vs. %v",
 | 
						|
				vb, vbm.VBucketMap)
 | 
						|
		}
 | 
						|
		masterId := vbm.VBucketMap[vb][0]
 | 
						|
		if masterId < 0 {
 | 
						|
			return nil, nil, fmt.Errorf("go-couchbase: No master for vbucket %d", vb)
 | 
						|
		}
 | 
						|
		pool := b.getConnPool(masterId)
 | 
						|
		conn, err := pool.Get()
 | 
						|
		if err != errClosedPool {
 | 
						|
			return conn, pool, err
 | 
						|
		}
 | 
						|
		// If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// To get random documents, we need to cover all the nodes, so select
 | 
						|
// a connection at random.
 | 
						|
 | 
						|
func (b *Bucket) getRandomConnection() (*memcached.Client, *connectionPool, error) {
 | 
						|
	for {
 | 
						|
		var currentPool = 0
 | 
						|
		pools := b.getConnPools(false /* not already locked */)
 | 
						|
		if len(pools) == 0 {
 | 
						|
			return nil, nil, fmt.Errorf("No connection pool found")
 | 
						|
		} else if len(pools) > 1 { // choose a random connection
 | 
						|
			currentPool = rand.Intn(len(pools))
 | 
						|
		} // if only one pool, currentPool defaults to 0, i.e., the only pool
 | 
						|
 | 
						|
		// get the pool
 | 
						|
		pool := pools[currentPool]
 | 
						|
		conn, err := pool.Get()
 | 
						|
		if err != errClosedPool {
 | 
						|
			return conn, pool, err
 | 
						|
		}
 | 
						|
 | 
						|
		// If conn pool was closed, because another goroutine refreshed the vbucket map, retry...
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
//
 | 
						|
// Get a random document from a bucket. Since the bucket may be distributed
 | 
						|
// across nodes, we must first select a random connection, and then use the
 | 
						|
// Client.GetRandomDoc() call to get a random document from that node.
 | 
						|
//
 | 
						|
 | 
						|
func (b *Bucket) GetRandomDoc() (*gomemcached.MCResponse, error) {
 | 
						|
	// get a connection from the pool
 | 
						|
	conn, pool, err := b.getRandomConnection()
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// We may need to select the bucket before GetRandomDoc()
 | 
						|
	// will work. This is sometimes done at startup (see defaultMkConn())
 | 
						|
	// but not always, depending on the auth type.
 | 
						|
	_, err = conn.SelectBucket(b.Name)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// get a randomm document from the connection
 | 
						|
	doc, err := conn.GetRandomDoc()
 | 
						|
	// need to return the connection to the pool
 | 
						|
	pool.Return(conn)
 | 
						|
	return doc, err
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) getMasterNode(i int) string {
 | 
						|
	p := b.getConnPools(false /* not already locked */)
 | 
						|
	if len(p) > i {
 | 
						|
		return p[i].host
 | 
						|
	}
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) authHandler(bucketLocked bool) (ah AuthHandler) {
 | 
						|
	if !bucketLocked {
 | 
						|
		b.RLock()
 | 
						|
		defer b.RUnlock()
 | 
						|
	}
 | 
						|
	pool := b.pool
 | 
						|
	name := b.Name
 | 
						|
 | 
						|
	if pool != nil {
 | 
						|
		ah = pool.client.ah
 | 
						|
	}
 | 
						|
	if mbah, ok := ah.(MultiBucketAuthHandler); ok {
 | 
						|
		return mbah.ForBucket(name)
 | 
						|
	}
 | 
						|
	if ah == nil {
 | 
						|
		ah = &basicAuth{name, ""}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// NodeAddresses gets the (sorted) list of memcached node addresses
 | 
						|
// (hostname:port).
 | 
						|
func (b *Bucket) NodeAddresses() []string {
 | 
						|
	vsm := b.VBServerMap()
 | 
						|
	rv := make([]string, len(vsm.ServerList))
 | 
						|
	copy(rv, vsm.ServerList)
 | 
						|
	sort.Strings(rv)
 | 
						|
	return rv
 | 
						|
}
 | 
						|
 | 
						|
// CommonAddressSuffix finds the longest common suffix of all
 | 
						|
// host:port strings in the node list.
 | 
						|
func (b *Bucket) CommonAddressSuffix() string {
 | 
						|
	input := []string{}
 | 
						|
	for _, n := range b.Nodes() {
 | 
						|
		input = append(input, n.Hostname)
 | 
						|
	}
 | 
						|
	return FindCommonSuffix(input)
 | 
						|
}
 | 
						|
 | 
						|
// A Client is the starting point for all services across all buckets
// in a Couchbase cluster.
type Client struct {
	BaseURL   *url.URL    // base URL that the client's REST requests are built from
	ah        AuthHandler // auth handler applied to the client's REST requests
	Info      Pools
	tlsConfig *tls.Config
}
 | 
						|
 | 
						|
func maybeAddAuth(req *http.Request, ah AuthHandler) error {
 | 
						|
	if hah, ok := ah.(HTTPAuthHandler); ok {
 | 
						|
		return hah.SetCredsForRequest(req)
 | 
						|
	}
 | 
						|
	if ah != nil {
 | 
						|
		user, pass, _ := ah.GetCredentials()
 | 
						|
		req.Header.Set("Authorization", "Basic "+
 | 
						|
			base64.StdEncoding.EncodeToString([]byte(user+":"+pass)))
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// HTTP_MAX_RETRY is the maximum number of attempts doHTTPRequest makes for
// one request. Arbitrary number, may need to be tuned #FIXME
const HTTP_MAX_RETRY = 5
 | 
						|
 | 
						|
// isHttpConnError reports whether err looks like a broken or reset
// connection. Someday golang network packages will implement standard
// error codes. Until then the error text is matched against known
// fragments. A nil error is never a connection error.
func isHttpConnError(err error) bool {
	if err == nil {
		return false
	}

	estr := err.Error()
	return strings.Contains(estr, "broken pipe") ||
		strings.Contains(estr, "broken connection") ||
		strings.Contains(estr, "connection reset")
}
 | 
						|
 | 
						|
// client is the HTTP client used by doHTTPRequest. It is nil until the
// first request, which creates it.
var client *http.Client
 | 
						|
 | 
						|
// ClientConfigForX509 builds a TLS client configuration from files on disk.
//
// certFile and keyFile must both be given; they are loaded as the client
// certificate. When rootFile is given, its PEM certificates become the root
// CA pool, and a file without any parsable certificate is reported as an
// error instead of silently producing an empty pool.
//
// NOTE(review): with an empty rootFile the returned RootCAs is an empty,
// non-nil pool (as before); confirm that this, rather than the system
// roots, is what callers want.
func ClientConfigForX509(certFile, keyFile, rootFile string) (*tls.Config, error) {
	if certFile == "" || keyFile == "" {
		// error: need to pass both certfile and keyfile
		return nil, fmt.Errorf("N1QL: Need to pass both certfile and keyfile")
	}

	tlsCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}

	caCertPool := x509.NewCertPool()
	if rootFile != "" {
		caCert, err := ioutil.ReadFile(rootFile)
		if err != nil {
			return nil, fmt.Errorf(" Error in reading cacert file, err: %v", err)
		}
		// AppendCertsFromPEM reports whether at least one certificate was
		// added; ignoring it would hide a malformed root file.
		if !caCertPool.AppendCertsFromPEM(caCert) {
			return nil, fmt.Errorf("N1QL: no valid PEM certificates found in cacert file %q", rootFile)
		}
	}

	return &tls.Config{
		Certificates: []tls.Certificate{tlsCert},
		RootCAs:      caCertPool,
	}, nil
}
 | 
						|
 | 
						|
func doHTTPRequest(req *http.Request) (*http.Response, error) {
 | 
						|
 | 
						|
	var err error
 | 
						|
	var res *http.Response
 | 
						|
 | 
						|
	// we need a client that ignores certificate errors, since we self-sign
 | 
						|
	// our certs
 | 
						|
	if client == nil && req.URL.Scheme == "https" {
 | 
						|
		var tr *http.Transport
 | 
						|
 | 
						|
		if skipVerify {
 | 
						|
			tr = &http.Transport{
 | 
						|
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// Handle cases with cert
 | 
						|
 | 
						|
			cfg, err := ClientConfigForX509(certFile, keyFile, rootFile)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
 | 
						|
			tr = &http.Transport{
 | 
						|
				TLSClientConfig: cfg,
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		client = &http.Client{Transport: tr}
 | 
						|
 | 
						|
	} else if client == nil {
 | 
						|
		client = HTTPClient
 | 
						|
	}
 | 
						|
 | 
						|
	for i := 0; i < HTTP_MAX_RETRY; i++ {
 | 
						|
		res, err = client.Do(req)
 | 
						|
		if err != nil && isHttpConnError(err) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return res, err
 | 
						|
}
 | 
						|
 | 
						|
// doPutAPI issues a PUT request through doOutputAPI; see there for the
// handling of params, authHandler and out.
func doPutAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
	return doOutputAPI("PUT", baseURL, path, params, authHandler, out)
}
 | 
						|
 | 
						|
// doPostAPI issues a POST request through doOutputAPI; see there for the
// handling of params, authHandler and out.
func doPostAPI(baseURL *url.URL, path string, params map[string]interface{}, authHandler AuthHandler, out interface{}) error {
	return doOutputAPI("POST", baseURL, path, params, authHandler, out)
}
 | 
						|
 | 
						|
func doOutputAPI(
 | 
						|
	httpVerb string,
 | 
						|
	baseURL *url.URL,
 | 
						|
	path string,
 | 
						|
	params map[string]interface{},
 | 
						|
	authHandler AuthHandler,
 | 
						|
	out interface{}) error {
 | 
						|
 | 
						|
	var requestUrl string
 | 
						|
 | 
						|
	if q := strings.Index(path, "?"); q > 0 {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
 | 
						|
	} else {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
 | 
						|
	}
 | 
						|
 | 
						|
	postData := url.Values{}
 | 
						|
	for k, v := range params {
 | 
						|
		postData.Set(k, fmt.Sprintf("%v", v))
 | 
						|
	}
 | 
						|
 | 
						|
	req, err := http.NewRequest(httpVerb, requestUrl, bytes.NewBufferString(postData.Encode()))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
 | 
						|
 | 
						|
	err = maybeAddAuth(req, authHandler)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	res, err := doHTTPRequest(req)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer res.Body.Close()
 | 
						|
	if res.StatusCode != 200 {
 | 
						|
		bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
 | 
						|
		return fmt.Errorf("HTTP error %v getting %q: %s",
 | 
						|
			res.Status, requestUrl, bod)
 | 
						|
	}
 | 
						|
 | 
						|
	d := json.NewDecoder(res.Body)
 | 
						|
	if err = d.Decode(&out); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func queryRestAPI(
 | 
						|
	baseURL *url.URL,
 | 
						|
	path string,
 | 
						|
	authHandler AuthHandler,
 | 
						|
	out interface{}) error {
 | 
						|
 | 
						|
	var requestUrl string
 | 
						|
 | 
						|
	if q := strings.Index(path, "?"); q > 0 {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
 | 
						|
	} else {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
 | 
						|
	}
 | 
						|
 | 
						|
	req, err := http.NewRequest("GET", requestUrl, nil)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err = maybeAddAuth(req, authHandler)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	res, err := doHTTPRequest(req)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer res.Body.Close()
 | 
						|
	if res.StatusCode != 200 {
 | 
						|
		bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
 | 
						|
		return fmt.Errorf("HTTP error %v getting %q: %s",
 | 
						|
			res.Status, requestUrl, bod)
 | 
						|
	}
 | 
						|
 | 
						|
	d := json.NewDecoder(res.Body)
 | 
						|
	if err = d.Decode(&out); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// ProcessStream reads the newline-delimited JSON stream at path, relative
// to the client's BaseURL and using the client's auth handler. Each message
// is unmarshalled into data, and callb is invoked with data after each one.
// It returns the first error from reading, decoding or callb.
func (c *Client) ProcessStream(path string, callb func(interface{}) error, data interface{}) error {
	return c.processStream(c.BaseURL, path, c.ah, callb, data)
}
 | 
						|
 | 
						|
// Based on code in http://src.couchbase.org/source/xref/trunk/goproj/src/github.com/couchbase/indexing/secondary/dcp/pools.go#309
 | 
						|
func (c *Client) processStream(baseURL *url.URL, path string, authHandler AuthHandler, callb func(interface{}) error, data interface{}) error {
 | 
						|
	var requestUrl string
 | 
						|
 | 
						|
	if q := strings.Index(path, "?"); q > 0 {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path[:q] + "?" + path[q+1:]
 | 
						|
	} else {
 | 
						|
		requestUrl = baseURL.Scheme + "://" + baseURL.Host + path
 | 
						|
	}
 | 
						|
 | 
						|
	req, err := http.NewRequest("GET", requestUrl, nil)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err = maybeAddAuth(req, authHandler)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	res, err := doHTTPRequest(req)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer res.Body.Close()
 | 
						|
	if res.StatusCode != 200 {
 | 
						|
		bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
 | 
						|
		return fmt.Errorf("HTTP error %v getting %q: %s",
 | 
						|
			res.Status, requestUrl, bod)
 | 
						|
	}
 | 
						|
 | 
						|
	reader := bufio.NewReader(res.Body)
 | 
						|
	for {
 | 
						|
		bs, err := reader.ReadBytes('\n')
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if len(bs) == 1 && bs[0] == '\n' {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		err = json.Unmarshal(bs, data)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		err = callb(data)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// parseURLResponse performs a GET on path relative to the client's BaseURL,
// using the client's auth handler, and decodes the JSON response into out.
func (c *Client) parseURLResponse(path string, out interface{}) error {
	return queryRestAPI(c.BaseURL, path, c.ah, out)
}
 | 
						|
 | 
						|
// parsePostURLResponse POSTs params to path relative to the client's
// BaseURL, using the client's auth handler, and decodes the JSON response
// into out.
func (c *Client) parsePostURLResponse(path string, params map[string]interface{}, out interface{}) error {
	return doPostAPI(c.BaseURL, path, params, c.ah, out)
}
 | 
						|
 | 
						|
// parsePutURLResponse PUTs params to path relative to the client's BaseURL,
// using the client's auth handler, and decodes the JSON response into out.
func (c *Client) parsePutURLResponse(path string, params map[string]interface{}, out interface{}) error {
	return doPutAPI(c.BaseURL, path, params, c.ah, out)
}
 | 
						|
 | 
						|
func (b *Bucket) parseURLResponse(path string, out interface{}) error {
 | 
						|
	nodes := b.Nodes()
 | 
						|
	if len(nodes) == 0 {
 | 
						|
		return errors.New("no couch rest URLs")
 | 
						|
	}
 | 
						|
 | 
						|
	// Pick a random node to start querying.
 | 
						|
	startNode := rand.Intn(len(nodes))
 | 
						|
	maxRetries := len(nodes)
 | 
						|
	for i := 0; i < maxRetries; i++ {
 | 
						|
		node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
 | 
						|
		// Skip non-healthy nodes.
 | 
						|
		if node.Status != "healthy" || node.CouchAPIBase == "" {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		url := &url.URL{
 | 
						|
			Host:   node.Hostname,
 | 
						|
			Scheme: "http",
 | 
						|
		}
 | 
						|
 | 
						|
		// Lock here to avoid having pool closed under us.
 | 
						|
		b.RLock()
 | 
						|
		err := queryRestAPI(url, path, b.pool.client.ah, out)
 | 
						|
		b.RUnlock()
 | 
						|
		if err == nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return errors.New("All nodes failed to respond or no healthy nodes for bucket found")
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) parseAPIResponse(path string, out interface{}) error {
 | 
						|
	nodes := b.Nodes()
 | 
						|
	if len(nodes) == 0 {
 | 
						|
		return errors.New("no couch rest URLs")
 | 
						|
	}
 | 
						|
 | 
						|
	var err error
 | 
						|
	var u *url.URL
 | 
						|
 | 
						|
	// Pick a random node to start querying.
 | 
						|
	startNode := rand.Intn(len(nodes))
 | 
						|
	maxRetries := len(nodes)
 | 
						|
	for i := 0; i < maxRetries; i++ {
 | 
						|
		node := nodes[(startNode+i)%len(nodes)] // Wrap around the nodes list.
 | 
						|
		// Skip non-healthy nodes.
 | 
						|
		if node.Status != "healthy" || node.CouchAPIBase == "" {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		u, err = ParseURL(node.CouchAPIBase)
 | 
						|
		// Lock here so pool does not get closed under us.
 | 
						|
		b.RLock()
 | 
						|
		if err != nil {
 | 
						|
			b.RUnlock()
 | 
						|
			return fmt.Errorf("config error: Bucket %q node #%d CouchAPIBase=%q: %v",
 | 
						|
				b.Name, i, node.CouchAPIBase, err)
 | 
						|
		} else if b.pool != nil {
 | 
						|
			u.User = b.pool.client.BaseURL.User
 | 
						|
		}
 | 
						|
		u.Path = path
 | 
						|
 | 
						|
		// generate the path so that the strings are properly escaped
 | 
						|
		// MB-13770
 | 
						|
		requestPath := strings.Split(u.String(), u.Host)[1]
 | 
						|
 | 
						|
		err = queryRestAPI(u, requestPath, b.pool.client.ah, out)
 | 
						|
		b.RUnlock()
 | 
						|
		if err == nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	var errStr string
 | 
						|
	if err != nil {
 | 
						|
		errStr = "Error " + err.Error()
 | 
						|
	}
 | 
						|
 | 
						|
	return errors.New("All nodes failed to respond or returned error or no healthy nodes for bucket found." + errStr)
 | 
						|
}
 | 
						|
 | 
						|
// basicAuth holds a user name / password pair; basicAuthFromURL builds
// one from the userinfo section of a URL.
type basicAuth struct {
	u string // user name
	p string // password
}

// GetCredentials returns the user name, the password, and the user name
// once more in the third position.
func (b basicAuth) GetCredentials() (string, string, string) {
	user, password := b.u, b.p
	return user, password, user
}
 | 
						|
 | 
						|
func basicAuthFromURL(us string) (ah AuthHandler) {
 | 
						|
	u, err := ParseURL(us)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if user := u.User; user != nil {
 | 
						|
		pw, _ := user.Password()
 | 
						|
		ah = basicAuth{user.Username(), pw}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// ConnectWithAuth connects to a couchbase cluster with the given
 | 
						|
// authentication handler.
 | 
						|
func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error) {
 | 
						|
	c.BaseURL, err = ParseURL(baseU)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	c.ah = ah
 | 
						|
 | 
						|
	return c, c.parseURLResponse("/pools", &c.Info)
 | 
						|
}
 | 
						|
 | 
						|
// Call this method with a TLS certificate file name to make communication
 | 
						|
// with the KV engine encrypted.
 | 
						|
//
 | 
						|
// This method should be called immediately after a Connect*() method.
 | 
						|
func (c *Client) InitTLS(certFile string) error {
 | 
						|
	serverCert, err := ioutil.ReadFile(certFile)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	CA_Pool := x509.NewCertPool()
 | 
						|
	CA_Pool.AppendCertsFromPEM(serverCert)
 | 
						|
	c.tlsConfig = &tls.Config{RootCAs: CA_Pool}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *Client) ClearTLS() {
 | 
						|
	c.tlsConfig = nil
 | 
						|
}
 | 
						|
 | 
						|
// ConnectWithAuthCreds connects to a couchbase cluster with the give
 | 
						|
// authorization creds returned by cb_auth
 | 
						|
func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error) {
 | 
						|
	c.BaseURL, err = ParseURL(baseU)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	c.ah = newBucketAuth(username, password, "")
 | 
						|
	return c, c.parseURLResponse("/pools", &c.Info)
 | 
						|
}
 | 
						|
 | 
						|
// Connect to a couchbase cluster.  An authentication handler will be
 | 
						|
// created from the userinfo in the URL if provided.
 | 
						|
func Connect(baseU string) (Client, error) {
 | 
						|
	return ConnectWithAuth(baseU, basicAuthFromURL(baseU))
 | 
						|
}
 | 
						|
 | 
						|
// BucketInfo is the name/password summary of a bucket as returned by
// GetBucketList.
type BucketInfo struct {
	// Name is the name of the bucket.
	Name string
	// Password is the SASL password of the bucket.
	Password string
}
 | 
						|
 | 
						|
//Get SASL buckets
 | 
						|
func GetBucketList(baseU string) (bInfo []BucketInfo, err error) {
 | 
						|
 | 
						|
	c := &Client{}
 | 
						|
	c.BaseURL, err = ParseURL(baseU)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	c.ah = basicAuthFromURL(baseU)
 | 
						|
 | 
						|
	var buckets []Bucket
 | 
						|
	err = c.parseURLResponse("/pools/default/buckets", &buckets)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	bInfo = make([]BucketInfo, 0)
 | 
						|
	for _, bucket := range buckets {
 | 
						|
		bucketInfo := BucketInfo{Name: bucket.Name, Password: bucket.Password}
 | 
						|
		bInfo = append(bInfo, bucketInfo)
 | 
						|
	}
 | 
						|
	return bInfo, err
 | 
						|
}
 | 
						|
 | 
						|
//Set viewUpdateDaemonOptions
 | 
						|
func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error) {
 | 
						|
 | 
						|
	c := &Client{}
 | 
						|
	c.BaseURL, err = ParseURL(baseU)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	c.ah = basicAuthFromURL(baseU)
 | 
						|
 | 
						|
	if len(params) < 1 {
 | 
						|
		return nil, fmt.Errorf("No params to set")
 | 
						|
	}
 | 
						|
 | 
						|
	err = c.parsePostURLResponse("/settings/viewUpdateDaemon", params, &viewOpts)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	return viewOpts, err
 | 
						|
}
 | 
						|
 | 
						|
// This API lets the caller know, if the list of nodes a bucket is
 | 
						|
// connected to has gone through an edit (a rebalance operation)
 | 
						|
// since the last update to the bucket, in which case a Refresh is
 | 
						|
// advised.
 | 
						|
func (b *Bucket) NodeListChanged() bool {
 | 
						|
	b.RLock()
 | 
						|
	pool := b.pool
 | 
						|
	uri := b.URI
 | 
						|
	b.RUnlock()
 | 
						|
 | 
						|
	tmpb := &Bucket{}
 | 
						|
	err := pool.client.parseURLResponse(uri, tmpb)
 | 
						|
	if err != nil {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
 | 
						|
	bNodes := *(*[]Node)(b.nodeList)
 | 
						|
	if len(bNodes) != len(tmpb.NodesJSON) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
 | 
						|
	bucketHostnames := map[string]bool{}
 | 
						|
	for _, node := range bNodes {
 | 
						|
		bucketHostnames[node.Hostname] = true
 | 
						|
	}
 | 
						|
 | 
						|
	for _, node := range tmpb.NodesJSON {
 | 
						|
		if _, found := bucketHostnames[node.Hostname]; !found {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// Sample data for scopes and collections as returned from the
 | 
						|
// /pools/default/$BUCKET_NAME/collections API.
 | 
						|
// {"myScope2":{"myCollectionC":{}},"myScope1":{"myCollectionB":{},"myCollectionA":{}},"_default":{"_default":{}}}
 | 
						|
 | 
						|
// Structures for parsing collections manifest.
 | 
						|
// The map key is the name of the scope.
 | 
						|
// Example data:
 | 
						|
// {"uid":"b","scopes":[
 | 
						|
//    {"name":"_default","uid":"0","collections":[
 | 
						|
//       {"name":"_default","uid":"0"}]},
 | 
						|
//    {"name":"myScope1","uid":"8","collections":[
 | 
						|
//       {"name":"myCollectionB","uid":"c"},
 | 
						|
//       {"name":"myCollectionA","uid":"b"}]},
 | 
						|
//    {"name":"myScope2","uid":"9","collections":[
 | 
						|
//       {"name":"myCollectionC","uid":"d"}]}]}
 | 
						|
// Wire-format types for the collections manifest. All uid values are kept
// as the strings found in the JSON; parseCollectionsManifest converts them
// from hexadecimal.
type (
	// InputManifest is the top level of the manifest document.
	InputManifest struct {
		Uid    string       // manifest uid, hexadecimal text
		Scopes []InputScope // scopes listed in the manifest
	}

	// InputScope is one entry of the manifest's "scopes" array.
	InputScope struct {
		Name        string            // scope name
		Uid         string            // scope uid, hexadecimal text
		Collections []InputCollection // collections inside this scope
	}

	// InputCollection is one entry of a scope's "collections" array.
	InputCollection struct {
		Name string // collection name
		Uid  string // collection uid, hexadecimal text
	}
)
 | 
						|
 | 
						|
// Structures for storing collections information.
 | 
						|
// In-memory form of the collections manifest, with numeric uids and
// name-keyed lookups.
type (
	// Manifest is the parsed collections manifest of a bucket.
	Manifest struct {
		Uid    uint64            // manifest uid
		Scopes map[string]*Scope // scopes, keyed by scope name
	}

	// Scope is a named scope together with the collections it contains.
	Scope struct {
		Name        string                 // scope name
		Uid         uint64                 // scope uid
		Collections map[string]*Collection // collections, keyed by collection name
	}

	// Collection is a single named collection.
	Collection struct {
		Name string // collection name
		Uid  uint64 // collection uid
	}
)
 | 
						|
 | 
						|
// _EMPTY_MANIFEST is the shared manifest (uid 0, no scopes) handed out by
// parseCollectionsManifest when collections are disabled.
var _EMPTY_MANIFEST = &Manifest{Scopes: map[string]*Scope{}}
 | 
						|
 | 
						|
func parseCollectionsManifest(res *gomemcached.MCResponse) (*Manifest, error) {
 | 
						|
	if !EnableCollections {
 | 
						|
		return _EMPTY_MANIFEST, nil
 | 
						|
	}
 | 
						|
 | 
						|
	var im InputManifest
 | 
						|
	err := json.Unmarshal(res.Body, &im)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	uid, err := strconv.ParseUint(im.Uid, 16, 64)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	mani := &Manifest{Uid: uid, Scopes: make(map[string]*Scope, len(im.Scopes))}
 | 
						|
	for _, iscope := range im.Scopes {
 | 
						|
		scope_uid, err := strconv.ParseUint(iscope.Uid, 16, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		scope := &Scope{Uid: scope_uid, Name: iscope.Name, Collections: make(map[string]*Collection, len(iscope.Collections))}
 | 
						|
		mani.Scopes[iscope.Name] = scope
 | 
						|
		for _, icoll := range iscope.Collections {
 | 
						|
			coll_uid, err := strconv.ParseUint(icoll.Uid, 16, 64)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			coll := &Collection{Uid: coll_uid, Name: icoll.Name}
 | 
						|
			scope.Collections[icoll.Name] = coll
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return mani, nil
 | 
						|
}
 | 
						|
 | 
						|
// This function assumes the bucket is locked.
 | 
						|
func (b *Bucket) GetCollectionsManifest() (*Manifest, error) {
 | 
						|
	// Collections not used?
 | 
						|
	if !EnableCollections {
 | 
						|
		return nil, fmt.Errorf("Collections not enabled.")
 | 
						|
	}
 | 
						|
 | 
						|
	b.RLock()
 | 
						|
	pools := b.getConnPools(true /* already locked */)
 | 
						|
	pool := pools[0] // Any pool will do, so use the first one.
 | 
						|
	b.RUnlock()
 | 
						|
	client, err := pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("Unable to get connection to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	// We need to select the bucket before GetCollectionsManifest()
 | 
						|
	// will work. This is sometimes done at startup (see defaultMkConn())
 | 
						|
	// but not always, depending on the auth type.
 | 
						|
	// Doing this is safe because we collect the the connections
 | 
						|
	// by bucket, so the bucket being selected will never change.
 | 
						|
	_, err = client.SelectBucket(b.Name)
 | 
						|
	if err != nil {
 | 
						|
		pool.Return(client)
 | 
						|
		return nil, fmt.Errorf("Unable to select bucket %s: %v. No collections access to bucket %s.", err, b.Name, b.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	res, err := client.GetCollectionsManifest()
 | 
						|
	if err != nil {
 | 
						|
		pool.Return(client)
 | 
						|
		return nil, fmt.Errorf("Unable to retrieve collections manifest: %v. No collections access to bucket %s.", err, b.Name)
 | 
						|
	}
 | 
						|
	mani, err := parseCollectionsManifest(res)
 | 
						|
	if err != nil {
 | 
						|
		pool.Return(client)
 | 
						|
		return nil, fmt.Errorf("Unable to parse collections manifest: %v. No collections access to bucket %s.", err, b.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	pool.Return(client)
 | 
						|
	return mani, nil
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) RefreshFully() error {
 | 
						|
	return b.refresh(false)
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) Refresh() error {
 | 
						|
	return b.refresh(true)
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) refresh(preserveConnections bool) error {
 | 
						|
	b.RLock()
 | 
						|
	pool := b.pool
 | 
						|
	uri := b.URI
 | 
						|
	client := pool.client
 | 
						|
	b.RUnlock()
 | 
						|
	tlsConfig := client.tlsConfig
 | 
						|
 | 
						|
	var poolServices PoolServices
 | 
						|
	var err error
 | 
						|
	if tlsConfig != nil {
 | 
						|
		poolServices, err = client.GetPoolServices("default")
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	tmpb := &Bucket{}
 | 
						|
	err = pool.client.parseURLResponse(uri, tmpb)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	pools := b.getConnPools(false /* bucket not already locked */)
 | 
						|
 | 
						|
	// We need this lock to ensure that bucket refreshes happening because
 | 
						|
	// of NMVb errors received during bulkGet do not end up over-writing
 | 
						|
	// pool.inUse.
 | 
						|
	b.Lock()
 | 
						|
 | 
						|
	for _, pool := range pools {
 | 
						|
		if pool != nil {
 | 
						|
			pool.inUse = false
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
 | 
						|
	for i := range newcps {
 | 
						|
 | 
						|
		if preserveConnections {
 | 
						|
			pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
 | 
						|
			if pool != nil && pool.inUse == false {
 | 
						|
				// if the hostname and index is unchanged then reuse this pool
 | 
						|
				newcps[i] = pool
 | 
						|
				pool.inUse = true
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		hostport := tmpb.VBSMJson.ServerList[i]
 | 
						|
		if tlsConfig != nil {
 | 
						|
			hostport, err = MapKVtoSSL(hostport, &poolServices)
 | 
						|
			if err != nil {
 | 
						|
				b.Unlock()
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if b.ah != nil {
 | 
						|
			newcps[i] = newConnectionPool(hostport,
 | 
						|
				b.ah, AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
 | 
						|
 | 
						|
		} else {
 | 
						|
			newcps[i] = newConnectionPool(hostport,
 | 
						|
				b.authHandler(true /* bucket already locked */),
 | 
						|
				AsynchronousCloser, PoolSize, PoolOverflow, tlsConfig, b.Name)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	b.replaceConnPools2(newcps, true /* bucket already locked */)
 | 
						|
	tmpb.ah = b.ah
 | 
						|
	b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
 | 
						|
	b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
 | 
						|
 | 
						|
	b.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *Pool) refresh() (err error) {
 | 
						|
	p.BucketMap = make(map[string]*Bucket)
 | 
						|
 | 
						|
	buckets := []Bucket{}
 | 
						|
	err = p.client.parseURLResponse(p.BucketURL["uri"], &buckets)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	for i, _ := range buckets {
 | 
						|
		b := new(Bucket)
 | 
						|
		*b = buckets[i]
 | 
						|
		b.pool = p
 | 
						|
		b.nodeList = unsafe.Pointer(&b.NodesJSON)
 | 
						|
 | 
						|
		// MB-33185 this is merely defensive, just in case
 | 
						|
		// refresh() gets called on a perfectly node pool
 | 
						|
		ob, ok := p.BucketMap[b.Name]
 | 
						|
		if ok && ob.connPools != nil {
 | 
						|
			ob.Close()
 | 
						|
		}
 | 
						|
		b.replaceConnPools(make([]*connectionPool, len(b.VBSMJson.ServerList)))
 | 
						|
		p.BucketMap[b.Name] = b
 | 
						|
		runtime.SetFinalizer(b, bucketFinalizer)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// GetPool gets a pool from within the couchbase cluster (usually
 | 
						|
// "default").
 | 
						|
func (c *Client) GetPool(name string) (p Pool, err error) {
 | 
						|
	var poolURI string
 | 
						|
 | 
						|
	for _, p := range c.Info.Pools {
 | 
						|
		if p.Name == name {
 | 
						|
			poolURI = p.URI
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if poolURI == "" {
 | 
						|
		return p, errors.New("No pool named " + name)
 | 
						|
	}
 | 
						|
 | 
						|
	err = c.parseURLResponse(poolURI, &p)
 | 
						|
 | 
						|
	p.client = c
 | 
						|
 | 
						|
	err = p.refresh()
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// GetPoolServices returns all the bucket-independent services in a pool.
 | 
						|
// (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)
 | 
						|
func (c *Client) GetPoolServices(name string) (ps PoolServices, err error) {
 | 
						|
	var poolName string
 | 
						|
	for _, p := range c.Info.Pools {
 | 
						|
		if p.Name == name {
 | 
						|
			poolName = p.Name
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if poolName == "" {
 | 
						|
		return ps, errors.New("No pool named " + name)
 | 
						|
	}
 | 
						|
 | 
						|
	poolURI := "/pools/" + poolName + "/nodeServices"
 | 
						|
	err = c.parseURLResponse(poolURI, &ps)
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) GetPoolServices(name string) (*PoolServices, error) {
 | 
						|
	b.RLock()
 | 
						|
	pool := b.pool
 | 
						|
	b.RUnlock()
 | 
						|
 | 
						|
	ps, err := pool.client.GetPoolServices(name)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &ps, nil
 | 
						|
}
 | 
						|
 | 
						|
// Close marks this bucket as no longer needed, closing connections it
 | 
						|
// may have open.
 | 
						|
func (b *Bucket) Close() {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
	if b.connPools != nil {
 | 
						|
		for _, c := range b.getConnPools(true /* already locked */) {
 | 
						|
			if c != nil {
 | 
						|
				c.Close()
 | 
						|
			}
 | 
						|
		}
 | 
						|
		b.connPools = nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func bucketFinalizer(b *Bucket) {
 | 
						|
	if b.connPools != nil {
 | 
						|
		if !b.closed {
 | 
						|
			logging.Warnf("Finalizing a bucket with active connections.")
 | 
						|
		}
 | 
						|
 | 
						|
		// MB-33185 do not leak connection pools
 | 
						|
		b.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// GetBucket gets a bucket from within this pool.
 | 
						|
func (p *Pool) GetBucket(name string) (*Bucket, error) {
 | 
						|
	rv, ok := p.BucketMap[name]
 | 
						|
	if !ok {
 | 
						|
		return nil, &BucketNotFoundError{bucket: name}
 | 
						|
	}
 | 
						|
	err := rv.Refresh()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return rv, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetBucket gets a bucket from within this pool.
 | 
						|
func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error) {
 | 
						|
	rv, ok := p.BucketMap[bucket]
 | 
						|
	if !ok {
 | 
						|
		return nil, &BucketNotFoundError{bucket: bucket}
 | 
						|
	}
 | 
						|
	rv.ah = newBucketAuth(username, password, bucket)
 | 
						|
	err := rv.Refresh()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return rv, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetPool gets the pool to which this bucket belongs.
 | 
						|
func (b *Bucket) GetPool() *Pool {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
	ret := b.pool
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// GetClient gets the client from which we got this pool.
 | 
						|
func (p *Pool) GetClient() *Client {
 | 
						|
	return p.client
 | 
						|
}
 | 
						|
 | 
						|
// Release bucket connections when the pool is no longer in use
 | 
						|
func (p *Pool) Close() {
 | 
						|
	// fine to loop through the buckets unlocked
 | 
						|
	// locking happens at the bucket level
 | 
						|
	for b, _ := range p.BucketMap {
 | 
						|
 | 
						|
		// MB-33208 defer closing connection pools until the bucket is no longer used
 | 
						|
		bucket := p.BucketMap[b]
 | 
						|
		bucket.Lock()
 | 
						|
		bucket.closed = true
 | 
						|
		bucket.Unlock()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// GetBucket is a convenience function for getting a named bucket from
 | 
						|
// a URL
 | 
						|
func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error) {
 | 
						|
	var err error
 | 
						|
	client, err := Connect(endpoint)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	pool, err := client.GetPool(poolname)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return pool.GetBucket(bucketname)
 | 
						|
}
 | 
						|
 | 
						|
// ConnectWithAuthAndGetBucket is a convenience function for
 | 
						|
// getting a named bucket from a given URL and an auth callback
 | 
						|
func ConnectWithAuthAndGetBucket(endpoint, poolname, bucketname string,
 | 
						|
	ah AuthHandler) (*Bucket, error) {
 | 
						|
	client, err := ConnectWithAuth(endpoint, ah)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	pool, err := client.GetPool(poolname)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return pool.GetBucket(bucketname)
 | 
						|
}
 |