diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index ffdd503e06d..b5fd1e383fb 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -75,6 +75,8 @@ type ChaincodeSupport struct { UserRunsCC bool UseWriteBatch bool MaxSizeWriteBatch uint32 + UseGetMultipleKeys bool + MaxSizeGetMultipleKeys uint32 } // Launch starts executing chaincode if it is not already running. This method @@ -130,6 +132,8 @@ func (cs *ChaincodeSupport) HandleChaincodeStream(stream ccintf.ChaincodeStream) TotalQueryLimit: cs.TotalQueryLimit, UseWriteBatch: cs.UseWriteBatch, MaxSizeWriteBatch: cs.MaxSizeWriteBatch, + UseGetMultipleKeys: cs.UseGetMultipleKeys, + MaxSizeGetMultipleKeys: cs.MaxSizeGetMultipleKeys, } return handler.ProcessStream(stream) diff --git a/core/chaincode/config.go b/core/chaincode/config.go index 05a261aa194..255e7b01a8b 100644 --- a/core/chaincode/config.go +++ b/core/chaincode/config.go @@ -16,24 +16,27 @@ import ( ) const ( - defaultExecutionTimeout = 30 * time.Second - minimumStartupTimeout = 5 * time.Second - defaultMaxSizeWriteBatch = 1000 + defaultExecutionTimeout = 30 * time.Second + minimumStartupTimeout = 5 * time.Second + defaultMaxSizeWriteBatch = 1000 + defaultMaxSizeGetMultipleKeys = 1000 ) type Config struct { - TotalQueryLimit int - TLSEnabled bool - Keepalive time.Duration - ExecuteTimeout time.Duration - InstallTimeout time.Duration - StartupTimeout time.Duration - LogFormat string - LogLevel string - ShimLogLevel string - SCCAllowlist map[string]bool - UseWriteBatch bool - MaxSizeWriteBatch uint32 + TotalQueryLimit int + TLSEnabled bool + Keepalive time.Duration + ExecuteTimeout time.Duration + InstallTimeout time.Duration + StartupTimeout time.Duration + LogFormat string + LogLevel string + ShimLogLevel string + SCCAllowlist map[string]bool + UseWriteBatch bool + MaxSizeWriteBatch uint32 + UseGetMultipleKeys bool + MaxSizeGetMultipleKeys uint32 } func GlobalConfig() *Config { @@ -81,6 +84,13 @@ func (c *Config) load() { if c.MaxSizeWriteBatch <= 0 { c.MaxSizeWriteBatch = defaultMaxSizeWriteBatch } + if viper.IsSet("chaincode.runtimeParams.useGetMultipleKeys") { + c.UseGetMultipleKeys = viper.GetBool("chaincode.runtimeParams.useGetMultipleKeys") + } + c.MaxSizeGetMultipleKeys = viper.GetUint32("chaincode.runtimeParams.maxSizeGetMultipleKeys") + if c.MaxSizeGetMultipleKeys <= 0 { + c.MaxSizeGetMultipleKeys = defaultMaxSizeGetMultipleKeys + } } func parseBool(s string) bool { diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index 1a5eb2e20a1..83a2080d7bd 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -134,6 +134,10 @@ type Handler struct { UseWriteBatch bool // MaxSizeWriteBatch maximum batch size for the change segment MaxSizeWriteBatch uint32 + // UseGetMultipleKeys an indication that the peer can handle get multiple keys + UseGetMultipleKeys bool + // MaxSizeGetMultipleKeys maximum size of batches with get multiple keys + MaxSizeGetMultipleKeys uint32 // stateLock is used to read and set State. stateLock sync.RWMutex @@ -221,6 +225,8 @@ func (h *Handler) handleMessageReadyState(msg *pb.ChaincodeMessage) error { go h.HandleTransaction(msg, h.HandlePurgePrivateData) case pb.ChaincodeMessage_WRITE_BATCH_STATE: go h.HandleTransaction(msg, h.HandleWriteBatch) + case pb.ChaincodeMessage_GET_STATE_MULTIPLE: + go h.HandleTransaction(msg, h.HandleGetStateMultipleKeys) default: return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in ready state", msg.Txid, msg.Type) } @@ -449,8 +455,10 @@ func (h *Handler) sendReady() error { chaincodeLogger.Debugf("sending READY for chaincode %s", h.chaincodeID) chaincodeAdditionalParams := &pb.ChaincodeAdditionalParams{ - UseWriteBatch: h.UseWriteBatch, - MaxSizeWriteBatch: h.MaxSizeWriteBatch, + UseWriteBatch: h.UseWriteBatch, + MaxSizeWriteBatch: h.MaxSizeWriteBatch, + UseGetMultipleKeys: h.UseGetMultipleKeys, + MaxSizeGetMultipleKeys: h.MaxSizeGetMultipleKeys, } payloadBytes, err := proto.Marshal(chaincodeAdditionalParams) if err != nil { @@ -678,6 +686,46 @@ func (h *Handler) HandleGetState(msg *pb.ChaincodeMessage, txContext *Transactio return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil } +// Handles query to ledger to get state +func (h *Handler) HandleGetStateMultipleKeys(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) { + getState := &pb.GetStateMultiple{} + err := proto.Unmarshal(msg.Payload, getState) + if err != nil { + return nil, errors.Wrap(err, "unmarshal failed") + } + + var res [][]byte + namespaceID := txContext.NamespaceID + collection := getState.GetCollection() + chaincodeLogger.Debugf("[%s] getting state for chaincode %s, keys %v, channel %s", shorttxid(msg.Txid), namespaceID, getState.GetKeys(), txContext.ChannelID) + + if isCollectionSet(collection) { + if txContext.IsInitTransaction { + return nil, errors.New("private data APIs are not allowed in chaincode Init()") + } + if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil { + return nil, err + } + res, err = txContext.TXSimulator.GetPrivateDataMultipleKeys(namespaceID, collection, getState.GetKeys()) + } else { + res, err = txContext.TXSimulator.GetStateMultipleKeys(namespaceID, getState.GetKeys()) + } + if err != nil { + return nil, errors.WithStack(err) + } + if len(res) == 0 { + chaincodeLogger.Debugf("[%s] No state associated with keys: %v. Sending %s with an empty payload", shorttxid(msg.Txid), getState.GetKeys(), pb.ChaincodeMessage_RESPONSE) + } + + payloadBytes, err := proto.Marshal(&pb.GetStateMultipleResult{Values: res}) + if err != nil { + return nil, errors.Wrap(err, "marshal failed") + } + + // Send response msg back to chaincode. GetState will not trigger event + return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil +} + func (h *Handler) HandleGetPrivateDataHash(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) { getState := &pb.GetState{} err := proto.Unmarshal(msg.Payload, getState) diff --git a/core/chaincode/handler_test.go b/core/chaincode/handler_test.go index 3810e0cff7f..05db9b8f9d2 100644 --- a/core/chaincode/handler_test.go +++ b/core/chaincode/handler_test.go @@ -123,10 +123,12 @@ var _ = Describe("Handler", func() { UUIDGenerator: chaincode.UUIDGeneratorFunc(func() string { return "generated-query-id" }), - AppConfig: fakeApplicationConfigRetriever, - Metrics: chaincodeMetrics, - UseWriteBatch: true, - MaxSizeWriteBatch: 1000, + AppConfig: fakeApplicationConfigRetriever, + Metrics: chaincodeMetrics, + UseWriteBatch: true, + MaxSizeWriteBatch: 1000, + UseGetMultipleKeys: true, + MaxSizeGetMultipleKeys: 1000, } chaincode.SetHandlerChatStream(handler, fakeChatStream) chaincode.SetHandlerChaincodeID(handler, "test-handler-name:1.0") @@ -2786,8 +2788,10 @@ var _ = Describe("Handler", func() { })) chaincodeAdditionalParams := &pb.ChaincodeAdditionalParams{ - UseWriteBatch: true, - MaxSizeWriteBatch: 1000, + UseWriteBatch: true, + MaxSizeWriteBatch: 1000, + UseGetMultipleKeys: true, + MaxSizeGetMultipleKeys: 1000, } payloadBytes, err := proto.Marshal(chaincodeAdditionalParams) Expect(err).NotTo(HaveOccurred()) diff --git a/integration/chaincode/multi/chaincode.go b/integration/chaincode/multi/chaincode.go index 0ca7bf62202..acd934b5b4c 100644 --- a/integration/chaincode/multi/chaincode.go +++ b/integration/chaincode/multi/chaincode.go @@ -52,6 +52,11 @@ func (t *Operations) Invoke(stub shim.ChaincodeStubInterface) *pb.Response { return shim.Error("Incorrect number of arguments. Expecting 1") } return t.putPrivateKey(stub, args[1]) + case "get-multiple-keys": + if len(args) != 1 { + return shim.Error("Incorrect number of arguments. Expecting 1") + } + return t.getMultiple(stub, args[0]) default: // error fmt.Println("invoke did not find func: " + function) @@ -97,3 +102,43 @@ func (t *Operations) putPrivateKey(stub shim.ChaincodeStubInterface, numberCalls } return shim.Success(nil) } + +// both params should be marshalled json data and base64 encoded +func (t *Operations) getMultiple(stub shim.ChaincodeStubInterface, countKeys string) *pb.Response { + num, _ := strconv.Atoi(countKeys) + + kyes := make([]string, 0, num) + + kyes = append(kyes, "non-exist-key") + for i := range num { + key := "key" + strconv.Itoa(i) + kyes = append(kyes, key) + } + + resps, err := stub.GetMultipleStates(kyes...) + if err != nil { + return shim.Error(err.Error()) + } + + if len(resps) != num+1 { + return shim.Error("number of results is not correct") + } + + // non exist key return nil + if resps[0] != nil { + errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[0])) + return shim.Error(errStr) + } + + if string(resps[1]) != "key"+strconv.Itoa(0) { + errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[1])) + return shim.Error(errStr) + } + + if string(resps[num]) != "key"+strconv.Itoa(num-1) { + errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[num])) + return shim.Error(errStr) + } + + return shim.Success(nil) +} diff --git a/integration/e2e/write_batch_test.go b/integration/e2e/write_batch_test.go index 9a94421d507..cc21ec88467 100644 --- a/integration/e2e/write_batch_test.go +++ b/integration/e2e/write_batch_test.go @@ -201,6 +201,85 @@ var _ = Describe("Network", func() { RunInvoke(network, orderer, peer, "put-private-key", true, 3, 1, []string{"collection testchannel/mycc/col", "could not be found"}) }) }) + + DescribeTableSubtree("benchmark get multiple keys", func(desc string, useGetMultipleKeys bool) { + var network *nwo.Network + var ordererRunner *ginkgomon.Runner + var ordererProcess, peerProcess ifrit.Process + + BeforeEach(func() { + network = nwo.New(nwo.BasicEtcdRaft(), tempDir, client, StartPort(), components) + network.UseGetMultipleKeys = useGetMultipleKeys + + // Generate config and bootstrap the network + network.GenerateConfigTree() + network.Bootstrap() + + // Start all the fabric processes + ordererRunner, ordererProcess, peerProcess = network.StartSingleOrdererNetwork("orderer") + }) + + AfterEach(func() { + if ordererProcess != nil { + ordererProcess.Signal(syscall.SIGTERM) + Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive()) + } + + if peerProcess != nil { + peerProcess.Signal(syscall.SIGTERM) + Eventually(peerProcess.Wait(), network.EventuallyTimeout).Should(Receive()) + } + + network.Cleanup() + }) + + It("deploys and executes experiment bench", func() { + orderer := network.Orderer("orderer") + channelparticipation.JoinOrdererJoinPeersAppChannel(network, "testchannel", orderer, ordererRunner) + peer := network.Peer("Org1", "peer0") + + chaincode := nwo.Chaincode{ + Name: "mycc", + Version: "0.0", + Path: "github.com/hyperledger/fabric/integration/chaincode/multi/cmd", + Lang: "golang", + PackageFile: filepath.Join(tempDir, "multi.tar.gz"), + Ctor: `{"Args":["init"]}`, + SignaturePolicy: `AND ('Org1MSP.member','Org2MSP.member')`, + Sequence: "1", + InitRequired: true, + Label: "my_multi_operations_chaincode", + } + + network.VerifyMembership(network.PeersWithChannel("testchannel"), "testchannel") + + nwo.EnableCapabilities( + network, + "testchannel", + "Application", "V2_0", + orderer, + network.PeersWithChannel("testchannel")..., + ) + nwo.DeployChaincode(network, "testchannel", orderer, chaincode) + + RunInvoke(network, orderer, peer, "invoke", true, 10000, 0, nil) + + By("run query get state multiple keys") + experiment := gmeasure.NewExperiment("Get state multiple keys " + desc) + AddReportEntry(experiment.Name, experiment) + + experiment.SampleDuration("invoke N-10 cycle-1000", func(idx int) { + RunGetStateMultipleKeys(network, peer, 1000) + }, gmeasure.SamplingConfig{N: 10}) + + experiment.SampleDuration("invoke N-10 cycle-10000", func(idx int) { + RunGetStateMultipleKeys(network, peer, 10000) + }, gmeasure.SamplingConfig{N: 10}) + }) + }, + Entry("without peer support", "without peer support", false), + Entry("with peer support", "with peer support", true), + ) }) func RunInvoke(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, fn string, startWriteBatch bool, numberCallsPut int, exitCode int, expectedError []string) { @@ -237,3 +316,13 @@ func RunGetState(n *nwo.Network, peer *nwo.Peer, keyUniq string) { Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0)) Expect(sess).To(gbytes.Say("key" + keyUniq)) } + +func RunGetStateMultipleKeys(n *nwo.Network, peer *nwo.Peer, countKeys int) { + sess, err := n.PeerUserSession(peer, "User1", commands.ChaincodeQuery{ + ChannelID: "testchannel", + Name: "mycc", + Ctor: `{"Args":["get-multiple-keys","` + fmt.Sprint(countKeys) + `"]}`, + }) + Expect(err).NotTo(HaveOccurred()) + Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0)) +} diff --git a/integration/nwo/network.go b/integration/nwo/network.go index 06e5758f7e8..67440b2954b 100644 --- a/integration/nwo/network.go +++ b/integration/nwo/network.go @@ -161,6 +161,7 @@ type Network struct { OrdererReplicationPolicy string PeerDeliveryClientPolicy string UseWriteBatch bool + UseGetMultipleKeys bool PortsByOrdererID map[string]Ports PortsByPeerID map[string]Ports @@ -194,6 +195,7 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int, PortsByPeerID: map[string]Ports{}, PeerDeliveryClientPolicy: "", UseWriteBatch: true, + UseGetMultipleKeys: true, Organizations: c.Organizations, Consensus: c.Consensus, diff --git a/integration/nwo/template/core_template.go b/integration/nwo/template/core_template.go index 8401e09d7e2..c2158c21872 100644 --- a/integration/nwo/template/core_template.go +++ b/integration/nwo/template/core_template.go @@ -190,6 +190,8 @@ chaincode: runtimeParams: useWriteBatch: {{ .UseWriteBatch }} maxSizeWriteBatch: 1000 + useGetMultipleKeys: {{ .UseGetMultipleKeys }} + maxSizeGetMultipleKeys: 1000 logging: level: info shim: warning diff --git a/internal/peer/node/start.go b/internal/peer/node/start.go index 28f345548a4..2e6e5276ea0 100644 --- a/internal/peer/node/start.go +++ b/internal/peer/node/start.go @@ -695,6 +695,8 @@ func serve(args []string) error { UserRunsCC: userRunsCC, UseWriteBatch: chaincodeConfig.UseWriteBatch, MaxSizeWriteBatch: chaincodeConfig.MaxSizeWriteBatch, + UseGetMultipleKeys: chaincodeConfig.UseGetMultipleKeys, + MaxSizeGetMultipleKeys: chaincodeConfig.MaxSizeGetMultipleKeys, } custodianLauncher := custodianLauncherAdapter{ diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 378b78e0771..e1c032f0dc1 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -666,6 +666,10 @@ chaincode: useWriteBatch: true # MaxSizeWriteBatch maximum batch size for the change segment maxSizeWriteBatch: 1000 + # UseGetMultipleKeys an indication that the peer can handle get multiple keys + useGetMultipleKeys: true + # MaxSizeGetMultipleKeys maximum size of batches with get multiple keys + maxSizeGetMultipleKeys: 1000 ############################################################################### #