diff --git a/integration/fabric/atsa/chaincode/views/assets.go b/integration/fabric/atsa/chaincode/views/assets.go index cc492a2a2..0f2e9bf55 100644 --- a/integration/fabric/atsa/chaincode/views/assets.go +++ b/integration/fabric/atsa/chaincode/views/assets.go @@ -60,9 +60,13 @@ type ReadAssetView struct { func (r *ReadAssetView) Call(context view.Context) (interface{}, error) { res, err := context.RunView( - chaincode.NewQueryView( - "asset_transfer", "ReadAsset", r.ID, - ).WithEndorsersFromMyOrg().WithNumRetries(2).WithRetrySleep(2 * time.Second).WithMatchEndorsementPolicy()) + chaincode.NewQueryView("asset_transfer", "ReadAsset", r.ID). + WithEndorsersFromMyOrg(). + WithNumRetries(2). + WithRetrySleep(2 * time.Second). + WithMatchEndorsementPolicy(). + WithQueryPolicy(chaincode.QueryOne), + ) assert.NoError(err, "failed reading asset") return res, nil } diff --git a/platform/fabric/chaincode.go b/platform/fabric/chaincode.go index 685702c8f..46a2e9a1d 100644 --- a/platform/fabric/chaincode.go +++ b/platform/fabric/chaincode.go @@ -15,6 +15,18 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" ) +// QueryPolicy defines the policy to use to decide if a query is successful +type QueryPolicy int + +const ( + // QueryAll requires an answer from all selected peers + QueryAll QueryPolicy = iota + // QueryMajority requires an answer from the majority of the selected peers + QueryMajority + // QueryOne requires an answer from at least one of the selected peers + QueryOne +) + type Envelope struct { e driver.Envelope } @@ -249,6 +261,12 @@ func (i *ChaincodeQuery) WithRetrySleep(duration time.Duration) *ChaincodeQuery return i } +// WithQueryPolicy sets the query policy to use +func (i *ChaincodeQuery) WithQueryPolicy(policy QueryPolicy) *ChaincodeQuery { + i.ChaincodeInvocation.WithQueryPolicy(driver.QueryPolicy(policy)) + return i +} + type ChaincodeEndorse struct { ChaincodeInvocation driver.ChaincodeInvocation } diff --git a/platform/fabric/core/generic/chaincode/chaincode.go b/platform/fabric/core/generic/chaincode/chaincode.go index 8ce1cd484..fc35ddf13 100644 --- a/platform/fabric/core/generic/chaincode/chaincode.go +++ b/platform/fabric/core/generic/chaincode/chaincode.go @@ -88,7 +88,9 @@ func NewChaincode( } func (c *Chaincode) NewInvocation(function string, args ...interface{}) driver.ChaincodeInvocation { - return NewInvoke(c, function, args...) + return NewInvoke(c, func(chaincode *Chaincode) driver.ChaincodeDiscover { + return NewDiscovery(chaincode) + }, function, args...) } func (c *Chaincode) NewDiscover() driver.ChaincodeDiscover { diff --git a/platform/fabric/core/generic/chaincode/invoke.go b/platform/fabric/core/generic/chaincode/invoke.go index 6049e6831..51c283892 100644 --- a/platform/fabric/core/generic/chaincode/invoke.go +++ b/platform/fabric/core/generic/chaincode/invoke.go @@ -10,6 +10,7 @@ import ( "bytes" "context" "encoding/base64" + "fmt" "strconv" "strings" "sync" @@ -27,6 +28,8 @@ import ( "go.uber.org/zap/zapcore" ) +type NewChaincodeDiscoverFunc = func(chaincode *Chaincode) driver.ChaincodeDiscover + type Invoke struct { Chaincode *Chaincode TxID driver.TxID @@ -46,16 +49,20 @@ type Invoke struct { NumRetries int RetrySleep time.Duration Context context.Context + QueryPolicy driver.QueryPolicy + NewChaincodeDiscover NewChaincodeDiscoverFunc } -func NewInvoke(chaincode *Chaincode, function string, args ...interface{}) *Invoke { +func NewInvoke(chaincode *Chaincode, newChaincodeDiscover NewChaincodeDiscoverFunc, function string, args ...interface{}) *Invoke { return &Invoke{ - Chaincode: chaincode, - ChaincodeName: chaincode.name, - Function: function, - Args: args, - NumRetries: int(chaincode.NumRetries), - RetrySleep: chaincode.RetrySleep, + Chaincode: chaincode, + + ChaincodeName: chaincode.name, + Function: function, + Args: args, + NumRetries: int(chaincode.NumRetries), + RetrySleep: chaincode.RetrySleep, + NewChaincodeDiscover: newChaincodeDiscover, } } @@ -257,6 +264,11 @@ func (i *Invoke) WithRetrySleep(duration time.Duration) driver.ChaincodeInvocati return i } +func (i *Invoke) WithQueryPolicy(policy driver.QueryPolicy) driver.ChaincodeInvocation { + i.QueryPolicy = policy + return i +} + func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalResponse, driver.SigningIdentity, error) { var peerClients []services.PeerClient defer func() { @@ -297,7 +309,7 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon // discover var err error - discovery := NewDiscovery( + discovery := i.NewChaincodeDiscover( i.Chaincode, ) discovery.WithFilterByMSPIDs( @@ -327,7 +339,9 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon } } - // get a peer client for all discovered peers + n := len(discoveredPeers) + // get a peer client for all discovered peers and collect the errors + var errs []error for _, peer := range discoveredPeers { peerClient, err := i.Chaincode.Services.NewPeerClient(grpc.ConnectionConfig{ Address: peer.Endpoint, @@ -336,19 +350,28 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon TLSRootCertBytes: peer.TLSRootCerts, }) if err != nil { - return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint) + errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint)) + continue } peerClients = append(peerClients, peerClient) } + if err := i.checkQueryPolicy(errs, len(peerClients), n); err != nil { + return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given discovered peers") + } // get endorser clients + errs = nil for _, client := range peerClients { endorserClient, err := client.EndorserClient() if err != nil { - return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", client.Address()) + errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", client.Address())) + continue } endorserClients = append(endorserClients, endorserClient) } + if err := i.checkQueryPolicy(errs, len(endorserClients), n); err != nil { + return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients") + } if len(endorserClients) == 0 { return "", nil, nil, nil, errors.New("no endorser clients retrieved with the current filters") } @@ -366,15 +389,17 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon } // collect responses - responses, err := i.collectResponses(endorserClients, signedProp) + responses, errs := i.collectResponses(endorserClients, signedProp) if err != nil { return "", nil, nil, nil, errors.Wrapf(err, "failed collecting proposal responses") } - if len(responses) == 0 { // this should only happen if some new code has introduced a bug return "", nil, nil, nil, errors.New("no proposal responses received - this might indicate a bug") } + if err := i.checkQueryPolicy(errs, len(responses), n); err != nil { + return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients") + } return txID, prop, responses, signer, nil } @@ -440,12 +465,12 @@ func (i *Invoke) createChaincodeProposalWithTxIDAndTransient(typ common.HeaderTy } // collectResponses sends a signed proposal to a set of peers, and gathers all the responses. -func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, error) { +func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, []error) { responsesCh := make(chan *pb.ProposalResponse, len(endorserClients)) errorCh := make(chan error, len(endorserClients)) wg := sync.WaitGroup{} + wg.Add(len(endorserClients)) for _, endorser := range endorserClients { - wg.Add(1) go func(endorser pb.EndorserClient) { defer wg.Done() proposalResp, err := endorser.ProcessProposal(context.Background(), signedProposal) @@ -459,14 +484,15 @@ func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedPro wg.Wait() close(responsesCh) close(errorCh) + var errs []error for err := range errorCh { - return nil, err + errs = append(errs, err) } var responses []*pb.ProposalResponse for response := range responsesCh { responses = append(responses, response) } - return responses, nil + return responses, errs } // getChaincodeSpec get chaincode spec @@ -529,3 +555,23 @@ func (i *Invoke) broadcast(txID string, env *common.Envelope) error { } return i.Chaincode.Finality.IsFinal(context.Background(), txID) } + +func (i *Invoke) checkQueryPolicy(errs []error, successes int, n int) error { + switch i.QueryPolicy { + case driver.QueryAll: + if len(errs) != 0 { + return errors.Errorf("query all policy, no errors expected [%v]", errs) + } + case driver.QueryOne: + if successes == 0 { + return errors.Errorf("query one policy, errors occurred [%v]", errs) + } + case driver.QueryMajority: + if successes <= n/2 { + return errors.Errorf("query majority policy, no majority reached [%v]", errs) + } + default: + panic(fmt.Sprintf("programming error, policy [%d] is not valid", i.QueryPolicy)) + } + return nil +} diff --git a/platform/fabric/driver/chaincode.go b/platform/fabric/driver/chaincode.go index b9e47515c..2868e933d 100644 --- a/platform/fabric/driver/chaincode.go +++ b/platform/fabric/driver/chaincode.go @@ -19,6 +19,18 @@ type TxID struct { Creator []byte } +// QueryPolicy defines the policy to use to decide if a query is successful +type QueryPolicy int + +const ( + // QueryAll requires an answer from all selected peers + QueryAll QueryPolicy = iota + // QueryMajority requires an answer from the majority of the selected peers + QueryMajority + // QueryOne requires an answer from at least one of the selected peers + QueryOne +) + // ChaincodeInvocation models a client-side chaincode invocation type ChaincodeInvocation interface { Endorse() (Envelope, error) @@ -56,6 +68,8 @@ type ChaincodeInvocation interface { WithRetrySleep(duration time.Duration) ChaincodeInvocation WithContext(context context.Context) ChaincodeInvocation + + WithQueryPolicy(policy QueryPolicy) ChaincodeInvocation } // DiscoveredPeer contains the information of a discovered peer @@ -76,6 +90,8 @@ type ChaincodeDiscover interface { Call() ([]DiscoveredPeer, error) WithFilterByMSPIDs(mspIDs ...string) ChaincodeDiscover WithImplicitCollections(mspIDs ...string) ChaincodeDiscover + WithForQuery() ChaincodeDiscover + ChaincodeVersion() (string, error) } // Chaincode exposes chaincode-related functions diff --git a/platform/fabric/services/chaincode/chaincode.go b/platform/fabric/services/chaincode/chaincode.go index 9a6ffda01..93feefad9 100644 --- a/platform/fabric/services/chaincode/chaincode.go +++ b/platform/fabric/services/chaincode/chaincode.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" ) @@ -33,6 +34,7 @@ type Query interface { Call() ([]byte, error) WithNumRetries(retries uint) WithRetrySleep(sleep time.Duration) + WithQueryPolicy(policy driver.QueryPolicy) Query } type Chaincode interface { @@ -116,6 +118,11 @@ func (s *stdQuery) WithRetrySleep(duration time.Duration) { s.chq.WithRetrySleep(duration) } +func (s *stdQuery) WithQueryPolicy(policy driver.QueryPolicy) Query { + s.chq.WithQueryPolicy(fabric.QueryPolicy(policy)) + return s +} + type stdChaincode struct { ch *fabric.Chaincode } diff --git a/platform/fabric/services/chaincode/query.go b/platform/fabric/services/chaincode/query.go index 33f55e420..ce182ed17 100644 --- a/platform/fabric/services/chaincode/query.go +++ b/platform/fabric/services/chaincode/query.go @@ -10,12 +10,26 @@ import ( "time" "github.com/hyperledger-labs/fabric-smart-client/platform/fabric" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" "github.com/pkg/errors" ) +// QueryPolicy defines the policy to use to decide if a query is successful +type QueryPolicy = driver.QueryPolicy + +const ( + // QueryAll requires an answer from all selected peers + QueryAll = driver.QueryAll + // QueryMajority requires an answer from the majority of the selected peers + QueryMajority = driver.QueryMajority + // QueryOne requires an answer from at least one of the selected peers + QueryOne = driver.QueryOne +) + type queryChaincodeView struct { *InvokeCall + policy QueryPolicy } func NewQueryView(chaincode, function string, args ...interface{}) *queryChaincodeView { @@ -54,7 +68,9 @@ func (i *queryChaincodeView) Query(context view.Context) ([]byte, error) { chaincode = &stdChaincode{ch: stdChannelChaincode} logger.Debugf("chaincode [%s:%s:%s] is a standard chaincode", i.Network, i.Channel, i.ChaincodeName) - invocation := chaincode.Query(i.Function, i.Args...).WithInvokerIdentity(i.InvokerIdentity) + invocation := chaincode.Query(i.Function, i.Args...). + WithInvokerIdentity(i.InvokerIdentity). + WithQueryPolicy(i.policy) for k, v := range i.TransientMap { invocation.WithTransientEntry(k, v) } @@ -126,3 +142,8 @@ func (i *queryChaincodeView) WithRetrySleep(duration time.Duration) *queryChainc i.RetrySleep = duration return i } + +func (i *queryChaincodeView) WithQueryPolicy(policy QueryPolicy) *queryChaincodeView { + i.policy = policy + return i +}