Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair corrupt files #12

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package goque

import (
"errors"

ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
)

var (
Expand All @@ -21,3 +23,9 @@ var (
// its underlying database.
ErrDBClosed = errors.New("goque: Database is closed")
)

// IsCorrupted returns a boolean indicating whether the error is indicating
// a corruption.
func IsCorrupted(err error) bool {
return ldberrors.IsCorrupted(err)
}
7 changes: 7 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package goque
import (
"os"
"path/filepath"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// goqueType defines the type of Goque data structure used.
Expand All @@ -17,6 +20,10 @@ const (
goquePrefixQueue
)

// levelDbOpener is a function type matching both
// leveldb.OpenFile() and leveldb.Recover().
type levelDbOpener func(string, *opt.Options) (*leveldb.DB, error)

// checkGoqueType checks if the type of Goque data structure
// trying to be opened is compatible with the opener type.
//
Expand Down
16 changes: 15 additions & 1 deletion prefix_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,21 @@ type PrefixQueue struct {

// OpenPrefixQueue opens a prefix queue if one exists at the given directory.
// If one does not already exist, a new prefix queue is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
return openPrefixQueue(dataDir, leveldb.OpenFile)
}

// RecoverPrefixQueue attempts to recover a corrupt prefix queue.
func RecoverPrefixQueue(dataDir string) (*PrefixQueue, error) {
return openPrefixQueue(dataDir, leveldb.RecoverFile)
}

// openPrefixQueue opens a prefix queue if one exists at the given directory
// using the specified opener.
// If one does not already exist, a new prefix queue is created.
func openPrefixQueue(dataDir string, open levelDbOpener) (*PrefixQueue, error) {
var err error

// Create a new Queue.
Expand All @@ -50,7 +64,7 @@ func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
}

// Open database for the prefix queue.
pq.db, err = leveldb.OpenFile(dataDir, nil)
pq.db, err = open(dataDir, nil)
if err != nil {
return nil, err
}
Expand Down
28 changes: 28 additions & 0 deletions prefix_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,34 @@ func TestPrefixQueueOutOfBounds(t *testing.T) {
}
}

func TestPrefixQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPrefixQueue(file)
if err != nil {
t.Error(err)
}
defer pq.Drop()

_, err = pq.EnqueueString("prefix", "value for item")
if err != nil {
t.Error(err)
}

if err = pq.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if pq, err = OpenPrefixQueue(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if pq, err = RecoverPrefixQueue(file); err != nil {
t.Error(err)
}
}

func BenchmarkPrefixQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
17 changes: 16 additions & 1 deletion priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,22 @@ type PriorityQueue struct {
// OpenPriorityQueue opens a priority queue if one exists at the given
// directory. If one does not already exist, a new priority queue is
// created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
return openPriorityQueue(dataDir, order, leveldb.OpenFile)
}

// RecoverPriorityQueue attempts to recover a corrupt priority queue.
func RecoverPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
return openPriorityQueue(dataDir, order, leveldb.RecoverFile)
}

// openPriorityQueue opens a priority queue if one exists at the given
// directory using the specified opener. If one does not already exist,
// a new priority queue is
// created.
func openPriorityQueue(dataDir string, order order, open levelDbOpener) (*PriorityQueue, error) {
var err error

// Create a new PriorityQueue.
Expand All @@ -61,7 +76,7 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
}

// Open database for the priority queue.
pq.db, err = leveldb.OpenFile(dataDir, nil)
pq.db, err = open(dataDir, nil)
if err != nil {
return pq, err
}
Expand Down
28 changes: 28 additions & 0 deletions priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,34 @@ func TestPriorityQueueOutOfBounds(t *testing.T) {
}
}

func TestPriorityQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPriorityQueue(file, ASC)
if err != nil {
t.Error(err)
}
defer pq.Drop()

_, err = pq.EnqueueString(0, "value for item")
if err != nil {
t.Error(err)
}

if err = pq.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if pq, err = OpenPriorityQueue(file, ASC); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if pq, err = RecoverPriorityQueue(file, ASC); err != nil {
t.Error(err)
}
}

func BenchmarkPriorityQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
16 changes: 15 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ type Queue struct {

// OpenQueue opens a queue if one exists at the given directory. If one
// does not already exist, a new queue is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenQueue(dataDir string) (*Queue, error) {
return openQueue(dataDir, leveldb.OpenFile)
}

// RecoverQueue attempts to recover a corrupt queue.
func RecoverQueue(dataDir string) (*Queue, error) {
return openQueue(dataDir, leveldb.RecoverFile)
}

// openQueue opens a queue if one exists at the given directory
// using the specified opener. If one
// does not already exist, a new queue is created.
func openQueue(dataDir string, open levelDbOpener) (*Queue, error) {
var err error

// Create a new Queue.
Expand All @@ -34,7 +48,7 @@ func OpenQueue(dataDir string) (*Queue, error) {
}

// Open database for the queue.
q.db, err = leveldb.OpenFile(dataDir, nil)
q.db, err = open(dataDir, nil)
if err != nil {
return q, err
}
Expand Down
28 changes: 28 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,34 @@ func TestQueueOutOfBounds(t *testing.T) {
}
}

func TestQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
q, err := OpenQueue(file)
if err != nil {
t.Error(err)
}
defer q.Drop()

_, err = q.EnqueueString("value for item")
if err != nil {
t.Error(err)
}

if err = q.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if q, err = OpenQueue(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if q, err = RecoverQueue(file); err != nil {
t.Error(err)
}
}

func BenchmarkQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
16 changes: 15 additions & 1 deletion stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ type Stack struct {

// OpenStack opens a stack if one exists at the given directory. If one
// does not already exist, a new stack is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenStack(dataDir string) (*Stack, error) {
return openStack(dataDir, leveldb.OpenFile)
}

// RecoverStack attempts to recover a corrupt stack.
func RecoverStack(dataDir string) (*Stack, error) {
return openStack(dataDir, leveldb.RecoverFile)
}

// openStack opens a stack if one exists at the given directory
// using the specified opener. If one
// does not already exist, a new stack is created.
func openStack(dataDir string, open levelDbOpener) (*Stack, error) {
var err error

// Create a new Stack.
Expand All @@ -34,7 +48,7 @@ func OpenStack(dataDir string) (*Stack, error) {
}

// Open database for the stack.
s.db, err = leveldb.OpenFile(dataDir, nil)
s.db, err = open(dataDir, nil)
if err != nil {
return s, err
}
Expand Down
28 changes: 28 additions & 0 deletions stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,34 @@ func TestStackOutOfBounds(t *testing.T) {
}
}

func TestStackRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
s, err := OpenStack(file)
if err != nil {
t.Error(err)
}
defer s.Drop()

_, err = s.PushString("value for item")
if err != nil {
t.Error(err)
}

if err = s.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if s, err = OpenStack(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if s, err = RecoverStack(file); err != nil {
t.Error(err)
}
}

func BenchmarkStackPush(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down