Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce query policy #475

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions integration/fabric/atsa/chaincode/views/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ type ReadAssetView struct {

func (r *ReadAssetView) Call(context view.Context) (interface{}, error) {
res, err := context.RunView(
chaincode.NewQueryView(
"asset_transfer", "ReadAsset", r.ID,
).WithEndorsersFromMyOrg().WithNumRetries(2).WithRetrySleep(2 * time.Second).WithMatchEndorsementPolicy())
chaincode.NewQueryView("asset_transfer", "ReadAsset", r.ID).
WithEndorsersFromMyOrg().
WithNumRetries(2).
WithRetrySleep(2 * time.Second).
WithMatchEndorsementPolicy().
WithQueryPolicy(chaincode.QueryOne),
)
assert.NoError(err, "failed reading asset")
return res, nil
}
Expand Down
23 changes: 23 additions & 0 deletions platform/fabric/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy int

const (
// QueryAll requires an answer from all selected peers
QueryAll QueryPolicy = iota
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne
)

type Envelope struct {
e driver.Envelope
}
Expand Down Expand Up @@ -134,6 +146,11 @@ func (i *ChaincodeDiscover) WithFilterByMSPIDs(mspIDs ...string) *ChaincodeDisco
return i
}

func (i *ChaincodeDiscover) WithContext(context context.Context) *ChaincodeDiscover {
i.ChaincodeDiscover.WithContext(context)
return i
}

type ChaincodeInvocation struct {
driver.ChaincodeInvocation
}
Expand Down Expand Up @@ -249,6 +266,12 @@ func (i *ChaincodeQuery) WithRetrySleep(duration time.Duration) *ChaincodeQuery
return i
}

// WithQueryPolicy sets the query policy to use
func (i *ChaincodeQuery) WithQueryPolicy(policy QueryPolicy) *ChaincodeQuery {
i.ChaincodeInvocation.WithQueryPolicy(driver.QueryPolicy(policy))
return i
}

type ChaincodeEndorse struct {
ChaincodeInvocation driver.ChaincodeInvocation
}
Expand Down
4 changes: 3 additions & 1 deletion platform/fabric/core/generic/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func NewChaincode(
}

func (c *Chaincode) NewInvocation(function string, args ...interface{}) driver.ChaincodeInvocation {
return NewInvoke(c, function, args...)
return NewInvoke(c, func(chaincode *Chaincode) driver.ChaincodeDiscover {
return NewDiscovery(chaincode)
}, function, args...)
}

