Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GOQUE-15: Expose compacting functionality to queues and stack #16

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions prefix_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/util"
)

// prefixDelimiter defines the delimiter used to separate a prefix from an
Expand Down Expand Up @@ -356,6 +357,24 @@ func (pq *PrefixQueue) Drop() error {
return os.RemoveAll(pq.DataDir)
}

// CompactQueue compacts the underlying DB for the given key range
// should only used by users who know what they are doing
func (pq *PrefixQueue) CompactQueue() error {
var err error

// we need to compact the entire range on level db
myrange := util.Range{
Start: nil,
Limit: nil,
}

err = pq.db.CompactRange(myrange)
if err != nil {
return err
}
return err
}

// getQueue gets the unique queue for the given prefix.
func (pq *PrefixQueue) getQueue(prefix []byte) (*queue, error) {
// Try to get the queue gob value.
Expand Down
44 changes: 44 additions & 0 deletions prefix_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goque

import (
"bytes"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -538,6 +539,49 @@ func TestPrefixQueueOutOfBounds(t *testing.T) {
}
}

func TestCompactPrefixQueue(t *testing.T) {

// open our new queuedb
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPrefixQueue(file)
if err != nil {
t.Error(err)
}
defer pq.Drop()

// create a new struct that we are going to store in the queue
var myStruct struct {
data []byte
}

// give some size to the struct such that we can have many ldb files
// during the enqueueing
prefix := []byte("prefix")
myStruct.data = bytes.Repeat([]byte{10}, 1000*1024)
for i := 0; i < 1000; i++ {
_, err = pq.EnqueueObject(prefix, myStruct.data)
if err != nil {
t.Error(err)
}
}

// dequeue all of our struct
// this will cause that after the ldb files
// to stay still on disk
for i := 0; i < 1000; i++ {
_, err = pq.Dequeue(prefix)
if err != nil {
t.Error(err)
}
}

// Compact db to remove unused ldb files from disk
err = pq.CompactQueue()
if err != nil {
t.Error(err)
}

}
func BenchmarkPrefixQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
18 changes: 18 additions & 0 deletions priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,24 @@ func (pq *PriorityQueue) Drop() error {
return os.RemoveAll(pq.DataDir)
}

// CompactQueue compacts the underlying DB for the given key range
// should only used by users who know what they are doing
func (pq *PriorityQueue) CompactQueue() error {
var err error

// we need to compact the entire range on level db
myrange := util.Range{
Start: nil,
Limit: nil,
}

err = pq.db.CompactRange(myrange)
if err != nil {
return err
}
return err
}

// cmpAsc returns wehther the given priority level is higher than the
// current priority level based on ascending order.
func (pq *PriorityQueue) cmpAsc(priority uint8) bool {
Expand Down
44 changes: 44 additions & 0 deletions priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goque

import (
"bytes"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -1056,6 +1057,49 @@ func TestPriorityQueueOutOfBounds(t *testing.T) {
}
}

func TestCompactPriorityQueue(t *testing.T) {

// open our new queuedb
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPriorityQueue(file, ASC)
if err != nil {
t.Error(err)
}
defer pq.Drop()

// create a new struct that we are going to store in the queue
var myStruct struct {
data []byte
}

// give some size to the struct such that we can have many ldb files
// during the enqueueing
myStruct.data = bytes.Repeat([]byte{10}, 1000*1024)
for i := 0; i < 1000; i++ {
_, err = pq.EnqueueObject(uint8(i%255), myStruct.data)
if err != nil {
t.Error(err)
}
}

// dequeue all of our struct
// this will cause that after the ldb files
// to stay still on disk
for i := 0; i < 1000; i++ {
_, err = pq.Dequeue()
if err != nil {
t.Error(err)
}
}

// Compact db to remove unused ldb files from disk
err = pq.CompactQueue()
if err != nil {
t.Error(err)
}

}

func BenchmarkPriorityQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
19 changes: 19 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

// Queue is a standard FIFO (first in, first out) queue.
Expand Down Expand Up @@ -292,6 +293,24 @@ func (q *Queue) Drop() error {
return os.RemoveAll(q.DataDir)
}

// CompactQueue compacts the underlying DB for the given key range
// should only used by users who know what they are doing
func (q *Queue) CompactQueue() error {
var err error

// we need to compact the entire range on level db
myrange := util.Range{
Start: nil,
Limit: nil,
}

err = q.db.CompactRange(myrange)
if err != nil {
return err
}
return err
}

// getItemByID returns an item, if found, for the given ID.
func (q *Queue) getItemByID(id uint64) (*Item, error) {
// Check if empty or out of bounds.
Expand Down
44 changes: 44 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goque

import (
"bytes"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -588,6 +589,49 @@ func TestQueueOutOfBounds(t *testing.T) {
}
}

func TestCompactQueue(t *testing.T) {

// open our new queuedb
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
q, err := OpenQueue(file)
if err != nil {
t.Error(err)
}
defer q.Drop()

// create a new struct that we are going to store in the queue
var myStruct struct {
data []byte
}

// give some size to the struct such that we can have many ldb files
// during the enqueueing
myStruct.data = bytes.Repeat([]byte{10}, 1000*1024)
for i := 0; i < 1000; i++ {
_, err = q.EnqueueObject(myStruct.data)
if err != nil {
t.Error(err)
}
}

// dequeue all of our struct
// this will cause that after the ldb files
// to stay still on disk
for i := 0; i < 1000; i++ {
_, err = q.Dequeue()
if err != nil {
t.Error(err)
}
}

// Compact db to remove unused ldb files from disk
err = q.CompactQueue()
if err != nil {
t.Error(err)
}

}

func BenchmarkQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
19 changes: 19 additions & 0 deletions stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

// Stack is a standard LIFO (last in, first out) stack.
Expand Down Expand Up @@ -292,6 +293,24 @@ func (s *Stack) Drop() error {
return os.RemoveAll(s.DataDir)
}

// CompactStack compacts the underlying DB for the given key range
// should only used by users who know what they are doing
func (s *Stack) CompactStack() error {
var err error

// we need to compact the entire range on level db
myrange := util.Range{
Start: nil,
Limit: nil,
}

err = s.db.CompactRange(myrange)
if err != nil {
return err
}
return err
}

// getItemByID returns an item, if found, for the given ID.
func (s *Stack) getItemByID(id uint64) (*Item, error) {
// Check if empty or out of bounds.
Expand Down
44 changes: 44 additions & 0 deletions stack_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goque

import (
"bytes"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -588,6 +589,49 @@ func TestStackOutOfBounds(t *testing.T) {
}
}

func TestCompactStack(t *testing.T) {

// open our new queuedb
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
s, err := OpenStack(file)
if err != nil {
t.Error(err)
}
defer s.Drop()

// create a new struct that we are going to store in the queue
var myStruct struct {
data []byte
}

// give some size to the struct such that we can have many ldb files
// during the pushing
myStruct.data = bytes.Repeat([]byte{10}, 1000*1024)
for i := 0; i < 1000; i++ {
_, err = s.PushObject(myStruct.data)
if err != nil {
t.Error(err)
}
}

// pop all of our struct
// this will cause that after the ldb files
// to stay still on disk
for i := 0; i < 1000; i++ {
_, err = s.Pop()
if err != nil {
t.Error(err)
}
}

// Compact db to remove unused ldb files from disk
err = s.CompactStack()
if err != nil {
t.Error(err)
}

}

func BenchmarkStackPush(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down