From 9de9f9e363bdbb1664967b584d439e97814c948c Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Sun, 1 Dec 2024 22:33:48 +0100 Subject: [PATCH] [CLIENT-3159] Support writing raw payload to the server --- client.go | 13 +++++ client_ifc.go | 1 + client_test.go | 62 ++++++++++++++++++++ command.go | 39 +++++++------ helper_test.go | 35 ++++++++++++ write_payload_command.go | 120 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 252 insertions(+), 18 deletions(-) create mode 100644 write_payload_command.go diff --git a/client.go b/client.go index 60062cbb..4077b7fe 100644 --- a/client.go +++ b/client.go @@ -278,6 +278,19 @@ func (clnt *Client) GetNodeNames() []string { // Write Record Operations //------------------------------------------------------- +// PutPayload writes the raw write/delete payload to the server. +// The policy specifies the transaction timeout. +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) PutPayload(policy *WritePolicy, key *Key, payload []byte) Error { + policy = clnt.getUsableWritePolicy(policy) + command, err := newWritePayloadCommand(clnt.cluster, policy, key, payload) + if err != nil { + return err + } + + return command.Execute() +} + // Put writes record bin(s) to the server. // The policy specifies the transaction timeout, record expiration and how the transaction is // handled when the record already exists. diff --git a/client_ifc.go b/client_ifc.go index 7cfe2a1a..a96202ba 100644 --- a/client_ifc.go +++ b/client_ifc.go @@ -60,6 +60,7 @@ type ClientIfc interface { Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, Error) Prepend(policy *WritePolicy, key *Key, binMap BinMap) Error PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) Error + PutPayload(policy *WritePolicy, key *Key, payload []byte) Error Put(policy *WritePolicy, key *Key, binMap BinMap) Error PutBins(policy *WritePolicy, key *Key, bins ...*Bin) Error Query(policy *QueryPolicy, statement *Statement) (*Recordset, Error) diff --git a/client_test.go b/client_test.go index 3b04ba79..8c7917fd 100644 --- a/client_test.go +++ b/client_test.go @@ -285,6 +285,68 @@ var _ = gg.Describe("Aerospike", func() { gm.Expect(err).ToNot(gm.HaveOccurred()) }) + gg.Context("PutPayload operations", func() { + + gg.It("must put a record", func() { + key, err = as.NewKey(ns, set, 0) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + binMap := as.BinMap{ + "Aerospike": "value", + "Aerospike1": "value2", + } + + wcmd, err := as.NewWriteCommand(nil, wpolicy, key, nil, binMap) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = wcmd.WriteBuffer(&wcmd) + gm.Expect(err).ToNot(gm.HaveOccurred()) + payload := wcmd.Buffer() + + client.Delete(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = client.PutPayload(nil, key, payload) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + rec, err := client.Get(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(rec.Bins).To(gm.Equal(binMap)) + }) + + gg.It("must delete a record", func() { + key, err = as.NewKey(ns, set, 0) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + binMap := as.BinMap{ + "Aerospike": "value", + "Aerospike1": "value2", + } + + err := client.Put(nil, key, binMap) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + exists, err := client.Exists(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(exists).To(gm.BeTrue()) + + dcmd, err := as.NewDeleteCommand(nil, wpolicy, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = dcmd.WriteBuffer(dcmd) + gm.Expect(err).ToNot(gm.HaveOccurred()) + payload := dcmd.Buffer() + + err = client.PutPayload(nil, key, payload) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + exists, err = client.Exists(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(exists).To(gm.BeFalse()) + }) + + }) + gg.Context("Put operations", func() { gg.Context("Expiration values", func() { diff --git a/command.go b/command.go index 3fa0e7c8..6684195e 100644 --- a/command.go +++ b/command.go @@ -2702,27 +2702,30 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, deadline time return err } - // Reset timeout in send buffer (destined for server) and socket. - binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0) - if !deadline.IsZero() { - serverTimeout := time.Until(deadline) - if serverTimeout < time.Millisecond { - serverTimeout = time.Millisecond + if _, rawPayload := ifc.(*writePayloadCommand); !rawPayload { + // Reset timeout in send buffer (destined for server) and socket. + binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0) + if !deadline.IsZero() { + serverTimeout := time.Until(deadline) + if serverTimeout < time.Millisecond { + serverTimeout = time.Millisecond + } + binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond)) } - binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond)) - } - // now that the deadline has been set in the buffer, compress the contents - if err = cmd.compress(); err != nil { - applyTransactionErrorMetrics(cmd.node) - return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter) - } + // now that the deadline has been set in the buffer, compress the contents + if err = cmd.compress(); err != nil { + applyTransactionErrorMetrics(cmd.node) + return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter) + } - // now that the deadline has been set in the buffer, compress the contents - if err = cmd.prepareBuffer(ifc, deadline); err != nil { - applyTransactionErrorMetrics(cmd.node) - applyTransactionMetrics(cmd.node, ifc.transactionType(), transStart) - return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node) + // TODO: Redundant. Move everything to the prepareBuffer method and remove the branch on top of this block. + // // now that the deadline has been set in the buffer, compress the contents + // if err = cmd.prepareBuffer(ifc, deadline); err != nil { + // applyTransactionErrorMetrics(cmd.node) + // applyTransactionMetrics(cmd.node, ifc.transactionType(), transStart) + // return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node) + // } } // Send command. diff --git a/helper_test.go b/helper_test.go index 30af0ba1..f2a196e3 100644 --- a/helper_test.go +++ b/helper_test.go @@ -50,3 +50,38 @@ func (nd *Node) ConnsCount() int { func (nd *Node) CloseConnections() { nd.closeConnections() } + +func NewWriteCommand( + cluster *Cluster, + policy *WritePolicy, + key *Key, + bins []*Bin, + binMap BinMap) (writeCommand, Error) { + return newWriteCommand( + cluster, + policy, + key, + bins, + binMap, + _WRITE) +} + +func (cmd *writeCommand) WriteBuffer(ifc command) Error { + return cmd.writeBuffer(ifc) +} + +func (cmd *writeCommand) Buffer() []byte { + return cmd.dataBuffer[:cmd.dataOffset] +} + +func NewDeleteCommand(cluster *Cluster, policy *WritePolicy, key *Key) (*deleteCommand, Error) { + return newDeleteCommand(cluster, policy, key) +} + +func (cmd *deleteCommand) WriteBuffer(ifc command) Error { + return cmd.writeBuffer(ifc) +} + +func (cmd *deleteCommand) Buffer() []byte { + return cmd.dataBuffer[:cmd.dataOffset] +} diff --git a/write_payload_command.go b/write_payload_command.go new file mode 100644 index 00000000..6061a2dd --- /dev/null +++ b/write_payload_command.go @@ -0,0 +1,120 @@ +// Copyright 2014-2022 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aerospike + +import ( + "github.com/aerospike/aerospike-client-go/v7/types" + + Buffer "github.com/aerospike/aerospike-client-go/v7/utils/buffer" +) + +// guarantee writePayloadCommand implements command interface +var _ command = &writePayloadCommand{} + +type writePayloadCommand struct { + singleCommand + + policy *WritePolicy + payload []byte +} + +func newWritePayloadCommand( + cluster *Cluster, + policy *WritePolicy, + key *Key, + payload []byte, +) (writePayloadCommand, Error) { + + var partition *Partition + var err Error + if cluster != nil { + partition, err = PartitionForWrite(cluster, &policy.BasePolicy, key) + if err != nil { + return writePayloadCommand{}, err + } + } + + newWriteCmd := writePayloadCommand{ + singleCommand: newSingleCommand(cluster, key, partition), + policy: policy, + payload: payload, + } + + return newWriteCmd, nil +} + +func (cmd *writePayloadCommand) getPolicy(ifc command) Policy { + return cmd.policy +} + +func (cmd *writePayloadCommand) writeBuffer(ifc command) Error { + cmd.dataBuffer = cmd.payload + cmd.dataOffset = len(cmd.payload) + return nil +} + +func (cmd *writePayloadCommand) getNode(ifc command) (*Node, Error) { + return cmd.partition.GetNodeWrite(cmd.cluster) +} + +func (cmd *writePayloadCommand) prepareRetry(ifc command, isTimeout bool) bool { + cmd.partition.PrepareRetryWrite(isTimeout) + return true +} + +func (cmd *writePayloadCommand) parseResult(ifc command, conn *Connection) Error { + // make sure the payload is not put back in the buffer pool + defer func() { + cmd.dataBuffer = cmd.conn.origDataBuffer + cmd.dataOffset = 0 + }() + + // Read header. + if _, err := conn.Read(cmd.dataBuffer, int(_MSG_TOTAL_HEADER_SIZE)); err != nil { + return err + } + + header := Buffer.BytesToInt64(cmd.dataBuffer, 0) + + // Validate header to make sure we are at the beginning of a message + if err := cmd.validateHeader(header); err != nil { + return err + } + + resultCode := cmd.dataBuffer[13] & 0xFF + + if resultCode != 0 { + if resultCode == byte(types.KEY_NOT_FOUND_ERROR) { + return ErrKeyNotFound.err() + } else if types.ResultCode(resultCode) == types.FILTERED_OUT { + return ErrFilteredOut.err() + } + + return newCustomNodeError(cmd.node, types.ResultCode(resultCode)) + } + return cmd.emptySocket(conn) +} + +func (cmd *writePayloadCommand) isRead() bool { + return false +} + +func (cmd *writePayloadCommand) Execute() Error { + return cmd.execute(cmd) +} + +func (cmd *writePayloadCommand) transactionType() transactionType { + return ttPut +}