Skip to content

Commit

Permalink
GH-2576: BlockQueueConsumer: synchronized to Lock
Browse files Browse the repository at this point in the history
Fixes: #2576

* Rework all the `synchronized` blocks in the `BlockingQueueConsumer` to `Lock`
* Fix typos in Javadocs
* Some other code reformatting suggested by IDE
  • Loading branch information
artembilan committed Dec 16, 2023
1 parent 2fbe62f commit f4cb1a0
Showing 1 changed file with 49 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand All @@ -36,6 +35,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOffExecution;

Expand Down Expand Up @@ -96,7 +98,9 @@ public class BlockingQueueConsumer {

private static final int DEFAULT_RETRY_DECLARATION_INTERVAL = 60000;

private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
private static final Log logger = LogFactory.getLog(BlockingQueueConsumer.class);

private final Lock lifecycleLock = new ReentrantLock();

private final BlockingQueue<Delivery> queue;

Expand Down Expand Up @@ -129,17 +133,19 @@ public class BlockingQueueConsumer {

private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;

private final Map<String, Object> consumerArgs = new HashMap<String, Object>();
private final Map<String, Object> consumerArgs = new HashMap<>();

private final boolean noLocal;

private final boolean exclusive;

private final Set<Long> deliveryTags = new LinkedHashSet<Long>();
private final Set<Long> deliveryTags = new LinkedHashSet<>();

private final boolean defaultRequeueRejected;

private final Set<String> missingQueues = Collections.synchronizedSet(new HashSet<String>());
private final Set<String> missingQueues = ConcurrentHashMap.newKeySet();

private final Lock missingQueuesLock = new ReentrantLock();

private long retryDeclarationInterval = DEFAULT_RETRY_DECLARATION_INTERVAL;

Expand Down Expand Up @@ -214,6 +220,7 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues) {

this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
prefetchCount, defaultRequeueRejected, null, queues);
}
Expand All @@ -237,6 +244,7 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
@Nullable Map<String, Object> consumerArgs, String... queues) {

this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
prefetchCount, defaultRequeueRejected, consumerArgs, false, queues);
}
Expand All @@ -261,6 +269,7 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
@Nullable Map<String, Object> consumerArgs, boolean exclusive, String... queues) {

this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
prefetchCount, defaultRequeueRejected, consumerArgs, false, exclusive, queues);
}
Expand All @@ -287,20 +296,21 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
@Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {

this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
this.defaultRequeueRejected = defaultRequeueRejected;
if (consumerArgs != null && consumerArgs.size() > 0) {
if (!CollectionUtils.isEmpty(consumerArgs)) {
this.consumerArgs.putAll(consumerArgs);
}
this.noLocal = noLocal;
this.exclusive = exclusive;
this.queues = Arrays.copyOf(queues, queues.length);
this.queue = new LinkedBlockingQueue<Delivery>(queues.length == 0 ? prefetchCount : prefetchCount * queues.length);
this.queue = new LinkedBlockingQueue<>(queues.length == 0 ? prefetchCount : prefetchCount * queues.length);
}

public Channel getChannel() {
Expand All @@ -309,8 +319,8 @@ public Channel getChannel() {

public Collection<String> getConsumerTags() {
return this.consumers.values().stream()
.map(c -> c.getConsumerTag())
.filter(tag -> tag != null)
.map(DefaultConsumer::getConsumerTag)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -489,7 +499,6 @@ private void checkShutdown() {
* shutdown. If delivery is null, we may be in shutdown mode. Check and see.
* @param delivery the delivered message contents.
* @return A message built from the contents.
* @throws InterruptedException if the thread is interrupted.
*/
@Nullable
private Message handle(@Nullable Delivery delivery) {
Expand Down Expand Up @@ -545,7 +554,7 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
logger.trace("Retrieving delivery for " + this);
}
checkShutdown();
if (this.missingQueues.size() > 0) {
if (!this.missingQueues.isEmpty()) {
checkMissingQueues();
}
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
Expand All @@ -562,7 +571,8 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
private void checkMissingQueues() {
long now = System.currentTimeMillis();
if (now - this.retryDeclarationInterval > this.lastRetryDeclaration) {
synchronized (this.missingQueues) {
this.missingQueuesLock.lock();
try {
Iterator<String> iterator = this.missingQueues.iterator();
while (iterator.hasNext()) {
boolean available = true;
Expand Down Expand Up @@ -598,6 +608,9 @@ private void checkMissingQueues() {
}
}
}
finally {
this.missingQueuesLock.unlock();
}
this.lastRetryDeclaration = now;
}
}
Expand Down Expand Up @@ -767,24 +780,30 @@ private void attemptPassiveDeclarations() {
}
}

public synchronized void stop() {
if (this.abortStarted == 0) { // signal handle delivery to use offer
this.abortStarted = System.currentTimeMillis();
}
if (!this.cancelled()) {
try {
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
public void stop() {
this.lifecycleLock.lock();
try {
if (this.abortStarted == 0) { // signal handle delivery to use offer
this.abortStarted = System.currentTimeMillis();
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error closing consumer " + this, e);
if (!this.cancelled()) {
try {
RabbitUtils.closeMessageConsumer(this.channel, getConsumerTags(), this.transactional);
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error closing consumer " + this, e);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Closing Rabbit Channel: " + this.channel);
}
forceCloseAndClearQueue();
}
if (logger.isDebugEnabled()) {
logger.debug("Closing Rabbit Channel: " + this.channel);
finally {
this.lifecycleLock.unlock();
}
forceCloseAndClearQueue();
}

public void forceCloseAndClearQueue() {
Expand Down Expand Up @@ -859,9 +878,8 @@ public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {
* Perform a commit or message acknowledgement, as appropriate.
* @param localTx Whether the channel is locally transacted.
* @return true if at least one delivery tag exists.
* @throws IOException Any IOException.
*/
public boolean commitIfNecessary(boolean localTx) throws IOException {
public boolean commitIfNecessary(boolean localTx) {
if (this.deliveryTags.isEmpty()) {
return false;
}
Expand Down Expand Up @@ -994,6 +1012,7 @@ public void handleCancelOk(String consumerTag) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {

if (logger.isDebugEnabled()) {
logger.debug("Storing delivery for consumerTag: '"
+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
Expand Down Expand Up @@ -1053,7 +1072,7 @@ private DeclarationException(Throwable t) {
super("Failed to declare queue(s):", t);
}

private final List<String> failedQueues = new ArrayList<String>();
private final List<String> failedQueues = new ArrayList<>();

private void addFailedQueue(String queue) {
this.failedQueues.add(queue);
Expand All @@ -1065,7 +1084,7 @@ private List<String> getFailedQueues() {

@Override
public String getMessage() {
return super.getMessage() + this.failedQueues.toString();
return super.getMessage() + this.failedQueues;
}

}
Expand Down

0 comments on commit f4cb1a0

Please sign in to comment.