mirror of
				https://github.com/go-gitea/gitea
				synced 2025-11-04 05:18:25 +00:00 
			
		
		
		
	* add redis queue * finished indexer redis queue * add redis vendor * fix vet * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md Co-Authored-By: lunny <xiaolunwen@gmail.com> * switch to go mod * Update required changes for new logging func signatures
		
			
				
	
	
		
			659 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			659 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
package redis
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"math/rand"
 | 
						|
	"strconv"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/go-redis/redis/internal"
 | 
						|
	"github.com/go-redis/redis/internal/consistenthash"
 | 
						|
	"github.com/go-redis/redis/internal/hashtag"
 | 
						|
	"github.com/go-redis/redis/internal/pool"
 | 
						|
)
 | 
						|
 | 
						|
// Hash is type of hash function used in consistent hash.
 | 
						|
type Hash consistenthash.Hash
 | 
						|
 | 
						|
var errRingShardsDown = errors.New("redis: all ring shards are down")
 | 
						|
 | 
						|
// RingOptions are used to configure a ring client and should be
 | 
						|
// passed to NewRing.
 | 
						|
type RingOptions struct {
 | 
						|
	// Map of name => host:port addresses of ring shards.
 | 
						|
	Addrs map[string]string
 | 
						|
 | 
						|
	// Frequency of PING commands sent to check shards availability.
 | 
						|
	// Shard is considered down after 3 subsequent failed checks.
 | 
						|
	HeartbeatFrequency time.Duration
 | 
						|
 | 
						|
	// Hash function used in consistent hash.
 | 
						|
	// Default is crc32.ChecksumIEEE.
 | 
						|
	Hash Hash
 | 
						|
 | 
						|
	// Number of replicas in consistent hash.
 | 
						|
	// Default is 100 replicas.
 | 
						|
	//
 | 
						|
	// Higher number of replicas will provide less deviation, that is keys will be
 | 
						|
	// distributed to nodes more evenly.
 | 
						|
	//
 | 
						|
	// Following is deviation for common nreplicas:
 | 
						|
	//  --------------------------------------------------------
 | 
						|
	//  | nreplicas | standard error | 99% confidence interval |
 | 
						|
	//  |     10    |     0.3152     |      (0.37, 1.98)       |
 | 
						|
	//  |    100    |     0.0997     |      (0.76, 1.28)       |
 | 
						|
	//  |   1000    |     0.0316     |      (0.92, 1.09)       |
 | 
						|
	//  --------------------------------------------------------
 | 
						|
	//
 | 
						|
	//  See https://arxiv.org/abs/1406.2294 for reference
 | 
						|
	HashReplicas int
 | 
						|
 | 
						|
	// Following options are copied from Options struct.
 | 
						|
 | 
						|
	OnConnect func(*Conn) error
 | 
						|
 | 
						|
	DB       int
 | 
						|
	Password string
 | 
						|
 | 
						|
	MaxRetries      int
 | 
						|
	MinRetryBackoff time.Duration
 | 
						|
	MaxRetryBackoff time.Duration
 | 
						|
 | 
						|
	DialTimeout  time.Duration
 | 
						|
	ReadTimeout  time.Duration
 | 
						|
	WriteTimeout time.Duration
 | 
						|
 | 
						|
	PoolSize           int
 | 
						|
	MinIdleConns       int
 | 
						|
	MaxConnAge         time.Duration
 | 
						|
	PoolTimeout        time.Duration
 | 
						|
	IdleTimeout        time.Duration
 | 
						|
	IdleCheckFrequency time.Duration
 | 
						|
}
 | 
						|
 | 
						|
