From feed37dc56f6b0af71530887b1e0a1032a2a00e3 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 30 Mar 2021 15:13:51 -0500 Subject: [PATCH] ADR-038 Part 1: WriteListener, listen.KVStore, MultiStore and KVStore updates (#8551) * StoreKVPair protobuf message definition and generated go types * store WriteListener * update MultiStore, CacheWrap, CacheWrapper interfaces * adjust KVStores to fit new CacheWrapper interface * new ListenKVStore * adjust multistores to fit new MultiStore interface and enable wrapping returned KVStores with the new ListenKVStore * typo fixes in adr * ListenKV Store test * update server mock KVStore and MultiStore * multistore unit test; fix multistore constructor * update changelog * fix bug identified in CI * improve codecov, minor fixes/adjustments * review fixes * review updates; flip set to delete in KVStorePair, updated proto-docs from running 'make proto-gen' --- CHANGELOG.md | 1 + docs/architecture/adr-038-state-listening.md | 37 +- docs/core/proto-docs.md | 56 ++- .../cosmos/base/store/v1beta1/listening.proto | 13 + server/mock/store.go | 16 + store/cachekv/store.go | 6 + store/cachemulti/store.go | 42 +- store/dbadapter/store.go | 6 + store/dbadapter/store_test.go | 17 + store/gaskv/store.go | 5 + store/gaskv/store_test.go | 1 + store/iavl/store.go | 6 + store/iavl/store_test.go | 17 + store/listenkv/store.go | 155 ++++++ store/listenkv/store_test.go | 298 +++++++++++ store/mem/mem_test.go | 11 + store/mem/store.go | 6 + store/prefix/store.go | 6 + store/prefix/store_test.go | 16 + store/rootmulti/store.go | 34 +- store/rootmulti/store_test.go | 144 +++++- store/tracekv/store.go | 9 +- store/tracekv/store_test.go | 6 + store/types/listening.go | 47 ++ store/types/listening.pb.go | 469 ++++++++++++++++++ store/types/listening_test.go | 65 +++ store/types/store.go | 13 + 27 files changed, 1456 insertions(+), 46 deletions(-) create mode 100644 proto/cosmos/base/store/v1beta1/listening.proto create mode 100644 store/listenkv/store.go create mode 100644 store/listenkv/store_test.go create mode 100644 store/types/listening.go create mode 100644 store/types/listening.pb.go create mode 100644 store/types/listening_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e82d2bd46ad..39cda9d4cc6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (x/upgrade) [\#8743](https://github.com/cosmos/cosmos-sdk/pull/8743) Add tracking module versions as per ADR-041 * (types) [\#8962](https://github.com/cosmos/cosmos-sdk/issues/8962) Add `Abs()` method to `sdk.Int`. * (x/bank) [\#8950](https://github.com/cosmos/cosmos-sdk/pull/8950) Improve efficiency on supply updates. +* (store) [\#8012](https://github.com/cosmos/cosmos-sdk/pull/8012) Implementation of ADR-038 WriteListener and listen.KVStore ### Bug Fixes diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index cd78e72e2caa..bee6a837c909 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -33,7 +33,7 @@ type WriteListener interface { // if value is nil then it was deleted // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores // set bool indicates if it was a set; true: set, false: delete - OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) + OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } ``` @@ -72,15 +72,20 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVP } // OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs -func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) { +func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error { kvPair := new(types.StoreKVPair) kvPair.StoreKey = storeKey.Name() - kvPair.Set = set + kvPair.Delete = Delete kvPair.Key = key kvPair.Value = value - if by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair); err == nil { - wl.writer.Write(by) + by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair) + if err != nil { + return err } + if _, err := wl.writer.Write(by); err != nil { + return err + } + return nil } ``` @@ -110,20 +115,22 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteL func (s *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) s.parent.Set(key, value) - s.onWrite(true, key, value) + s.onWrite(false, key, value) } // Delete implements the KVStore interface. It traces a write operation and // delegates the Delete call to the parent KVStore. func (s *Store) Delete(key []byte) { s.parent.Delete(key) - s.onWrite(false, key, nil) + s.onWrite(true, key, nil) } // onWrite writes a KVStore operation to all of the WriteListeners -func (s *Store) onWrite(set bool, key, value []byte) { +func (s *Store) onWrite(delete bool, key, value []byte) { for _, l := range s.listeners { - l.OnWrite(s.parentStoreKey, set, key, value) + if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil { + // log error + } } } ``` @@ -140,9 +147,9 @@ type MultiStore interface { // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey ListeningEnabled(key StoreKey) bool - // SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey + // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey // It appends the listeners to a current set, if one already exists - SetListeners(key StoreKey, listeners []WriteListener) + AddListeners(key StoreKey, listeners []WriteListener) } ``` @@ -342,7 +349,7 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru case <-quitChan: return case by := <-fss.srcChan: - append(fss.stateCache, by) + fss.stateCache = append(fss.stateCache, by) } } }() @@ -380,7 +387,7 @@ We will add a new method to the `BaseApp` to enable the registration of `Streami func (app *BaseApp) RegisterHooks(s StreamingService) { // set the listeners for each StoreKey for key, lis := range s.Listeners() { - app.cms.SetListeners(key, lis) + app.cms.AddListeners(key, lis) } // register the streaming service hooks within the BaseApp // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks @@ -398,7 +405,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg ... // Call the streaming service hooks with the BeginBlock messages - for _ hook := range app.hooks { + for _, hook := range app.hooks { hook.ListenBeginBlock(app.deliverState.ctx, req, res) } @@ -445,7 +452,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx } // Call the streaming service hooks with the DeliverTx messages - for _, hook := range app.hook { + for _, hook := range app.hooks { hook.ListenDeliverTx(app.deliverState.ctx, req, res) } diff --git a/docs/core/proto-docs.md b/docs/core/proto-docs.md index 39ddd22a0d57..e886b1d4f4d5 100644 --- a/docs/core/proto-docs.md +++ b/docs/core/proto-docs.md @@ -80,7 +80,6 @@ - [Output](#cosmos.bank.v1beta1.Output) - [Params](#cosmos.bank.v1beta1.Params) - [SendEnabled](#cosmos.bank.v1beta1.SendEnabled) - - [Supply](#cosmos.bank.v1beta1.Supply) - [cosmos/bank/v1beta1/genesis.proto](#cosmos/bank/v1beta1/genesis.proto) - [Balance](#cosmos.bank.v1beta1.Balance) @@ -133,6 +132,9 @@ - [CommitInfo](#cosmos.base.store.v1beta1.CommitInfo) - [StoreInfo](#cosmos.base.store.v1beta1.StoreInfo) +- [cosmos/base/store/v1beta1/listening.proto](#cosmos/base/store/v1beta1/listening.proto) + - [StoreKVPair](#cosmos.base.store.v1beta1.StoreKVPair) + - [cosmos/base/store/v1beta1/snapshot.proto](#cosmos/base/store/v1beta1/snapshot.proto) - [SnapshotIAVLItem](#cosmos.base.store.v1beta1.SnapshotIAVLItem) - [SnapshotItem](#cosmos.base.store.v1beta1.SnapshotItem) @@ -1563,23 +1565,6 @@ sendable). - - - -### Supply -Supply represents a struct that passively keeps track of the total supply -amounts in the network. -This message is deprecated now that supply is indexed by denom. - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| `total` | [cosmos.base.v1beta1.Coin](#cosmos.base.v1beta1.Coin) | repeated | | - - - - - @@ -2200,6 +2185,41 @@ between a store name and the commit ID. + + + + + + + + + + + +

Top

+ +## cosmos/base/store/v1beta1/listening.proto + + + + + +### StoreKVPair +StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes) +It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| `store_key` | [string](#string) | | the store key for the KVStore this pair originates from | +| `delete` | [bool](#bool) | | true indicates a delete operation, false indicates a set operation | +| `key` | [bytes](#bytes) | | | +| `value` | [bytes](#bytes) | | | + + + + + diff --git a/proto/cosmos/base/store/v1beta1/listening.proto b/proto/cosmos/base/store/v1beta1/listening.proto new file mode 100644 index 000000000000..d5ba74865b25 --- /dev/null +++ b/proto/cosmos/base/store/v1beta1/listening.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +package cosmos.base.store.v1beta1; + +option go_package = "github.com/cosmos/cosmos-sdk/store/types"; + +// StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes) +// It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes +message StoreKVPair { + string store_key = 1; // the store key for the KVStore this pair originates from + bool delete = 2; // true indicates a delete operation, false indicates a set operation + bytes key = 3; + bytes value = 4; +} diff --git a/server/mock/store.go b/server/mock/store.go index d731767f192a..33f573518c19 100644 --- a/server/mock/store.go +++ b/server/mock/store.go @@ -31,6 +31,10 @@ func (ms multiStore) CacheWrapWithTrace(_ io.Writer, _ sdk.TraceContext) sdk.Cac panic("not implemented") } +func (ms multiStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap { + panic("not implemented") +} + func (ms multiStore) TracingEnabled() bool { panic("not implemented") } @@ -43,6 +47,14 @@ func (ms multiStore) SetTracer(w io.Writer) sdk.MultiStore { panic("not implemented") } +func (ms multiStore) AddListeners(key store.StoreKey, listeners []store.WriteListener) { + panic("not implemented") +} + +func (ms multiStore) ListeningEnabled(key store.StoreKey) bool { + panic("not implemented") +} + func (ms multiStore) Commit() sdk.CommitID { panic("not implemented") } @@ -131,6 +143,10 @@ func (kv kvStore) CacheWrapWithTrace(w io.Writer, tc sdk.TraceContext) sdk.Cache panic("not implemented") } +func (kv kvStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap { + panic("not implemented") +} + func (kv kvStore) GetStoreType() sdk.StoreType { panic("not implemented") } diff --git a/store/cachekv/store.go b/store/cachekv/store.go index af5d75f9c83e..74122dfe3b89 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -10,6 +10,7 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/internal/conv" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" @@ -146,6 +147,11 @@ func (store *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types return NewStore(tracekv.NewStore(store, w, tc)) } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (store *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return NewStore(listenkv.NewStore(store, storeKey, listeners)) +} + //---------------------------------------- // Iteration diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index 59a29c358cc5..a11bcba5d4ad 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -25,6 +25,8 @@ type Store struct { traceWriter io.Writer traceContext types.TraceContext + + listeners map[types.StoreKey][]types.WriteListener } var _ types.CacheMultiStore = Store{} @@ -35,6 +37,7 @@ var _ types.CacheMultiStore = Store{} func NewFromKVStore( store types.KVStore, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, + listeners map[types.StoreKey][]types.WriteListener, ) Store { cms := Store{ db: cachekv.NewStore(store), @@ -42,13 +45,20 @@ func NewFromKVStore( keys: keys, traceWriter: traceWriter, traceContext: traceContext, + listeners: listeners, } for key, store := range stores { + var cacheWrapped types.CacheWrap if cms.TracingEnabled() { - cms.stores[key] = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext) + cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext) + } else { + cacheWrapped = store.CacheWrap() + } + if cms.ListeningEnabled(key) { + cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key]) } else { - cms.stores[key] = store.CacheWrap() + cms.stores[key] = cacheWrapped } } @@ -59,10 +69,10 @@ func NewFromKVStore( // CacheWrapper objects. Each CacheWrapper store is a branched store. func NewStore( db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, - traceWriter io.Writer, traceContext types.TraceContext, + traceWriter io.Writer, traceContext types.TraceContext, listeners map[types.StoreKey][]types.WriteListener, ) Store { - return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext) + return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, listeners) } func newCacheMultiStoreFromCMS(cms Store) Store { @@ -71,7 +81,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store { stores[k] = v } - return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext) + return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, cms.listeners) } // SetTracer sets the tracer for the MultiStore that the underlying @@ -102,6 +112,23 @@ func (cms Store) TracingEnabled() bool { return cms.traceWriter != nil } +// AddListeners adds listeners for a specific KVStore +func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { + if ls, ok := cms.listeners[key]; ok { + cms.listeners[key] = append(ls, listeners...) + } else { + cms.listeners[key] = listeners + } +} + +// ListeningEnabled returns if listening is enabled for a specific KVStore +func (cms Store) ListeningEnabled(key types.StoreKey) bool { + if ls, ok := cms.listeners[key]; ok { + return len(ls) != 0 + } + return false +} + // GetStoreType returns the type of the store. func (cms Store) GetStoreType() types.StoreType { return types.StoreTypeMulti @@ -125,6 +152,11 @@ func (cms Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac return cms.CacheWrap() } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (cms Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + return cms.CacheWrap() +} + // Implements MultiStore. func (cms Store) CacheMultiStore() types.CacheMultiStore { return newCacheMultiStoreFromCMS(cms) diff --git a/store/dbadapter/store.go b/store/dbadapter/store.go index e9ea4f847d14..2f0ceb5df54a 100644 --- a/store/dbadapter/store.go +++ b/store/dbadapter/store.go @@ -6,6 +6,7 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" ) @@ -85,5 +86,10 @@ func (dsa Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca return cachekv.NewStore(tracekv.NewStore(dsa, w, tc)) } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (dsa Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(dsa, storeKey, listeners)) +} + // dbm.DB implements KVStore so we can CacheKVStore it. var _ types.KVStore = Store{} diff --git a/store/dbadapter/store_test.go b/store/dbadapter/store_test.go index c09f09331630..9f8ac71b25cf 100644 --- a/store/dbadapter/store_test.go +++ b/store/dbadapter/store_test.go @@ -5,6 +5,8 @@ import ( "errors" "testing" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -71,3 +73,18 @@ func TestAccessors(t *testing.T) { mockDB.EXPECT().ReverseIterator(gomock.Eq(start), gomock.Eq(end)).Times(1).Return(nil, errFoo) require.Panics(t, func() { store.ReverseIterator(start, end) }) } + +func TestCacheWraps(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockDB := mocks.NewMockDB(mockCtrl) + store := dbadapter.Store{mockDB} + + cacheWrapper := store.CacheWrap() + require.IsType(t, &cachekv.Store{}, cacheWrapper) + + cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace) + + cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners) +} diff --git a/store/gaskv/store.go b/store/gaskv/store.go index 06b55f5ead48..47d796727acc 100644 --- a/store/gaskv/store.go +++ b/store/gaskv/store.go @@ -98,6 +98,11 @@ func (gs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac panic("cannot CacheWrapWithTrace a GasKVStore") } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (gs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + panic("cannot CacheWrapWithListeners a GasKVStore") +} + func (gs *Store) iterator(start, end []byte, ascending bool) types.Iterator { var parent types.Iterator if ascending { diff --git a/store/gaskv/store_test.go b/store/gaskv/store_test.go index 432fbe376343..e111a72329a7 100644 --- a/store/gaskv/store_test.go +++ b/store/gaskv/store_test.go @@ -26,6 +26,7 @@ func TestGasKVStoreBasic(t *testing.T) { require.Equal(t, types.StoreTypeDB, st.GetStoreType()) require.Panics(t, func() { st.CacheWrap() }) require.Panics(t, func() { st.CacheWrapWithTrace(nil, nil) }) + require.Panics(t, func() { st.CacheWrapWithListeners(nil, nil) }) require.Panics(t, func() { st.Set(nil, []byte("value")) }, "setting a nil key should panic") require.Panics(t, func() { st.Set([]byte(""), []byte("value")) }, "setting an empty key should panic") diff --git a/store/iavl/store.go b/store/iavl/store.go index e3a3f897d70f..440b26754de5 100644 --- a/store/iavl/store.go +++ b/store/iavl/store.go @@ -14,6 +14,7 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/telemetry" @@ -159,6 +160,11 @@ func (st *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca return cachekv.NewStore(tracekv.NewStore(st, w, tc)) } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (st *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(st, storeKey, listeners)) +} + // Implements types.KVStore. func (st *Store) Set(key, value []byte) { types.AssertValidKey(key) diff --git a/store/iavl/store_test.go b/store/iavl/store_test.go index cfda5efacf37..26cf27db8bf0 100644 --- a/store/iavl/store_test.go +++ b/store/iavl/store_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/iavl" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" @@ -634,3 +636,18 @@ func TestSetInitialVersion(t *testing.T) { }) } } + +func TestCacheWraps(t *testing.T) { + db := dbm.NewMemDB() + tree, _ := newAlohaTree(t, db) + store := UnsafeNewStore(tree) + + cacheWrapper := store.CacheWrap() + require.IsType(t, &cachekv.Store{}, cacheWrapper) + + cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace) + + cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners) +} diff --git a/store/listenkv/store.go b/store/listenkv/store.go new file mode 100644 index 000000000000..dfb6dea46c2f --- /dev/null +++ b/store/listenkv/store.go @@ -0,0 +1,155 @@ +package listenkv + +import ( + "io" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +var _ types.KVStore = &Store{} + +// Store implements the KVStore interface with listening enabled. +// Operations are traced on each core KVStore call and written to any of the +// underlying listeners with the proper key and operation permissions +type Store struct { + parent types.KVStore + listeners []types.WriteListener + parentStoreKey types.StoreKey +} + +// NewStore returns a reference to a new traceKVStore given a parent +// KVStore implementation and a buffered writer. +func NewStore(parent types.KVStore, parentStoreKey types.StoreKey, listeners []types.WriteListener) *Store { + return &Store{parent: parent, listeners: listeners, parentStoreKey: parentStoreKey} +} + +// Get implements the KVStore interface. It traces a read operation and +// delegates a Get call to the parent KVStore. +func (s *Store) Get(key []byte) []byte { + value := s.parent.Get(key) + return value +} + +// Set implements the KVStore interface. It traces a write operation and +// delegates the Set call to the parent KVStore. +func (s *Store) Set(key []byte, value []byte) { + types.AssertValidKey(key) + s.parent.Set(key, value) + s.onWrite(false, key, value) +} + +// Delete implements the KVStore interface. It traces a write operation and +// delegates the Delete call to the parent KVStore. +func (s *Store) Delete(key []byte) { + s.parent.Delete(key) + s.onWrite(true, key, nil) +} + +// Has implements the KVStore interface. It delegates the Has call to the +// parent KVStore. +func (s *Store) Has(key []byte) bool { + return s.parent.Has(key) +} + +// Iterator implements the KVStore interface. It delegates the Iterator call +// the to the parent KVStore. +func (s *Store) Iterator(start, end []byte) types.Iterator { + return s.iterator(start, end, true) +} + +// ReverseIterator implements the KVStore interface. It delegates the +// ReverseIterator call the to the parent KVStore. +func (s *Store) ReverseIterator(start, end []byte) types.Iterator { + return s.iterator(start, end, false) +} + +// iterator facilitates iteration over a KVStore. It delegates the necessary +// calls to it's parent KVStore. +func (s *Store) iterator(start, end []byte, ascending bool) types.Iterator { + var parent types.Iterator + + if ascending { + parent = s.parent.Iterator(start, end) + } else { + parent = s.parent.ReverseIterator(start, end) + } + + return newTraceIterator(parent, s.listeners) +} + +type listenIterator struct { + parent types.Iterator + listeners []types.WriteListener +} + +func newTraceIterator(parent types.Iterator, listeners []types.WriteListener) types.Iterator { + return &listenIterator{parent: parent, listeners: listeners} +} + +// Domain implements the Iterator interface. +func (li *listenIterator) Domain() (start []byte, end []byte) { + return li.parent.Domain() +} + +// Valid implements the Iterator interface. +func (li *listenIterator) Valid() bool { + return li.parent.Valid() +} + +// Next implements the Iterator interface. +func (li *listenIterator) Next() { + li.parent.Next() +} + +// Key implements the Iterator interface. +func (li *listenIterator) Key() []byte { + key := li.parent.Key() + return key +} + +// Value implements the Iterator interface. +func (li *listenIterator) Value() []byte { + value := li.parent.Value() + return value +} + +// Close implements the Iterator interface. +func (li *listenIterator) Close() error { + return li.parent.Close() +} + +// Error delegates the Error call to the parent iterator. +func (li *listenIterator) Error() error { + return li.parent.Error() +} + +// GetStoreType implements the KVStore interface. It returns the underlying +// KVStore type. +func (s *Store) GetStoreType() types.StoreType { + return s.parent.GetStoreType() +} + +// CacheWrap implements the KVStore interface. It panics as a Store +// cannot be cache wrapped. +func (s *Store) CacheWrap() types.CacheWrap { + panic("cannot CacheWrap a ListenKVStore") +} + +// CacheWrapWithTrace implements the KVStore interface. It panics as a +// Store cannot be cache wrapped. +func (s *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { + panic("cannot CacheWrapWithTrace a ListenKVStore") +} + +// CacheWrapWithListeners implements the KVStore interface. It panics as a +// Store cannot be cache wrapped. +func (s *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + panic("cannot CacheWrapWithListeners a ListenKVStore") +} + +// onWrite writes a KVStore operation to all of the WriteListeners +func (s *Store) onWrite(delete bool, key, value []byte) { + for _, l := range s.listeners { + l.OnWrite(s.parentStoreKey, key, value, delete) + } +} diff --git a/store/listenkv/store_test.go b/store/listenkv/store_test.go new file mode 100644 index 000000000000..6391458e8be0 --- /dev/null +++ b/store/listenkv/store_test.go @@ -0,0 +1,298 @@ +package listenkv_test + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/prefix" + "github.com/cosmos/cosmos-sdk/store/types" + + "github.com/stretchr/testify/require" + + dbm "github.com/tendermint/tm-db" +) + +func bz(s string) []byte { return []byte(s) } + +func keyFmt(i int) []byte { return bz(fmt.Sprintf("key%0.8d", i)) } +func valFmt(i int) []byte { return bz(fmt.Sprintf("value%0.8d", i)) } + +var kvPairs = []types.KVPair{ + {Key: keyFmt(1), Value: valFmt(1)}, + {Key: keyFmt(2), Value: valFmt(2)}, + {Key: keyFmt(3), Value: valFmt(3)}, +} + +var testStoreKey = types.NewKVStoreKey("listen_test") +var interfaceRegistry = codecTypes.NewInterfaceRegistry() +var testMarshaller = codec.NewProtoCodec(interfaceRegistry) + +func newListenKVStore(w io.Writer) *listenkv.Store { + store := newEmptyListenKVStore(w) + + for _, kvPair := range kvPairs { + store.Set(kvPair.Key, kvPair.Value) + } + + return store +} + +func newEmptyListenKVStore(w io.Writer) *listenkv.Store { + listener := types.NewStoreKVPairWriteListener(w, testMarshaller) + memDB := dbadapter.Store{DB: dbm.NewMemDB()} + + return listenkv.NewStore(memDB, testStoreKey, []types.WriteListener{listener}) +} + +func TestListenKVStoreGet(t *testing.T) { + testCases := []struct { + key []byte + expectedValue []byte + }{ + { + key: kvPairs[0].Key, + expectedValue: kvPairs[0].Value, + }, + { + key: []byte("does-not-exist"), + expectedValue: nil, + }, + } + + for _, tc := range testCases { + var buf bytes.Buffer + + store := newListenKVStore(&buf) + buf.Reset() + value := store.Get(tc.key) + + require.Equal(t, tc.expectedValue, value) + } +} + +func TestListenKVStoreSet(t *testing.T) { + testCases := []struct { + key []byte + value []byte + expectedOut *types.StoreKVPair + }{ + { + key: kvPairs[0].Key, + value: kvPairs[0].Value, + expectedOut: &types.StoreKVPair{ + Key: kvPairs[0].Key, + Value: kvPairs[0].Value, + StoreKey: testStoreKey.Name(), + Delete: false, + }, + }, + { + key: kvPairs[1].Key, + value: kvPairs[1].Value, + expectedOut: &types.StoreKVPair{ + Key: kvPairs[1].Key, + Value: kvPairs[1].Value, + StoreKey: testStoreKey.Name(), + Delete: false, + }, + }, + { + key: kvPairs[2].Key, + value: kvPairs[2].Value, + expectedOut: &types.StoreKVPair{ + Key: kvPairs[2].Key, + Value: kvPairs[2].Value, + StoreKey: testStoreKey.Name(), + Delete: false, + }, + }, + } + + for _, tc := range testCases { + var buf bytes.Buffer + + store := newEmptyListenKVStore(&buf) + buf.Reset() + store.Set(tc.key, tc.value) + storeKVPair := new(types.StoreKVPair) + testMarshaller.UnmarshalBinaryLengthPrefixed(buf.Bytes(), storeKVPair) + + require.Equal(t, tc.expectedOut, storeKVPair) + } + + var buf bytes.Buffer + store := newEmptyListenKVStore(&buf) + require.Panics(t, func() { store.Set([]byte(""), []byte("value")) }, "setting an empty key should panic") + require.Panics(t, func() { store.Set(nil, []byte("value")) }, "setting a nil key should panic") + +} + +func TestListenKVStoreDelete(t *testing.T) { + testCases := []struct { + key []byte + expectedOut *types.StoreKVPair + }{ + { + key: kvPairs[0].Key, + expectedOut: &types.StoreKVPair{ + Key: kvPairs[0].Key, + Value: nil, + StoreKey: testStoreKey.Name(), + Delete: true, + }, + }, + } + + for _, tc := range testCases { + var buf bytes.Buffer + + store := newListenKVStore(&buf) + buf.Reset() + store.Delete(tc.key) + storeKVPair := new(types.StoreKVPair) + testMarshaller.UnmarshalBinaryLengthPrefixed(buf.Bytes(), storeKVPair) + + require.Equal(t, tc.expectedOut, storeKVPair) + } +} + +func TestListenKVStoreHas(t *testing.T) { + testCases := []struct { + key []byte + expected bool + }{ + { + key: kvPairs[0].Key, + expected: true, + }, + } + + for _, tc := range testCases { + var buf bytes.Buffer + + store := newListenKVStore(&buf) + buf.Reset() + ok := store.Has(tc.key) + + require.Equal(t, tc.expected, ok) + } +} + +func TestTestListenKVStoreIterator(t *testing.T) { + var buf bytes.Buffer + + store := newListenKVStore(&buf) + iterator := store.Iterator(nil, nil) + + s, e := iterator.Domain() + require.Equal(t, []byte(nil), s) + require.Equal(t, []byte(nil), e) + + testCases := []struct { + expectedKey []byte + expectedValue []byte + }{ + { + expectedKey: kvPairs[0].Key, + expectedValue: kvPairs[0].Value, + }, + { + expectedKey: kvPairs[1].Key, + expectedValue: kvPairs[1].Value, + }, + { + expectedKey: kvPairs[2].Key, + expectedValue: kvPairs[2].Value, + }, + } + + for _, tc := range testCases { + ka := iterator.Key() + require.Equal(t, tc.expectedKey, ka) + + va := iterator.Value() + require.Equal(t, tc.expectedValue, va) + + iterator.Next() + } + + require.False(t, iterator.Valid()) + require.Panics(t, iterator.Next) + require.NoError(t, iterator.Close()) +} + +func TestTestListenKVStoreReverseIterator(t *testing.T) { + var buf bytes.Buffer + + store := newListenKVStore(&buf) + iterator := store.ReverseIterator(nil, nil) + + s, e := iterator.Domain() + require.Equal(t, []byte(nil), s) + require.Equal(t, []byte(nil), e) + + testCases := []struct { + expectedKey []byte + expectedValue []byte + }{ + { + expectedKey: kvPairs[2].Key, + expectedValue: kvPairs[2].Value, + }, + { + expectedKey: kvPairs[1].Key, + expectedValue: kvPairs[1].Value, + }, + { + expectedKey: kvPairs[0].Key, + expectedValue: kvPairs[0].Value, + }, + } + + for _, tc := range testCases { + ka := iterator.Key() + require.Equal(t, tc.expectedKey, ka) + + va := iterator.Value() + require.Equal(t, tc.expectedValue, va) + + iterator.Next() + } + + require.False(t, iterator.Valid()) + require.Panics(t, iterator.Next) + require.NoError(t, iterator.Close()) +} + +func TestListenKVStorePrefix(t *testing.T) { + store := newEmptyListenKVStore(nil) + pStore := prefix.NewStore(store, []byte("listen_prefix")) + require.IsType(t, prefix.Store{}, pStore) +} + +func TestListenKVStoreGetStoreType(t *testing.T) { + memDB := dbadapter.Store{DB: dbm.NewMemDB()} + store := newEmptyListenKVStore(nil) + require.Equal(t, memDB.GetStoreType(), store.GetStoreType()) +} + +func TestListenKVStoreCacheWrap(t *testing.T) { + store := newEmptyListenKVStore(nil) + require.Panics(t, func() { store.CacheWrap() }) +} + +func TestListenKVStoreCacheWrapWithTrace(t *testing.T) { + store := newEmptyListenKVStore(nil) + require.Panics(t, func() { store.CacheWrapWithTrace(nil, nil) }) +} + +func TestListenKVStoreCacheWrapWithListeners(t *testing.T) { + store := newEmptyListenKVStore(nil) + require.Panics(t, func() { store.CacheWrapWithListeners(nil, nil) }) +} diff --git a/store/mem/mem_test.go b/store/mem/mem_test.go index cff4c37da7dc..a2fc6add8a3e 100644 --- a/store/mem/mem_test.go +++ b/store/mem/mem_test.go @@ -3,6 +3,8 @@ package mem_test import ( "testing" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/stretchr/testify/require" "github.com/cosmos/cosmos-sdk/store/mem" @@ -25,6 +27,15 @@ func TestStore(t *testing.T) { db.Delete(key) require.Nil(t, db.Get(key)) + + cacheWrapper := db.CacheWrap() + require.IsType(t, &cachekv.Store{}, cacheWrapper) + + cacheWrappedWithTrace := db.CacheWrapWithTrace(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace) + + cacheWrappedWithListeners := db.CacheWrapWithListeners(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners) } func TestCommit(t *testing.T) { diff --git a/store/mem/store.go b/store/mem/store.go index 66591b645f59..c8aa6dca5997 100644 --- a/store/mem/store.go +++ b/store/mem/store.go @@ -7,6 +7,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" ) @@ -45,6 +46,11 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach return cachekv.NewStore(tracekv.NewStore(s, w, tc)) } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (s Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(s, storeKey, listeners)) +} + // Commit performs a no-op as entries are persistent between commitments. func (s *Store) Commit() (id types.CommitID) { return } diff --git a/store/prefix/store.go b/store/prefix/store.go index 4f9d5a75e087..295278a0a853 100644 --- a/store/prefix/store.go +++ b/store/prefix/store.go @@ -6,6 +6,7 @@ import ( "io" "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" ) @@ -57,6 +58,11 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach return cachekv.NewStore(tracekv.NewStore(s, w, tc)) } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (s Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(s, storeKey, listeners)) +} + // Implements KVStore func (s Store) Get(key []byte) []byte { res := s.parent.Get(s.key(key)) diff --git a/store/prefix/store_test.go b/store/prefix/store_test.go index b6f32b45f961..bf49e9cfe528 100644 --- a/store/prefix/store_test.go +++ b/store/prefix/store_test.go @@ -4,6 +4,8 @@ import ( "crypto/rand" "testing" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" @@ -426,3 +428,17 @@ func TestPrefixDBReverseIterator4(t *testing.T) { checkInvalid(t, itr) itr.Close() } + +func TestCacheWraps(t *testing.T) { + db := dbm.NewMemDB() + store := dbadapter.Store{DB: db} + + cacheWrapper := store.CacheWrap() + require.IsType(t, &cachekv.Store{}, cacheWrapper) + + cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace) + + cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil) + require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners) +} diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 1ca43eae9d4e..b7d578f703ee 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -22,6 +22,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/iavl" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/mem" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/transient" @@ -58,6 +59,8 @@ type Store struct { traceContext types.TraceContext interBlockCache types.MultiStorePersistentCache + + listeners map[types.StoreKey][]types.WriteListener } var ( @@ -77,6 +80,7 @@ func NewStore(db dbm.DB) *Store { stores: make(map[types.StoreKey]types.CommitKVStore), keysByName: make(map[string]types.StoreKey), pruneHeights: make([]int64, 0), + listeners: make(map[types.StoreKey][]types.WriteListener), } } @@ -312,6 +316,23 @@ func (rs *Store) TracingEnabled() bool { return rs.traceWriter != nil } +// AddListeners adds listeners for a specific KVStore +func (rs *Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { + if ls, ok := rs.listeners[key]; ok { + rs.listeners[key] = append(ls, listeners...) + } else { + rs.listeners[key] = listeners + } +} + +// ListeningEnabled returns if listening is enabled for a specific KVStore +func (rs *Store) ListeningEnabled(key types.StoreKey) bool { + if ls, ok := rs.listeners[key]; ok { + return len(ls) != 0 + } + return false +} + // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { if rs.lastCommitInfo == nil { @@ -404,6 +425,11 @@ func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac return rs.CacheWrap() } +// CacheWrapWithListeners implements the CacheWrapper interface. +func (rs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + return rs.CacheWrap() +} + // CacheMultiStore creates ephemeral branch of the multi-store and returns a CacheMultiStore. // It implements the MultiStore interface. func (rs *Store) CacheMultiStore() types.CacheMultiStore { @@ -411,8 +437,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { for k, v := range rs.stores { stores[k] = v } - - return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext) + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) } // CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it @@ -442,7 +467,7 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor } } - return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext), nil + return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners), nil } // GetStore returns a mounted Store for a given StoreKey. If the StoreKey does @@ -472,6 +497,9 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { if rs.TracingEnabled() { store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) } + if rs.ListeningEnabled(key) { + store = listenkv.NewStore(store, key, rs.listeners[key]) + } return store } diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index 4b4028c27237..6a501ae1f776 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -1,6 +1,7 @@ package rootmulti import ( + "bytes" "crypto/sha256" "encoding/binary" "encoding/hex" @@ -16,9 +17,13 @@ import ( abci "github.com/tendermint/tendermint/abci/types" dbm "github.com/tendermint/tm-db" + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/iavl" sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps" + "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" ) @@ -62,6 +67,14 @@ func TestStoreMount(t *testing.T) { require.Panics(t, func() { store.MountStoreWithDB(dup1, types.StoreTypeIAVL, db) }) } +func TestCacheMultiStore(t *testing.T) { + var db dbm.DB = dbm.NewMemDB() + ms := newMultiStoreWithMounts(db, types.PruneNothing) + + cacheMulti := ms.CacheMultiStore() + require.IsType(t, cachemulti.Store{}, cacheMulti) +} + func TestCacheMultiStoreWithVersion(t *testing.T) { var db dbm.DB = dbm.NewMemDB() ms := newMultiStoreWithMounts(db, types.PruneNothing) @@ -672,6 +685,125 @@ func TestSetInitialVersion(t *testing.T) { require.True(t, iavlStore.VersionExists(5)) } +func TestAddListenersAndListeningEnabled(t *testing.T) { + db := dbm.NewMemDB() + multi := newMultiStoreWithMounts(db, types.PruneNothing) + testKey := types.NewKVStoreKey("listening_test_key") + enabled := multi.ListeningEnabled(testKey) + require.False(t, enabled) + + multi.AddListeners(testKey, []types.WriteListener{}) + enabled = multi.ListeningEnabled(testKey) + require.False(t, enabled) + + mockListener := types.NewStoreKVPairWriteListener(nil, nil) + multi.AddListeners(testKey, []types.WriteListener{mockListener}) + wrongTestKey := types.NewKVStoreKey("wrong_listening_test_key") + enabled = multi.ListeningEnabled(wrongTestKey) + require.False(t, enabled) + + enabled = multi.ListeningEnabled(testKey) + require.True(t, enabled) +} + +var ( + interfaceRegistry = codecTypes.NewInterfaceRegistry() + testMarshaller = codec.NewProtoCodec(interfaceRegistry) + testKey1 = []byte{1, 2, 3, 4, 5} + testValue1 = []byte{5, 4, 3, 2, 1} + testKey2 = []byte{2, 3, 4, 5, 6} + testValue2 = []byte{6, 5, 4, 3, 2} +) + +func TestGetListenWrappedKVStore(t *testing.T) { + buf := new(bytes.Buffer) + var db dbm.DB = dbm.NewMemDB() + ms := newMultiStoreWithMounts(db, types.PruneNothing) + ms.LoadLatestVersion() + mockListeners := []types.WriteListener{types.NewStoreKVPairWriteListener(buf, testMarshaller)} + ms.AddListeners(testStoreKey1, mockListeners) + ms.AddListeners(testStoreKey2, mockListeners) + + listenWrappedStore1 := ms.GetKVStore(testStoreKey1) + require.IsType(t, &listenkv.Store{}, listenWrappedStore1) + + listenWrappedStore1.Set(testKey1, testValue1) + expectedOutputKVPairSet1, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{ + Key: testKey1, + Value: testValue1, + StoreKey: testStoreKey1.Name(), + Delete: false, + }) + require.Nil(t, err) + kvPairSet1Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, expectedOutputKVPairSet1, kvPairSet1Bytes) + + listenWrappedStore1.Delete(testKey1) + expectedOutputKVPairDelete1, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{ + Key: testKey1, + Value: nil, + StoreKey: testStoreKey1.Name(), + Delete: true, + }) + require.Nil(t, err) + kvPairDelete1Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, expectedOutputKVPairDelete1, kvPairDelete1Bytes) + + listenWrappedStore2 := ms.GetKVStore(testStoreKey2) + require.IsType(t, &listenkv.Store{}, listenWrappedStore2) + + listenWrappedStore2.Set(testKey2, testValue2) + expectedOutputKVPairSet2, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{ + Key: testKey2, + Value: testValue2, + StoreKey: testStoreKey2.Name(), + Delete: false, + }) + kvPairSet2Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, expectedOutputKVPairSet2, kvPairSet2Bytes) + + listenWrappedStore2.Delete(testKey2) + expectedOutputKVPairDelete2, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{ + Key: testKey2, + Value: nil, + StoreKey: testStoreKey2.Name(), + Delete: true, + }) + kvPairDelete2Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, expectedOutputKVPairDelete2, kvPairDelete2Bytes) + + unwrappedStore := ms.GetKVStore(testStoreKey3) + require.IsType(t, &iavl.Store{}, unwrappedStore) + + unwrappedStore.Set(testKey2, testValue2) + kvPairSet3Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, []byte{}, kvPairSet3Bytes) + + unwrappedStore.Delete(testKey2) + kvPairDelete3Bytes := buf.Bytes() + buf.Reset() + require.Equal(t, []byte{}, kvPairDelete3Bytes) +} + +func TestCacheWraps(t *testing.T) { + db := dbm.NewMemDB() + multi := newMultiStoreWithMounts(db, types.PruneNothing) + + cacheWrapper := multi.CacheWrap() + require.IsType(t, cachemulti.Store{}, cacheWrapper) + + cacheWrappedWithTrace := multi.CacheWrapWithTrace(nil, nil) + require.IsType(t, cachemulti.Store{}, cacheWrappedWithTrace) + + cacheWrappedWithListeners := multi.CacheWrapWithListeners(nil, nil) + require.IsType(t, cachemulti.Store{}, cacheWrappedWithListeners) +} + func BenchmarkMultistoreSnapshot100K(b *testing.B) { benchmarkMultistoreSnapshot(b, 10, 10000) } @@ -748,13 +880,19 @@ func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys ui //----------------------------------------------------------------------- // utils +var ( + testStoreKey1 = types.NewKVStoreKey("store1") + testStoreKey2 = types.NewKVStoreKey("store2") + testStoreKey3 = types.NewKVStoreKey("store3") +) + func newMultiStoreWithMounts(db dbm.DB, pruningOpts types.PruningOptions) *Store { store := NewStore(db) store.pruningOpts = pruningOpts - store.MountStoreWithDB(types.NewKVStoreKey("store1"), types.StoreTypeIAVL, nil) - store.MountStoreWithDB(types.NewKVStoreKey("store2"), types.StoreTypeIAVL, nil) - store.MountStoreWithDB(types.NewKVStoreKey("store3"), types.StoreTypeIAVL, nil) + store.MountStoreWithDB(testStoreKey1, types.StoreTypeIAVL, nil) + store.MountStoreWithDB(testStoreKey2, types.StoreTypeIAVL, nil) + store.MountStoreWithDB(testStoreKey3, types.StoreTypeIAVL, nil) return store } diff --git a/store/tracekv/store.go b/store/tracekv/store.go index 2958d968267a..a454edc7dd5f 100644 --- a/store/tracekv/store.go +++ b/store/tracekv/store.go @@ -164,13 +164,18 @@ func (tkv *Store) GetStoreType() types.StoreType { // CacheWrap implements the KVStore interface. It panics because a Store // cannot be branched. func (tkv *Store) CacheWrap() types.CacheWrap { - panic("cannot CacheWrap a Store") + panic("cannot CacheWrap a TraceKVStore") } // CacheWrapWithTrace implements the KVStore interface. It panics as a // Store cannot be branched. func (tkv *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { - panic("cannot CacheWrapWithTrace a Store") + panic("cannot CacheWrapWithTrace a TraceKVStore") +} + +// CacheWrapWithListeners implements the CacheWrapper interface. +func (tkv *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + panic("cannot CacheWrapWithListeners a TraceKVStore") } // writeOperation writes a KVStore operation to the underlying io.Writer as diff --git a/store/tracekv/store_test.go b/store/tracekv/store_test.go index e2c4e2a0fe5d..db9a981237f4 100644 --- a/store/tracekv/store_test.go +++ b/store/tracekv/store_test.go @@ -286,7 +286,13 @@ func TestTraceKVStoreCacheWrap(t *testing.T) { store := newEmptyTraceKVStore(nil) require.Panics(t, func() { store.CacheWrap() }) } + func TestTraceKVStoreCacheWrapWithTrace(t *testing.T) { store := newEmptyTraceKVStore(nil) require.Panics(t, func() { store.CacheWrapWithTrace(nil, nil) }) } + +func TestTraceKVStoreCacheWrapWithListeners(t *testing.T) { + store := newEmptyTraceKVStore(nil) + require.Panics(t, func() { store.CacheWrapWithListeners(nil, nil) }) +} diff --git a/store/types/listening.go b/store/types/listening.go new file mode 100644 index 000000000000..96f29cce9e52 --- /dev/null +++ b/store/types/listening.go @@ -0,0 +1,47 @@ +package types + +import ( + "io" + + "github.com/cosmos/cosmos-sdk/codec" +) + +// WriteListener interface for streaming data out from a listenkv.Store +type WriteListener interface { + // if value is nil then it was deleted + // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores + // delete bool indicates if it was a delete; true: delete, false: set + OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error +} + +// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed +// protobuf encoded StoreKVPairs to an underlying io.Writer +type StoreKVPairWriteListener struct { + writer io.Writer + marshaller codec.BinaryMarshaler +} + +// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryMarshaler +func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVPairWriteListener { + return &StoreKVPairWriteListener{ + writer: w, + marshaller: m, + } +} + +// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs +func (wl *StoreKVPairWriteListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { + kvPair := new(StoreKVPair) + kvPair.StoreKey = storeKey.Name() + kvPair.Delete = delete + kvPair.Key = key + kvPair.Value = value + by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair) + if err != nil { + return err + } + if _, err := wl.writer.Write(by); err != nil { + return err + } + return nil +} diff --git a/store/types/listening.pb.go b/store/types/listening.pb.go new file mode 100644 index 000000000000..d542b6047387 --- /dev/null +++ b/store/types/listening.pb.go @@ -0,0 +1,469 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: cosmos/base/store/v1beta1/listening.proto + +package types + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes) +// It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes +type StoreKVPair struct { + StoreKey string `protobuf:"bytes,1,opt,name=store_key,json=storeKey,proto3" json:"store_key,omitempty"` + Delete bool `protobuf:"varint,2,opt,name=delete,proto3" json:"delete,omitempty"` + Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *StoreKVPair) Reset() { *m = StoreKVPair{} } +func (m *StoreKVPair) String() string { return proto.CompactTextString(m) } +func (*StoreKVPair) ProtoMessage() {} +func (*StoreKVPair) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d350879fe4fecd, []int{0} +} +func (m *StoreKVPair) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StoreKVPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StoreKVPair.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StoreKVPair) XXX_Merge(src proto.Message) { + xxx_messageInfo_StoreKVPair.Merge(m, src) +} +func (m *StoreKVPair) XXX_Size() int { + return m.Size() +} +func (m *StoreKVPair) XXX_DiscardUnknown() { + xxx_messageInfo_StoreKVPair.DiscardUnknown(m) +} + +var xxx_messageInfo_StoreKVPair proto.InternalMessageInfo + +func (m *StoreKVPair) GetStoreKey() string { + if m != nil { + return m.StoreKey + } + return "" +} + +func (m *StoreKVPair) GetDelete() bool { + if m != nil { + return m.Delete + } + return false +} + +func (m *StoreKVPair) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *StoreKVPair) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func init() { + proto.RegisterType((*StoreKVPair)(nil), "cosmos.base.store.v1beta1.StoreKVPair") +} + +func init() { + proto.RegisterFile("cosmos/base/store/v1beta1/listening.proto", fileDescriptor_a5d350879fe4fecd) +} + +var fileDescriptor_a5d350879fe4fecd = []byte{ + // 224 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4c, 0xce, 0x2f, 0xce, + 0xcd, 0x2f, 0xd6, 0x4f, 0x4a, 0x2c, 0x4e, 0xd5, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0x2f, 0x33, + 0x4c, 0x4a, 0x2d, 0x49, 0x34, 0xd4, 0xcf, 0xc9, 0x2c, 0x2e, 0x49, 0xcd, 0xcb, 0xcc, 0x4b, 0xd7, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x92, 0x84, 0x28, 0xd5, 0x03, 0x29, 0xd5, 0x03, 0x2b, 0xd5, + 0x83, 0x2a, 0x55, 0xca, 0xe2, 0xe2, 0x0e, 0x06, 0x09, 0x78, 0x87, 0x05, 0x24, 0x66, 0x16, 0x09, + 0x49, 0x73, 0x71, 0x82, 0xe5, 0xe3, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, + 0x38, 0xc0, 0x02, 0xde, 0xa9, 0x95, 0x42, 0x62, 0x5c, 0x6c, 0x29, 0xa9, 0x39, 0xa9, 0x25, 0xa9, + 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0x1c, 0x41, 0x50, 0x9e, 0x90, 0x00, 0x17, 0x33, 0x48, 0x39, 0xb3, + 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x96, 0x98, 0x53, 0x9a, 0x2a, + 0xc1, 0x02, 0x16, 0x83, 0x70, 0x9c, 0x9c, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, + 0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e, + 0x21, 0x4a, 0x23, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0x1f, 0xea, 0x2d, + 0x08, 0xa5, 0x5b, 0x9c, 0x92, 0x0d, 0xf5, 0x5c, 0x49, 0x65, 0x41, 0x6a, 0x71, 0x12, 0x1b, 0xd8, + 0x47, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x2b, 0xe0, 0xb3, 0x51, 0xfe, 0x00, 0x00, 0x00, +} + +func (m *StoreKVPair) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StoreKVPair) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StoreKVPair) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Value) > 0 { + i -= len(m.Value) + copy(dAtA[i:], m.Value) + i = encodeVarintListening(dAtA, i, uint64(len(m.Value))) + i-- + dAtA[i] = 0x22 + } + if len(m.Key) > 0 { + i -= len(m.Key) + copy(dAtA[i:], m.Key) + i = encodeVarintListening(dAtA, i, uint64(len(m.Key))) + i-- + dAtA[i] = 0x1a + } + if m.Delete { + i-- + if m.Delete { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if len(m.StoreKey) > 0 { + i -= len(m.StoreKey) + copy(dAtA[i:], m.StoreKey) + i = encodeVarintListening(dAtA, i, uint64(len(m.StoreKey))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintListening(dAtA []byte, offset int, v uint64) int { + offset -= sovListening(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *StoreKVPair) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.StoreKey) + if l > 0 { + n += 1 + l + sovListening(uint64(l)) + } + if m.Delete { + n += 2 + } + l = len(m.Key) + if l > 0 { + n += 1 + l + sovListening(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovListening(uint64(l)) + } + return n +} + +func sovListening(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozListening(x uint64) (n int) { + return sovListening(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *StoreKVPair) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StoreKVPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StoreKVPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoreKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StoreKey = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Delete", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Delete = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...) + if m.Key == nil { + m.Key = []byte{} + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowListening + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthListening + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthListening + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...) + if m.Value == nil { + m.Value = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipListening(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthListening + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipListening(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowListening + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowListening + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowListening + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthListening + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupListening + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthListening + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthListening = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowListening = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupListening = fmt.Errorf("proto: unexpected end of group") +) diff --git a/store/types/listening_test.go b/store/types/listening_test.go new file mode 100644 index 000000000000..96c59ca752ce --- /dev/null +++ b/store/types/listening_test.go @@ -0,0 +1,65 @@ +package types + +import ( + "bytes" + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/stretchr/testify/require" +) + +func TestNewStoreKVPairWriteListener(t *testing.T) { + testWriter := new(bytes.Buffer) + interfaceRegistry := types.NewInterfaceRegistry() + testMarshaller := codec.NewProtoCodec(interfaceRegistry) + + wl := NewStoreKVPairWriteListener(testWriter, testMarshaller) + + require.IsType(t, &StoreKVPairWriteListener{}, wl) + require.Equal(t, testWriter, wl.writer) + require.Equal(t, testMarshaller, wl.marshaller) +} + +func TestOnWrite(t *testing.T) { + testWriter := new(bytes.Buffer) + interfaceRegistry := types.NewInterfaceRegistry() + testMarshaller := codec.NewProtoCodec(interfaceRegistry) + + wl := NewStoreKVPairWriteListener(testWriter, testMarshaller) + + testStoreKey := NewKVStoreKey("test_key") + testKey := []byte("testing123") + testValue := []byte("testing321") + + // test set + err := wl.OnWrite(testStoreKey, testKey, testValue, false) + require.Nil(t, err) + + outputBytes := testWriter.Bytes() + outputKVPair := new(StoreKVPair) + expectedOutputKVPair := &StoreKVPair{ + Key: testKey, + Value: testValue, + StoreKey: testStoreKey.Name(), + Delete: false, + } + testMarshaller.UnmarshalBinaryLengthPrefixed(outputBytes, outputKVPair) + require.EqualValues(t, expectedOutputKVPair, outputKVPair) + testWriter.Reset() + + // test delete + err = wl.OnWrite(testStoreKey, testKey, testValue, true) + require.Nil(t, err) + + outputBytes = testWriter.Bytes() + outputKVPair = new(StoreKVPair) + expectedOutputKVPair = &StoreKVPair{ + Key: testKey, + Value: testValue, + StoreKey: testStoreKey.Name(), + Delete: true, + } + testMarshaller.UnmarshalBinaryLengthPrefixed(outputBytes, outputKVPair) + require.EqualValues(t, expectedOutputKVPair, outputKVPair) +} diff --git a/store/types/store.go b/store/types/store.go index 8da2b26fd1f8..630cd1d040b9 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -130,6 +130,13 @@ type MultiStore interface { // implied that the caller should update the context when necessary between // tracing operations. The modified MultiStore is returned. SetTracingContext(TraceContext) MultiStore + + // ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey + ListeningEnabled(key StoreKey) bool + + // AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey + // It appends the listeners to a current set, if one already exists + AddListeners(key StoreKey, listeners []WriteListener) } // From MultiStore.CacheMultiStore().... @@ -253,6 +260,9 @@ type CacheWrap interface { // CacheWrapWithTrace recursively wraps again with tracing enabled. CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap + + // CacheWrapWithListeners recursively wraps again with listening enabled + CacheWrapWithListeners(storeKey StoreKey, listeners []WriteListener) CacheWrap } type CacheWrapper interface { @@ -261,6 +271,9 @@ type CacheWrapper interface { // CacheWrapWithTrace branches a store with tracing enabled. CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap + + // CacheWrapWithListeners recursively wraps again with listening enabled + CacheWrapWithListeners(storeKey StoreKey, listeners []WriteListener) CacheWrap } func (cid CommitID) IsZero() bool {