Skip to content

Commit

Permalink
GH-2574: RabbitAdmin: synchronized to Lock
Browse files Browse the repository at this point in the history
Fixes: #2574

* Rework all the `synchronized` blocks in the `RabbitAdmin` to `Lock`
* Fix typos in Javadocs
* Some other code reformatting suggested by IDE
  • Loading branch information
artembilan committed Dec 16, 2023
1 parent 9fe0ef2 commit e012342
Showing 1 changed file with 54 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -129,12 +128,14 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat

private final RabbitTemplate rabbitTemplate;

private final Object lifecycleMonitor = new Object();
private final Lock lifecycleLock = new ReentrantLock();

private final ConnectionFactory connectionFactory;

private final Set<Declarable> manualDeclarables = Collections.synchronizedSet(new LinkedHashSet<>());

private final Lock manualDeclarablesLock = new ReentrantLock();

private String beanName;

private RetryTemplate retryTemplate;
Expand Down Expand Up @@ -264,20 +265,19 @@ public boolean deleteExchange(final String exchangeName) {
}

private void removeExchangeBindings(final String exchangeName) {
synchronized (this.manualDeclarables) {
this.manualDeclarablesLock.lock();
try {
this.manualDeclarables.stream()
.filter(dec -> dec instanceof Exchange ex && ex.getName().equals(exchangeName))
.collect(Collectors.toSet())
.forEach(ex -> this.manualDeclarables.remove(ex));
Iterator<Declarable> iterator = this.manualDeclarables.iterator();
while (iterator.hasNext()) {
Declarable next = iterator.next();
if (next instanceof Binding binding &&
((!binding.isDestinationQueue() && binding.getDestination().equals(exchangeName))
|| binding.getExchange().equals(exchangeName))) {
iterator.remove();
}
}
.forEach(this.manualDeclarables::remove);
this.manualDeclarables.removeIf(next ->
next instanceof Binding binding
&& ((!binding.isDestinationQueue() && binding.getDestination().equals(exchangeName))
|| binding.getExchange().equals(exchangeName)));
}
finally {
this.manualDeclarablesLock.unlock();
}
}

Expand All @@ -295,8 +295,7 @@ private void removeExchangeBindings(final String exchangeName) {
* true.
*/
@Override
@ManagedOperation(description =
"Declare a queue on the broker (this operation is not available remotely)")
@ManagedOperation(description = "Declare a queue on the broker (this operation is not available remotely)")
@Nullable
public String declareQueue(final Queue queue) {
try {
Expand Down Expand Up @@ -364,19 +363,19 @@ public void deleteQueue(final String queueName, final boolean unused, final bool
}

private void removeQueueBindings(final String queueName) {
synchronized (this.manualDeclarables) {
this.manualDeclarablesLock.lock();
try {
this.manualDeclarables.stream()
.filter(dec -> dec instanceof Queue queue && queue.getName().equals(queueName))
.collect(Collectors.toSet())
.forEach(q -> this.manualDeclarables.remove(q));
Iterator<Declarable> iterator = this.manualDeclarables.iterator();
while (iterator.hasNext()) {
Declarable next = iterator.next();
if (next instanceof Binding binding &&
(binding.isDestinationQueue() && binding.getDestination().equals(queueName))) {
iterator.remove();
}
}
.forEach(this.manualDeclarables::remove);
this.manualDeclarables.removeIf(next ->
next instanceof Binding binding
&& (binding.isDestinationQueue()
&& binding.getDestination().equals(queueName)));
}
finally {
this.manualDeclarablesLock.unlock();
}
}

Expand Down Expand Up @@ -405,8 +404,7 @@ public int purgeQueue(final String queueName) {

// Binding
@Override
@ManagedOperation(description =
"Declare a binding on the broker (this operation is not available remotely)")
@ManagedOperation(description = "Declare a binding on the broker (this operation is not available remotely)")
public void declareBinding(final Binding binding) {
try {
this.rabbitTemplate.execute(channel -> {
Expand All @@ -423,8 +421,7 @@ public void declareBinding(final Binding binding) {
}

@Override
@ManagedOperation(description =
"Remove a binding from the broker (this operation is not available remotely)")
@ManagedOperation(description = "Remove a binding from the broker (this operation is not available remotely)")
public void removeBinding(final Binding binding) {
this.rabbitTemplate.execute(channel -> {
if (binding.isDestinationQueue()) {
Expand Down Expand Up @@ -520,9 +517,9 @@ public boolean isRedeclareManualDeclarations() {

/**
* Normally, when a connection is recovered, the admin only recovers auto-delete
* queues, etc, that are declared as beans in the application context. When this is
* queues, etc., that are declared as beans in the application context. When this is
* true, it will also redeclare any manually declared {@link Declarable}s via admin
* methods. When a queue or exhange is deleted, it will not longer be recovered, nor
* methods. When a queue or exchange is deleted, it will no longer be recovered, nor
* will any corresponding bindings.
* @param redeclareManualDeclarations true to redeclare.
* @since 2.4
Expand Down Expand Up @@ -583,8 +580,8 @@ public boolean isAutoStartup() {
*/
@Override
public void afterPropertiesSet() {

synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {

if (this.running || !this.autoStartup) {
return;
Expand Down Expand Up @@ -618,7 +615,7 @@ public void afterPropertiesSet() {
/*
* ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
* chatter). In fact, it might even be a good thing: exclusive queues only make sense if they are
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
*/
if (this.retryTemplate != null) {
Expand All @@ -638,7 +635,9 @@ public void afterPropertiesSet() {
});

this.running = true;

}
finally {
this.lifecycleLock.unlock();
}
}

Expand All @@ -657,11 +656,11 @@ public void initialize() {
}

this.logger.debug("Initializing declarations");
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
Collection<Exchange> contextExchanges = new LinkedList<>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(
Collection<Queue> contextQueues = new LinkedList<>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(
Collection<Binding> contextBindings = new LinkedList<>(
this.applicationContext.getBeansOfType(Binding.class).values());
Collection<DeclarableCustomizer> customizers =
this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
Expand All @@ -673,7 +672,7 @@ public void initialize() {
final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
Expand All @@ -693,14 +692,14 @@ public void initialize() {
}
}

if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0 && this.manualDeclarables.size() == 0) {
if (exchanges.isEmpty() && queues.isEmpty() && bindings.isEmpty() && this.manualDeclarables.isEmpty()) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
declareExchanges(channel, exchanges.toArray(new Exchange[0]));
declareQueues(channel, queues.toArray(new Queue[0]));
declareBindings(channel, bindings.toArray(new Binding[0]));
return null;
});
this.logger.debug("Declarations finished");
Expand All @@ -711,8 +710,9 @@ public void initialize() {
* Process manual declarables.
*/
private void redeclareManualDeclarables() {
if (this.manualDeclarables.size() > 0) {
synchronized (this.manualDeclarables) {
if (!this.manualDeclarables.isEmpty()) {
this.manualDeclarablesLock.lock();
try {
this.logger.debug("Redeclaring manually declared Declarables");
for (Declarable dec : this.manualDeclarables) {
if (dec instanceof Queue queue) {
Expand All @@ -726,6 +726,9 @@ else if (dec instanceof Exchange exch) {
}
}
}
finally {
this.manualDeclarablesLock.unlock();
}
}

}
Expand Down Expand Up @@ -805,7 +808,7 @@ private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> dec
customizers.forEach(cust -> ref.set((T) cust.apply(ref.get())));
return ref.get();
})
.collect(Collectors.toList());
.toList();
}

private <T extends Declarable> boolean declarableByMe(T dec) {
Expand All @@ -827,10 +830,10 @@ private void declareExchanges(final Channel channel, final Exchange... exchanges
if (exchange.isDelayed()) {
Map<String, Object> arguments = exchange.getArguments();
if (arguments == null) {
arguments = new HashMap<String, Object>();
arguments = new HashMap<>();
}
else {
arguments = new HashMap<String, Object>(arguments);
arguments = new HashMap<>(arguments);
}
arguments.put("x-delayed-type", exchange.getType());
channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
Expand All @@ -849,9 +852,8 @@ private void declareExchanges(final Channel channel, final Exchange... exchanges
}

private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
for (int i = 0; i < queues.length; i++) {
Queue queue = queues[i];
List<DeclareOk> declareOks = new ArrayList<>(queues.length);
for (Queue queue : queues) {
if (!queue.getName().startsWith("amq.")) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Queue '" + queue.getName() + "'");
Expand All @@ -878,7 +880,7 @@ else if (this.logger.isDebugEnabled()) {
this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
}
}
return declareOks.toArray(new DeclareOk[declareOks.size()]);
return declareOks.toArray(new DeclareOk[0]);
}

private void closeChannelAfterIllegalArg(final Channel channel, Queue queue) {
Expand Down

0 comments on commit e012342

Please sign in to comment.