diff --git a/stream_writer.go b/stream_writer.go index 317342102..5c59f4acc 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -159,6 +159,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { } switch kv.Kind { case pb.KV_DATA_KEY: + sw.writeLock.Lock() + defer sw.writeLock.Unlock() y.AssertTrue(len(sw.db.opt.EncryptionKey) > 0) var dk pb.DataKey if err := proto.Unmarshal(kv.Value, &dk); err != nil { @@ -176,6 +178,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { } return nil case pb.KV_FILE: + sw.writeLock.Lock() + defer sw.writeLock.Unlock() // All tables should be recieved before any of the keys. if sw.processingKeys { return errors.New("Received pb.KV_FILE after pb.KV_KEY") @@ -209,12 +213,19 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { // Pass. The following code will handle the keys. } + sw.writeLock.Lock() sw.processingKeys = true + if sw.maxVersion < kv.Version { + sw.maxVersion = kv.Version + } if sw.prevLevel == 0 { - // If prevLevel is 0, that means that we have not written anything yet. Equivalently, - // we were virtually writing to the maxLevel+1. + // If prevLevel is 0, that means that we have not written anything yet. + // So, we can write to the maxLevel. newWriter writes to prevLevel - 1, + // so we can set prevLevel to len(levels). sw.prevLevel = len(sw.db.lc.levels) } + sw.writeLock.Unlock() + var meta, userMeta byte if len(kv.Meta) > 0 { meta = kv.Meta[0] @@ -222,6 +233,7 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if len(kv.UserMeta) > 0 { userMeta = kv.UserMeta[0] } + e := &Entry{ Key: y.KeyWithTs(kv.Key, kv.Version), Value: y.Copy(kv.Value), diff --git a/stream_writer_test.go b/stream_writer_test.go index fc82611f3..da4bde736 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -600,7 +600,6 @@ func TestStreamWriterWithLargeValue(t *testing.T) { require.NoError(t, sw.Flush(), "sw.Flush() failed") }) } -<<<<<<< HEAD func TestStreamWriterIncremental(t *testing.T) { addIncremtal := func(t *testing.T, db *DB, keys [][]byte) { @@ -677,5 +676,3 @@ func TestStreamWriterIncremental(t *testing.T) { }) }) } -======= ->>>>>>> dfd26d5 (opt(stream): add option to directly copy over tables from lower levels (#1700))