Skip to content

Commit

Permalink
Extracted endorser transaction mapping functionality for re-use
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Dec 25, 2024
1 parent 5b0424a commit 2f9301b
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 59 deletions.
5 changes: 3 additions & 2 deletions docs/fabric/fabricdev/core/fabricdev/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
)
}

func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
return vault.NewInterceptor[fdriver.ValidationCode](
logger,
rwSet,
qe,
txIDStore,
txID,
Expand All @@ -53,7 +54,7 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt
// populator is the custom populator for FabricDEV
type populator struct{}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func (p *populator) Populate([]byte, ...driver.Namespace) (vault.ReadWriteSet, error) {
panic("implement me")
}

Expand Down
18 changes: 0 additions & 18 deletions platform/common/core/generic/vault/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@ type Inspector struct {
Rws ReadWriteSet
}

func NewInspector() *Inspector {
return &Inspector{
Rws: ReadWriteSet{
ReadSet: ReadSet{
OrderedReads: map[string][]string{},
Reads: Reads{},
},
WriteSet: WriteSet{
OrderedWrites: map[string][]string{},
Writes: Writes{},
},
MetaWriteSet: MetaWriteSet{
MetaWrites: NamespaceKeyedMetaWrites{},
},
},
}
}

func (i *Inspector) IsValid() error {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion platform/common/core/generic/vault/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func EmptyRWSet() ReadWriteSet {

func NewInterceptor[V driver.ValidationCode](
logger Logger,
rwSet ReadWriteSet,
qe VersionedQueryExecutor,
txIDStore TXIDStoreReader[V],
txID driver.TxID,
Expand All @@ -69,7 +70,7 @@ func NewInterceptor[V driver.ValidationCode](
TxID: txID,
QE: qe,
TxIDStore: txIDStore,
Rws: EmptyRWSet(),
Rws: rwSet,
vcProvider: vcProvider,
Marshaller: marshaller,
VersionComparator: versionComparator,
Expand Down
4 changes: 2 additions & 2 deletions platform/common/core/generic/vault/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestConcurrency(t *testing.T) {
qe := mocks.NewMockQE()
idsr := mocks.MockTXIDStoreReader{}

i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")
s, err := i.GetState("ns", "key")
assert.NoError(t, err)
assert.Equal(t, qe.State.Raw, s, "with no opts, getstate should return the FromStorage value (query executor)")
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestConcurrency(t *testing.T) {
func TestAddReadAt(t *testing.T) {
qe := mocks.MockQE{}
idsr := mocks.MockTXIDStoreReader{}
i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")

assert.NoError(t, i.AddReadAt("ns", "key", []byte("version")))
assert.Len(t, i.RWs().Reads, 1)
Expand Down
21 changes: 10 additions & 11 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ type TxInterceptor interface {
}

type Populator interface {
Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error
Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (ReadWriteSet, error)
}

type Marshaller interface {
Marshal(txID string, rws *ReadWriteSet) ([]byte, error)
Append(destination *ReadWriteSet, raw []byte, nss ...string) error
}

type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor
type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, rwSet ReadWriteSet, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor

type (
VersionedPersistence = dbdriver.VersionedPersistence
Expand Down Expand Up @@ -447,7 +447,7 @@ func (db *Vault[V]) NewRWSet(txID driver.TxID) (driver.RWSet, error) {

func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {
db.logger.Debugf("NewRWSet[%s][%d]", txID, db.counter.Load())
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
i := db.newInterceptor(db.logger, EmptyRWSet(), &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

db.interceptorsLock.Lock()
if _, in := db.Interceptors[txID]; in {
Expand All @@ -469,12 +469,13 @@ func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {

func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) {
db.logger.Debugf("GetRWSet[%s][%d]", txID, db.counter.Load())
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

if err := db.populator.Populate(i.RWs(), rwsetBytes); err != nil {
rwSet, err := db.populator.Populate(rwsetBytes)
if err != nil {
return nil, errors.Wrapf(err, "failed populating tx [%s]", txID)
}

i := db.newInterceptor(db.logger, rwSet, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

db.interceptorsLock.Lock()
if i, in := db.Interceptors[txID]; in {
if !i.IsClosed() {
Expand All @@ -496,13 +497,11 @@ func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet,
}

func (db *Vault[V]) InspectRWSet(rwsetBytes []byte, namespaces ...driver.Namespace) (driver.RWSet, error) {
i := NewInspector()

if err := db.populator.Populate(&i.Rws, rwsetBytes, namespaces...); err != nil {
rwSet, err := db.populator.Populate(rwsetBytes, namespaces...)
if err != nil {
return nil, errors.Wrapf(err, "failed populating ephemeral txID")
}

return i, nil
return &Inspector{Rws: rwSet}, nil
}

func (db *Vault[V]) Match(txID driver.TxID, rwsRaw []byte) error {
Expand Down
10 changes: 8 additions & 2 deletions platform/common/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ func (p *testArtifactProvider) NewMarshaller() Marshaller {

func newInterceptor(
logger Logger,
rwSet ReadWriteSet,
qe VersionedQueryExecutor,
txidStore TXIDStoreReader[ValidationCode],
txid driver2.TxID,
) TxInterceptor {
return NewInterceptor[ValidationCode](
logger,
rwSet,
qe,
txidStore,
txid,
Expand All @@ -90,8 +92,12 @@ type populator struct {
marshaller marshaller
}

func (p *populator) Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver2.Namespace) error {
return p.marshaller.Append(rws, rwsetBytes, namespaces...)
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver2.Namespace) (ReadWriteSet, error) {
rwSet := EmptyRWSet()
if err := p.marshaller.Append(&rwSet, rwsetBytes, namespaces...); err != nil {
return ReadWriteSet{}, err
}
return rwSet, nil
}

type marshaller struct{}
Expand Down
28 changes: 18 additions & 10 deletions platform/fabric/core/generic/committer/endorsertx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,9 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("[%s] EndorserClient transaction received: %s", c.ChannelConfig.ID(), tx.TxID)
}
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return nil, errors.Errorf("block metadata lacks transaction filter")
}

fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[tx.TxNum]
event := &FinalityEvent{
Ctx: ctx,
TxID: tx.TxID,
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
fabricValidationCode, event, err := MapFinalityEvent(ctx, block, tx.TxNum, tx.TxID)
if err != nil {
return nil, err
}

switch pb.TxValidationCode(fabricValidationCode) {
Expand Down Expand Up @@ -62,6 +55,21 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
return event, nil
}

func MapFinalityEvent(ctx context.Context, block *common.BlockMetadata, txNum driver.TxNum, txID string) (uint8, *FinalityEvent, error) {
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return 0, nil, errors.Errorf("block metadata lacks transaction filter")
}

fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[txNum]
event := &FinalityEvent{
Ctx: ctx,
TxID: txID,
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
}
return fabricValidationCode, event, nil
}

// GetChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode.
func (c *Committer) GetChaincodeEvents(env *common.Envelope, blockNum driver2.BlockNum) error {
chaincodeEvent, err := readChaincodeEvent(env, blockNum)
Expand Down
25 changes: 25 additions & 0 deletions platform/fabric/core/generic/rwset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package rwset
import (
"fmt"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault"
vault2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/vault"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger/fabric-protos-go/common"
)
Expand Down Expand Up @@ -36,3 +38,26 @@ func (h *endorserTransactionHandler) Load(payl *common.Payload, chdr *common.Cha
}
return rws, upe, nil
}

type endorserTransactionReader struct {
network string
populator vault.Populator
}

func NewEndorserTransactionReader(network string) *endorserTransactionReader {
return &endorserTransactionReader{
network: network,
populator: vault2.NewPopulator(),
}
}

func (h *endorserTransactionReader) Read(payl *common.Payload, chdr *common.ChannelHeader) (vault.ReadWriteSet, error) {
upe, err := UnpackEnvelopeFromPayloadAndCHHeader(h.network, payl, chdr)
if err != nil {
return vault.ReadWriteSet{}, fmt.Errorf("failed unpacking envelope [%s]: %w", chdr.TxId, err)
}

logger.Debugf("retrieve rws [%s,%s]", h.network, chdr.TxId)

return h.populator.Populate(upe.Results)
}
22 changes: 14 additions & 8 deletions platform/fabric/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
NewPopulator(),
metricsProvider,
tracerProvider,
&vault.BlockTxIndexVersionBuilder{},
)
}

func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
return vault.NewInterceptor[fdriver.ValidationCode](
logger,
rwSet,
qe,
txIDStore,
txID,
Expand All @@ -68,18 +69,23 @@ type populator struct {
versionMarshaller vault.BlockTxIndexVersionMarshaller
}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func NewPopulator() *populator {
return &populator{}
}

func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
txRWSet := &rwset.TxReadWriteSet{}
err := proto.Unmarshal(rwsetBytes, txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
}

rwsIn, err := rwsetutil.TxRwSetFromProtoMsg(txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
}

rws := vault.EmptyRWSet()
namespaceSet := collections.NewSet(namespaces...)
for _, nsrws := range rwsIn.NsRwSets {
ns := nsrws.NameSpace
Expand All @@ -101,7 +107,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa

for _, write := range nsrws.KvRwSet.Writes {
if err := rws.WriteSet.Add(ns, write.Key, write.Value); err != nil {
return err
return vault.ReadWriteSet{}, err
}
}

Expand All @@ -112,12 +118,12 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
}

if err := rws.MetaWriteSet.Add(ns, metaWrite.Key, metadata); err != nil {
return err
return vault.ReadWriteSet{}, err
}
}
}

return nil
return rws, nil
}

type marshaller struct {
Expand Down
13 changes: 8 additions & 5 deletions platform/orion/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ type Interceptor struct {

func newInterceptor(
logger vault.Logger,
rwSet vault.ReadWriteSet,
qe vault.VersionedQueryExecutor,
txidStore vault.TXIDStoreReader[odriver.ValidationCode],
txid string,
) vault.TxInterceptor {
return &Interceptor{Interceptor: vault.NewInterceptor[odriver.ValidationCode](
logger,
rwSet,
qe,
txidStore,
txid,
Expand Down Expand Up @@ -88,11 +90,12 @@ type populator struct {
versionMarshaller vault.BlockTxIndexVersionMarshaller
}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
rws := vault.EmptyRWSet()
txRWSet := &types.DataTx{}
err := proto.Unmarshal(rwsetBytes, txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
}

for _, operation := range txRWSet.DbOperations {
Expand Down Expand Up @@ -120,7 +123,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
write.Key,
write.Value,
); err != nil {
return errors.Wrapf(err, "failed to add write to read-write set")
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add write to read-write set")
}
// TODO: What about write.ACL? Shall we store it as metadata?
}
Expand All @@ -131,10 +134,10 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
del.Key,
nil,
); err != nil {
return errors.Wrapf(err, "failed to add delete to read-write set")
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add delete to read-write set")
}
}
}

return nil
return rws, nil
}

0 comments on commit 2f9301b

Please sign in to comment.