func (opt *RingOptions) init() {
 | 
						|
	if opt.HeartbeatFrequency == 0 {
 | 
						|
		opt.HeartbeatFrequency = 500 * time.Millisecond
 | 
						|
	}
 | 
						|
 | 
						|
	if opt.HashReplicas == 0 {
 | 
						|
		opt.HashReplicas = 100
 | 
						|
	}
 | 
						|
 | 
						|
	switch opt.MinRetryBackoff {
 | 
						|
	case -1:
 | 
						|
		opt.MinRetryBackoff = 0
 | 
						|
	case 0:
 | 
						|
		opt.MinRetryBackoff = 8 * time.Millisecond
 | 
						|
	}
 | 
						|
	switch opt.MaxRetryBackoff {
 | 
						|
	case -1:
 | 
						|
		opt.MaxRetryBackoff = 0
 | 
						|
	case 0:
 | 
						|
		opt.MaxRetryBackoff = 512 * time.Millisecond
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (opt *RingOptions) clientOptions() *Options {
 | 
						|
	return &Options{
 | 
						|
		OnConnect: opt.OnConnect,
 | 
						|
 | 
						|
		DB:       opt.DB,
 | 
						|
		Password: opt.Password,
 | 
						|
 | 
						|
		DialTimeout:  opt.DialTimeout,
 | 
						|
		ReadTimeout:  opt.ReadTimeout,
 | 
						|
		WriteTimeout: opt.WriteTimeout,
 | 
						|
 | 
						|
		PoolSize:           opt.PoolSize,
 | 
						|
		MinIdleConns:       opt.MinIdleConns,
 | 
						|
		MaxConnAge:         opt.MaxConnAge,
 | 
						|
		PoolTimeout:        opt.PoolTimeout,
 | 
						|
		IdleTimeout:        opt.IdleTimeout,
 | 
						|
		IdleCheckFrequency: opt.IdleCheckFrequency,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
type ringShard struct {
 | 
						|
	Client *Client
 | 
						|
	down   int32
 | 
						|
}
 | 
						|
 | 
						|
func (shard *ringShard) String() string {
 | 
						|
	var state string
 | 
						|
	if shard.IsUp() {
 | 
						|
		state = "up"
 | 
						|
	} else {
 | 
						|
		state = "down"
 | 
						|
	}
 | 
						|
	return fmt.Sprintf("%s is %s", shard.Client, state)
 | 
						|
}
 | 
						|
 | 
						|
func (shard *ringShard) IsDown() bool {
 | 
						|
	const threshold = 3
 | 
						|
	return atomic.LoadInt32(&shard.down) >= threshold
 | 
						|
}
 | 
						|
 | 
						|
func (shard *ringShard) IsUp() bool {
 | 
						|
	return !shard.IsDown()
 | 
						|
}
 | 
						|
 | 
						|
// Vote votes to set shard state and returns true if state was changed.
 | 
						|
func (shard *ringShard) Vote(up bool) bool {
 | 
						|
	if up {
 | 
						|
		changed := shard.IsDown()
 | 
						|
		atomic.StoreInt32(&shard.down, 0)
 | 
						|
		return changed
 | 
						|
	}
 | 
						|
 | 
						|
	if shard.IsDown() {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	atomic.AddInt32(&shard.down, 1)
 | 
						|
	return shard.IsDown()
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
type ringShards struct {
 | 
						|
	opt *RingOptions
 | 
						|
 | 
						|
	mu     sync.RWMutex
 | 
						|
	hash   *consistenthash.Map
 | 
						|
	shards map[string]*ringShard // read only
 | 
						|
	list   []*ringShard          // read only
 | 
						|
	len    int
 | 
						|
	closed bool
 | 
						|
}
 | 
						|
 | 
						|
func newRingShards(opt *RingOptions) *ringShards {
 | 
						|
	return &ringShards{
 | 
						|
		opt: opt,
 | 
						|
 | 
						|
		hash:   newConsistentHash(opt),
 | 
						|
		shards: make(map[string]*ringShard),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) Add(name string, cl *Client) {
 | 
						|
	shard := &ringShard{Client: cl}
 | 
						|
	c.hash.Add(name)
 | 
						|
	c.shards[name] = shard
 | 
						|
	c.list = append(c.list, shard)
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) List() []*ringShard {
 | 
						|
	c.mu.RLock()
 | 
						|
	list := c.list
 | 
						|
	c.mu.RUnlock()
 | 
						|
	return list
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) Hash(key string) string {
 | 
						|
	c.mu.RLock()
 | 
						|
	hash := c.hash.Get(key)
 | 
						|
	c.mu.RUnlock()
 | 
						|
	return hash
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) GetByKey(key string) (*ringShard, error) {
 | 
						|
	key = hashtag.Key(key)
 | 
						|
 | 
						|
	c.mu.RLock()
 | 
						|
 | 
						|
	if c.closed {
 | 
						|
		c.mu.RUnlock()
 | 
						|
		return nil, pool.ErrClosed
 | 
						|
	}
 | 
						|
 | 
						|
	hash := c.hash.Get(key)
 | 
						|
	if hash == "" {
 | 
						|
		c.mu.RUnlock()
 | 
						|
		return nil, errRingShardsDown
 | 
						|
	}
 | 
						|
 | 
						|
	shard := c.shards[hash]
 | 
						|
	c.mu.RUnlock()
 | 
						|
 | 
						|
	return shard, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) GetByHash(name string) (*ringShard, error) {
 | 
						|
	if name == "" {
 | 
						|
		return c.Random()
 | 
						|
	}
 | 
						|
 | 
						|
	c.mu.RLock()
 | 
						|
	shard := c.shards[name]
 | 
						|
	c.mu.RUnlock()
 | 
						|
	return shard, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) Random() (*ringShard, error) {
 | 
						|
	return c.GetByKey(strconv.Itoa(rand.Int()))
 | 
						|
}
 | 
						|
 | 
						|
// heartbeat monitors state of each shard in the ring.
 | 
						|
func (c *ringShards) Heartbeat(frequency time.Duration) {
 | 
						|
	ticker := time.NewTicker(frequency)
 | 
						|
	defer ticker.Stop()
 | 
						|
	for range ticker.C {
 | 
						|
		var rebalance bool
 | 
						|
 | 
						|
		c.mu.RLock()
 | 
						|
 | 
						|
		if c.closed {
 | 
						|
			c.mu.RUnlock()
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		shards := c.list
 | 
						|
		c.mu.RUnlock()
 | 
						|
 | 
						|
		for _, shard := range shards {
 | 
						|
			err := shard.Client.Ping().Err()
 | 
						|
			if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
 | 
						|
				internal.Logf("ring shard state changed: %s", shard)
 | 
						|
				rebalance = true
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if rebalance {
 | 
						|
			c.rebalance()
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// rebalance removes dead shards from the Ring.
 | 
						|
func (c *ringShards) rebalance() {
 | 
						|
	hash := newConsistentHash(c.opt)
 | 
						|
	var shardsNum int
 | 
						|
	for name, shard := range c.shards {
 | 
						|
		if shard.IsUp() {
 | 
						|
			hash.Add(name)
 | 
						|
			shardsNum++
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	c.mu.Lock()
 | 
						|
	c.hash = hash
 | 
						|
	c.len = shardsNum
 | 
						|
	c.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) Len() int {
 | 
						|
	c.mu.RLock()
 | 
						|
	l := c.len
 | 
						|
	c.mu.RUnlock()
 | 
						|
	return l
 | 
						|
}
 | 
						|
 | 
						|
func (c *ringShards) Close() error {
 | 
						|
	c.mu.Lock()
 | 
						|
	defer c.mu.Unlock()
 | 
						|
 | 
						|
	if c.closed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	c.closed = true
 | 
						|
 | 
						|
	var firstErr error
 | 
						|
	for _, shard := range c.shards {
 | 
						|
		if err := shard.Client.Close(); err != nil && firstErr == nil {
 | 
						|
			firstErr = err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	c.hash = nil
 | 
						|
	c.shards = nil
 | 
						|
	c.list = nil
 | 
						|
 | 
						|
	return firstErr
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
// Ring is a Redis client that uses consistent hashing to distribute
 | 
						|
// keys across multiple Redis servers (shards). It's safe for
 | 
						|
// concurrent use by multiple goroutines.
 | 
						|
//
 | 
						|
// Ring monitors the state of each shard and removes dead shards from
 | 
						|
// the ring. When a shard comes online it is added back to the ring. This
 | 
						|
// gives you maximum availability and partition tolerance, but no
 | 
						|
// consistency between different shards or even clients. Each client
 | 
						|
// uses shards that are available to the client and does not do any
 | 
						|
// coordination when shard state is changed.
 | 
						|
//
 | 
						|
// Ring should be used when you need multiple Redis servers for caching
 | 
						|
// and can tolerate losing data when one of the servers dies.
 | 
						|
// Otherwise you should use Redis Cluster.
 | 
						|
type Ring struct {
 | 
						|
	cmdable
 | 
						|
 | 
						|
	ctx context.Context
 | 
						|
 | 
						|
	opt           *RingOptions
 | 
						|
	shards        *ringShards
 | 
						|
	cmdsInfoCache *cmdsInfoCache
 | 
						|
 | 
						|
	process         func(Cmder) error
 | 
						|
	processPipeline func([]Cmder) error
 | 
						|
}
 | 
						|
 | 
						|
func NewRing(opt *RingOptions) *Ring {
 | 
						|
	opt.init()
 | 
						|
 | 
						|
	ring := &Ring{
 | 
						|
		opt:    opt,
 | 
						|
		shards: newRingShards(opt),
 | 
						|
	}
 | 
						|
	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
 | 
						|
 | 
						|
	ring.process = ring.defaultProcess
 | 
						|
	ring.processPipeline = ring.defaultProcessPipeline
 | 
						|
	ring.cmdable.setProcessor(ring.Process)
 | 
						|
 | 
						|
	for name, addr := range opt.Addrs {
 | 
						|
		clopt := opt.clientOptions()
 | 
						|
		clopt.Addr = addr
 | 
						|
		ring.shards.Add(name, NewClient(clopt))
 | 
						|
	}
 | 
						|
 | 
						|
	go ring.shards.Heartbeat(opt.HeartbeatFrequency)
 | 
						|
 | 
						|
	return ring
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) Context() context.Context {
 | 
						|
	if c.ctx != nil {
 | 
						|
		return c.ctx
 | 
						|
	}
 | 
						|
	return context.Background()
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) WithContext(ctx context.Context) *Ring {
 | 
						|
	if ctx == nil {
 | 
						|
		panic("nil context")
 | 
						|
	}
 | 
						|
	c2 := c.copy()
 | 
						|
	c2.ctx = ctx
 | 
						|
	return c2
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) copy() *Ring {
 | 
						|
	cp := *c
 | 
						|
	return &cp
 | 
						|
}
 | 
						|
 | 
						|
// Options returns read-only Options that were used to create the client.
 | 
						|
func (c *Ring) Options() *RingOptions {
 | 
						|
	return c.opt
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) retryBackoff(attempt int) time.Duration {
 | 
						|
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
 | 
						|
}
 | 
						|
 | 
						|
// PoolStats returns accumulated connection pool stats.
 | 
						|
func (c *Ring) PoolStats() *PoolStats {
 | 
						|
	shards := c.shards.List()
 | 
						|
	var acc PoolStats
 | 
						|
	for _, shard := range shards {
 | 
						|
		s := shard.Client.connPool.Stats()
 | 
						|
		acc.Hits += s.Hits
 | 
						|
		acc.Misses += s.Misses
 | 
						|
		acc.Timeouts += s.Timeouts
 | 
						|
		acc.TotalConns += s.TotalConns
 | 
						|
		acc.IdleConns += s.IdleConns
 | 
						|
	}
 | 
						|
	return &acc
 | 
						|
}
 | 
						|
 | 
						|
// Len returns the current number of shards in the ring.
 | 
						|
func (c *Ring) Len() int {
 | 
						|
	return c.shards.Len()
 | 
						|
}
 | 
						|
 | 
						|
// Subscribe subscribes the client to the specified channels.
 | 
						|
func (c *Ring) Subscribe(channels ...string) *PubSub {
 | 
						|
	if len(channels) == 0 {
 | 
						|
		panic("at least one channel is required")
 | 
						|
	}
 | 
						|
 | 
						|
	shard, err := c.shards.GetByKey(channels[0])
 | 
						|
	if err != nil {
 | 
						|
		// TODO: return PubSub with sticky error
 | 
						|
		panic(err)
 | 
						|
	}
 | 
						|
	return shard.Client.Subscribe(channels...)
 | 
						|
}
 | 
						|
 | 
						|
// PSubscribe subscribes the client to the given patterns.
 | 
						|
func (c *Ring) PSubscribe(channels ...string) *PubSub {
 | 
						|
	if len(channels) == 0 {
 | 
						|
		panic("at least one channel is required")
 | 
						|
	}
 | 
						|
 | 
						|
	shard, err := c.shards.GetByKey(channels[0])
 | 
						|
	if err != nil {
 | 
						|
		// TODO: return PubSub with sticky error
 | 
						|
		panic(err)
 | 
						|
	}
 | 
						|
	return shard.Client.PSubscribe(channels...)
 | 
						|
}
 | 
						|
 | 
						|
// ForEachShard concurrently calls the fn on each live shard in the ring.
 | 
						|
// It returns the first error if any.
 | 
						|
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
 | 
						|
	shards := c.shards.List()
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	errCh := make(chan error, 1)
 | 
						|
	for _, shard := range shards {
 | 
						|
		if shard.IsDown() {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		wg.Add(1)
 | 
						|
		go func(shard *ringShard) {
 | 
						|
			defer wg.Done()
 | 
						|
			err := fn(shard.Client)
 | 
						|
			if err != nil {
 | 
						|
				select {
 | 
						|
				case errCh <- err:
 | 
						|
				default:
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}(shard)
 | 
						|
	}
 | 
						|
	wg.Wait()
 | 
						|
 | 
						|
	select {
 | 
						|
	case err := <-errCh:
 | 
						|
		return err
 | 
						|
	default:
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
 | 
						|
	shards := c.shards.List()
 | 
						|
	firstErr := errRingShardsDown
 | 
						|
	for _, shard := range shards {
 | 
						|
		cmdsInfo, err := shard.Client.Command().Result()
 | 
						|
		if err == nil {
 | 
						|
			return cmdsInfo, nil
 | 
						|
		}
 | 
						|
		if firstErr == nil {
 | 
						|
			firstErr = err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil, firstErr
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) cmdInfo(name string) *CommandInfo {
 | 
						|
	cmdsInfo, err := c.cmdsInfoCache.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	info := cmdsInfo[name]
 | 
						|
	if info == nil {
 | 
						|
		internal.Logf("info for cmd=%s not found", name)
 | 
						|
	}
 | 
						|
	return info
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
 | 
						|
	cmdInfo := c.cmdInfo(cmd.Name())
 | 
						|
	pos := cmdFirstKeyPos(cmd, cmdInfo)
 | 
						|
	if pos == 0 {
 | 
						|
		return c.shards.Random()
 | 
						|
	}
 | 
						|
	firstKey := cmd.stringArg(pos)
 | 
						|
	return c.shards.GetByKey(firstKey)
 | 
						|
}
 | 
						|
 | 
						|
// Do creates a Cmd from the args and processes the cmd.
 | 
						|
func (c *Ring) Do(args ...interface{}) *Cmd {
 | 
						|
	cmd := NewCmd(args...)
 | 
						|
	c.Process(cmd)
 | 
						|
	return cmd
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) WrapProcess(
 | 
						|
	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
 | 
						|
) {
 | 
						|
	c.process = fn(c.process)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) Process(cmd Cmder) error {
 | 
						|
	return c.process(cmd)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) defaultProcess(cmd Cmder) error {
 | 
						|
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
 | 
						|
		if attempt > 0 {
 | 
						|
			time.Sleep(c.retryBackoff(attempt))
 | 
						|
		}
 | 
						|
 | 
						|
		shard, err := c.cmdShard(cmd)
 | 
						|
		if err != nil {
 | 
						|
			cmd.setErr(err)
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		err = shard.Client.Process(cmd)
 | 
						|
		if err == nil {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return cmd.Err()
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) Pipeline() Pipeliner {
 | 
						|
	pipe := Pipeline{
 | 
						|
		exec: c.processPipeline,
 | 
						|
	}
 | 
						|
	pipe.cmdable.setProcessor(pipe.Process)
 | 
						|
	return &pipe
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
 | 
						|
	return c.Pipeline().Pipelined(fn)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) WrapProcessPipeline(
 | 
						|
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
 | 
						|
) {
 | 
						|
	c.processPipeline = fn(c.processPipeline)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
 | 
						|
	cmdsMap := make(map[string][]Cmder)
 | 
						|
	for _, cmd := range cmds {
 | 
						|
		cmdInfo := c.cmdInfo(cmd.Name())
 | 
						|
		hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
 | 
						|
		if hash != "" {
 | 
						|
			hash = c.shards.Hash(hashtag.Key(hash))
 | 
						|
		}
 | 
						|
		cmdsMap[hash] = append(cmdsMap[hash], cmd)
 | 
						|
	}
 | 
						|
 | 
						|
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
 | 
						|
		if attempt > 0 {
 | 
						|
			time.Sleep(c.retryBackoff(attempt))
 | 
						|
		}
 | 
						|
 | 
						|
		var mu sync.Mutex
 | 
						|
		var failedCmdsMap map[string][]Cmder
 | 
						|
		var wg sync.WaitGroup
 | 
						|
 | 
						|
		for hash, cmds := range cmdsMap {
 | 
						|
			wg.Add(1)
 | 
						|
			go func(hash string, cmds []Cmder) {
 | 
						|
				defer wg.Done()
 | 
						|
 | 
						|
				shard, err := c.shards.GetByHash(hash)
 | 
						|
				if err != nil {
 | 
						|
					setCmdsErr(cmds, err)
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
				cn, err := shard.Client.getConn()
 | 
						|
				if err != nil {
 | 
						|
					setCmdsErr(cmds, err)
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
				canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
 | 
						|
				shard.Client.releaseConnStrict(cn, err)
 | 
						|
 | 
						|
				if canRetry && internal.IsRetryableError(err, true) {
 | 
						|
					mu.Lock()
 | 
						|
					if failedCmdsMap == nil {
 | 
						|
						failedCmdsMap = make(map[string][]Cmder)
 | 
						|
					}
 | 
						|
					failedCmdsMap[hash] = cmds
 | 
						|
					mu.Unlock()
 | 
						|
				}
 | 
						|
			}(hash, cmds)
 | 
						|
		}
 | 
						|
 | 
						|
		wg.Wait()
 | 
						|
		if len(failedCmdsMap) == 0 {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		cmdsMap = failedCmdsMap
 | 
						|
	}
 | 
						|
 | 
						|
	return cmdsFirstErr(cmds)
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) TxPipeline() Pipeliner {
 | 
						|
	panic("not implemented")
 | 
						|
}
 | 
						|
 | 
						|
func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
 | 
						|
	panic("not implemented")
 | 
						|
}
 | 
						|
 | 
						|
// Close closes the ring client, releasing any open resources.
 | 
						|
//
 | 
						|
// It is rare to Close a Ring, as the Ring is meant to be long-lived
 | 
						|
// and shared between many goroutines.
 | 
						|
func (c *Ring) Close() error {
 | 
						|
	return c.shards.Close()
 | 
						|
}
 | 
						|
 | 
						|
func newConsistentHash(opt *RingOptions) *consistenthash.Map {
 | 
						|
	return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
 | 
						|
}
 |