Skip to content

Commit

Permalink
[ISSUE #688] Add information about transaction Source on endTransacti…
Browse files Browse the repository at this point in the history
…on Request (#689)
  • Loading branch information
lollipopjin authored Feb 28, 2024
1 parent 1080874 commit 74f3aec
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.rocketmq.client.java.impl.producer;

import static com.google.common.base.Preconditions.checkNotNull;

import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.EndTransactionRequest;
Expand All @@ -29,6 +27,8 @@
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TransactionSource;
import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -163,7 +163,7 @@ public void onSuccess(TransactionResolution resolution) {
}
final GeneralMessage generalMessage = new GeneralMessageImpl(messageView);
endTransaction(endpoints, generalMessage, messageView.getMessageId(),
transactionId, resolution);
transactionId, resolution, TransactionSource.SOURCE_SERVER_CHECK);
} catch (Throwable t) {
log.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
+ "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
Expand Down Expand Up @@ -241,7 +241,7 @@ public CompletableFuture<SendReceipt> sendAsync(Message message) {
*/
@Override
public Transaction beginTransaction() {
checkNotNull(checker, "Transaction checker should not be null");
Preconditions.checkNotNull(checker, "Transaction checker should not be null");
if (!this.isRunning()) {
log.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
this.state(), clientId);
Expand All @@ -256,9 +256,11 @@ public void close() {
}

public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, MessageId messageId,
String transactionId, final TransactionResolution resolution) throws ClientException {
String transactionId, final TransactionResolution resolution, final TransactionSource transactionSource)
throws ClientException {
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
.setSource(transactionSource)
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(generalMessage.getTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.client.java.impl.producer;

import apache.rocketmq.v2.TransactionSource;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.util.HashSet;
Expand Down Expand Up @@ -94,8 +95,14 @@ public void commit() throws ClientException {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
producerImpl.endTransaction(
sendReceipt.getEndpoints(),
new GeneralMessageImpl(publishingMessage),
sendReceipt.getMessageId(),
sendReceipt.getTransactionId(),
TransactionResolution.COMMIT,
TransactionSource.SOURCE_CLIENT
);
}
}

Expand All @@ -107,8 +114,14 @@ public void rollback() throws ClientException {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage),
sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
producerImpl.endTransaction(
sendReceipt.getEndpoints(),
new GeneralMessageImpl(publishingMessage),
sendReceipt.getMessageId(),
sendReceipt.getTransactionId(),
TransactionResolution.ROLLBACK,
TransactionSource.SOURCE_CLIENT
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;

import apache.rocketmq.v2.TransactionSource;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -115,7 +116,7 @@ public void testCommit() throws IOException, ClientException, ExecutionException
final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
any(MessageId.class), anyString(), any(TransactionResolution.class));
any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
transaction.commit();
}

Expand All @@ -130,7 +131,7 @@ public void testRollback() throws IOException, ClientException, ExecutionExcepti
final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class),
any(MessageId.class), anyString(), any(TransactionResolution.class));
any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class));
transaction.rollback();
}
}

0 comments on commit 74f3aec

Please sign in to comment.