Skip to content

Commit

Permalink
introduce query policy
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <[email protected]>
  • Loading branch information
adecaro committed Dec 12, 2024
1 parent 72bb066 commit c69666f
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 22 deletions.
10 changes: 7 additions & 3 deletions integration/fabric/atsa/chaincode/views/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ type ReadAssetView struct {

func (r *ReadAssetView) Call(context view.Context) (interface{}, error) {
res, err := context.RunView(
chaincode.NewQueryView(
"asset_transfer", "ReadAsset", r.ID,
).WithEndorsersFromMyOrg().WithNumRetries(2).WithRetrySleep(2 * time.Second).WithMatchEndorsementPolicy())
chaincode.NewQueryView("asset_transfer", "ReadAsset", r.ID).
WithEndorsersFromMyOrg().
WithNumRetries(2).
WithRetrySleep(2 * time.Second).
WithMatchEndorsementPolicy().
WithQueryPolicy(chaincode.QueryOne),
)
assert.NoError(err, "failed reading asset")
return res, nil
}
Expand Down
18 changes: 18 additions & 0 deletions platform/fabric/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy int

const (
// QueryAll requires an answer from all selected peers
QueryAll QueryPolicy = iota
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne
)

type Envelope struct {
e driver.Envelope
}
Expand Down Expand Up @@ -249,6 +261,12 @@ func (i *ChaincodeQuery) WithRetrySleep(duration time.Duration) *ChaincodeQuery
return i
}

// WithQueryPolicy sets the query policy to use
func (i *ChaincodeQuery) WithQueryPolicy(policy QueryPolicy) *ChaincodeQuery {
i.ChaincodeInvocation.WithQueryPolicy(driver.QueryPolicy(policy))
return i
}

type ChaincodeEndorse struct {
ChaincodeInvocation driver.ChaincodeInvocation
}
Expand Down
4 changes: 3 additions & 1 deletion platform/fabric/core/generic/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func NewChaincode(
}

func (c *Chaincode) NewInvocation(function string, args ...interface{}) driver.ChaincodeInvocation {
return NewInvoke(c, function, args...)
return NewInvoke(c, func(chaincode *Chaincode) driver.ChaincodeDiscover {
return NewDiscovery(chaincode)
}, function, args...)
}

func (c *Chaincode) NewDiscover() driver.ChaincodeDiscover {
Expand Down
80 changes: 63 additions & 17 deletions platform/fabric/core/generic/chaincode/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
"sync"
Expand All @@ -27,6 +28,8 @@ import (
"go.uber.org/zap/zapcore"
)

type NewChaincodeDiscoverFunc = func(chaincode *Chaincode) driver.ChaincodeDiscover

type Invoke struct {
Chaincode *Chaincode
TxID driver.TxID
Expand All @@ -46,16 +49,20 @@ type Invoke struct {
NumRetries int
RetrySleep time.Duration
Context context.Context
QueryPolicy driver.QueryPolicy
NewChaincodeDiscover NewChaincodeDiscoverFunc
}

func NewInvoke(chaincode *Chaincode, function string, args ...interface{}) *Invoke {
func NewInvoke(chaincode *Chaincode, newChaincodeDiscover NewChaincodeDiscoverFunc, function string, args ...interface{}) *Invoke {
return &Invoke{
Chaincode: chaincode,
ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
Chaincode: chaincode,

ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
NewChaincodeDiscover: newChaincodeDiscover,
}
}

Expand Down Expand Up @@ -257,6 +264,11 @@ func (i *Invoke) WithRetrySleep(duration time.Duration) driver.ChaincodeInvocati
return i
}

func (i *Invoke) WithQueryPolicy(policy driver.QueryPolicy) driver.ChaincodeInvocation {
i.QueryPolicy = policy
return i
}

func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalResponse, driver.SigningIdentity, error) {
var peerClients []services.PeerClient
defer func() {
Expand Down Expand Up @@ -297,7 +309,7 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon

// discover
var err error
discovery := NewDiscovery(
discovery := i.NewChaincodeDiscover(
i.Chaincode,
)
discovery.WithFilterByMSPIDs(
Expand Down Expand Up @@ -327,7 +339,9 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}
}

// get a peer client for all discovered peers
n := len(discoveredPeers)
// get a peer client for all discovered peers and collect the errors
var errs []error
for _, peer := range discoveredPeers {
peerClient, err := i.Chaincode.Services.NewPeerClient(grpc.ConnectionConfig{
Address: peer.Endpoint,
Expand All @@ -336,19 +350,28 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
TLSRootCertBytes: peer.TLSRootCerts,
})
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint)
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint))
continue
}
peerClients = append(peerClients, peerClient)
}
if err := i.checkQueryPolicy(errs, len(peerClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given discovered peers")
}

// get endorser clients
errs = nil
for _, client := range peerClients {
endorserClient, err := client.EndorserClient()
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", client.Address())
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", client.Address()))
continue
}
endorserClients = append(endorserClients, endorserClient)
}
if err := i.checkQueryPolicy(errs, len(endorserClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}
if len(endorserClients) == 0 {
return "", nil, nil, nil, errors.New("no endorser clients retrieved with the current filters")
}
Expand All @@ -366,15 +389,17 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}

// collect responses
responses, err := i.collectResponses(endorserClients, signedProp)
responses, errs := i.collectResponses(endorserClients, signedProp)
if err != nil {
return "", nil, nil, nil, errors.Wrapf(err, "failed collecting proposal responses")
}

if len(responses) == 0 {
// this should only happen if some new code has introduced a bug
return "", nil, nil, nil, errors.New("no proposal responses received - this might indicate a bug")
}
if err := i.checkQueryPolicy(errs, len(responses), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}