func (c *Chaincode) NewDiscover() driver.ChaincodeDiscover {
Expand Down
8 changes: 7 additions & 1 deletion platform/fabric/core/generic/chaincode/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ type Discovery struct {
QueryForPeers bool

DefaultTTL time.Duration
Context context.Context
}

func NewDiscovery(chaincode *Chaincode) *Discovery {
// set key to the concatenation of chaincode name and version
return &Discovery{
chaincode: chaincode,
DefaultTTL: chaincode.ChannelConfig.DiscoveryDefaultTTLS(),
Context: context.Background(),
}
}

Expand Down Expand Up @@ -230,7 +232,7 @@ func (d *Discovery) query(req *discovery.Request) (discovery.Response, error) {
ClientIdentity: signerRaw,
ClientTlsCertHash: ClientTLSCertHash,
}
timeout, cancel := context.WithTimeout(context.Background(), d.chaincode.ChannelConfig.DiscoveryTimeout())
timeout, cancel := context.WithTimeout(d.Context, d.chaincode.ChannelConfig.DiscoveryTimeout())
defer cancel()
cl, err := pc.DiscoveryClient()
if err != nil {
Expand Down Expand Up @@ -318,6 +320,10 @@ func (d *Discovery) ChaincodeVersion() (string, error) {
return "", errors.Errorf("chaincode [%s] not found", d.chaincode.name)
}

func (d *Discovery) WithContext(context context.Context) {
d.Context = context
}

func ccCall(ccNames ...string) []*peer.ChaincodeCall {
var call []*peer.ChaincodeCall
for _, ccName := range ccNames {
Expand Down
88 changes: 69 additions & 19 deletions platform/fabric/core/generic/chaincode/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
"sync"
Expand All @@ -27,6 +28,8 @@ import (
"go.uber.org/zap/zapcore"
)

type NewChaincodeDiscoverFunc = func(chaincode *Chaincode) driver.ChaincodeDiscover

type Invoke struct {
Chaincode *Chaincode
TxID driver.TxID
Expand All @@ -46,16 +49,21 @@ type Invoke struct {
NumRetries int
RetrySleep time.Duration
Context context.Context
QueryPolicy driver.QueryPolicy
NewChaincodeDiscover NewChaincodeDiscoverFunc
}

func NewInvoke(chaincode *Chaincode, function string, args ...interface{}) *Invoke {
func NewInvoke(chaincode *Chaincode, newChaincodeDiscover NewChaincodeDiscoverFunc, function string, args ...interface{}) *Invoke {
return &Invoke{
Chaincode: chaincode,
ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
Chaincode: chaincode,

ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
NewChaincodeDiscover: newChaincodeDiscover,
Context: context.Background(),
}
}

Expand Down Expand Up @@ -257,6 +265,11 @@ func (i *Invoke) WithRetrySleep(duration time.Duration) driver.ChaincodeInvocati
return i
}

func (i *Invoke) WithQueryPolicy(policy driver.QueryPolicy) driver.ChaincodeInvocation {
i.QueryPolicy = policy
return i
}

func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalResponse, driver.SigningIdentity, error) {
var peerClients []services.PeerClient
defer func() {
Expand Down Expand Up @@ -297,13 +310,15 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon

// discover
var err error
discovery := NewDiscovery(
discovery := i.NewChaincodeDiscover(
i.Chaincode,
)
discovery.WithFilterByMSPIDs(
i.EndorsersMSPIDs...,
).WithImplicitCollections(
i.ImplicitCollectionMSPIDs...,
).WithContext(
i.Context,
)
if query {
discovery.WithForQuery()
Expand All @@ -327,7 +342,9 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}
}

// get a peer client for all discovered peers
n := len(discoveredPeers)
// get a peer client for all discovered peers and collect the errors
var errs []error
for _, peer := range discoveredPeers {
peerClient, err := i.Chaincode.Services.NewPeerClient(grpc.ConnectionConfig{
Address: peer.Endpoint,
Expand All @@ -336,19 +353,28 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
TLSRootCertBytes: peer.TLSRootCerts,
})
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint)
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint))
continue
}
peerClients = append(peerClients, peerClient)
}
if err := i.checkQueryPolicy(errs, len(peerClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given discovered peers")
}

// get endorser clients
errs = nil
for _, client := range peerClients {
endorserClient, err := client.EndorserClient()
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", client.Address())
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", client.Address()))
continue
}
endorserClients = append(endorserClients, endorserClient)
}
if err := i.checkQueryPolicy(errs, len(endorserClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}
if len(endorserClients) == 0 {
return "", nil, nil, nil, errors.New("no endorser clients retrieved with the current filters")
}
Expand All @@ -366,15 +392,17 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}

// collect responses
responses, err := i.collectResponses(endorserClients, signedProp)
responses, errs := i.collectResponses(endorserClients, signedProp)
if err != nil {
return "", nil, nil, nil, errors.Wrapf(err, "failed collecting proposal responses")
}

if len(responses) == 0 {
// this should only happen if some new code has introduced a bug
return "", nil, nil, nil, errors.New("no proposal responses received - this might indicate a bug")
}
if err := i.checkQueryPolicy(errs, len(responses), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}

return txID, prop, responses, signer, nil
}
Expand Down Expand Up @@ -440,15 +468,16 @@ func (i *Invoke) createChaincodeProposalWithTxIDAndTransient(typ common.HeaderTy
}

// collectResponses sends a signed proposal to a set of peers, and gathers all the responses.
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, error) {
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, []error) {
responsesCh := make(chan *pb.ProposalResponse, len(endorserClients))
errorCh := make(chan error, len(endorserClients))
wg := sync.WaitGroup{}
wg.Add(len(endorserClients))
for _, endorser := range endorserClients {
wg.Add(1)
go func(endorser pb.EndorserClient) {
defer wg.Done()
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProposal)
// TODO: we could evaluate the policy already here after we get a result to see if still need more answers
proposalResp, err := endorser.ProcessProposal(i.Context, signedProposal)
if err != nil {
errorCh <- err
return
Expand All @@ -459,14 +488,15 @@ func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedPro
wg.Wait()
close(responsesCh)
close(errorCh)
var errs []error
for err := range errorCh {
return nil, err
errs = append(errs, err)
}
var responses []*pb.ProposalResponse
for response := range responsesCh {
responses = append(responses, response)
}
return responses, nil
return responses, errs
}

// getChaincodeSpec get chaincode spec
Expand Down Expand Up @@ -527,5 +557,25 @@ func (i *Invoke) broadcast(txID string, env *common.Envelope) error {
if err := i.Chaincode.Broadcaster.Broadcast(i.Context, env); err != nil {
return err
}
return i.Chaincode.Finality.IsFinal(context.Background(), txID)
return i.Chaincode.Finality.IsFinal(i.Context, txID)
}

func (i *Invoke) checkQueryPolicy(errs []error, successes int, n int) error {
switch i.QueryPolicy {
case driver.QueryAll:
if len(errs) != 0 {
return errors.Errorf("query all policy, no errors expected [%v]", errs)
}
case driver.QueryOne:
if successes == 0 {
return errors.Errorf("query one policy, errors occurred [%v]", errs)
}
case driver.QueryMajority:
if successes <= n/2 {
return errors.Errorf("query majority policy, no majority reached [%v]", errs)
}
default:
panic(fmt.Sprintf("programming error, policy [%d] is not valid", i.QueryPolicy))
}
return nil
}
17 changes: 17 additions & 0 deletions platform/fabric/driver/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ type TxID struct {
Creator []byte
}

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy int

const (
// QueryAll requires an answer from all selected peers
QueryAll QueryPolicy = iota
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne
)

// ChaincodeInvocation models a client-side chaincode invocation
type ChaincodeInvocation interface {
Endorse() (Envelope, error)
Expand Down Expand Up @@ -56,6 +68,8 @@ type ChaincodeInvocation interface {
WithRetrySleep(duration time.Duration) ChaincodeInvocation

WithContext(context context.Context) ChaincodeInvocation

WithQueryPolicy(policy QueryPolicy) ChaincodeInvocation
}

// DiscoveredPeer contains the information of a discovered peer
Expand All @@ -76,6 +90,9 @@ type ChaincodeDiscover interface {
Call() ([]DiscoveredPeer, error)
WithFilterByMSPIDs(mspIDs ...string) ChaincodeDiscover
WithImplicitCollections(mspIDs ...string) ChaincodeDiscover
WithForQuery() ChaincodeDiscover
ChaincodeVersion() (string, error)
WithContext(ctx context.Context)
}

// Chaincode exposes chaincode-related functions
Expand Down
7 changes: 7 additions & 0 deletions platform/fabric/services/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

Expand All @@ -33,6 +34,7 @@ type Query interface {
Call() ([]byte, error)
WithNumRetries(retries uint)
WithRetrySleep(sleep time.Duration)
WithQueryPolicy(policy driver.QueryPolicy) Query
}

type Chaincode interface {
Expand Down Expand Up @@ -116,6 +118,11 @@ func (s *stdQuery) WithRetrySleep(duration time.Duration) {
s.chq.WithRetrySleep(duration)
}

func (s *stdQuery) WithQueryPolicy(policy driver.QueryPolicy) Query {
s.chq.WithQueryPolicy(fabric.QueryPolicy(policy))
return s
}

type stdChaincode struct {
ch *fabric.Chaincode
}
Expand Down
Loading
Loading