2019-02-19 14:39:39 +00:00
// Copyright 2019 Lunny Xiao. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package levelqueue
import (
"bytes"
"encoding/binary"
"sync"
"github.com/syndtr/goleveldb/leveldb"
2020-05-04 06:51:09 +00:00
"github.com/syndtr/goleveldb/leveldb/errors"
2019-02-19 14:39:39 +00:00
)
2020-02-02 23:19:58 +00:00
const (
lowKeyStr = "low"
highKeyStr = "high"
)
2019-02-19 14:39:39 +00:00
// Queue defines a queue struct
type Queue struct {
2020-02-02 23:19:58 +00:00
db * leveldb . DB
highLock sync . Mutex
lowLock sync . Mutex
low int64
high int64
lowKey [ ] byte
highKey [ ] byte
prefix [ ] byte
closeUnderlyingDB bool
2019-02-19 14:39:39 +00:00
}
2020-02-02 23:19:58 +00:00
// Open opens a queue from the db path or creates a
// queue if it doesn't exist.
// The keys will not be prefixed by default
2019-02-19 14:39:39 +00:00
func Open ( dataDir string ) ( * Queue , error ) {
db , err := leveldb . OpenFile ( dataDir , nil )
if err != nil {
2020-05-04 06:51:09 +00:00
if ! errors . IsCorrupted ( err ) {
return nil , err
}
db , err = leveldb . RecoverFile ( dataDir , nil )
if err != nil {
return nil , err
}
2019-02-19 14:39:39 +00:00
}
2020-02-02 23:19:58 +00:00
return NewQueue ( db , [ ] byte { } , true )
}
// NewQueue creates a queue from a db. The keys will be prefixed with prefix
// and at close the db will be closed as per closeUnderlyingDB
func NewQueue ( db * leveldb . DB , prefix [ ] byte , closeUnderlyingDB bool ) ( * Queue , error ) {
var err error
2019-02-19 14:39:39 +00:00
var queue = & Queue {
2020-02-02 23:19:58 +00:00
db : db ,
closeUnderlyingDB : closeUnderlyingDB ,
2019-02-19 14:39:39 +00:00
}
2020-02-02 23:19:58 +00:00
queue . prefix = make ( [ ] byte , len ( prefix ) )
copy ( queue . prefix , prefix )
queue . lowKey = withPrefix ( prefix , [ ] byte ( lowKeyStr ) )
queue . highKey = withPrefix ( prefix , [ ] byte ( highKeyStr ) )
queue . low , err = queue . readID ( queue . lowKey )
2019-02-19 14:39:39 +00:00
if err == leveldb . ErrNotFound {
queue . low = 1
2020-02-02 23:19:58 +00:00
err = db . Put ( queue . lowKey , id2bytes ( 1 ) , nil )
2019-02-19 14:39:39 +00:00
}
if err != nil {
return nil , err
}
2020-02-02 23:19:58 +00:00
queue . high , err = queue . readID ( queue . highKey )
2019-02-19 14:39:39 +00:00
if err == leveldb . ErrNotFound {
2020-02-02 23:19:58 +00:00
err = db . Put ( queue . highKey , id2bytes ( 0 ) , nil )
2019-02-19 14:39:39 +00:00
}
if err != nil {
return nil , err
}
return queue , nil
}
func ( queue * Queue ) readID ( key [ ] byte ) ( int64 , error ) {
bs , err := queue . db . Get ( key , nil )
if err != nil {
return 0 , err
}
return bytes2id ( bs )
}
func ( queue * Queue ) highincrement ( ) ( int64 , error ) {
id := queue . high + 1
queue . high = id
2020-02-02 23:19:58 +00:00
err := queue . db . Put ( queue . highKey , id2bytes ( queue . high ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
queue . high = queue . high - 1
return 0 , err
}
return id , nil
}
func ( queue * Queue ) highdecrement ( ) ( int64 , error ) {
queue . high = queue . high - 1
2020-02-02 23:19:58 +00:00
err := queue . db . Put ( queue . highKey , id2bytes ( queue . high ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
queue . high = queue . high + 1
return 0 , err
}
return queue . high , nil
}
func ( queue * Queue ) lowincrement ( ) ( int64 , error ) {
queue . low = queue . low + 1
2020-02-02 23:19:58 +00:00
err := queue . db . Put ( queue . lowKey , id2bytes ( queue . low ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
queue . low = queue . low - 1
return 0 , err
}
return queue . low , nil
}
func ( queue * Queue ) lowdecrement ( ) ( int64 , error ) {
queue . low = queue . low - 1
2020-02-02 23:19:58 +00:00
err := queue . db . Put ( queue . lowKey , id2bytes ( queue . low ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
queue . low = queue . low + 1
return 0 , err
}
return queue . low , nil
}
// Len returns the length of the queue
func ( queue * Queue ) Len ( ) int64 {
queue . lowLock . Lock ( )
queue . highLock . Lock ( )
l := queue . high - queue . low + 1
queue . highLock . Unlock ( )
queue . lowLock . Unlock ( )
return l
}
func id2bytes ( id int64 ) [ ] byte {
var buf = make ( [ ] byte , 8 )
binary . PutVarint ( buf , id )
return buf
}
func bytes2id ( b [ ] byte ) ( int64 , error ) {
return binary . ReadVarint ( bytes . NewReader ( b ) )
}
2020-02-02 23:19:58 +00:00
func withPrefix ( prefix [ ] byte , value [ ] byte ) [ ] byte {
if len ( prefix ) == 0 {
return value
}
prefixed := make ( [ ] byte , len ( prefix ) + 1 + len ( value ) )
copy ( prefixed [ 0 : len ( prefix ) ] , prefix )
prefixed [ len ( prefix ) ] = '-'
copy ( prefixed [ len ( prefix ) + 1 : ] , value )
return prefixed
}
2019-02-19 14:39:39 +00:00
// RPush pushes a data from right of queue
func ( queue * Queue ) RPush ( data [ ] byte ) error {
queue . highLock . Lock ( )
id , err := queue . highincrement ( )
if err != nil {
queue . highLock . Unlock ( )
return err
}
2020-02-02 23:19:58 +00:00
err = queue . db . Put ( withPrefix ( queue . prefix , id2bytes ( id ) ) , data , nil )
2019-02-19 14:39:39 +00:00
queue . highLock . Unlock ( )
return err
}
// LPush pushes a data from left of queue
func ( queue * Queue ) LPush ( data [ ] byte ) error {
2019-11-28 15:37:33 +00:00
queue . lowLock . Lock ( )
2019-02-19 14:39:39 +00:00
id , err := queue . lowdecrement ( )
if err != nil {
2019-11-28 15:37:33 +00:00
queue . lowLock . Unlock ( )
2019-02-19 14:39:39 +00:00
return err
}
2020-02-02 23:19:58 +00:00
err = queue . db . Put ( withPrefix ( queue . prefix , id2bytes ( id ) ) , data , nil )
2019-11-28 15:37:33 +00:00
queue . lowLock . Unlock ( )
2019-02-19 14:39:39 +00:00
return err
}
// RPop pop a data from right of queue
func ( queue * Queue ) RPop ( ) ( [ ] byte , error ) {
queue . highLock . Lock ( )
2019-11-28 15:37:33 +00:00
defer queue . highLock . Unlock ( )
2019-02-19 14:39:39 +00:00
currentID := queue . high
2020-02-02 23:19:58 +00:00
res , err := queue . db . Get ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
if err == leveldb . ErrNotFound {
return nil , ErrNotFound
}
return nil , err
}
_ , err = queue . highdecrement ( )
if err != nil {
return nil , err
}
2020-02-02 23:19:58 +00:00
err = queue . db . Delete ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
return nil , err
}
return res , nil
}
2019-11-28 15:37:33 +00:00
// RHandle receives a user callback function to handle the right element of the queue, if function return nil, then delete the element, otherwise keep the element.
func ( queue * Queue ) RHandle ( h func ( [ ] byte ) error ) error {
queue . highLock . Lock ( )
defer queue . highLock . Unlock ( )
currentID := queue . high
2020-02-02 23:19:58 +00:00
res , err := queue . db . Get ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-11-28 15:37:33 +00:00
if err != nil {
if err == leveldb . ErrNotFound {
return ErrNotFound
}
return err
}
if err = h ( res ) ; err != nil {
return err
}
_ , err = queue . highdecrement ( )
if err != nil {
return err
}
2020-02-02 23:19:58 +00:00
return queue . db . Delete ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-11-28 15:37:33 +00:00
}
2019-02-19 14:39:39 +00:00
// LPop pop a data from left of queue
func ( queue * Queue ) LPop ( ) ( [ ] byte , error ) {
queue . lowLock . Lock ( )
2019-11-28 15:37:33 +00:00
defer queue . lowLock . Unlock ( )
2019-02-19 14:39:39 +00:00
currentID := queue . low
2020-02-02 23:19:58 +00:00
res , err := queue . db . Get ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
if err == leveldb . ErrNotFound {
return nil , ErrNotFound
}
return nil , err
}
_ , err = queue . lowincrement ( )
if err != nil {
return nil , err
}
2020-02-02 23:19:58 +00:00
err = queue . db . Delete ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-02-19 14:39:39 +00:00
if err != nil {
return nil , err
}
return res , nil
}
2019-11-28 15:37:33 +00:00
// LHandle receives a user callback function to handle the left element of the queue, if function return nil, then delete the element, otherwise keep the element.
func ( queue * Queue ) LHandle ( h func ( [ ] byte ) error ) error {
queue . lowLock . Lock ( )
defer queue . lowLock . Unlock ( )
currentID := queue . low
2020-02-02 23:19:58 +00:00
res , err := queue . db . Get ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-11-28 15:37:33 +00:00
if err != nil {
if err == leveldb . ErrNotFound {
return ErrNotFound
}
return err
}
if err = h ( res ) ; err != nil {
return err
}
_ , err = queue . lowincrement ( )
if err != nil {
return err
}
2020-02-02 23:19:58 +00:00
return queue . db . Delete ( withPrefix ( queue . prefix , id2bytes ( currentID ) ) , nil )
2019-11-28 15:37:33 +00:00
}
2020-02-02 23:19:58 +00:00
// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
2019-02-19 14:39:39 +00:00
func ( queue * Queue ) Close ( ) error {
2021-07-27 16:59:06 +00:00
queue . highLock . Lock ( )
queue . lowLock . Lock ( )
defer queue . highLock . Unlock ( )
defer queue . lowLock . Unlock ( )
2020-02-02 23:19:58 +00:00
if ! queue . closeUnderlyingDB {
queue . db = nil
return nil
}
2019-02-19 14:39:39 +00:00
err := queue . db . Close ( )
queue . db = nil
return err
}