return txID, prop, responses, signer, nil
}
Expand Down Expand Up @@ -440,12 +465,12 @@ func (i *Invoke) createChaincodeProposalWithTxIDAndTransient(typ common.HeaderTy
}

// collectResponses sends a signed proposal to a set of peers, and gathers all the responses.
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, error) {
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, []error) {
responsesCh := make(chan *pb.ProposalResponse, len(endorserClients))
errorCh := make(chan error, len(endorserClients))
wg := sync.WaitGroup{}
wg.Add(len(endorserClients))
for _, endorser := range endorserClients {
wg.Add(1)
go func(endorser pb.EndorserClient) {
defer wg.Done()
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProposal)
Expand All @@ -459,14 +484,15 @@ func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedPro
wg.Wait()
close(responsesCh)
close(errorCh)
var errs []error
for err := range errorCh {
return nil, err
errs = append(errs, err)
}
var responses []*pb.ProposalResponse
for response := range responsesCh {
responses = append(responses, response)
}
return responses, nil
return responses, errs
}

// getChaincodeSpec get chaincode spec
Expand Down Expand Up @@ -529,3 +555,23 @@ func (i *Invoke) broadcast(txID string, env *common.Envelope) error {
}
return i.Chaincode.Finality.IsFinal(context.Background(), txID)
}

func (i *Invoke) checkQueryPolicy(errs []error, successes int, n int) error {
switch i.QueryPolicy {
case driver.QueryAll:
if len(errs) != 0 {
return errors.Errorf("query all policy, no errors expected [%v]", errs)
}
case driver.QueryOne:
if successes == 0 {
return errors.Errorf("query one policy, errors occurred [%v]", errs)
}
case driver.QueryMajority:
if successes <= n/2 {
return errors.Errorf("query majority policy, no majority reached [%v]", errs)
}
default:
panic(fmt.Sprintf("programming error, policy [%d] is not valid", i.QueryPolicy))
}
return nil
}
16 changes: 16 additions & 0 deletions platform/fabric/driver/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ type TxID struct {
Creator []byte
}

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy int

const (
// QueryAll requires an answer from all selected peers
QueryAll QueryPolicy = iota
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne
)

// ChaincodeInvocation models a client-side chaincode invocation
type ChaincodeInvocation interface {
Endorse() (Envelope, error)
Expand Down Expand Up @@ -56,6 +68,8 @@ type ChaincodeInvocation interface {
WithRetrySleep(duration time.Duration) ChaincodeInvocation

WithContext(context context.Context) ChaincodeInvocation

WithQueryPolicy(policy QueryPolicy) ChaincodeInvocation
}

// DiscoveredPeer contains the information of a discovered peer
Expand All @@ -76,6 +90,8 @@ type ChaincodeDiscover interface {
Call() ([]DiscoveredPeer, error)
WithFilterByMSPIDs(mspIDs ...string) ChaincodeDiscover
WithImplicitCollections(mspIDs ...string) ChaincodeDiscover
WithForQuery() ChaincodeDiscover
ChaincodeVersion() (string, error)
}

// Chaincode exposes chaincode-related functions
Expand Down
7 changes: 7 additions & 0 deletions platform/fabric/services/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

Expand All @@ -33,6 +34,7 @@ type Query interface {
Call() ([]byte, error)
WithNumRetries(retries uint)
WithRetrySleep(sleep time.Duration)
WithQueryPolicy(policy driver.QueryPolicy) Query
}

type Chaincode interface {
Expand Down Expand Up @@ -116,6 +118,11 @@ func (s *stdQuery) WithRetrySleep(duration time.Duration) {
s.chq.WithRetrySleep(duration)
}

func (s *stdQuery) WithQueryPolicy(policy driver.QueryPolicy) Query {
s.chq.WithQueryPolicy(fabric.QueryPolicy(policy))
return s
}

type stdChaincode struct {
ch *fabric.Chaincode
}
Expand Down
23 changes: 22 additions & 1 deletion platform/fabric/services/chaincode/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,26 @@ import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/pkg/errors"
)

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy = driver.QueryPolicy

const (
// QueryAll requires an answer from all selected peers
QueryAll = driver.QueryAll
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority = driver.QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne = driver.QueryOne
)

type queryChaincodeView struct {
*InvokeCall
policy QueryPolicy
}

func NewQueryView(chaincode, function string, args ...interface{}) *queryChaincodeView {
Expand Down Expand Up @@ -54,7 +68,9 @@ func (i *queryChaincodeView) Query(context view.Context) ([]byte, error) {
chaincode = &stdChaincode{ch: stdChannelChaincode}
logger.Debugf("chaincode [%s:%s:%s] is a standard chaincode", i.Network, i.Channel, i.ChaincodeName)

invocation := chaincode.Query(i.Function, i.Args...).WithInvokerIdentity(i.InvokerIdentity)
invocation := chaincode.Query(i.Function, i.Args...).
WithInvokerIdentity(i.InvokerIdentity).
WithQueryPolicy(i.policy)
for k, v := range i.TransientMap {
invocation.WithTransientEntry(k, v)
}
Expand Down Expand Up @@ -126,3 +142,8 @@ func (i *queryChaincodeView) WithRetrySleep(duration time.Duration) *queryChainc
i.RetrySleep = duration
return i
}

func (i *queryChaincodeView) WithQueryPolicy(policy QueryPolicy) *queryChaincodeView {
i.policy = policy
return i
}

0 comments on commit c69666f

Please sign in to comment.