Skip to content

Commit

Permalink
Removed unnecessary locking in the connection pool #leasr method that…
Browse files Browse the repository at this point in the history
… can lead to a deadlock in HttpClient connection management code
  • Loading branch information
ok2c committed Dec 18, 2023
1 parent 3efec35 commit c84a193
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
import static org.hamcrest.MatcherAssert.assertThat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand All @@ -46,6 +52,7 @@
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public abstract class ClassicHttpCoreTransportTest {
Expand Down Expand Up @@ -122,4 +129,51 @@ public void testSequentialRequestsNonPersistentConnection() throws Exception {
}
}

@Test
public void testMultiThreadedRequests() throws Exception {
final HttpServer server = serverStart();
final HttpRequester requester = clientStart();

final int c = 10;
final CountDownLatch latch = new CountDownLatch(c);
final AtomicLong n = new AtomicLong(c + 100);
final AtomicReference<AssertionError> exRef = new AtomicReference<>();
final ExecutorService executorService = Executors.newFixedThreadPool(c);
try {
final HttpHost target = new HttpHost(scheme.id, "localhost", server.getLocalPort());
for (int i = 0; i < c; i++) {
executorService.execute(() -> {
try {
while (n.decrementAndGet() > 0) {
try {
final HttpCoreContext context = HttpCoreContext.create();
final ClassicHttpRequest request1 = new BasicClassicHttpRequest(Method.POST, "/stuff");
request1.setEntity(new StringEntity("some stuff", ContentType.TEXT_PLAIN));
requester.execute(target, request1, TIMEOUT, context, response -> {
Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
Assertions.assertEquals("some stuff", EntityUtils.toString(response.getEntity()));
return null;
});
} catch (final Exception ex) {
Assertions.fail(ex);
}
}
} catch (final AssertionError ex) {
exRef.compareAndSet(null, ex);
} finally {
latch.countDown();
}
});
}
Assertions.assertTrue(latch.await(5, TimeUnit.MINUTES));
} finally {
executorService.shutdownNow();
}

final AssertionError assertionError = exRef.get();
if (assertionError != null) {
throw assertionError;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,11 @@ public Future<PoolEntry<T, C>> lease(
@Override
public PoolEntry<T, C> get(
final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
lock.lock();
try {
return super.get(timeout, unit);
} catch (final TimeoutException ex) {
cancel();
throw ex;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,11 @@ public Future<PoolEntry<T, C>> lease(
@Override
public PoolEntry<T, C> get(
final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
lock.lock();
try {
return super.get(timeout, unit);
} catch (final TimeoutException ex) {
cancel();
throw ex;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@
package org.apache.hc.core5.pool;

import java.util.Collections;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.io.CloseMode;
Expand Down Expand Up @@ -100,6 +106,51 @@ public void testLeaseRelease() throws Exception {
}
}

@Test
public void testLeaseReleaseMultiThreaded() throws Exception {
try (final LaxConnPool<String, HttpConnection> pool = new LaxConnPool<>(2)) {

final int c = 10;
final CountDownLatch latch = new CountDownLatch(c);
final AtomicInteger n = new AtomicInteger(c + 100);
final AtomicReference<AssertionError> exRef = new AtomicReference<>();

final ExecutorService executorService = Executors.newFixedThreadPool(c);
try {
final Random rnd = new Random();
for (int i = 0; i < c; i++) {
executorService.execute(() -> {
try {
while (n.decrementAndGet() > 0) {
try {
final Future<PoolEntry<String, HttpConnection>> future = pool.lease("somehost", null);
final PoolEntry<String, HttpConnection> poolEntry = future.get(1, TimeUnit.MINUTES);
Thread.sleep(rnd.nextInt(1));
pool.release(poolEntry, false);
} catch (final Exception ex) {
Assertions.fail(ex.getMessage(), ex);
}
}
} catch (final AssertionError ex) {
exRef.compareAndSet(null, ex);
} finally {
latch.countDown();
}
});
}

Assertions.assertTrue(latch.await(5, TimeUnit.MINUTES));
} finally {
executorService.shutdownNow();
}

final AssertionError assertionError = exRef.get();
if (assertionError != null) {
throw assertionError;
}
}
}

@Test
public void testLeaseInvalid() throws Exception {
try (final LaxConnPool<String, HttpConnection> pool = new LaxConnPool<>(2)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@
package org.apache.hc.core5.pool;

import java.util.Collections;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.io.CloseMode;
Expand Down Expand Up @@ -106,6 +111,51 @@ public void testLeaseRelease() throws Exception {
}
}

@Test
public void testLeaseReleaseMultiThreaded() throws Exception {
try (final StrictConnPool<String, HttpConnection> pool = new StrictConnPool<>(2, 10)) {

final int c = 10;
final CountDownLatch latch = new CountDownLatch(c);
final AtomicInteger n = new AtomicInteger(c + 100);
final AtomicReference<AssertionError> exRef = new AtomicReference<>();

final ExecutorService executorService = Executors.newFixedThreadPool(c);
try {
final Random rnd = new Random();
for (int i = 0; i < c; i++) {
executorService.execute(() -> {
try {
while (n.decrementAndGet() > 0) {
try {
final Future<PoolEntry<String, HttpConnection>> future = pool.lease("somehost", null);
final PoolEntry<String, HttpConnection> poolEntry = future.get(1, TimeUnit.MINUTES);
Thread.sleep(rnd.nextInt(1));
pool.release(poolEntry, false);
} catch (final Exception ex) {
Assertions.fail(ex.getMessage(), ex);
}
}
} catch (final AssertionError ex) {
exRef.compareAndSet(null, ex);
} finally {
latch.countDown();
}
});
}

Assertions.assertTrue(latch.await(5, TimeUnit.MINUTES));
} finally {
executorService.shutdownNow();
}

final AssertionError assertionError = exRef.get();
if (assertionError != null) {
throw assertionError;
}
}
}

@Test
public void testLeaseInvalid() throws Exception {
try (final StrictConnPool<String, HttpConnection> pool = new StrictConnPool<>(2, 10)) {
Expand Down

0 comments on commit c84a193

Please sign in to comment.