diff --git a/client.go b/client.go index f778baba..6452508a 100644 --- a/client.go +++ b/client.go @@ -305,6 +305,19 @@ func (clnt *Client) GetNodeNames() []string { // Write Record Operations //------------------------------------------------------- +// PutPayload writes the raw write/delete payload to the server. +// The policy specifies the transaction timeout. +// If the policy is nil, the default relevant policy will be used. +func (clnt *Client) PutPayload(policy *WritePolicy, key *Key, payload []byte) Error { + policy = clnt.getUsableWritePolicy(policy) + command, err := newWritePayloadCommand(clnt.cluster, policy, key, payload) + if err != nil { + return err + } + + return command.Execute() +} + // Put writes record bin(s) to the server. // The policy specifies the command timeout, record expiration and how the command is // handled when the record already exists. diff --git a/client_ifc.go b/client_ifc.go index 5f5b7a1c..e9297b35 100644 --- a/client_ifc.go +++ b/client_ifc.go @@ -60,6 +60,7 @@ type ClientIfc interface { Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, Error) Prepend(policy *WritePolicy, key *Key, binMap BinMap) Error PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) Error + PutPayload(policy *WritePolicy, key *Key, payload []byte) Error Put(policy *WritePolicy, key *Key, binMap BinMap) Error PutBins(policy *WritePolicy, key *Key, bins ...*Bin) Error Query(policy *QueryPolicy, statement *Statement) (*Recordset, Error) diff --git a/client_test.go b/client_test.go index 3d6268ef..3d6659b6 100644 --- a/client_test.go +++ b/client_test.go @@ -279,6 +279,68 @@ var _ = gg.Describe("Aerospike", func() { gm.Expect(err).ToNot(gm.HaveOccurred()) }) + gg.Context("PutPayload operations", func() { + + gg.It("must put a record", func() { + key, err = as.NewKey(ns, set, 0) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + binMap := as.BinMap{ + "Aerospike": "value", + "Aerospike1": "value2", + } + + wcmd, err := as.NewWriteCommand(nil, wpolicy, key, nil, binMap) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = wcmd.WriteBuffer(&wcmd) + gm.Expect(err).ToNot(gm.HaveOccurred()) + payload := wcmd.Buffer() + + client.Delete(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = client.PutPayload(nil, key, payload) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + rec, err := client.Get(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(rec.Bins).To(gm.Equal(binMap)) + }) + + gg.It("must delete a record", func() { + key, err = as.NewKey(ns, set, 0) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + binMap := as.BinMap{ + "Aerospike": "value", + "Aerospike1": "value2", + } + + err := client.Put(nil, key, binMap) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + exists, err := client.Exists(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(exists).To(gm.BeTrue()) + + dcmd, err := as.NewDeleteCommand(nil, wpolicy, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + err = dcmd.WriteBuffer(dcmd) + gm.Expect(err).ToNot(gm.HaveOccurred()) + payload := dcmd.Buffer() + + err = client.PutPayload(nil, key, payload) + gm.Expect(err).ToNot(gm.HaveOccurred()) + + exists, err = client.Exists(nil, key) + gm.Expect(err).ToNot(gm.HaveOccurred()) + gm.Expect(exists).To(gm.BeFalse()) + }) + + }) + gg.Context("Put operations", func() { gg.Context("Expiration values", func() { diff --git a/command.go b/command.go index 1eb7dc39..d24fa1ae 100644 --- a/command.go +++ b/command.go @@ -3523,27 +3523,30 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, deadline time return err } - // Reset timeout in send buffer (destined for server) and socket. - binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0) - if !deadline.IsZero() { - serverTimeout := time.Until(deadline) - if serverTimeout < time.Millisecond { - serverTimeout = time.Millisecond + if _, rawPayload := ifc.(*writePayloadCommand); !rawPayload { + // Reset timeout in send buffer (destined for server) and socket. + binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0) + if !deadline.IsZero() { + serverTimeout := time.Until(deadline) + if serverTimeout < time.Millisecond { + serverTimeout = time.Millisecond + } + binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond)) } - binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond)) - } - // now that the deadline has been set in the buffer, compress the contents - if err = cmd.compress(); err != nil { - applyTransactionErrorMetrics(cmd.node) - return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter) - } + // now that the deadline has been set in the buffer, compress the contents + if err = cmd.compress(); err != nil { + applyTransactionErrorMetrics(cmd.node) + return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node).setInDoubt(ifc.isRead(), cmd.commandSentCounter) + } - // now that the deadline has been set in the buffer, compress the contents - if err = cmd.prepareBuffer(ifc, deadline); err != nil { - applyTransactionErrorMetrics(cmd.node) - applyTransactionMetrics(cmd.node, ifc.commandType(), transStart) - return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node) + // TODO: Redundant. Move everything to the prepareBuffer method and remove the branch on top of this block. + // // now that the deadline has been set in the buffer, compress the contents + // if err = cmd.prepareBuffer(ifc, deadline); err != nil { + // applyTransactionErrorMetrics(cmd.node) + // applyTransactionMetrics(cmd.node, ifc.transactionType(), transStart) + // return chainErrors(err, errChain).iter(cmd.commandSentCounter).setNode(cmd.node) + // } } // Send command. diff --git a/helper_test.go b/helper_test.go index b17bd228..663d05d2 100644 --- a/helper_test.go +++ b/helper_test.go @@ -62,3 +62,38 @@ func ConfiguredAsStrongConsistency(client *Client, namespace string) bool { return p.SCMode } + +func NewWriteCommand( + cluster *Cluster, + policy *WritePolicy, + key *Key, + bins []*Bin, + binMap BinMap) (writeCommand, Error) { + return newWriteCommand( + cluster, + policy, + key, + bins, + binMap, + _WRITE) +} + +func (cmd *writeCommand) WriteBuffer(ifc command) Error { + return cmd.writeBuffer(ifc) +} + +func (cmd *writeCommand) Buffer() []byte { + return cmd.dataBuffer[:cmd.dataOffset] +} + +func NewDeleteCommand(cluster *Cluster, policy *WritePolicy, key *Key) (*deleteCommand, Error) { + return newDeleteCommand(cluster, policy, key) +} + +func (cmd *deleteCommand) WriteBuffer(ifc command) Error { + return cmd.writeBuffer(ifc) +} + +func (cmd *deleteCommand) Buffer() []byte { + return cmd.dataBuffer[:cmd.dataOffset] +} diff --git a/write_payload_command.go b/write_payload_command.go new file mode 100644 index 00000000..64fd7251 --- /dev/null +++ b/write_payload_command.go @@ -0,0 +1,120 @@ +// Copyright 2014-2022 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aerospike + +import ( + "github.com/aerospike/aerospike-client-go/v7/types" + + Buffer "github.com/aerospike/aerospike-client-go/v7/utils/buffer" +) + +// guarantee writePayloadCommand implements command interface +var _ command = &writePayloadCommand{} + +type writePayloadCommand struct { + singleCommand + + policy *WritePolicy + payload []byte +} + +func newWritePayloadCommand( + cluster *Cluster, + policy *WritePolicy, + key *Key, + payload []byte, +) (writePayloadCommand, Error) { + + var partition *Partition + var err Error + if cluster != nil { + partition, err = PartitionForWrite(cluster, &policy.BasePolicy, key) + if err != nil { + return writePayloadCommand{}, err + } + } + + newWriteCmd := writePayloadCommand{ + singleCommand: newSingleCommand(cluster, key, partition), + policy: policy, + payload: payload, + } + + return newWriteCmd, nil +} + +func (cmd *writePayloadCommand) getPolicy(ifc command) Policy { + return cmd.policy +} + +func (cmd *writePayloadCommand) writeBuffer(ifc command) Error { + cmd.dataBuffer = cmd.payload + cmd.dataOffset = len(cmd.payload) + return nil +} + +func (cmd *writePayloadCommand) getNode(ifc command) (*Node, Error) { + return cmd.partition.GetNodeWrite(cmd.cluster) +} + +func (cmd *writePayloadCommand) prepareRetry(ifc command, isTimeout bool) bool { + cmd.partition.PrepareRetryWrite(isTimeout) + return true +} + +func (cmd *writePayloadCommand) parseResult(ifc command, conn *Connection) Error { + // make sure the payload is not put back in the buffer pool + defer func() { + cmd.dataBuffer = cmd.conn.origDataBuffer + cmd.dataOffset = 0 + }() + + // Read header. + if _, err := conn.Read(cmd.dataBuffer, int(_MSG_TOTAL_HEADER_SIZE)); err != nil { + return err + } + + header := Buffer.BytesToInt64(cmd.dataBuffer, 0) + + // Validate header to make sure we are at the beginning of a message + if err := cmd.validateHeader(header); err != nil { + return err + } + + resultCode := cmd.dataBuffer[13] & 0xFF + + if resultCode != 0 { + if resultCode == byte(types.KEY_NOT_FOUND_ERROR) { + return ErrKeyNotFound.err() + } else if types.ResultCode(resultCode) == types.FILTERED_OUT { + return ErrFilteredOut.err() + } + + return newCustomNodeError(cmd.node, types.ResultCode(resultCode)) + } + return cmd.emptySocket(conn) +} + +func (cmd *writePayloadCommand) isRead() bool { + return false +} + +func (cmd *writePayloadCommand) Execute() Error { + return cmd.execute(cmd) +} + +func (cmd *writePayloadCommand) commandType() commandType { + return ttPut +}