From 1aa0a616e8f00c4356f12c44c72f3282162a46da Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 20 Jan 2025 22:18:32 -0800 Subject: [PATCH] Working KV based sorting method --- engine/core/processors_sort.go | 127 ++++------------ engine/logic/kv_sorter.go | 64 --------- engine/logic/pebble_sort.go | 135 ------------------ engine/logic/sorter.go | 13 ++ .../{slice_sort.go => sorter_compare.go} | 0 engine/logic/sorter_kv.go | 73 ++++++++++ engine/logic/sorter_mem.go | 36 +++++ engine/manager.go | 15 +- gdbi/interface.go | 8 +- 9 files changed, 151 insertions(+), 320 deletions(-) delete mode 100644 engine/logic/kv_sorter.go delete mode 100644 engine/logic/pebble_sort.go create mode 100644 engine/logic/sorter.go rename engine/logic/{slice_sort.go => sorter_compare.go} (100%) create mode 100644 engine/logic/sorter_kv.go create mode 100644 engine/logic/sorter_mem.go diff --git a/engine/core/processors_sort.go b/engine/core/processors_sort.go index a52b2704..7143476c 100644 --- a/engine/core/processors_sort.go +++ b/engine/core/processors_sort.go @@ -2,9 +2,9 @@ package core import ( "context" - "reflect" - "slices" + "encoding/json" + "github.com/bmeg/grip/engine/logic" "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/log" @@ -15,108 +15,28 @@ type Sort struct { sortFields []*gripql.SortField } -// compareAny compares two variables of any type. -func compareAny(a, b any) int { - - if a == nil && b != nil { - return -1 - } else if a != nil && b == nil { - return 1 - } else if a == nil && b == nil { - return 0 - } - - // Get the types of the variables. - ta := reflect.TypeOf(a) - tb := reflect.TypeOf(b) - - // If the types are not the same, return a comparison based on type names. - if ta != tb { - return int(ta.Kind()) - int(tb.Kind()) - } - - // Compare values based on their types. - switch ta.Kind() { - case reflect.Int: - // Compare integer values. - return compareInts(a.(int), b.(int)) - case reflect.Float64: - // Compare float values. - return compareFloats(a.(float64), b.(float64)) - case reflect.String: - // Compare string values. - return compareStrings(a.(string), b.(string)) - case reflect.Bool: - // Compare boolean values. - return compareBooleans(a.(bool), b.(bool)) - case reflect.Struct: - // Optionally handle structs here. - // For now, just comparing based on memory address. - return comparePointers(a, b) - default: - // For unsupported types, we just use pointers for comparison. - log.Warningf("Unsupported types: %s %s", ta, tb) - return comparePointers(a, b) - } -} - -// compareInts compares two integers. -func compareInts(a, b int) int { - if a < b { - return -1 - } else if a > b { - return 1 - } - return 0 -} - -// compareFloats compares two float64 values. -func compareFloats(a, b float64) int { - if a < b { - return -1 - } else if a > b { - return 1 - } - return 0 -} - -// compareStrings compares two string values. -func compareStrings(a, b string) int { - if a < b { - return -1 - } else if a > b { - return 1 - } - return 0 -} - -// compareBooleans compares two boolean values. -func compareBooleans(a, b bool) int { - if !a && b { - return -1 - } else if a && !b { - return 1 +// FromBytes implements logic.SortConf. +func (s *Sort) FromBytes(v []byte) gdbi.Traveler { + newTraveler := gdbi.BaseTraveler{} + err := json.Unmarshal(v, &newTraveler) + if err != nil { + log.Errorf("sort error: %s", err) } - return 0 + return &newTraveler } -// comparePointers compares two pointers. -func comparePointers(a, b interface{}) int { - ptrA := reflect.ValueOf(a).Pointer() - ptrB := reflect.ValueOf(b).Pointer() - if ptrA < ptrB { - return -1 - } else if ptrA > ptrB { - return 1 - } - return 0 +// ToBytes implements logic.SortConf. +func (s *Sort) ToBytes(a gdbi.Traveler) []byte { + v, _ := json.Marshal(a) + return v } -func (s *Sort) compare(a, b gdbi.Traveler) int { +func (s *Sort) Compare(a, b gdbi.Traveler) int { for _, f := range s.sortFields { aVal := gdbi.TravelerPathLookup(a, f.Field) bVal := gdbi.TravelerPathLookup(b, f.Field) - x := compareAny(aVal, bVal) + x := logic.CompareAny(aVal, bVal) + //fmt.Printf("Compare %s v %s = %d\n", aVal, bVal, x) if x != 0 { if f.Decending { return -x @@ -133,7 +53,10 @@ func (s *Sort) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, ou signals := []gdbi.Traveler{} - list := []gdbi.Traveler{} + //sorter := logic.NewMemSorter[gdbi.Traveler](s) + + tmpDir := man.GetTmpDir() + sorter := logic.NewKVSorter(tmpDir, s) go func() { defer close(out) @@ -141,19 +64,17 @@ func (s *Sort) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, ou if t.IsSignal() { signals = append(signals, t) } else { - list = append(list, t) //TODO: develop disk backed system + sorter.Add(t) } } - slices.SortFunc(list, s.compare) //emit signals first (?) for _, s := range signals { out <- s } - for _, s := range list { - out <- s + for i := range sorter.Sorted() { + out <- i } - + sorter.Close() }() - return ctx } diff --git a/engine/logic/kv_sorter.go b/engine/logic/kv_sorter.go deleted file mode 100644 index a16845ea..00000000 --- a/engine/logic/kv_sorter.go +++ /dev/null @@ -1,64 +0,0 @@ -package logic - -import ( - "encoding/json" - - "github.com/bmeg/grip/gdbi" - "github.com/cockroachdb/pebble" -) - -const ( - maxWriterBuffer = 3 << 30 -) - -type KVSorter[T any] struct { - kv *pebble.DB - batch *pebble.Batch - curSize int -} - -// Close implements gdbi.Sorter. -func (ks *KVSorter[T]) Close() error { - return ks.kv.Close() -} - -// Add implements gdbi.Sorter. -func (ks *KVSorter[T]) Add(key any, value T) { - if v, err := json.Marshal(value); err == nil { - k := EncodeAny(key) - ks.curSize += len(k) + len(v) - ks.batch.Set(k, v, nil) - if ks.curSize > maxWriterBuffer { - ks.batch.Commit(nil) - ks.batch.Reset() - ks.curSize = 0 - } - } -} - -// Sorted implements gdbi.Sorter. -func (ks *KVSorter[T]) Sorted() chan T { - ks.batch.Commit(nil) - ks.batch.Close() - - out := make(chan T) - go func() { - iter, _ := ks.kv.NewIter(nil) - for iter.First(); iter.Valid(); iter.Next() { - v := iter.Value() - var o T - json.Unmarshal(v, &o) - out <- o - } - defer close(out) - }() - return out -} - -func NewKVSorter[T any](path string) gdbi.Sorter[T] { - c := *pebble.DefaultComparer - c.Compare = CompareEncoded - kv, _ := pebble.Open(path, &pebble.Options{Comparer: &c}) - b := kv.NewBatch() - return &KVSorter[T]{kv, b, 0} -} diff --git a/engine/logic/pebble_sort.go b/engine/logic/pebble_sort.go deleted file mode 100644 index 9bfa29a9..00000000 --- a/engine/logic/pebble_sort.go +++ /dev/null @@ -1,135 +0,0 @@ -package logic - -import ( - "encoding/binary" - "fmt" - "math" -) - -const boolType byte = 0 -const intType byte = 1 -const floatType byte = 2 -const stringType byte = 3 - -func CompareEncoded(a, b []byte) int { - switch a[0] { - case boolType: - switch b[0] { - case boolType: - return CompareBooleans(DecodeBool(a), DecodeBool(b)) - default: - return int(a[0] - b[0]) - } - case intType: - switch b[0] { - case intType: - return CompareInts(DecodeInt(a), DecodeInt(b)) - case floatType: - return CompareAny(DecodeInt(a), DecodeFloat(b)) - default: - return int(a[0] - b[0]) - } - case floatType: - switch b[0] { - case floatType: - return CompareFloats(DecodeFloat(a), DecodeFloat(b)) - case intType: - return CompareAny(DecodeFloat(a), DecodeInt(b)) - default: - return int(a[0] - b[0]) - } - case stringType: - switch b[0] { - case stringType: - return CompareStrings(DecodeString(a), DecodeString(b)) - default: - return int(a[0] - b[0]) - } - default: - return int(a[0] - b[1]) - } - -} - -func EncodeInt(a int64) []byte { - out := make([]byte, 9) - out[0] = intType - binary.LittleEndian.PutUint64(out[1:], uint64(a)) - return out -} - -func EncodeFloat(a float64) []byte { - out := make([]byte, 9) - out[0] = floatType - binary.LittleEndian.PutUint64(out[1:], math.Float64bits(a)) - return out -} - -func EncodeString(a string) []byte { - out := make([]byte, len(a)+1) - out[0] = stringType - copy(out[1:], a) - return out -} - -func EncodeBool(a bool) []byte { - out := []byte{boolType, 0x00} - if a { - out[1] = 0x01 - } - return out -} - -func EncodeAny(k any) []byte { - switch y := k.(type) { - case int: - return EncodeInt(int64(y)) - case int32: - return EncodeInt(int64(y)) - case int16: - return EncodeInt(int64(y)) - case int64: - return EncodeInt(int64(y)) - case float32: - return EncodeFloat(float64(y)) - case float64: - return EncodeFloat(y) - case string: - return EncodeString(y) - case bool: - return EncodeBool(y) - default: - fmt.Printf("Missing type: %s\n", y) - return []byte{} - } -} - -func DecodeAny(k []byte) any { - switch k[0] { - case boolType: - return DecodeBool(k) - case intType: - return DecodeInt(k) - case floatType: - return DecodeFloat(k) - case stringType: - return DecodeString(k) - } - return nil -} - -func DecodeFloat(k []byte) float64 { - return math.Float64frombits(binary.LittleEndian.Uint64(k[1:])) -} - -func DecodeInt(k []byte) int64 { - return int64(binary.LittleEndian.Uint64(k[1:])) -} - -func DecodeBool(k []byte) bool { - return k[1] != 0 -} - -func DecodeString(k []byte) string { - return string(k[1:]) -} diff --git a/engine/logic/sorter.go b/engine/logic/sorter.go new file mode 100644 index 00000000..e95cc294 --- /dev/null +++ b/engine/logic/sorter.go @@ -0,0 +1,13 @@ +package logic + +type SortConf[SortType any] interface { + FromBytes([]byte) SortType + ToBytes(a SortType) []byte // ToBytes used for marshaling with gob + Compare(a, b SortType) int +} + +type Sorter[T any] interface { + Add(T) + Sorted() chan T + Close() error +} diff --git a/engine/logic/slice_sort.go b/engine/logic/sorter_compare.go similarity index 100% rename from engine/logic/slice_sort.go rename to engine/logic/sorter_compare.go diff --git a/engine/logic/sorter_kv.go b/engine/logic/sorter_kv.go new file mode 100644 index 00000000..920b3dec --- /dev/null +++ b/engine/logic/sorter_kv.go @@ -0,0 +1,73 @@ +package logic + +import ( + "github.com/cockroachdb/pebble" +) + +const ( + maxWriterBuffer = 3 << 30 +) + +type kvCompare[T any] struct { + conf SortConf[T] +} + +type KVSorter[T any] struct { + kv *pebble.DB + batch *pebble.Batch + curSize int + compare kvCompare[T] +} + +// Close implements gdbi.Sorter. +func (ks *KVSorter[T]) Close() error { + return ks.kv.Close() +} + +// Add implements gdbi.Sorter. +func (ks *KVSorter[T]) Add(value T) { + k := ks.compare.conf.ToBytes(value) + ks.curSize += len(k) + ks.batch.Set(k, nil, nil) + if ks.curSize > maxWriterBuffer { + ks.batch.Commit(nil) + ks.batch.Reset() + ks.curSize = 0 + } +} + +// Sorted implements gdbi.Sorter. +func (ks *KVSorter[T]) Sorted() chan T { + ks.batch.Commit(nil) + ks.batch.Close() + + out := make(chan T) + go func() { + iter, _ := ks.kv.NewIter(nil) + for iter.First(); iter.Valid(); iter.Next() { + v := iter.Key() + var o T = ks.compare.conf.FromBytes(v) + out <- o + } + defer close(out) + }() + return out +} + +func (ks *kvCompare[T]) compareEncoded(a, b []byte) int { + aT := ks.conf.FromBytes(a) + bT := ks.conf.FromBytes(b) + return ks.conf.Compare(aT, bT) +} + +func NewKVSorter[T any](path string, conf SortConf[T]) Sorter[T] { + comp := kvCompare[T]{conf} + c := *pebble.DefaultComparer + c.Compare = comp.compareEncoded + o := &KVSorter[T]{} + o.kv, _ = pebble.Open(path, &pebble.Options{Comparer: &c}) + o.batch = o.kv.NewBatch() + o.curSize = 0 + o.compare = comp + return o +} diff --git a/engine/logic/sorter_mem.go b/engine/logic/sorter_mem.go new file mode 100644 index 00000000..91edc0d4 --- /dev/null +++ b/engine/logic/sorter_mem.go @@ -0,0 +1,36 @@ +package logic + +import "slices" + +type MemSorter[T any] struct { + data []T + conf SortConf[T] +} + +// Add implements Sorter. +func (m *MemSorter[T]) Add(value T) { + m.data = append(m.data, value) +} + +// Close implements Sorter. +func (m *MemSorter[T]) Close() error { + return nil +} + +// Sorted implements Sorter. +func (m *MemSorter[T]) Sorted() chan T { + slices.SortFunc(m.data, m.conf.Compare) + out := make(chan T) + go func() { + defer close(out) + for _, i := range m.data { + out <- i + } + }() + return out +} + +func NewMemSorter[T any](conf SortConf[T]) Sorter[T] { + o := make([]T, 0, 10) + return &MemSorter[T]{o, conf} +} diff --git a/engine/manager.go b/engine/manager.go index 91db798d..c6c5f963 100644 --- a/engine/manager.go +++ b/engine/manager.go @@ -4,7 +4,6 @@ import ( "io" "os" - "github.com/bmeg/grip/engine/logic" "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/kvi" "github.com/bmeg/grip/kvi/badgerdb" @@ -21,16 +20,10 @@ type manager struct { workDir string } -// GetSorter implements gdbi.Manager. -func (bm *manager) GetSorter() gdbi.Sorter[gdbi.Traveler] { - td, _ := os.MkdirTemp(bm.workDir, "kvTmp") - - s := logic.NewKVSorter[gdbi.Traveler](td) - - bm.kvs = append(bm.kvs, s) - bm.paths = append(bm.paths, td) - - return s +// GetTmpDir implements gdbi.Manager. +func (bm *manager) GetTmpDir() string { + td, _ := os.MkdirTemp(bm.workDir, "tmp") + return td } func (bm *manager) GetTempKV() kvi.KVInterface { diff --git a/gdbi/interface.go b/gdbi/interface.go index 22539de4..4de3e1b6 100644 --- a/gdbi/interface.go +++ b/gdbi/interface.go @@ -187,17 +187,11 @@ type GraphInterface interface { GetInEdgeChannel(ctx context.Context, req chan ElementLookup, load bool, emitNull bool, edgeLabels []string) chan ElementLookup } -type Sorter[T any] interface { - Add(key any, value T) - Sorted() chan T - Close() error -} - // Manager is a resource manager that is passed to processors to allow them ] // to make resource requests type Manager interface { //Get handle to temporary KeyValue store driver GetTempKV() kvi.KVInterface - GetSorter() Sorter[Traveler] + GetTmpDir() string Cleanup() }