diff --git a/errors.go b/errors.go index 7865d8a..ab00865 100644 --- a/errors.go +++ b/errors.go @@ -2,6 +2,8 @@ package goque import ( "errors" + + ldberrors "github.com/syndtr/goleveldb/leveldb/errors" ) var ( @@ -21,3 +23,9 @@ var ( // its underlying database. ErrDBClosed = errors.New("goque: Database is closed") ) + +// IsCorrupted returns a boolean indicating whether the error is indicating +// a corruption. +func IsCorrupted(err error) bool { + return ldberrors.IsCorrupted(err) +} diff --git a/file.go b/file.go index 6a3c0ed..63788a4 100644 --- a/file.go +++ b/file.go @@ -3,6 +3,9 @@ package goque import ( "os" "path/filepath" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" ) // goqueType defines the type of Goque data structure used. @@ -17,6 +20,10 @@ const ( goquePrefixQueue ) +// levelDbOpener is a function type matching both +// leveldb.OpenFile() and leveldb.Recover(). +type levelDbOpener func(string, *opt.Options) (*leveldb.DB, error) + // checkGoqueType checks if the type of Goque data structure // trying to be opened is compatible with the opener type. // diff --git a/prefix_queue.go b/prefix_queue.go index 737c194..ea97a44 100644 --- a/prefix_queue.go +++ b/prefix_queue.go @@ -39,7 +39,21 @@ type PrefixQueue struct { // OpenPrefixQueue opens a prefix queue if one exists at the given directory. // If one does not already exist, a new prefix queue is created. +// If the underlying database is corrupt, an error for which +// IsCorrupted() returns true is returned. func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) { + return openPrefixQueue(dataDir, leveldb.OpenFile) +} + +// RecoverPrefixQueue attempts to recover a corrupt prefix queue. +func RecoverPrefixQueue(dataDir string) (*PrefixQueue, error) { + return openPrefixQueue(dataDir, leveldb.RecoverFile) +} + +// openPrefixQueue opens a prefix queue if one exists at the given directory +// using the specified opener. +// If one does not already exist, a new prefix queue is created. +func openPrefixQueue(dataDir string, open levelDbOpener) (*PrefixQueue, error) { var err error // Create a new Queue. @@ -50,7 +64,7 @@ func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) { } // Open database for the prefix queue. - pq.db, err = leveldb.OpenFile(dataDir, nil) + pq.db, err = open(dataDir, nil) if err != nil { return nil, err } diff --git a/prefix_queue_test.go b/prefix_queue_test.go index d8c250b..da1a0e6 100644 --- a/prefix_queue_test.go +++ b/prefix_queue_test.go @@ -408,6 +408,34 @@ func TestPrefixQueueOutOfBounds(t *testing.T) { } } +func TestPrefixQueueRecover(t *testing.T) { + file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + pq, err := OpenPrefixQueue(file) + if err != nil { + t.Error(err) + } + defer pq.Drop() + + _, err = pq.EnqueueString("prefix", "value for item") + if err != nil { + t.Error(err) + } + + if err = pq.Close(); err != nil { + t.Error(err) + } + if err = os.Remove(file + "/MANIFEST-000000"); err != nil { + t.Error(err) + } + + if pq, err = OpenPrefixQueue(file); !IsCorrupted(err) { + t.Errorf("Expected corruption error, got %s", err) + } + if pq, err = RecoverPrefixQueue(file); err != nil { + t.Error(err) + } +} + func BenchmarkPrefixQueueEnqueue(b *testing.B) { // Open test database file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) diff --git a/priority_queue.go b/priority_queue.go index f2fcca7..833af0c 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -49,7 +49,22 @@ type PriorityQueue struct { // OpenPriorityQueue opens a priority queue if one exists at the given // directory. If one does not already exist, a new priority queue is // created. +// If the underlying database is corrupt, an error for which +// IsCorrupted() returns true is returned. func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) { + return openPriorityQueue(dataDir, order, leveldb.OpenFile) +} + +// RecoverPriorityQueue attempts to recover a corrupt priority queue. +func RecoverPriorityQueue(dataDir string, order order) (*PriorityQueue, error) { + return openPriorityQueue(dataDir, order, leveldb.RecoverFile) +} + +// openPriorityQueue opens a priority queue if one exists at the given +// directory using the specified opener. If one does not already exist, +// a new priority queue is +// created. +func openPriorityQueue(dataDir string, order order, open levelDbOpener) (*PriorityQueue, error) { var err error // Create a new PriorityQueue. @@ -61,7 +76,7 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) { } // Open database for the priority queue. - pq.db, err = leveldb.OpenFile(dataDir, nil) + pq.db, err = open(dataDir, nil) if err != nil { return pq, err } diff --git a/priority_queue_test.go b/priority_queue_test.go index 3ae9cd3..40d5e99 100644 --- a/priority_queue_test.go +++ b/priority_queue_test.go @@ -912,6 +912,34 @@ func TestPriorityQueueOutOfBounds(t *testing.T) { } } +func TestPriorityQueueRecover(t *testing.T) { + file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + pq, err := OpenPriorityQueue(file, ASC) + if err != nil { + t.Error(err) + } + defer pq.Drop() + + _, err = pq.EnqueueString(0, "value for item") + if err != nil { + t.Error(err) + } + + if err = pq.Close(); err != nil { + t.Error(err) + } + if err = os.Remove(file + "/MANIFEST-000000"); err != nil { + t.Error(err) + } + + if pq, err = OpenPriorityQueue(file, ASC); !IsCorrupted(err) { + t.Errorf("Expected corruption error, got %s", err) + } + if pq, err = RecoverPriorityQueue(file, ASC); err != nil { + t.Error(err) + } +} + func BenchmarkPriorityQueueEnqueue(b *testing.B) { // Open test database file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) diff --git a/queue.go b/queue.go index 8f82d88..2087228 100644 --- a/queue.go +++ b/queue.go @@ -21,7 +21,21 @@ type Queue struct { // OpenQueue opens a queue if one exists at the given directory. If one // does not already exist, a new queue is created. +// If the underlying database is corrupt, an error for which +// IsCorrupted() returns true is returned. func OpenQueue(dataDir string) (*Queue, error) { + return openQueue(dataDir, leveldb.OpenFile) +} + +// RecoverQueue attempts to recover a corrupt queue. +func RecoverQueue(dataDir string) (*Queue, error) { + return openQueue(dataDir, leveldb.RecoverFile) +} + +// openQueue opens a queue if one exists at the given directory +// using the specified opener. If one +// does not already exist, a new queue is created. +func openQueue(dataDir string, open levelDbOpener) (*Queue, error) { var err error // Create a new Queue. @@ -34,7 +48,7 @@ func OpenQueue(dataDir string) (*Queue, error) { } // Open database for the queue. - q.db, err = leveldb.OpenFile(dataDir, nil) + q.db, err = open(dataDir, nil) if err != nil { return q, err } diff --git a/queue_test.go b/queue_test.go index 0e82a25..be2d531 100644 --- a/queue_test.go +++ b/queue_test.go @@ -458,6 +458,34 @@ func TestQueueOutOfBounds(t *testing.T) { } } +func TestQueueRecover(t *testing.T) { + file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + q, err := OpenQueue(file) + if err != nil { + t.Error(err) + } + defer q.Drop() + + _, err = q.EnqueueString("value for item") + if err != nil { + t.Error(err) + } + + if err = q.Close(); err != nil { + t.Error(err) + } + if err = os.Remove(file + "/MANIFEST-000000"); err != nil { + t.Error(err) + } + + if q, err = OpenQueue(file); !IsCorrupted(err) { + t.Errorf("Expected corruption error, got %s", err) + } + if q, err = RecoverQueue(file); err != nil { + t.Error(err) + } +} + func BenchmarkQueueEnqueue(b *testing.B) { // Open test database file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) diff --git a/stack.go b/stack.go index 0b2b19e..a5d30dd 100644 --- a/stack.go +++ b/stack.go @@ -21,7 +21,21 @@ type Stack struct { // OpenStack opens a stack if one exists at the given directory. If one // does not already exist, a new stack is created. +// If the underlying database is corrupt, an error for which +// IsCorrupted() returns true is returned. func OpenStack(dataDir string) (*Stack, error) { + return openStack(dataDir, leveldb.OpenFile) +} + +// RecoverStack attempts to recover a corrupt stack. +func RecoverStack(dataDir string) (*Stack, error) { + return openStack(dataDir, leveldb.RecoverFile) +} + +// openStack opens a stack if one exists at the given directory +// using the specified opener. If one +// does not already exist, a new stack is created. +func openStack(dataDir string, open levelDbOpener) (*Stack, error) { var err error // Create a new Stack. @@ -34,7 +48,7 @@ func OpenStack(dataDir string) (*Stack, error) { } // Open database for the stack. - s.db, err = leveldb.OpenFile(dataDir, nil) + s.db, err = open(dataDir, nil) if err != nil { return s, err } diff --git a/stack_test.go b/stack_test.go index ddae420..3c53e6c 100644 --- a/stack_test.go +++ b/stack_test.go @@ -458,6 +458,34 @@ func TestStackOutOfBounds(t *testing.T) { } } +func TestStackRecover(t *testing.T) { + file := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + s, err := OpenStack(file) + if err != nil { + t.Error(err) + } + defer s.Drop() + + _, err = s.PushString("value for item") + if err != nil { + t.Error(err) + } + + if err = s.Close(); err != nil { + t.Error(err) + } + if err = os.Remove(file + "/MANIFEST-000000"); err != nil { + t.Error(err) + } + + if s, err = OpenStack(file); !IsCorrupted(err) { + t.Errorf("Expected corruption error, got %s", err) + } + if s, err = RecoverStack(file); err != nil { + t.Error(err) + } +} + func BenchmarkStackPush(b *testing.B) { // Open test database file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())