From 2f9301bb79cae449d855f3c0e696cc8d3a8f345d Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Thu, 5 Dec 2024 17:51:52 +0100 Subject: [PATCH] Extracted endorser transaction mapping functionality for re-use Signed-off-by: Alexandros Filios --- .../fabricdev/core/fabricdev/vault/vault.go | 5 ++-- .../common/core/generic/vault/inspector.go | 18 ------------ .../common/core/generic/vault/interceptor.go | 3 +- .../core/generic/vault/interceptor_test.go | 4 +-- platform/common/core/generic/vault/vault.go | 21 +++++++------- .../common/core/generic/vault/vault_test.go | 10 +++++-- .../core/generic/committer/endorsertx.go | 28 ++++++++++++------- platform/fabric/core/generic/rwset/handler.go | 25 +++++++++++++++++ platform/fabric/core/generic/vault/vault.go | 22 +++++++++------ platform/orion/core/generic/vault/vault.go | 13 +++++---- 10 files changed, 90 insertions(+), 59 deletions(-) diff --git a/docs/fabric/fabricdev/core/fabricdev/vault/vault.go b/docs/fabric/fabricdev/core/fabricdev/vault/vault.go index c3953553e..7028ff442 100644 --- a/docs/fabric/fabricdev/core/fabricdev/vault/vault.go +++ b/docs/fabric/fabricdev/core/fabricdev/vault/vault.go @@ -38,9 +38,10 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv ) } -func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor { +func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor { return vault.NewInterceptor[fdriver.ValidationCode]( logger, + rwSet, qe, txIDStore, txID, @@ -53,7 +54,7 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt // populator is the custom populator for FabricDEV type populator struct{} -func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error { +func (p *populator) Populate([]byte, ...driver.Namespace) (vault.ReadWriteSet, error) { panic("implement me") } diff --git a/platform/common/core/generic/vault/inspector.go b/platform/common/core/generic/vault/inspector.go index 497867349..20e907d14 100644 --- a/platform/common/core/generic/vault/inspector.go +++ b/platform/common/core/generic/vault/inspector.go @@ -15,24 +15,6 @@ type Inspector struct { Rws ReadWriteSet } -func NewInspector() *Inspector { - return &Inspector{ - Rws: ReadWriteSet{ - ReadSet: ReadSet{ - OrderedReads: map[string][]string{}, - Reads: Reads{}, - }, - WriteSet: WriteSet{ - OrderedWrites: map[string][]string{}, - Writes: Writes{}, - }, - MetaWriteSet: MetaWriteSet{ - MetaWrites: NamespaceKeyedMetaWrites{}, - }, - }, - } -} - func (i *Inspector) IsValid() error { return nil } diff --git a/platform/common/core/generic/vault/interceptor.go b/platform/common/core/generic/vault/interceptor.go index fe6a9db15..12ffe2ef6 100644 --- a/platform/common/core/generic/vault/interceptor.go +++ b/platform/common/core/generic/vault/interceptor.go @@ -55,6 +55,7 @@ func EmptyRWSet() ReadWriteSet { func NewInterceptor[V driver.ValidationCode]( logger Logger, + rwSet ReadWriteSet, qe VersionedQueryExecutor, txIDStore TXIDStoreReader[V], txID driver.TxID, @@ -69,7 +70,7 @@ func NewInterceptor[V driver.ValidationCode]( TxID: txID, QE: qe, TxIDStore: txIDStore, - Rws: EmptyRWSet(), + Rws: rwSet, vcProvider: vcProvider, Marshaller: marshaller, VersionComparator: versionComparator, diff --git a/platform/common/core/generic/vault/interceptor_test.go b/platform/common/core/generic/vault/interceptor_test.go index b7d65eae2..e361b095c 100644 --- a/platform/common/core/generic/vault/interceptor_test.go +++ b/platform/common/core/generic/vault/interceptor_test.go @@ -20,7 +20,7 @@ func TestConcurrency(t *testing.T) { qe := mocks.NewMockQE() idsr := mocks.MockTXIDStoreReader{} - i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1") + i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1") s, err := i.GetState("ns", "key") assert.NoError(t, err) assert.Equal(t, qe.State.Raw, s, "with no opts, getstate should return the FromStorage value (query executor)") @@ -64,7 +64,7 @@ func TestConcurrency(t *testing.T) { func TestAddReadAt(t *testing.T) { qe := mocks.MockQE{} idsr := mocks.MockTXIDStoreReader{} - i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1") + i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1") assert.NoError(t, i.AddReadAt("ns", "key", []byte("version"))) assert.Len(t, i.RWs().Reads, 1) diff --git a/platform/common/core/generic/vault/vault.go b/platform/common/core/generic/vault/vault.go index 70e722151..acbafa782 100644 --- a/platform/common/core/generic/vault/vault.go +++ b/platform/common/core/generic/vault/vault.go @@ -48,7 +48,7 @@ type TxInterceptor interface { } type Populator interface { - Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error + Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (ReadWriteSet, error) } type Marshaller interface { @@ -56,7 +56,7 @@ type Marshaller interface { Append(destination *ReadWriteSet, raw []byte, nss ...string) error } -type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor +type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, rwSet ReadWriteSet, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor type ( VersionedPersistence = dbdriver.VersionedPersistence @@ -447,7 +447,7 @@ func (db *Vault[V]) NewRWSet(txID driver.TxID) (driver.RWSet, error) { func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) { db.logger.Debugf("NewRWSet[%s][%d]", txID, db.counter.Load()) - i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID) + i := db.newInterceptor(db.logger, EmptyRWSet(), &interceptorQueryExecutor[V]{db}, db.txIDStore, txID) db.interceptorsLock.Lock() if _, in := db.Interceptors[txID]; in { @@ -469,12 +469,13 @@ func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) { func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) { db.logger.Debugf("GetRWSet[%s][%d]", txID, db.counter.Load()) - i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID) - - if err := db.populator.Populate(i.RWs(), rwsetBytes); err != nil { + rwSet, err := db.populator.Populate(rwsetBytes) + if err != nil { return nil, errors.Wrapf(err, "failed populating tx [%s]", txID) } + i := db.newInterceptor(db.logger, rwSet, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID) + db.interceptorsLock.Lock() if i, in := db.Interceptors[txID]; in { if !i.IsClosed() { @@ -496,13 +497,11 @@ func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet, } func (db *Vault[V]) InspectRWSet(rwsetBytes []byte, namespaces ...driver.Namespace) (driver.RWSet, error) { - i := NewInspector() - - if err := db.populator.Populate(&i.Rws, rwsetBytes, namespaces...); err != nil { + rwSet, err := db.populator.Populate(rwsetBytes, namespaces...) + if err != nil { return nil, errors.Wrapf(err, "failed populating ephemeral txID") } - - return i, nil + return &Inspector{Rws: rwSet}, nil } func (db *Vault[V]) Match(txID driver.TxID, rwsRaw []byte) error { diff --git a/platform/common/core/generic/vault/vault_test.go b/platform/common/core/generic/vault/vault_test.go index c7a693cd7..d5333ae55 100644 --- a/platform/common/core/generic/vault/vault_test.go +++ b/platform/common/core/generic/vault/vault_test.go @@ -71,12 +71,14 @@ func (p *testArtifactProvider) NewMarshaller() Marshaller { func newInterceptor( logger Logger, + rwSet ReadWriteSet, qe VersionedQueryExecutor, txidStore TXIDStoreReader[ValidationCode], txid driver2.TxID, ) TxInterceptor { return NewInterceptor[ValidationCode]( logger, + rwSet, qe, txidStore, txid, @@ -90,8 +92,12 @@ type populator struct { marshaller marshaller } -func (p *populator) Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver2.Namespace) error { - return p.marshaller.Append(rws, rwsetBytes, namespaces...) +func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver2.Namespace) (ReadWriteSet, error) { + rwSet := EmptyRWSet() + if err := p.marshaller.Append(&rwSet, rwsetBytes, namespaces...); err != nil { + return ReadWriteSet{}, err + } + return rwSet, nil } type marshaller struct{} diff --git a/platform/fabric/core/generic/committer/endorsertx.go b/platform/fabric/core/generic/committer/endorsertx.go index fdb967697..2924fc4ec 100644 --- a/platform/fabric/core/generic/committer/endorsertx.go +++ b/platform/fabric/core/generic/committer/endorsertx.go @@ -24,16 +24,9 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("[%s] EndorserClient transaction received: %s", c.ChannelConfig.ID(), tx.TxID) } - if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) { - return nil, errors.Errorf("block metadata lacks transaction filter") - } - - fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[tx.TxNum] - event := &FinalityEvent{ - Ctx: ctx, - TxID: tx.TxID, - ValidationCode: convertValidationCode(int32(fabricValidationCode)), - ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)], + fabricValidationCode, event, err := MapFinalityEvent(ctx, block, tx.TxNum, tx.TxID) + if err != nil { + return nil, err } switch pb.TxValidationCode(fabricValidationCode) { @@ -62,6 +55,21 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common return event, nil } +func MapFinalityEvent(ctx context.Context, block *common.BlockMetadata, txNum driver.TxNum, txID string) (uint8, *FinalityEvent, error) { + if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) { + return 0, nil, errors.Errorf("block metadata lacks transaction filter") + } + + fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[txNum] + event := &FinalityEvent{ + Ctx: ctx, + TxID: txID, + ValidationCode: convertValidationCode(int32(fabricValidationCode)), + ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)], + } + return fabricValidationCode, event, nil +} + // GetChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode. func (c *Committer) GetChaincodeEvents(env *common.Envelope, blockNum driver2.BlockNum) error { chaincodeEvent, err := readChaincodeEvent(env, blockNum) diff --git a/platform/fabric/core/generic/rwset/handler.go b/platform/fabric/core/generic/rwset/handler.go index 2147f83be..3198289eb 100644 --- a/platform/fabric/core/generic/rwset/handler.go +++ b/platform/fabric/core/generic/rwset/handler.go @@ -9,6 +9,8 @@ package rwset import ( "fmt" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault" + vault2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/vault" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" "github.com/hyperledger/fabric-protos-go/common" ) @@ -36,3 +38,26 @@ func (h *endorserTransactionHandler) Load(payl *common.Payload, chdr *common.Cha } return rws, upe, nil } + +type endorserTransactionReader struct { + network string + populator vault.Populator +} + +func NewEndorserTransactionReader(network string) *endorserTransactionReader { + return &endorserTransactionReader{ + network: network, + populator: vault2.NewPopulator(), + } +} + +func (h *endorserTransactionReader) Read(payl *common.Payload, chdr *common.ChannelHeader) (vault.ReadWriteSet, error) { + upe, err := UnpackEnvelopeFromPayloadAndCHHeader(h.network, payl, chdr) + if err != nil { + return vault.ReadWriteSet{}, fmt.Errorf("failed unpacking envelope [%s]: %w", chdr.TxId, err) + } + + logger.Debugf("retrieve rws [%s,%s]", h.network, chdr.TxId) + + return h.populator.Populate(upe.Results) +} diff --git a/platform/fabric/core/generic/vault/vault.go b/platform/fabric/core/generic/vault/vault.go index 1483b28c7..b304218f1 100644 --- a/platform/fabric/core/generic/vault/vault.go +++ b/platform/fabric/core/generic/vault/vault.go @@ -45,16 +45,17 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv txIDStore, &fdriver.ValidationCodeProvider{}, newInterceptor, - &populator{}, + NewPopulator(), metricsProvider, tracerProvider, &vault.BlockTxIndexVersionBuilder{}, ) } -func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor { +func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor { return vault.NewInterceptor[fdriver.ValidationCode]( logger, + rwSet, qe, txIDStore, txID, @@ -68,18 +69,23 @@ type populator struct { versionMarshaller vault.BlockTxIndexVersionMarshaller } -func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error { +func NewPopulator() *populator { + return &populator{} +} + +func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) { txRWSet := &rwset.TxReadWriteSet{} err := proto.Unmarshal(rwsetBytes, txRWSet) if err != nil { - return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed") + return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed") } rwsIn, err := rwsetutil.TxRwSetFromProtoMsg(txRWSet) if err != nil { - return errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed") + return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed") } + rws := vault.EmptyRWSet() namespaceSet := collections.NewSet(namespaces...) for _, nsrws := range rwsIn.NsRwSets { ns := nsrws.NameSpace @@ -101,7 +107,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa for _, write := range nsrws.KvRwSet.Writes { if err := rws.WriteSet.Add(ns, write.Key, write.Value); err != nil { - return err + return vault.ReadWriteSet{}, err } } @@ -112,12 +118,12 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa } if err := rws.MetaWriteSet.Add(ns, metaWrite.Key, metadata); err != nil { - return err + return vault.ReadWriteSet{}, err } } } - return nil + return rws, nil } type marshaller struct { diff --git a/platform/orion/core/generic/vault/vault.go b/platform/orion/core/generic/vault/vault.go index f73f1e801..54bc1a982 100644 --- a/platform/orion/core/generic/vault/vault.go +++ b/platform/orion/core/generic/vault/vault.go @@ -51,12 +51,14 @@ type Interceptor struct { func newInterceptor( logger vault.Logger, + rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txidStore vault.TXIDStoreReader[odriver.ValidationCode], txid string, ) vault.TxInterceptor { return &Interceptor{Interceptor: vault.NewInterceptor[odriver.ValidationCode]( logger, + rwSet, qe, txidStore, txid, @@ -88,11 +90,12 @@ type populator struct { versionMarshaller vault.BlockTxIndexVersionMarshaller } -func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error { +func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) { + rws := vault.EmptyRWSet() txRWSet := &types.DataTx{} err := proto.Unmarshal(rwsetBytes, txRWSet) if err != nil { - return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed") + return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed") } for _, operation := range txRWSet.DbOperations { @@ -120,7 +123,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa write.Key, write.Value, ); err != nil { - return errors.Wrapf(err, "failed to add write to read-write set") + return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add write to read-write set") } // TODO: What about write.ACL? Shall we store it as metadata? } @@ -131,10 +134,10 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa del.Key, nil, ); err != nil { - return errors.Wrapf(err, "failed to add delete to read-write set") + return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add delete to read-write set") } } } - return nil + return rws, nil }