2020-01-07 11:23:09 +00:00
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package setting
import (
2020-01-08 15:30:58 +01:00
"path/filepath"
2021-10-17 12:43:25 +01:00
"strconv"
2020-01-07 11:23:09 +00:00
"time"
"code.gitea.io/gitea/modules/log"
2021-11-17 20:34:35 +08:00
2021-10-17 12:43:25 +01:00
ini "gopkg.in/ini.v1"
2020-01-07 11:23:09 +00:00
)
// QueueSettings represent the settings for a queue from the ini
type QueueSettings struct {
2020-10-15 22:40:03 +01:00
Name string
2020-01-07 11:23:09 +00:00
DataDir string
2020-10-15 22:40:03 +01:00
QueueLength int ` ini:"LENGTH" `
2020-01-07 11:23:09 +00:00
BatchLength int
ConnectionString string
Type string
QueueName string
2020-02-02 23:19:58 +00:00
SetName string
2020-01-07 11:23:09 +00:00
WrapIfNecessary bool
MaxAttempts int
Timeout time . Duration
Workers int
MaxWorkers int
BlockTimeout time . Duration
BoostTimeout time . Duration
BoostWorkers int
}
// Queue settings
var Queue = QueueSettings { }
// GetQueueSettings returns the queue settings for the appropriately named queue
func GetQueueSettings ( name string ) QueueSettings {
q := QueueSettings { }
sec := Cfg . Section ( "queue." + name )
2020-10-15 22:40:03 +01:00
q . Name = name
2020-01-07 11:23:09 +00:00
// DataDir is not directly inheritable
2021-05-26 03:50:35 +01:00
q . DataDir = filepath . ToSlash ( filepath . Join ( Queue . DataDir , "common" ) )
2020-01-07 11:23:09 +00:00
// QueueName is not directly inheritable either
q . QueueName = name + Queue . QueueName
for _ , key := range sec . Keys ( ) {
switch key . Name ( ) {
case "DATADIR" :
q . DataDir = key . MustString ( q . DataDir )
case "QUEUE_NAME" :
q . QueueName = key . MustString ( q . QueueName )
2020-02-02 23:19:58 +00:00
case "SET_NAME" :
q . SetName = key . MustString ( q . SetName )
2020-01-07 11:23:09 +00:00
}
}
2020-02-02 23:19:58 +00:00
if len ( q . SetName ) == 0 && len ( Queue . SetName ) > 0 {
q . SetName = q . QueueName + Queue . SetName
}
2020-01-08 15:30:58 +01:00
if ! filepath . IsAbs ( q . DataDir ) {
2021-06-16 23:19:20 +01:00
q . DataDir = filepath . ToSlash ( filepath . Join ( AppDataPath , q . DataDir ) )
2020-01-07 11:23:09 +00:00
}
2020-01-29 01:01:06 +00:00
_ , _ = sec . NewKey ( "DATADIR" , q . DataDir )
2020-10-15 22:40:03 +01:00
2020-01-07 11:23:09 +00:00
// The rest are...
2020-10-15 22:40:03 +01:00
q . QueueLength = sec . Key ( "LENGTH" ) . MustInt ( Queue . QueueLength )
2020-01-07 11:23:09 +00:00
q . BatchLength = sec . Key ( "BATCH_LENGTH" ) . MustInt ( Queue . BatchLength )
q . ConnectionString = sec . Key ( "CONN_STR" ) . MustString ( Queue . ConnectionString )
q . Type = sec . Key ( "TYPE" ) . MustString ( Queue . Type )
q . WrapIfNecessary = sec . Key ( "WRAP_IF_NECESSARY" ) . MustBool ( Queue . WrapIfNecessary )
q . MaxAttempts = sec . Key ( "MAX_ATTEMPTS" ) . MustInt ( Queue . MaxAttempts )
q . Timeout = sec . Key ( "TIMEOUT" ) . MustDuration ( Queue . Timeout )
q . Workers = sec . Key ( "WORKERS" ) . MustInt ( Queue . Workers )
q . MaxWorkers = sec . Key ( "MAX_WORKERS" ) . MustInt ( Queue . MaxWorkers )
q . BlockTimeout = sec . Key ( "BLOCK_TIMEOUT" ) . MustDuration ( Queue . BlockTimeout )
q . BoostTimeout = sec . Key ( "BOOST_TIMEOUT" ) . MustDuration ( Queue . BoostTimeout )
q . BoostWorkers = sec . Key ( "BOOST_WORKERS" ) . MustInt ( Queue . BoostWorkers )
return q
}
// NewQueueService sets up the default settings for Queues
// This is exported for tests to be able to use the queue
func NewQueueService ( ) {
sec := Cfg . Section ( "queue" )
2021-05-26 03:50:35 +01:00
Queue . DataDir = filepath . ToSlash ( sec . Key ( "DATADIR" ) . MustString ( "queues/" ) )
2020-01-08 15:30:58 +01:00
if ! filepath . IsAbs ( Queue . DataDir ) {
2021-05-26 03:50:35 +01:00
Queue . DataDir = filepath . ToSlash ( filepath . Join ( AppDataPath , Queue . DataDir ) )
2020-01-07 11:23:09 +00:00
}
2020-10-15 22:40:03 +01:00
Queue . QueueLength = sec . Key ( "LENGTH" ) . MustInt ( 20 )
2020-01-07 11:23:09 +00:00
Queue . BatchLength = sec . Key ( "BATCH_LENGTH" ) . MustInt ( 20 )
2020-10-04 18:12:26 +01:00
Queue . ConnectionString = sec . Key ( "CONN_STR" ) . MustString ( "" )
2021-06-16 23:19:20 +01:00
defaultType := sec . Key ( "TYPE" ) . String ( )
2020-01-29 01:01:06 +00:00
Queue . Type = sec . Key ( "TYPE" ) . MustString ( "persistable-channel" )
2020-01-07 11:23:09 +00:00
Queue . WrapIfNecessary = sec . Key ( "WRAP_IF_NECESSARY" ) . MustBool ( true )
Queue . MaxAttempts = sec . Key ( "MAX_ATTEMPTS" ) . MustInt ( 10 )
Queue . Timeout = sec . Key ( "TIMEOUT" ) . MustDuration ( GracefulHammerTime + 30 * time . Second )
2021-05-24 00:23:55 +01:00
Queue . Workers = sec . Key ( "WORKERS" ) . MustInt ( 0 )
2020-01-07 11:23:09 +00:00
Queue . MaxWorkers = sec . Key ( "MAX_WORKERS" ) . MustInt ( 10 )
Queue . BlockTimeout = sec . Key ( "BLOCK_TIMEOUT" ) . MustDuration ( 1 * time . Second )
Queue . BoostTimeout = sec . Key ( "BOOST_TIMEOUT" ) . MustDuration ( 5 * time . Minute )
2021-05-24 00:23:55 +01:00
Queue . BoostWorkers = sec . Key ( "BOOST_WORKERS" ) . MustInt ( 1 )
2020-01-07 11:23:09 +00:00
Queue . QueueName = sec . Key ( "QUEUE_NAME" ) . MustString ( "_queue" )
2020-02-02 23:19:58 +00:00
Queue . SetName = sec . Key ( "SET_NAME" ) . MustString ( "" )
2020-01-07 11:23:09 +00:00
// Now handle the old issue_indexer configuration
2022-01-20 18:00:38 +01:00
// FIXME: DEPRECATED to be removed in v1.18.0
2020-01-07 11:23:09 +00:00
section := Cfg . Section ( "queue.issue_indexer" )
2021-10-17 12:43:25 +01:00
directlySet := toDirectlySetKeysMap ( section )
if ! directlySet [ "TYPE" ] && defaultType == "" {
2022-01-20 18:00:38 +01:00
switch typ := Cfg . Section ( "indexer" ) . Key ( "ISSUE_INDEXER_QUEUE_TYPE" ) . MustString ( "" ) ; typ {
case "levelqueue" :
2020-01-29 01:01:06 +00:00
_ , _ = section . NewKey ( "TYPE" , "level" )
2022-01-20 18:00:38 +01:00
case "channel" :
2020-01-29 01:01:06 +00:00
_ , _ = section . NewKey ( "TYPE" , "persistable-channel" )
2022-01-20 18:00:38 +01:00
case "redis" :
2020-01-29 01:01:06 +00:00
_ , _ = section . NewKey ( "TYPE" , "redis" )
2021-06-16 23:19:20 +01:00
case "" :
_ , _ = section . NewKey ( "TYPE" , "level" )
2020-01-07 11:23:09 +00:00
default :
2022-01-20 18:00:38 +01:00
log . Fatal ( "Unsupported indexer queue type: %v" , typ )
2020-01-07 11:23:09 +00:00
}
}
2022-01-20 18:00:38 +01:00
if ! directlySet [ "LENGTH" ] {
length := Cfg . Section ( "indexer" ) . Key ( "UPDATE_BUFFER_LEN" ) . MustInt ( 0 )
if length != 0 {
_ , _ = section . NewKey ( "LENGTH" , strconv . Itoa ( length ) )
}
2020-01-07 11:23:09 +00:00
}
2022-01-20 18:00:38 +01:00
if ! directlySet [ "BATCH_LENGTH" ] {
fallback := Cfg . Section ( "indexer" ) . Key ( "ISSUE_INDEXER_QUEUE_BATCH_NUMBER" ) . MustInt ( 0 )
if fallback != 0 {
_ , _ = section . NewKey ( "BATCH_LENGTH" , strconv . Itoa ( fallback ) )
}
2020-01-07 11:23:09 +00:00
}
2022-01-20 18:00:38 +01:00
if ! directlySet [ "DATADIR" ] {
queueDir := filepath . ToSlash ( Cfg . Section ( "indexer" ) . Key ( "ISSUE_INDEXER_QUEUE_DIR" ) . MustString ( "" ) )
if queueDir != "" {
_ , _ = section . NewKey ( "DATADIR" , queueDir )
}
2020-01-07 11:23:09 +00:00
}
2022-01-20 18:00:38 +01:00
if ! directlySet [ "CONN_STR" ] {
connStr := Cfg . Section ( "indexer" ) . Key ( "ISSUE_INDEXER_QUEUE_CONN_STR" ) . MustString ( "" )
if connStr != "" {
_ , _ = section . NewKey ( "CONN_STR" , connStr )
}
2020-01-07 11:23:09 +00:00
}
2020-01-16 17:55:36 +00:00
2022-01-20 18:00:38 +01:00
// FIXME: DEPRECATED to be removed in v1.18.0
// - will need to set default for [queue.*)] LENGTH appropriately though though
2020-01-16 17:55:36 +00:00
// Handle the old mailer configuration
2022-01-20 18:00:38 +01:00
handleOldLengthConfiguration ( "mailer" , "mailer" , "SEND_BUFFER_LEN" , 100 )
2020-02-02 23:19:58 +00:00
// Handle the old test pull requests configuration
// Please note this will be a unique queue
2022-01-20 18:00:38 +01:00
handleOldLengthConfiguration ( "pr_patch_checker" , "repository" , "PULL_REQUEST_QUEUE_LENGTH" , 1000 )
2021-10-17 12:43:25 +01:00
// Handle the old mirror queue configuration
// Please note this will be a unique queue
2022-01-20 18:00:38 +01:00
handleOldLengthConfiguration ( "mirror" , "repository" , "MIRROR_QUEUE_LENGTH" , 1000 )
2021-10-17 12:43:25 +01:00
}
// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
// if that is left unset then we should fallback to the older configuration. (Except where the new length woul be <=0)
2022-01-20 18:00:38 +01:00
func handleOldLengthConfiguration ( queueName , oldSection , oldKey string , defaultValue int ) {
if Cfg . Section ( oldSection ) . HasKey ( oldKey ) {
log . Error ( "Deprecated fallback for %s queue length `[%s]` `%s` present. Use `[queue.%s]` `LENGTH`. This will be removed in v1.18.0" , queueName , queueName , oldSection , oldKey )
}
value := Cfg . Section ( oldSection ) . Key ( oldKey ) . MustInt ( defaultValue )
2021-10-17 12:43:25 +01:00
// Don't override with 0
if value <= 0 {
return
}
section := Cfg . Section ( "queue." + queueName )
directlySet := toDirectlySetKeysMap ( section )
if ! directlySet [ "LENGTH" ] {
_ , _ = section . NewKey ( "LENGTH" , strconv . Itoa ( value ) )
}
}
// toDirectlySetKeysMap returns a bool map of keys directly set by this section
// Note: we cannot use section.HasKey(...) as that will immediately set the Key if a parent section has the Key
// but this section does not.
func toDirectlySetKeysMap ( section * ini . Section ) map [ string ] bool {
sectionMap := map [ string ] bool { }
2020-02-02 23:19:58 +00:00
for _ , key := range section . Keys ( ) {
sectionMap [ key . Name ( ) ] = true
}
2021-10-17 12:43:25 +01:00
return sectionMap
2020-01-07 11:23:09 +00:00
}