package sonic
import (
"io"
"github.com/talostrading/sonic/sonicerrors"
)
// ByteBuffer provides operations that make it easier to handle byte slices in
// networking code.
//
// A ByteBuffer has 3 areas. In order:
// - save area: [0, si)
// - read area: [si, ri)
// - write area: [ri, wi)
//
// A usual workflow is as follows:
// - Bytes are written to the write area. These bytes cannot be read yet.
// - Bytes from the write area are made available for reading in the read area,
// by calling Commit.
// - Bytes from the read area can be either Saved or Consumed. If Saved, the
// bytes are kept in the save area. If Consumed, the bytes' lifetime ends and
// they are discarded immediately. Saved bytes must be discarded explicitly later.
//
// Invariants:
// - 0 <= si <= ri <= wi <= len(b.data) <= cap(b.data)
// - every time wi changes, b.data should grow/shrink accordingly:
// - b.data = b.data[:b.wi]
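//
// A minimal usage sketch (illustrative only; it simply exercises the workflow
// described above):
//
//	b := NewByteBuffer()
//	n, _ := b.WriteString("hello") // bytes land in the write area
//	b.Commit(n)                    // make them readable
//	_ = b.Data()                   // the read area now holds "hello"
//	b.Consume(n)                   // done with them; the read area is empty again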
type ByteBuffer struct {
si int // End index of the save area, always smaller or equal to ri.
ri int // End index of the read area, always smaller or equal to wi.
wi int // End index of the write area.
oneByte [1]byte
data []byte
}
var (
_ io.Reader = &ByteBuffer{}
_ io.ByteReader = &ByteBuffer{}
_ io.ByteScanner = &ByteBuffer{}
_ io.Writer = &ByteBuffer{}
_ io.ByteWriter = &ByteBuffer{}
_ io.ReaderFrom = &ByteBuffer{}
_ AsyncReaderFrom = &ByteBuffer{}
_ io.WriterTo = &ByteBuffer{}
_ AsyncWriterTo = &ByteBuffer{}
)
func NewByteBuffer() *ByteBuffer {
b := &ByteBuffer{
data: make([]byte, 0, 512),
}
return b
}
// Reserve capacity such that at least `n` more bytes can be written
// into the ByteBuffer's write area.
//
// This call ensures that the write area can fit at least `n` bytes. It might
// allocate.
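//
// A minimal sketch (illustrative only):
//
//	b.Reserve(1024)          // ensure room for at least 1 KiB of writes
//	_ = b.Reserved() >= 1024 // true after the call above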
func (b *ByteBuffer) Reserve(n int) {
existing := cap(b.data) - b.wi
if need := n - existing; need > 0 {
b.data = b.data[:cap(b.data)]
b.data = append(b.data, make([]byte, need)...)
}
b.data = b.data[:b.wi]
}
// Reserved returns the number of bytes that can be written
// in the write area of the buffer.
func (b *ByteBuffer) Reserved() int {
return cap(b.data) - b.wi
}
// Commit moves `n` bytes from the write area to the read area.
func (b *ByteBuffer) Commit(n int) {
if n <= 0 {
return
}
b.ri += n
if b.ri > b.wi {
b.ri = b.wi
}
}
// Prefault the buffer, forcing physical memory allocation.
//
// NOTE: this should be used sparingly. Even though an array is contiguous in
// the process' virtual memory map, it is likely fragmented in physical memory.
// Iterating over the array causes a page fault for each untouched page, thus
// triggering the virtual to physical memory mapping. This means that if you
// Reserve 1GB initially, no physical memory is allocated. But if you Prefault
// after Reserve, the entire 1GB is physically allocated, which might not be
// what you want in a resource-constrained application.
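//
// A minimal sketch (illustrative only):
//
//	b.Reserve(1 << 30) // reserve 1 GiB of address space; nothing is faulted in yet
//	b.Prefault()       // touch every page so the reservation is physically backed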
func (b *ByteBuffer) Prefault() {
slice := b.data[:cap(b.data)]
for i := range slice {
slice[i] = 0
}
}
// Data returns the bytes in the read area.
func (b *ByteBuffer) Data() []byte {
return b.data[b.si:b.ri]
}
// SaveLen returns the length of the save area.
func (b *ByteBuffer) SaveLen() int {
return len(b.data[0:b.si])
}
// ReadLen returns the length of the read area.
func (b *ByteBuffer) ReadLen() int {
return len(b.data[b.si:b.ri])
}
// WriteLen returns the length of the write area.
func (b *ByteBuffer) WriteLen() int {
return len(b.data[b.ri:b.wi])
}
// Len returns the length of the underlying byte slice.
func (b *ByteBuffer) Len() int {
return len(b.data)
}
// Cap returns the capacity of the underlying byte slice.
func (b *ByteBuffer) Cap() int {
return cap(b.data)
}
// Consume removes the first `n` bytes of the read area. The removed bytes
// cannot be referenced after a call to Consume. If that's desired, use Save.
func (b *ByteBuffer) Consume(n int) {
if n <= 0 {
return
}
if readLen := b.ReadLen(); n > readLen {
n = readLen
}
if n > 0 {
// TODO this can be smarter
copy(b.data[b.si:], b.data[b.si+n:b.wi])
b.ri -= n
b.wi -= n
b.data = b.data[:b.wi]
}
}
// Save n bytes from the read area. Save is like Consume, except that the bytes
// can still be referenced after the read area is updated.
//
// Saved bytes should be discarded at some point with
// Discard(...).
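//
// A hedged sketch of saving and later discarding bytes (illustrative only;
// assumes the read area currently holds a 4-byte header followed by a payload):
//
//	slot := b.Save(4)           // keep the header around
//	header := b.SavedSlot(slot) // remains valid while later bytes are consumed
//	_ = header
//	b.Discard(slot)             // release the saved header when done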
func (b *ByteBuffer) Save(n int) (slot Slot) {
if readLen := b.ReadLen(); n > readLen {
n = readLen
}
if n <= 0 {
return
}
slot.Length = n
slot.Index = b.si
b.si += n
return
}
// Saved returns the bytes in the save area.
func (b *ByteBuffer) Saved() []byte {
return b.data[:b.si]
}
// SavedSlot returns the bytes of a previously saved slot.
func (b *ByteBuffer) SavedSlot(slot Slot) []byte {
return b.data[slot.Index : slot.Index+slot.Length]
}
// Discard a previously saved slot.
//
// This call reduces the save area by slot.Length. Returns slot.Length.
func (b *ByteBuffer) Discard(slot Slot) (discarded int) {
if slot.Length <= 0 {
return 0
}
copy(b.data[slot.Index:], b.data[slot.Index+slot.Length:b.wi])
b.si -= slot.Length
b.ri -= slot.Length
b.wi -= slot.Length
b.data = b.data[:b.wi]
return slot.Length
}
// DiscardAll saved slots.
//
// The save area's size will be 0 after this call.
func (b *ByteBuffer) DiscardAll() {
b.Discard(Slot{Index: 0, Length: b.SaveLen()})
}
// Reset the buffer: the save, read and write areas all become empty.
func (b *ByteBuffer) Reset() {
b.si = 0
b.ri = 0
b.wi = 0
b.data = b.data[:0]
}
// Read the bytes from the read area into `dst`. Consume them.
func (b *ByteBuffer) Read(dst []byte) (int, error) {
if len(dst) == 0 {
return 0, nil
}
if b.ri == 0 {
return 0, io.EOF
}
n := copy(dst, b.data[b.si:b.ri])
b.Consume(n)
return n, nil
}
// ReadByte returns and consumes one byte from the read area.
func (b *ByteBuffer) ReadByte() (byte, error) {
_, err := b.Read(b.oneByte[:])
return b.oneByte[0], err
}
// ReadFrom the supplied reader into the write area.
//
// The buffer is not automatically grown to accommodate all data from the reader.
// It is the caller's responsibility to reserve enough space through Reserve.
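//
// A hedged sketch (illustrative only; `conn` is an assumed io.Reader, for
// example a net.Conn):
//
//	b.Reserve(4096)            // ReadFrom does not grow the buffer by itself
//	n, err := b.ReadFrom(conn) // fills the write area
//	if err == nil {
//		b.Commit(int(n)) // expose the freshly read bytes through Data()
//	}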
func (b *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
n, err := r.Read(b.data[b.wi:cap(b.data)])
if err == nil {
b.wi += n
b.data = b.data[:b.wi]
}
return int64(n), err
}
// UnreadByte removes the last byte from the write area, undoing the most
// recent write.
func (b *ByteBuffer) UnreadByte() error {
if b.WriteLen() > 0 {
b.wi -= 1
b.data = b.data[:b.wi]
return nil
}
return io.EOF
}
// AsyncReadFrom the supplied asynchronous reader into the write area.
//
// The buffer is not automatically grown to accommodate all data from the reader.
// It is the caller's responsibility to reserve enough space through Reserve.
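//
// A hedged sketch (illustrative only; `r` is an assumed AsyncReader, for
// example a sonic connection):
//
//	b.Reserve(4096)
//	b.AsyncReadFrom(r, func(err error, n int) {
//		if err == nil {
//			b.Commit(n) // the read bytes are now available through Data()
//		}
//	})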
func (b *ByteBuffer) AsyncReadFrom(r AsyncReader, cb AsyncCallback) {
r.AsyncRead(b.data[b.wi:cap(b.data)], func(err error, n int) {
if err == nil {
b.wi += n
b.data = b.data[:b.wi]
}
cb(err, n)
})
}
// Write the supplied slice into the write area. Grow the write area if needed.
func (b *ByteBuffer) Write(bb []byte) (int, error) {
b.data = append(b.data, bb...)
n := len(bb)
b.wi += n
b.data = b.data[:b.wi]
return n, nil
}
// WriteByte into the write area. Grow the write area if needed.
func (b *ByteBuffer) WriteByte(bb byte) error {
b.data = append(b.data, bb)
b.wi += 1
b.data = b.data[:b.wi]
return nil
}
// WriteString into the write area. Grow the write area if needed.
func (b *ByteBuffer) WriteString(s string) (int, error) {
b.data = append(b.data, s...)
n := len(s)
b.wi += n
b.data = b.data[:b.wi]
return n, nil
}
// WriteTo writes bytes from the read area to the provided writer, consuming
// the bytes that were successfully written.
func (b *ByteBuffer) WriteTo(w io.Writer) (int64, error) {
var (
n int
err error
writtenBytes = 0
)
for b.si+writtenBytes < b.ri {
n, err = w.Write(b.data[b.si+writtenBytes : b.ri])
if err != nil {
break
}
writtenBytes += n
}
b.Consume(writtenBytes)
return int64(writtenBytes), err
}
// AsyncWriteTo writes bytes from the read area to the provided asynchronous
// writer, consuming them if no error occurred.
func (b *ByteBuffer) AsyncWriteTo(w AsyncWriter, cb AsyncCallback) {
w.AsyncWriteAll(b.data[b.si:b.ri], func(err error, n int) {
if err == nil {
b.Consume(n)
}
cb(err, n)
})
}
// PrepareRead ensures that the read area holds at least `n` bytes, committing
// bytes from the write area if needed. If fewer than `n` bytes are available
// in total, ErrNeedMore is returned and nothing is committed, hence nothing
// new is made available for reading.
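//
// A hedged sketch of framed reading (illustrative only; `frameLen` is assumed
// to come from a previously parsed header):
//
//	if err := b.PrepareRead(frameLen); err == nil {
//		frame := b.Data()[:frameLen]
//		_ = frame
//		b.Consume(frameLen)
//	} else if err == sonicerrors.ErrNeedMore {
//		// not enough bytes buffered yet; read more and retry
//	}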
func (b *ByteBuffer) PrepareRead(n int) (err error) {
if need := n - b.ReadLen(); need > 0 {
if b.WriteLen() >= need {
b.Commit(need)
} else {
err = sonicerrors.ErrNeedMore
}
}
return
}
// Claim a byte slice of the write area.
//
// Claim allows callers to write directly into the write area of the buffer.
//
// `fn` implementations should return the number of bytes written into the
// provided byte slice.
//
// Callers may write fewer bytes than they claim. `fn` reports the number of
// bytes actually written, and the unused bytes remain available for future claims.
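//
// A hedged sketch (illustrative only; the caller serializes a small message
// directly into the write area):
//
//	b.Reserve(16)
//	b.Claim(func(dst []byte) int {
//		return copy(dst, "PING\n") // report how many bytes were actually written
//	})
//	b.Commit(5) // the five claimed-and-written bytes become readable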
func (b *ByteBuffer) Claim(fn func(b []byte) int) {
n := fn(b.data[b.wi:cap(b.data)])
if wi := b.wi + n; n >= 0 && wi <= cap(b.data) {
// wi <= cap(b.data) because the invariant is that b.wi = min(len(b.data), cap(b.data)) after each call
b.wi = wi
b.data = b.data[:b.wi]
}
}
// ClaimFixed claims a fixed byte slice from the write area.
//
// Callers do not have the option to write fewer bytes than they claim. The
// write area grows by `n` if there is enough capacity; otherwise, nil is
// returned and nothing is claimed.
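//
// A hedged sketch of a length-prefixed write (illustrative only; uses
// encoding/binary and assumes enough capacity was reserved so the later
// append does not reallocate under the claimed slice):
//
//	b.Reserve(32)
//	header := b.ClaimFixed(4)      // room for a 4-byte big-endian length prefix
//	n, _ := b.WriteString("hello") // payload follows the prefix
//	binary.BigEndian.PutUint32(header, uint32(n))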
func (b *ByteBuffer) ClaimFixed(n int) (claimed []byte) {
if wi := b.wi + n; n >= 0 && wi <= cap(b.data) {
claimed = b.data[b.wi:wi]
b.wi = wi
b.data = b.data[:b.wi]
}
return
}
// ShrinkBy shrinks the write area by at most `n` bytes.
func (b *ByteBuffer) ShrinkBy(n int) int {
if n <= 0 {
return 0
}
if length := b.WriteLen(); n > length {
n = length
}
b.wi -= n
b.data = b.data[:b.wi]
return n
}
// ShrinkTo shrinks the write area to contain min(n, WriteLen()) bytes.
func (b *ByteBuffer) ShrinkTo(n int) (shrunkBy int) {
return b.ShrinkBy(b.WriteLen() - n)
}