Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracted functionality for re-use #717

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/fabric/fabricdev/core/fabricdev/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
)
}

func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
return vault.NewInterceptor[fdriver.ValidationCode](
logger,
rwSet,
qe,
txIDStore,
txID,
Expand All @@ -53,7 +54,7 @@ func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDSt
// populator is the custom populator for FabricDEV
type populator struct{}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func (p *populator) Populate([]byte, ...driver.Namespace) (vault.ReadWriteSet, error) {
panic("implement me")
}

Expand Down
18 changes: 0 additions & 18 deletions platform/common/core/generic/vault/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@ type Inspector struct {
Rws ReadWriteSet
}

func NewInspector() *Inspector {
return &Inspector{
Rws: ReadWriteSet{
ReadSet: ReadSet{
OrderedReads: map[string][]string{},
Reads: Reads{},
},
WriteSet: WriteSet{
OrderedWrites: map[string][]string{},
Writes: Writes{},
},
MetaWriteSet: MetaWriteSet{
MetaWrites: NamespaceKeyedMetaWrites{},
},
},
}
}

func (i *Inspector) IsValid() error {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion platform/common/core/generic/vault/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func EmptyRWSet() ReadWriteSet {

func NewInterceptor[V driver.ValidationCode](
logger Logger,
rwSet ReadWriteSet,
qe VersionedQueryExecutor,
txIDStore TXIDStoreReader[V],
txID driver.TxID,
Expand All @@ -69,7 +70,7 @@ func NewInterceptor[V driver.ValidationCode](
TxID: txID,
QE: qe,
TxIDStore: txIDStore,
Rws: EmptyRWSet(),
Rws: rwSet,
vcProvider: vcProvider,
Marshaller: marshaller,
VersionComparator: versionComparator,
Expand Down
4 changes: 2 additions & 2 deletions platform/common/core/generic/vault/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestConcurrency(t *testing.T) {
qe := mocks.NewMockQE()
idsr := mocks.MockTXIDStoreReader{}

i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")
s, err := i.GetState("ns", "key")
assert.NoError(t, err)
assert.Equal(t, qe.State.Raw, s, "with no opts, getstate should return the FromStorage value (query executor)")
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestConcurrency(t *testing.T) {
func TestAddReadAt(t *testing.T) {
qe := mocks.MockQE{}
idsr := mocks.MockTXIDStoreReader{}
i := newInterceptor(logging.MustGetLogger("interceptor_test"), qe, idsr, "1")
i := newInterceptor(logging.MustGetLogger("interceptor_test"), EmptyRWSet(), qe, idsr, "1")

assert.NoError(t, i.AddReadAt("ns", "key", []byte("version")))
assert.Len(t, i.RWs().Reads, 1)
Expand Down
21 changes: 10 additions & 11 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ type TxInterceptor interface {
}

type Populator interface {
Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error
Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (ReadWriteSet, error)
}

type Marshaller interface {
Marshal(txID string, rws *ReadWriteSet) ([]byte, error)
Append(destination *ReadWriteSet, raw []byte, nss ...string) error
}

type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor
type NewInterceptorFunc[V driver.ValidationCode] func(logger Logger, rwSet ReadWriteSet, qe VersionedQueryExecutor, txidStore TXIDStoreReader[V], txid driver.TxID) TxInterceptor

type (
VersionedPersistence = dbdriver.VersionedPersistence
Expand Down Expand Up @@ -447,7 +447,7 @@ func (db *Vault[V]) NewRWSet(txID driver.TxID) (driver.RWSet, error) {

func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {
db.logger.Debugf("NewRWSet[%s][%d]", txID, db.counter.Load())
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)
i := db.newInterceptor(db.logger, EmptyRWSet(), &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

db.interceptorsLock.Lock()
if _, in := db.Interceptors[txID]; in {
Expand All @@ -469,12 +469,13 @@ func (db *Vault[V]) NewInspector(txID driver.TxID) (TxInterceptor, error) {

func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet, error) {
db.logger.Debugf("GetRWSet[%s][%d]", txID, db.counter.Load())
i := db.newInterceptor(db.logger, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

if err := db.populator.Populate(i.RWs(), rwsetBytes); err != nil {
rwSet, err := db.populator.Populate(rwsetBytes)
if err != nil {
return nil, errors.Wrapf(err, "failed populating tx [%s]", txID)
}

i := db.newInterceptor(db.logger, rwSet, &interceptorQueryExecutor[V]{db}, db.txIDStore, txID)

db.interceptorsLock.Lock()
if i, in := db.Interceptors[txID]; in {
if !i.IsClosed() {
Expand All @@ -496,13 +497,11 @@ func (db *Vault[V]) GetRWSet(txID driver.TxID, rwsetBytes []byte) (driver.RWSet,
}

func (db *Vault[V]) InspectRWSet(rwsetBytes []byte, namespaces ...driver.Namespace) (driver.RWSet, error) {
i := NewInspector()

if err := db.populator.Populate(&i.Rws, rwsetBytes, namespaces...); err != nil {
rwSet, err := db.populator.Populate(rwsetBytes, namespaces...)
if err != nil {
return nil, errors.Wrapf(err, "failed populating ephemeral txID")
}

return i, nil
return &Inspector{Rws: rwSet}, nil
}

func (db *Vault[V]) Match(txID driver.TxID, rwsRaw []byte) error {
Expand Down
10 changes: 8 additions & 2 deletions platform/common/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ func (p *testArtifactProvider) NewMarshaller() Marshaller {

func newInterceptor(
logger Logger,
rwSet ReadWriteSet,
qe VersionedQueryExecutor,
txidStore TXIDStoreReader[ValidationCode],
txid driver2.TxID,
) TxInterceptor {
return NewInterceptor[ValidationCode](
logger,
rwSet,
qe,
txidStore,
txid,
Expand All @@ -90,8 +92,12 @@ type populator struct {
marshaller marshaller
}

func (p *populator) Populate(rws *ReadWriteSet, rwsetBytes []byte, namespaces ...driver2.Namespace) error {
return p.marshaller.Append(rws, rwsetBytes, namespaces...)
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver2.Namespace) (ReadWriteSet, error) {
rwSet := EmptyRWSet()
if err := p.marshaller.Append(&rwSet, rwsetBytes, namespaces...); err != nil {
return ReadWriteSet{}, err
}
return rwSet, nil
}

type marshaller struct{}
Expand Down
28 changes: 18 additions & 10 deletions platform/fabric/core/generic/committer/endorsertx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,9 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("[%s] EndorserClient transaction received: %s", c.ChannelConfig.ID(), tx.TxID)
}
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return nil, errors.Errorf("block metadata lacks transaction filter")
}

fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[tx.TxNum]
event := &FinalityEvent{
Ctx: ctx,
TxID: tx.TxID,
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
fabricValidationCode, event, err := MapFinalityEvent(ctx, block, tx.TxNum, tx.TxID)
if err != nil {
return nil, err
}

switch pb.TxValidationCode(fabricValidationCode) {
Expand Down Expand Up @@ -62,6 +55,21 @@ func (c *Committer) HandleEndorserTransaction(ctx context.Context, block *common
return event, nil
}

func MapFinalityEvent(ctx context.Context, block *common.BlockMetadata, txNum driver.TxNum, txID string) (uint8, *FinalityEvent, error) {
if len(block.Metadata) < int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
return 0, nil, errors.Errorf("block metadata lacks transaction filter")
}

fabricValidationCode := ValidationFlags(block.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])[txNum]
event := &FinalityEvent{
Ctx: ctx,
TxID: txID,
ValidationCode: convertValidationCode(int32(fabricValidationCode)),
ValidationMessage: pb.TxValidationCode_name[int32(fabricValidationCode)],
}
return fabricValidationCode, event, nil
}

// GetChaincodeEvents reads the chaincode events and notifies the listeners registered to the specific chaincode.
func (c *Committer) GetChaincodeEvents(env *common.Envelope, blockNum driver2.BlockNum) error {
chaincodeEvent, err := readChaincodeEvent(env, blockNum)
Expand Down
25 changes: 25 additions & 0 deletions platform/fabric/core/generic/rwset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package rwset
import (
"fmt"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault"
vault2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/vault"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger/fabric-protos-go/common"
)
Expand Down Expand Up @@ -36,3 +38,26 @@ func (h *endorserTransactionHandler) Load(payl *common.Payload, chdr *common.Cha
}
return rws, upe, nil
}

type endorserTransactionReader struct {
network string
populator vault.Populator
}

func NewEndorserTransactionReader(network string) *endorserTransactionReader {
return &endorserTransactionReader{
network: network,
populator: vault2.NewPopulator(),
}
}

func (h *endorserTransactionReader) Read(payl *common.Payload, chdr *common.ChannelHeader) (vault.ReadWriteSet, error) {
upe, err := UnpackEnvelopeFromPayloadAndCHHeader(h.network, payl, chdr)
if err != nil {
return vault.ReadWriteSet{}, fmt.Errorf("failed unpacking envelope [%s]: %w", chdr.TxId, err)
}

logger.Debugf("retrieve rws [%s,%s]", h.network, chdr.TxId)

return h.populator.Populate(upe.Results)
}
22 changes: 14 additions & 8 deletions platform/fabric/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProv
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
NewPopulator(),
metricsProvider,
tracerProvider,
&vault.BlockTxIndexVersionBuilder{},
)
}

func newInterceptor(logger vault.Logger, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
func newInterceptor(logger vault.Logger, rwSet vault.ReadWriteSet, qe vault.VersionedQueryExecutor, txIDStore TXIDStoreReader, txID string) vault.TxInterceptor {
return vault.NewInterceptor[fdriver.ValidationCode](
logger,
rwSet,
qe,
txIDStore,
txID,
Expand All @@ -68,18 +69,23 @@ type populator struct {
versionMarshaller vault.BlockTxIndexVersionMarshaller
}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func NewPopulator() *populator {
return &populator{}
}

func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
txRWSet := &rwset.TxReadWriteSet{}
err := proto.Unmarshal(rwsetBytes, txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
}

rwsIn, err := rwsetutil.TxRwSetFromProtoMsg(txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, TxRwSetFromProtoMsg failed")
}

rws := vault.EmptyRWSet()
namespaceSet := collections.NewSet(namespaces...)
for _, nsrws := range rwsIn.NsRwSets {
ns := nsrws.NameSpace
Expand All @@ -101,7 +107,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa

for _, write := range nsrws.KvRwSet.Writes {
if err := rws.WriteSet.Add(ns, write.Key, write.Value); err != nil {
return err
return vault.ReadWriteSet{}, err
}
}

Expand All @@ -112,12 +118,12 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
}

if err := rws.MetaWriteSet.Add(ns, metaWrite.Key, metadata); err != nil {
return err
return vault.ReadWriteSet{}, err
}
}
}

return nil
return rws, nil
}

type marshaller struct {
Expand Down
13 changes: 8 additions & 5 deletions platform/orion/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ type Interceptor struct {

func newInterceptor(
logger vault.Logger,
rwSet vault.ReadWriteSet,
qe vault.VersionedQueryExecutor,
txidStore vault.TXIDStoreReader[odriver.ValidationCode],
txid string,
) vault.TxInterceptor {
return &Interceptor{Interceptor: vault.NewInterceptor[odriver.ValidationCode](
logger,
rwSet,
qe,
txidStore,
txid,
Expand Down Expand Up @@ -88,11 +90,12 @@ type populator struct {
versionMarshaller vault.BlockTxIndexVersionMarshaller
}

func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespaces ...driver.Namespace) error {
func (p *populator) Populate(rwsetBytes []byte, namespaces ...driver.Namespace) (vault.ReadWriteSet, error) {
rws := vault.EmptyRWSet()
txRWSet := &types.DataTx{}
err := proto.Unmarshal(rwsetBytes, txRWSet)
if err != nil {
return errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
return vault.ReadWriteSet{}, errors.Wrapf(err, "provided invalid read-write set bytes, unmarshal failed")
}

for _, operation := range txRWSet.DbOperations {
Expand Down Expand Up @@ -120,7 +123,7 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
write.Key,
write.Value,
); err != nil {
return errors.Wrapf(err, "failed to add write to read-write set")
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add write to read-write set")
}
// TODO: What about write.ACL? Shall we store it as metadata?
}
Expand All @@ -131,10 +134,10 @@ func (p *populator) Populate(rws *vault.ReadWriteSet, rwsetBytes []byte, namespa
del.Key,
nil,
); err != nil {
return errors.Wrapf(err, "failed to add delete to read-write set")
return vault.ReadWriteSet{}, errors.Wrapf(err, "failed to add delete to read-write set")
}
}
}

return nil
return rws, nil
}
Loading