Skip to content

Commit

Permalink
Working KV based sorting method
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Jan 21, 2025
1 parent e67ec8a commit 1aa0a61
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 320 deletions.
127 changes: 24 additions & 103 deletions engine/core/processors_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package core

import (
"context"
"reflect"
"slices"
"encoding/json"

"github.com/bmeg/grip/engine/logic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
Expand All @@ -15,108 +15,28 @@ type Sort struct {
sortFields []*gripql.SortField
}

// compareAny compares two variables of any type.
func compareAny(a, b any) int {

if a == nil && b != nil {
return -1
} else if a != nil && b == nil {
return 1
} else if a == nil && b == nil {
return 0
}

// Get the types of the variables.
ta := reflect.TypeOf(a)
tb := reflect.TypeOf(b)

// If the types are not the same, return a comparison based on type names.
if ta != tb {
return int(ta.Kind()) - int(tb.Kind())
}

// Compare values based on their types.
switch ta.Kind() {
case reflect.Int:
// Compare integer values.
return compareInts(a.(int), b.(int))
case reflect.Float64:
// Compare float values.
return compareFloats(a.(float64), b.(float64))
case reflect.String:
// Compare string values.
return compareStrings(a.(string), b.(string))
case reflect.Bool:
// Compare boolean values.
return compareBooleans(a.(bool), b.(bool))
case reflect.Struct:
// Optionally handle structs here.
// For now, just comparing based on memory address.
return comparePointers(a, b)
default:
// For unsupported types, we just use pointers for comparison.
log.Warningf("Unsupported types: %s %s", ta, tb)
return comparePointers(a, b)
}
}

// compareInts compares two integers.
func compareInts(a, b int) int {
if a < b {
return -1
} else if a > b {
return 1
}
return 0
}

// compareFloats compares two float64 values.
func compareFloats(a, b float64) int {
if a < b {
return -1
} else if a > b {
return 1
}
return 0
}

// compareStrings compares two string values.
func compareStrings(a, b string) int {
if a < b {
return -1
} else if a > b {
return 1
}
return 0
}

// compareBooleans compares two boolean values.
func compareBooleans(a, b bool) int {
if !a && b {
return -1
} else if a && !b {
return 1
// FromBytes implements logic.SortConf.
func (s *Sort) FromBytes(v []byte) gdbi.Traveler {
newTraveler := gdbi.BaseTraveler{}
err := json.Unmarshal(v, &newTraveler)
if err != nil {
log.Errorf("sort error: %s", err)
}
return 0
return &newTraveler
}

// comparePointers compares two pointers.
func comparePointers(a, b interface{}) int {
ptrA := reflect.ValueOf(a).Pointer()
ptrB := reflect.ValueOf(b).Pointer()
if ptrA < ptrB {
return -1
} else if ptrA > ptrB {
return 1
}
return 0
// ToBytes implements logic.SortConf.
func (s *Sort) ToBytes(a gdbi.Traveler) []byte {
v, _ := json.Marshal(a)
return v
}

func (s *Sort) compare(a, b gdbi.Traveler) int {
func (s *Sort) Compare(a, b gdbi.Traveler) int {
for _, f := range s.sortFields {
aVal := gdbi.TravelerPathLookup(a, f.Field)
bVal := gdbi.TravelerPathLookup(b, f.Field)
x := compareAny(aVal, bVal)
x := logic.CompareAny(aVal, bVal)
//fmt.Printf("Compare %s v %s = %d\n", aVal, bVal, x)
if x != 0 {
if f.Decending {
return -x
Expand All @@ -133,27 +53,28 @@ func (s *Sort) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, ou

signals := []gdbi.Traveler{}

list := []gdbi.Traveler{}
//sorter := logic.NewMemSorter[gdbi.Traveler](s)

tmpDir := man.GetTmpDir()
sorter := logic.NewKVSorter(tmpDir, s)

go func() {
defer close(out)
for t := range in {
if t.IsSignal() {
signals = append(signals, t)
} else {
list = append(list, t) //TODO: develop disk backed system
sorter.Add(t)
}
}
slices.SortFunc(list, s.compare)
//emit signals first (?)
for _, s := range signals {
out <- s
}
for _, s := range list {
out <- s
for i := range sorter.Sorted() {
out <- i
}

sorter.Close()
}()

return ctx
}
64 changes: 0 additions & 64 deletions engine/logic/kv_sorter.go

This file was deleted.

135 changes: 0 additions & 135 deletions engine/logic/pebble_sort.go

This file was deleted.

13 changes: 13 additions & 0 deletions engine/logic/sorter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package logic

type SortConf[SortType any] interface {
FromBytes([]byte) SortType
ToBytes(a SortType) []byte // ToBytes used for marshaling with gob
Compare(a, b SortType) int
}

type Sorter[T any] interface {
Add(T)
Sorted() chan T
Close() error
}
File renamed without changes.
Loading

0 comments on commit 1aa0a61

Please sign in to comment.