-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathtenant.go
224 lines (203 loc) · 5.44 KB
/
tenant.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright 2023 Sneller, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sneller
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"io/fs"
"os"
"runtime/trace"
"slices"
"github.com/SnellerInc/sneller/db"
"github.com/SnellerInc/sneller/fsutil"
"github.com/SnellerInc/sneller/ion/blockfmt"
"github.com/SnellerInc/sneller/plan"
"github.com/SnellerInc/sneller/tenant/dcache"
"github.com/SnellerInc/sneller/vm"
"github.com/dchest/siphash"
"golang.org/x/exp/constraints"
)
var CanVMOpen = false
// CacheLimit defines a limit such that blob
// segments will not be cached if the total scan
// size of a request in bytes exceeds the limit.
var CacheLimit = memTotal / 2
var onebuf [8]byte
func init() {
binary.LittleEndian.PutUint64(onebuf[:], 1)
}
// TenantEnv implements plan.Decoder for use
// with snellerd in tenant mode. It also
// implements plan.Env, though must have the
// embedded FSEnv initialized in order to be
// used as such.
type TenantEnv struct {
*FSEnv
}
type TenantRunner struct {
Events *os.File
Cache *dcache.Cache
}
func (r *TenantRunner) Post() {
if r.Events != nil {
r.Events.Write(onebuf[:])
}
}
func (r *TenantRunner) Run(dst vm.QuerySink, in *plan.Input, ep *plan.ExecParams) error {
// TODO: this should be reimplemented in terms
// of plan.FSRunner
ctx := ep.Context
if ctx == nil {
ctx = context.Background()
}
if !CanVMOpen {
panic("shouldn't have called Run")
}
ctx, task := trace.NewTask(ctx, "run-segments")
defer task.End()
trace.Log(ctx, "query-id", ep.Plan.ID)
segs := make([]dcache.Segment, 0, in.Blocks())
for i := range in.Descs {
in.Descs[i].Blocks.Each(func(off int) {
seg := &tenantSegment{
fs: ep.FS,
ctx: ctx, // inherit current task
desc: in.Descs[i].Descriptor,
block: off,
fields: in.Fields,
}
segs = append(segs, seg)
})
}
if len(segs) == 0 {
return nil
}
var flags dcache.Flag
if CacheLimit > 0 && in.CompressedSize() > CacheLimit {
flags = dcache.FlagNoFill
}
tbl := r.Cache.MultiTable(ctx, segs, flags)
err := tbl.WriteChunks(dst, ep.Parallel)
ep.Stats.Observe(tbl)
return err
}
// tenantSegment implements dcache.Segment
type tenantSegment struct {
fs fs.FS
ctx context.Context
desc blockfmt.Descriptor
block int
fields []string
}
// merge two sorted slices
func merge[T constraints.Ordered](dst, src []T) []T {
if slices.Equal(dst, src) {
return dst
}
var out []T
j := 0
for i := 0; i < len(dst); i++ {
if j >= len(src) {
out = append(out, dst[i:]...)
break
}
if dst[i] == src[j] {
out = append(out, dst[i])
j++
} else if dst[i] < src[j] {
out = append(out, dst[i])
} else {
out = append(out, src[j])
j++
i--
}
}
out = append(out, src[j:]...)
return out
}
func (s *tenantSegment) Merge(other dcache.Segment) {
o := other.(*tenantSegment)
all := s.fields == nil || o.fields == nil
if all {
s.fields = nil
} else {
s.fields = merge(s.fields, o.fields)
}
}
// Size is currently the blob size
func (s *tenantSegment) Size() int64 {
size := int64(0)
start, end := s.desc.Trailer.BlockRange(s.block)
size += end - start
return size
}
// ETag implements dcache.Segment.ETag
func (s *tenantSegment) ETag() string {
// we're hashing the etag+block together
// so that we get an even dispersion of
// bits for the top byte (base64-encoded)
// which we use as the directory for the
// rest of the cache contents
//
// this avoids an issue for ETags that begin
// with deterministic sequences of characters
// (like S3 ETags beginning with '"')
// causing the first level of cache directory
// indirection to become entirely useless
const (
k0 = 0x9f17c3fd5efd3ce4
k1 = 0xdbf1ba5f07eee2c0
)
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s-%d", s.desc.ETag, s.block)
lo, hi := siphash.Hash128(k0, k1, buf.Bytes())
mem := buf.Bytes()[:0]
mem = binary.LittleEndian.AppendUint64(mem, lo)
mem = binary.LittleEndian.AppendUint64(mem, hi)
return base64.URLEncoding.EncodeToString(mem)
}
// Read implements dcache.Segment.Open
func (s *tenantSegment) Open() (io.ReadCloser, error) {
// NOTE: this region only times the time-to-first-byte,
// not the additional latency of actuall reading the bytes
defer trace.StartRegion(s.ctx, "cache-fill-open").End()
start, end := s.desc.Trailer.BlockRange(s.block)
name := s.desc.Path
etag := s.desc.ETag
return fsutil.OpenRange(s.fs, name, etag, start, end-start)
}
func (s *tenantSegment) Ephemeral() bool {
return s.desc.Size < db.DefaultMinMerge
}
func vmMalloc(size int) []byte {
if size > vm.PageSize {
panic("cannot allocate page with size > vm.PageSize")
}
return vm.Malloc()[:size]
}
// Decode implements dcache.Segment.Decode
func (s *tenantSegment) Decode(dst io.Writer, src []byte) error {
defer trace.StartRegion(s.ctx, "decode-segment").End()
var dec blockfmt.Decoder
dec.Malloc = vmMalloc
dec.Free = vm.Free
dec.Fields = s.fields
dec.Set(&s.desc.Trailer)
_, err := dec.CopyBytes(dst, src)
return err
}